kfp.client

The kfp.client module contains the KFP API client.

Classes:

Client([host, client_id, namespace, ...])

The KFP SDK client for the Kubeflow Pipelines backend API.

class kfp.client.Client(host: str | None = None, client_id: str | None = None, namespace: str = 'kubeflow', other_client_id: str | None = None, other_client_secret: str | None = None, existing_token: str | None = None, cookies: str | None = None, proxy: str | None = None, ssl_ca_cert: str | None = None, kube_context: str | None = None, credentials: TokenCredentialsBase | None = None, ui_host: str | None = None, verify_ssl: bool | None = None)

Bases: object

The KFP SDK client for the Kubeflow Pipelines backend API.

Parameters
host: str | None = None

Host name to use to talk to Kubeflow Pipelines. If not set, the in-cluster service DNS name will be used, which only works if the current environment is a pod in the same cluster (such as a Jupyter instance spawned by Kubeflow’s JupyterHub). See the Kubeflow Pipelines documentation on connecting to the API for more information.

client_id: str | None = None

Client ID used by Identity-Aware Proxy.

namespace: str = 'kubeflow'

Kubernetes namespace to use. Used for multi-user deployments. For single-user deployments, leave this at its default value.

other_client_id: str | None = None

Client ID used to obtain the auth codes and refresh tokens.

other_client_secret: str | None = None

Client secret used to obtain the auth codes and refresh tokens.

existing_token: str | None = None

Authentication token to pass in directly. Used in cases where the token is generated from outside the SDK.

cookies: str | None = None

CookieJar object containing cookies that will be passed to the Pipelines API.

proxy: str | None = None

HTTP or HTTPS proxy server.

ssl_ca_cert: str | None = None

CA certificate for the proxy.

kube_context: str | None = None

kubectl context to use. Must be a context listed in the kubeconfig file. Defaults to the current-context set within kubeconfig.

credentials: TokenCredentialsBase | None = None

TokenCredentialsBase object which provides the logic to populate the requests with credentials to authenticate against the API server.

ui_host: str | None = None

Base URL to use to open the Kubeflow Pipelines UI. This is used when running the client from a notebook to generate and print links.

verify_ssl: bool | None = None

Whether to verify the server’s TLS certificate.
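The following sketch shows how these constructor arguments fit together. The hypothetical helper connect() forwards only the options that were explicitly set, so the client keeps its documented defaults. For example, it falls back to the in-cluster service DNS name when host is omitted. The client_cls parameter is an assumption added for illustration and testing. When it is not given, the helper uses kfp.Client.

```python
from typing import Any, Callable, Optional


def connect(
    host: Optional[str] = None,
    namespace: str = "kubeflow",
    existing_token: Optional[str] = None,
    verify_ssl: Optional[bool] = None,
    client_cls: Optional[Callable[..., Any]] = None,
) -> Any:
    """Hypothetical helper: build a KFP client from only the options that were set."""
    if client_cls is None:
        # Imported lazily so the helper can be used with a stand-in class
        # (e.g. in tests) without the kfp package installed.
        import kfp

        client_cls = kfp.Client

    kwargs: dict[str, Any] = {"namespace": namespace}
    # Omitting host lets the client fall back to the in-cluster service DNS name.
    if host is not None:
        kwargs["host"] = host
    if existing_token is not None:
        kwargs["existing_token"] = existing_token
    if verify_ssl is not None:
        kwargs["verify_ssl"] = verify_ssl
    return client_cls(**kwargs)
```

For example, connect(host="http://localhost:8080") would target a port-forwarded endpoint. That URL is only a placeholder.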

Methods:

set_user_namespace(namespace)

Sets the namespace in the Kubernetes cluster to use.

get_kfp_healthz([sleep_duration])

Gets healthz info for KFP deployment.

get_user_namespace()

Gets user namespace in context config.

create_experiment(name[, description, namespace])

Creates a new experiment.

get_pipeline_id(name)

Gets the ID of a pipeline by its name.

list_experiments([page_token, page_size, ...])

Lists experiments.

get_experiment([experiment_id, ...])

Gets details of an experiment.

archive_experiment(experiment_id)

Archives an experiment.

unarchive_experiment(experiment_id)

Unarchives an experiment.

delete_experiment(experiment_id)

Deletes an experiment.

list_pipelines([page_token, page_size, ...])

Lists pipelines.

run_pipeline(experiment_id, job_name[, ...])

Runs a specified pipeline.

archive_run(run_id)

Archives a run.

unarchive_run(run_id)

Restores an archived run.

delete_run(run_id)

Deletes a run.

terminate_run(run_id)

Terminates a run.

create_recurring_run(experiment_id, job_name)

Creates a recurring run.

create_run_from_pipeline_func(pipeline_func)

Runs a pipeline on a KFP-enabled Kubernetes cluster.

create_run_from_pipeline_package(pipeline_file)

Runs a pipeline on a KFP-enabled Kubernetes cluster.

delete_job(job_id)

Deletes a job (recurring run).

delete_recurring_run(recurring_run_id)

Deletes a recurring run.

disable_job(job_id)

Disables a job (recurring run).

disable_recurring_run(recurring_run_id)

Disables a recurring run.

enable_job(job_id)

Enables a job (recurring run).

enable_recurring_run(recurring_run_id)

Enables a recurring run.

list_runs([page_token, page_size, sort_by, ...])

Lists runs.

list_recurring_runs([page_token, page_size, ...])

Lists recurring runs.

get_recurring_run(recurring_run_id[, job_id])

Gets recurring run details.

get_run(run_id)

Gets run details.

wait_for_run_completion(run_id, timeout[, ...])

Waits for a run to complete.

upload_pipeline(pipeline_package_path[, ...])

Uploads a pipeline.

upload_pipeline_version(...[, pipeline_id, ...])

Uploads a new version of the pipeline.

get_pipeline(pipeline_id)

Gets pipeline details.

delete_pipeline(pipeline_id)

Deletes a pipeline.

list_pipeline_versions(pipeline_id[, ...])

Lists pipeline versions.

get_pipeline_version(pipeline_id, ...)

Gets a pipeline version.

delete_pipeline_version(pipeline_id, ...)

Deletes a pipeline version.

set_user_namespace(namespace: str) -> None

Sets the namespace in the Kubernetes cluster to use.

This function should only be used when Kubeflow Pipelines is in multi-user mode.

Parameters
namespace: str

Namespace to use within the Kubernetes cluster (namespace containing the Kubeflow Pipelines deployment).

get_kfp_healthz(sleep_duration: int = 5) -> V2beta1GetHealthzResponse

Gets healthz info for KFP deployment.

Parameters
sleep_duration: int = 5

Time in seconds between retries.

Returns

V2beta1GetHealthzResponse object containing the response from the healthz endpoint.
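The multi-user restriction on set_user_namespace can be checked at runtime. The following minimal sketch, configure_namespace() (a hypothetical helper), calls get_kfp_healthz() and only sets the namespace when the response reports multi-user mode. It assumes the healthz response exposes a boolean multi_user attribute. Verify this against the kfp_server_api version you use.

```python
from typing import Any


def configure_namespace(client: Any, namespace: str) -> bool:
    """Hypothetical helper: set the user namespace only in multi-user mode.

    Returns True if set_user_namespace() was called.
    """
    healthz = client.get_kfp_healthz()
    # Assumes the healthz response exposes a boolean `multi_user` attribute.
    if getattr(healthz, "multi_user", False):
        client.set_user_namespace(namespace)
        return True
    return False
```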

get_user_namespace() -> str

Gets user namespace in context config.

Returns

Kubernetes namespace from the local context file, or an empty string if it was not set.

create_experiment(name: str, description: str | None = None, namespace: str | None = None) -> V2beta1Experiment

Creates a new experiment.

Parameters
name: str

Name of the experiment.

description: str | None = None

Description of the experiment.

namespace: str | None = None

Kubernetes namespace to use. Used for multi-user deployments. For single-user deployments, this should be left as None.

Returns

V2beta1Experiment object.
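A common pattern is to reuse an experiment if it already exists. The sketch below, get_or_create_experiment() (hypothetical), looks up the name with list_experiments(). It uses an EQUALS filter on display_name, in the same JSON format as the filter examples in this reference, and falls back to create_experiment() when nothing matches. It assumes the list response exposes its items as an experiments attribute, which may be None when there are no matches.

```python
import json
from typing import Any, Optional


def get_or_create_experiment(
    client: Any, name: str, namespace: Optional[str] = None
) -> Any:
    """Hypothetical helper: return the experiment named `name`, creating it if needed."""
    name_filter = json.dumps({
        "predicates": [{
            "operation": "EQUALS",
            "key": "display_name",
            "stringValue": name,
        }]
    })
    response = client.list_experiments(
        page_size=1, namespace=namespace, filter=name_filter
    )
    # Assumes matches are listed in `experiments`, which may be None when empty.
    experiments = response.experiments or []
    if experiments:
        return experiments[0]
    return client.create_experiment(name=name, namespace=namespace)
```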

get_pipeline_id(name: str) -> str | None

Gets the ID of a pipeline by its name.

Parameters
name: str

Pipeline name.

Returns

The pipeline ID if a pipeline with the name exists, otherwise None.

list_experiments(page_token: str = '', page_size: int = 10, sort_by: str = '', namespace: str | None = None, filter: str | None = None) -> V2beta1ListExperimentsResponse

Lists experiments.

Parameters
page_token: str = ''

Page token for obtaining page from paginated response.

page_size: int = 10

Size of the page.

sort_by: str = ''

Sort string of format '[field_name]', '[field_name] desc'. For example, 'display_name desc'.

namespace: str | None = None

Kubernetes namespace to use. Used for multi-user deployments. For single-user deployments, this should be left as None.

filter: str | None = None

A url-encoded, JSON-serialized Filter protocol buffer (see filter.proto message). Example:

json.dumps({
    "predicates": [{
        "operation": "EQUALS",
        "key": "display_name",
        "stringValue": "my-name",
    }]
})

Returns

V2beta1ListExperimentsResponse object.
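Because list_experiments() returns one page at a time, reading every experiment means following the page token. This minimal sketch, iter_experiments() (hypothetical), assumes the response carries experiments and next_page_token attributes and that an empty token marks the last page.

```python
from typing import Any, Iterator, Optional


def iter_experiments(
    client: Any, page_size: int = 50, namespace: Optional[str] = None
) -> Iterator[Any]:
    """Hypothetical helper: yield every experiment across all pages."""
    page_token = ""
    while True:
        response = client.list_experiments(
            page_token=page_token, page_size=page_size, namespace=namespace
        )
        # Assumes `experiments` may be None on an empty page.
        yield from response.experiments or []
        # Assumes an empty (or missing) next_page_token marks the last page.
        page_token = response.next_page_token or ""
        if not page_token:
            break
```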

get_experiment(experiment_id: str | None = None, experiment_name: str | None = None, namespace: str | None = None) -> V2beta1Experiment

Gets details of an experiment.

Either experiment_id or experiment_name is required.

Parameters
experiment_id: str | None = None

ID of the experiment.

experiment_name: str | None = None

Name of the experiment.

namespace: str | None = None

Kubernetes namespace to use. Used for multi-user deployments. For single-user deployments, this should be left as None.

Returns

V2beta1Experiment object.

archive_experiment(experiment_id: str) -> dict

Archives an experiment.

Parameters
experiment_id: str

ID of the experiment.

Returns

Empty dictionary.

unarchive_experiment(experiment_id: str) -> dict

Unarchives an experiment.

Parameters
experiment_id: str

ID of the experiment.

Returns

Empty dictionary.

delete_experiment(experiment_id: str) -> dict

Deletes an experiment.

Parameters
experiment_id: str

ID of the experiment.

Returns

Empty dictionary.

list_pipelines(page_token: str = '', page_size: int = 10, sort_by: str = '', filter: str | None = None, namespace: str | None = None) -> V2beta1ListPipelinesResponse

Lists pipelines.

Parameters
page_token: str = ''

Page token for obtaining page from paginated response.

page_size: int = 10

Size of the page.

sort_by: str = ''

Sort string of format '[field_name]', '[field_name] desc'. For example, 'display_name desc'.

filter: str | None = None

A url-encoded, JSON-serialized Filter protocol buffer (see filter.proto message). Example:

json.dumps({
    "predicates": [{
        "operation": "EQUALS",
        "key": "display_name",
        "stringValue": "my-name",
    }]
})

Returns

V2beta1ListPipelinesResponse object.
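The filter strings shown above can be generated instead of written by hand. The hypothetical build_filter() below turns (key, operation, value) triples into the JSON structure from the example and sends every value as stringValue. Operation names other than EQUALS, such as IS_SUBSTRING, are assumed to come from filter.proto. Check the proto for the operations and value types your KFP version supports.

```python
import json


def build_filter(*predicates: tuple[str, str, str]) -> str:
    """Hypothetical helper: serialize (key, operation, value) triples into a Filter JSON string.

    Only string values are handled; each is sent as `stringValue`.
    """
    return json.dumps({
        "predicates": [
            {"operation": op, "key": key, "stringValue": value}
            for key, op, value in predicates
        ]
    })
```

For example, client.list_pipelines(filter=build_filter(("display_name", "IS_SUBSTRING", "train"))) would list pipelines whose display name contains "train". This assumes the backend supports that operation for this key.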

run_pipeline(experiment_id: str, job_name: str, pipeline_package_path: str | None = None, params: dict[str, Any] | None = None, pipeline_id: str | None = None, version_id: str | None = None, pipeline_root: str | None = None, enable_caching: bool | None = None, service_account: str | None = None) -> V2beta1Run

Runs a specified pipeline.

Parameters
experiment_id: str

ID of an experiment.

job_name: str

Name of the job.

pipeline_package_path: str | None = None

Local path of the pipeline package (the filename should end with one of .tar.gz, .tgz, .zip, or .json).

params: dict[str, Any] | None = None

Arguments to the pipeline function provided as a dict.

pipeline_id: str | None = None

ID of the pipeline.

version_id: str | None = None

ID of the pipeline version to run. If both pipeline_id and version_id are specified, version_id takes precedence. If only pipeline_id is specified, the default version of this pipeline is used to create the run.

pipeline_root: str | None = None

Root path of the pipeline outputs.

enable_caching: bool | None = None

Whether or not to enable caching for the run. If not set, defaults to the compile-time settings, which is True for all tasks by default. If set, the setting applies to all tasks in the pipeline (overrides the compile time settings).

service_account: str | None = None

Specifies which Kubernetes service account to use for this run.

Returns

V2beta1Run object.
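Since run_pipeline() takes a pipeline ID rather than a name, running an uploaded pipeline by name means combining it with get_pipeline_id(). The sketch below, run_pipeline_by_name() (hypothetical), raises LookupError when no pipeline matches. It leaves version_id as None by default so that, as described above, the pipeline's default version is used.

```python
from typing import Any, Optional


def run_pipeline_by_name(
    client: Any,
    experiment_id: str,
    pipeline_name: str,
    job_name: str,
    params: Optional[dict[str, Any]] = None,
    version_id: Optional[str] = None,
) -> Any:
    """Hypothetical helper: look up a pipeline by name and start a run of it."""
    pipeline_id = client.get_pipeline_id(pipeline_name)
    if pipeline_id is None:
        raise LookupError(f"No pipeline named {pipeline_name!r}")
    # With version_id=None the pipeline's default version is used;
    # an explicit version_id takes precedence over pipeline_id.
    return client.run_pipeline(
        experiment_id=experiment_id,
        job_name=job_name,
        pipeline_id=pipeline_id,
        version_id=version_id,
        params=params or {},
    )
```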

archive_run(run_id: str) -> dict

Archives a run.

Parameters
run_id: str

ID of the run.

Returns

Empty dictionary.

unarchive_run(run_id: str) -> dict

Restores an archived run.

Parameters
run_id: str

ID of the run.

Returns

Empty dictionary.

delete_run(run_id: str) -> dict

Deletes a run.

Parameters
run_id: str

ID of the run.

Returns

Empty dictionary.

terminate_run(run_id: str) -> dict

Terminates a run.

Parameters
run_id: str

ID of the run.

Returns

Empty dictionary.

create_recurring_run(experiment_id: str, job_name: str, description: str | None = None, start_time: str | None = None, end_time: str | None = None, interval_second: int | None = None, cron_expression: str | None = None, max_concurrency: int | None = 1, no_catchup: bool | None = None, params: dict | None = None, pipeline_package_path: str | None = None, pipeline_id: str | None = None, version_id: str | None = None, enabled: bool = True, pipeline_root: str | None = None, enable_caching: bool | None = None, service_account: str | None = None) -> V2beta1RecurringRun

Creates a recurring run.

Parameters
experiment_id: str

ID of the experiment.

job_name: str

Name of the job.

description: str | None = None

Description of the job.

start_time: str | None = None

RFC 3339 time string specifying when to start the job.

end_time: str | None = None

RFC 3339 time string specifying when to end the job.

interval_second: int | None = None

Number of seconds between two consecutive recurring runs, for a periodic schedule.

cron_expression: str | None = None

Cron expression representing a set of times, using 6 space-separated fields (e.g., '0 0 9 ? * 2-6'). See cron format.

max_concurrency: int | None = 1

Integer indicating how many jobs can be run in parallel.

no_catchup: bool | None = None

Whether the recurring run should skip catching up when it falls behind schedule, for example after it has been paused for a while and then re-enabled. If no_catchup=False, the scheduler catches up on (backfills) each missed interval. Otherwise, it schedules only the latest interval when more than one interval is ready to be scheduled. If your pipeline handles backfill internally, you should usually turn catchup off (no_catchup=True) to avoid duplicate backfill.

pipeline_package_path: str | None = None

Local path of the pipeline package (the filename should end with one of .tar.gz, .tgz, .zip, or .json).

params: dict | None = None

Arguments to the pipeline function provided as a dict.

pipeline_id: str | None = None

ID of a pipeline.

version_id: str | None = None

ID of a pipeline version. If both pipeline_id and version_id are specified, version_id will take precedence. If only pipeline_id is specified, the default version of this pipeline is used to create the run.

enabled: bool = True

Whether to enable or disable the recurring run.

pipeline_root: str | None = None

Root path of the pipeline outputs.

enable_caching: bool | None = None

Whether or not to enable caching for the run. If not set, defaults to the compile time settings, which is True for all tasks by default, while users may specify different caching options for individual tasks. If set, the setting applies to all tasks in the pipeline (overrides the compile time settings).

service_account: str | None = None

Specifies which Kubernetes service account this recurring run uses.

Returns

V2beta1RecurringRun object.
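The sketch below shows one way to fill in start_time, end_time, and cron_expression for create_recurring_run(). The helpers to_rfc3339() and create_cron_recurring_run() are hypothetical. The 6-field check mirrors the cron_expression description. The choices of max_concurrency=1 and no_catchup=True are illustrative defaults, not recommendations from the API.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


def to_rfc3339(dt: datetime) -> str:
    """Format a timezone-aware datetime as an RFC 3339 UTC string."""
    if dt.tzinfo is None:
        raise ValueError("datetime must be timezone-aware")
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def create_cron_recurring_run(
    client: Any,
    experiment_id: str,
    pipeline_id: str,
    job_name: str,
    cron_expression: str,
    start: Optional[datetime] = None,
    duration: timedelta = timedelta(days=30),
    params: Optional[dict[str, Any]] = None,
) -> Any:
    """Hypothetical helper: schedule a cron-based recurring run for a fixed window."""
    # The cron_expression parameter expects 6 space-separated fields.
    if len(cron_expression.split()) != 6:
        raise ValueError("cron_expression must have 6 space-separated fields")
    start = start or datetime.now(timezone.utc)
    return client.create_recurring_run(
        experiment_id=experiment_id,
        job_name=job_name,
        pipeline_id=pipeline_id,
        cron_expression=cron_expression,
        start_time=to_rfc3339(start),
        end_time=to_rfc3339(start + duration),
        max_concurrency=1,  # illustrative: never overlap runs
        no_catchup=True,    # illustrative: skip backfilling missed intervals
        params=params or {},
    )
```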

create_run_from_pipeline_func(pipeline_func: BaseComponent, arguments: dict[str, Any] | None = None, run_name: str | None = None, experiment_name: str | None = None, namespace: str | None = None, pipeline_root: str | None = None, enable_caching: bool | None = None, service_account: str | None = None, experiment_id: str | None = None) -> RunPipelineResult

Runs a pipeline on a KFP-enabled Kubernetes cluster.

This method compiles the pipeline function, creates or gets an experiment, and then submits the pipeline for execution.

Parameters
pipeline_func: BaseComponent

Pipeline function constructed with the @kfp.dsl.pipeline decorator.

arguments: dict[str, Any] | None = None

Arguments to the pipeline function provided as a dict.

run_name: str | None = None

Name of the run to be shown in the UI.

experiment_name: str | None = None

Name of the experiment to add the run to. You cannot specify both experiment_name and experiment_id.

namespace: str | None = None

Kubernetes namespace to use. Used for multi-user deployments. For single-user deployments, this should be left as None.

pipeline_root: str | None = None

Root path of the pipeline outputs.

enable_caching: bool | None = None

Whether or not to enable caching for the run. If not set, defaults to the compile time settings, which is True for all tasks by default, while users may specify different caching options for individual tasks. If set, the setting applies to all tasks in the pipeline (overrides the compile time settings).

service_account: str | None = None

Specifies which Kubernetes service account to use for this run.

experiment_id: str | None = None

ID of the experiment to add the run to. You cannot specify both experiment_id and experiment_name.

Returns

RunPipelineResult object containing information about the pipeline run.
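The sketch below, submit_pipeline() (hypothetical), wraps create_run_from_pipeline_func() and can optionally block on wait_for_run_completion(). It checks the experiment_name/experiment_id restriction locally, so the error surfaces before any network call. It assumes the returned RunPipelineResult exposes a run_id attribute.

```python
from typing import Any, Optional


def submit_pipeline(
    client: Any,
    pipeline_func: Any,
    arguments: Optional[dict[str, Any]] = None,
    experiment_name: Optional[str] = None,
    experiment_id: Optional[str] = None,
    enable_caching: Optional[bool] = None,
    timeout: Optional[int] = None,
) -> Any:
    """Hypothetical helper: submit a pipeline function, optionally waiting for it."""
    # The API forbids passing both; fail early before contacting the server.
    if experiment_name is not None and experiment_id is not None:
        raise ValueError("Pass experiment_name or experiment_id, not both.")
    result = client.create_run_from_pipeline_func(
        pipeline_func,
        arguments=arguments or {},
        experiment_name=experiment_name,
        experiment_id=experiment_id,
        enable_caching=enable_caching,
    )
    if timeout is None:
        return result
    # Assumes RunPipelineResult exposes `run_id`; block until the run finishes.
    return client.wait_for_run_completion(result.run_id, timeout)
```

For example, submit_pipeline(client, my_pipeline, {"a": 1}, experiment_name="demo", timeout=3600), where my_pipeline is a hypothetical function decorated with @kfp.dsl.pipeline.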

create_run_from_pipeline_package(pipeline_file: str, arguments: dict[str, Any] | None = None, run_name: str | None = None, experiment_name: str | None = None, namespace: str | None = None, pipeline_root: str | None = None, enable_caching: bool | None = None, service_account: str | None = None, experiment_id: str | None = None) -> RunPipelineResult

Runs a pipeline on a KFP-enabled Kubernetes cluster.

This method takes a local pipeline package, creates or gets an experiment, and then submits the pipeline for execution.

Parameters
pipeline_file: str

Path to a compiled pipeline package file.

arguments: dict[str, Any] | None = None

Arguments to the pipeline function provided as a dict.

run_name: str | None = None

Name of the run to be shown in the UI.

experiment_name: str | None = None

Name of the experiment to add the run to. You cannot specify both experiment_name and experiment_id.

namespace: str | None = None

Kubernetes namespace to use. Used for multi-user deployments. For single-user deployments, this should be left as None.

pipeline_root: str | None = None

Root path of the pipeline outputs.

enable_caching: bool | None = None

Whether or not to enable caching for the run. If not set, defaults to the compile time settings, which is True for all tasks by default, while users may specify different caching options for individual tasks. If set, the setting applies to all tasks in the pipeline (overrides the compile time settings).

service_account: str | None = None

Specifies which Kubernetes service account to use for this run.

experiment_id: str | None = None

ID of the experiment to add the run to. You cannot specify both experiment_id and experiment_name.

Returns

RunPipelineResult object containing information about the pipeline run.
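Because create_run_from_pipeline_package() accepts a compiled package and an arguments dict, a simple parameter sweep is a loop over argument combinations. The sketch below, submit_sweep() (hypothetical), submits one run per combination in a grid and names each run with a hypothetical prefix. It assumes each returned RunPipelineResult has a run_id attribute.

```python
import itertools
from typing import Any


def submit_sweep(
    client: Any,
    pipeline_file: str,
    grid: dict[str, list[Any]],
    experiment_name: str,
    run_prefix: str = "sweep",
) -> list[str]:
    """Hypothetical helper: submit one run per combination in `grid`; return run IDs."""
    keys = sorted(grid)  # deterministic parameter order
    run_ids: list[str] = []
    for i, values in enumerate(itertools.product(*(grid[k] for k in keys))):
        arguments = dict(zip(keys, values))
        result = client.create_run_from_pipeline_package(
            pipeline_file,
            arguments=arguments,
            run_name=f"{run_prefix}-{i}",
            experiment_name=experiment_name,
        )
        # Assumes RunPipelineResult exposes `run_id`.
        run_ids.append(result.run_id)
    return run_ids
```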

delete_job(job_id: str) -> dict

Deletes a job (recurring run).

Parameters
job_id: str

ID of the job.

Returns

Empty dictionary.

delete_recurring_run(recurring_run_id: str) -> dict

Deletes a recurring run.

Parameters
recurring_run_id: str

ID of the recurring run.

Returns

Empty dictionary.

disable_job(job_id: str) -> dict

Disables a job (recurring run).

Parameters
job_id: str

ID of the job.

Returns

Empty dictionary.

disable_recurring_run(recurring_run_id: str) -> dict

Disables a recurring run.

Parameters
recurring_run_id: str

ID of the recurring run.

Returns

Empty dictionary.

enable_job(job_id: str) -> dict

Enables a job (recurring run).

Parameters
job_id: str

ID of the job.

Returns

Empty dictionary.

enable_recurring_run(recurring_run_id: str) -> dict

Enables a recurring run.

Parameters
recurring_run_id: str

ID of the recurring run.

Returns

Empty dictionary.

list_runs(page_token: str = '', page_size: int = 10, sort_by: str = '', experiment_id: str | None = None, namespace: str | None = None, filter: str | None = None) -> V2beta1ListRunsResponse

Lists runs.

Parameters
page_token: str = ''

Page token for obtaining page from paginated response.

page_size: int = 10

Size of the page.

sort_by: str = ''

Sort string of format '[field_name]', '[field_name] desc'. For example, 'display_name desc'.

experiment_id: str | None = None

ID of the experiment to filter runs by.

namespace: str | None = None

Kubernetes namespace to use. Used for multi-user deployments. For single-user deployments, this should be left as None.

filter: str | None = None

A url-encoded, JSON-serialized Filter protocol buffer (see filter.proto message, which also lists the supported values of 'operation'). Example:

json.dumps({
    "predicates": [{
        "operation": "EQUALS",
        "key": "display_name",
        "stringValue": "my-name",
    }]
})

Returns

V2beta1ListRunsResponse object.
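The sort_by and experiment_id parameters can be combined to fetch the most recent runs of an experiment. This sketch assumes that created_at is a sortable field and that each run object carries a state attribute. latest_runs() and count_states() are hypothetical helpers.

```python
from collections import Counter
from typing import Any


def latest_runs(client: Any, experiment_id: str, n: int = 5) -> list[Any]:
    """Hypothetical helper: return up to n of the newest runs in an experiment."""
    response = client.list_runs(
        page_size=n,
        sort_by="created_at desc",  # assumed sortable field name
        experiment_id=experiment_id,
    )
    # Assumes runs are listed in a `runs` attribute, which may be None when empty.
    return list(response.runs or [])


def count_states(runs: list[Any]) -> Counter:
    """Hypothetical helper: tally runs by their (assumed) `state` attribute."""
    return Counter(str(run.state) for run in runs)
```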

list_recurring_runs(page_token: str = '', page_size: int = 10, sort_by: str = '', experiment_id: str | None = None, namespace: str | None = None, filter: str | None = None) -> V2beta1ListRecurringRunsResponse

Lists recurring runs.

Parameters
page_token: str = ''

Page token for obtaining page from paginated response.

page_size: int = 10

Size of the page.

sort_by: str = ''

Sort string of format '[field_name]', '[field_name] desc'. For example, 'display_name desc'.

experiment_id: str | None = None

Experiment ID to filter upon.

namespace: str | None = None

Kubernetes namespace to use. Used for multi-user deployments. For single-user deployments, this should be left as None.

filter: str | None = None

A url-encoded, JSON-serialized Filter protocol buffer (see filter.proto message). Example:

json.dumps({
    "predicates": [{
        "operation": "EQUALS",
        "key": "display_name",
        "stringValue": "my-name",
    }]
})

Returns

V2beta1ListRecurringRunsResponse object.
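Recurring runs are paginated like other list calls, so disabling every schedule in an experiment means walking all pages first. The sketch below, disable_all_recurring_runs() (hypothetical), collects the IDs before calling disable_recurring_run() so that the updates cannot interfere with paging. It assumes the response exposes recurring_runs and next_page_token, and that each item has a recurring_run_id attribute.

```python
from typing import Any


def disable_all_recurring_runs(
    client: Any, experiment_id: str, page_size: int = 50
) -> list[str]:
    """Hypothetical helper: disable every recurring run in an experiment."""
    ids: list[str] = []
    page_token = ""
    while True:
        response = client.list_recurring_runs(
            page_token=page_token, page_size=page_size, experiment_id=experiment_id
        )
        # Assumes items carry `recurring_run_id`; the list may be None when empty.
        ids.extend(rr.recurring_run_id for rr in response.recurring_runs or [])
        page_token = response.next_page_token or ""
        if not page_token:
            break
    # Collect first, then mutate, so paging is not affected by the updates.
    for recurring_run_id in ids:
        client.disable_recurring_run(recurring_run_id)
    return ids
```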

get_recurring_run(recurring_run_id: str, job_id: str | None = None) -> V2beta1RecurringRun

Gets recurring run details.

Parameters
recurring_run_id: str

ID of the recurring run.

job_id: str | None = None

Deprecated. Use recurring_run_id instead.

Returns

V2beta1RecurringRun object.

get_run(run_id: str) -> V2beta1Run

Gets run details.

Parameters
run_id: str

ID of the run.

Returns

V2beta1Run object.

wait_for_run_completion(run_id: str, timeout: int, sleep_duration: int = 5) -> V2beta1Run

Waits for a run to complete.

Parameters
run_id: str

ID of the run.

timeout: int

Time in seconds after which the client stops waiting for the run to complete.

sleep_duration: int = 5

Time in seconds between retries.

Returns

V2beta1Run object.
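wait_for_run_completion() retries every sleep_duration seconds until the run finishes or the timeout expires. If you need custom behaviour, such as logging intermediate states, you can build a comparable loop on get_run(). The sketch below, poll_run() (hypothetical), assumes the terminal state names listed in TERMINAL_STATES. Check them against the V2beta1Run model of your KFP version.

```python
import time
from typing import Any

# Assumed terminal values of V2beta1Run.state; verify for your API version.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "SKIPPED", "CANCELED"}


def poll_run(
    client: Any, run_id: str, timeout: float, sleep_duration: float = 5
) -> Any:
    """Hypothetical helper: poll get_run() until a terminal state or timeout."""
    deadline = time.monotonic() + timeout
    while True:
        run = client.get_run(run_id)
        if str(run.state).upper() in TERMINAL_STATES:
            return run
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Run {run_id} did not finish within {timeout}s")
        time.sleep(sleep_duration)
```

For the common case, client.wait_for_run_completion(run_id, timeout=3600) already does this.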

upload_pipeline(pipeline_package_path: str, pipeline_name: str | None = None, description: str | None = None, namespace: str | None = None) -> V2beta1Pipeline

Uploads a pipeline.

Parameters
pipeline_package_path: str

Local path to the pipeline package.

pipeline_name: str | None = None

Name of the pipeline to be shown in the UI.

description: str | None = None

Description of the pipeline to be shown in the UI.

namespace: str | None = None

Optional. Kubernetes namespace where the pipeline should be uploaded. For single-user deployments, leave this as None; for multi-user deployments, provide a namespace the user is authorized to access.

Returns

V2beta1Pipeline object.

upload_pipeline_version(pipeline_package_path: str, pipeline_version_name: str, pipeline_id: str | None = None, pipeline_name: str | None = None, description: str | None = None) -> V2beta1PipelineVersion

Uploads a new version of the pipeline.

Parameters
pipeline_package_path: str

Local path to the pipeline package.

pipeline_version_name: str

Name of the pipeline version to be shown in the UI.

pipeline_id: str | None = None

ID of the pipeline.

pipeline_name: str | None = None

Name of the pipeline.

description: str | None = None

Description of the pipeline version to show in the UI.

Returns

V2beta1PipelineVersion object.
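upload_pipeline() creates a new pipeline, while upload_pipeline_version() adds a version to an existing one. The sketch below, publish_pipeline() (hypothetical), chooses between them using get_pipeline_id(). Because upload_pipeline() takes no version name, version_name only applies to later uploads. The sketch assumes the returned V2beta1Pipeline exposes a pipeline_id attribute.

```python
from typing import Any, Optional


def publish_pipeline(
    client: Any,
    package_path: str,
    pipeline_name: str,
    version_name: str,
    description: Optional[str] = None,
) -> str:
    """Hypothetical helper: upload a new pipeline, or add a version if it exists.

    Returns the pipeline ID.
    """
    pipeline_id = client.get_pipeline_id(pipeline_name)
    if pipeline_id is None:
        pipeline = client.upload_pipeline(
            pipeline_package_path=package_path,
            pipeline_name=pipeline_name,
            description=description,
        )
        # Assumes V2beta1Pipeline exposes `pipeline_id`.
        return pipeline.pipeline_id
    client.upload_pipeline_version(
        pipeline_package_path=package_path,
        pipeline_version_name=version_name,
        pipeline_id=pipeline_id,
        description=description,
    )
    return pipeline_id
```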

get_pipeline(pipeline_id: str) -> V2beta1Pipeline

Gets pipeline details.

Parameters
pipeline_id: str

ID of the pipeline.

Returns

V2beta1Pipeline object.

delete_pipeline(pipeline_id: str) -> dict

Deletes a pipeline.

Parameters
pipeline_id: str

ID of the pipeline.

Returns

Empty dictionary.

list_pipeline_versions(pipeline_id: str, page_token: str = '', page_size: int = 10, sort_by: str = '', filter: str | None = None) -> V2beta1ListPipelineVersionsResponse

Lists pipeline versions.

Parameters
pipeline_id: str

ID of the pipeline for which to list versions.

page_token: str = ''

Page token for obtaining page from paginated response.

page_size: int = 10

Size of the page.

sort_by: str = ''

Sort string of format '[field_name]', '[field_name] desc'. For example, 'display_name desc'.

filter: str | None = None

A url-encoded, JSON-serialized Filter protocol buffer (see filter.proto message). Example:

json.dumps({
    "predicates": [{
        "operation": "EQUALS",
        "key": "display_name",
        "stringValue": "my-name",
    }]
})

Returns

V2beta1ListPipelineVersionsResponse object.
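To run the newest version of a pipeline explicitly, you can look up its ID with list_pipeline_versions(). This sketch, latest_version_id() (hypothetical), assumes that created_at is a valid sort field and that each version exposes pipeline_version_id.

```python
from typing import Any, Optional


def latest_version_id(client: Any, pipeline_id: str) -> Optional[str]:
    """Hypothetical helper: return the ID of the newest pipeline version, or None."""
    response = client.list_pipeline_versions(
        pipeline_id,
        page_size=1,
        sort_by="created_at desc",  # assumed sortable field name
    )
    # Assumes versions are listed in `pipeline_versions` (None when empty).
    versions = response.pipeline_versions or []
    return versions[0].pipeline_version_id if versions else None
```

The result can be passed as version_id to run_pipeline() or create_recurring_run().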

get_pipeline_version(pipeline_id: str, pipeline_version_id: str) -> V2beta1PipelineVersion

Gets a pipeline version.

Parameters
pipeline_id: str

ID of the pipeline.

pipeline_version_id: str

ID of the pipeline version.

Returns

V2beta1PipelineVersion object.

delete_pipeline_version(pipeline_id: str, pipeline_version_id: str) -> dict

Deletes a pipeline version.

Parameters
pipeline_id: str

ID of the pipeline.

pipeline_version_id: str

ID of the pipeline version.

Returns

Empty dictionary.
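Removing a pipeline together with all of its versions combines list_pipeline_versions(), delete_pipeline_version(), and delete_pipeline(). In the sketch below, delete_pipeline_and_versions() (hypothetical) collects every version ID before deleting anything, then deletes the versions and finally the pipeline. Deleting versions first is a conservative ordering chosen for this sketch. Whether the server requires it may depend on your KFP version.

```python
from typing import Any


def delete_pipeline_and_versions(
    client: Any, pipeline_id: str, page_size: int = 50
) -> int:
    """Hypothetical helper: delete all versions, then the pipeline.

    Returns the number of versions deleted.
    """
    version_ids: list[str] = []
    page_token = ""
    while True:
        response = client.list_pipeline_versions(
            pipeline_id, page_token=page_token, page_size=page_size
        )
        # Assumes items carry `pipeline_version_id`; the list may be None.
        version_ids.extend(v.pipeline_version_id for v in response.pipeline_versions or [])
        page_token = response.next_page_token or ""
        if not page_token:
            break
    # Collect all IDs before deleting so page tokens stay valid.
    for version_id in version_ids:
        client.delete_pipeline_version(pipeline_id, version_id)
    client.delete_pipeline(pipeline_id)
    return len(version_ids)
```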