kfp.Client class

class kfp.Client(host: Optional[str] = None, client_id: Optional[str] = None, namespace: str = 'kubeflow', other_client_id: Optional[str] = None, other_client_secret: Optional[str] = None, existing_token: Optional[str] = None, cookies: Optional[str] = None, proxy: Optional[str] = None, ssl_ca_cert: Optional[str] = None, kube_context: Optional[str] = None, credentials: Optional[str] = None, ui_host: Optional[str] = None, verify_ssl: Optional[bool] = None)

Bases: object

The API client for Kubeflow Pipelines.

Parameters:
  • host – The host name to use to talk to Kubeflow Pipelines. If not set, the in-cluster service DNS name will be used, which only works if the current environment is a pod in the same cluster (such as a Jupyter instance spawned by Kubeflow’s JupyterHub). Set the host based on https://www.kubeflow.org/docs/components/pipelines/sdk/connect-api/
  • client_id – The client ID used by Identity-Aware Proxy.
  • namespace – The namespace where the Kubeflow Pipelines system is running.
  • other_client_id – The client ID used to obtain the auth codes and refresh tokens. References: https://cloud.google.com/iap/docs/authentication-howto#authenticating_from_a_desktop_app.
  • other_client_secret – The client secret used to obtain the auth codes and refresh tokens.
  • existing_token – A token to pass in directly. Use this when the token is better obtained outside of the SDK, e.g., in GCP Cloud Functions, or when the caller already has a token.
  • cookies – CookieJar object containing cookies that will be passed to the pipelines API.
  • proxy – HTTP or HTTPS proxy server.
  • ssl_ca_cert – Certificate for the proxy.
  • kube_context – String name of context within kubeconfig to use, defaults to the current-context set within kubeconfig.
  • credentials – A TokenCredentialsBase object which provides the logic to populate the requests with credentials to authenticate against the API server.
  • ui_host – Base URL to use to open the Kubeflow Pipelines UI. This is used when running the client from a notebook to generate and print links.
  • verify_ssl – Whether to verify the server's TLS certificate.
IN_CLUSTER_DNS_NAME = 'ml-pipeline.{}.svc.cluster.local:8888'
KUBE_PROXY_PATH = 'api/v1/namespaces/{}/services/ml-pipeline:http/proxy/'
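LOCAL_KFP_CONTEXT = '/home/docs/.config/kfp/context.json' (this is ~/.config/kfp/context.json expanded for the account that generated this reference; at runtime the path is under the current user's home directory)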
NAMESPACE_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/namespace'
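The constants above are str.format templates with a single `{}` placeholder, which the Kubernetes DNS and API-server path conventions suggest is the namespace. The following is a minimal sketch of the fallback described under host, assuming exactly that. `resolve_api_host` and `kube_proxy_url` are hypothetical helpers, not SDK functions. The real client also considers kube_context, proxies and credentials, none of which are modeled here.

```python
from typing import Optional

# Values copied from the class attributes documented above.
IN_CLUSTER_DNS_NAME = 'ml-pipeline.{}.svc.cluster.local:8888'
KUBE_PROXY_PATH = 'api/v1/namespaces/{}/services/ml-pipeline:http/proxy/'


def resolve_api_host(host: Optional[str], namespace: str = 'kubeflow') -> str:
    """Hypothetical helper: use an explicit host, else the in-cluster DNS name."""
    if host:
        return host
    # Kubernetes service DNS names follow <service>.<namespace>.svc.cluster.local.
    return IN_CLUSTER_DNS_NAME.format(namespace)


def kube_proxy_url(api_server: str, namespace: str = 'kubeflow') -> str:
    """Hypothetical helper: reach ml-pipeline through the API server's service proxy."""
    return api_server.rstrip('/') + '/' + KUBE_PROXY_PATH.format(namespace)


print(resolve_api_host(None))
# ml-pipeline.kubeflow.svc.cluster.local:8888
print(kube_proxy_url('https://127.0.0.1:6443/', 'team-a'))
# https://127.0.0.1:6443/api/v1/namespaces/team-a/services/ml-pipeline:http/proxy/
```

The in-cluster name only resolves from inside the cluster, which is why the reference recommends setting host explicitly elsewhere.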
archive_experiment(experiment_id: str) → dict

Archives an experiment.

Parameters:experiment_id – id of the experiment.
Returns:Empty dictionary.
Return type:dict
archive_run(run_id: str) → dict

Archives a run.

Parameters:run_id – id of the run.
Returns:Empty dictionary.
Return type:dict
create_experiment(name: str, description: str = None, namespace: str = None) → kfp_server_api.models.api_experiment.ApiExperiment

Create a new experiment.

Parameters:
  • name – The name of the experiment.
  • description – Description of the experiment.
  • namespace – The Kubernetes namespace where the experiment should be created. For single-user deployments, leave it as None; for multi-user deployments, specify a namespace the user is authorized to access.
Returns:

ApiExperiment object.

Return type:

kfp_server_api.ApiExperiment

create_recurring_run(experiment_id: str, job_name: str, description: Optional[str] = None, start_time: Optional[str] = None, end_time: Optional[str] = None, interval_second: Optional[int] = None, cron_expression: Optional[str] = None, max_concurrency: Optional[int] = 1, no_catchup: Optional[bool] = None, params: Optional[dict] = None, pipeline_package_path: Optional[str] = None, pipeline_id: Optional[str] = None, version_id: Optional[str] = None, enabled: bool = True, pipeline_root: Optional[str] = None, enable_caching: Optional[bool] = None, service_account: Optional[str] = None) → kfp_server_api.models.api_job.ApiJob

Creates a recurring run.

Parameters:
  • experiment_id – The string id of an experiment.
  • job_name – Name of the job.
  • description (Optional) – Job description.
  • start_time – An RFC 3339 time string specifying when to start the job.
  • end_time – An RFC 3339 time string specifying when to end the job.
  • interval_second – Integer number of seconds between two recurring runs for a periodic schedule.
  • cron_expression – A cron expression representing a set of times, using 6 space-separated fields, e.g. “0 0 9 ? * 2-6”. See: https://pkg.go.dev/github.com/robfig/cron#hdr-CRON_Expression_Format
  • max_concurrency – Integer indicating how many jobs can be run in parallel.
  • no_catchup – Whether the recurring run should skip catching up when it falls behind schedule, for example, when it is paused for a while and re-enabled afterwards. If no_catchup=False, the scheduler catches up on (backfills) each missed interval. Otherwise, it schedules only the latest interval when more than one interval is ready to be scheduled. If your pipeline handles backfill internally, you should usually turn catch-up off (no_catchup=True) to avoid duplicate backfill. (default: False; leaving it as None behaves the same way)
  • pipeline_package_path – Local path of the pipeline package (the filename should end with one of the following: .tar.gz, .tgz, .zip, .json).
  • params – A dictionary with key as param name and value as param value.
  • pipeline_id – The id of a pipeline.
  • version_id – The id of a pipeline version. If both pipeline_id and version_id are specified, version_id will take precedence. If only pipeline_id is specified, the default version of this pipeline is used to create the run.
  • enabled – A bool indicating whether the recurring run is enabled or disabled.
  • pipeline_root – The root path of the pipeline outputs.
  • enable_caching (Optional) – Whether or not to enable caching for the run. If not set, defaults to the compile time settings, which are True for all tasks by default, while users may specify different caching options for individual tasks. If set, the setting applies to all tasks in the pipeline (overrides the compile time settings).
  • service_account (Optional) – Specifies which Kubernetes service account this recurring run uses.
Returns:

ApiJob object.

Return type:

kfp_server_api.ApiJob

Raises:

ValueError – If required parameters are not supplied.
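The no_catchup behavior described above can be illustrated with a toy model of a periodic (interval_second) schedule. This is a minimal sketch, not the scheduler's actual code. `due_runs` and `to_rfc3339` are hypothetical helpers. The model assumes the job last fired at `last_scheduled` and that every full interval boundary that has passed since then is "ready". It also shows one way to produce RFC 3339 strings, like those expected by start_time and end_time, from timezone-aware Python datetimes.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def to_rfc3339(dt: datetime) -> str:
    """Format a timezone-aware datetime as an RFC 3339 string in UTC."""
    return dt.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')


def due_runs(last_scheduled: datetime, now: datetime,
             interval_second: int, no_catchup: bool) -> List[str]:
    """Toy model (hypothetical): interval boundaries that would be scheduled at `now`."""
    step = timedelta(seconds=interval_second)
    ready = []
    t = last_scheduled + step
    while t <= now:  # every boundary that has already passed is ready
        ready.append(t)
        t += step
    if no_catchup:
        ready = ready[-1:]  # skip the backlog and keep only the latest interval
    return [to_rfc3339(t) for t in ready]


last = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
now = last + timedelta(hours=3, minutes=30)  # e.g. the job was paused for 3.5 hours
print(due_runs(last, now, 3600, no_catchup=False))
# ['2024-01-01T01:00:00Z', '2024-01-01T02:00:00Z', '2024-01-01T03:00:00Z']
print(due_runs(last, now, 3600, no_catchup=True))
# ['2024-01-01T03:00:00Z']
```

With catch-up on, a pipeline that already backfills internally would process the 01:00 and 02:00 windows twice. This is the duplicate-backfill case the parameter description warns about.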

create_run_from_pipeline_func(pipeline_func: Callable, arguments: Optional[Mapping[str, Any]] = None, run_name: Optional[str] = None, experiment_name: Optional[str] = None, namespace: Optional[str] = None, pipeline_root: Optional[str] = None, enable_caching: Optional[bool] = None, service_account: Optional[str] = None) → kfp.client.client.RunPipelineResult

Runs a pipeline on a KFP-enabled Kubernetes cluster.

This command compiles the pipeline function, creates or gets an experiment and submits the pipeline for execution.

Parameters:
  • pipeline_func – A function that describes a pipeline by calling components and composing them into an execution graph.
  • arguments – Arguments to the pipeline function provided as a dict.
  • run_name (Optional) – Name of the run to be shown in the UI.
  • experiment_name (Optional) – Name of the experiment to add the run to.
  • namespace – Kubernetes namespace where the pipeline runs are created. For single-user deployments, leave it as None; for multi-user deployments, specify a namespace the user is authorized to access.
  • pipeline_root – The root path of the pipeline outputs.
  • enable_caching (Optional) – Whether or not to enable caching for the run. If not set, defaults to the compile time settings, which are True for all tasks by default, while users may specify different caching options for individual tasks. If set, the setting applies to all tasks in the pipeline (overrides the compile time settings).
  • service_account (Optional) – Specifies which Kubernetes service account this run uses.
Returns:

RunPipelineResult object containing information about the pipeline run.

Return type:

RunPipelineResult
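Because arguments is keyed by the pipeline function's parameter names, it can help to check the keys before submitting. The following is a minimal pre-flight check. It assumes pipeline_func is, or exposes, a plain Python function whose signature the standard inspect module can read; a decorated pipeline object may not satisfy this. `check_arguments` is a hypothetical helper, not part of the SDK.

```python
import inspect
from typing import Any, Callable, Mapping

_NAMED_KINDS = (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)


def check_arguments(pipeline_func: Callable, arguments: Mapping[str, Any]) -> None:
    """Hypothetical pre-flight check of `arguments` against a function signature."""
    params = inspect.signature(pipeline_func).parameters
    # Keys that do not correspond to any pipeline parameter.
    unknown = sorted(set(arguments) - set(params))
    if unknown:
        raise ValueError(f'unknown pipeline arguments: {unknown}')
    # Named parameters without a default that were not supplied.
    missing = sorted(
        name for name, p in params.items()
        if p.kind in _NAMED_KINDS
        and p.default is inspect.Parameter.empty
        and name not in arguments
    )
    if missing:
        raise ValueError(f'missing required pipeline arguments: {missing}')


def my_pipeline(message: str, repeats: int = 1):
    """Stand-in for a pipeline function; a real one would call components."""


check_arguments(my_pipeline, {'message': 'hello'})  # passes: repeats has a default
```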

create_run_from_pipeline_package(pipeline_file: str, arguments: Optional[Mapping[str, str]] = None, run_name: Optional[str] = None, experiment_name: Optional[str] = None, namespace: Optional[str] = None, pipeline_root: Optional[str] = None, enable_caching: Optional[bool] = None, service_account: Optional[str] = None) → kfp.client.client.RunPipelineResult

Runs a pipeline on a KFP-enabled Kubernetes cluster.

This command takes a local pipeline package, creates or gets an experiment and submits the pipeline for execution.

Parameters:
  • pipeline_file – A compiled pipeline package file.
  • arguments – Arguments to the pipeline function provided as a dict.
  • run_name (Optional) – Name of the run to be shown in the UI.
  • experiment_name (Optional) – Name of the experiment to add the run to.
  • namespace (Optional) – Kubernetes namespace where the pipeline runs are created. For single-user deployments, leave it as None; for multi-user deployments, specify a namespace the user is authorized to access.
  • pipeline_root (Optional) – The root path of the pipeline outputs.
  • enable_caching (Optional) – Whether or not to enable caching for the run. If not set, defaults to the compile time settings, which are True for all tasks by default, while users may specify different caching options for individual tasks. If set, the setting applies to all tasks in the pipeline (overrides the compile time settings).
  • service_account (Optional) – Specifies which Kubernetes service account this run uses.
Returns:

RunPipelineResult object containing information about the pipeline run.

Return type:

RunPipelineResult

delete_experiment(experiment_id: str) → dict

Deletes an experiment.

Parameters:experiment_id – id of the experiment.
Returns:Empty dictionary.
Return type:dict
Raises:kfp_server_api.ApiException – If experiment is not found.
delete_job(job_id: str) → dict

Deletes a job.

Parameters:job_id – id of the job.
Returns:Empty dictionary.
Return type:dict
Raises:kfp_server_api.ApiException – If the job is not found.
delete_pipeline(pipeline_id: str) → dict

Deletes a pipeline.

Parameters:pipeline_id – id of the pipeline.
Returns:Empty dictionary.
Return type:dict
Raises:kfp_server_api.ApiException – If pipeline is not found.
delete_pipeline_version(version_id: str) → dict

Deletes a pipeline version.

Parameters:version_id – id of the pipeline version.
Returns:Empty dictionary.
Return type:dict
Raises:kfp_server_api.ApiException – If the pipeline version is not found.
delete_run(run_id: str) → dict

Deletes a run.

Parameters:run_id – id of the run.
Returns:Empty dictionary.
Return type:dict
disable_job(job_id: str) → dict

Disables a job.

Parameters:job_id – id of the job.
Returns:Empty dictionary.
Return type:dict
Raises:kfp_server_api.ApiException – If the job is not found.
enable_job(job_id: str) → dict

Enables a job.

Parameters:job_id – id of the job.
Returns:Empty dictionary.
Return type:dict
Raises:kfp_server_api.ApiException – If the job is not found.
get_experiment(experiment_id=None, experiment_name=None, namespace=None) → kfp_server_api.models.api_experiment.ApiExperiment

Gets details of an experiment.

Either experiment_id or experiment_name is required.

Parameters:
  • experiment_id (Optional) – Id of the experiment.
  • experiment_name (Optional) – Name of the experiment.
  • namespace – Kubernetes namespace where the experiment was created. For single-user deployments, leave it as None; for multi-user deployments, specify the namespace the user is authorized to access.
Returns:

ApiExperiment object.

Return type:

kfp_server_api.ApiExperiment

Raises:

kfp_server_api.ApiException – If the experiment is not found, or if neither experiment_id nor experiment_name is provided.

get_kfp_healthz() → kfp_server_api.models.api_get_healthz_response.ApiGetHealthzResponse

Gets healthz info of KFP deployment.

Returns:JSON-formatted response from the healthz endpoint.
get_pipeline(pipeline_id: str) → kfp_server_api.models.api_pipeline.ApiPipeline

Gets pipeline details.

Parameters:pipeline_id – id of the pipeline.
Returns:ApiPipeline object.
Return type:kfp_server_api.ApiPipeline
Raises:kfp_server_api.ApiException – If pipeline is not found.
get_pipeline_id(name) → Optional[str]

Finds the id of a pipeline by name.

Parameters:name – Pipeline name.
Returns:The pipeline id if a pipeline with the name exists; otherwise None.
get_pipeline_version(version_id: str) → kfp_server_api.models.api_pipeline_version.ApiPipelineVersion

Gets a pipeline version.

Parameters:version_id – id of the pipeline version.
Returns:ApiPipelineVersion object.
Return type:kfp_server_api.ApiPipelineVersion
Raises:kfp_server_api.ApiException – If pipeline version is not found.
get_recurring_run(job_id: str) → kfp_server_api.models.api_job.ApiJob

Gets recurring run details.

Parameters:job_id – id of the recurring run.
Returns:ApiJob object.
Return type:kfp_server_api.ApiJob
Raises:kfp_server_api.ApiException – If the recurring run is not found.
get_run(run_id: str) → kfp_server_api.models.api_run.ApiRun

Gets run details.

Parameters:run_id – id of the run.
Returns:ApiRun object.
Return type:kfp_server_api.ApiRun
Raises:kfp_server_api.ApiException – If run is not found.
get_user_namespace() → str

Gets the user namespace from the local context config.

Returns:Kubernetes namespace from the local context file, or an empty string if it was not set.
list_experiments(page_token: str = '', page_size: int = 10, sort_by: str = '', namespace: Optional[str] = None, filter: Optional[str] = None) → kfp_server_api.models.api_list_experiments_response.ApiListExperimentsResponse

Lists experiments.

Parameters:
  • page_token – Token marking the start of the page.
  • page_size – Size of the page.
  • sort_by – Can be ‘[field_name]’, ‘[field_name] desc’. For example, ‘name desc’.
  • namespace – Kubernetes namespace where the experiment was created. For single-user deployments, leave it as None; for multi-user deployments, specify a namespace the user is authorized to access.
  • filter – A url-encoded, JSON-serialized Filter protocol buffer (see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto)).

    An example filter string would be:

      # For the list of filter operations please see:
      # https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/client/client.py#L36
      json.dumps({
          "predicates": [{
              "op": _FILTER_OPERATIONS["EQUALS"],
              "key": "name",
              "stringValue": "my-name",
          }]
      })

Returns:

ApiListExperimentsResponse object.

Return type:

kfp_server_api.ApiListExperimentsResponse
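The filter example above can be wrapped in a small builder. This is a sketch under several assumptions. `make_filter` and `FILTER_OPS` are hypothetical names. The numeric op codes are assumed to match the Predicate.Op enum in filter.proto, and only a subset is listed. Multiple predicates are assumed to be AND-ed, as filter.proto describes. Like the example, the helper returns the plain json.dumps output.

```python
import json
from typing import Tuple

# Assumed to match the Predicate.Op enum in filter.proto (subset only);
# verify the numbers against the filter.proto of your KFP version.
FILTER_OPS = {'EQUALS': 1, 'NOT_EQUALS': 2}


def make_filter(*predicates: Tuple[str, str, str]) -> str:
    """Hypothetical helper: build a filter string from (op, key, value) tuples.

    Every predicate compares a key against a string value ("stringValue").
    """
    return json.dumps({
        'predicates': [
            {'op': FILTER_OPS[op], 'key': key, 'stringValue': value}
            for op, key, value in predicates
        ]
    })


print(make_filter(('EQUALS', 'name', 'my-name')))
# {"predicates": [{"op": 1, "key": "name", "stringValue": "my-name"}]}
```

The resulting string would then be passed as the filter argument, for example client.list_experiments(filter=make_filter(('EQUALS', 'name', 'my-name'))).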

list_pipeline_versions(pipeline_id: str, page_token: str = '', page_size: int = 10, sort_by: str = '', filter: Optional[str] = None) → kfp_server_api.models.api_list_pipeline_versions_response.ApiListPipelineVersionsResponse

Lists pipeline versions.

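Parameters:
  • pipeline_id – id of the pipeline.
  • page_token – Token marking the start of the page.
  • page_size – Size of the page.
  • sort_by – Can be ‘[field_name]’, ‘[field_name] desc’. For example, ‘name desc’.
  • filter – A url-encoded, JSON-serialized Filter protocol buffer, in the same format as for list_experiments.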
Returns:

ApiListPipelineVersionsResponse object.

Return type:

kfp_server_api.ApiListPipelineVersionsResponse

Raises:

kfp_server_api.ApiException – If pipeline is not found.

list_pipelines(page_token: str = '', page_size: int = 10, sort_by: str = '', filter: Optional[str] = None) → kfp_server_api.models.api_list_pipelines_response.ApiListPipelinesResponse

Lists pipelines.

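Parameters:
  • page_token – Token marking the start of the page.
  • page_size – Size of the page.
  • sort_by – Can be ‘[field_name]’, ‘[field_name] desc’. For example, ‘name desc’.
  • filter – A url-encoded, JSON-serialized Filter protocol buffer, in the same format as for list_experiments.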
Returns:

ApiListPipelinesResponse object.

Return type:

kfp_server_api.ApiListPipelinesResponse

list_recurring_runs(page_token: str = '', page_size: int = 10, sort_by: str = '', experiment_id: Optional[str] = None, filter: Optional[str] = None) → kfp_server_api.models.api_list_jobs_response.ApiListJobsResponse

Lists recurring runs.

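Parameters:
  • page_token – Token marking the start of the page.
  • page_size – Size of the page.
  • sort_by – Can be ‘[field_name]’, ‘[field_name] desc’. For example, ‘name desc’.
  • experiment_id – Experiment id to filter on.
  • filter – A url-encoded, JSON-serialized Filter protocol buffer, in the same format as for list_experiments.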
Returns:

ApiListJobsResponse object.

Return type:

kfp_server_api.ApiListJobsResponse

list_runs(page_token: str = '', page_size: int = 10, sort_by: str = '', experiment_id: Optional[str] = None, namespace: Optional[str] = None, filter: Optional[str] = None) → kfp_server_api.models.api_list_runs_response.ApiListRunsResponse

Lists runs, optionally filtered by experiment or namespace.

Parameters:
  • page_token – Token marking the start of the page.
  • page_size – Size of the page.
  • sort_by – One of ‘field_name’, ‘field_name desc’. For example, ‘name desc’.
  • experiment_id – Experiment id to filter on.
  • namespace – Kubernetes namespace to filter on. For single-user deployments, leave it as None; for multi-user deployments, specify a namespace the user is authorized to access.
  • filter – A url-encoded, JSON-serialized Filter protocol buffer (see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto)).

    An example filter string would be:

      # For the list of filter operations please see:
      # https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/_client.py#L40
      json.dumps({
          "predicates": [{
              "op": _FILTER_OPERATIONS["EQUALS"],
              "key": "name",
              "stringValue": "my-name",
          }]
      })

Returns:

ApiListRunsResponse object.

Return type:

kfp_server_api.ApiListRunsResponse
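The page_token and page_size parameters suggest a cursor-style loop over all pages. The following is a minimal sketch. It assumes each response carries a next_page_token that is empty on the last page, as the list responses appear to; verify this against your kfp_server_api version. `iterate_pages` is a hypothetical helper. `fake_list_runs` stands in for client.list_runs so the snippet runs without a cluster.

```python
from types import SimpleNamespace
from typing import Any, Callable, Iterator


def iterate_pages(list_page: Callable[..., Any], items_attr: str,
                  page_size: int = 10, **kwargs: Any) -> Iterator[Any]:
    """Hypothetical helper: yield items from every page by following next_page_token."""
    page_token = ''
    while True:
        response = list_page(page_token=page_token, page_size=page_size, **kwargs)
        # Treat a missing (None) item list as an empty page.
        yield from getattr(response, items_attr, None) or []
        page_token = getattr(response, 'next_page_token', None)
        if not page_token:  # an empty token is taken to mean "last page"
            break


def fake_list_runs(page_token: str = '', page_size: int = 10, **kwargs: Any):
    """Stand-in for client.list_runs so the sketch runs without a cluster."""
    runs = ['run-1', 'run-2', 'run-3', 'run-4', 'run-5']
    start = int(page_token or 0)
    end = start + page_size
    return SimpleNamespace(runs=runs[start:end],
                           next_page_token=str(end) if end < len(runs) else '')


print(list(iterate_pages(fake_list_runs, 'runs', page_size=2)))
# ['run-1', 'run-2', 'run-3', 'run-4', 'run-5']
```

Against a real cluster, the call would presumably look like iterate_pages(client.list_runs, 'runs', experiment_id=...). This assumes the response attribute holding the items is named runs.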
run_pipeline(experiment_id: str, job_name: str, pipeline_package_path: Optional[str] = None, params: Optional[Mapping[str, Any]] = None, pipeline_id: Optional[str] = None, version_id: Optional[str] = None, pipeline_root: Optional[str] = None, enable_caching: Optional[bool] = None, service_account: Optional[str] = None) → kfp_server_api.models.api_run.ApiRun

Runs a specified pipeline.

Parameters:
  • experiment_id – The id of an experiment.
  • job_name – Name of the job.
  • pipeline_package_path – Local path of the pipeline package (the filename should end with one of the following: .tar.gz, .tgz, .zip, .json).
  • params – A dictionary with key (string) as param name and value (string) as param value.
  • pipeline_id – The id of a pipeline.
  • version_id – The id of a pipeline version. If both pipeline_id and version_id are specified, version_id will take precedence. If only pipeline_id is specified, the default version of this pipeline is used to create the run.
  • pipeline_root – The root path of the pipeline outputs.
  • enable_caching (Optional) – Whether or not to enable caching for the run. If not set, defaults to the compile time settings, which are True for all tasks by default, while users may specify different caching options for individual tasks. If set, the setting applies to all tasks in the pipeline (overrides the compile time settings).
  • service_account (Optional) – Specifies which Kubernetes service account this run uses.
Returns:

ApiRun object.

Return type:

kfp_server_api.ApiRun
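The documented rules for choosing which pipeline a run uses can be made concrete as a small resolver. This is a minimal sketch. `resolve_pipeline_source` is a hypothetical helper, and the accepted suffixes are the ones listed for pipeline_package_path. The reference does not say how a package path combines with pipeline_id or version_id, so the sketch rejects that combination instead of guessing.

```python
from typing import Optional, Tuple

# Package extensions listed for pipeline_package_path.
PACKAGE_SUFFIXES = ('.tar.gz', '.tgz', '.zip', '.json')


def resolve_pipeline_source(pipeline_package_path: Optional[str] = None,
                            pipeline_id: Optional[str] = None,
                            version_id: Optional[str] = None) -> Tuple[str, str]:
    """Hypothetical helper mirroring the documented precedence rules."""
    if pipeline_package_path is not None:
        if pipeline_id is not None or version_id is not None:
            # Not specified by the reference, so this sketch refuses to guess.
            raise ValueError('pass either a package path or pipeline/version ids')
        if not pipeline_package_path.endswith(PACKAGE_SUFFIXES):
            raise ValueError(f'unsupported package type: {pipeline_package_path}')
        return ('package', pipeline_package_path)
    if version_id is not None:
        return ('version', version_id)  # version_id takes precedence over pipeline_id
    if pipeline_id is not None:
        return ('default_version_of', pipeline_id)  # default version of the pipeline
    raise ValueError('no pipeline source given')


print(resolve_pipeline_source(pipeline_id='p-1', version_id='v-1'))
# ('version', 'v-1')
```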

set_user_namespace(namespace: str) → None

Sets the user namespace in the local context settings file.

This function should only be used when Kubeflow Pipelines is in multi-user mode.

Parameters:namespace – Kubernetes namespace the user has access to.
Returns:None
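set_user_namespace and get_user_namespace round-trip a value through the local context file. The following is a minimal sketch of that round trip. It assumes the file holds a JSON object with a "namespace" key; this layout is an assumption and is not stated in this reference. `save_namespace` and `load_namespace` are hypothetical helpers. They write to a temporary directory instead of the real LOCAL_KFP_CONTEXT path.

```python
import json
import os
import tempfile


def save_namespace(context_path: str, namespace: str) -> None:
    """Hypothetical helper: persist the namespace as {"namespace": ...} JSON."""
    os.makedirs(os.path.dirname(context_path), exist_ok=True)
    with open(context_path, 'w') as f:
        json.dump({'namespace': namespace}, f)


def load_namespace(context_path: str) -> str:
    """Hypothetical helper: read the namespace back, or '' if none was saved."""
    if not os.path.exists(context_path):
        return ''
    with open(context_path) as f:
        return json.load(f).get('namespace', '')


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'kfp', 'context.json')  # stand-in for LOCAL_KFP_CONTEXT
    print(repr(load_namespace(path)))  # ''
    save_namespace(path, 'team-a')
    print(load_namespace(path))  # team-a
```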
terminate_run(run_id: str) → dict

Terminates a run.

Parameters:run_id – id of the run.
Returns:Empty dictionary.
Return type:dict
unarchive_experiment(experiment_id: str) → dict

Unarchives an experiment.

Parameters:experiment_id – id of the experiment.
Returns:Empty dictionary.
Return type:dict
unarchive_run(run_id: str) → dict

Restores an archived run.

Parameters:run_id – id of the run.
Returns:Empty dictionary.
Return type:dict
upload_pipeline(pipeline_package_path: str = None, pipeline_name: str = None, description: str = None) → kfp_server_api.models.api_pipeline.ApiPipeline

Uploads the pipeline to the Kubeflow Pipelines cluster.

Parameters:
  • pipeline_package_path – Local path to the pipeline package.
  • pipeline_name (Optional) – Name of the pipeline to be shown in the UI.
  • description (Optional) – Description of the pipeline to be shown in the UI.
Returns:

ApiPipeline object.

Return type:

kfp_server_api.ApiPipeline

upload_pipeline_version(pipeline_package_path: str, pipeline_version_name: str, pipeline_id: Optional[str] = None, pipeline_name: Optional[str] = None, description: Optional[str] = None) → kfp_server_api.models.api_pipeline_version.ApiPipelineVersion

Uploads a new version of the pipeline to the KFP cluster.

Parameters:
  • pipeline_package_path – Local path to the pipeline package.
  • pipeline_version_name – Name of the pipeline version to be shown in the UI.
  • pipeline_id (Optional) – Id of the pipeline.
  • pipeline_name (Optional) – Name of the pipeline.
  • description (Optional) – Description of the pipeline version to be shown in the UI.
Returns:

ApiPipelineVersion object.

Return type:

kfp_server_api.ApiPipelineVersion

Raises:
  • ValueError – If neither or both of pipeline_id and pipeline_name are specified.
  • kfp_server_api.ApiException – If the pipeline id is not found.
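The ValueError condition above is an "exactly one of two" rule. The following is a minimal sketch of that check. `pipeline_reference` is a hypothetical helper, not the SDK's own validation. A name returned this way could then be looked up with get_pipeline_id.

```python
from typing import Optional, Tuple


def pipeline_reference(pipeline_id: Optional[str] = None,
                       pipeline_name: Optional[str] = None) -> Tuple[str, str]:
    """Hypothetical helper: enforce exactly one of pipeline_id / pipeline_name."""
    if (pipeline_id is None) == (pipeline_name is None):
        # Both missing or both given: the case the ValueError above describes.
        raise ValueError('specify exactly one of pipeline_id or pipeline_name')
    if pipeline_id is not None:
        return ('id', pipeline_id)
    return ('name', pipeline_name)


print(pipeline_reference(pipeline_name='my-pipeline'))
# ('name', 'my-pipeline')
```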
wait_for_run_completion(run_id: str, timeout: int) → kfp_server_api.models.api_run.ApiRun

Waits for a run to complete.

Parameters:
  • run_id – Run id, returned from run_pipeline.
  • timeout – Timeout in seconds.
Returns:

ApiRun object.

Return type:

kfp_server_api.ApiRun

Raises:

TimeoutError – If the pipeline run does not finish before the specified timeout.
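Waiting for completion is essentially a poll-until-terminal loop with a deadline. The following is a minimal sketch of that pattern, not the SDK's implementation. `wait_for_terminal_status` and `FakeClock` are hypothetical names. The set of terminal status strings is an assumption to check against your KFP version. The clock and sleep functions are injectable so the sketch runs without a cluster.

```python
import time
from typing import Callable, Optional

# Assumed terminal run states (compared case-insensitively).
TERMINAL_STATES = {'succeeded', 'failed', 'skipped', 'error'}


def wait_for_terminal_status(get_status: Callable[[], Optional[str]],
                             timeout: float,
                             poll_interval: float = 5.0,
                             clock: Callable[[], float] = time.monotonic,
                             sleep: Callable[[float], None] = time.sleep) -> str:
    """Hypothetical helper: poll get_status() until a terminal state or the deadline."""
    deadline = clock() + timeout
    while True:
        status = get_status()  # may be None before the run has started
        if status and status.lower() in TERMINAL_STATES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f'run did not finish within {timeout} seconds')
        sleep(poll_interval)


class FakeClock:
    """Simulated time: sleep() advances now() instead of blocking."""

    def __init__(self) -> None:
        self.t = 0.0

    def now(self) -> float:
        return self.t

    def sleep(self, seconds: float) -> None:
        self.t += seconds


fake = FakeClock()
statuses = iter([None, 'Running', 'Succeeded'])
result = wait_for_terminal_status(lambda: next(statuses), timeout=60,
                                  clock=fake.now, sleep=fake.sleep)
print(result, fake.t)  # Succeeded 10.0
```

Injecting the clock keeps the deadline logic testable. In real use, the defaults (time.monotonic and time.sleep) apply.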