Source code for kfp.components.yaml_component
# Copyright 2021-2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Functions for loading components from compiled YAML."""
from typing import Optional, Tuple
from google.protobuf import json_format
from kfp import components
from kfp.components import structures
from kfp.pipeline_spec import pipeline_spec_pb2
import requests
import yaml
class YamlComponent(components.BaseComponent):
    """Component whose definition was read from YAML text.

    **Note:** ``YamlComponent`` is not meant to be constructed directly. Use ``kfp.components.load_component_from_*()`` instead.

    Attributes:
        component_spec: Component definition.
        component_yaml: The YAML string this component was loaded from.
    """

    def __init__(
        self,
        component_spec: structures.ComponentSpec,
        component_yaml: str,
    ):
        super().__init__(component_spec=component_spec)
        # The raw text is kept because pipeline_spec re-parses it on access.
        self.component_yaml = component_yaml

    @property
    def pipeline_spec(self) -> pipeline_spec_pb2.PipelineSpec:
        """Returns the pipeline spec of the component.

        The stored YAML is parsed on every access. When its top level
        contains an ``implementation`` key the document is treated as a v1
        component and the spec is derived from ``component_spec``; otherwise
        the parsed document itself is converted into a ``PipelineSpec``
        message.
        """
        parsed_yaml = yaml.safe_load(self.component_yaml)

        # A top-level 'implementation' key is the marker used for v1 YAML.
        if 'implementation' in parsed_yaml.keys():
            return self.component_spec.to_pipeline_spec()

        return json_format.ParseDict(parsed_yaml,
                                     pipeline_spec_pb2.PipelineSpec())

    def execute(self, *args, **kwargs):
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError
def load_component_from_text(text: str) -> YamlComponent:
    """Builds a component from YAML text.

    Args:
        text (str): Component YAML text.

    Returns:
        Component loaded from YAML. The original text is retained on the
        returned object as ``component_yaml``.
    """
    parsed_spec = structures.ComponentSpec.load_from_component_yaml(text)
    return YamlComponent(component_spec=parsed_spec, component_yaml=text)
def load_component_from_file(file_path: str) -> YamlComponent:
    """Loads a component from a file.

    The file is read as UTF-8 text and handed to
    ``load_component_from_text``.

    Args:
        file_path (str): Filepath to a YAML component. The path is passed to
            ``open()`` unchanged, so a leading ``~`` is not expanded.

    Returns:
        Component loaded from YAML.

    Raises:
        OSError: If the file cannot be opened.

    Example:
      ::

        from kfp import components
        components.load_component_from_file('/path/to/pipeline.yaml')
    """
    # Decode explicitly as UTF-8 rather than relying on the platform's locale
    # default, so the same file loads identically on every machine and
    # matches how load_component_from_url decodes its payload.
    with open(file_path, 'r', encoding='utf-8') as component_stream:
        return load_component_from_text(component_stream.read())
def load_component_from_url(
        url: str,
        auth: Optional[Tuple[str, str]] = None) -> 'YamlComponent':
    """Loads a component from a URL.

    ``gs://`` URIs are rewritten to the equivalent
    ``https://storage.googleapis.com/`` URL before the request is made (this
    works for public objects). The response body is decoded as UTF-8.

    Args:
        url (str): URL to a YAML component.
        auth (Tuple[str, str], optional): A ``('<username>', '<password>')`` tuple of authentication credentials necessary for URL access. See `Requests Authorization <https://requests.readthedocs.io/en/latest/user/authentication/#authentication>`_ for more information.

    Returns:
        Component loaded from YAML.

    Raises:
        ValueError: If ``url`` is not a string (including ``None``).
        requests.HTTPError: If the server answers with an error status.

    Example:
      ::

        from kfp import components
        components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/7b49eadf621a9054e1f1315c86f95fb8cf8c17c3/sdk/python/kfp/compiler/test_data/components/identity.yaml')
        components.load_component_from_url('gs://path/to/pipeline.yaml')
    """
    # Reject every non-string up front (not only None) so callers get the
    # documented ValueError instead of an unrelated AttributeError/TypeError
    # from the startswith() call below.
    if not isinstance(url, str):
        raise ValueError('url must be a string.')

    if url.startswith('gs://'):
        # Replace the gs:// URI with an https:// URI (works for public
        # objects).
        url = 'https://storage.googleapis.com/' + url[len('gs://'):]

    # NOTE(review): no timeout is passed, so this call can block indefinitely
    # on an unresponsive host -- consider exposing a timeout to callers.
    resp = requests.get(url, auth=auth)
    resp.raise_for_status()
    return load_component_from_text(resp.content.decode('utf-8'))