Source code for kfp.components._components

# Copyright 2018 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__all__ = [
    'load_component',
    'load_component_from_text',
    'load_component_from_url',
    'load_component_from_file',
]

import copy
from collections import OrderedDict
import pathlib
from typing import Any, Callable, List, Mapping, NamedTuple, Sequence, Union
import warnings

from ._naming import _sanitize_file_name, _sanitize_python_function_name, generate_unique_name_conversion_table
from ._yaml_utils import load_yaml
from .structures import *
from ._data_passing import serialize_value, get_canonical_type_for_type_name

from kfp.v2.components.types import type_utils

_default_component_name = 'Component'


def load_component(filename=None, url=None, text=None, component_spec=None):
    """Loads component from text, file or URL and creates a task factory function.

    Only one argument should be specified.

    Args:
        filename: Path of local file containing the component definition.
        url: The URL of the component file data.
        text: A string containing the component file data.
        component_spec: A ComponentSpec containing the component definition.

    Returns:
        A factory function with a strongly-typed signature.
        Once called with the required arguments, the factory constructs a
        pipeline task instance (ContainerOp).

    Raises:
        ValueError: If the number of arguments that are not None differs from
            one, or if the single supplied argument is falsy (for example an
            empty string), in which case no loader is selected.
    """
    #This function should be called load_task_factory since it returns a factory function.
    #The real load_component function should produce an object with component properties (e.g. name, description, inputs/outputs).
    #TODO: Change this function to return component spec object but it should be callable to construct tasks.

    # Identity comparison is used on purpose: "value != None" would call the
    # argument's own comparison operators, which an arbitrary object is free
    # to override, so the count could disagree with what the caller passed.
    sources = (filename, url, text, component_spec)
    non_null_args_count = sum(1 for value in sources if value is not None)
    if non_null_args_count != 1:
        raise ValueError('Need to specify exactly one source')

    # Dispatch is based on truthiness, so a supplied-but-empty source falls
    # through to the final error instead of reaching a loader.
    if filename:
        return load_component_from_file(filename)
    elif url:
        return load_component_from_url(url)
    elif text:
        return load_component_from_text(text)
    elif component_spec:
        return load_component_from_spec(component_spec)
    else:
        raise ValueError('Need to specify a source')
def load_component_from_url(url: str, auth=None):
    """Loads component from URL and creates a task factory function.

    Args:
        url: The URL of the component file data
        auth: Auth object for the requests library. See
            https://requests.readthedocs.io/en/master/user/authentication/

    Returns:
        A factory function with a strongly-typed signature.
        Once called with the required arguments, the factory constructs a
        pipeline task instance (ContainerOp).
    """
    # The spec is fetched first so that download problems surface before any
    # reference object is built.
    spec = _load_component_spec_from_url(url, auth)

    # The normalized URI is used both as the reference URL and as the
    # "file name" handed to the factory builder.
    normalized_url = _fix_component_uri(url)
    reference = ComponentReference(url=normalized_url)

    return _create_task_factory_from_component_spec(
        component_spec=spec,
        component_filename=normalized_url,
        component_ref=reference,
    )
def load_component_from_file(filename):
    """Loads component from file and creates a task factory function.

    Args:
        filename: Path of local file containing the component definition.

    Returns:
        A factory function with a strongly-typed signature.
        Once called with the required arguments, the factory constructs a
        pipeline task instance (ContainerOp).
    """
    spec = _load_component_spec_from_file(path=filename)
    task_factory = _create_task_factory_from_component_spec(
        component_spec=spec,
        component_filename=filename,
    )
    return task_factory
def load_component_from_text(text):
    """Loads component from text and creates a task factory function.

    Args:
        text: A string containing the component file data.

    Returns:
        A factory function with a strongly-typed signature.
        Once called with the required arguments, the factory constructs a
        pipeline task instance (ContainerOp).

    Raises:
        TypeError: If ``text`` is None.
    """
    if text is None:
        # Same exception type as before; the message makes the failure
        # self-explanatory in tracebacks.
        raise TypeError('text must not be None')
    component_spec = _load_component_spec_from_component_text(text)
    return _create_task_factory_from_component_spec(
        component_spec=component_spec)
