Source code for kfp.dsl._pipeline_volume

# Copyright 2019 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import hashlib
import json

from kubernetes.client.models import (V1Volume,
                                      V1PersistentVolumeClaimVolumeSource)

from . import _pipeline


def prune_none_dict_values(d: dict) -> dict:
  return {
      k: prune_none_dict_values(v) if isinstance(v, dict) else v
      for k, v in d.items()
      if v is not None
  }


[docs]class PipelineVolume(V1Volume): """Representing a volume that is passed between pipeline operators and is to be mounted by a ContainerOp or its inherited type. A PipelineVolume object can be used as an extention of the pipeline function's filesystem. It may then be passed between ContainerOps, exposing dependencies. TODO(https://github.com/kubeflow/pipelines/issues/4822): Determine the stability level of this feature. Args: pvc: The name of an existing PVC volume: Create a deep copy out of a V1Volume or PipelineVolume with no deps Raises: ValueError: If volume is not None and kwargs is not None If pvc is not None and kwargs.pop("name") is not None """ def __init__(self, pvc: str = None, volume: V1Volume = None, **kwargs): if volume and kwargs: raise ValueError("You can't pass a volume along with other " "kwargs.") name_provided = True init_volume = {} if volume: init_volume = { attr: getattr(volume, attr) for attr in self.attribute_map.keys() } else: if "name" in kwargs: if len(kwargs["name"]) > 63: raise ValueError("PipelineVolume name must be no more than" " 63 characters") init_volume = {"name": kwargs.pop("name")} else: name_provided = False init_volume = {"name": "pvolume-placeholder"} if pvc and kwargs: raise ValueError("You can only pass 'name' along with 'pvc'.") elif pvc and not kwargs: pvc_volume_source = V1PersistentVolumeClaimVolumeSource( claim_name=str(pvc)) init_volume["persistent_volume_claim"] = pvc_volume_source super().__init__(**init_volume, **kwargs) if not name_provided: volume_dict = prune_none_dict_values(self.to_dict()) hash_value = hashlib.sha256( bytes(json.dumps(volume_dict, sort_keys=True), "utf-8")).hexdigest() name = "pvolume-{}".format(hash_value) self.name = name[0:63] if len(name) > 63 else name self.dependent_names = []
[docs] def after(self, *ops): """Creates a duplicate of self with the required dependecies excluding the redundant dependenices. Args: *ops: Pipeline operators to add as dependencies """ def implies(newdep, olddep): if newdep.name == olddep: return True for parentdep_name in newdep.dependent_names: if parentdep_name == olddep: return True else: parentdep = _pipeline.Pipeline.get_default_pipeline( ).ops[parentdep_name] if parentdep: if implies(parentdep, olddep): return True return False ret = self.__class__(volume=self) ret.dependent_names = [op.name for op in ops] for olddep in self.dependent_names: implied = False for newdep in ops: implied = implies(newdep, olddep) if implied: break if not implied: ret.dependent_names.append(olddep) return ret