Source code for kfp.dsl._resource_op

# Copyright 2019 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, List, Optional
import warnings

from ._container_op import BaseOp, ContainerOp
from . import _pipeline_param


class Resource(object):
    """Python-side model of an Argo ResourceTemplate.

    Mirrors ``io.argoproj.workflow.v1alpha1.ResourceTemplate``, i.e. the value
    held by the ``resource`` property of an Argo workflow template
    (``io.argoproj.workflow.v1alpha1.Template``).

    The three class-level tables describe the model's fields: their type
    names (under both the ``swagger_types`` and ``openapi_types`` names) and
    the camelCase field name each Python attribute corresponds to.
    """

    # Python attribute name -> type name (as a string).
    swagger_types = {
        "action": "str",
        "merge_strategy": "str",
        "success_condition": "str",
        "failure_condition": "str",
        "set_owner_reference": "bool",
        "manifest": "str",
        "flags": "list[str]"
    }
    # Same table under the `openapi_types` name. It is a separate dict object,
    # so mutating one table never affects the other.
    openapi_types = dict(swagger_types)
    # Python attribute name -> camelCase field name.
    attribute_map = {
        "action": "action",
        "merge_strategy": "mergeStrategy",
        "success_condition": "successCondition",
        "failure_condition": "failureCondition",
        "set_owner_reference": "setOwnerReference",
        "manifest": "manifest",
        "flags": "flags"
    }

    def __init__(self,
                 action: str = None,
                 merge_strategy: str = None,
                 success_condition: str = None,
                 failure_condition: str = None,
                 set_owner_reference: bool = None,
                 manifest: str = None,
                 flags: Optional[List[str]] = None):
        """Store every argument, unvalidated, on the attribute of the same name.

        Args:
            action: Stored as ``self.action``.
            merge_strategy: Stored as ``self.merge_strategy``.
            success_condition: Stored as ``self.success_condition``.
            failure_condition: Stored as ``self.failure_condition``.
            set_owner_reference: Stored as ``self.set_owner_reference``.
            manifest: Stored as ``self.manifest``.
            flags: Stored as ``self.flags`` (the same list object, not a
                copy).
        """
        # What to do with the resource and how to do it.
        self.action = action
        self.merge_strategy = merge_strategy
        # Conditions describing when the step counts as succeeded / failed.
        self.success_condition = success_condition
        self.failure_condition = failure_condition
        self.set_owner_reference = set_owner_reference
        # The resource definition itself, plus any extra flags.
        self.manifest = manifest
        self.flags = flags


