kfp.dsl

The kfp.dsl module contains domain-specific language objects used to compose pipelines.

kfp.dsl.Input

A type generic used to represent an input artifact of type T, where T is an artifact class.

Use Input[Artifact] or Output[Artifact] to indicate whether the enclosed artifact is a component input or output.

Parameters

T – The type of the input artifact.

Example

@dsl.component
def artifact_producer(model: Output[Artifact]):
    with open(model.path, 'w') as f:
        f.write('my model')

@dsl.component
def artifact_consumer(model: Input[Artifact]):
    print(model)

@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    producer_task = artifact_producer()
    artifact_consumer(model=producer_task.output)


kfp.dsl.Output

A type generic used to represent an output artifact of type T, where T is an artifact class. The argument typed with this annotation is provided at runtime by the executing backend and does not need to be passed as an input by the pipeline author (see example).

Use Input[Artifact] or Output[Artifact] to indicate whether the enclosed artifact is a component input or output.

Parameters

T – The type of the output artifact.

Example

@dsl.component
def artifact_producer(model: Output[Artifact]):
    with open(model.path, 'w') as f:
        f.write('my model')

@dsl.component
def artifact_consumer(model: Input[Artifact]):
    print(model)

@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    producer_task = artifact_producer()
    artifact_consumer(model=producer_task.output)


class kfp.dsl.Artifact(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)

Bases: object

Represents a generic machine learning artifact.

This class and all artifact classes store the name, uri, and metadata for a machine learning artifact. Use this artifact type when an artifact does not fit into another more specific artifact type (e.g., Model, Dataset).

Parameters
  • name – Name of the artifact.

  • uri – The artifact’s location on disk or cloud storage.

  • metadata – Arbitrary key-value pairs about the artifact.

Example

from kfp import dsl
from kfp.dsl import Output, Artifact, Input


@dsl.component
def create_artifact(
    data: str,
    output_artifact: Output[Artifact],
):
    with open(output_artifact.path, 'w') as f:
        f.write(data)


@dsl.component
def use_artifact(input_artifact: Input[Artifact]):
    with open(input_artifact.path) as input_file:
        artifact_contents = input_file.read()
        print(artifact_contents)


@dsl.pipeline(name='my-pipeline', pipeline_root='gs://my/storage')
def my_pipeline():
    create_task = create_artifact(data='my data')
    use_artifact(input_artifact=create_task.outputs['output_artifact'])

Note: Other artifact types are used in the same way as Artifact in the example above (within Input[] and Output[]).

TYPE_NAME = 'system.Artifact'
VERSION = '0.0.1'
property path: str
class kfp.dsl.ClassificationMetrics(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)

Bases: Artifact

An artifact for storing classification metrics.

Parameters
  • name – Name of the metrics artifact.

  • uri – The metrics artifact’s location on disk or cloud storage.

  • metadata – The key-value scalar metrics.

TYPE_NAME = 'system.ClassificationMetrics'
log_confusion_matrix(categories: List[str], matrix: List[List[int]]) → None

Logs a confusion matrix to metadata.

Parameters
  • categories – List of the category names.

  • matrix – Complete confusion matrix.

Raises

ValueError – If the length of categories does not match number of rows or columns of matrix.
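
The ValueError condition above amounts to a shape check: the matrix needs one row and one column per category. A minimal sketch of that check in plain Python follows. check_confusion_matrix is a hypothetical helper written for illustration, not KFP's implementation.

```python
from typing import List


def check_confusion_matrix(categories: List[str], matrix: List[List[int]]) -> None:
    """Raise ValueError unless matrix has one row and one column per category.

    Hypothetical helper for illustration only; not part of kfp.
    """
    n = len(categories)
    if len(matrix) != n:
        raise ValueError(f'expected {n} rows, got {len(matrix)}')
    for row in matrix:
        if len(row) != n:
            raise ValueError(f'expected {n} columns, got {len(row)}')


# A 2x2 matrix for two categories passes the check silently.
check_confusion_matrix(['cat', 'dog'], [[9, 1], [2, 8]])
```

Running such a check before calling log_confusion_matrix may give a clearer error message earlier in a component.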

log_confusion_matrix_cell(row_category: str, col_category: str, value: int) → None

Logs a cell in the confusion matrix to metadata.

Parameters
  • row_category – String representing the name of the row category.

  • col_category – String representing the name of the column category.

  • value – Value of the cell.

Raises

ValueError – If row_category or col_category is not in the list of categories set by set_confusion_matrix_categories.

log_confusion_matrix_row(row_category: str, row: List[float]) → None

Logs a confusion matrix row to metadata.

Parameters
  • row_category – Category to which the row belongs.

  • row – List of numeric values for the row.

Raises

ValueError – If row_category is not in the list of categories set by set_confusion_matrix_categories.

log_roc_curve(fpr: List[float], tpr: List[float], threshold: List[float]) → None

Logs an ROC curve to metadata.

Parameters
  • fpr – List of false positive rate values.

  • tpr – List of true positive rate values.

  • threshold – List of threshold values.

Raises

ValueError – If the lists fpr, tpr and threshold are not the same length.

log_roc_data_point(fpr: float, tpr: float, threshold: float) → None

Logs a single data point in the ROC curve to metadata.

Parameters
  • fpr – False positive rate value of the data point.

  • tpr – True positive rate value of the data point.

  • threshold – Threshold value for the data point.

set_confusion_matrix_categories(categories: List[str]) → None

Stores confusion matrix categories to metadata.

Parameters

categories – List of strings specifying the categories.

class kfp.dsl.Condition(condition: ConditionOperator, name: Optional[str] = None)

Bases: TasksGroup

A class for creating conditional control flow within a pipeline definition.

Parameters
  • condition – The condition expression. Can be constructed using constants or outputs from upstream tasks.

  • name – The name of the condition group.

Example

with Condition(param1=='pizza', '[param1 is pizza]'):
    task1 = MyComponent1(...)
    task2 = MyComponent2(...)
class kfp.dsl.Dataset(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)

Bases: Artifact

An artifact representing a machine learning dataset.

Parameters
  • name – Name of the dataset.

  • uri – The dataset’s location on disk or cloud storage.

  • metadata – Arbitrary key-value pairs about the dataset.

TYPE_NAME = 'system.Dataset'
class kfp.dsl.ExitHandler(exit_task: PipelineTask, name: Optional[str] = None)

Bases: TasksGroup

A class for setting an exit handler task that is invoked upon exiting a group of other tasks.

Parameters
  • exit_task – The task that is invoked after exiting a group of other tasks.

  • name – The name of the exit handler group.

Example

exit_task = ExitComponent(...)
with ExitHandler(exit_task):
    task1 = MyComponent1(...)
    task2 = MyComponent2(...)
class kfp.dsl.HTML(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)

Bases: Artifact

An artifact representing an HTML file.

Parameters
  • name – Name of the HTML file.

  • uri – The HTML file’s location on disk or cloud storage.

  • metadata – Arbitrary key-value pairs about the HTML file.

TYPE_NAME = 'system.HTML'
class kfp.dsl.InputPath(type=None)

Bases: object

Type annotation used in component definitions for indicating a parameter is a path to an input.

Example

@dsl.component
def create_dataset(dataset_path: OutputPath('Dataset'),):
    import json
    dataset = {'my_dataset': [[1, 2, 3], [4, 5, 6]]}
    with open(dataset_path, 'w') as f:
        json.dump(dataset, f)


@dsl.component
def consume_dataset(dataset: InputPath('Dataset')):
    print(dataset)


@dsl.pipeline(name='my-pipeline', pipeline_root='gs://my-bucket')
def my_pipeline():
    create_dataset_op = create_dataset()
    consume_dataset(dataset=create_dataset_op.outputs['dataset_path'])
class kfp.dsl.Markdown(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)

Bases: Artifact

An artifact representing a markdown file.

Parameters
  • name – Name of the markdown file.

  • uri – The markdown file’s location on disk or cloud storage.

  • metadata – Arbitrary key-value pairs about the markdown file.

TYPE_NAME = 'system.Markdown'
class kfp.dsl.Metrics(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)

Bases: Artifact

An artifact for storing key-value scalar metrics.

Parameters
  • name – Name of the metrics artifact.

  • uri – The metrics artifact’s location on disk or cloud storage.

  • metadata – Key-value scalar metrics.

TYPE_NAME = 'system.Metrics'
log_metric(metric: str, value: float) → None

Sets a custom scalar metric in the artifact’s metadata.

Parameters
  • metric – The metric key.

  • value – The metric value.
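
To make the effect of log_metric concrete, the sketch below uses a small stand-in class whose metadata is a plain dictionary. MetricsSketch is hypothetical and is not the kfp.dsl.Metrics class. It only assumes, as the description above states, that a logged metric ends up as a key-value pair in the artifact's metadata.

```python
from typing import Dict, Optional


class MetricsSketch:
    """Hypothetical stand-in for a metrics artifact; not kfp.dsl.Metrics."""

    def __init__(self, name: Optional[str] = None, uri: Optional[str] = None,
                 metadata: Optional[Dict] = None):
        self.name = name
        self.uri = uri
        self.metadata = metadata if metadata is not None else {}

    def log_metric(self, metric: str, value: float) -> None:
        # Store the scalar under its key. In this sketch, logging the
        # same key again overwrites the earlier value.
        self.metadata[metric] = value


metrics = MetricsSketch(name='eval-metrics')
metrics.log_metric('accuracy', 0.92)
metrics.log_metric('loss', 0.31)
print(metrics.metadata)
```

In a real component the artifact would be received as an Output[Metrics] argument rather than constructed by hand.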

class kfp.dsl.Model(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)

Bases: Artifact

An artifact representing a machine learning model.

Parameters
  • name – Name of the model.

  • uri – The model’s location on disk or cloud storage.

  • metadata – Arbitrary key-value pairs about the model.

TYPE_NAME = 'system.Model'
property framework: str
class kfp.dsl.OutputPath(type=None)

Bases: object

Type annotation used in component definitions for indicating a parameter is a path to an output. The path parameter typed with this annotation can be treated as a locally accessible filepath within the component body.

The argument typed with this annotation is provided at runtime by the executing backend and does not need to be passed as an input by the pipeline author (see example).

Parameters

type – The type of the value written to the output path.

Example

@dsl.component
def create_parameter(
        message: str,
        output_parameter_path: OutputPath(str),
):
    with open(output_parameter_path, 'w') as f:
        f.write(message)


@dsl.component
def consume_parameter(message: str):
    print(message)


@dsl.pipeline(name='my-pipeline', pipeline_root='gs://my-bucket')
def my_pipeline(message: str = 'default message'):
    create_param_op = create_parameter(message=message)
    consume_parameter(message=create_param_op.outputs['output_parameter_path'])
kfp.dsl.PIPELINE_JOB_ID_PLACEHOLDER = '{{$.pipeline_job_uuid}}'

A placeholder used to obtain a pipeline job ID within a task at pipeline runtime.

Example

@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    print_op(
        msg='Job ID:',
        value=dsl.PIPELINE_JOB_ID_PLACEHOLDER,
    )
kfp.dsl.PIPELINE_JOB_NAME_PLACEHOLDER = '{{$.pipeline_job_name}}'

A placeholder used to obtain a pipeline job name within a task at pipeline runtime.

Example

@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    print_op(
        msg='Job name:',
        value=dsl.PIPELINE_JOB_NAME_PLACEHOLDER,
    )
kfp.dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER = '{{$.pipeline_job_resource_name}}'

A placeholder used to obtain a pipeline job resource name within a task at pipeline runtime.

Example

@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    print_op(
        msg='Job resource name:',
        value=dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER,
    )
kfp.dsl.PIPELINE_TASK_ID_PLACEHOLDER = '{{$.pipeline_task_uuid}}'

A placeholder used to obtain a task ID within a task at pipeline runtime.

Example

@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    print_op(
        msg='Task ID:',
        value=dsl.PIPELINE_TASK_ID_PLACEHOLDER,
    )
kfp.dsl.PIPELINE_TASK_NAME_PLACEHOLDER = '{{$.pipeline_task_name}}'

A placeholder used to obtain a task name within a task at pipeline runtime.

Example

@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    print_op(
        msg='Task name:',
        value=dsl.PIPELINE_TASK_NAME_PLACEHOLDER,
    )
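
The placeholders above are ordinary strings at pipeline definition time. The executing backend substitutes the real values when the task runs. The sketch below imitates that substitution in plain Python to show the idea. The resolve function and the runtime values are hypothetical and do not reflect how any particular backend implements it. Only the two placeholder strings are taken from the definitions above.

```python
from typing import Dict

# Placeholder strings copied from the constants documented above.
PIPELINE_JOB_ID_PLACEHOLDER = '{{$.pipeline_job_uuid}}'
PIPELINE_TASK_NAME_PLACEHOLDER = '{{$.pipeline_task_name}}'


def resolve(argument: str, runtime_values: Dict[str, str]) -> str:
    """Replace each known placeholder in argument with its runtime value.

    Hypothetical illustration of runtime substitution; not part of kfp.
    """
    for placeholder, value in runtime_values.items():
        argument = argument.replace(placeholder, value)
    return argument


# Made-up runtime values, for illustration only.
runtime_values = {
    PIPELINE_JOB_ID_PLACEHOLDER: '1234',
    PIPELINE_TASK_NAME_PLACEHOLDER: 'print-op',
}
print(resolve('job ' + PIPELINE_JOB_ID_PLACEHOLDER, runtime_values))
```

Because the substitution happens at runtime, printing a placeholder while defining the pipeline shows the literal template string, not a job or task value.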
class kfp.dsl.ParallelFor(items: Union[List[Union[int, float, str, Dict[str, Any]]], PipelineChannel], name: Optional[str] = None)

Bases: TasksGroup

A class for creating parallelized for loop control flow over a static set of items within a pipeline definition.

Parameters
  • items – The items to loop over. It can be either a constant Python list or a list output from an upstream task.

  • name – The name of the for loop group.

Example

with dsl.ParallelFor([{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]) as item:
    task1 = MyComponent(..., item.a)
    task2 = MyComponent(..., item.b)

In the example, task1 and task2 are each executed twice, once per item: task1 receives item.a (1, then 2) and task2 receives item.b (10, then 20).
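
As a rough mental model, the loop body is instantiated once for every item. The plain-Python sketch below lists the task runs that the example would produce. expand_loop is a hypothetical helper, and the sequential order shown here is only an artifact of the sketch, since the backend may run iterations in parallel.

```python
from typing import Any, Dict, List, Tuple

items = [{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]


def expand_loop(loop_items: List[Dict[str, Any]]) -> List[Tuple[str, Any]]:
    """Return (task name, argument) pairs: one task1 and one task2 per item.

    Hypothetical illustration of loop expansion; not part of kfp.
    """
    runs = []
    for item in loop_items:
        runs.append(('task1', item['a']))
        runs.append(('task2', item['b']))
    return runs


print(expand_loop(items))
# prints [('task1', 1), ('task2', 10), ('task1', 2), ('task2', 20)]
```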

class kfp.dsl.PipelineTask(component_spec: ComponentSpec, args: Mapping[str, Any])

Bases: object

Represents a pipeline task (instantiated component).

Note: PipelineTask should not be constructed by pipeline authors directly, but instead obtained via an instantiated component (see example).

Replaces ContainerOp from kfp v1. Holds operations available on a task object, such as .after(), .set_memory_limit(), .set_caching_options(), etc.

Parameters
  • component_spec – The component definition.

  • args – The dictionary of arguments on which the component was called to instantiate this task.

Example

@dsl.component
def identity(message: str) -> str:
    return message

@dsl.pipeline(name='my_pipeline')
def my_pipeline():
    # task is an instance of PipelineTask
    task = identity(message='my string')
add_node_selector_constraint(accelerator: str) → PipelineTask

Sets accelerator type to use when executing this task.

Parameters

accelerator – The name of the accelerator. Available values include 'NVIDIA_TESLA_K80' and 'TPU_V3'.

Returns

Self return to allow chained setting calls.

after(*tasks) → PipelineTask

Specifies an explicit dependency on other tasks by requiring that this task be executed after the other tasks have completed.

Parameters

*tasks – Tasks after which this task should be executed.

Returns

Self return to allow chained setting calls.

Example

@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    task1 = my_component(text='1st task')
    task2 = my_component(text='2nd task').after(task1)
property dependent_tasks: List[str]

A list of the dependent task names.

property inputs: List[Union[str, int, float, bool, dict, list, PipelineChannel]]

The list of actual inputs passed to the task.

property name: str

The name of the task.

Unique within its parent group.

property output: PipelineChannel

The single output of the task.

Used when a task has exactly one output parameter.

property outputs: Mapping[str, PipelineChannel]

The dictionary of outputs of the task.

Used when a task has more than one output or uses an OutputPath or Output[Artifact] type annotation.

set_caching_options(enable_caching: bool) → PipelineTask

Sets caching options for the task.

Parameters

enable_caching – Whether to enable caching.

Returns

Self return to allow chained setting calls.

set_cpu_limit(cpu: str) → PipelineTask

Sets CPU limit (maximum) for the task.

Parameters

cpu – Maximum CPU requests allowed. This string should be a number or a number followed by an “m” to indicate millicores (1/1000). For more information, see Specify a CPU Request and a CPU Limit.

Returns

Self return to allow chained setting calls.
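
The CPU string format described above (a number, optionally followed by "m" for millicores) can be illustrated with a small parser. This is a sketch for reasoning about values such as '500m'. cpu_to_cores is a hypothetical helper and is not the validation that KFP itself performs.

```python
import re

# A number (integer or decimal), optionally followed by "m" for millicores.
_CPU_PATTERN = re.compile(r'^(\d+(?:\.\d+)?)(m?)$')


def cpu_to_cores(cpu: str) -> float:
    """Convert a CPU limit string such as '2' or '500m' to a number of cores.

    Hypothetical helper for illustration only; not part of kfp.
    """
    match = _CPU_PATTERN.match(cpu)
    if match is None:
        raise ValueError(f'invalid CPU string: {cpu!r}')
    number, suffix = match.groups()
    value = float(number)
    # One millicore is 1/1000 of a core.
    return value / 1000 if suffix == 'm' else value


print(cpu_to_cores('500m'))  # prints 0.5
print(cpu_to_cores('2'))     # prints 2.0
```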

set_display_name(name: str) → PipelineTask

Sets display name for the task.

Parameters

name – Display name.

Returns

Self return to allow chained setting calls.

set_env_variable(name: str, value: str) → PipelineTask

Sets environment variable for the task.

Parameters
  • name – Environment variable name.

  • value – Environment variable value.

Returns

Self return to allow chained setting calls.

set_gpu_limit(gpu: str) → PipelineTask

Sets GPU limit (maximum) for the task.

Parameters

gpu – The maximum GPU requests allowed. This string should be a positive integer number of GPUs.

Returns

Self return to allow chained setting calls.

set_memory_limit(memory: str) → PipelineTask

Sets memory limit (maximum) for the task.

Parameters

memory – The maximum memory requests allowed. This string should be a number or a number followed by one of “E”, “Ei”, “P”, “Pi”, “T”, “Ti”, “G”, “Gi”, “M”, “Mi”, “K”, or “Ki”.

Returns

Self return to allow chained setting calls.
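
The sketch below shows one way to interpret the memory strings listed above. It assumes the Kubernetes quantity convention, in which plain suffixes such as "M" are powers of 1000 and "i" suffixes such as "Mi" are powers of 1024. The text above lists only the accepted suffixes, so that interpretation is an assumption. memory_to_bytes is a hypothetical helper, not part of KFP.

```python
import re

# Assumed multipliers, following the Kubernetes quantity convention.
_MULTIPLIERS = {
    '': 1,
    'K': 10**3, 'M': 10**6, 'G': 10**9, 'T': 10**12, 'P': 10**15, 'E': 10**18,
    'Ki': 2**10, 'Mi': 2**20, 'Gi': 2**30, 'Ti': 2**40, 'Pi': 2**50, 'Ei': 2**60,
}

# A number followed by at most one of the suffixes listed in the docs.
_MEMORY_PATTERN = re.compile(
    r'^(\d+(?:\.\d+)?)(Ei|Pi|Ti|Gi|Mi|Ki|E|P|T|G|M|K)?$')


def memory_to_bytes(memory: str) -> float:
    """Convert a memory limit string such as '512Mi' to a number of bytes.

    Hypothetical helper for illustration only; not part of kfp.
    """
    match = _MEMORY_PATTERN.match(memory)
    if match is None:
        raise ValueError(f'invalid memory string: {memory!r}')
    number = float(match.group(1))
    suffix = match.group(2) or ''
    return number * _MULTIPLIERS[suffix]


print(memory_to_bytes('512Mi'))  # prints 536870912.0
print(memory_to_bytes('1G'))     # prints 1000000000.0
```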

set_retry(num_retries: int, backoff_duration: Optional[str] = None, backoff_factor: Optional[float] = None, backoff_max_duration: Optional[str] = None) → PipelineTask

Sets task retry parameters.

Parameters
  • num_retries – Number of times to retry on failure.

  • backoff_duration – Number of seconds to wait before triggering a retry. Defaults to '0s' (immediate retry).

  • backoff_factor – Exponential backoff factor applied to backoff_duration. For example, if backoff_duration="60" (60 seconds) and backoff_factor=2, the first retry will happen after 60 seconds, then again after 120, 240, and so on. Defaults to 2.0.

  • backoff_max_duration – Maximum duration during which the task will be retried. Maximum duration is 1 hour (3600s). Defaults to '3600s'.

Returns

Self return to allow chained setting calls.
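
The exponential backoff described for backoff_factor can be worked through with a few lines of arithmetic. The sketch below reproduces the 60, 120, 240 sequence from the parameter description. retry_delays is a hypothetical helper that takes the initial delay as a number of seconds, and it does not model backoff_max_duration.

```python
from typing import List


def retry_delays(num_retries: int, backoff_seconds: float,
                 backoff_factor: float = 2.0) -> List[float]:
    """Return the wait before each retry, in seconds.

    The i-th retry (counting from 0) waits backoff_seconds * backoff_factor**i.
    Hypothetical helper for illustration only; not part of kfp.
    """
    return [backoff_seconds * backoff_factor ** i for i in range(num_retries)]


print(retry_delays(3, 60, 2))  # prints [60, 120, 240]
```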

class kfp.dsl.PipelineTaskFinalStatus(state: str, pipeline_job_resource_name: str, pipeline_task_name: str, error_code: Optional[int], error_message: Optional[str])

Bases: object

A final status of a pipeline task. Annotate a component parameter with this class to obtain a handle to a task’s status (see example).

This is the Python representation of the proto message PipelineTaskFinalStatus.

Example

@dsl.component
def task_status(user_input: str, status: PipelineTaskFinalStatus):
    print('Pipeline status: ', status.state)
    print('Job resource name: ', status.pipeline_job_resource_name)
    print('Pipeline task name: ', status.pipeline_task_name)
    print('Error code: ', status.error_code)
    print('Error message: ', status.error_message)

@dsl.pipeline(name='my_pipeline')
def my_pipeline():
    task = task_status(user_input='my_input')
error_code: Optional[int]

The google.rpc.Code in case of error. If state is 'SUCCEEDED', this is None.

error_message: Optional[str]

In case of error, the detailed error message. If state is 'SUCCEEDED', this is None.

pipeline_job_resource_name: str

Pipeline job resource name, in the format of projects/{project}/locations/{location}/pipelineJobs/{pipeline_job}.
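
Given the resource name format above, the individual parts can be recovered with a regular expression. The sketch below is one way to do so. parse_resource_name and the sample name are hypothetical and not part of KFP.

```python
import re
from typing import Dict

_RESOURCE_NAME_PATTERN = re.compile(
    r'^projects/(?P<project>[^/]+)'
    r'/locations/(?P<location>[^/]+)'
    r'/pipelineJobs/(?P<pipeline_job>[^/]+)$')


def parse_resource_name(resource_name: str) -> Dict[str, str]:
    """Split a pipeline job resource name into project, location, and job.

    Hypothetical helper for illustration only; not part of kfp.
    """
    match = _RESOURCE_NAME_PATTERN.match(resource_name)
    if match is None:
        raise ValueError(f'unexpected resource name: {resource_name!r}')
    return match.groupdict()


# Made-up resource name, for illustration only.
parts = parse_resource_name(
    'projects/my-project/locations/us-central1/pipelineJobs/my-job-123')
print(parts['pipeline_job'])  # prints my-job-123
```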

pipeline_task_name: str

Name of the task that produced this status.

state: str

Final state of the task. The value could be one of 'SUCCEEDED', 'FAILED' or 'CANCELLED'.
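
A component that receives a final status typically branches on state. The sketch below shows that branching with a stand-in dataclass so that it runs without a pipeline backend. FinalStatus and summarize are hypothetical. The stand-in only mirrors the field names documented above and is not the kfp.dsl.PipelineTaskFinalStatus class.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FinalStatus:
    """Hypothetical stand-in mirroring the documented fields; not a kfp class."""
    state: str
    pipeline_job_resource_name: str
    pipeline_task_name: str
    error_code: Optional[int] = None
    error_message: Optional[str] = None


def summarize(status: FinalStatus) -> str:
    """Build a one-line summary suitable for a notification or log entry."""
    if status.state == 'SUCCEEDED' or status.error_code is None:
        # No error details are available in these cases.
        return f'{status.pipeline_task_name} {status.state.lower()}'
    return (f'{status.pipeline_task_name} {status.state.lower()}: '
            f'[{status.error_code}] {status.error_message}')


# Made-up values, for illustration only.
failed = FinalStatus(
    state='FAILED',
    pipeline_job_resource_name='projects/p/locations/l/pipelineJobs/j',
    pipeline_task_name='train',
    error_code=13,
    error_message='container exited',
)
print(summarize(failed))  # prints train failed: [13] container exited
```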

class kfp.dsl.SlicedClassificationMetrics(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)

Bases: Artifact

An artifact for storing sliced classification metrics.

As with ClassificationMetrics, tasks using this class are expected to log metrics through the class's log methods. The difference is that each log method also takes a slice with which the logged metrics are associated.

Parameters
  • name – Name of the metrics artifact.

  • uri – The metrics artifact’s location on disk or cloud storage.

  • metadata – Arbitrary key-value pairs about the metrics artifact.

TYPE_NAME = 'system.SlicedClassificationMetrics'
load_confusion_matrix(slice: str, categories: List[str], matrix: List[List[int]]) → None

Bulk loads the whole confusion matrix for a slice.

Parameters
  • slice – String representing slice label.

  • categories – List of the category names.

  • matrix – Complete confusion matrix.

load_roc_readings(slice: str, readings: List[List[float]]) → None

Bulk loads ROC curve readings for a slice.

Parameters
  • slice – String representing slice label.

  • readings – A 2-dimensional list providing ROC curve data points. The expected order of the data points is: threshold, true positive rate, false positive rate.
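
Note that the readings order (threshold, true positive rate, false positive rate) differs from the fpr, tpr, threshold argument order of ClassificationMetrics.log_roc_curve. The sketch below builds a readings list from three separate lists. to_readings is a hypothetical helper, not part of KFP.

```python
from typing import List


def to_readings(threshold: List[float], tpr: List[float],
                fpr: List[float]) -> List[List[float]]:
    """Combine three equal-length lists into [threshold, tpr, fpr] rows.

    Hypothetical helper for illustration only; not part of kfp.
    """
    if not (len(threshold) == len(tpr) == len(fpr)):
        raise ValueError('threshold, tpr and fpr must have the same length')
    return [[t, tp, fp] for t, tp, fp in zip(threshold, tpr, fpr)]


readings = to_readings(
    threshold=[0.9, 0.5, 0.1],
    tpr=[0.2, 0.7, 1.0],
    fpr=[0.0, 0.1, 0.6],
)
print(readings)
# prints [[0.9, 0.2, 0.0], [0.5, 0.7, 0.1], [0.1, 1.0, 0.6]]
```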

log_confusion_matrix_cell(slice: str, row_category: str, col_category: str, value: int) → None

Logs a confusion matrix cell for a slice to metadata.

Cell is updated on the internal metrics_utils.ConfusionMatrix instance of the slice.

Parameters
  • slice – String representing slice label.

  • row_category – String representing the name of the row category.

  • col_category – String representing the name of the column category.

  • value – Value of the cell.

log_confusion_matrix_row(slice: str, row_category: str, row: List[int]) → None

Logs a confusion matrix row for a slice to metadata.

Row is updated on the internal metrics_utils.ConfusionMatrix instance of the slice.

Parameters
  • slice – String representing slice label.

  • row_category – Category to which the row belongs.

  • row – List of integers specifying the values for the row.

log_roc_reading(slice: str, threshold: float, tpr: float, fpr: float) → None

Logs a single data point in the ROC curve of a slice to metadata.

Parameters
  • slice – String representing slice label.

  • threshold – Threshold value for the data point.

  • tpr – True positive rate value of the data point.

  • fpr – False positive rate value of the data point.

set_confusion_matrix_categories(slice: str, categories: List[str]) → None

Logs confusion matrix categories for a slice to metadata.

Categories are stored in the internal metrics_utils.ConfusionMatrix instance of the slice.

Parameters
  • slice – String representing slice label.

  • categories – List of strings specifying the categories.

kfp.dsl.component(func: Optional[Callable] = None, *, base_image: Optional[str] = None, target_image: Optional[str] = None, packages_to_install: Optional[List[str]] = None, pip_index_urls: Optional[List[str]] = None, output_component_file: Optional[str] = None, install_kfp_package: bool = True, kfp_package_path: Optional[str] = None)

Decorator for Python-function based components.

A KFP component can either be a lightweight component or a containerized component.

If target_image is not specified, this function creates a lightweight component. A lightweight component is a self-contained Python function that includes all necessary imports and dependencies. In lightweight components, packages_to_install will be used to install dependencies at runtime. The parameters install_kfp_package and kfp_package_path can be used to control how and from where KFP should be installed when the lightweight component is executed.

If target_image is specified, this function creates a component definition based around the target_image. The assumption is that the function in func will be packaged by KFP into this target_image. You can use the KFP CLI’s build command to package the function into target_image.

Parameters
  • func – Python function from which to create a component. The function should have type annotations for all its arguments, indicating how each argument is intended to be used (e.g. as an input/output artifact, a plain parameter, or a path to a file).

  • base_image – Image to use when executing the Python function. It should contain a default Python interpreter that is compatible with KFP.

  • target_image – Image to use when creating containerized components.

  • packages_to_install – List of packages to install before executing the Python function. These will always be installed at component runtime.

  • pip_index_urls – Python Package Index base URLs from which to install packages_to_install. Defaults to installing from only PyPI ('https://pypi.org/simple'). For more information, see pip install docs.

  • output_component_file

    If specified, this function will write a shareable/loadable version of the component spec into this file.

    Warning: This compilation approach is deprecated.

  • install_kfp_package – Specifies if the KFP SDK should add the kfp Python package to packages_to_install. Lightweight Python functions always require an installation of KFP in base_image to work. If you specify a base_image that already contains KFP, you can set this to False. This flag is ignored when target_image is specified, which implies a choice to build a containerized component. Containerized components will always install KFP as part of the build process.

  • kfp_package_path – Specifies the location from which to install KFP. By default, this will try to install from PyPI using the same version as that used when this component was created. Component authors can choose to override this to point to a GitHub pull request or other pip-compatible package server.

Returns

A component task factory that can be used in pipeline definitions.

Example

from kfp import dsl
from kfp.dsl import Input, Model, Output

@dsl.component
def my_function_one(input: str, output: Output[Model]):
    ...

@dsl.component(
    base_image='python:3.9',
    output_component_file='my_function.yaml'
)
def my_function_two(input: Input[Model]):
    ...

@dsl.pipeline(name='my-pipeline', pipeline_root='...')
def pipeline():
    my_function_one_task = my_function_one(input=...)
    # Select the Model artifact by its output name.
    my_function_two_task = my_function_two(input=my_function_one_task.outputs['output'])
kfp.dsl.importer(artifact_uri: Union[PipelineParameterChannel, str], artifact_class: Type[Artifact], reimport: bool = False, metadata: Optional[Mapping[str, Any]] = None) → PipelineTask

Imports an existing artifact for use in a downstream component.

Parameters
  • artifact_uri – The URI of the artifact to import.

  • artifact_class – The artifact class being imported.

  • reimport – Whether to reimport the artifact.

  • metadata – Properties of the artifact.

Returns

A task with the artifact accessible via its .output attribute.

Example

@dsl.pipeline(name='pipeline-with-importer')
def pipeline_with_importer():

    importer1 = importer(
        artifact_uri='gs://ml-pipeline-playground/shakespeare1.txt',
        artifact_class=Dataset,
        reimport=False)
    train(dataset=importer1.output)
kfp.dsl.pipeline(name: Optional[str] = None, description: Optional[str] = None, pipeline_root: Optional[str] = None) → Callable

Decorator used to construct a pipeline.

Example
@dsl.pipeline(
    name='my-pipeline',
    description='My ML Pipeline.',
    pipeline_root='gs://my-bucket/my-output-path'
)
def my_pipeline(a: str, b: int):
    ...
Parameters
  • name – The pipeline name. Defaults to a sanitized version of the decorated function name.

  • description – A human-readable description of the pipeline.

  • pipeline_root – The root directory from which input parameters and artifacts are read and to which output parameters and artifacts are written.