def load_component_from_spec(component_spec):
    """Loads component from a ComponentSpec and creates a task factory function.

    Args:
        component_spec: A ComponentSpec containing the component definition.

    Returns:
        A factory function with a strongly-typed signature.
        Once called with the required arguments, the factory constructs a
        pipeline task instance (ContainerOp).

    Raises:
        TypeError: If ``component_spec`` is None.
    """
    if component_spec is None:
        raise TypeError('component_spec must not be None')
    return _create_task_factory_from_component_spec(
        component_spec=component_spec)


def _fix_component_uri(uri: str) -> str:
    """Rewrites a ``gs://`` URI to its ``https://storage.googleapis.com/`` form.

    Any other URI is returned unchanged.
    """
    #Handling Google Cloud Storage URIs
    if uri.startswith('gs://'):
        #Replacing the gs:// URI with https:// URI (works for public objects)
        uri = 'https://storage.googleapis.com/' + uri[len('gs://'):]
    return uri


def _load_component_spec_from_file(path) -> ComponentSpec:
    """Reads the file at ``path`` as bytes and parses it into a component spec."""
    with open(path, 'rb') as component_stream:
        return _load_component_spec_from_yaml_or_zip_bytes(
            component_stream.read())


def _load_component_spec_from_url(url: str, auth=None):
    """Downloads the component data from ``url`` and parses it.

    Args:
        url: Location of the component data. ``gs://`` URIs are rewritten by
            ``_fix_component_uri`` before the request is made.
        auth: Passed through as the ``auth`` argument of ``requests.get``.

    Returns:
        The result of ``_load_component_spec_from_yaml_or_zip_bytes`` for the
        response body.

    Raises:
        TypeError: If ``url`` is None.
    """
    if url is None:
        raise TypeError('url must not be None')

    url = _fix_component_uri(url)

    # Imported lazily so that the dependency is only needed when a component
    # is actually loaded from a URL.
    import requests
    # NOTE(review): no timeout is passed to requests.get here.
    resp = requests.get(url, auth=auth)
    resp.raise_for_status()
    return _load_component_spec_from_yaml_or_zip_bytes(resp.content)


# Name of the member that is read when the component data is a zip archive.
_COMPONENT_FILE_NAME_IN_ARCHIVE = 'component.yaml'


def _load_component_spec_from_yaml_or_zip_bytes(data: bytes):
    """Loads component spec from binary data.

    The data can be a YAML file or a zip file with a component.yaml file
    inside.
    """
    import zipfile
    import io
    stream = io.BytesIO(data)
    if zipfile.is_zipfile(stream):
        # Rewind before opening the archive for reading.
        stream.seek(0)
        with zipfile.ZipFile(stream) as zip_obj:
            data = zip_obj.read(_COMPONENT_FILE_NAME_IN_ARCHIVE)
    return _load_component_spec_from_component_text(data)


def _load_component_spec_from_component_text(text) -> ComponentSpec:
    """Parses component text (str or bytes) into a ComponentSpec.

    Emits a FutureWarning when the implementation is a container
    implementation whose ``command`` is None. A SHA-256 hex digest of the
    text (UTF-8 encoded if it is not bytes, with CRLF normalized to LF) is
    stored on the returned spec as ``_digest``.
    """
    component_dict = load_yaml(text)
    component_spec = ComponentSpec.from_dict(component_dict)

    if isinstance(component_spec.implementation, ContainerImplementation) and (
            component_spec.implementation.container.command is None):
        # The separator before the URL keeps the link from being fused onto
        # the end of the preceding sentence.
        warnings.warn(
            'Container component must specify command to be compatible with KFP '
            'v2 compatible mode and emissary executor, which will be the default'
            ' executor for KFP v2. See '
            'https://www.kubeflow.org/docs/components/pipelines/installation/choose-executor/',
            category=FutureWarning,
        )

    # Calculating hash digest for the component
    import hashlib
    data = text if isinstance(text, bytes) else text.encode('utf-8')
    data = data.replace(b'\r\n', b'\n')  # Normalizing line endings
    digest = hashlib.sha256(data).hexdigest()
    component_spec._digest = digest

    return component_spec


