Source code for kfp.gcp

# Copyright 2018 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Extension module for KFP on GCP deployment."""

from kubernetes.client import V1Toleration, V1Affinity, V1NodeAffinity, \
  V1NodeSelector, V1NodeSelectorTerm, V1NodeSelectorRequirement, V1PreferredSchedulingTerm


[docs]def use_gcp_secret(secret_name='user-gcp-sa', secret_file_path_in_volume=None, volume_name=None, secret_volume_mount_path='/secret/gcp-credentials'): """An operator that configures the container to use GCP service account by service account key stored in a Kubernetes secret. For cluster setup and alternatives to using service account key, check https://www.kubeflow.org/docs/gke/authentication-pipelines/. """ # permitted values for secret_name = ['admin-gcp-sa', 'user-gcp-sa'] if secret_file_path_in_volume is None: secret_file_path_in_volume = '/' + secret_name + '.json' if volume_name is None: volume_name = 'gcp-credentials-' + secret_name else: import warnings warnings.warn( 'The volume_name parameter is deprecated and will be removed in next release. The volume names are now generated automatically.', DeprecationWarning) def _use_gcp_secret(task): from kubernetes import client as k8s_client task = task.add_volume( k8s_client.V1Volume( name=volume_name, secret=k8s_client.V1SecretVolumeSource( secret_name=secret_name,))) task.container \ .add_volume_mount( k8s_client.V1VolumeMount( name=volume_name, mount_path=secret_volume_mount_path, ) ) \ .add_env_variable( k8s_client.V1EnvVar( name='GOOGLE_APPLICATION_CREDENTIALS', value=secret_volume_mount_path + secret_file_path_in_volume, ) ) \ .add_env_variable( k8s_client.V1EnvVar( name='CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE', value=secret_volume_mount_path + secret_file_path_in_volume, ) ) # Set GCloud Credentials by using the env var override. # TODO: Is there a better way for GCloud to pick up the credential? return task return _use_gcp_secret
[docs]def use_tpu(tpu_cores: int, tpu_resource: str, tf_version: str): """An operator that configures GCP TPU spec in a container op. Args: tpu_cores: Required. The number of cores of TPU resource. For example, the value can be '8', '32', '128', etc. Check more details at: https://cloud.google.com/tpu/docs/kubernetes-engine-setup#pod-spec. tpu_resource: Required. The resource name of the TPU resource. For example, the value can be 'v2', 'preemptible-v1', 'v3' or 'preemptible-v3'. Check more details at: https://cloud.google.com/tpu/docs/kubernetes-engine-setup#pod-spec. tf_version: Required. The TensorFlow version that the TPU nodes use. For example, the value can be '1.12', '1.11', '1.9' or '1.8'. Check more details at: https://cloud.google.com/tpu/docs/supported-versions. """ def _set_tpu_spec(task): task.add_pod_annotation('tf-version.cloud-tpus.google.com', tf_version) task.container.add_resource_limit( 'cloud-tpus.google.com/{}'.format(tpu_resource), str(tpu_cores)) return task return _set_tpu_spec
[docs]def use_preemptible_nodepool(toleration: V1Toleration = V1Toleration( effect='NoSchedule', key='preemptible', operator='Equal', value='true'), hard_constraint: bool = False): """An operator that configures the GKE preemptible in a container op. Args: toleration: toleration to pods, default is the preemptible label. hard_constraint: the constraint of scheduling the pods on preemptible nodepools is hard. (Default: False) """ def _set_preemptible(task): task.add_toleration(toleration) node_selector_term = V1NodeSelectorTerm(match_expressions=[ V1NodeSelectorRequirement( key='cloud.google.com/gke-preemptible', operator='In', values=['true']) ]) if hard_constraint: node_affinity = V1NodeAffinity( required_during_scheduling_ignored_during_execution=V1NodeSelector( node_selector_terms=[node_selector_term])) else: node_affinity = V1NodeAffinity( preferred_during_scheduling_ignored_during_execution=[ V1PreferredSchedulingTerm( preference=node_selector_term, weight=50) ]) affinity = V1Affinity(node_affinity=node_affinity) task.add_affinity(affinity=affinity) return task return _set_preemptible
[docs]def add_gpu_toleration(toleration: V1Toleration = V1Toleration( effect='NoSchedule', key='nvidia.com/gpu', operator='Equal', value='true')): """An operator that configures the GKE GPU nodes in a container op. Args: toleration: toleration to pods, default is the nvidia.com/gpu label. """ def _set_toleration(task): task.add_toleration(toleration) return _set_toleration