kfp.dsl
The kfp.dsl module contains domain-specific language objects used to compose pipelines.
- kfp.dsl.Input
A type generic used to represent an input artifact of type T, where T is an artifact class.
Use Input[Artifact] or Output[Artifact] to indicate whether the enclosed artifact is a component input or output.
- Parameters
T – The type of the input artifact.
Example
@dsl.component
def artifact_producer(model: Output[Artifact]):
    with open(model.path, 'w') as f:
        f.write('my model')

@dsl.component
def artifact_consumer(model: Input[Artifact]):
    print(model)

@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    producer_task = artifact_producer()
    artifact_consumer(model=producer_task.output)
alias of T[T]
- kfp.dsl.Output
A type generic used to represent an output artifact of type T, where T is an artifact class. The argument typed with this annotation is provided at runtime by the executing backend and does not need to be passed as an input by the pipeline author (see example).
Use Input[Artifact] or Output[Artifact] to indicate whether the enclosed artifact is a component input or output.
- Parameters
T – The type of the output artifact.
Example
@dsl.component
def artifact_producer(model: Output[Artifact]):
    with open(model.path, 'w') as f:
        f.write('my model')

@dsl.component
def artifact_consumer(model: Input[Artifact]):
    print(model)

@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    producer_task = artifact_producer()
    artifact_consumer(model=producer_task.output)
alias of T[T]
- class kfp.dsl.Artifact(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]
Bases:
object
Represents a generic machine learning artifact.
This class and all artifact classes store the name, uri, and metadata for a machine learning artifact. Use this artifact type when an artifact does not fit into another more specific artifact type (e.g., Model, Dataset).
- Parameters
name – Name of the artifact.
uri – The artifact’s location on disk or cloud storage.
metadata – Arbitrary key-value pairs about the artifact.
Example
from kfp import dsl
from kfp.dsl import Output, Artifact, Input

@dsl.component
def create_artifact(
    data: str,
    output_artifact: Output[Artifact],
):
    with open(output_artifact.path, 'w') as f:
        f.write(data)

@dsl.component
def use_artifact(input_artifact: Input[Artifact]):
    with open(input_artifact.path) as input_file:
        artifact_contents = input_file.read()
        print(artifact_contents)

@dsl.pipeline(name='my-pipeline', pipeline_root='gs://my/storage')
def my_pipeline():
    create_task = create_artifact(data='my data')
    use_artifact(input_artifact=create_task.outputs['output_artifact'])
Note: Other artifact types are used similarly to Artifact in the example above (within Input[] and Output[]).
- TYPE_NAME = 'system.Artifact'
- VERSION = '0.0.1'
- property path: str
- class kfp.dsl.ClassificationMetrics(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]
Bases:
Artifact
An artifact for storing classification metrics.
- Parameters
name – Name of the metrics artifact.
uri – The metrics artifact’s location on disk or cloud storage.
metadata – The key-value scalar metrics.
- TYPE_NAME = 'system.ClassificationMetrics'
- log_confusion_matrix(categories: List[str], matrix: List[List[int]]) -> None [source]
Logs a confusion matrix to metadata.
- Parameters
categories – List of the category names.
matrix – Complete confusion matrix.
- Raises
ValueError – If the length of categories does not match the number of rows or columns of matrix.
- log_confusion_matrix_cell(row_category: str, col_category: str, value: int) -> None [source]
Logs a cell in the confusion matrix to metadata.
- Parameters
row_category – String representing the name of the row category.
col_category – String representing the name of the column category.
value – Value of the cell.
- Raises
ValueError – If row_category or col_category is not in the list of categories set in set_confusion_matrix_categories.
- log_confusion_matrix_row(row_category: str, row: List[float]) -> None [source]
Logs a confusion matrix row to metadata.
- Parameters
row_category – Category to which the row belongs.
row – List of numbers specifying the values for the row.
- Raises
ValueError – If row_category is not in the list of categories set in the set_confusion_matrix_categories call.
- log_roc_curve(fpr: List[float], tpr: List[float], threshold: List[float]) -> None [source]
Logs an ROC curve to metadata.
- Parameters
fpr – List of false positive rate values.
tpr – List of true positive rate values.
threshold – List of threshold values.
- Raises
ValueError – If the lists fpr, tpr, and threshold are not the same length.
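The same-length requirement can be illustrated with a plain-Python sketch (a hypothetical helper for illustration only, not part of the kfp API):

```python
from typing import List

def validate_roc_lists(fpr: List[float], tpr: List[float],
                       threshold: List[float]) -> None:
    # Mirrors the check described for log_roc_curve: all three
    # lists must have the same number of data points.
    if not len(fpr) == len(tpr) == len(threshold):
        raise ValueError(
            'fpr, tpr, and threshold must be the same length: '
            f'{len(fpr)}, {len(tpr)}, {len(threshold)}')

validate_roc_lists([0.0, 0.1], [0.5, 0.9], [0.9, 0.5])  # passes silently
```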
- log_roc_data_point(fpr: float, tpr: float, threshold: float) -> None [source]
Logs a single data point in the ROC curve to metadata.
- Parameters
fpr – False positive rate value of the data point.
tpr – True positive rate value of the data point.
threshold – Threshold value for the data point.
- set_confusion_matrix_categories(categories: List[str]) -> None [source]
Stores confusion matrix categories to metadata.
- Parameters
categories – List of strings specifying the categories.
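Taken together, the confusion-matrix methods above enforce a simple invariant: categories are set first, and every logged row or cell must refer to them. That bookkeeping can be sketched in plain Python (a hypothetical model for illustration, not the kfp implementation):

```python
from typing import List

class ConfusionMatrixSketch:
    """Minimal model of the category/row validation described above."""

    def __init__(self) -> None:
        self.categories: List[str] = []
        self.matrix: List[List[int]] = []

    def set_categories(self, categories: List[str]) -> None:
        self.categories = list(categories)
        # One all-zero row per category; the matrix is square.
        self.matrix = [[0] * len(categories) for _ in categories]

    def log_row(self, row_category: str, row: List[int]) -> None:
        if row_category not in self.categories:
            raise ValueError(f'unknown category: {row_category}')
        if len(row) != len(self.categories):
            raise ValueError('row length must match number of categories')
        self.matrix[self.categories.index(row_category)] = list(row)

    def log_cell(self, row_category: str, col_category: str, value: int) -> None:
        if row_category not in self.categories or col_category not in self.categories:
            raise ValueError('categories must be set before logging cells')
        r = self.categories.index(row_category)
        c = self.categories.index(col_category)
        self.matrix[r][c] = value

cm = ConfusionMatrixSketch()
cm.set_categories(['cat', 'dog'])
cm.log_row('cat', [9, 1])
cm.log_cell('dog', 'dog', 8)
```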
- class kfp.dsl.Condition(condition: ConditionOperator, name: Optional[str] = None)[source]
Bases:
TasksGroup
A class for creating conditional control flow within a pipeline definition.
- Parameters
condition – The condition expression. Can be constructed using constants or outputs from upstream tasks.
name – The name of the condition group.
Example
with Condition(param1 == 'pizza', '[param1 is pizza]'):
    task1 = MyComponent1(...)
    task2 = MyComponent2(...)
- class kfp.dsl.Dataset(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]
Bases:
Artifact
An artifact representing a machine learning dataset.
- Parameters
name – Name of the dataset.
uri – The dataset’s location on disk or cloud storage.
metadata – Arbitrary key-value pairs about the dataset.
- TYPE_NAME = 'system.Dataset'
- class kfp.dsl.ExitHandler(exit_task: PipelineTask, name: Optional[str] = None)[source]
Bases:
TasksGroup
A class for setting an exit handler task that is invoked upon exiting a group of other tasks.
- Parameters
exit_task – The task that is invoked after exiting a group of other tasks.
name – The name of the exit handler group.
Example
exit_task = ExitComponent(...)
with ExitHandler(exit_task):
    task1 = MyComponent1(...)
    task2 = MyComponent2(...)
- class kfp.dsl.HTML(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]
Bases:
Artifact
An artifact representing an HTML file.
- Parameters
name – Name of the HTML file.
uri – The HTML file’s location on disk or cloud storage.
metadata – Arbitrary key-value pairs about the HTML file.
- TYPE_NAME = 'system.HTML'
- class kfp.dsl.InputPath(type=None)[source]
Bases:
object
Type annotation used in component definitions for indicating a parameter is a path to an input.
Example
@dsl.component
def create_dataset(dataset_path: OutputPath('Dataset')):
    import json
    dataset = {'my_dataset': [[1, 2, 3], [4, 5, 6]]}
    with open(dataset_path, 'w') as f:
        json.dump(dataset, f)

@dsl.component
def consume_dataset(dataset: InputPath('Dataset')):
    print(dataset)

@dsl.pipeline(name='my-pipeline', pipeline_root='gs://my-bucket')
def my_pipeline():
    create_dataset_op = create_dataset()
    consume_dataset(dataset=create_dataset_op.outputs['dataset_path'])
- class kfp.dsl.Markdown(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]
Bases:
Artifact
An artifact representing a markdown file.
- Parameters
name – Name of the markdown file.
uri – The markdown file’s location on disk or cloud storage.
metadata – Arbitrary key-value pairs about the markdown file.
- TYPE_NAME = 'system.Markdown'
- class kfp.dsl.Metrics(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]
Bases:
Artifact
An artifact for storing key-value scalar metrics.
- Parameters
name – Name of the metrics artifact.
uri – The metrics artifact’s location on disk or cloud storage.
metadata – Key-value scalar metrics.
- TYPE_NAME = 'system.Metrics'
- log_metric(metric: str, value: float) -> None [source]
Sets a custom scalar metric in the artifact’s metadata.
- Parameters
metric – The metric key.
value – The metric value.
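Conceptually, log_metric records a key-value pair on the artifact's metadata; a plain-Python sketch of that behavior (a hypothetical stand-in, not the kfp implementation):

```python
from typing import Dict

class MetricsSketch:
    """Minimal stand-in showing how scalar metrics accumulate in metadata."""

    def __init__(self) -> None:
        self.metadata: Dict[str, float] = {}

    def log_metric(self, metric: str, value: float) -> None:
        # A later call with the same key overwrites the earlier value.
        self.metadata[metric] = value

m = MetricsSketch()
m.log_metric('accuracy', 0.92)
m.log_metric('loss', 0.31)
```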
- class kfp.dsl.Model(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]
Bases:
Artifact
An artifact representing a machine learning model.
- Parameters
name – Name of the model.
uri – The model’s location on disk or cloud storage.
metadata – Arbitrary key-value pairs about the model.
- TYPE_NAME = 'system.Model'
- property framework: str
- class kfp.dsl.OutputPath(type=None)[source]
Bases:
object
Type annotation used in component definitions for indicating a parameter is a path to an output. The path parameter typed with this annotation can be treated as a locally accessible filepath within the component body.
The argument typed with this annotation is provided at runtime by the executing backend and does not need to be passed as an input by the pipeline author (see example).
- Parameters
type – The type of the value written to the output path.
Example
@dsl.component
def create_parameter(
    message: str,
    output_parameter_path: OutputPath(str),
):
    with open(output_parameter_path, 'w') as f:
        f.write(message)

@dsl.component
def consume_parameter(message: str):
    print(message)

@dsl.pipeline(name='my-pipeline', pipeline_root='gs://my-bucket')
def my_pipeline(message: str = 'default message'):
    create_param_op = create_parameter(message=message)
    consume_parameter(message=create_param_op.outputs['output_parameter_path'])
- kfp.dsl.PIPELINE_JOB_ID_PLACEHOLDER = '{{$.pipeline_job_uuid}}'
A placeholder used to obtain a pipeline job ID within a task at pipeline runtime.
Example
@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    print_op(
        msg='Job ID:',
        value=dsl.PIPELINE_JOB_ID_PLACEHOLDER,
    )
- kfp.dsl.PIPELINE_JOB_NAME_PLACEHOLDER = '{{$.pipeline_job_name}}'
A placeholder used to obtain a pipeline job name within a task at pipeline runtime.
Example
@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    print_op(
        msg='Job name:',
        value=dsl.PIPELINE_JOB_NAME_PLACEHOLDER,
    )
- kfp.dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER = '{{$.pipeline_job_resource_name}}'
A placeholder used to obtain a pipeline job resource name within a task at pipeline runtime.
Example
@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    print_op(
        msg='Job resource name:',
        value=dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER,
    )
- kfp.dsl.PIPELINE_TASK_ID_PLACEHOLDER = '{{$.pipeline_task_uuid}}'
A placeholder used to obtain a task ID within a task at pipeline runtime.
Example
@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    print_op(
        msg='Task ID:',
        value=dsl.PIPELINE_TASK_ID_PLACEHOLDER,
    )
- kfp.dsl.PIPELINE_TASK_NAME_PLACEHOLDER = '{{$.pipeline_task_name}}'
A placeholder used to obtain a task name within a task at pipeline runtime.
Example
@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    print_op(
        msg='Task name:',
        value=dsl.PIPELINE_TASK_NAME_PLACEHOLDER,
    )
- class kfp.dsl.ParallelFor(items: Union[List[Union[int, float, str, Dict[str, Any]]], PipelineChannel], name: Optional[str] = None)[source]
Bases:
TasksGroup
A class for creating parallelized for loop control flow over a static set of items within a pipeline definition.
- Parameters
items – The items to loop over. It can be either a constant Python list or a list output from an upstream task.
name – The name of the for loop group.
Example
with dsl.ParallelFor([{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]) as item:
    task1 = MyComponent(..., item.a)
    task2 = MyComponent(..., item.b)
In the example, task1 and task2 are each executed twice: once with item.a == 1 and item.b == 10, and once with item.a == 2 and item.b == 20.
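The fan-out can be pictured as one task instance per item. A sequential plain-Python sketch of the semantics (the real backend runs the iterations in parallel; item.a in the pipeline corresponds to item['a'] here):

```python
items = [{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]

# Each loop iteration corresponds to one parallel branch of the
# ParallelFor group; two tasks are instantiated per item.
results = []
for item in items:
    results.append(('task1', item['a']))
    results.append(('task2', item['b']))
```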
- class kfp.dsl.PipelineTask(component_spec: ComponentSpec, args: Mapping[str, Any])[source]
Bases:
object
Represents a pipeline task (instantiated component).
Note: PipelineTask should not be constructed by pipeline authors directly, but instead obtained via an instantiated component (see example).
Replaces ContainerOp from kfp v1. Holds operations available on a task object, such as .after(), .set_memory_limit(), .enable_caching(), etc.
- Parameters
component_spec – The component definition.
args – The dictionary of arguments on which the component was called to instantiate this task.
Example
@dsl.component
def identity(message: str) -> str:
    return message

@dsl.pipeline(name='my_pipeline')
def my_pipeline():
    # task is an instance of PipelineTask
    task = identity(message='my string')
- add_node_selector_constraint(accelerator: str) -> PipelineTask [source]
Sets the accelerator type to use when executing this task.
- Parameters
accelerator – The name of the accelerator. Available values include 'NVIDIA_TESLA_K80' and 'TPU_V3'.
- Returns
Self return to allow chained setting calls.
- after(*tasks) -> PipelineTask [source]
Specifies an explicit dependency on other tasks: this task will be executed only after the given tasks have completed.
- Parameters
*tasks – Tasks after which this task should be executed.
- Returns
Self return to allow chained setting calls.
Example
@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    task1 = my_component(text='1st task')
    task2 = my_component(text='2nd task').after(task1)
- property dependent_tasks: List[str]
A list of the dependent task names.
- property inputs: List[Union[str, int, float, bool, dict, list, PipelineChannel]]
The list of actual inputs passed to the task.
- property name: str
The name of the task.
Unique within its parent group.
- property output: PipelineChannel
The single output of the task.
Used when a task has exactly one output parameter.
- property outputs: Mapping[str, PipelineChannel]
The dictionary of outputs of the task.
Used when a task has more than one output, or uses an OutputPath or Output[Artifact] type annotation.
- set_caching_options(enable_caching: bool) -> PipelineTask [source]
Sets caching options for the task.
- Parameters
enable_caching – Whether to enable caching.
- Returns
Self return to allow chained setting calls.
- set_cpu_limit(cpu: str) -> PipelineTask [source]
Sets CPU limit (maximum) for the task.
- Parameters
cpu – Maximum CPU requests allowed. This string should be a number or a number followed by an “m” to indicate millicores (1/1000). For more information, see Specify a CPU Request and a CPU Limit.
- Returns
Self return to allow chained setting calls.
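The accepted CPU string format can be illustrated with a small parser (a hypothetical helper for illustration only, not part of the kfp API):

```python
def parse_cpu(cpu: str) -> float:
    """Convert a CPU quantity string to a number of cores.

    A trailing 'm' means millicores (1/1000 of a core); otherwise
    the string is a plain number of cores.
    """
    if cpu.endswith('m'):
        return int(cpu[:-1]) / 1000
    return float(cpu)

parse_cpu('500m')  # 0.5
parse_cpu('2')     # 2.0
```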
- set_display_name(name: str) -> PipelineTask [source]
Sets display name for the task.
- Parameters
name – Display name.
- Returns
Self return to allow chained setting calls.
- set_env_variable(name: str, value: str) -> PipelineTask [source]
Sets environment variable for the task.
- Parameters
name – Environment variable name.
value – Environment variable value.
- Returns
Self return to allow chained setting calls.
- set_gpu_limit(gpu: str) -> PipelineTask [source]
Sets GPU limit (maximum) for the task.
- Parameters
gpu – The maximum GPU requests allowed. This string should be a positive integer number of GPUs.
- Returns
Self return to allow chained setting calls.
- set_memory_limit(memory: str) -> PipelineTask [source]
Sets memory limit (maximum) for the task.
- Parameters
memory – The maximum memory requests allowed. This string should be a number or a number followed by one of “E”, “Ei”, “P”, “Pi”, “T”, “Ti”, “G”, “Gi”, “M”, “Mi”, “K”, or “Ki”.
- Returns
Self return to allow chained setting calls.
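The memory suffixes follow the usual Kubernetes convention: single-letter suffixes are decimal multiples and the "i" variants are binary multiples. A sketch of a parser (a hypothetical helper for illustration only, not part of the kfp API):

```python
def parse_memory(memory: str) -> int:
    """Convert a memory quantity string to bytes.

    Supports the suffixes listed above: decimal (K, M, G, T, P, E)
    and binary (Ki, Mi, Gi, Ti, Pi, Ei).
    """
    units = {'K': 10**3, 'M': 10**6, 'G': 10**9,
             'T': 10**12, 'P': 10**15, 'E': 10**18,
             'Ki': 2**10, 'Mi': 2**20, 'Gi': 2**30,
             'Ti': 2**40, 'Pi': 2**50, 'Ei': 2**60}
    # Check two-character suffixes ('Ki') before one-character ('K').
    for suffix in sorted(units, key=len, reverse=True):
        if memory.endswith(suffix):
            return int(float(memory[:-len(suffix)]) * units[suffix])
    return int(memory)

parse_memory('512Mi')  # 536870912
```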
- set_retry(num_retries: int, backoff_duration: Optional[str] = None, backoff_factor: Optional[float] = None, backoff_max_duration: Optional[str] = None) -> PipelineTask [source]
Sets task retry parameters.
- Parameters
num_retries – Number of times to retry on failure.
backoff_duration – Number of seconds to wait before triggering a retry. Defaults to '0s' (immediate retry).
backoff_factor – Exponential backoff factor applied to backoff_duration. For example, if backoff_duration="60" (60 seconds) and backoff_factor=2, the first retry will happen after 60 seconds, then again after 120, 240, and so on. Defaults to 2.0.
backoff_max_duration – Maximum duration during which the task will be retried. Maximum duration is 1 hour (3600s). Defaults to '3600s'.
- Returns
Self return to allow chained setting calls.
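The interaction of the backoff parameters can be sketched by computing the wait before each retry (a hypothetical helper; the actual scheduling is performed by the backend):

```python
from typing import List

def backoff_schedule(num_retries: int, backoff_duration: float = 0.0,
                     backoff_factor: float = 2.0) -> List[float]:
    """Seconds to wait before each retry: duration, duration*factor, ..."""
    return [backoff_duration * backoff_factor ** i for i in range(num_retries)]

backoff_schedule(4, backoff_duration=60, backoff_factor=2)  # [60, 120, 240, 480]
```

Note that in practice the total retry window is also capped by backoff_max_duration.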
- class kfp.dsl.PipelineTaskFinalStatus(state: str, pipeline_job_resource_name: str, pipeline_task_name: str, error_code: Optional[int], error_message: Optional[str])[source]
Bases:
object
A final status of a pipeline task. Annotate a component parameter with this class to obtain a handle to a task’s status (see example).
This is the Python representation of the proto message PipelineTaskFinalStatus.
Examples
@dsl.component
def task_status(user_input: str, status: PipelineTaskFinalStatus):
    print('Pipeline status: ', status.state)
    print('Job resource name: ', status.pipeline_job_resource_name)
    print('Pipeline task name: ', status.pipeline_task_name)
    print('Error code: ', status.error_code)
    print('Error message: ', status.error_message)

@dsl.pipeline(name='my_pipeline')
def my_pipeline():
    task = task_status(user_input='my_input')
- error_code: Optional[int]
The google.rpc.Code in case of error. If state is 'SUCCEEDED', this is None.
- error_message: Optional[str]
In case of error, the detailed error message. If state is 'SUCCEEDED', this is None.
- pipeline_job_resource_name: str
Pipeline job resource name, in the format projects/{project}/locations/{location}/pipelineJobs/{pipeline_job}.
- pipeline_task_name: str
Name of the task that produced this status.
- state: str
Final state of the task. The value could be one of 'SUCCEEDED', 'FAILED', or 'CANCELLED'.
- class kfp.dsl.SlicedClassificationMetrics(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]
Bases:
Artifact
An artifact for storing sliced classification metrics.
Similar to ClassificationMetrics, tasks using this class are expected to use its log methods to log metrics, with the difference that each log method takes a slice with which to associate the ClassificationMetrics.
- Parameters
name – Name of the metrics artifact.
uri – The metrics artifact’s location on disk or cloud storage.
metadata – Arbitrary key-value pairs about the metrics artifact.
- TYPE_NAME = 'system.SlicedClassificationMetrics'
- load_confusion_matrix(slice: str, categories: List[str], matrix: List[List[int]]) -> None [source]
Bulk loads the whole confusion matrix for a slice.
- Parameters
slice – String representing slice label.
categories – List of the category names.
matrix – Complete confusion matrix.
- load_roc_readings(slice: str, readings: List[List[float]]) -> None [source]
Bulk loads ROC curve readings for a slice.
- Parameters
slice – String representing slice label.
readings – A 2-dimensional list providing ROC curve data points. The expected order of the data points is: threshold, true positive rate, false positive rate.
- log_confusion_matrix_cell(slice: str, row_category: str, col_category: str, value: int) -> None [source]
Logs a confusion matrix cell for a slice to metadata.
Cell is updated on the internal metrics_utils.ConfusionMatrix instance of the slice.
- Parameters
slice – String representing slice label.
row_category – String representing the name of the row category.
col_category – String representing the name of the column category.
value – Value of the cell.
- log_confusion_matrix_row(slice: str, row_category: str, row: List[int]) -> None [source]
Logs a confusion matrix row for a slice to metadata.
Row is updated on the internal metrics_utils.ConfusionMatrix instance of the slice.
- Parameters
slice – String representing slice label.
row_category – Category to which the row belongs.
row – List of integers specifying the values for the row.
- log_roc_reading(slice: str, threshold: float, tpr: float, fpr: float) -> None [source]
Logs a single data point in the ROC curve of a slice to metadata.
- Parameters
slice – String representing slice label.
threshold – Threshold value for the data point.
tpr – True positive rate value of the data point.
fpr – False positive rate value of the data point.
- set_confusion_matrix_categories(slice: str, categories: List[str]) -> None [source]
Logs confusion matrix categories for a slice to metadata.
Categories are stored in the internal metrics_utils.ConfusionMatrix instance of the slice.
- Parameters
slice – String representing slice label.
categories – List of strings specifying the categories.
- kfp.dsl.component(func: Optional[Callable] = None, *, base_image: Optional[str] = None, target_image: Optional[str] = None, packages_to_install: Optional[List[str]] = None, pip_index_urls: Optional[List[str]] = None, output_component_file: Optional[str] = None, install_kfp_package: bool = True, kfp_package_path: Optional[str] = None)[source]
Decorator for Python-function based components.
A KFP component can either be a lightweight component or a containerized component.
If target_image is not specified, this function creates a lightweight component. A lightweight component is a self-contained Python function that includes all necessary imports and dependencies. In lightweight components, packages_to_install will be used to install dependencies at runtime. The parameters install_kfp_package and kfp_package_path can be used to control how and from where KFP should be installed when the lightweight component is executed.
If target_image is specified, this function creates a component definition based around the target_image. The assumption is that the function in func will be packaged by KFP into this target_image. You can use the KFP CLI's build command to package the function into target_image.
- Parameters
func – Python function from which to create a component. The function should have type annotations for all its arguments, indicating how each argument is intended to be used (e.g. as an input/output artifact, a plain parameter, or a path to a file).
base_image – Image to use when executing the Python function. It should contain a default Python interpreter that is compatible with KFP.
target_image – Image to use when creating containerized components.
packages_to_install – List of packages to install before executing the Python function. These will always be installed at component runtime.
pip_index_urls – Python Package Index base URLs from which to install packages_to_install. Defaults to installing from only PyPI ('https://pypi.org/simple'). For more information, see pip install docs.
output_component_file – If specified, this function will write a shareable/loadable version of the component spec into this file. Warning: This compilation approach is deprecated.
install_kfp_package – Specifies if the KFP SDK should add the kfp Python package to packages_to_install. Lightweight Python functions always require an installation of KFP in base_image to work. If you specify a base_image that already contains KFP, you can set this to False. This flag is ignored when target_image is specified, which implies a choice to build a containerized component. Containerized components will always install KFP as part of the build process.
kfp_package_path – Specifies the location from which to install KFP. By default, this will try to install from PyPI using the same version as that used when this component was created. Component authors can choose to override this to point to a GitHub pull request or other pip-compatible package server.
- Returns
A component task factory that can be used in pipeline definitions.
Example
from kfp import dsl

@dsl.component
def my_function_one(input: str, output: Output[Model]):
    ...

@dsl.component(
    base_image='python:3.9',
    output_component_file='my_function.yaml'
)
def my_function_two(input: Input[Model]):
    ...

@dsl.pipeline(name='my-pipeline', pipeline_root='...')
def pipeline():
    my_function_one_task = my_function_one(input=...)
    my_function_two_task = my_function_two(input=my_function_one_task.outputs)
- kfp.dsl.importer(artifact_uri: Union[PipelineParameterChannel, str], artifact_class: Type[Artifact], reimport: bool = False, metadata: Optional[Mapping[str, Any]] = None) -> PipelineTask [source]
Imports an existing artifact for use in a downstream component.
- Parameters
artifact_uri – The URI of the artifact to import.
artifact_class – The artifact class being imported.
reimport – Whether to reimport the artifact.
metadata – Properties of the artifact.
- Returns
A task with the artifact accessible via its .output attribute.
Example
@dsl.pipeline(name='pipeline-with-importer')
def pipeline_with_importer():
    importer1 = importer(
        artifact_uri='gs://ml-pipeline-playground/shakespeare1.txt',
        artifact_class=Dataset,
        reimport=False)
    train(dataset=importer1.output)
- kfp.dsl.pipeline(name: Optional[str] = None, description: Optional[str] = None, pipeline_root: Optional[str] = None) -> Callable [source]
Decorator used to construct a pipeline.
- Example
@pipeline(
    name='my-pipeline',
    description='My ML Pipeline.',
    pipeline_root='gs://my-bucket/my-output-path'
)
def my_pipeline(a: str, b: int):
    ...
- Parameters
name – The pipeline name. Defaults to a sanitized version of the decorated function name.
description – A human-readable description of the pipeline.
pipeline_root – The root directory from which to read input and output parameters and artifacts.