_inputs_dir = '/tmp/inputs'
_outputs_dir = '/tmp/outputs'
_single_io_file_name = 'data'


def _generate_input_file_name(port_name):
    """Returns the POSIX path ``_inputs_dir/SANITIZED_NAME/_single_io_file_name``."""
    return str(
        pathlib.PurePosixPath(_inputs_dir, _sanitize_file_name(port_name),
                              _single_io_file_name))


def _generate_output_file_name(port_name):
    """Returns the POSIX path ``_outputs_dir/SANITIZED_NAME/_single_io_file_name``."""
    return str(
        pathlib.PurePosixPath(_outputs_dir, _sanitize_file_name(port_name),
                              _single_io_file_name))


def _react_to_incompatible_reference_type(
    input_type,
    argument_type,
    input_name: str,
):
    """Raises error for the case when the argument type is incompatible with the input type.

    Raises:
        TypeError: Always.
    """
    message = 'Argument with type "{}" was passed to the input "{}" that has type "{}".'.format(
        argument_type, input_name, input_type)
    raise TypeError(message)


def _create_task_spec_from_component_and_arguments(
        component_spec: ComponentSpec,
        arguments: Mapping[str, Any],
        component_ref: ComponentReference = None,
        **kwargs) -> TaskSpec:
    """Constructs a TaskSpec object from component reference and arguments.

    The function also checks the arguments types and serializes them.

    Args:
        component_spec: The component whose inputs the arguments refer to.
        arguments: Maps input names to either reference arguments
            (GraphInputArgument / TaskOutputArgument) or constant values.
        component_ref: Optional reference. When given, a shallow copy is made
            and the spec is attached to the copy, so the caller's object is
            not modified.
        **kwargs: Accepted and ignored by this constructor.

    Returns:
        A TaskSpec whose outputs have been initialized via ``_init_outputs``.

    Raises:
        TypeError: If a reference argument has a type that differs from the
            type of the input it is passed to (both types being truthy).
    """
    if component_ref is None:
        component_ref = ComponentReference(spec=component_spec)
    else:
        component_ref = copy.copy(component_ref)
        component_ref.spec = component_spec

    # Not checking for missing or extra arguments since the dynamic factory function checks that
    task_arguments = {}
    for input_name, argument_value in arguments.items():
        input_type = component_spec._inputs_dict[input_name].type

        if isinstance(argument_value,
                      (GraphInputArgument, TaskOutputArgument)):
            # argument_value is a reference
            if isinstance(argument_value, GraphInputArgument):
                reference_type = argument_value.graph_input.type
            elif isinstance(argument_value, TaskOutputArgument):
                reference_type = argument_value.task_output.type
            else:
                # Defensive default; the enclosing isinstance check already
                # restricts the value to the two types handled above.
                reference_type = None

            # Types are only compared when both sides declare one.
            if reference_type and input_type and reference_type != input_type:
                _react_to_incompatible_reference_type(input_type,
                                                      reference_type,
                                                      input_name)

            task_arguments[input_name] = argument_value
        else:
            # argument_value is a constant value
            serialized_argument_value = serialize_value(argument_value,
                                                        input_type)
            task_arguments[input_name] = serialized_argument_value

    task = TaskSpec(
        component_ref=component_ref,
        arguments=task_arguments,
    )
    task._init_outputs()

    return task


_default_container_task_constructor = _create_task_spec_from_component_and_arguments

# Holds the function that constructs a task object based on ComponentSpec, arguments and ComponentReference.
# Framework authors can override this constructor function to construct different framework-specific task-like objects.
# The task object should have the task.outputs dictionary with keys corresponding to the ComponentSpec outputs.
# The default constructor creates an instance of the TaskSpec class.
_container_task_constructor = _default_container_task_constructor

# When True, graph components are expanded into their sub-tasks even if the
# container task constructor has not been overridden.
_always_expand_graph_components = False


