Source code for kfp._client

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import logging
import json
import os
import re
import tarfile
import tempfile
import warnings
import yaml
import zipfile
import datetime
from typing import Mapping, Callable, Optional

import kfp_server_api

from kfp import dsl
from kfp.compiler import compiler
from kfp.compiler._k8s_helper import sanitize_k8s_name

from kfp._auth import get_auth_token, get_gcp_access_token

# TTL of the access token associated with the client. This is needed because
# `gcloud auth print-access-token` generates a token with TTL=1 hour, after
# which the authentication expires. This TTL is needed for kfp.Client()
# initialized with host=<inverse proxy endpoint>.
# Set to 55 mins to provide some safe margin.
_GCP_ACCESS_TOKEN_TIMEOUT = datetime.timedelta(minutes=55)
# Operators on scalar values. Only applies to one of |int_value|,
# |long_value|, |string_value| or |timestamp_value|.
_FILTER_OPERATIONS = {"UNKNOWN": 0,
    "EQUALS" : 1,
    "NOT_EQUALS" : 2,
    "GREATER_THAN": 3,
    "GREATER_THAN_EQUALS": 5,
    "LESS_THAN": 6,
    "LESS_THAN_EQUALS": 7}

def _add_generated_apis(target_struct, api_module, api_client):
  """Initializes a hierarchical API object based on the generated API module.
  PipelineServiceApi.create_pipeline becomes target_struct.pipelines.create_pipeline
  """
  Struct = type('Struct', (), {})

  def camel_case_to_snake_case(name):
      import re
      return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower()

  for api_name in dir(api_module):
      if not api_name.endswith('ServiceApi'):
          continue

      short_api_name = camel_case_to_snake_case(api_name[0:-len('ServiceApi')]) + 's'
      api_struct = Struct()
      setattr(target_struct, short_api_name, api_struct)
      service_api = getattr(api_module.api, api_name)
      initialized_service_api = service_api(api_client)
      for member_name in dir(initialized_service_api):
          if member_name.startswith('_') or member_name.endswith('_with_http_info'):
              continue

          bound_member = getattr(initialized_service_api, member_name)
          setattr(api_struct, member_name, bound_member)
  models_struct = Struct()
  for member_name in dir(api_module.models):
      if not member_name[0].islower():
          setattr(models_struct, member_name, getattr(api_module.models, member_name))
  target_struct.api_models = models_struct


KF_PIPELINES_ENDPOINT_ENV = 'KF_PIPELINES_ENDPOINT'
KF_PIPELINES_UI_ENDPOINT_ENV = 'KF_PIPELINES_UI_ENDPOINT'
KF_PIPELINES_DEFAULT_EXPERIMENT_NAME = 'KF_PIPELINES_DEFAULT_EXPERIMENT_NAME'
KF_PIPELINES_OVERRIDE_EXPERIMENT_NAME = 'KF_PIPELINES_OVERRIDE_EXPERIMENT_NAME'
KF_PIPELINES_IAP_OAUTH2_CLIENT_ID_ENV = 'KF_PIPELINES_IAP_OAUTH2_CLIENT_ID'
KF_PIPELINES_APP_OAUTH2_CLIENT_ID_ENV = 'KF_PIPELINES_APP_OAUTH2_CLIENT_ID'
KF_PIPELINES_APP_OAUTH2_CLIENT_SECRET_ENV = 'KF_PIPELINES_APP_OAUTH2_CLIENT_SECRET'