class ResourceOp(BaseOp):
    """Represents an op which will be translated into a resource template.

    TODO(https://github.com/kubeflow/pipelines/issues/4822): Determine the
        stability level of this feature.

    Args:
      k8s_resource: A k8s resource which will be submitted to the cluster.
        Either a dict or an object; `delete()` reads its kind from the
        "kind" key or the `.kind` attribute respectively.
      action: One of "create"/"delete"/"apply"/"patch" (default is "create").
        "delete" is deprecated in favour of `kubernetes_resource_delete_op`
        and emits a DeprecationWarning.
      merge_strategy: The merge strategy for the "apply" action
      success_condition: The successCondition of the template
      failure_condition: The failureCondition of the template
        For more info see:
        https://github.com/argoproj/argo-workflows/blob/master/examples/k8s-jobs.yaml
      set_owner_reference: Passed through unchanged to the underlying
        `Resource` (its `set_owner_reference` field).
      attribute_outputs: Maps output labels to resource's json paths,
        similarly to file_outputs of ContainerOp. "name" and "manifest"
        entries are added automatically when not supplied.
      flags: Extra flags stored on the underlying `Resource`. When action is
        "delete" and flags is None, defaults to ["--wait=false"].
      kwargs: name, sidecars. Forwarded to `BaseOp.__init__`; see the BaseOp
        definition.

    Attributes:
      k8s_resource: The resource passed to the constructor.
      attribute_outputs: Output label -> json path mapping (empty for
        "delete").
      outputs: Output label -> PipelineParam mapping (empty for "delete").
      output: The single user-supplied output when exactly one was given,
        otherwise the "name" output; None for "delete".

    Raises:
      ValueError: if no k8s_resource is provided
        if merge_strategy is set without "apply" action
        if success_condition, failure_condition or attribute_outputs is set
        together with the "delete" action
        (BaseOp is documented to also raise when not inside a pipeline or
        when the name is an invalid string -- that code is not in this
        module.)
    """

    def __init__(self,
                 k8s_resource=None,
                 action: str = "create",
                 merge_strategy: Optional[str] = None,
                 success_condition: Optional[str] = None,
                 failure_condition: Optional[str] = None,
                 set_owner_reference: Optional[bool] = None,
                 attribute_outputs: Optional[Dict[str, str]] = None,
                 flags: Optional[List[str]] = None,
                 **kwargs):
        """Validate the arguments and set up the resource and its outputs."""
        super().__init__(**kwargs)
        # Copy before extending so the list inherited from the base class is
        # not mutated in place.
        self.attrs_with_pipelineparams = list(self.attrs_with_pipelineparams)
        self.attrs_with_pipelineparams.extend(
            ["_resource", "k8s_resource", "attribute_outputs"])

        if k8s_resource is None:
            raise ValueError("You need to provide a k8s_resource.")

        is_delete = action == "delete"

        if is_delete:
            # stacklevel=2 attributes the warning to the code that
            # instantiated the op rather than to this line.
            warnings.warn(
                'Please use `kubernetes_resource_delete_op` instead of '
                '`ResourceOp(action="delete")`',
                DeprecationWarning,
                stacklevel=2)

        if merge_strategy and action != "apply":
            raise ValueError(
                "You can't set merge_strategy when action != 'apply'")

        # If action is delete, there should not be any outputs,
        # success_condition, or failure_condition.
        if is_delete and (success_condition or failure_condition or
                          attribute_outputs):
            raise ValueError(
                "You can't set success_condition, failure_condition, or "
                "attribute_outputs when action == 'delete'")

        if is_delete and flags is None:
            flags = ["--wait=false"]

        # `resource` prop in `io.argoproj.workflow.v1alpha1.Template`
        self._resource = Resource(
            action=action,
            merge_strategy=merge_strategy,
            success_condition=success_condition,
            failure_condition=failure_condition,
            set_owner_reference=set_owner_reference,
            flags=flags)

        self.k8s_resource = k8s_resource

        # A delete op exposes no outputs at all.
        if is_delete:
            self.attribute_outputs = {}
            self.outputs = {}
            self.output = None
            return

        # Merge the user's attribute_outputs into any mapping already present
        # on the instance (e.g. set before this constructor ran).
        extra_attribute_outputs = attribute_outputs if attribute_outputs else {}
        self.attribute_outputs = getattr(self, "attribute_outputs", {})
        self.attribute_outputs.update(extra_attribute_outputs)
        # Add name and manifest if not specified by the user
        if "name" not in self.attribute_outputs:
            self.attribute_outputs["name"] = "{.metadata.name}"
        if "manifest" not in self.attribute_outputs:
            self.attribute_outputs["manifest"] = "{}"

        # One PipelineParam per declared output label.
        self.outputs = {
            name: _pipeline_param.PipelineParam(name, op_name=self.name)
            for name in self.attribute_outputs
        }
        # If user set a single attribute_output, set self.output as that
        # parameter, else set it as the resource name
        self.output = self.outputs["name"]
        if len(extra_attribute_outputs) == 1:
            self.output = self.outputs[list(extra_attribute_outputs)[0]]

    @property
    def resource(self):
        """`Resource` object that represents the `resource` property in
        `io.argoproj.workflow.v1alpha1.Template`."""
        return self._resource

    def delete(self, flags: Optional[List[str]] = None):
        """Returns an op which deletes the resource handled by this op.

        The deletion is delegated to `kubernetes_resource_delete_op`, so the
        returned object is whatever that function returns (a ContainerOp
        running `kubectl delete`), not another ResourceOp.

        Args:
          flags: Extra kubectl flags. Any falsy value (None or an empty
            list) is replaced by ["--wait=false"].

        Returns:
          The op produced by `kubernetes_resource_delete_op` for this op's
          "name" output and the kind of `k8s_resource`.

        Raises:
          ValueError: if this op is itself a deletion (action == "delete").
        """
        if self.resource.action == "delete":
            raise ValueError("This operation is already a resource deletion.")

        # k8s_resource may be a plain dict or an object exposing `.kind`.
        if isinstance(self.k8s_resource, dict):
            kind = self.k8s_resource["kind"]
        else:
            kind = self.k8s_resource.kind

        return kubernetes_resource_delete_op(
            name=self.outputs["name"],
            kind=kind,
            flags=flags or ["--wait=false"])

def kubernetes_resource_delete_op(
    name: str,
    kind: str,
    namespace: str = None,
    flags: Optional[List[str]] = None,
) -> ContainerOp:
    """Operation that deletes a Kubernetes resource.

    Builds a ContainerOp whose command is
    ``kubectl delete <kind> <name> --ignore-not-found --output name``,
    followed by ``--namespace <namespace>`` when a namespace is given and
    then by any extra ``flags``.

    Args:
        name: Name of the resource to delete; converted with ``str()``.
        kind: Kind of the resource to delete; converted with ``str()``.
        namespace: Optional namespace; skipped when falsy, otherwise
            converted with ``str()``.
        flags: Optional extra command-line arguments appended verbatim at
            the end of the command; skipped when falsy.

    Returns:
        A ContainerOp named "kubernetes_resource_delete" that uses the
        "gcr.io/cloud-builders/kubectl" image.

    Outputs:
        name: The name of the deleted resource (requested from kubectl via
            ``--output name``; this function does not itself declare any
            output on the returned op).
    """
    kubectl_args = [
        "kubectl", "delete",
        str(kind),
        str(name),
        "--ignore-not-found",
        "--output", "name",
    ]
    if namespace:
        kubectl_args += ["--namespace", str(namespace)]
    if flags:
        kubectl_args += flags

    return ContainerOp(
        name="kubernetes_resource_delete",
        image="gcr.io/cloud-builders/kubectl",
        command=kubectl_args,
    )