kfp.Client class

class kfp.Client(host=None, client_id=None, namespace='kubeflow', other_client_id=None, other_client_secret=None, existing_token=None, cookies=None, proxy=None, ssl_ca_cert=None, kube_context=None, credentials=None, ui_host=None)[source]

Bases: object

API Client for KubeFlow Pipeline.

Parameters:
  • host – The host name to use to talk to Kubeflow Pipelines. If not set, the in-cluster service DNS name will be used, which only works if the current environment is a pod in the same cluster (such as a Jupyter instance spawned by Kubeflow’s JupyterHub). If you have a different connection to cluster, such as a kubectl proxy connection, then set it to something like “127.0.0.1:8080/pipeline. If you connect to an IAP enabled cluster, set it to https://<your-deployment>.endpoints.<your-project>.cloud.goog/pipeline”.
  • client_id – The client ID used by Identity-Aware Proxy.
  • namespace – The namespace where the kubeflow pipeline system is run.
  • other_client_id – The client ID used to obtain the auth codes and refresh tokens. Reference: https://cloud.google.com/iap/docs/authentication-howto#authenticating_from_a_desktop_app.
  • other_client_secret – The client secret used to obtain the auth codes and refresh tokens.
  • existing_token – Pass in token directly, it’s used for cases better get token outside of SDK, e.x. GCP Cloud Functions or caller already has a token
  • cookies – CookieJar object containing cookies that will be passed to the pipelines API.
  • proxy – HTTP or HTTPS proxy server
  • ssl_ca_cert – Cert for proxy
  • kube_context – String name of context within kubeconfig to use, defaults to the current-context set within kubeconfig.
  • credentials – A TokenCredentialsBase object which provides the logic to populate the requests with credentials to authenticate against the API server.
  • ui_host – Base url to use to open the Kubeflow Pipelines UI. This is used when running the client from a notebook to generate and print links.
IN_CLUSTER_DNS_NAME = 'ml-pipeline.{}.svc.cluster.local:8888'
KUBE_PROXY_PATH = 'api/v1/namespaces/{}/services/ml-pipeline:http/proxy/'
LOCAL_KFP_CONTEXT = '/home/docs/.config/kfp/context.json'
NAMESPACE_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/namespace'
archive_experiment(experiment_id: str)[source]

Archive experiment.

Parameters:experiment_id – id of the experiment.
Raises:kfp_server_api.ApiException – If experiment is not found.
create_experiment(name: str, description: str = None, namespace: str = None) → kfp_server_api.models.api_experiment.ApiExperiment[source]

Create a new experiment.

Parameters:
  • name – The name of the experiment.
  • description – Description of the experiment.
  • namespace – Kubernetes namespace where the experiment should be created. For single user deployment, leave it as None; For multi user, input a namespace where the user is authorized.
Returns:

An Experiment object. Most important field is id.

create_recurring_run(experiment_id: str, job_name: str, description: Optional[str] = None, start_time: Optional[str] = None, end_time: Optional[str] = None, interval_second: Optional[int] = None, cron_expression: Optional[str] = None, max_concurrency: Optional[int] = 1, no_catchup: Optional[bool] = None, params: Optional[dict] = None, pipeline_package_path: Optional[str] = None, pipeline_id: Optional[str] = None, version_id: Optional[str] = None, enabled: bool = True, enable_caching: Optional[bool] = None, service_account: Optional[str] = None) → kfp_server_api.models.api_job.ApiJob[source]

Create a recurring run.

Parameters:
  • experiment_id – The string id of an experiment.
  • job_name – Name of the job.
  • description – An optional job description.
  • start_time – The RFC3339 time string of the time when to start the job.
  • end_time – The RFC3339 time string of the time when to end the job.
  • interval_second – Integer indicating the seconds between two recurring runs in for a periodic schedule.
  • cron_expression – A cron expression representing a set of times, using 6 space-separated fields, e.g. “0 0 9 ? * 2-6”. See here for details of the cron expression format.
  • max_concurrency – Integer indicating how many jobs can be run in parallel.
  • no_catchup – Whether the recurring run should catch up if behind schedule. For example, if the recurring run is paused for a while and re-enabled afterwards. If no_catchup=False, the scheduler will catch up on (backfill) each missed interval. Otherwise, it only schedules the latest interval if more than one interval is ready to be scheduled. Usually, if your pipeline handles backfill internally, you should turn catchup off to avoid duplicate backfill. (default: {False})
  • pipeline_package_path – Local path of the pipeline package(the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml).
  • params – A dictionary with key (string) as param name and value (string) as param value.
  • pipeline_id – The id of a pipeline.
  • version_id – The id of a pipeline version. If both pipeline_id and version_id are specified, version_id will take precendence. If only pipeline_id is specified, the default version of this pipeline is used to create the run.
  • enabled – A bool indicating whether the recurring run is enabled or disabled.
  • enable_caching – Optional. Whether or not to enable caching for the run. This setting affects v2 compatible mode and v2 mode only. If not set, defaults to the compile time settings, which are True for all tasks by default, while users may specify different caching options for individual tasks. If set, the setting applies to all tasks in the pipeline – overrides the compile time settings.
  • service_account – Optional. Specifies which Kubernetes service account this recurring run uses.