def _create_task_object_from_component_and_arguments(
        component_spec: ComponentSpec,
        arguments: Mapping[str, Any],
        component_ref: ComponentReference = None,
        **kwargs):
    """Creates a task object from component and argument.

    Unlike _container_task_constructor, handles the graph components as well.

    Args:
        component_spec: The component to instantiate.
        arguments: Maps input names to argument values.
        component_ref: Optional reference forwarded to the constructor.
        **kwargs: Forwarded unchanged to whichever constructor is selected.

    Returns:
        The result of ``_resolve_graph_task`` when the component is a graph
        component that has to be expanded, otherwise the result of
        ``_container_task_constructor``.
    """
    # When the container task constructor is not overridden, we just construct TaskSpec for both container and graph tasks.
    # If the container task constructor is overridden, we should expand the graph components so that the override is called for all sub-tasks.
    must_expand_graph = (
        _container_task_constructor != _default_container_task_constructor or
        _always_expand_graph_components)
    if isinstance(component_spec.implementation,
                  GraphImplementation) and must_expand_graph:
        # NOTE(review): _resolve_graph_task declares no **kwargs parameter, so
        # any extra keyword arguments forwarded here would be rejected.
        return _resolve_graph_task(
            component_spec=component_spec,
            arguments=arguments,
            component_ref=component_ref,
            **kwargs,
        )

    task = _container_task_constructor(
        component_spec=component_spec,
        arguments=arguments,
        component_ref=component_ref,
        **kwargs,
    )

    return task


class _DefaultValue:
    """Wraps the default of an optional input.

    The task factory skips arguments that are still instances of this class,
    i.e. optional values that the caller did not override.
    """

    def __init__(self, value):
        self.value = value

    def __repr__(self):
        # Delegates to the wrapped value's repr.
        return repr(self.value)


#TODO: Refactor the function to make it shorter
def _create_task_factory_from_component_spec(
        component_spec: ComponentSpec,
        component_filename=None,
        component_ref: ComponentReference = None):
    """Builds the task factory function for a component spec.

    Args:
        component_spec: The component definition.
        component_filename: Optional file name or URL of the component. It is
            used as the URL of a newly created reference, and as the function
            file name when it ends with ``.yaml`` or ``.yml``.
        component_ref: Optional reference. When given, its ``spec`` (and
            ``digest``, if the spec carries one) are set on the passed object.

    Returns:
        A function created by ``_dynamic.create_function_from_parameters``
        with one keyword parameter per component input (inputs for which
        ``type_utils.is_task_final_status_type`` is true are left out). The
        component spec is attached to it as ``component_spec``.
    """
    name = component_spec.name or _default_component_name

    func_docstring_lines = []
    if component_spec.name:
        func_docstring_lines.append(component_spec.name)
    if component_spec.description:
        func_docstring_lines.append(component_spec.description)

    inputs_list = component_spec.inputs or []  #List[InputSpec]
    input_names = [input_spec.name for input_spec in inputs_list]

    #Creating the name translation tables: original name to Pythonic name and back
    input_name_to_pythonic = generate_unique_name_conversion_table(
        input_names, _sanitize_python_function_name)
    pythonic_name_to_input_name = {
        v: k for k, v in input_name_to_pythonic.items()
    }

    if component_ref is None:
        component_ref = ComponentReference(
            spec=component_spec, url=component_filename)
    else:
        component_ref.spec = component_spec

    digest = getattr(component_spec, '_digest', None)
    # TODO: Calculate the digest if missing
    if digest:
        # TODO: Report possible digest conflicts
        component_ref.digest = digest

    def create_task_object_from_component_and_pythonic_arguments(
            pythonic_arguments):
        arguments = {
            pythonic_name_to_input_name[argument_name]: argument_value
            for argument_name, argument_value in pythonic_arguments.items()
            # Skipping passing arguments for optional values that have not been overridden.
            if not isinstance(argument_value, _DefaultValue)
        }
        return _create_task_object_from_component_and_arguments(
            component_spec=component_spec,
            arguments=arguments,
            component_ref=component_ref,
        )

    import inspect
    from . import _dynamic

    def is_required(input_spec):
        return input_spec.default is None and not input_spec.optional

    #Reordering the inputs since in Python optional parameters must come after required parameters
    reordered_input_list = (
        [input_spec for input_spec in inputs_list if is_required(input_spec)] +
        [input_spec for input_spec in inputs_list if not is_required(input_spec)])

    def component_default_to_func_default(component_default: str,
                                          is_optional: bool):
        if is_optional:
            return _DefaultValue(component_default)
        if component_default is not None:
            return component_default
        return inspect.Parameter.empty

    input_parameters = [
        _dynamic.KwParameter(
            input_name_to_pythonic[port.name],
            annotation=((get_canonical_type_for_type_name(str(port.type)) or
                         str(port.type))
                        if port.type else inspect.Parameter.empty),
            default=component_default_to_func_default(port.default,
                                                      port.optional),
        )
        for port in reordered_input_list
        if not type_utils.is_task_final_status_type(port.type)
    ]
    #Outputs are no longer part of the task factory function signature. The paths are always generated by the system.
    factory_function_parameters = input_parameters

    task_factory = _dynamic.create_function_from_parameters(
        create_task_object_from_component_and_pythonic_arguments,
        factory_function_parameters,
        documentation='\n'.join(func_docstring_lines),
        func_name=name,
        func_filename=component_filename if
        (component_filename and
         component_filename.endswith(('.yaml', '.yml'))) else None,
    )
    task_factory.component_spec = component_spec
    return task_factory


