kfp.dsl

The kfp.dsl module contains domain-specific language objects used to compose pipelines.

Data:

Input

Type generic used to represent an input artifact of type T, where T is an artifact class.

Output

A type generic used to represent an output artifact of type T, where T is an artifact class.

PIPELINE_JOB_NAME_PLACEHOLDER

A placeholder used to obtain a pipeline job name within a task at pipeline runtime.

PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER

A placeholder used to obtain a pipeline job resource name within a task at pipeline runtime.

PIPELINE_JOB_ID_PLACEHOLDER

A placeholder used to obtain a pipeline job ID within a task at pipeline runtime.

PIPELINE_TASK_NAME_PLACEHOLDER

A placeholder used to obtain a task name within a task at pipeline runtime.

PIPELINE_TASK_ID_PLACEHOLDER

A placeholder used to obtain a task ID within a task at pipeline runtime.

PIPELINE_TASK_EXECUTOR_OUTPUT_PATH_PLACEHOLDER

A placeholder used to obtain the path to the executor_output.json file within the task container.

PIPELINE_TASK_EXECUTOR_INPUT_PLACEHOLDER

A placeholder used to obtain executor input message passed to the task.

PIPELINE_ROOT_PLACEHOLDER

A placeholder used to obtain the pipeline root.

PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER

A placeholder used to obtain the time that a pipeline job was created.

PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER

A placeholder used to obtain the time for which a pipeline job is scheduled.

Classes:

InputPath([type])

Type annotation used in component definitions for indicating a parameter is a path to an input.

OutputPath([type])

Type annotation used in component definitions for indicating a parameter is a path to an output.

PipelineTaskFinalStatus(state, ...)

A final status of a pipeline task.

Artifact([name, uri, metadata])

Represents a generic machine learning artifact.

ClassificationMetrics([name, uri, metadata])

An artifact for storing classification metrics.

Dataset([name, uri, metadata])

An artifact representing a machine learning dataset.

HTML([name, uri, metadata])

An artifact representing an HTML file.

Markdown([name, uri, metadata])

An artifact representing a markdown file.

Metrics([name, uri, metadata])

An artifact for storing key-value scalar metrics.

Model([name, uri, metadata])

An artifact representing a machine learning model.

SlicedClassificationMetrics([name, uri, ...])

An artifact for storing sliced classification metrics.

ContainerSpec(image[, command, args])

Container definition.

Condition(condition[, name])

Deprecated.

If(condition[, name])

A class for creating a conditional control flow "if" block within a pipeline.

Elif(condition[, name])

A class for creating a conditional control flow "else if" block within a pipeline.

Else([name])

A class for creating a conditional control flow "else" block within a pipeline.

OneOf(*channels)

For collecting mutually exclusive outputs from conditional branches into a single pipeline channel.

ExitHandler(exit_task[, name])

A class for setting an exit handler task that is invoked upon exiting a group of other tasks.

ParallelFor(items[, name, parallelism])

A class for creating parallelized for loop control flow over a static set of items within a pipeline definition.

Collected(output)

For collecting the outputs of a task inside a dsl.ParallelFor loop into a list.

IfPresentPlaceholder(input_name, then[, else_])

Placeholder for handling cases where an input may or may not be passed.

ConcatPlaceholder(items)

Placeholder for concatenating multiple strings.

PipelineTask(component_spec, args[, ...])

Represents a pipeline task (instantiated component).

PipelineConfig()

PipelineConfig contains pipeline-level config options.

Functions:

get_uri([suffix])

Gets the task root URI, a unique object storage URI associated with the current task.

component([func, base_image, target_image, ...])

Decorator for Python-function based components.

container_component(func)

Decorator for container-based components in KFP v2.

pipeline([func, name, description, ...])

Decorator used to construct a pipeline.

importer(artifact_uri, artifact_class[, ...])

Imports an existing artifact for use in a downstream component.

kfp.dsl.Input(*args, **kwargs)

Type generic used to represent an input artifact of type T, where T is an artifact class.

Use Input[Artifact] or Output[Artifact] to indicate whether the enclosed artifact is a component input or output.

Parameters
T

The type of the input artifact.

Example

@dsl.component
def artifact_producer(model: Output[Artifact]):
    with open(model.path, 'w') as f:
        f.write('my model')

@dsl.component
def artifact_consumer(model: Input[Artifact]):
    print(model)

@dsl.pipeline
def my_pipeline():
    producer_task = artifact_producer()
    artifact_consumer(model=producer_task.output)

alias of T[T]

kfp.dsl.Output(*args, **kwargs)

A type generic used to represent an output artifact of type T, where T is an artifact class. The argument typed with this annotation is provided at runtime by the executing backend and does not need to be passed as an input by the pipeline author (see example).

Use Input[Artifact] or Output[Artifact] to indicate whether the enclosed artifact is a component input or output.

Parameters
T

The type of the output artifact.

Example

@dsl.component
def artifact_producer(model: Output[Artifact]):
    with open(model.path, 'w') as f:
        f.write('my model')

@dsl.component
def artifact_consumer(model: Input[Artifact]):
    print(model)

@dsl.pipeline
def my_pipeline():
    producer_task = artifact_producer()
    artifact_consumer(model=producer_task.output)

alias of T[T]

class kfp.dsl.InputPath(type=None)[source]

Bases: object

Type annotation used in component definitions for indicating a parameter is a path to an input.

Example

@dsl.component
def create_dataset(dataset_path: OutputPath('Dataset'),):
    import json
    dataset = {'my_dataset': [[1, 2, 3], [4, 5, 6]]}
    with open(dataset_path, 'w') as f:
        json.dump(dataset, f)


@dsl.component
def consume_dataset(dataset: InputPath('Dataset')):
    print(dataset)


@dsl.pipeline(name='my-pipeline', pipeline_root='gs://my-bucket')
def my_pipeline():
    create_dataset_op = create_dataset()
    consume_dataset(dataset=create_dataset_op.outputs['dataset_path'])
class kfp.dsl.OutputPath(type=None)[source]

Bases: object

Type annotation used in component definitions for indicating a parameter is a path to an output. The path parameter typed with this annotation can be treated as a locally accessible filepath within the component body.

The argument typed with this annotation is provided at runtime by the executing backend and does not need to be passed as an input by the pipeline author (see example).

Parameters
type=None

The type of the value written to the output path.

Example

@dsl.component
def create_parameter(
        message: str,
        output_parameter_path: OutputPath(str),
):
    with open(output_parameter_path, 'w') as f:
        f.write(message)


@dsl.component
def consume_parameter(message: str):
    print(message)


@dsl.pipeline(name='my-pipeline', pipeline_root='gs://my-bucket')
def my_pipeline(message: str = 'default message'):
    create_param_op = create_parameter(message=message)
    consume_parameter(message=create_param_op.outputs['output_parameter_path'])
class kfp.dsl.PipelineTaskFinalStatus(state: str, pipeline_job_resource_name: str, pipeline_task_name: str, error_code: int | None, error_message: str | None)[source]

Bases: object

A final status of a pipeline task. Annotate a component parameter with this class to obtain a handle to a task’s status (see example).

This is the Python representation of the proto message PipelineTaskFinalStatus.

Examples

@dsl.component
def task_status(user_input: str, status: PipelineTaskFinalStatus):
    print('Pipeline status: ', status.state)
    print('Job resource name: ', status.pipeline_job_resource_name)
    print('Pipeline task name: ', status.pipeline_task_name)
    print('Error code: ', status.error_code)
    print('Error message: ', status.error_message)

@dsl.pipeline(name='my_pipeline')
def my_pipeline():
    task = task_status(user_input='my_input')

Attributes:

state

Final state of the task.

pipeline_job_resource_name

Pipeline job resource name, in the format of projects/{project}/locations/{location}/pipelineJobs/{pipeline_job}.

pipeline_task_name

Name of the task that produced this status.

error_code

The google.rpc.Code in case of error.

error_message

In case of error, the detailed error message.

state : str

Final state of the task. The value could be one of 'SUCCEEDED', 'FAILED' or 'CANCELLED'.

pipeline_job_resource_name : str

Pipeline job resource name, in the format of projects/{project}/locations/{location}/pipelineJobs/{pipeline_job}.

pipeline_task_name : str

Name of the task that produced this status.

error_code : int | None

The google.rpc.Code in case of error. If state is 'SUCCEEDED', this is None.

error_message : str | None

In case of error, the detailed error message. If state is 'SUCCEEDED', this is None.
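
The status parameter is populated by the backend rather than passed by the pipeline author, and a component annotated this way is typically used as the exit task of a dsl.ExitHandler. A minimal sketch, reusing the task_status component above together with a hypothetical worker component:

from kfp import dsl

@dsl.component
def worker():
    print('doing work')

@dsl.pipeline(name='my-pipeline-with-exit-handler')
def my_pipeline():
    # The `status` parameter is not passed here; it is injected at runtime.
    exit_task = task_status(user_input='my_input')
    with dsl.ExitHandler(exit_task=exit_task):
        worker()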

class kfp.dsl.Artifact(name: str | None = None, uri: str | None = None, metadata: dict | None = None)[source]

Bases: object

Represents a generic machine learning artifact.

This class and all artifact classes store the name, uri, and metadata for a machine learning artifact. Use this artifact type when an artifact does not fit into another more specific artifact type (e.g., Model, Dataset).

Parameters
name: str | None = None

Name of the artifact.

uri: str | None = None

The artifact’s location on disk or cloud storage.

metadata: dict | None = None

Arbitrary key-value pairs about the artifact.

Example

from kfp import dsl
from kfp.dsl import Output, Artifact, Input


@dsl.component
def create_artifact(
    data: str,
    output_artifact: Output[Artifact],
):
    with open(output_artifact.path, 'w') as f:
        f.write(data)


@dsl.component
def use_artifact(input_artifact: Input[Artifact]):
    with open(input_artifact.path) as input_file:
        artifact_contents = input_file.read()
        print(artifact_contents)


@dsl.pipeline(name='my-pipeline', pipeline_root='gs://my/storage')
def my_pipeline():
    create_task = create_artifact(data='my data')
    use_artifact(input_artifact=create_task.outputs['output_artifact'])

Note: Other artifact types are used in the same way as Artifact in the example above (within Input[] and Output[]).

Attributes:

schema_title

schema_version

path

schema_title = 'system.Artifact'
schema_version = '0.0.1'
property path : str
class kfp.dsl.ClassificationMetrics(name: str | None = None, uri: str | None = None, metadata: dict | None = None)[source]

Bases: Artifact

An artifact for storing classification metrics.

Parameters
name: str | None = None

Name of the metrics artifact.

uri: str | None = None

The metrics artifact’s location on disk or cloud storage.

metadata: dict | None = None

The key-value scalar metrics.

Attributes:

schema_title

Methods:

log_roc_data_point(fpr, tpr, threshold)

Logs a single data point in the ROC curve to metadata.

log_roc_curve(fpr, tpr, threshold)

Logs an ROC curve to metadata.

set_confusion_matrix_categories(categories)

Stores confusion matrix categories to metadata.

log_confusion_matrix_row(row_category, row)

Logs a confusion matrix row to metadata.

log_confusion_matrix_cell(row_category, ...)

Logs a cell in the confusion matrix to metadata.

log_confusion_matrix(categories, matrix)

Logs a confusion matrix to metadata.

schema_title = 'system.ClassificationMetrics'
log_roc_data_point(fpr: float, tpr: float, threshold: float) None[source]

Logs a single data point in the ROC curve to metadata.

Parameters
fpr: float

False positive rate value of the data point.

tpr: float

True positive rate value of the data point.

threshold: float

Threshold value for the data point.

log_roc_curve(fpr: list[float], tpr: list[float], threshold: list[float]) None[source]

Logs an ROC curve to metadata.

Parameters
fpr: list[float]

List of false positive rate values.

tpr: list[float]

List of true positive rate values.

threshold: list[float]

List of threshold values.

Raises

ValueError – If the lists fpr, tpr and threshold are not the same length.

set_confusion_matrix_categories(categories: list[str]) None[source]

Stores confusion matrix categories to metadata.

Parameters
categories: list[str]

List of strings specifying the categories.

log_confusion_matrix_row(row_category: str, row: list[float]) None[source]

Logs a confusion matrix row to metadata.

Parameters
row_category: str

Category to which the row belongs.

row: list[float]

List of numbers specifying the values for the row.

Raises

ValueError – If row_category is not in the list of categories set by set_confusion_matrix_categories.

log_confusion_matrix_cell(row_category: str, col_category: str, value: int) None[source]

Logs a cell in the confusion matrix to metadata.

Parameters
row_category: str

String representing the name of the row category.

col_category: str

String representing the name of the column category.

value: int

Value of the cell.

Raises

ValueError – If row_category or col_category is not in the list of categories set by set_confusion_matrix_categories.

log_confusion_matrix(categories: list[str], matrix: list[list[int]]) None[source]

Logs a confusion matrix to metadata.

Parameters
categories: list[str]

List of the category names.

matrix: list[list[int]]

Complete confusion matrix.

Raises

ValueError – If the length of categories does not match number of rows or columns of matrix.
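
A minimal sketch of logging to a ClassificationMetrics output inside a component, using the methods documented above (the category names and curve values are illustrative):

from kfp import dsl
from kfp.dsl import ClassificationMetrics, Output

@dsl.component
def eval_model(metrics: Output[ClassificationMetrics]):
    # The fpr, tpr, and threshold lists must all be the same length.
    metrics.log_roc_curve(
        fpr=[0.0, 0.2, 1.0],
        tpr=[0.0, 0.8, 1.0],
        threshold=[2.0, 0.5, 0.0],
    )
    # Logs a complete 2x2 confusion matrix in one call.
    metrics.log_confusion_matrix(
        categories=['cat', 'dog'],
        matrix=[[9, 1], [2, 8]],
    )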

class kfp.dsl.Dataset(name: str | None = None, uri: str | None = None, metadata: dict | None = None)[source]

Bases: Artifact

An artifact representing a machine learning dataset.

Parameters
name: str | None = None

Name of the dataset.

uri: str | None = None

The dataset’s location on disk or cloud storage.

metadata: dict | None = None

Arbitrary key-value pairs about the dataset.

Attributes:

schema_title

schema_title = 'system.Dataset'
class kfp.dsl.HTML(name: str | None = None, uri: str | None = None, metadata: dict | None = None)[source]

Bases: Artifact

An artifact representing an HTML file.

Parameters
name: str | None = None

Name of the HTML file.

uri: str | None = None

The HTML file’s location on disk or cloud storage.

metadata: dict | None = None

Arbitrary key-value pairs about the HTML file.

Attributes:

schema_title

schema_title = 'system.HTML'
class kfp.dsl.Markdown(name: str | None = None, uri: str | None = None, metadata: dict | None = None)[source]

Bases: Artifact

An artifact representing a markdown file.

Parameters
name: str | None = None

Name of the markdown file.

uri: str | None = None

The markdown file’s location on disk or cloud storage.

metadata: dict | None = None

Arbitrary key-value pairs about the markdown file.

Attributes:

schema_title

schema_title = 'system.Markdown'
class kfp.dsl.Metrics(name: str | None = None, uri: str | None = None, metadata: dict | None = None)[source]

Bases: Artifact

An artifact for storing key-value scalar metrics.

Parameters
name: str | None = None

Name of the metrics artifact.

uri: str | None = None

The metrics artifact’s location on disk or cloud storage.

metadata: dict | None = None

Key-value scalar metrics.

Attributes:

schema_title

Methods:

log_metric(metric, value)

Sets a custom scalar metric in the artifact's metadata.

schema_title = 'system.Metrics'
log_metric(metric: str, value: float) None[source]

Sets a custom scalar metric in the artifact’s metadata.

Parameters
metric: str

The metric key.

value: float

The metric value.
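
A minimal sketch of logging scalar metrics from a component (the metric names and values are illustrative):

from kfp import dsl
from kfp.dsl import Metrics, Output

@dsl.component
def produce_metrics(metrics: Output[Metrics]):
    # Each call stores a scalar value under the given key in the artifact's metadata.
    metrics.log_metric('accuracy', 0.92)
    metrics.log_metric('loss', 0.38)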

class kfp.dsl.Model(name: str | None = None, uri: str | None = None, metadata: dict | None = None)[source]

Bases: Artifact

An artifact representing a machine learning model.

Parameters
name: str | None = None

Name of the model.

uri: str | None = None

The model’s location on disk or cloud storage.

metadata: dict | None = None

Arbitrary key-value pairs about the model.

Attributes:

schema_title

framework

schema_title = 'system.Model'
property framework : str
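
A short sketch of setting the framework property on a Model output; this assumes the property is writable and is stored in the artifact's metadata:

from kfp import dsl
from kfp.dsl import Model, Output

@dsl.component
def train(model: Output[Model]):
    with open(model.path, 'w') as f:
        f.write('my model')
    # Assumed to record the framework name in the model's metadata.
    model.framework = 'xgboost'
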
class kfp.dsl.SlicedClassificationMetrics(name: str | None = None, uri: str | None = None, metadata: dict | None = None)[source]

Bases: Artifact

An artifact for storing sliced classification metrics.

Similar to ClassificationMetrics, tasks using this class are expected to use its log methods to log metrics; the difference is that each log method takes a slice argument to associate the logged metrics with that slice.

Parameters
name: str | None = None

Name of the metrics artifact.

uri: str | None = None

The metrics artifact’s location on disk or cloud storage.

metadata: dict | None = None

Arbitrary key-value pairs about the metrics artifact.

Attributes:

schema_title

Methods:

log_roc_reading(slice, threshold, tpr, fpr)

Logs a single data point in the ROC curve of a slice to metadata.

load_roc_readings(slice, readings)

Bulk loads ROC curve readings for a slice.

set_confusion_matrix_categories(slice, ...)

Logs confusion matrix categories for a slice to metadata.

log_confusion_matrix_row(slice, ...)

Logs a confusion matrix row for a slice to metadata.

log_confusion_matrix_cell(slice, ...)

Logs a confusion matrix cell for a slice to metadata.

load_confusion_matrix(slice, categories, matrix)

Bulk loads the whole confusion matrix for a slice.

schema_title = 'system.SlicedClassificationMetrics'
log_roc_reading(slice: str, threshold: float, tpr: float, fpr: float) None[source]

Logs a single data point in the ROC curve of a slice to metadata.

Parameters
slice: str

String representing slice label.

threshold: float

Threshold value for the data point.

tpr: float

True positive rate value of the data point.

fpr: float

False positive rate value of the data point.

load_roc_readings(slice: str, readings: list[list[float]]) None[source]

Bulk loads ROC curve readings for a slice.

Parameters
slice: str

String representing slice label.

readings: list[list[float]]

A 2-dimensional list providing ROC curve data points. The expected order of the data points is: threshold, true positive rate, false positive rate.

set_confusion_matrix_categories(slice: str, categories: list[str]) None[source]

Logs confusion matrix categories for a slice to metadata.

Categories are stored in the internal metrics_utils.ConfusionMatrix instance of the slice.

Parameters
slice: str

String representing slice label.

categories: list[str]

List of strings specifying the categories.

log_confusion_matrix_row(slice: str, row_category: str, row: list[int]) None[source]

Logs a confusion matrix row for a slice to metadata.

Row is updated on the internal metrics_utils.ConfusionMatrix instance of the slice.

Parameters
slice: str

String representing slice label.

row_category: str

Category to which the row belongs.

row: list[int]

List of integers specifying the values for the row.

log_confusion_matrix_cell(slice: str, row_category: str, col_category: str, value: int) None[source]

Logs a confusion matrix cell for a slice to metadata.

Cell is updated on the internal metrics_utils.ConfusionMatrix instance of the slice.

Parameters
slice: str

String representing slice label.

row_category: str

String representing the name of the row category.

col_category: str

String representing the name of the column category.

value: int

Value of the cell.

load_confusion_matrix(slice: str, categories: list[str], matrix: list[list[int]]) None[source]

Bulk loads the whole confusion matrix for a slice.

Parameters
slice: str

String representing slice label.

categories: list[str]

List of the category names.

matrix: list[list[int]]

Complete confusion matrix.
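
A minimal sketch of per-slice logging using the methods documented above (the slice labels and values are illustrative):

from kfp import dsl
from kfp.dsl import Output, SlicedClassificationMetrics

@dsl.component
def eval_by_slice(metrics: Output[SlicedClassificationMetrics]):
    # Log single ROC readings for two slices of the evaluation data.
    metrics.log_roc_reading(slice='US', threshold=0.5, tpr=0.85, fpr=0.10)
    metrics.log_roc_reading(slice='EU', threshold=0.5, tpr=0.78, fpr=0.15)
    # Bulk-load a confusion matrix for one slice.
    metrics.load_confusion_matrix(
        slice='US',
        categories=['cat', 'dog'],
        matrix=[[9, 1], [2, 8]],
    )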

kfp.dsl.get_uri(suffix: str = 'Output') str[source]

Gets the task root URI, a unique object storage URI associated with the current task. This function may only be called at task runtime.

Returns an empty string if the task root cannot be inferred from the runtime environment.

Parameters
suffix: str = 'Output'

A suffix to append to the URI. This is helpful for creating unique subdirectories when the component has multiple outputs.

Returns

The URI or empty string.
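
A minimal sketch of calling dsl.get_uri inside a component body at task runtime (the suffix value is illustrative):

from kfp import dsl

@dsl.component
def export_data() -> str:
    # Returns the task root URI with the suffix appended, or '' if the task
    # root cannot be inferred from the runtime environment.
    uri = dsl.get_uri(suffix='exported_data')
    print('Writing output under:', uri)
    return uri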

kfp.dsl.PIPELINE_JOB_NAME_PLACEHOLDER = '{{$.pipeline_job_name}}'

A placeholder used to obtain a pipeline job name within a task at pipeline runtime.

Example

@dsl.pipeline
def my_pipeline():
    print_op(
        msg='Job name:',
        value=dsl.PIPELINE_JOB_NAME_PLACEHOLDER,
    )
kfp.dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER = '{{$.pipeline_job_resource_name}}'

A placeholder used to obtain a pipeline job resource name within a task at pipeline runtime.

Example

@dsl.pipeline
def my_pipeline():
    print_op(
        msg='Job resource name:',
        value=dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER,
    )
kfp.dsl.PIPELINE_JOB_ID_PLACEHOLDER = '{{$.pipeline_job_uuid}}'

A placeholder used to obtain a pipeline job ID within a task at pipeline runtime.

Example

@dsl.pipeline
def my_pipeline():
    print_op(
        msg='Job ID:',
        value=dsl.PIPELINE_JOB_ID_PLACEHOLDER,
    )
kfp.dsl.PIPELINE_TASK_NAME_PLACEHOLDER = '{{$.pipeline_task_name}}'

A placeholder used to obtain a task name within a task at pipeline runtime.

Example

@dsl.pipeline
def my_pipeline():
    print_op(
        msg='Task name:',
        value=dsl.PIPELINE_TASK_NAME_PLACEHOLDER,
    )
kfp.dsl.PIPELINE_TASK_ID_PLACEHOLDER = '{{$.pipeline_task_uuid}}'

A placeholder used to obtain a task ID within a task at pipeline runtime.

Example

@dsl.pipeline
def my_pipeline():
    print_op(
        msg='Task ID:',
        value=dsl.PIPELINE_TASK_ID_PLACEHOLDER,
    )
kfp.dsl.PIPELINE_TASK_EXECUTOR_OUTPUT_PATH_PLACEHOLDER = '{{$.outputs.output_file}}'

A placeholder used to obtain the path to the executor_output.json file within the task container.

Example

@dsl.pipeline
def my_pipeline():
    create_artifact_with_metadata(
        metadata={'foo': 'bar'},
        executor_output_destination=dsl.PIPELINE_TASK_EXECUTOR_OUTPUT_PATH_PLACEHOLDER,
    )
kfp.dsl.PIPELINE_TASK_EXECUTOR_INPUT_PLACEHOLDER = '{{$}}'

A placeholder used to obtain executor input message passed to the task.

Example

@dsl.pipeline
def my_pipeline():
    custom_container_op(
        executor_input=dsl.PIPELINE_TASK_EXECUTOR_INPUT_PLACEHOLDER,
    )
kfp.dsl.PIPELINE_ROOT_PLACEHOLDER = '{{$.pipeline_root}}'

A placeholder used to obtain the pipeline root.

Example

@dsl.pipeline
def my_pipeline():
    store_model(
        tmp_dir=dsl.PIPELINE_ROOT_PLACEHOLDER+'/tmp',
    )
kfp.dsl.PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER = '{{$.pipeline_job_create_time_utc}}'

A placeholder used to obtain the time that a pipeline job was created.

Example

@dsl.pipeline
def my_pipeline():
    print_op(
        msg='Job created at:',
        value=dsl.PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER,
    )
kfp.dsl.PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER = '{{$.pipeline_job_schedule_time_utc}}'

A placeholder used to obtain the time for which a pipeline job is scheduled.

Example

@dsl.pipeline
def my_pipeline():
    print_op(
        msg='Job scheduled at:',
        value=dsl.PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER,
    )
kfp.dsl.component(func: Callable | None = None, *, base_image: str | None = None, target_image: str | None = None, packages_to_install: list[str] | None = None, pip_index_urls: list[str] | None = None, output_component_file: str | None = None, install_kfp_package: bool = True, kfp_package_path: str | None = None, pip_trusted_hosts: list[str] | None = None, use_venv: bool = False)[source]

Decorator for Python-function based components.

A KFP component can either be a lightweight component or a containerized component.

If target_image is not specified, this function creates a lightweight component. A lightweight component is a self-contained Python function that includes all necessary imports and dependencies. In lightweight components, packages_to_install will be used to install dependencies at runtime. The parameters install_kfp_package and kfp_package_path can be used to control how and from where KFP should be installed when the lightweight component is executed.

If target_image is specified, this function creates a component definition based around the target_image. The assumption is that the function in func will be packaged by KFP into this target_image. You can use the KFP CLI’s build command to package the function into target_image.

Parameters
func: Callable | None = None

Python function from which to create a component. The function should have type annotations for all its arguments, indicating how each argument is intended to be used (e.g. as an input/output artifact, a plain parameter, or a path to a file).

base_image: str | None = None

Image to use when executing the Python function. It should contain a default Python interpreter that is compatible with KFP.

target_image: str | None = None

Image to use when creating containerized components.

packages_to_install: list[str] | None = None

List of packages to install before executing the Python function. These will always be installed at component runtime.

pip_index_urls: list[str] | None = None

Python Package Index base URLs from which to install packages_to_install. Defaults to installing from only PyPI ('https://pypi.org/simple'). For more information, see pip install docs.

output_component_file: str | None = None

If specified, this function will write a shareable/loadable version of the component spec into this file.

Warning: This compilation approach is deprecated.

install_kfp_package: bool = True

Specifies if the KFP SDK should add the kfp Python package to packages_to_install. Lightweight Python functions always require an installation of KFP in base_image to work. If you specify a base_image that already contains KFP, you can set this to False. This flag is ignored when target_image is specified, which implies a choice to build a containerized component. Containerized components will always install KFP as part of the build process.

kfp_package_path: str | None = None

Specifies the location from which to install KFP. By default, this will try to install from PyPI using the same version as that used when this component was created. Component authors can choose to override this to point to a GitHub pull request or other pip-compatible package server.

use_venv: bool = False

Specifies if the component should be executed in a virtual environment. The environment will be created in a temporary directory and will inherit the system site packages. This is useful in restricted environments where most of the system is read-only.

Returns

A component task factory that can be used in pipeline definitions.

Example

from kfp import dsl
from kfp.dsl import Input, Model, Output

@dsl.component
def my_function_one(input: str, output: Output[Model]):
    ...

@dsl.component(
    base_image='python:3.9',
    output_component_file='my_function.yaml'
)
def my_function_two(input: Input[Model]):
    ...

@dsl.pipeline(name='my-pipeline', pipeline_root='...')
def pipeline():
    my_function_one_task = my_function_one(input=...)
    my_function_two_task = my_function_two(input=my_function_one_task.outputs['output'])
kfp.dsl.container_component(func: Callable) ContainerComponent[source]

Decorator for container-based components in KFP v2.

Parameters
func: Callable

The python function to create a component from. The function should have type annotations for all its arguments, indicating how it is intended to be used (e.g. as an input/output Artifact object, a plain parameter, or a path to a file).

Example

from kfp.dsl import (ContainerSpec, Dataset, InputPath, Model, Output,
                     OutputPath, container_component)

@container_component
def my_component(
    dataset_path: InputPath(Dataset),
    model: Output[Model],
    num_epochs: int,
    output_parameter: OutputPath(str),
):
    return ContainerSpec(
        image='gcr.io/my-image',
        command=['sh', 'my_component.sh'],
        args=[
            '--dataset_path', dataset_path,
            '--model_path', model.path,
            '--output_parameter_path', output_parameter,
        ])
kfp.dsl.pipeline(func: Callable | None = None, *, name: str | None = None, description: str | None = None, pipeline_root: str | None = None, display_name: str | None = None, pipeline_config: PipelineConfig | None = None) Callable[source]

Decorator used to construct a pipeline.

Example

@pipeline(
    name='my-pipeline',
    description='My ML Pipeline.',
    pipeline_root='gs://my-bucket/my-output-path',
)
def my_pipeline(a: str, b: int):
    ...

Parameters
func: Callable | None = None

The Python function that defines a pipeline.

name: str | None = None

The pipeline name. Defaults to a sanitized version of the decorated function name.

description: str | None = None

A human-readable description of the pipeline.

pipeline_root: str | None = None

The root directory from which to read input and output parameters and artifacts.

display_name: str | None = None

A human-readable name for the pipeline.

pipeline_config: PipelineConfig | None = None

Pipeline-level config options.

kfp.dsl.importer(artifact_uri: PipelineParameterChannel | str, artifact_class: type[Artifact], reimport: bool = False, metadata: Mapping[str, Any] | None = None) PipelineTask[source]

Imports an existing artifact for use in a downstream component.

Parameters
artifact_uri: PipelineParameterChannel | str

The URI of the artifact to import.

artifact_class: type[Artifact]

The artifact class being imported.

reimport: bool = False

Whether to reimport the artifact.

metadata: Mapping[str, Any] | None = None

Properties of the artifact.

Returns

A task with the artifact accessible via its .output attribute.

Examples:

@dsl.pipeline(name='pipeline-with-importer')
def pipeline_with_importer():

    importer1 = importer(
        artifact_uri='gs://ml-pipeline-playground/shakespeare1.txt',
        artifact_class=Dataset,
        reimport=False)
    train(dataset=importer1.output)
class kfp.dsl.ContainerSpec(image: str, command: list[str | Placeholder] | None = None, args: list[str | Placeholder] | None = None)[source]

Bases: object

Container definition.

This is only used by pipeline authors when constructing a containerized component with the @container_component decorator.

Examples

@container_component
def container_with_artifact_output(
    num_epochs: int,  # built-in types are parsed as inputs
    model: Output[Model],
    model_config_path: OutputPath(str),
):
    return ContainerSpec(
        image='gcr.io/my-image',
        command=['sh', 'run.sh'],
        args=[
            '--epochs',
            num_epochs,
            '--model_path',
            model.uri,
            '--model_config_path',
            model_config_path,
        ])

Attributes:

image

Container image.

command

Container entrypoint.

args

Arguments to the container entrypoint.

image : str

Container image.

command : list[str | Placeholder] | None = None

Container entrypoint.

args : list[str | Placeholder] | None = None

Arguments to the container entrypoint.

class kfp.dsl.Condition(condition, name: str | None = None)[source]

Bases: If

Deprecated.

Use dsl.If instead.

class kfp.dsl.If(condition, name: str | None = None)[source]

Bases: _ConditionBase

A class for creating a conditional control flow “if” block within a pipeline.

Parameters
condition

A comparative expression that evaluates to True or False. At least one of the operands must be an output from an upstream task or a pipeline parameter.

name: str | None = None

The name of the condition group.

Example

task1 = my_component1(...)
with dsl.If(task1.output=='pizza', 'pizza-condition'):
    task2 = my_component2(...)
class kfp.dsl.Elif(condition, name: str | None = None)[source]

Bases: _ConditionBase

A class for creating a conditional control flow “else if” block within a pipeline. Can be used following an upstream dsl.If or dsl.Elif.

Parameters
condition

A comparative expression that evaluates to True or False. At least one of the operands must be an output from an upstream task or a pipeline parameter.

name: str | None = None

The name of the condition group.

Example

task1 = my_component1(...)
task2 = my_component2(...)
with dsl.If(task1.output=='pizza', 'pizza-condition'):
    task3 = my_component3(...)

with dsl.Elif(task2.output=='pasta', 'pasta-condition'):
    task4 = my_component4(...)
class kfp.dsl.Else(name: str | None = None)[source]

Bases: _ConditionBase

A class for creating a conditional control flow “else” block within a pipeline. Can be used following an upstream dsl.If or dsl.Elif.

Parameters
name: str | None = None

The name of the condition group.

Example

task1 = my_component1(...)
task2 = my_component2(...)
with dsl.If(task1.output=='pizza', 'pizza-condition'):
    task3 = my_component3(...)

with dsl.Elif(task2.output=='pasta', 'pasta-condition'):
    task4 = my_component4(...)

with dsl.Else():
    my_component5(...)
class kfp.dsl.OneOf(*channels: PipelineParameterChannel | PipelineArtifactChannel)[source]

Bases: object

For collecting mutually exclusive outputs from conditional branches into a single pipeline channel.

Parameters
*channels: PipelineParameterChannel | PipelineArtifactChannel

The channels to collect into a OneOf. Must be of the same type.

Example

@dsl.pipeline
def flip_coin_pipeline() -> str:
    flip_coin_task = flip_coin()
    with dsl.If(flip_coin_task.output == 'heads'):
        print_task_1 = print_and_return(text='Got heads!')
    with dsl.Else():
        print_task_2 = print_and_return(text='Got tails!')

    # use the output from the branch that gets executed
    oneof = dsl.OneOf(print_task_1.output, print_task_2.output)

    # consume it
    print_and_return(text=oneof)

    # return it
    return oneof
class kfp.dsl.ExitHandler(exit_task: PipelineTask, name: str | None = None)[source]

Bases: TasksGroup

A class for setting an exit handler task that is invoked upon exiting a group of other tasks.

Parameters
exit_task: PipelineTask

The task that is invoked after exiting a group of other tasks.

name: str | None = None

The name of the exit handler group.

Example

exit_task = ExitComponent(...)
with ExitHandler(exit_task):
    task1 = my_component1(...)
    task2 = my_component2(...)
class kfp.dsl.ParallelFor(items: list[int | float | str | dict[str, Any]] | PipelineChannel, name: str | None = None, parallelism: int | None = None)[source]

Bases: TasksGroup

A class for creating parallelized for loop control flow over a static set of items within a pipeline definition.

Parameters
items: list[int | float | str | dict[str, Any]] | PipelineChannel

The items to loop over. It can be either a constant Python list or a list output from an upstream task.

name: str | None = None

The name of the for loop group.

parallelism: int | None = None

The maximum number of concurrent iterations that can be scheduled for execution. A value of 0 represents unconstrained parallelism (default is unconstrained).

Example

with dsl.ParallelFor(
  items=[{'a': 1, 'b': 10}, {'a': 2, 'b': 20}],
  parallelism=1
) as item:
    task1 = my_component(..., number=item.a)
    task2 = my_component(..., number=item.b)

In the example, the group of tasks containing task1 and task2 is executed twice: once with item={'a': 1, 'b': 10} and once with item={'a': 2, 'b': 20}. The parallelism=1 setting causes only one iteration to be scheduled at a time.

class kfp.dsl.Collected(output: PipelineChannel)[source]

Bases: PipelineChannel

For collecting the outputs of a task inside a dsl.ParallelFor loop into a list.

Parameters
output: PipelineChannel

The output of an upstream task within a dsl.ParallelFor loop.

Example

@dsl.pipeline
def math_pipeline() -> int:
    with dsl.ParallelFor([1, 2, 3]) as x:
        t = double(num=x)

    return add(nums=dsl.Collected(t.output)).output
class kfp.dsl.IfPresentPlaceholder(input_name: str, then: str | Placeholder | list[str | Placeholder], else_: str | Placeholder | list[str | Placeholder] | None = None)[source]

Bases: Placeholder

Placeholder for handling cases where an input may or may not be passed. May contain other placeholders.

Parameters
input_name: str

Name of the input/output.

then: str | Placeholder | list[str | Placeholder]

If the input/output specified in input_name is present, the command-line argument will be replaced at run-time by the value of then.

else_: str | Placeholder | list[str | Placeholder] | None = None

If the input/output specified in input_name is not present, the command-line argument will be replaced at run-time by the value of else_.

Examples

@container_component
def container_with_if_placeholder(output_path: OutputPath(str),
                                  dataset: Output[Dataset],
                                  optional_input: str = 'default'):
    return ContainerSpec(
            image='python:3.9',
            command=[
                'my_program',
                IfPresentPlaceholder(
                    input_name='optional_input',
                    then=[optional_input],
                    else_=['no_input']), '--dataset',
                IfPresentPlaceholder(
                    input_name='optional_input', then=[dataset.uri], else_=['no_dataset'])
            ],
            args=['--output_path', output_path]
        )
class kfp.dsl.ConcatPlaceholder(items: list[str | Placeholder])[source]

Bases: Placeholder

Placeholder for concatenating multiple strings. May contain other placeholders.

Parameters
items: list[str | Placeholder]

Elements to concatenate.

Examples

@container_component
def container_with_concat_placeholder(text1: str, text2: Output[Dataset],
                                      output_path: OutputPath(str)):
    return ContainerSpec(
        image='python:3.9',
        command=[
            'my_program',
            ConcatPlaceholder(['prefix-', text1, text2.uri])
        ],
        args=['--output_path', output_path]
    )
class kfp.dsl.PipelineTask(component_spec: ComponentSpec, args: dict[str, Any], execute_locally: bool = False, execution_caching_default: bool = True)[source]

Bases: object

Represents a pipeline task (instantiated component).

Note: PipelineTask should not be constructed by pipeline authors directly, but instead obtained via an instantiated component (see example).

Replaces ContainerOp from kfp v1. Holds operations available on a task object, such as .after(), .set_memory_limit(), .enable_caching(), etc.

Parameters
component_spec: ComponentSpec

The component definition.

args: dict[str, Any]

The dictionary of arguments on which the component was called to instantiate this task.

Example

@dsl.component
def identity(message: str) -> str:
    return message

@dsl.pipeline(name='my_pipeline')
def my_pipeline():
    # task is an instance of PipelineTask
    task = identity(message='my string')

Attributes:

platform_spec

The PlatformSpec for all tasks in the pipeline represented by this task.

name

The name of the task.

inputs

The inputs passed to the task.

output

The single output of the task.

outputs

The dictionary of outputs of the task.

dependent_tasks

A list of the dependent task names.

Methods:

set_caching_options(enable_caching)

Sets caching options for the task.

set_cpu_request(cpu)

Sets CPU request (minimum) for the task.

set_cpu_limit(cpu)

Sets CPU limit (maximum) for the task.

set_accelerator_limit(limit)

Sets accelerator limit (maximum) for the task.

set_memory_request(memory)

Sets memory request (minimum) for the task.

set_memory_limit(memory)

Sets memory limit (maximum) for the task.

set_retry(num_retries[, backoff_duration, ...])

Sets task retry parameters.

add_node_selector_constraint(accelerator)

Sets accelerator type to use when executing this task.

set_accelerator_type(accelerator)

Sets accelerator type to use when executing this task.

set_display_name(name)

Sets display name for the task.

set_env_variable(name, value)

Sets environment variable for the task.

set_container_image(name)

Sets the container image to use when executing this task.

after(*tasks)

Specifies an explicit dependency on other tasks by requiring this task to be executed after the other tasks complete.

ignore_upstream_failure()

If called, the pipeline task will run when any specified upstream tasks complete, even if unsuccessful.

property platform_spec : PlatformSpec

The PlatformSpec for all tasks in the pipeline represented by this task.

Only for use on tasks created from GraphComponents.

property name : str

The name of the task.

Unique within its parent group.

property inputs : dict[str, str | int | float | bool | dict | list | PipelineChannel]

The inputs passed to the task.

property output : PipelineChannel

The single output of the task.

Used when a task has exactly one output parameter.

property outputs : Mapping[str, PipelineChannel]

The dictionary of outputs of the task.

Used when a task has more than one output or uses an OutputPath or Output[Artifact] type annotation.

property dependent_tasks : list[str]

A list of the dependent task names.

set_caching_options(enable_caching: bool) PipelineTask[source]

Sets caching options for the task.

Parameters
enable_caching: bool

Whether to enable caching.

Returns

Self return to allow chained setting calls.

set_cpu_request(cpu: str | PipelineChannel) PipelineTask[source]

Sets CPU request (minimum) for the task.

Parameters
cpu: str | PipelineChannel

Minimum CPU requests required. This string should be a number or a number followed by an “m” to indicate millicores (1/1000). For more information, see Specify a CPU Request and a CPU Limit.

Returns

Self return to allow chained setting calls.

set_cpu_limit(cpu: str | PipelineChannel) PipelineTask[source]

Sets CPU limit (maximum) for the task.

Parameters
cpu: str | PipelineChannel

Maximum CPU requests allowed. This string should be a number or a number followed by an “m” to indicate millicores (1/1000). For more information, see Specify a CPU Request and a CPU Limit.

Returns

Self return to allow chained setting calls.

set_accelerator_limit(limit: int | str | PipelineChannel) PipelineTask[source]

Sets accelerator limit (maximum) for the task. Only applies if accelerator type is also set via .set_accelerator_type().

Parameters
limit: int | str | PipelineChannel

Maximum number of accelerators allowed.

Returns

Self return to allow chained setting calls.

set_memory_request(memory: str | PipelineChannel) PipelineTask[source]

Sets memory request (minimum) for the task.

Parameters
memory: str | PipelineChannel

The minimum memory requests required. This string should be a number or a number followed by one of “E”, “Ei”, “P”, “Pi”, “T”, “Ti”, “G”, “Gi”, “M”, “Mi”, “K”, or “Ki”.

Returns

Self return to allow chained setting calls.

set_memory_limit(memory: str | PipelineChannel) PipelineTask[source]

Sets memory limit (maximum) for the task.

Parameters
memory: str | PipelineChannel

The maximum memory requests allowed. This string should be a number or a number followed by one of “E”, “Ei”, “P”, “Pi”, “T”, “Ti”, “G”, “Gi”, “M”, “Mi”, “K”, or “Ki”.

Returns

Self return to allow chained setting calls.

set_retry(num_retries: int, backoff_duration: str | None = None, backoff_factor: float | None = None, backoff_max_duration: str | None = None) PipelineTask[source]

Sets task retry parameters.

Parameters
num_retries: int

Number of times to retry on failure.

backoff_duration: str | None = None

Number of seconds to wait before triggering a retry. Defaults to '0s' (immediate retry).

backoff_factor: float | None = None

Exponential backoff factor applied to backoff_duration. For example, if backoff_duration="60" (60 seconds) and backoff_factor=2, the first retry will happen after 60 seconds, then again after 120, 240, and so on. Defaults to 2.0.

backoff_max_duration: str | None = None

Maximum duration during which the task will be retried. Maximum duration is 1 hour (3600s). Defaults to '3600s'.

Returns

Self return to allow chained setting calls.
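
For example, a task from a hypothetical flaky_op component could be retried with exponential backoff, using only the parameters documented above:

task = flaky_op()
task.set_retry(
    num_retries=3,
    backoff_duration='60s',
    backoff_factor=2.0,
    backoff_max_duration='3600s',
)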

add_node_selector_constraint(accelerator: str) PipelineTask[source]

Sets accelerator type to use when executing this task.

Parameters
accelerator: str

The name of the accelerator, such as 'NVIDIA_TESLA_K80', 'TPU_V3', 'nvidia.com/gpu' or 'cloud-tpus.google.com/v3'.

Returns

Self return to allow chained setting calls.

set_accelerator_type(accelerator: str | PipelineChannel) PipelineTask[source]

Sets accelerator type to use when executing this task.

Parameters
accelerator: str | PipelineChannel

The name of the accelerator, such as 'NVIDIA_TESLA_K80', 'TPU_V3', 'nvidia.com/gpu' or 'cloud-tpus.google.com/v3'.

Returns

Self return to allow chained setting calls.
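
Because each setter returns the task, resource-related calls can be chained. A sketch assuming a hypothetical train_op component:

@dsl.pipeline(name='training-pipeline')
def training_pipeline():
    # Configure CPU, memory, and accelerator settings in one chained expression.
    train_task = (
        train_op()
        .set_cpu_limit('4')
        .set_memory_limit('16G')
        .set_accelerator_type('NVIDIA_TESLA_K80')
        .set_accelerator_limit(1)
    )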

set_display_name(name: str) PipelineTask[source]

Sets display name for the task.

Parameters
name: str

Display name.

Returns

Self return to allow chained setting calls.

set_env_variable(name: str, value: str) PipelineTask[source]

Sets environment variable for the task.

Parameters
name: str

Environment variable name.

value: str

Environment variable value.

Returns

Self return to allow chained setting calls.

set_container_image(name: str | PipelineChannel) PipelineTask[source]

Sets the container image to use when executing this task. Takes precedence over @component(base_image=…).

Parameters
name: str | PipelineChannel

The name of the image, e.g. “python:3.9-alpine”.

Returns

Self return to allow chained setting calls.

after(*tasks) PipelineTask[source]

Specifies an explicit dependency on other tasks by requiring this task to be executed after the other tasks complete.

Parameters
*tasks

Tasks after which this task should be executed.

Returns

Self return to allow chained setting calls.

Example

@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    task1 = my_component(text='1st task')
    task2 = my_component(text='2nd task').after(task1)
ignore_upstream_failure() PipelineTask[source]

If called, the pipeline task will run when any specified upstream tasks complete, even if unsuccessful.

This method effectively turns the caller task into an exit task if the caller task has upstream dependencies.

If the task has no upstream tasks, either via data exchange or an explicit dependency via .after(), this method has no effect.

Returns

Self return to allow chained setting calls.

Example

@dsl.pipeline()
def my_pipeline(text: str = 'message'):
    task = fail_op(message=text)
    clean_up_task = print_op(
        message=task.output).ignore_upstream_failure()
class kfp.dsl.PipelineConfig[source]

Bases: object

PipelineConfig contains pipeline-level config options.
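
A minimal sketch of passing a PipelineConfig to the pipeline decorator; no specific options are set here because the available options depend on the SDK version, and my_component is a hypothetical component:

from kfp import dsl

config = dsl.PipelineConfig()

@dsl.pipeline(name='my-pipeline', pipeline_config=config)
def my_pipeline():
    my_component()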