kfp.dsl package

kfp.dsl.RUN_ID_PLACEHOLDER
kfp.dsl.EXECUTION_ID_PLACEHOLDER
class kfp.dsl.Condition(condition, name=None)[source]

Bases: kfp.dsl._ops_group.OpsGroup

Represents a condition group with a condition.

Example usage:

```python
with Condition(param1 == 'pizza', '[param1 is pizza]'):
    op1 = ContainerOp(...)
    op2 = ContainerOp(...)
```

class kfp.dsl.ContainerOp(name: str, image: str, command: Union[str, List[str]] = None, arguments: Union[str, List[str]] = None, init_containers: List[kfp.dsl._container_op.UserContainer] = None, sidecars: List[kfp.dsl._container_op.Sidecar] = None, container_kwargs: Dict[KT, VT] = None, artifact_argument_paths: List[kfp.dsl._container_op.InputArgumentPath] = None, file_outputs: Dict[str, str] = None, output_artifact_paths: Dict[str, str] = None, is_exit_handler=False, pvolumes: Dict[str, kubernetes.client.models.v1_volume.V1Volume] = None)[source]

Bases: kfp.dsl._container_op.BaseOp

Represents an op implemented by a container image.

Example:

from kfp import dsl
from kubernetes.client.models import V1EnvVar, V1SecretKeySelector


@dsl.pipeline(
    name='foo',
    description='hello world')
def foo_pipeline(tag: str, pull_image_policy: str):

    # any attributes can be parameterized (both serialized string or actual PipelineParam)
    op = dsl.ContainerOp(name='foo',
                        image='busybox:%s' % tag,
                        # pass in init_container list
                        init_containers=[dsl.UserContainer('print', 'busybox:latest', command='echo "hello"')],
                        # pass in sidecars list
                        sidecars=[dsl.Sidecar('print', 'busybox:latest', command='echo "hello"')],
                        # pass in k8s container kwargs
                        container_kwargs={'env': [V1EnvVar('foo', 'bar')]},
    )

    # set `imagePullPolicy` property for `container` with `PipelineParam`
    op.container.set_image_pull_policy(pull_image_policy)

    # add sidecar with parameterized image tag
    # sidecar follows the argo sidecar swagger spec
    op.add_sidecar(dsl.Sidecar('redis', 'redis:%s' % tag).set_image_pull_policy('Always'))

add_pvolumes(pvolumes: Dict[str, kubernetes.client.models.v1_volume.V1Volume] = None)[source]

Updates the existing pvolumes dict, extends volumes and volume_mounts and redefines the pvolume attribute.

Parameters:pvolumes – Dictionary. Keys are mount paths, values are Kubernetes volumes or inherited types (e.g. PipelineVolumes).
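
For example, a minimal sketch (names, image, and size are illustrative) that mounts a freshly created PVC:

```python
import kfp.dsl as dsl

@dsl.pipeline(name='add-pvolumes-example')
def volume_pipeline():
    # create a PVC-backed PipelineVolume
    vop = dsl.VolumeOp(name='create-volume',
                       resource_name='my-pvc',
                       size='1Gi')

    # mount it at /data; the op then implicitly depends on `vop`
    op = dsl.ContainerOp(name='producer',
                         image='busybox:latest',
                         command=['sh', '-c', 'echo hello > /data/out.txt'])
    op.add_pvolumes({'/data': vop.volume})
```
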
arguments
command
container

Container object that represents the container property in io.argoproj.workflow.v1alpha1.Template. Can be used to update the container configurations.

Example

import kfp.dsl as dsl
from kubernetes.client.models import V1EnvVar

@dsl.pipeline(name='example_pipeline')
def immediate_value_pipeline():
    op1 = (dsl.ContainerOp(name='example', image='nginx:alpine')
            .container
            .add_env_variable(V1EnvVar(name='HOST', value='foo.bar'))
            .add_env_variable(V1EnvVar(name='PORT', value='80'))
            .parent)  # return the parent ContainerOp

env_variables
image
class kfp.dsl.ExitHandler(exit_op: kfp.dsl._container_op.ContainerOp)[source]

Bases: kfp.dsl._ops_group.OpsGroup

Represents an exit handler that is invoked upon exiting a group of ops.

Example usage:

```python
exit_op = ContainerOp(...)
with ExitHandler(exit_op):
    op1 = ContainerOp(...)
    op2 = ContainerOp(...)
```

class kfp.dsl.InputArgumentPath(argument, input=None, path=None)[source]

Bases: object

class kfp.dsl.ParallelFor(loop_args: Union[List[Union[int, float, str, Dict[str, Any]]], kfp.dsl._pipeline_param.PipelineParam])[source]

Bases: kfp.dsl._ops_group.OpsGroup

Represents a parallel for loop over a static set of items.

Example usage:

```python
with dsl.ParallelFor([{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]) as item:
    op1 = ContainerOp(..., args=['echo {}'.format(item.a)])
    op2 = ContainerOp(..., args=['echo {}'.format(item.b)])
```

Here op1 would be executed twice, once with args=['echo 1'] and once with args=['echo 2'].

TYPE_NAME = 'for_loop'
class kfp.dsl.PipelineConf[source]

Bases: object

PipelineConf contains pipeline-level settings.

add_op_transformer(transformer)[source]

Configures the op_transformers, which will be applied to all ops in the pipeline.

Parameters:transformer – A function that takes a ContainerOp as input and returns a ContainerOp.
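
A hedged sketch of such a transformer (the label name and value are hypothetical, and add_pod_label is assumed to be available on the op):

```python
import kfp.dsl as dsl

def add_team_label(op):
    # hypothetical transformer: attach a pod label to every op
    op.add_pod_label('team', 'ml-platform')
    return op

@dsl.pipeline(name='transformer-example')
def my_pipeline():
    dsl.get_pipeline_conf().add_op_transformer(add_team_label)
```
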
set_default_pod_node_selector(label_name: str, value: str)[source]
Adds a nodeSelector constraint for the pipeline. Each constraint is a key-value pair label. For a
container to be eligible to run on a node, the node must have each of the constraints as labels.
Parameters:
  • label_name – The name of the constraint label.
  • value – The value of the constraint label.
set_image_pull_policy(policy: str)[source]

Configures the default image pull policy.

Parameters:policy – The image pull policy; one of Always, Never, IfNotPresent.
set_image_pull_secrets(image_pull_secrets)[source]

Configures the pipeline-level imagePullSecrets.

Parameters:image_pull_secrets – A list of Kubernetes V1LocalObjectReference. For a detailed description, see the Kubernetes V1LocalObjectReference definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1LocalObjectReference.md
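
A minimal sketch (the secret name 'regcred' is hypothetical and must already exist in the cluster):

```python
import kfp.dsl as dsl
from kubernetes.client.models import V1LocalObjectReference

@dsl.pipeline(name='pull-secret-example')
def my_pipeline():
    dsl.get_pipeline_conf().set_image_pull_secrets(
        [V1LocalObjectReference(name='regcred')])
```
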
set_parallelism(max_num_pods: int)[source]

Configures the max number of total parallel pods that can execute at the same time in a workflow.

Parameters:max_num_pods (int) – max number of total parallel pods.
set_timeout(seconds: int)[source]

Configures the pipeline-level timeout.

Parameters:seconds – number of seconds for timeout
set_ttl_seconds_after_finished(seconds: int)[source]

Configures the TTL after the pipeline has finished.

Parameters:seconds – number of seconds for the workflow to be garbage collected after it is finished.
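
Putting several of these together, a hedged sketch of configuring pipeline-level settings from inside a pipeline function (all values are illustrative):

```python
import kfp.dsl as dsl

@dsl.pipeline(name='conf-example')
def conf_pipeline():
    conf = dsl.get_pipeline_conf()
    conf.set_parallelism(4)                     # at most 4 pods at a time
    conf.set_timeout(3600)                      # fail the workflow after 1 hour
    conf.set_ttl_seconds_after_finished(86400)  # garbage-collect after 1 day
    conf.set_default_pod_node_selector('disktype', 'ssd')
```
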
class kfp.dsl.PipelineParam(name: str, op_name: str = None, value: str = None, param_type: Union[str, Dict[KT, VT]] = None, pattern: str = None)[source]

Bases: object

Representing a future value that is passed between pipeline components.

A PipelineParam object can be used as a pipeline function argument so that it will be a pipeline parameter that shows up in ML Pipelines system UI. It can also represent an intermediate value passed between components.
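
For example, a minimal sketch (image and command are illustrative):

```python
import kfp.dsl as dsl

@dsl.pipeline(name='param-example')
def param_pipeline(message: str = 'hello'):
    # `message` arrives here as a PipelineParam: its value is unknown at
    # compile time and is substituted into the op's arguments at run time
    dsl.ContainerOp(name='echo',
                    image='busybox:latest',
                    command=['echo'],
                    arguments=[message])
```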

full_name

Unique name for the PipelineParam in the Argo YAML.

ignore_type()[source]

Ignores the type information, so that type checking passes regardless of the declared type.

to_struct()[source]
class kfp.dsl.PipelineVolume(pvc: str = None, volume: kubernetes.client.models.v1_volume.V1Volume = None, **kwargs)[source]

Bases: kubernetes.client.models.v1_volume.V1Volume

Representing a volume that is passed between pipeline operators and is to be mounted by a ContainerOp or its inherited type.

A PipelineVolume object can be used as an extension of the pipeline function’s filesystem. It may then be passed between ContainerOps, exposing dependencies.

after(*ops)[source]

Creates a duplicate of self with the required dependencies, excluding redundant dependencies.

Parameters:*ops – Pipeline operators to add as dependencies.
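
A hedged sketch of passing a volume between ops to express a dependency (names and commands are illustrative):

```python
import kfp.dsl as dsl

@dsl.pipeline(name='pvolume-example')
def pvolume_pipeline():
    vop = dsl.VolumeOp(name='create-volume',
                       resource_name='my-pvc',
                       size='1Gi')
    step1 = dsl.ContainerOp(name='step1',
                            image='busybox:latest',
                            command=['sh', '-c', 'echo 1 > /data/file'],
                            pvolumes={'/data': vop.volume})
    # mounting step1.pvolume makes step2 run after step1
    step2 = dsl.ContainerOp(name='step2',
                            image='busybox:latest',
                            command=['cat', '/data/file'],
                            pvolumes={'/data': step1.pvolume})
```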

class kfp.dsl.ResourceOp(k8s_resource=None, action: str = 'create', merge_strategy: str = None, success_condition: str = None, failure_condition: str = None, attribute_outputs: Dict[str, str] = None, **kwargs)[source]

Bases: kfp.dsl._container_op.BaseOp

Represents an op which will be translated into a resource template.

delete()[source]

Returns a ResourceOp which deletes the resource.

resource

Resource object that represents the resource property in io.argoproj.workflow.v1alpha1.Template.
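
A hedged sketch of creating a Kubernetes resource from a pipeline (the ConfigMap and all of its fields are illustrative):

```python
import kfp.dsl as dsl
from kubernetes import client as k8s_client

@dsl.pipeline(name='resourceop-example')
def resourceop_pipeline():
    cm = k8s_client.V1ConfigMap(
        api_version='v1',
        kind='ConfigMap',
        metadata=k8s_client.V1ObjectMeta(generate_name='my-config-'),
        data={'key': 'value'})

    rop = dsl.ResourceOp(
        name='create-config',
        k8s_resource=cm,
        action='create',
        # expose the generated resource name as an output
        attribute_outputs={'name': '{.metadata.name}'})
```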

class kfp.dsl.Sidecar(name: str, image: str, command: Union[str, List[str]] = None, args: Union[str, List[str]] = None, mirror_volume_mounts: bool = None, **kwargs)[source]

Bases: kfp.dsl._container_op.UserContainer

class kfp.dsl.UserContainer(name: str, image: str, command: Union[str, List[str]] = None, args: Union[str, List[str]] = None, mirror_volume_mounts: bool = None, **kwargs)[source]

Bases: kfp.dsl._container_op.Container

Represents an argo workflow UserContainer (io.argoproj.workflow.v1alpha1.UserContainer) to be used in UserContainer property in argo’s workflow template (io.argoproj.workflow.v1alpha1.Template).

UserContainer inherits from the Container class with the addition of a mirror_volume_mounts attribute (mirrorVolumeMounts property).

See https://github.com/argoproj/argo/blob/master/api/openapi-spec/swagger.json

Example

from kfp.dsl import ContainerOp, UserContainer

# creates a ContainerOp and adds a redis init container
op = (ContainerOp(name='foo-op', image='busybox:latest')
        .add_initContainer(
            UserContainer(name='redis', image='redis:alpine')))

attribute_map = {'args': 'args', 'command': 'command', 'env': 'env', 'env_from': 'envFrom', 'image': 'image', 'image_pull_policy': 'imagePullPolicy', 'lifecycle': 'lifecycle', 'liveness_probe': 'livenessProbe', 'mirror_volume_mounts': 'mirrorVolumeMounts', 'name': 'name', 'ports': 'ports', 'readiness_probe': 'readinessProbe', 'resources': 'resources', 'security_context': 'securityContext', 'stdin': 'stdin', 'stdin_once': 'stdinOnce', 'termination_message_path': 'terminationMessagePath', 'termination_message_policy': 'terminationMessagePolicy', 'tty': 'tty', 'volume_devices': 'volumeDevices', 'volume_mounts': 'volumeMounts', 'working_dir': 'workingDir'}
inputs

A list of PipelineParam found in the UserContainer object.

openapi_types = {'args': 'list[str]', 'command': 'list[str]', 'env': 'list[V1EnvVar]', 'env_from': 'list[V1EnvFromSource]', 'image': 'str', 'image_pull_policy': 'str', 'lifecycle': 'V1Lifecycle', 'liveness_probe': 'V1Probe', 'mirror_volume_mounts': 'bool', 'name': 'str', 'ports': 'list[V1ContainerPort]', 'readiness_probe': 'V1Probe', 'resources': 'V1ResourceRequirements', 'security_context': 'V1SecurityContext', 'stdin': 'bool', 'stdin_once': 'bool', 'termination_message_path': 'str', 'termination_message_policy': 'str', 'tty': 'bool', 'volume_devices': 'list[V1VolumeDevice]', 'volume_mounts': 'list[V1VolumeMount]', 'working_dir': 'str'}
set_mirror_volume_mounts(mirror_volume_mounts=True)[source]

Setting mirrorVolumeMounts to true will mount the same volumes specified in the main container to the sidecar container (including artifacts), at the same mountPaths. This enables the dind daemon to partially see the same filesystem as the main container, in order to use features such as docker volume binding.

Parameters:mirror_volume_mounts – boolean flag
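
A hedged sketch of a docker-in-docker sidecar with mirrored mounts (image tags are illustrative):

```python
import kfp.dsl as dsl

@dsl.pipeline(name='dind-example')
def dind_pipeline():
    op = dsl.ContainerOp(name='builder', image='docker:19.03')
    # the dind sidecar sees the same volumes (and artifacts) as `op`
    sidecar = dsl.Sidecar(name='dind', image='docker:19.03-dind')
    sidecar.set_mirror_volume_mounts()
    op.add_sidecar(sidecar)
```
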
class kfp.dsl.VolumeOp(resource_name: str = None, size: str = None, storage_class: str = None, modes: List[str] = None, annotations: Dict[str, str] = None, data_source=None, volume_name=None, **kwargs)[source]

Bases: kfp.dsl._resource_op.ResourceOp

Represents an op which will be translated into a resource template that creates a PVC.
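
A minimal sketch (size and names are illustrative; VOLUME_MODE_RWO is assumed to be one of the access-mode constants exported by kfp.dsl):

```python
import kfp.dsl as dsl

vop = dsl.VolumeOp(name='create-pvc',
                   resource_name='my-pvc',
                   size='5Gi',
                   modes=dsl.VOLUME_MODE_RWO)
# vop.volume is a PipelineVolume backed by the newly created PVC
```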

class kfp.dsl.VolumeSnapshotOp(resource_name: str = None, pvc: str = None, snapshot_class: str = None, annotations: Dict[str, str] = None, volume: kubernetes.client.models.v1_volume.V1Volume = None, **kwargs)[source]

Bases: kfp.dsl._resource_op.ResourceOp

Represents an op which will be translated into a resource template that creates a VolumeSnapshot.

At the time this feature was written, VolumeSnapshots were an alpha feature in Kubernetes. Check with your Kubernetes cluster admin whether it is enabled.
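
A hedged sketch of snapshotting a volume after a step has written to it (names are illustrative):

```python
import kfp.dsl as dsl

@dsl.pipeline(name='snapshot-example')
def snapshot_pipeline():
    vop = dsl.VolumeOp(name='create-volume',
                       resource_name='my-pvc',
                       size='1Gi')
    step1 = dsl.ContainerOp(name='producer',
                            image='busybox:latest',
                            command=['sh', '-c', 'echo data > /data/file'],
                            pvolumes={'/data': vop.volume})
    # snapshot the volume once step1 has written to it
    snap = dsl.VolumeSnapshotOp(name='snapshot',
                                resource_name='my-snapshot',
                                volume=step1.pvolume)
```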

kfp.dsl.component(func)[source]

Decorator for component functions that return a ContainerOp. This is useful to enable type checking in the DSL compiler.

Usage:

```python
@dsl.component
def foobar(model: TFModel(), step: MLStep()):
    return dsl.ContainerOp()
```
kfp.dsl.get_pipeline_conf()[source]

Configures pipeline-level settings for the current pipeline. Note: call this function inside the user-defined pipeline function.

kfp.dsl.graph_component(func)[source]

Decorator for graph component functions. This decorator returns an ops_group.

Usage:

```python
# Warning: caching is tricky when recursion is involved. Please be careful
# and set a proper max_cache_staleness in case of an infinite loop.
import kfp.dsl as dsl

@dsl.graph_component
def flip_component(flip_result):
    print_flip = PrintOp(flip_result)
    flipA = FlipCoinOp().after(print_flip)
    flipA.execution_options.caching_strategy.max_cache_staleness = "P0D"
    with dsl.Condition(flipA.output == 'heads'):
        flip_component(flipA.output)
    return {'flip_result': flipA.output}
```

kfp.dsl.pipeline(name: str = None, description: str = None)[source]

Decorator of pipeline functions.

Usage:

```python
@pipeline(
    name='my awesome pipeline',
    description='Is it really awesome?'
)
def my_pipeline(a: PipelineParam, b: PipelineParam):
    ...
```

kfp.dsl.python_component(name, description=None, base_image=None, target_component_file: str = None)[source]
Decorator for Python component functions.

This decorator adds the metadata to the function object itself.

Parameters:
  • name – Human-readable name of the component.
  • description – Optional. Description of the component.
  • base_image – Optional. Docker container image to use as the base of the component. Needs to have Python 3.5+ installed.
  • target_component_file – Optional. Local file to store the component definition. The file can then be used for sharing.

Returns:
The same function (with some metadata fields set).

Usage:

```python
@dsl.python_component(
    name='my awesome component',
    description="Come, let's play",
    base_image='tensorflow/tensorflow:1.11.0-py3',
)
def my_component(a: str, b: int) -> str:
    ...
```

Deprecated since version 0.2.6: This decorator does not seem to be used, so we deprecate it. If you need this decorator, please create an issue at https://github.com/kubeflow/pipelines/issues