Returns:

A Job object. Most important field is id.

Raises:

ValueError – If required parameters are not supplied.

create_run_from_pipeline_func(pipeline_func: Callable, arguments: Mapping[str, str], run_name: Optional[str] = None, experiment_name: Optional[str] = None, pipeline_conf: Optional[kfp.dsl._pipeline.PipelineConf] = None, namespace: Optional[str] = None, mode: kfp.dsl._pipeline.PipelineExecutionMode = <PipelineExecutionMode.V1_LEGACY: 1>, launcher_image: Optional[str] = None, pipeline_root: Optional[str] = None, enable_caching: Optional[bool] = None, service_account: Optional[str] = None)[source]

Runs pipeline on KFP-enabled Kubernetes cluster.

This command compiles the pipeline function, creates or gets an experiment and submits the pipeline for execution.

Parameters:
  • pipeline_func – A function that describes a pipeline by calling components and composing them into execution graph.
  • arguments – Arguments to the pipeline function provided as a dict.
  • run_name – Optional. Name of the run to be shown in the UI.
  • experiment_name – Optional. Name of the experiment to add the run to.
  • pipeline_conf – Optional. Pipeline configuration ops that will be applied to all the ops in the pipeline func.
  • namespace – Kubernetes namespace where the pipeline runs are created. For single user deployment, leave it as None; For multi user, input a namespace where the user is authorized
  • mode – The PipelineExecutionMode to use when compiling and running pipeline_func.
  • launcher_image – The launcher image to use if the mode is specified as PipelineExecutionMode.V2_COMPATIBLE. Should only be needed for tests or custom deployments right now.
  • pipeline_root – The root path of the pipeline outputs. This argument should be used only for pipeline compiled with dsl.PipelineExecutionMode.V2_COMPATIBLE or dsl.PipelineExecutionMode.V2_ENGINGE mode.
  • enable_caching – Optional. Whether or not to enable caching for the run. This setting affects v2 compatible mode and v2 mode only. If not set, defaults to the compile time settings, which are True for all tasks by default, while users may specify different caching options for individual tasks. If set, the setting applies to all tasks in the pipeline – overrides the compile time settings.
  • service_account – Optional. Specifies which Kubernetes service account this run uses.
create_run_from_pipeline_package(pipeline_file: str, arguments: Mapping[str, str], run_name: Optional[str] = None, experiment_name: Optional[str] = None, namespace: Optional[str] = None, pipeline_root: Optional[str] = None, enable_caching: Optional[bool] = None, service_account: Optional[str] = None)[source]

Runs pipeline on KFP-enabled Kubernetes cluster.

This command takes a local pipeline package, creates or gets an experiment and submits the pipeline for execution.

Parameters:
  • pipeline_file – A compiled pipeline package file.
  • arguments – Arguments to the pipeline function provided as a dict.
  • run_name – Optional. Name of the run to be shown in the UI.
  • experiment_name – Optional. Name of the experiment to add the run to.
  • namespace – Kubernetes namespace where the pipeline runs are created. For single user deployment, leave it as None; For multi user, input a namespace where the user is authorized
  • pipeline_root – The root path of the pipeline outputs. This argument should be used only for pipeline compiled with dsl.PipelineExecutionMode.V2_COMPATIBLE or dsl.PipelineExecutionMode.V2_ENGINGE mode.
  • enable_caching – Optional. Whether or not to enable caching for the run. This setting affects v2 compatible mode and v2 mode only. If not set, defaults to the compile time settings, which are True for all tasks by default, while users may specify different caching options for individual tasks. If set, the setting applies to all tasks in the pipeline – overrides the compile time settings.
  • service_account – Optional. Specifies which Kubernetes service account this run uses.
delete_experiment(experiment_id)[source]

Delete experiment.

Parameters:experiment_id – id of the experiment.
Returns:Object. If the method is called asynchronously, returns the request thread.
Raises:kfp_server_api.ApiException – If experiment is not found.
delete_job(job_id: str)[source]