[docs]class Client(object): """API Client for KubeFlow Pipeline. Args: host: The host name to use to talk to Kubeflow Pipelines. If not set, the in-cluster service DNS name will be used, which only works if the current environment is a pod in the same cluster (such as a Jupyter instance spawned by Kubeflow's JupyterHub). If you have a different connection to cluster, such as a kubectl proxy connection, then set it to something like "127.0.0.1:8080/pipeline. If you connect to an IAP enabled cluster, set it to https://<your-deployment>.endpoints.<your-project>.cloud.goog/pipeline". client_id: The client ID used by Identity-Aware Proxy. namespace: The namespace where the kubeflow pipeline system is run. other_client_id: The client ID used to obtain the auth codes and refresh tokens. Reference: https://cloud.google.com/iap/docs/authentication-howto#authenticating_from_a_desktop_app. other_client_secret: The client secret used to obtain the auth codes and refresh tokens. existing_token: Pass in token directly, it's used for cases better get token outside of SDK, e.x. GCP Cloud Functions or caller already has a token cookies: CookieJar object containing cookies that will be passed to the pipelines API. proxy: HTTP or HTTPS proxy server ssl_ca_cert: Cert for proxy kube_context: String name of context within kubeconfig to use, defaults to the current-context set within kubeconfig. """ # in-cluster DNS name of the pipeline service IN_CLUSTER_DNS_NAME = 'ml-pipeline.{}.svc.cluster.local:8888' KUBE_PROXY_PATH = 'api/v1/namespaces/{}/services/ml-pipeline:http/proxy/' # Auto populated path in pods # https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#accessing-the-api-from-a-pod # https://kubernetes.io/docs/reference/access-authn-authz/service-accounts-admin/#serviceaccount-admission-controller NAMESPACE_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/namespace' LOCAL_KFP_CONTEXT = os.path.expanduser('~/.config/kfp/context.json') # TODO: Wrap the configurations for different authentication methods. def __init__(self, host=None, client_id=None, namespace='kubeflow', other_client_id=None, other_client_secret=None, existing_token=None, cookies=None, proxy=None, ssl_ca_cert=None, kube_context=None): """Create a new instance of kfp client. """ host = host or os.environ.get(KF_PIPELINES_ENDPOINT_ENV) self._uihost = os.environ.get(KF_PIPELINES_UI_ENDPOINT_ENV, host) client_id = client_id or os.environ.get(KF_PIPELINES_IAP_OAUTH2_CLIENT_ID_ENV) other_client_id = other_client_id or os.environ.get(KF_PIPELINES_APP_OAUTH2_CLIENT_ID_ENV) other_client_secret = other_client_secret or os.environ.get(KF_PIPELINES_APP_OAUTH2_CLIENT_SECRET_ENV) config = self._load_config(host, client_id, namespace, other_client_id, other_client_secret, existing_token, proxy, ssl_ca_cert, kube_context) # Save the loaded API client configuration, as a reference if update is # needed. self._load_context_setting_or_default() self._existing_config = config if cookies is None: cookies = self._context_setting.get('client_authentication_cookie') api_client = kfp_server_api.api_client.ApiClient(config, cookie=cookies, header_name=self._context_setting.get('client_authentication_header_name'), header_value=self._context_setting.get('client_authentication_header_value')) _add_generated_apis(self, kfp_server_api, api_client) self._job_api = kfp_server_api.api.job_service_api.JobServiceApi(api_client) self._run_api = kfp_server_api.api.run_service_api.RunServiceApi(api_client) self._experiment_api = kfp_server_api.api.experiment_service_api.ExperimentServiceApi(api_client) self._pipelines_api = kfp_server_api.api.pipeline_service_api.PipelineServiceApi(api_client) self._upload_api = kfp_server_api.api.PipelineUploadServiceApi(api_client) self._healthz_api = kfp_server_api.api.healthz_service_api.HealthzServiceApi(api_client) if not self._context_setting['namespace'] and self.get_kfp_healthz().multi_user is True: try: with open(Client.NAMESPACE_PATH, 'r') as f: current_namespace = f.read() self.set_user_namespace(current_namespace) except FileNotFoundError: logging.info('Failed to automatically set namespace.', exc_info=True) def _load_config(self, host, client_id, namespace, other_client_id, other_client_secret, existing_token, proxy, ssl_ca_cert, kube_context): config = kfp_server_api.configuration.Configuration() if proxy: # https://github.com/kubeflow/pipelines/blob/c6ac5e0b1fd991e19e96419f0f508ec0a4217c29/backend/api/python_http_client/kfp_server_api/rest.py#L100 config.proxy = proxy if ssl_ca_cert: config.ssl_ca_cert = ssl_ca_cert host = host or '' # Defaults to 'https' if host does not contain 'http' or 'https' protocol. if host and not host.startswith('http'): warnings.warn( 'The host %s does not contain the "http" or "https" protocol.' ' Defaults to "https".' % host) host = 'https://' + host # Preprocess the host endpoint to prevent some common user mistakes. if not client_id: # always preserving the protocol (http://localhost requires it) host = host.rstrip('/') if host: config.host = host token = None # "existing_token" is designed to accept token generated outside of SDK. Here is an example. # # https://cloud.google.com/functions/docs/securing/function-identity # https://cloud.google.com/endpoints/docs/grpc/service-account-authentication # # import requests # import kfp # # def get_access_token(): # url = 'http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token' # r = requests.get(url, headers={'Metadata-Flavor': 'Google'}) # r.raise_for_status() # access_token = r.json()['access_token'] # return access_token # # client = kfp.Client(host='<KFPHost>', existing_token=get_access_token()) # if existing_token: token = existing_token self._is_refresh_token = False elif client_id: token = get_auth_token(client_id, other_client_id, other_client_secret) self._is_refresh_token = True elif self._is_inverse_proxy_host(host): token = get_gcp_access_token() self._is_refresh_token = False if token: config.api_key['authorization'] = token config.api_key_prefix['authorization'] = 'Bearer' return config if host: # if host is explicitly set with auth token, it's probably a port forward address. return config import kubernetes as k8s in_cluster = True try: k8s.config.load_incluster_config() except: in_cluster = False pass if in_cluster: config.host = Client.IN_CLUSTER_DNS_NAME.format(namespace) return config try: k8s.config.load_kube_config(client_configuration=config, context=kube_context) except: print('Failed to load kube config.') return config if config.host: config.host = config.host + '/' + Client.KUBE_PROXY_PATH.format(namespace) return config def _is_inverse_proxy_host(self, host): if host: return re.match(r'\S+.googleusercontent.com/{0,1}$', host) if re.match(r'\w+', host): warnings.warn( 'The received host is %s, please include the full endpoint address ' '(with ".(pipelines/notebooks).googleusercontent.com")' % host) return False def _is_ipython(self): """Returns whether we are running in notebook.""" try: import IPython ipy = IPython.get_ipython() if ipy is None: return False except ImportError: return False return True def _get_url_prefix(self): if self._uihost: # User's own connection. if self._uihost.startswith('http://') or self._uihost.startswith('https://'): return self._uihost else: return 'http://' + self._uihost # In-cluster pod. We could use relative URL. return '/pipeline' def _load_context_setting_or_default(self): if os.path.exists(Client.LOCAL_KFP_CONTEXT): with open(Client.LOCAL_KFP_CONTEXT, 'r') as f: self._context_setting = json.load(f) else: self._context_setting = { 'namespace': '', } def _refresh_api_client_token(self): """Refreshes the existing token associated with the kfp_api_client.""" if getattr(self, '_is_refresh_token', None): return new_token = get_gcp_access_token() self._existing_config.api_key['authorization'] = new_token
[docs] def set_user_namespace(self, namespace): """Set user namespace into local context setting file. This function should only be used when Kubeflow Pipelines is in the multi-user mode. Args: namespace: kubernetes namespace the user has access to. """ self._context_setting['namespace'] = namespace if not os.path.exists(os.path.dirname(Client.LOCAL_KFP_CONTEXT)): os.makedirs(os.path.dirname(Client.LOCAL_KFP_CONTEXT)) with open(Client.LOCAL_KFP_CONTEXT, 'w') as f: json.dump(self._context_setting, f)
[docs] def get_kfp_healthz(self): """Gets healthz info of KFP deployment. Returns: response: json formatted response from the healtz endpoint. """ count = 0 response = None max_attempts = 5 while not response: count += 1 if count > max_attempts: raise TimeoutError('Failed getting healthz endpoint after {} attempts.'.format(max_attempts)) try: response = self._healthz_api.get_healthz() return response # ApiException, including network errors, is the only type that may # recover after retry. except kfp_server_api.ApiException: # logging.exception also logs detailed info about the ApiException logging.exception('Failed to get healthz info attempt {} of 5.'.format(count)) time.sleep(5)
[docs] def get_user_namespace(self): """Get user namespace in context config. Returns: namespace: kubernetes namespace from the local context file or empty if it wasn't set. """ return self._context_setting['namespace']
[docs] def create_experiment(self, name, description=None, namespace=None): """Create a new experiment. Args: name: The name of the experiment. description: Description of the experiment. namespace: Kubernetes namespace where the experiment should be created. For single user deployment, leave it as None; For multi user, input a namespace where the user is authorized. Returns: An Experiment object. Most important field is id. """ namespace = namespace or self.get_user_namespace() experiment = None try: experiment = self.get_experiment(experiment_name=name, namespace=namespace) except ValueError as error: # Ignore error if the experiment does not exist. if not str(error).startswith('No experiment is found with name'): raise error if not experiment: logging.info('Creating experiment {}.'.format(name)) resource_references = [] if namespace: key = kfp_server_api.models.ApiResourceKey(id=namespace, type=kfp_server_api.models.ApiResourceType.NAMESPACE) reference = kfp_server_api.models.ApiResourceReference(key=key, relationship=kfp_server_api.models.ApiRelationship.OWNER) resource_references.append(reference) experiment = kfp_server_api.models.ApiExperiment( name=name, description=description, resource_references=resource_references) experiment = self._experiment_api.create_experiment(body=experiment) if self._is_ipython(): import IPython html = \ ('<a href="%s/#/experiments/details/%s" target="_blank" >Experiment details</a>.' % (self._get_url_prefix(), experiment.id)) IPython.display.display(IPython.display.HTML(html)) return experiment
[docs] def get_pipeline_id(self, name): """Find the id of a pipeline by name. Args: name: Pipeline name. Returns: Returns the pipeline id if a pipeline with the name exists. """ pipeline_filter = json.dumps({ "predicates": [ { "op": _FILTER_OPERATIONS["EQUALS"], "key": "name", "stringValue": name, } ] }) result = self._pipelines_api.list_pipelines(filter=pipeline_filter) if result.pipelines is None: return None if len(result.pipelines)==1: return result.pipelines[0].id elif len(result.pipelines)>1: raise ValueError("Multiple pipelines with the name: {} found, the name needs to be unique".format(name)) return None
[docs] def list_experiments(self, page_token='', page_size=10, sort_by='', namespace=None): """List experiments. Args: page_token: Token for starting of the page. page_size: Size of the page. sort_by: Can be '[field_name]', '[field_name] desc'. For example, 'name desc'. namespace: Kubernetes namespace where the experiment was created. For single user deployment, leave it as None; For multi user, input a namespace where the user is authorized. Returns: A response object including a list of experiments and next page token. """ namespace = namespace or self.get_user_namespace() response = self._experiment_api.list_experiment( page_token=page_token, page_size=page_size, sort_by=sort_by, resource_reference_key_type=kfp_server_api.models.api_resource_type.ApiResourceType.NAMESPACE, resource_reference_key_id=namespace) return response
[docs] def get_experiment(self, experiment_id=None, experiment_name=None, namespace=None): """Get details of an experiment Either experiment_id or experiment_name is required Args: experiment_id: Id of the experiment. (Optional) experiment_name: Name of the experiment. (Optional) namespace: Kubernetes namespace where the experiment was created. For single user deployment, leave it as None; For multi user, input the namespace where the user is authorized. Returns: A response object including details of a experiment. Throws: Exception if experiment is not found or None of the arguments is provided """ namespace = namespace or self.get_user_namespace() if experiment_id is None and experiment_name is None: raise ValueError('Either experiment_id or experiment_name is required') if experiment_id is not None: return self._experiment_api.get_experiment(id=experiment_id) experiment_filter = json.dumps({ "predicates": [ { "op": _FILTER_OPERATIONS["EQUALS"], "key": "name", "stringValue": experiment_name, } ] }) if namespace: result = self._experiment_api.list_experiment( filter=experiment_filter, resource_reference_key_type=kfp_server_api.models.api_resource_type.ApiResourceType.NAMESPACE, resource_reference_key_id=namespace) else: result = self._experiment_api.list_experiment(filter=experiment_filter) if not result.experiments: raise ValueError('No experiment is found with name {}.'.format(experiment_name)) if len(result.experiments) > 1: raise ValueError('Multiple experiments is found with name {}.'.format(experiment_name)) return result.experiments[0]
[docs] def delete_experiment(self, experiment_id): """Delete experiment. Args: experiment_id: id of the experiment. Returns: Object. If the method is called asynchronously, returns the request thread. Throws: Exception if experiment is not found. """ return self._experiment_api.delete_experiment(id=experiment_id)
def _extract_pipeline_yaml(self, package_file): def _choose_pipeline_yaml_file(file_list) -> str: yaml_files = [file for file in file_list if file.endswith('.yaml')] if len(yaml_files) == 0: raise ValueError('Invalid package. Missing pipeline yaml file in the package.') if 'pipeline.yaml' in yaml_files: return 'pipeline.yaml' else: if len(yaml_files) == 1: return yaml_files[0] raise ValueError('Invalid package. There is no pipeline.yaml file and there are multiple yaml files.') if package_file.endswith('.tar.gz') or package_file.endswith('.tgz'): with tarfile.open(package_file, "r:gz") as tar: file_names = [member.name for member in tar if member.isfile()] pipeline_yaml_file = _choose_pipeline_yaml_file(file_names) with tar.extractfile(tar.getmember(pipeline_yaml_file)) as f: return yaml.safe_load(f) elif package_file.endswith('.zip'): with zipfile.ZipFile(package_file, 'r') as zip: pipeline_yaml_file = _choose_pipeline_yaml_file(zip.namelist()) with zip.open(pipeline_yaml_file) as f: return yaml.safe_load(f) elif package_file.endswith('.yaml') or package_file.endswith('.yml'): with open(package_file, 'r') as f: return yaml.safe_load(f) else: raise ValueError('The package_file '+ package_file + ' should end with one of the following formats: [.tar.gz, .tgz, .zip, .yaml, .yml]')
[docs] def list_pipelines(self, page_token='', page_size=10, sort_by=''): """List pipelines. Args: page_token: Token for starting of the page. page_size: Size of the page. sort_by: one of 'field_name', 'field_name desc'. For example, 'name desc'. Returns: A response object including a list of pipelines and next page token. """ return self._pipelines_api.list_pipelines(page_token=page_token, page_size=page_size, sort_by=sort_by)
def list_pipeline_versions(self, pipeline_id: str, page_token='', page_size=10, sort_by=''): """List all versions of a given pipeline. Args: pipeline_id: The id of a pipeline. page_token: Token for starting of the page. page_size: Size of the page. sort_by: one of 'field_name', 'field_name desc'. For example, 'name desc'. Returns: A response object including a list of pipeline versions and next page token. """ return self._pipelines_api.list_pipeline_versions( resource_key_type="PIPELINE", resource_key_id=pipeline_id, page_token=page_token, page_size=page_size, sort_by=sort_by ) # TODO: provide default namespace, similar to kubectl default namespaces.
[docs] def run_pipeline(self, experiment_id, job_name, pipeline_package_path=None, params={}, pipeline_id=None, version_id=None): """Run a specified pipeline. Args: experiment_id: The id of an experiment. job_name: Name of the job. pipeline_package_path: Local path of the pipeline package(the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml). params: A dictionary with key (string) as param name and value (string) as as param value. pipeline_id: The id of a pipeline. version_id: The id of a pipeline version. If both pipeline_id and version_id are specified, version_id will take precendence. If only pipeline_id is specified, the default version of this pipeline is used to create the run. Returns: A run object. Most important field is id. """ job_config = self._create_job_config( experiment_id=experiment_id, params=params, pipeline_package_path=pipeline_package_path, pipeline_id=pipeline_id, version_id=version_id) run_body = kfp_server_api.models.ApiRun( pipeline_spec=job_config.spec, resource_references=job_config.resource_references, name=job_name) response = self._run_api.create_run(body=run_body) if self._is_ipython(): import IPython html = ('<a href="%s/#/runs/details/%s" target="_blank" >Run details</a>.' % (self._get_url_prefix(), response.run.id)) IPython.display.display(IPython.display.HTML(html)) return response.run
[docs] def create_recurring_run(self, experiment_id, job_name, description=None, start_time=None, end_time=None, interval_second=None, cron_expression=None, max_concurrency=1, no_catchup=None, params={}, pipeline_package_path=None, pipeline_id=None, version_id=None, enabled=True): """Create a recurring run. Args: experiment_id: The string id of an experiment. job_name: Name of the job. description: An optional job description. start_time: The RFC3339 time string of the time when to start the job. end_time: The RFC3339 time string of the time when to end the job. interval_second: Integer indicating the seconds between two recurring runs in for a periodic schedule. cron_expression: A cron expression representing a set of times, using 5 space-separated fields, e.g. "0 0 9 ? * 2-6". max_concurrency: Integer indicating how many jobs can be run in parallel. no_catchup: Whether the recurring run should catch up if behind schedule. For example, if the recurring run is paused for a while and re-enabled afterwards. If no_catchup=False, the scheduler will catch up on (backfill) each missed interval. Otherwise, it only schedules the latest interval if more than one interval is ready to be scheduled. Usually, if your pipeline handles backfill internally, you should turn catchup off to avoid duplicate backfill. (default: {False}) pipeline_package_path: Local path of the pipeline package(the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml). params: A dictionary with key (string) as param name and value (string) as param value. pipeline_id: The id of a pipeline. version_id: The id of a pipeline version. If both pipeline_id and version_id are specified, version_id will take precendence. If only pipeline_id is specified, the default version of this pipeline is used to create the run. enabled: A bool indicating whether the recurring run is enabled or disabled. Returns: A Job object. Most important field is id. """ job_config = self._create_job_config( experiment_id=experiment_id, params=params, pipeline_package_path=pipeline_package_path, pipeline_id=pipeline_id, version_id=version_id) if all([interval_second, cron_expression]) or not any([interval_second, cron_expression]): raise ValueError('Either interval_second or cron_expression is required') if interval_second is not None: trigger = kfp_server_api.models.ApiTrigger( periodic_schedule=kfp_server_api.models.ApiPeriodicSchedule( start_time=start_time, end_time=end_time, interval_second=interval_second) ) if cron_expression is not None: trigger = kfp_server_api.models.ApiTrigger( cron_schedule=kfp_server_api.models.ApiCronSchedule( start_time=start_time, end_time=end_time, cron=cron_expression) ) job_body = kfp_server_api.models.ApiJob( enabled=enabled, pipeline_spec=job_config.spec, resource_references=job_config.resource_references, name=job_name, description=description, no_catchup=no_catchup, trigger=trigger, max_concurrency=max_concurrency) return self._job_api.create_job(body=job_body)
def _create_job_config(self, experiment_id, params, pipeline_package_path, pipeline_id, version_id): """Create a JobConfig with spec and resource_references. Args: experiment_id: The id of an experiment. pipeline_package_path: Local path of the pipeline package(the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml). params: A dictionary with key (string) as param name and value (string) as param value. pipeline_id: The id of a pipeline. version_id: The id of a pipeline version. If both pipeline_id and version_id are specified, version_id will take precendence. If only pipeline_id is specified, the default version of this pipeline is used to create the run. Returns: A JobConfig object with attributes spec and resource_reference. """ class JobConfig: def __init__(self, spec, resource_references): self.spec = spec self.resource_references = resource_references pipeline_json_string = None if pipeline_package_path: pipeline_obj = self._extract_pipeline_yaml(pipeline_package_path) pipeline_json_string = json.dumps(pipeline_obj) api_params = [kfp_server_api.ApiParameter( name=sanitize_k8s_name(name=k, allow_capital_underscore=True), value=str(v)) for k,v in params.items()] resource_references = [] key = kfp_server_api.models.ApiResourceKey(id=experiment_id, type=kfp_server_api.models.ApiResourceType.EXPERIMENT) reference = kfp_server_api.models.ApiResourceReference(key=key, relationship=kfp_server_api.models.ApiRelationship.OWNER) resource_references.append(reference) if version_id: key = kfp_server_api.models.ApiResourceKey(id=version_id, type=kfp_server_api.models.ApiResourceType.PIPELINE_VERSION) reference = kfp_server_api.models.ApiResourceReference(key=key, relationship=kfp_server_api.models.ApiRelationship.CREATOR) resource_references.append(reference) spec = kfp_server_api.models.ApiPipelineSpec( pipeline_id=pipeline_id, workflow_manifest=pipeline_json_string, parameters=api_params) return JobConfig(spec=spec, resource_references=resource_references)
[docs] def create_run_from_pipeline_func( self, pipeline_func: Callable, arguments: Mapping[str, str], run_name: Optional[str] = None, experiment_name: Optional[str] = None, pipeline_conf: Optional[dsl.PipelineConf] = None, namespace: Optional[str] = None, mode: dsl.PipelineExecutionMode = dsl.PipelineExecutionMode.V1_LEGACY, launcher_image: Optional[str] = None): """Runs pipeline on KFP-enabled Kubernetes cluster. This command compiles the pipeline function, creates or gets an experiment and submits the pipeline for execution. Args: pipeline_func: A function that describes a pipeline by calling components and composing them into execution graph. arguments: Arguments to the pipeline function provided as a dict. run_name: Optional. Name of the run to be shown in the UI. experiment_name: Optional. Name of the experiment to add the run to. pipeline_conf: Optional. Pipeline configuration ops that will be applied to all the ops in the pipeline func. namespace: Kubernetes namespace where the pipeline runs are created. For single user deployment, leave it as None; For multi user, input a namespace where the user is authorized mode: The PipelineExecutionMode to use when compiling and running pipeline_func. launcher_image: The launcher image to use if the mode is specified as PipelineExecutionMode.V2_COMPATIBLE. Should only be needed for tests or custom deployments right now. """ #TODO: Check arguments against the pipeline function pipeline_name = pipeline_func.__name__ run_name = run_name or pipeline_name + ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S') with tempfile.TemporaryDirectory() as tmpdir: pipeline_package_path = os.path.join(tmpdir, 'pipeline.yaml') compiler.Compiler(mode=mode, launcher_image=launcher_image).compile( pipeline_func=pipeline_func, package_path=pipeline_package_path, pipeline_conf=pipeline_conf) return self.create_run_from_pipeline_package( pipeline_file=pipeline_package_path, arguments=arguments, run_name=run_name, experiment_name=experiment_name, namespace=namespace)
[docs] def create_run_from_pipeline_package( self, pipeline_file: str, arguments: Mapping[str, str], run_name: Optional[str] = None, experiment_name: Optional[str] = None, namespace: Optional[str] = None): """Runs pipeline on KFP-enabled Kubernetes cluster. This command takes a local pipeline package, creates or gets an experiment and submits the pipeline for execution. Args: pipeline_file: A compiled pipeline package file. arguments: Arguments to the pipeline function provided as a dict. run_name: Optional. Name of the run to be shown in the UI. experiment_name: Optional. Name of the experiment to add the run to. namespace: Kubernetes namespace where the pipeline runs are created. For single user deployment, leave it as None; For multi user, input a namespace where the user is authorized """ class RunPipelineResult: def __init__(self, client, run_info): self._client = client self.run_info = run_info self.run_id = run_info.id def wait_for_run_completion(self, timeout=None): timeout = timeout or datetime.timedelta.max return self._client.wait_for_run_completion(self.run_id, timeout) def __repr__(self): return 'RunPipelineResult(run_id={})'.format(self.run_id) #TODO: Check arguments against the pipeline function pipeline_name = os.path.basename(pipeline_file) experiment_name = experiment_name or os.environ.get(KF_PIPELINES_DEFAULT_EXPERIMENT_NAME, None) overridden_experiment_name = os.environ.get(KF_PIPELINES_OVERRIDE_EXPERIMENT_NAME, experiment_name) if overridden_experiment_name != experiment_name: import warnings warnings.warn('Changing experiment name from "{}" to "{}".'.format(experiment_name, overridden_experiment_name)) experiment_name = overridden_experiment_name or 'Default' run_name = run_name or (pipeline_name + ' ' + datetime.datetime.now().strftime( '%Y-%m-%d %H-%M-%S')) experiment = self.create_experiment(name=experiment_name, namespace=namespace) run_info = self.run_pipeline(experiment.id, run_name, pipeline_file, arguments) return RunPipelineResult(self, run_info)
[docs] def list_runs(self, page_token='', page_size=10, sort_by='', experiment_id=None, namespace=None): """List runs, optionally can be filtered by experiment or namespace. Args: page_token: Token for starting of the page. page_size: Size of the page. sort_by: One of 'field_name', 'field_name desc'. For example, 'name desc'. experiment_id: Experiment id to filter upon namespace: Kubernetes namespace to filter upon. For single user deployment, leave it as None; For multi user, input a namespace where the user is authorized. Returns: A response object including a list of experiments and next page token. """ namespace = namespace or self.get_user_namespace() if experiment_id is not None: response = self._run_api.list_runs(page_token=page_token, page_size=page_size, sort_by=sort_by, resource_reference_key_type=kfp_server_api.models.api_resource_type.ApiResourceType.EXPERIMENT, resource_reference_key_id=experiment_id) elif namespace: response = self._run_api.list_runs(page_token=page_token, page_size=page_size, sort_by=sort_by, resource_reference_key_type=kfp_server_api.models.api_resource_type.ApiResourceType.NAMESPACE, resource_reference_key_id=namespace) else: response = self._run_api.list_runs(page_token=page_token, page_size=page_size, sort_by=sort_by) return response
[docs] def list_recurring_runs(self, page_token='', page_size=10, sort_by='', experiment_id=None): """List recurring runs. Args: page_token: Token for starting of the page. page_size: Size of the page. sort_by: One of 'field_name', 'field_name desc'. For example, 'name desc'. experiment_id: Experiment id to filter upon. Returns: A response object including a list of recurring_runs and next page token. """ if experiment_id is not None: response = self._job_api.list_jobs(page_token=page_token, page_size=page_size, sort_by=sort_by, resource_reference_key_type=kfp_server_api.models.api_resource_type.ApiResourceType.EXPERIMENT, resource_reference_key_id=experiment_id) else: response = self._job_api.list_jobs(page_token=page_token, page_size=page_size, sort_by=sort_by) return response
[docs] def get_recurring_run(self, job_id): """Get recurring_run details. Args: job_id: id of the recurring_run. Returns: A response object including details of a recurring_run. Throws: Exception if recurring_run is not found. """ return self._job_api.get_job(id=job_id)
[docs] def get_run(self, run_id): """Get run details. Args: run_id: id of the run. Returns: A response object including details of a run. Throws: Exception if run is not found. """ return self._run_api.get_run(run_id=run_id)
[docs] def wait_for_run_completion(self, run_id, timeout): """Waits for a run to complete. Args: run_id: Run id, returned from run_pipeline. timeout: Timeout in seconds. Returns: A run detail object: Most important fields are run and pipeline_runtime. Raises: TimeoutError: if the pipeline run failed to finish before the specified timeout. """ status = 'Running:' start_time = datetime.datetime.now() last_token_refresh_time = datetime.datetime.now() if isinstance(timeout, datetime.timedelta): timeout = timeout.total_seconds() while (status is None or status.lower() not in ['succeeded', 'failed', 'skipped', 'error']): # Refreshes the access token before it hits the TTL. if (datetime.datetime.now() - last_token_refresh_time > _GCP_ACCESS_TOKEN_TIMEOUT): self._refresh_api_client_token() last_token_refresh_time = datetime.datetime.now() get_run_response = self._run_api.get_run(run_id=run_id) status = get_run_response.run.status elapsed_time = (datetime.datetime.now() - start_time).total_seconds() logging.info('Waiting for the job to complete...') if elapsed_time > timeout: raise TimeoutError('Run timeout') time.sleep(5) return get_run_response
def _get_workflow_json(self, run_id): """Get the workflow json. Args: run_id: run id, returned from run_pipeline. Returns: workflow: Json workflow """ get_run_response = self._run_api.get_run(run_id=run_id) workflow = get_run_response.pipeline_runtime.workflow_manifest workflow_json = json.loads(workflow) return workflow_json
[docs] def upload_pipeline( self, pipeline_package_path: str = None, pipeline_name: str = None, description: str = None, ): """Uploads the pipeline to the Kubeflow Pipelines cluster. Args: pipeline_package_path: Local path to the pipeline package. pipeline_name: Optional. Name of the pipeline to be shown in the UI. description: Optional. Description of the pipeline to be shown in the UI. Returns: Server response object containing pipleine id and other information. """ response = self._upload_api.upload_pipeline(pipeline_package_path, name=pipeline_name, description=description) if self._is_ipython(): import IPython html = '<a href=%s/#/pipelines/details/%s>Pipeline details</a>.' % (self._get_url_prefix(), response.id) IPython.display.display(IPython.display.HTML(html)) return response
[docs] def upload_pipeline_version( self, pipeline_package_path, pipeline_version_name: str, pipeline_id: Optional[str] = None, pipeline_name: Optional[str] = None ): """Uploads a new version of the pipeline to the Kubeflow Pipelines cluster. Args: pipeline_package_path: Local path to the pipeline package. pipeline_version_name: Name of the pipeline version to be shown in the UI. pipeline_id: Optional. Id of the pipeline. pipeline_name: Optional. Name of the pipeline. Returns: Server response object containing pipleine id and other information. Throws: ValueError when none or both of pipeline_id or pipeline_name are specified Exception if pipeline id is not found. """ if all([pipeline_id, pipeline_name]) or not any([pipeline_id, pipeline_name]): raise ValueError('Either pipeline_id or pipeline_name is required') if pipeline_name: pipeline_id = self.get_pipeline_id(pipeline_name) response = self._upload_api.upload_pipeline_version( pipeline_package_path, name=pipeline_version_name, pipelineid=pipeline_id ) if self._is_ipython(): import IPython html = '<a href=%s/#/pipelines/details/%s>Pipeline details</a>.' % (self._get_url_prefix(), response.id) IPython.display.display(IPython.display.HTML(html)) return response
[docs] def get_pipeline(self, pipeline_id): """Get pipeline details. Args: pipeline_id: id of the pipeline. Returns: A response object including details of a pipeline. Throws: Exception if pipeline is not found. """ return self._pipelines_api.get_pipeline(id=pipeline_id)
[docs] def delete_pipeline(self, pipeline_id): """Delete pipeline. Args: pipeline_id: id of the pipeline. Returns: Object. If the method is called asynchronously, returns the request thread. Throws: Exception if pipeline is not found. """ return self._pipelines_api.delete_pipeline(id=pipeline_id)
[docs] def list_pipeline_versions(self, pipeline_id, page_token='', page_size=10, sort_by=''): """Lists pipeline versions. Args: pipeline_id: Id of the pipeline to list versions page_token: Token for starting of the page. page_size: Size of the page. sort_by: One of 'field_name', 'field_name desc'. For example, 'name desc'. Returns: A response object including a list of versions and next page token. """ return self._pipelines_api.list_pipeline_versions(page_token=page_token, page_size=page_size, sort_by=sort_by, resource_key_type=kfp_server_api.models.api_resource_type.ApiResourceType.PIPELINE, resource_key_id=pipeline_id)