_ResolvedCommandLineAndPaths = NamedTuple(
    '_ResolvedCommandLineAndPaths',
    [
        ('command', Sequence[str]),
        ('args', Sequence[str]),
        ('input_paths', Mapping[str, str]),
        ('output_paths', Mapping[str, str]),
        ('inputs_consumed_by_value', Mapping[str, str]),
    ],
)


def _strtobool(value: str) -> int:
    """Converts a string representation of truth to 1 (true) or 0 (false).

    Stand-in for ``distutils.util.strtobool``, which is unavailable on Python
    versions that no longer ship ``distutils``. The comparison is
    case-insensitive.

    Raises:
        ValueError: If ``value`` is not one of the recognized strings.
    """
    value = value.lower()
    if value in ('y', 'yes', 't', 'true', 'on', '1'):
        return 1
    if value in ('n', 'no', 'f', 'false', 'off', '0'):
        return 0
    raise ValueError('invalid truth value {!r}'.format(value))


def _resolve_command_line_and_paths(
    component_spec: ComponentSpec,
    arguments: Mapping[str, str],
    input_path_generator: Callable[[str], str] = _generate_input_file_name,
    output_path_generator: Callable[[str], str] = _generate_output_file_name,
    argument_serializer: Callable[[str], str] = serialize_value,
    placeholder_resolver: Callable[[Any, ComponentSpec, Mapping[str, str]],
                                   str] = None,
) -> _ResolvedCommandLineAndPaths:
    """Resolves the command line argument placeholders.

    Also produces the maps of the generated input/output paths.

    Args:
        component_spec: Component with a container implementation.
        arguments: Maps input names to argument values.
        input_path_generator: Called with an input name to produce the path
            used for an input path placeholder.
        output_path_generator: Called with an output name to produce the path
            used for an output path placeholder.
        argument_serializer: Called with ``(value, input_type)`` to serialize
            an argument consumed by value.
        placeholder_resolver: Optional hook called first for every command
            part with keyword arguments ``arg``, ``component_spec`` and
            ``arguments``; a result that is not None is used as-is.

    Returns:
        A ``_ResolvedCommandLineAndPaths`` with the expanded command and args
        and the input/output path maps collected during expansion.

    Raises:
        TypeError: If the component does not have a container implementation
            or a command part has an unrecognized type.
        ValueError: If a non-optional input has no value, if an output gets
            two conflicting paths, or if an "if" condition resolves to a
            string that is not a recognized truth value.
    """
    argument_values = arguments

    if not isinstance(component_spec.implementation, ContainerImplementation):
        raise TypeError(
            'Only container components have command line to resolve')

    inputs_dict = {
        input_spec.name: input_spec
        for input_spec in component_spec.inputs or []
    }
    container_spec = component_spec.implementation.container

    #Preserving the order to make the kubernetes output names deterministic
    output_paths = OrderedDict()
    unconfigurable_output_paths = container_spec.file_outputs or {}
    for output in component_spec.outputs or []:
        if output.name in unconfigurable_output_paths:
            output_paths[output.name] = unconfigurable_output_paths[
                output.name]

    input_paths = OrderedDict()
    inputs_consumed_by_value = {}

    def expand_command_part(arg) -> Union[str, List[str], None]:
        if arg is None:
            return None
        if placeholder_resolver:
            resolved_arg = placeholder_resolver(
                arg=arg,
                component_spec=component_spec,
                arguments=arguments,
            )
            if resolved_arg is not None:
                return resolved_arg
        if isinstance(arg, (str, int, float, bool)):
            return str(arg)

        if isinstance(arg, InputValuePlaceholder):
            input_name = arg.input_name
            input_spec = inputs_dict[input_name]
            input_value = argument_values.get(input_name, None)
            if input_value is not None:
                serialized_argument = argument_serializer(
                    input_value, input_spec.type)
                inputs_consumed_by_value[input_name] = serialized_argument
                return serialized_argument
            else:
                if input_spec.optional or type_utils.is_task_final_status_type(
                        input_spec.type):
                    return None
                else:
                    raise ValueError(
                        'No value provided for input {}'.format(input_name))

        if isinstance(arg, InputPathPlaceholder):
            input_name = arg.input_name
            input_value = argument_values.get(input_name, None)
            if input_value is not None:
                input_path = input_path_generator(input_name)
                input_paths[input_name] = input_path
                return input_path
            else:
                input_spec = inputs_dict[input_name]
                if input_spec.optional:
                    #Even when we support default values there is no need to check for a default here.
                    #In current execution flow (called by python task factory), the missing argument would be replaced with the default value by python itself.
                    return None
                else:
                    raise ValueError(
                        'No value provided for input {}'.format(input_name))

        elif isinstance(arg, OutputPathPlaceholder):
            output_name = arg.output_name
            output_filename = output_path_generator(output_name)
            if arg.output_name in output_paths:
                if output_paths[output_name] != output_filename:
                    raise ValueError(
                        'Conflicting output files specified for port {}: {} and {}'
                        .format(output_name, output_paths[output_name],
                                output_filename))
            else:
                output_paths[output_name] = output_filename

            return output_filename

        elif isinstance(arg, ConcatPlaceholder):
            expanded_argument_strings = expand_argument_list(arg.items)
            return ''.join(expanded_argument_strings)

        elif isinstance(arg, IfPlaceholder):
            arg = arg.if_structure
            condition_result = expand_command_part(arg.condition)
            #Python gotcha: bool('False') == True; Need to use strtobool; Also need to handle None and []
            condition_result_bool = condition_result and _strtobool(
                condition_result)
            result_node = arg.then_value if condition_result_bool else arg.else_value
            if result_node is None:
                return []
            if isinstance(result_node, list):
                expanded_result = expand_argument_list(result_node)
            else:
                expanded_result = expand_command_part(result_node)
            return expanded_result

        elif isinstance(arg, IsPresentPlaceholder):
            argument_is_present = argument_values.get(arg.input_name,
                                                      None) is not None
            return str(argument_is_present)
        else:
            raise TypeError('Unrecognized argument type: {}'.format(arg))

    def expand_argument_list(argument_list):
        expanded_list = []
        if argument_list is not None:
            for part in argument_list:
                expanded_part = expand_command_part(part)
                # Parts that resolve to None are dropped; list results are
                # spliced into the output.
                if expanded_part is not None:
                    if isinstance(expanded_part, list):
                        expanded_list.extend(expanded_part)
                    else:
                        expanded_list.append(str(expanded_part))
        return expanded_list

    expanded_command = expand_argument_list(container_spec.command)
    expanded_args = expand_argument_list(container_spec.args)

    return _ResolvedCommandLineAndPaths(
        command=expanded_command,
        args=expanded_args,
        input_paths=input_paths,
        output_paths=output_paths,
        inputs_consumed_by_value=inputs_consumed_by_value,
    )