Deletes a job.

Parameters:job_id – id of the job.
Returns:Object. If the method is called asynchronously, returns the request thread.
Raises:kfp_server_api.ApiException – If the job is not found.
delete_pipeline(pipeline_id)[source]

Delete pipeline.

Parameters:pipeline_id – id of the pipeline.
Returns:Object. If the method is called asynchronously, returns the request thread.
Raises:kfp_server_api.ApiException – If pipeline is not found.
delete_pipeline_version(version_id: str)[source]

Delete pipeline version.

Parameters:version_id – id of the pipeline version.
Returns:Object. If the method is called asynchronously, returns the request thread.
Raises:Exception if pipeline version is not found.
disable_job(job_id: str)[source]

Disables a job.

Parameters:job_id – id of the job.
Returns:Object. If the method is called asynchronously, returns the request thread.
Raises:ApiException – If the job is not found.
get_experiment(experiment_id=None, experiment_name=None, namespace=None) → kfp_server_api.models.api_experiment.ApiExperiment[source]

Get details of an experiment.

Either experiment_id or experiment_name is required

Parameters:
  • experiment_id – Id of the experiment. (Optional)
  • experiment_name – Name of the experiment. (Optional)
  • namespace – Kubernetes namespace where the experiment was created. For single user deployment, leave it as None; For multi user, input the namespace where the user is authorized.
Returns:

A response object including details of a experiment.

Raises:

kfp_server_api.ApiException – If experiment is not found or None of the arguments is provided

get_kfp_healthz() → kfp_server_api.models.api_get_healthz_response.ApiGetHealthzResponse[source]

Gets healthz info of KFP deployment.

Returns:json formatted response from the healtz endpoint.
Return type:response
get_pipeline(pipeline_id: str) → kfp_server_api.models.api_pipeline.ApiPipeline[source]

Get pipeline details.

Parameters:pipeline_id – id of the pipeline.
Returns:A response object including details of a pipeline.
Raises:kfp_server_api.ApiException – If pipeline is not found.
get_pipeline_id(name) → Optional[str][source]

Find the id of a pipeline by name.

Parameters:name – Pipeline name.
Returns:Returns the pipeline id if a pipeline with the name exists.
get_recurring_run(job_id: str) → kfp_server_api.models.api_job.ApiJob[source]

Get recurring_run details.

Parameters:job_id – id of the recurring_run.
Returns:A response object including details of a recurring_run.
Raises:kfp_server_api.ApiException – If recurring_run is not found.
get_run(run_id: str) → kfp_server_api.models.api_run.ApiRun[source]

Get run details.

Parameters:run_id – id of the run.
Returns:A response object including details of a run.
Raises:kfp_server_api.ApiException – If run is not found.
get_user_namespace() → str[source]

Get user namespace in context config.

Returns:kubernetes namespace from the local context file or empty if it wasn’t set.
Return type:namespace
list_experiments(page_token='', page_size=10, sort_by='', namespace=None, filter=None) → kfp_server_api.models.api_list_experiments_response.ApiListExperimentsResponse[source]

List experiments.

Parameters:
  • page_token – Token for starting of the page.
  • page_size – Size of the page.
  • sort_by – Can be ‘[field_name]’, ‘[field_name] desc’. For example, ‘name desc’.
  • namespace – Kubernetes namespace where the experiment was created. For single user deployment, leave it as None; For multi user, input a namespace where the user is authorized.
  • filter – A url-encoded, JSON-serialized Filter protocol buffer (see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto)).
Returns:

A response object including a list of experiments and next page token.

list_pipeline_versions(pipeline_id: str, page_token: str = '', page_size: int = 10, sort_by: str = '') → kfp_server_api.models.api_list_pipeline_versions_response.ApiListPipelineVersionsResponse[source]

Lists pipeline versions.

Parameters:
  • pipeline_id – Id of the pipeline to list versions
  • page_token – Token for starting of the page.
  • page_size – Size of the page.
  • sort_by – One of ‘field_name’, ‘field_name desc’. For example, ‘name desc’.
Returns:

A response object including a list of versions and next page token.

Raises:

kfp_server_api.ApiException – If pipeline is not found.

list_pipelines(page_token='', page_size=10, sort_by='', filter=None) → kfp_server_api.models.api_list_pipelines_response.ApiListPipelinesResponse[source]

List pipelines.

Parameters:
Returns:

A response object including a list of pipelines and next page token.

list_recurring_runs(page_token='', page_size=10, sort_by='', experiment_id=None, filter=None) → kfp_server_api.models.api_list_jobs_response.ApiListJobsResponse[source]

List recurring runs.

