kfp.dsl¶
The kfp.dsl module contains domain-specific language objects used to compose pipelines.
Data:
- Input: Type generic used to represent an input artifact of type T.
- Output: A type generic used to represent an output artifact of type T.
- PIPELINE_JOB_NAME_PLACEHOLDER: A placeholder used to obtain a pipeline job name within a task at pipeline runtime.
- PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER: A placeholder used to obtain a pipeline job resource name within a task at pipeline runtime.
- PIPELINE_JOB_ID_PLACEHOLDER: A placeholder used to obtain a pipeline job ID within a task at pipeline runtime.
- PIPELINE_TASK_NAME_PLACEHOLDER: A placeholder used to obtain a task name within a task at pipeline runtime.
- PIPELINE_TASK_ID_PLACEHOLDER: A placeholder used to obtain a task ID within a task at pipeline runtime.
- PIPELINE_TASK_EXECUTOR_OUTPUT_PATH_PLACEHOLDER: A placeholder used to obtain the path to the executor_output.json file within the task container.
- PIPELINE_TASK_EXECUTOR_INPUT_PLACEHOLDER: A placeholder used to obtain the executor input message passed to the task.
- PIPELINE_ROOT_PLACEHOLDER: A placeholder used to obtain the pipeline root.
- PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER: A placeholder used to obtain the time that a pipeline job was created.
- PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER: A placeholder used to obtain the time for which a pipeline job is scheduled.
Classes:
- InputPath: Type annotation used in component definitions for indicating a parameter is a path to an input.
- OutputPath: Type annotation used in component definitions for indicating a parameter is a path to an output.
- PipelineTaskFinalStatus: A final status of a pipeline task.
- Artifact: Represents a generic machine learning artifact.
- ClassificationMetrics: An artifact for storing classification metrics.
- Dataset: An artifact representing a machine learning dataset.
- HTML: An artifact representing an HTML file.
- Markdown: An artifact representing a markdown file.
- Metrics: An artifact for storing key-value scalar metrics.
- Model: An artifact representing a machine learning model.
- SlicedClassificationMetrics: An artifact for storing sliced classification metrics.
- ContainerSpec: Container definition.
- Condition: Deprecated; use dsl.If instead.
- If: A class for creating a conditional control flow "if" block within a pipeline.
- Elif: A class for creating a conditional control flow "else if" block within a pipeline.
- Else: A class for creating a conditional control flow "else" block within a pipeline.
- OneOf: For collecting mutually exclusive outputs from conditional branches into a single pipeline channel.
- ExitHandler: A class for setting an exit handler task that is invoked upon exiting a group of other tasks.
- ParallelFor: A class for creating parallelized for loop control flow over a static set of items within a pipeline definition.
- Collected: For collecting into a list the output from a task in dsl.ParallelFor loops.
- IfPresentPlaceholder: Placeholder for handling cases where an input may or may not be passed.
- ConcatPlaceholder: Placeholder for concatenating multiple strings.
- PipelineTask: Represents a pipeline task (instantiated component).
- PipelineConfig: Contains pipeline-level config options.
Functions:
- get_uri: Gets the task root URI, a unique object storage URI associated with the current task.
- component: Decorator for Python-function based components.
- container_component: Decorator for container-based components in KFP v2.
- pipeline: Decorator used to construct a pipeline.
- importer: Imports an existing artifact for use in a downstream component.
- kfp.dsl.Input(*args, **kwargs)¶
Type generic used to represent an input artifact of type T, where T is an artifact class.
Use Input[Artifact] or Output[Artifact] to indicate whether the enclosed artifact is a component input or output.
- Parameters¶
- T: The type of the input artifact.
Example

    @dsl.component
    def artifact_producer(model: Output[Artifact]):
        with open(model.path, 'w') as f:
            f.write('my model')

    @dsl.component
    def artifact_consumer(model: Input[Artifact]):
        print(model)

    @dsl.pipeline
    def my_pipeline():
        producer_task = artifact_producer()
        artifact_consumer(model=producer_task.output)

alias of T[T]
- kfp.dsl.Output(*args, **kwargs)¶
A type generic used to represent an output artifact of type T, where T is an artifact class. The argument typed with this annotation is provided at runtime by the executing backend and does not need to be passed as an input by the pipeline author (see example).
Use Input[Artifact] or Output[Artifact] to indicate whether the enclosed artifact is a component input or output.
- Parameters¶
- T: The type of the output artifact.
Example

    @dsl.component
    def artifact_producer(model: Output[Artifact]):
        with open(model.path, 'w') as f:
            f.write('my model')

    @dsl.component
    def artifact_consumer(model: Input[Artifact]):
        print(model)

    @dsl.pipeline
    def my_pipeline():
        producer_task = artifact_producer()
        artifact_consumer(model=producer_task.output)

alias of T[T]
- class kfp.dsl.InputPath(type=None)[source]¶
Bases: object
Type annotation used in component definitions for indicating a parameter is a path to an input.
Example

    @dsl.component
    def create_dataset(dataset_path: OutputPath('Dataset')):
        import json
        dataset = {'my_dataset': [[1, 2, 3], [4, 5, 6]]}
        with open(dataset_path, 'w') as f:
            json.dump(dataset, f)

    @dsl.component
    def consume_dataset(dataset: InputPath('Dataset')):
        print(dataset)

    @dsl.pipeline(name='my-pipeline', pipeline_root='gs://my-bucket')
    def my_pipeline():
        create_dataset_op = create_dataset()
        consume_dataset(dataset=create_dataset_op.outputs['dataset_path'])
- class kfp.dsl.OutputPath(type=None)[source]¶
Bases: object
Type annotation used in component definitions for indicating a parameter is a path to an output. The path parameter typed with this annotation can be treated as a locally accessible filepath within the component body.
The argument typed with this annotation is provided at runtime by the executing backend and does not need to be passed as an input by the pipeline author (see example).
Example

    @dsl.component
    def create_parameter(
        message: str,
        output_parameter_path: OutputPath(str),
    ):
        with open(output_parameter_path, 'w') as f:
            f.write(message)

    @dsl.component
    def consume_parameter(message: str):
        print(message)

    @dsl.pipeline(name='my-pipeline', pipeline_root='gs://my-bucket')
    def my_pipeline(message: str = 'default message'):
        create_param_op = create_parameter(message=message)
        consume_parameter(message=create_param_op.outputs['output_parameter_path'])
- class kfp.dsl.PipelineTaskFinalStatus(state: str, pipeline_job_resource_name: str, pipeline_task_name: str, error_code: int | None, error_message: str | None)[source]¶
Bases: object
A final status of a pipeline task. Annotate a component parameter with this class to obtain a handle to a task's status (see example).
This is the Python representation of the proto message PipelineTaskFinalStatus.
Examples

    @dsl.component
    def task_status(user_input: str, status: PipelineTaskFinalStatus):
        print('Pipeline status: ', status.state)
        print('Job resource name: ', status.pipeline_job_resource_name)
        print('Pipeline task name: ', status.pipeline_task_name)
        print('Error code: ', status.error_code)
        print('Error message: ', status.error_message)

    @dsl.pipeline(name='my_pipeline')
    def my_pipeline():
        task = task_status(user_input='my_input')

Attributes:
- state: Final state of the task.
- pipeline_job_resource_name: Pipeline job resource name, in the format of projects/{project}/locations/{location}/pipelineJobs/{pipeline_job}.
- pipeline_task_name: Name of the task that produced this status.
- error_code: The google.rpc.Code in case of error.
- error_message: In case of error, the detailed error message.
- state : str¶
Final state of the task. The value could be one of 'SUCCEEDED', 'FAILED' or 'CANCELLED'.
- pipeline_job_resource_name : str¶
Pipeline job resource name, in the format of projects/{project}/locations/{location}/pipelineJobs/{pipeline_job}.
- pipeline_task_name : str¶
Name of the task that produced this status.
- error_code : int | None¶
The google.rpc.Code in case of error. If state is 'SUCCEEDED', this is None.
- error_message : str | None¶
In case of error, the detailed error message. If state is 'SUCCEEDED', this is None.
- class kfp.dsl.Artifact(name: str | None = None, uri: str | None = None, metadata: dict | None = None)[source]¶
Bases: object
Represents a generic machine learning artifact.
This class and all artifact classes store the name, uri, and metadata for a machine learning artifact. Use this artifact type when an artifact does not fit into another more specific artifact type (e.g., Model, Dataset).
- Parameters¶
Example

    from kfp import dsl
    from kfp.dsl import Output, Artifact, Input

    @dsl.component
    def create_artifact(
        data: str,
        output_artifact: Output[Artifact],
    ):
        with open(output_artifact.path, 'w') as f:
            f.write(data)

    @dsl.component
    def use_artifact(input_artifact: Input[Artifact]):
        with open(input_artifact.path) as input_file:
            artifact_contents = input_file.read()
            print(artifact_contents)

    @dsl.pipeline(name='my-pipeline', pipeline_root='gs://my/storage')
    def my_pipeline():
        create_task = create_artifact(data='my data')
        use_artifact(input_artifact=create_task.outputs['output_artifact'])

Note: Other artifacts are used similarly to the usage of Artifact in the example above (within Input[] and Output[]).
Attributes:
- schema_title = 'system.Artifact'¶
- schema_version = '0.0.1'¶
- property path : str¶
- class kfp.dsl.ClassificationMetrics(name: str | None = None, uri: str | None = None, metadata: dict | None = None)[source]¶
Bases: Artifact
An artifact for storing classification metrics.
- Parameters¶
Attributes:
Methods:
- log_roc_data_point(fpr, tpr, threshold): Logs a single data point in the ROC curve to metadata.
- log_roc_curve(fpr, tpr, threshold): Logs an ROC curve to metadata.
- set_confusion_matrix_categories(categories): Stores confusion matrix categories to metadata.
- log_confusion_matrix_row(row_category, row): Logs a confusion matrix row to metadata.
- log_confusion_matrix_cell(row_category, ...): Logs a cell in the confusion matrix to metadata.
- log_confusion_matrix(categories, matrix): Logs a confusion matrix to metadata.
- schema_title = 'system.ClassificationMetrics'¶
- log_roc_data_point(fpr: float, tpr: float, threshold: float) None [source]¶
Logs a single data point in the ROC curve to metadata.
- log_roc_curve(fpr: list[float], tpr: list[float], threshold: list[float]) None [source]¶
Logs an ROC curve to metadata.
- set_confusion_matrix_categories(categories: list[str]) None [source]¶
Stores confusion matrix categories to metadata.
- log_confusion_matrix_row(row_category: str, row: list[float]) None [source]¶
Logs a confusion matrix row to metadata.
- log_confusion_matrix_cell(row_category: str, col_category: str, value: int) None [source]¶
Logs a cell in the confusion matrix to metadata.
- log_confusion_matrix(categories: list[str], matrix: list[list[int]]) None [source]¶
Logs a confusion matrix to metadata.
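No inline example is given for this class; the following sketch (component name and values are illustrative) shows one way the documented log methods can be called on an Output[ClassificationMetrics] artifact inside a component:

    from kfp import dsl
    from kfp.dsl import ClassificationMetrics, Output

    @dsl.component
    def eval_classifier(metrics: Output[ClassificationMetrics]):
        # Log a confusion matrix for two illustrative classes.
        metrics.log_confusion_matrix(
            categories=['cat', 'dog'],
            matrix=[[90, 10], [5, 95]],
        )
        # Log an ROC curve from parallel lists of FPR, TPR, and threshold values.
        metrics.log_roc_curve(
            fpr=[0.0, 0.1, 1.0],
            tpr=[0.0, 0.8, 1.0],
            threshold=[1.0, 0.5, 0.0],
        )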
- class kfp.dsl.Dataset(name: str | None = None, uri: str | None = None, metadata: dict | None = None)[source]¶
Bases: Artifact
An artifact representing a machine learning dataset.
- Parameters¶
Attributes:
- schema_title = 'system.Dataset'¶
- class kfp.dsl.HTML(name: str | None = None, uri: str | None = None, metadata: dict | None = None)[source]¶
Bases: Artifact
An artifact representing an HTML file.
- Parameters¶
Attributes:
- schema_title = 'system.HTML'¶
- class kfp.dsl.Markdown(name: str | None = None, uri: str | None = None, metadata: dict | None = None)[source]¶
Bases: Artifact
An artifact representing a markdown file.
- Parameters¶
Attributes:
- schema_title = 'system.Markdown'¶
- class kfp.dsl.Metrics(name: str | None = None, uri: str | None = None, metadata: dict | None = None)[source]¶
Bases: Artifact
An artifact for storing key-value scalar metrics.
- Parameters¶
Attributes:
Methods:
- log_metric(metric, value): Sets a custom scalar metric in the artifact's metadata.
- schema_title = 'system.Metrics'¶
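A minimal sketch of logging scalar metrics from a component (the component name and values are illustrative):

    from kfp import dsl
    from kfp.dsl import Metrics, Output

    @dsl.component
    def evaluate(metrics: Output[Metrics]):
        # Each call stores one scalar key-value pair in the artifact's metadata.
        metrics.log_metric('accuracy', 0.92)
        metrics.log_metric('f1', 0.88)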
- class kfp.dsl.Model(name: str | None = None, uri: str | None = None, metadata: dict | None = None)[source]¶
Bases: Artifact
An artifact representing a machine learning model.
- Parameters¶
Attributes:
- schema_title = 'system.Model'¶
- property framework : str¶
- class kfp.dsl.SlicedClassificationMetrics(name: str | None = None, uri: str | None = None, metadata: dict | None = None)[source]¶
Bases: Artifact
An artifact for storing sliced classification metrics.
Similar to ClassificationMetrics, tasks using this class are expected to use its log methods to record metrics; the difference is that each log method takes a slice with which to associate the logged ClassificationMetrics.
- Parameters¶
Attributes:
Methods:
- log_roc_reading(slice, threshold, tpr, fpr): Logs a single data point in the ROC curve of a slice to metadata.
- load_roc_readings(slice, readings): Bulk loads ROC curve readings for a slice.
- set_confusion_matrix_categories(slice, ...): Logs confusion matrix categories for a slice to metadata.
- log_confusion_matrix_row(slice, ...): Logs a confusion matrix row for a slice to metadata.
- log_confusion_matrix_cell(slice, ...): Logs a confusion matrix cell for a slice to metadata.
- load_confusion_matrix(slice, categories, matrix): Bulk loads the whole confusion matrix for a slice.
- schema_title = 'system.SlicedClassificationMetrics'¶
- log_roc_reading(slice: str, threshold: float, tpr: float, fpr: float) None [source]¶
Logs a single data point in the ROC curve of a slice to metadata.
- load_roc_readings(slice: str, readings: list[list[float]]) None [source]¶
Bulk loads ROC curve readings for a slice.
- set_confusion_matrix_categories(slice: str, categories: list[str]) None [source]¶
Logs confusion matrix categories for a slice to metadata.
Categories are stored in the internal metrics_utils.ConfusionMatrix instance of the slice.
- log_confusion_matrix_row(slice: str, row_category: str, row: list[int]) None [source]¶
Logs a confusion matrix row for a slice to metadata.
Row is updated on the internal metrics_utils.ConfusionMatrix instance of the slice.
- log_confusion_matrix_cell(slice: str, row_category: str, col_category: str, value: int) None [source]¶
Logs a confusion matrix cell for a slice to metadata.
Cell is updated on the internal metrics_utils.ConfusionMatrix instance of the slice.
- load_confusion_matrix(slice: str, categories: list[str], matrix: list[list[int]]) None [source]¶
Bulk loads the whole confusion matrix for a slice.
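No example is given for this class; the sketch below (slice names and values are illustrative) shows how a component might log per-slice ROC readings using the documented signatures:

    from kfp import dsl
    from kfp.dsl import Output, SlicedClassificationMetrics

    @dsl.component
    def eval_by_slice(metrics: Output[SlicedClassificationMetrics]):
        # Log one ROC data point for each illustrative data slice.
        metrics.log_roc_reading(slice='us-region', threshold=0.5, tpr=0.85, fpr=0.10)
        metrics.log_roc_reading(slice='eu-region', threshold=0.5, tpr=0.80, fpr=0.12)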
- kfp.dsl.get_uri(suffix: str = 'Output') str [source]¶
Gets the task root URI, a unique object storage URI associated with the current task. This function may only be called at task runtime.
Returns an empty string if the task root cannot be inferred from the runtime environment.
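A short sketch of calling this inside a component at runtime (the component name and suffix are illustrative):

    from kfp import dsl

    @dsl.component
    def export_model():
        from kfp import dsl
        # Build an object storage path under the current task's root URI.
        model_uri = dsl.get_uri(suffix='model')
        print('Writing model to:', model_uri)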
- kfp.dsl.PIPELINE_JOB_NAME_PLACEHOLDER = '{{$.pipeline_job_name}}'¶
A placeholder used to obtain a pipeline job name within a task at pipeline runtime.
Example

    @dsl.pipeline
    def my_pipeline():
        print_op(
            msg='Job name:',
            value=dsl.PIPELINE_JOB_NAME_PLACEHOLDER,
        )
- kfp.dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER = '{{$.pipeline_job_resource_name}}'¶
A placeholder used to obtain a pipeline job resource name within a task at pipeline runtime.
Example

    @dsl.pipeline
    def my_pipeline():
        print_op(
            msg='Job resource name:',
            value=dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER,
        )
- kfp.dsl.PIPELINE_JOB_ID_PLACEHOLDER = '{{$.pipeline_job_uuid}}'¶
A placeholder used to obtain a pipeline job ID within a task at pipeline runtime.
Example

    @dsl.pipeline
    def my_pipeline():
        print_op(
            msg='Job ID:',
            value=dsl.PIPELINE_JOB_ID_PLACEHOLDER,
        )
- kfp.dsl.PIPELINE_TASK_NAME_PLACEHOLDER = '{{$.pipeline_task_name}}'¶
A placeholder used to obtain a task name within a task at pipeline runtime.
Example

    @dsl.pipeline
    def my_pipeline():
        print_op(
            msg='Task name:',
            value=dsl.PIPELINE_TASK_NAME_PLACEHOLDER,
        )
- kfp.dsl.PIPELINE_TASK_ID_PLACEHOLDER = '{{$.pipeline_task_uuid}}'¶
A placeholder used to obtain a task ID within a task at pipeline runtime.
Example

    @dsl.pipeline
    def my_pipeline():
        print_op(
            msg='Task ID:',
            value=dsl.PIPELINE_TASK_ID_PLACEHOLDER,
        )
- kfp.dsl.PIPELINE_TASK_EXECUTOR_OUTPUT_PATH_PLACEHOLDER = '{{$.outputs.output_file}}'¶
A placeholder used to obtain the path to the executor_output.json file within the task container.
Example

    @dsl.pipeline
    def my_pipeline():
        create_artifact_with_metadata(
            metadata={'foo': 'bar'},
            executor_output_destination=dsl.PIPELINE_TASK_EXECUTOR_OUTPUT_PATH_PLACEHOLDER,
        )
- kfp.dsl.PIPELINE_TASK_EXECUTOR_INPUT_PLACEHOLDER = '{{$}}'¶
A placeholder used to obtain the executor input message passed to the task.
Example

    @dsl.pipeline
    def my_pipeline():
        custom_container_op(
            executor_input=dsl.PIPELINE_TASK_EXECUTOR_INPUT_PLACEHOLDER,
        )
- kfp.dsl.PIPELINE_ROOT_PLACEHOLDER = '{{$.pipeline_root}}'¶
A placeholder used to obtain the pipeline root.
Example

    @dsl.pipeline
    def my_pipeline():
        store_model(
            tmp_dir=dsl.PIPELINE_ROOT_PLACEHOLDER + '/tmp',
        )
- kfp.dsl.PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER = '{{$.pipeline_job_create_time_utc}}'¶
A placeholder used to obtain the time that a pipeline job was created.
Example

    @dsl.pipeline
    def my_pipeline():
        print_op(
            msg='Job created at:',
            value=dsl.PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER,
        )
- kfp.dsl.PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER = '{{$.pipeline_job_schedule_time_utc}}'¶
A placeholder used to obtain the time for which a pipeline job is scheduled.
Example

    @dsl.pipeline
    def my_pipeline():
        print_op(
            msg='Job scheduled at:',
            value=dsl.PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER,
        )
- kfp.dsl.component(func: Callable | None = None, *, base_image: str | None = None, target_image: str | None = None, packages_to_install: list[str] | None = None, pip_index_urls: list[str] | None = None, output_component_file: str | None = None, install_kfp_package: bool = True, kfp_package_path: str | None = None, pip_trusted_hosts: list[str] | None = None, use_venv: bool = False)[source]¶
Decorator for Python-function based components.
A KFP component can either be a lightweight component or a containerized component.
If target_image is not specified, this function creates a lightweight component. A lightweight component is a self-contained Python function that includes all necessary imports and dependencies. In lightweight components, packages_to_install will be used to install dependencies at runtime. The parameters install_kfp_package and kfp_package_path can be used to control how and from where KFP should be installed when the lightweight component is executed.
If target_image is specified, this function creates a component definition based around the target_image. The assumption is that the function in func will be packaged by KFP into this target_image. You can use the KFP CLI's build command to package the function into target_image.
- Parameters¶
- func: Callable | None = None¶
Python function from which to create a component. The function should have type annotations for all its arguments, indicating how each argument is intended to be used (e.g. as an input/output artifact, a plain parameter, or a path to a file).
- base_image: str | None = None¶
Image to use when executing the Python function. It should contain a default Python interpreter that is compatible with KFP.
- target_image: str | None = None¶
Image to use when creating containerized components.
- packages_to_install: list[str] | None = None¶
List of packages to install before executing the Python function. These will always be installed at component runtime.
- pip_index_urls: list[str] | None = None¶
Python Package Index base URLs from which to install packages_to_install. Defaults to installing from only PyPI ('https://pypi.org/simple'). For more information, see pip install docs.
- output_component_file: str | None = None¶
If specified, this function will write a shareable/loadable version of the component spec into this file.
Warning: This compilation approach is deprecated.
- install_kfp_package: bool = True¶
Specifies if the KFP SDK should add the kfp Python package to packages_to_install. Lightweight Python functions always require an installation of KFP in base_image to work. If you specify a base_image that already contains KFP, you can set this to False. This flag is ignored when target_image is specified, which implies a choice to build a containerized component. Containerized components will always install KFP as part of the build process.
- kfp_package_path: str | None = None¶
Specifies the location from which to install KFP. By default, this will try to install from PyPI using the same version as that used when this component was created. Component authors can choose to override this to point to a GitHub pull request or other pip-compatible package server.
- use_venv: bool = False¶
Specifies if the component should be executed in a virtual environment. The environment will be created in a temporary directory and will inherit the system site packages. This is useful in restricted environments where most of the system is read-only.
- Returns¶
A component task factory that can be used in pipeline definitions.
Example

    from kfp import dsl

    @dsl.component
    def my_function_one(input: str, output: Output[Model]):
        ...

    @dsl.component(
        base_image='python:3.9',
        output_component_file='my_function.yaml'
    )
    def my_function_two(input: Input[Model]):
        ...

    @dsl.pipeline(name='my-pipeline', pipeline_root='...')
    def pipeline():
        my_function_one_task = my_function_one(input=...)
        my_function_two_task = my_function_two(input=my_function_one_task.outputs)
- kfp.dsl.container_component(func: Callable) ContainerComponent [source]¶
Decorator for container-based components in KFP v2.
- Parameters¶
- func: Callable¶
The python function to create a component from. The function should have type annotations for all its arguments, indicating how it is intended to be used (e.g. as an input/output Artifact object, a plain parameter, or a path to a file).
Example
    from kfp.dsl import container_component, ContainerSpec, InputPath, OutputPath, Output

    @container_component
    def my_component(
        dataset_path: InputPath(Dataset),
        model: Output[Model],
        num_epochs: int,
        output_parameter: OutputPath(str),
    ):
        return ContainerSpec(
            image='gcr.io/my-image',
            command=['sh', 'my_component.sh'],
            args=[
                '--dataset_path', dataset_path,
                '--model_path', model.path,
                '--output_parameter_path', output_parameter,
            ]
        )
- kfp.dsl.pipeline(func: Callable | None = None, *, name: str | None = None, description: str | None = None, pipeline_root: str | None = None, display_name: str | None = None, pipeline_config: PipelineConfig | None = None) Callable [source]¶
Decorator used to construct a pipeline.
- Example

    @pipeline(
        name='my-pipeline',
        description='My ML Pipeline.',
        pipeline_root='gs://my-bucket/my-output-path'
    )
    def my_pipeline(a: str, b: int):
        ...
- Parameters¶
- func: Callable | None = None¶
The Python function that defines a pipeline.
- name: str | None = None¶
The pipeline name. Defaults to a sanitized version of the decorated function name.
- description: str | None = None¶
A human-readable description of the pipeline.
- pipeline_root: str | None = None¶
The root directory from which to read input and output parameters and artifacts.
- display_name: str | None = None¶
A human-readable name for the pipeline.
- pipeline_config: PipelineConfig | None = None¶
Pipeline-level config options.
- kfp.dsl.importer(artifact_uri: PipelineParameterChannel | str, artifact_class: type[Artifact], reimport: bool = False, metadata: Mapping[str, Any] | None = None) PipelineTask [source]¶
Imports an existing artifact for use in a downstream component.
Examples:

    @dsl.pipeline(name='pipeline-with-importer')
    def pipeline_with_importer():
        importer1 = importer(
            artifact_uri='gs://ml-pipeline-playground/shakespeare1.txt',
            artifact_class=Dataset,
            reimport=False)
        train(dataset=importer1.output)
- class kfp.dsl.ContainerSpec(image: str, command: list[str | Placeholder] | None = None, args: list[str | Placeholder] | None = None)[source]¶
Bases: object
Container definition.
This is only used by pipeline authors when constructing a containerized component using the @container_component decorator.
Examples

    @container_component
    def container_with_artifact_output(
        num_epochs: int,  # built-in types are parsed as inputs
        model: Output[Model],
        model_config_path: OutputPath(str),
    ):
        return ContainerSpec(
            image='gcr.io/my-image',
            command=['sh', 'run.sh'],
            args=[
                '--epochs', num_epochs,
                '--model_path', model.uri,
                '--model_config_path', model_config_path,
            ])

Attributes:
- image: Container image.
- command: Container entrypoint.
- args: Arguments to the container entrypoint.
- image : str¶
Container image.
- command : list[str | Placeholder] | None = None¶
Container entrypoint.
- args : list[str | Placeholder] | None = None¶
Arguments to the container entrypoint.
- class kfp.dsl.Condition(condition, name: str | None = None)[source]¶
Bases: If
Deprecated. Use dsl.If instead.
- class kfp.dsl.If(condition, name: str | None = None)[source]¶
Bases: _ConditionBase
A class for creating a conditional control flow "if" block within a pipeline.
- Parameters¶
Example

    task1 = my_component1(...)
    with dsl.If(task1.output == 'pizza', 'pizza-condition'):
        task2 = my_component2(...)
- class kfp.dsl.Elif(condition, name: str | None = None)[source]¶
Bases: _ConditionBase
A class for creating a conditional control flow "else if" block within a pipeline. Can be used following an upstream dsl.If or dsl.Elif.
- Parameters¶
Example

    task1 = my_component1(...)
    task2 = my_component2(...)
    with dsl.If(task1.output == 'pizza', 'pizza-condition'):
        task3 = my_component3(...)
    with dsl.Elif(task2.output == 'pasta', 'pasta-condition'):
        task4 = my_component4(...)
- class kfp.dsl.Else(name: str | None = None)[source]¶
Bases: _ConditionBase
A class for creating a conditional control flow "else" block within a pipeline. Can be used following an upstream dsl.If or dsl.Elif.
Example

    task1 = my_component1(...)
    task2 = my_component2(...)
    with dsl.If(task1.output == 'pizza', 'pizza-condition'):
        task3 = my_component3(...)
    with dsl.Elif(task2.output == 'pasta', 'pasta-condition'):
        task4 = my_component4(...)
    with dsl.Else():
        my_component5(...)
- class kfp.dsl.OneOf(*channels: PipelineParameterChannel | PipelineArtifactChannel)[source]¶
Bases: object
For collecting mutually exclusive outputs from conditional branches into a single pipeline channel.
- Parameters¶
- *channels: PipelineParameterChannel | PipelineArtifactChannel¶
The channels to collect into a OneOf. Must be of the same type.
Example

    @dsl.pipeline
    def flip_coin_pipeline() -> str:
        flip_coin_task = flip_coin()
        with dsl.If(flip_coin_task.output == 'heads'):
            print_task_1 = print_and_return(text='Got heads!')
        with dsl.Else():
            print_task_2 = print_and_return(text='Got tails!')
        # use the output from the branch that gets executed
        oneof = dsl.OneOf(print_task_1.output, print_task_2.output)
        # consume it
        print_and_return(text=oneof)
        # return it
        return oneof
- class kfp.dsl.ExitHandler(exit_task: PipelineTask, name: str | None = None)[source]¶
Bases: TasksGroup
A class for setting an exit handler task that is invoked upon exiting a group of other tasks.
- Parameters¶
- exit_task: PipelineTask¶
The task that is invoked after exiting a group of other tasks.
- name: str | None = None¶
The name of the exit handler group.
Example

    exit_task = ExitComponent(...)
    with ExitHandler(exit_task):
        task1 = my_component1(...)
        task2 = my_component2(...)
- class kfp.dsl.ParallelFor(items: list[int | float | str | dict[str, Any]] | PipelineChannel, name: str | None = None, parallelism: int | None = None)[source]¶
Bases: TasksGroup
A class for creating parallelized for loop control flow over a static set of items within a pipeline definition.
- Parameters¶
- items: list[int | float | str | dict[str, Any]] | PipelineChannel¶
The items to loop over. It can be either a constant Python list or a list output from an upstream task.
- name: str | None = None¶
The name of the for loop group.
- parallelism: int | None = None¶
The maximum number of concurrent iterations that can be scheduled for execution. A value of 0 represents unconstrained parallelism (default is unconstrained).
Example

    with dsl.ParallelFor(
        items=[{'a': 1, 'b': 10}, {'a': 2, 'b': 20}],
        parallelism=1
    ) as item:
        task1 = my_component(..., number=item.a)
        task2 = my_component(..., number=item.b)

In the example, the group of tasks containing task1 and task2 would be executed twice, once with case args=[{'a': 1, 'b': 10}] and once with case args=[{'a': 2, 'b': 20}]. The parallelism=1 setting causes only 1 execution to be scheduled at a time.
- class kfp.dsl.Collected(output: PipelineChannel)[source]¶
Bases: PipelineChannel
For collecting into a list the output from a task in dsl.ParallelFor loops.
Example

    @dsl.pipeline
    def math_pipeline() -> int:
        with dsl.ParallelFor([1, 2, 3]) as x:
            t = double(num=x)
        return add(nums=dsl.Collected(t.output)).output
- class kfp.dsl.IfPresentPlaceholder(input_name: str, then: str | Placeholder | list[str | Placeholder], else_: str | Placeholder | list[str | Placeholder] | None = None)[source]¶
Bases: Placeholder
Placeholder for handling cases where an input may or may not be passed. May contain other placeholders.
- Parameters¶
- input_name: str¶
Name of the input/output.
- then: str | Placeholder | list[str | Placeholder]¶
If the input/output specified in name is present, the command-line argument will be replaced at run-time by the value of then.
- else_¶
If the input/output specified in name is not present, the command-line argument will be replaced at run-time by the value of else_.
Examples

    @container_component
    def container_with_if_placeholder(output_path: OutputPath(str),
                                      dataset: Output[Dataset],
                                      optional_input: str = 'default'):
        return ContainerSpec(
            image='python:3.9',
            command=[
                'my_program',
                IfPresentPlaceholder(
                    input_name='optional_input',
                    then=[optional_input],
                    else_=['no_input']),
                '--dataset',
                IfPresentPlaceholder(
                    input_name='optional_input',
                    then=[dataset.uri],
                    else_=['no_dataset'])
            ],
            args=['--output_path', output_path]
        )
- class kfp.dsl.ConcatPlaceholder(items: list[str | Placeholder])[source]¶
Bases: Placeholder
Placeholder for concatenating multiple strings. May contain other placeholders.
Examples

    @container_component
    def container_with_concat_placeholder(text1: str,
                                          text2: Output[Dataset],
                                          output_path: OutputPath(str)):
        return ContainerSpec(
            image='python:3.9',
            command=[
                'my_program',
                ConcatPlaceholder(['prefix-', text1, text2.uri])
            ],
            args=['--output_path', output_path]
        )
- class kfp.dsl.PipelineTask(component_spec: ComponentSpec, args: dict[str, Any], execute_locally: bool = False, execution_caching_default: bool = True)[source]¶
Bases: object
Represents a pipeline task (instantiated component).
Note: PipelineTask should not be constructed by pipeline authors directly, but instead obtained via an instantiated component (see example).
Replaces ContainerOp from kfp v1. Holds operations available on a task object, such as .after(), .set_memory_limit(), .enable_caching(), etc.
- Parameters¶
Example

    @dsl.component
    def identity(message: str) -> str:
        return message

    @dsl.pipeline(name='my_pipeline')
    def my_pipeline():
        # task is an instance of PipelineTask
        task = identity(message='my string')
Attributes:
- platform_spec: PlatformSpec of the pipeline task.
- name: The name of the task.
- inputs: The inputs passed to the task.
- output: The single output of the task.
- outputs: The dictionary of outputs of the task.
- dependent_tasks: A list of the dependent task names.
Methods:
- set_caching_options(enable_caching): Sets caching options for the task.
- set_cpu_request(cpu): Sets CPU request (minimum) for the task.
- set_cpu_limit(cpu): Sets CPU limit (maximum) for the task.
- set_accelerator_limit(limit): Sets accelerator limit (maximum) for the task.
- set_memory_request(memory): Sets memory request (minimum) for the task.
- set_memory_limit(memory): Sets memory limit (maximum) for the task.
- set_retry(num_retries[, backoff_duration, ...]): Sets task retry parameters.
- add_node_selector_constraint(accelerator): Sets accelerator type to use when executing this task.
- set_accelerator_type(accelerator): Sets accelerator type to use when executing this task.
- set_display_name(name): Sets display name for the task.
- set_env_variable(name, value): Sets environment variable for the task.
- set_container_image(name): Sets the container image to use when executing this task.
- after(*tasks): Specifies an explicit dependency on other tasks by requiring this task to be executed after those tasks complete.
- ignore_upstream_failure(): If called, the pipeline task will run when any specified upstream tasks complete, even if unsuccessful.
- property platform_spec : PlatformSpec¶
PlatformSpec of the pipeline task.
Only for use on tasks created from GraphComponents.
- property name : str¶
The name of the task.
Unique within its parent group.
- property inputs : dict[str, str | int | float | bool | dict | list | PipelineChannel]¶
The inputs passed to the task.
- property output : PipelineChannel¶
The single output of the task.
Used when a task has exactly one output parameter.
- property outputs : Mapping[str, PipelineChannel]¶
The dictionary of outputs of the task.
Used when a task has more than one output or uses an OutputPath or Output[Artifact] type annotation.
- property dependent_tasks : list[str]¶
A list of the dependent task names.
- set_caching_options(enable_caching: bool) PipelineTask [source]¶
Sets caching options for the task.
- set_cpu_request(cpu: str | PipelineChannel) PipelineTask [source]¶
Sets CPU request (minimum) for the task.
- Parameters¶
- cpu: str | PipelineChannel¶
Minimum CPU requests required. This string should be a number or a number followed by an “m” to indicate millicores (1/1000). For more information, see Specify a CPU Request and a CPU Limit.
- Returns¶
Self return to allow chained setting calls.
- set_cpu_limit(cpu: str | PipelineChannel) PipelineTask [source]¶
Sets CPU limit (maximum) for the task.
- Parameters¶
- cpu: str | PipelineChannel¶
Maximum CPU requests allowed. This string should be a number or a number followed by an “m” to indicate millicores (1/1000). For more information, see Specify a CPU Request and a CPU Limit.
- Returns¶
Self return to allow chained setting calls.
- set_accelerator_limit(limit: int | str | PipelineChannel) PipelineTask [source]¶
Sets accelerator limit (maximum) for the task. Only applies if accelerator type is also set via .set_accelerator_type().
- set_memory_request(memory: str | PipelineChannel) PipelineTask [source]¶
Sets memory request (minimum) for the task.
- set_memory_limit(memory: str | PipelineChannel) PipelineTask [source]¶
Sets memory limit (maximum) for the task.
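Because each setter returns the task itself ("Self return to allow chained setting calls"), resource requests and limits can be chained; a brief sketch (the component and values are illustrative):

    @dsl.pipeline
    def my_pipeline():
        task = train_op()  # hypothetical component
        # Request and cap CPU and memory for the task's container.
        task.set_cpu_request('1').set_cpu_limit('2')
        task.set_memory_request('1Gi').set_memory_limit('4Gi')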
- set_retry(num_retries: int, backoff_duration: str | None = None, backoff_factor: float | None = None, backoff_max_duration: str | None = None) PipelineTask [source]¶
Sets task retry parameters.
- Parameters¶
- num_retries: int¶
Number of times to retry on failure.
- backoff_duration: str | None = None¶
Number of seconds to wait before triggering a retry. Defaults to '0s' (immediate retry).
- backoff_factor: float | None = None¶
Exponential backoff factor applied to backoff_duration. For example, if backoff_duration="60" (60 seconds) and backoff_factor=2, the first retry will happen after 60 seconds, then again after 120, 240, and so on. Defaults to 2.0.
- backoff_max_duration: str | None = None¶
Maximum duration during which the task will be retried. Maximum duration is 1 hour (3600s). Defaults to '3600s'.
- Returns¶
Self return to allow chained setting calls.
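A short sketch of configuring retries on a task using the parameters above (the component and values are illustrative):

    @dsl.pipeline
    def my_pipeline():
        task = flaky_op()  # hypothetical component
        # Retry up to 3 times, waiting 60s before the first retry and
        # doubling the wait between subsequent retries.
        task.set_retry(
            num_retries=3,
            backoff_duration='60s',
            backoff_factor=2.0,
        )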
- add_node_selector_constraint(accelerator: str) PipelineTask [source]¶
Sets accelerator type to use when executing this task.
- set_accelerator_type(accelerator: str | PipelineChannel) PipelineTask [source]¶
Sets accelerator type to use when executing this task.
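A sketch of requesting accelerators for a task; the accelerator type string is illustrative and backend-dependent:

    @dsl.pipeline
    def my_pipeline():
        task = train_op()  # hypothetical component
        # Select an accelerator type and cap the number of devices.
        task.set_accelerator_type('NVIDIA_TESLA_T4').set_accelerator_limit(1)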
- set_display_name(name: str) PipelineTask [source]¶
Sets display name for the task.
- set_env_variable(name: str, value: str) PipelineTask [source]¶
Sets environment variable for the task.
- set_container_image(name: str | PipelineChannel) PipelineTask [source]¶
Sets the container image to use when executing this task. Takes precedence over @component(base_image=…).
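A combined sketch of a few other per-task setters (the component, names, and values are illustrative):

    @dsl.pipeline
    def my_pipeline():
        task = train_op()  # hypothetical component
        task.set_display_name('Train model')
        task.set_env_variable(name='LOG_LEVEL', value='DEBUG')
        task.set_caching_options(enable_caching=False)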
- after(*tasks) PipelineTask [source]¶
Specifies an explicit dependency on other tasks by requiring this task to be executed after those tasks complete.
- Parameters¶
- *tasks
Tasks after which this task should be executed.
- Returns¶
Self return to allow chained setting calls.
Example
    @dsl.pipeline(name='my-pipeline')
    def my_pipeline():
        task1 = my_component(text='1st task')
        task2 = my_component(text='2nd task').after(task1)
- ignore_upstream_failure() PipelineTask [source]¶
If called, the pipeline task will run when any specified upstream tasks complete, even if unsuccessful.
This method effectively turns the caller task into an exit task if the caller task has upstream dependencies.
If the task has no upstream tasks, either via data exchange or an explicit dependency via .after(), this method has no effect.
- Returns¶
Self return to allow chained setting calls.
Example
    @dsl.pipeline()
    def my_pipeline(text: str = 'message'):
        task = fail_op(message=text)
        clean_up_task = print_op(
            message=task.output).ignore_upstream_failure()