Source code for kfp.onprem

from typing import Dict, Optional
from kfp import dsl


[docs]def mount_pvc(pvc_name='pipeline-claim', volume_name='pipeline', volume_mount_path='/mnt/pipeline'): """Modifier function to apply to a Container Op to simplify volume, volume mount addition and enable better reuse of volumes, volume claims across container ops. Example: :: train = train_op(...) train.apply(mount_pvc('claim-name', 'pipeline', '/mnt/pipeline')) """ def _mount_pvc(task): from kubernetes import client as k8s_client # there can be other ops in a pipeline (e.g. ResourceOp, VolumeOp) # refer to #3906 if not hasattr(task, "add_volume") or not hasattr( task, "add_volume_mount"): return task local_pvc = k8s_client.V1PersistentVolumeClaimVolumeSource( claim_name=pvc_name) return (task.add_volume( k8s_client.V1Volume( name=volume_name, persistent_volume_claim=local_pvc)).add_volume_mount( k8s_client.V1VolumeMount( mount_path=volume_mount_path, name=volume_name))) return _mount_pvc
[docs]def use_k8s_secret( secret_name: str = 'k8s-secret', k8s_secret_key_to_env: Optional[Dict] = None, ): """An operator that configures the container to use k8s credentials. k8s_secret_key_to_env specifies a mapping from the name of the keys in the k8s secret to the name of the environment variables where the values will be added. The secret needs to be deployed manually a priori. Example: :: train = train_op(...) train.apply(use_k8s_secret(secret_name='s3-secret', k8s_secret_key_to_env={'secret_key': 'AWS_SECRET_ACCESS_KEY'})) This will load the value in secret 's3-secret' at key 'secret_key' and source it as the environment variable 'AWS_SECRET_ACCESS_KEY'. I.e. it will produce the following section on the pod: env: - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: s3-secret key: secret_key """ k8s_secret_key_to_env = k8s_secret_key_to_env or {} def _use_k8s_secret(task): from kubernetes import client as k8s_client for secret_key, env_var in k8s_secret_key_to_env.items(): task.container \ .add_env_variable( k8s_client.V1EnvVar( name=env_var, value_from=k8s_client.V1EnvVarSource( secret_key_ref=k8s_client.V1SecretKeySelector( name=secret_name, key=secret_key ) ) ) ) return task return _use_k8s_secret
[docs]def add_default_resource_spec( memory_limit: Optional[str] = None, cpu_limit: Optional[str] = None, memory_request: Optional[str] = None, cpu_request: Optional[str] = None, ): """Add default resource requests and limits. For resource units, refer to https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes. Args: memory_limit: optional, memory limit. Format can be 512Mi, 2Gi etc. cpu_limit: optional, cpu limit. Format can be 0.5, 500m etc. memory_request: optional, defaults to memory limit. cpu_request: optional, defaults to cpu limit. """ if not memory_request: memory_request = memory_limit if not cpu_request: cpu_request = cpu_limit def _add_default_resource_spec(task): # Skip tasks which are not container ops. if not isinstance(task, dsl.ContainerOp): return task _apply_default_resource(task, 'cpu', cpu_request, cpu_limit) _apply_default_resource(task, 'memory', memory_request, memory_limit) return task return _add_default_resource_spec
def _apply_default_resource(task: dsl.ContainerOp, resource_name: str, default_request: Optional[str], default_limit: Optional[str]): if task.container.get_resource_limit(resource_name): # Do nothing. # Limit is set, request will default to limit if not set (Kubernetes default behavior), # so we do not need to further apply defaults. return if default_limit: task.container.add_resource_limit(resource_name, default_limit) if default_request: if not task.container.get_resource_request(resource_name): task.container.add_resource_request(resource_name, default_request)