Parameters:
  • page_token – Token for starting of the page.
  • page_size – Size of the page.
  • sort_by – One of ‘field_name’, ‘field_name desc’. For example, ‘name desc’.
  • experiment_id – Experiment id to filter upon.
  • filter – A url-encoded, JSON-serialized Filter protocol buffer (see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto)).
Returns:

A response object including a list of recurring_runs and next page token.

list_runs(page_token='', page_size=10, sort_by='', experiment_id=None, namespace=None, filter=None) → kfp_server_api.models.api_list_runs_response.ApiListRunsResponse[source]

List runs, optionally can be filtered by experiment or namespace.

Parameters:
  • page_token – Token for starting of the page.
  • page_size – Size of the page.
  • sort_by – One of ‘field_name’, ‘field_name desc’. For example, ‘name desc’.
  • experiment_id – Experiment id to filter upon
  • namespace – Kubernetes namespace to filter upon. For single user deployment, leave it as None; For multi user, input a namespace where the user is authorized.
  • filter – A url-encoded, JSON-serialized Filter protocol buffer (see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto)).
Returns:

A response object including a list of experiments and next page token.

run_pipeline(experiment_id: str, job_name: str, pipeline_package_path: Optional[str] = None, params: Optional[dict] = None, pipeline_id: Optional[str] = None, version_id: Optional[str] = None, pipeline_root: Optional[str] = None, enable_caching: Optional[str] = None, service_account: Optional[str] = None) → kfp_server_api.models.api_run.ApiRun[source]

Run a specified pipeline.

Parameters:
  • experiment_id – The id of an experiment.
  • job_name – Name of the job.
  • pipeline_package_path – Local path of the pipeline package(the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml).
  • params – A dictionary with key (string) as param name and value (string) as as param value.
  • pipeline_id – The id of a pipeline.
  • version_id – The id of a pipeline version. If both pipeline_id and version_id are specified, version_id will take precendence. If only pipeline_id is specified, the default version of this pipeline is used to create the run.
  • pipeline_root – The root path of the pipeline outputs. This argument should be used only for pipeline compiled with dsl.PipelineExecutionMode.V2_COMPATIBLE or dsl.PipelineExecutionMode.V2_ENGINGE mode.
  • enable_caching – Optional. Whether or not to enable caching for the run. This setting affects v2 compatible mode and v2 mode only. If not set, defaults to the compile time settings, which are True for all tasks by default, while users may specify different caching options for individual tasks. If set, the setting applies to all tasks in the pipeline – overrides the compile time settings.
  • service_account – Optional. Specifies which Kubernetes service account this run uses.
Returns:

A run object. Most important field is id.

set_user_namespace(namespace: str)[source]

Set user namespace into local context setting file.

This function should only be used when Kubeflow Pipelines is in the multi-user mode.

Parameters:namespace – kubernetes namespace the user has access to.
upload_pipeline(pipeline_package_path: str = None, pipeline_name: str = None, description: str = None) → kfp_server_api.models.api_pipeline.ApiPipeline[source]

Uploads the pipeline to the Kubeflow Pipelines cluster.

Parameters:
  • pipeline_package_path – Local path to the pipeline package.
  • pipeline_name – Optional. Name of the pipeline to be shown in the UI.
  • description – Optional. Description of the pipeline to be shown in the UI.
Returns:

Server response object containing pipleine id and other information.

upload_pipeline_version(pipeline_package_path, pipeline_version_name: str, pipeline_id: Optional[str] = None, pipeline_name: Optional[str] = None, description: Optional[str] = None) → kfp_server_api.models.api_pipeline_version.ApiPipelineVersion[source]

Uploads a new version of the pipeline to the Kubeflow Pipelines cluster.

Parameters:
  • pipeline_package_path – Local path to the pipeline package.
  • pipeline_version_name – Name of the pipeline version to be shown in the UI.
  • pipeline_id – Optional. Id of the pipeline.
  • pipeline_name – Optional. Name of the pipeline.
  • description – Optional. Description of the pipeline version to be shown in the UI.
Returns:

Server response object containing pipleine id and other information.

Raises:
  • ValueError when none or both of pipeline_id or pipeline_name are specified
  • kfp_server_api.ApiException – If pipeline id is not found.
wait_for_run_completion(run_id: str, timeout: int)[source]

Waits for a run to complete.

Parameters:
  • run_id – Run id, returned from run_pipeline.
  • timeout – Timeout in seconds.
Returns:

Most important fields are run and pipeline_runtime.

Return type:

A run detail object

Raises:

TimeoutError – if the pipeline run failed to finish before the specified timeout.