_ResolvedGraphTask = NamedTuple(
    '_ResolvedGraphTask',
    [
        ('component_spec', ComponentSpec),
        ('component_ref', ComponentReference),
        ('outputs', Mapping[str, Any]),
        ('task_arguments', Mapping[str, Any]),
    ],
)


def _resolve_graph_task(
    component_spec: ComponentSpec,
    arguments: Mapping[str, Any],
    component_ref: ComponentReference = None,
) -> _ResolvedGraphTask:
    """Expands a graph component into task objects for its sub-tasks.

    Each task of the graph is instantiated, in the order given by
    ``graph._toposorted_tasks``, through
    ``_create_task_object_from_component_and_arguments``. Graph input
    arguments and upstream task outputs are substituted into the sub-task
    arguments.

    Args:
        component_spec: Component with a graph implementation.
        arguments: Maps graph input names to values; they take precedence
            over the input defaults declared in ``component_spec``.
        component_ref: Optional reference stored on the result.

    Returns:
        A ``_ResolvedGraphTask`` whose ``outputs`` map each graph output name
        to the resolved value of the corresponding graph output argument.

    Raises:
        TypeError: If an argument is neither a primitive constant, a
            GraphInputArgument nor a TaskOutputArgument.
    """
    from ..components import ComponentStore
    component_store = ComponentStore.default_store

    graph = component_spec.implementation.graph

    graph_input_arguments = {
        input_spec.name: input_spec.default
        for input_spec in component_spec.inputs or []
        if input_spec.default is not None
    }
    graph_input_arguments.update(arguments)

    outputs_of_tasks = {}

    def resolve_argument(argument):
        if isinstance(argument, (str, int, float, bool)):
            return argument
        elif isinstance(argument, GraphInputArgument):
            return graph_input_arguments[argument.graph_input.input_name]
        elif isinstance(argument, TaskOutputArgument):
            upstream_task_output_ref = argument.task_output
            upstream_task_outputs = outputs_of_tasks[
                upstream_task_output_ref.task_id]
            upstream_task_output = upstream_task_outputs[
                upstream_task_output_ref.output_name]
            return upstream_task_output
        else:
            raise TypeError(
                'Argument for input has unexpected type "{}".'.format(
                    type(argument)))

    # Cannot use graph.tasks here since they might be listed not in dependency order.
    # Especially on Python versions before 3.6 where the dicts do not preserve ordering
    for task_id, task_spec in graph._toposorted_tasks.items():
        resolved_task_component_ref = component_store._load_component_spec_in_component_ref(
            task_spec.component_ref)
        # TODO: Handle the case when optional graph component input is passed to optional task component input
        task_arguments = {
            input_name: resolve_argument(argument)
            for input_name, argument in task_spec.arguments.items()
        }
        task_component_spec = resolved_task_component_ref.spec

        task_obj = _create_task_object_from_component_and_arguments(
            component_spec=task_component_spec,
            arguments=task_arguments,
            component_ref=task_spec.component_ref,
        )
        task_outputs_with_original_names = {
            output.name: task_obj.outputs[output.name]
            for output in task_component_spec.outputs or []
        }
        outputs_of_tasks[task_id] = task_outputs_with_original_names

    resolved_graph_outputs = OrderedDict([
        (output_name, resolve_argument(argument))
        for output_name, argument in graph.output_values.items()
    ])

    # For resolved graph component tasks task.outputs point to the actual tasks that originally produced the output that is later returned from the graph
    graph_task = _ResolvedGraphTask(
        component_ref=component_ref,
        component_spec=component_spec,
        outputs=resolved_graph_outputs,
        task_arguments=arguments,
    )
    return graph_task