Source code for kfp.dsl.types

# Copyright 2018 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for input/output types in Pipeline DSL.

Feature stage:
[Beta](https://github.com/kubeflow/pipelines/blob/07328e5094ac2981d3059314cc848fbb71437a76/docs/release/feature-stages.md#beta).
"""
from typing import Dict, Union
import warnings

from kfp.v2.components.types import type_utils


[docs]class BaseType: """BaseType is a base type for all scalar and artifact types."""
[docs] def to_dict(self) -> Union[Dict, str]: """to_dict serializes the type instance into a python dictionary or string.""" return { type(self).__name__: self.__dict__ } if self.__dict__ else type(self).__name__
# Primitive Types
[docs]class Integer(BaseType): def __init__(self): self.openapi_schema_validator = {"type": "integer"}
[docs]class String(BaseType): def __init__(self): self.openapi_schema_validator = {"type": "string"}
[docs]class Float(BaseType): def __init__(self): self.openapi_schema_validator = {"type": "number"}
[docs]class Bool(BaseType): def __init__(self): self.openapi_schema_validator = {"type": "boolean"}
[docs]class List(BaseType): def __init__(self): self.openapi_schema_validator = {"type": "array"}
[docs]class Dict(BaseType): def __init__(self): self.openapi_schema_validator = { "type": "object", }
# GCP Types
[docs]class GCSPath(BaseType): def __init__(self): self.openapi_schema_validator = { "type": "string", "pattern": "^gs://.*$" }
[docs]class GCRPath(BaseType): def __init__(self): self.openapi_schema_validator = { "type": "string", "pattern": "^.*gcr\\.io/.*$" }
[docs]class GCPRegion(BaseType): def __init__(self): self.openapi_schema_validator = {"type": "string"}
[docs]class GCPProjectID(BaseType): """MetaGCPProjectID: GCP project id""" def __init__(self): self.openapi_schema_validator = {"type": "string"}
# General Types
[docs]class LocalPath(BaseType): #TODO: add restriction to path def __init__(self): self.openapi_schema_validator = {"type": "string"}
[docs]class InconsistentTypeException(Exception): """InconsistencyTypeException is raised when two types are not consistent.""" pass
[docs]class InconsistentTypeWarning(Warning): """InconsistentTypeWarning is issued when two types are not consistent.""" pass
TypeSpecType = Union[str, Dict]
[docs]def verify_type_compatibility(given_type: TypeSpecType, expected_type: TypeSpecType, error_message_prefix: str = ""): """verify_type_compatibility verifies that the given argument type is compatible with the expected input type. Args: given_type (str/dict): The type of the argument passed to the input expected_type (str/dict): The declared type of the input """ # Missing types are treated as being compatible with missing types. if given_type is None or expected_type is None: return True # Generic artifacts resulted from missing type or explicit "Artifact" type # is compatible with any artifact types. # However, generic artifacts resulted from arbitrary unknown types do not # have such "compatible" feature. if not type_utils.is_parameter_type( str(expected_type)) and str(given_type).lower() == "artifact": return True if not type_utils.is_parameter_type( str(given_type)) and str(expected_type).lower() == "artifact": return True types_are_compatible = check_types(given_type, expected_type) if not types_are_compatible: error_text = error_message_prefix + ( 'Argument type "{}" is incompatible with the input type "{}"' ).format(str(given_type), str(expected_type)) import kfp if kfp.TYPE_CHECK: raise InconsistentTypeException(error_text) else: warnings.warn(InconsistentTypeWarning(error_text)) return types_are_compatible
[docs]def check_types(checked_type, expected_type): """check_types checks the type consistency. For each of the attribute in checked_type, there is the same attribute in expected_type with the same value. However, expected_type could contain more attributes that checked_type does not contain. Args: checked_type (BaseType/str/dict): it describes a type from the upstream component output expected_type (BaseType/str/dict): it describes a type from the downstream component input """ if isinstance(checked_type, BaseType): checked_type = checked_type.to_dict() if isinstance(checked_type, str): checked_type = {checked_type: {}} if isinstance(expected_type, BaseType): expected_type = expected_type.to_dict() if isinstance(expected_type, str): expected_type = {expected_type: {}} return _check_dict_types(checked_type, expected_type)
def _check_valid_type_dict(payload): """_check_valid_type_dict checks whether a dict is a correct serialization of a type. Args: payload(dict) """ if not isinstance(payload, dict) or len(payload) != 1: return False for type_name in payload: if not isinstance(payload[type_name], dict): return False property_types = (int, str, float, bool) property_value_types = (int, str, float, bool, dict) for property_name in payload[type_name]: if not isinstance(property_name, property_types) or not isinstance( payload[type_name][property_name], property_value_types): return False return True def _check_dict_types(checked_type, expected_type): """_check_dict_types checks the type consistency. Args: checked_type (dict): A dict that describes a type from the upstream component output expected_type (dict): A dict that describes a type from the downstream component input """ if not checked_type or not expected_type: # If the type is empty, it matches any types return True checked_type_name, _ = list(checked_type.items())[0] expected_type_name, _ = list(expected_type.items())[0] if checked_type_name == "" or expected_type_name == "": # If the type name is empty, it matches any types return True if checked_type_name != expected_type_name: print("type name " + str(checked_type_name) + " is different from expected: " + str(expected_type_name)) return False type_name = checked_type_name for type_property in checked_type[type_name]: if type_property not in expected_type[type_name]: print(type_name + " has a property " + str(type_property) + " that the latter does not.") return False if checked_type[type_name][type_property] != expected_type[type_name][ type_property]: print(type_name + " has a property " + str(type_property) + " with value: " + str(checked_type[type_name][type_property]) + " and " + str(expected_type[type_name][type_property])) return False return True