# Copyright 2019 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, List, Optional
import warnings
from ._container_op import BaseOp, ContainerOp
from . import _pipeline_param
class Resource(object):
"""A wrapper over Argo ResourceTemplate definition object
(io.argoproj.workflow.v1alpha1.ResourceTemplate) which is used to represent
the `resource` property in argo's workflow template
(io.argoproj.workflow.v1alpha1.Template)."""
swagger_types = {
"action": "str",
"merge_strategy": "str",
"success_condition": "str",
"failure_condition": "str",
"set_owner_reference": "bool",
"manifest": "str",
"flags": "list[str]"
}
openapi_types = {
"action": "str",
"merge_strategy": "str",
"success_condition": "str",
"failure_condition": "str",
"set_owner_reference": "bool",
"manifest": "str",
"flags": "list[str]"
}
attribute_map = {
"action": "action",
"merge_strategy": "mergeStrategy",
"success_condition": "successCondition",
"failure_condition": "failureCondition",
"set_owner_reference": "setOwnerReference",
"manifest": "manifest",
"flags": "flags"
}
def __init__(self,
action: str = None,
merge_strategy: str = None,
success_condition: str = None,
failure_condition: str = None,
set_owner_reference: bool = None,
manifest: str = None,
flags: Optional[List[str]] = None):
"""Create a new instance of Resource."""
self.action = action
self.merge_strategy = merge_strategy
self.success_condition = success_condition
self.failure_condition = failure_condition
self.set_owner_reference = set_owner_reference
self.manifest = manifest
self.flags = flags
[docs]class ResourceOp(BaseOp):
"""Represents an op which will be translated into a resource template.
TODO(https://github.com/kubeflow/pipelines/issues/4822): Determine the
stability level of this feature.
Args:
k8s_resource: A k8s resource which will be submitted to the cluster
action: One of "create"/"delete"/"apply"/"patch" (default is "create")
merge_strategy: The merge strategy for the "apply" action
success_condition: The successCondition of the template
failure_condition: The failureCondition of the template
For more info see:
https://github.com/argoproj/argo-workflows/blob/master/examples/k8s-jobs.yaml
attribute_outputs: Maps output labels to resource's json paths,
similarly to file_outputs of ContainerOp
kwargs: name, sidecars. See BaseOp definition
Raises:
ValueError: if not inside a pipeline
if the name is an invalid string
if no k8s_resource is provided
if merge_strategy is set without "apply" action
"""
def __init__(self,
k8s_resource=None,
action: str = "create",
merge_strategy: str = None,
success_condition: str = None,
failure_condition: str = None,
set_owner_reference: bool = None,
attribute_outputs: Optional[Dict[str, str]] = None,
flags: Optional[List[str]] = None,
**kwargs):
super().__init__(**kwargs)
self.attrs_with_pipelineparams = list(self.attrs_with_pipelineparams)
self.attrs_with_pipelineparams.extend(
["_resource", "k8s_resource", "attribute_outputs"])
if k8s_resource is None:
raise ValueError("You need to provide a k8s_resource.")
if action == "delete":
warnings.warn(
'Please use `kubernetes_resource_delete_op` instead of '
'`ResourceOp(action="delete")`', DeprecationWarning)
if merge_strategy and action != "apply":
raise ValueError(
"You can't set merge_strategy when action != 'apply'")
# if action is delete, there should not be any outputs, success_condition,
# and failure_condition
if action == "delete" and (success_condition or failure_condition or
attribute_outputs):
raise ValueError(
"You can't set success_condition, failure_condition, or "
"attribute_outputs when action == 'delete'")
if action == "delete" and flags is None:
flags = ["--wait=false"]
init_resource = {
"action": action,
"merge_strategy": merge_strategy,
"success_condition": success_condition,
"failure_condition": failure_condition,
"set_owner_reference": set_owner_reference,
"flags": flags
}
# `resource` prop in `io.argoproj.workflow.v1alpha1.Template`
self._resource = Resource(**init_resource)
self.k8s_resource = k8s_resource
# if action is delete, there should not be any outputs, success_condition,
# and failure_condition
if action == "delete":
self.attribute_outputs = {}
self.outputs = {}
self.output = None
return
# Set attribute_outputs
extra_attribute_outputs = \
attribute_outputs if attribute_outputs else {}
self.attribute_outputs = \
self.attribute_outputs if hasattr(self, "attribute_outputs") \
else {}
self.attribute_outputs.update(extra_attribute_outputs)
# Add name and manifest if not specified by the user
if "name" not in self.attribute_outputs:
self.attribute_outputs["name"] = "{.metadata.name}"
if "manifest" not in self.attribute_outputs:
self.attribute_outputs["manifest"] = "{}"
# Set outputs
self.outputs = {
name: _pipeline_param.PipelineParam(name, op_name=self.name)
for name in self.attribute_outputs.keys()
}
# If user set a single attribute_output, set self.output as that
# parameter, else set it as the resource name
self.output = self.outputs["name"]
if len(extra_attribute_outputs) == 1:
self.output = self.outputs[list(extra_attribute_outputs)[0]]
@property
def resource(self):
"""`Resource` object that represents the `resource` property in
`io.argoproj.workflow.v1alpha1.Template`."""
return self._resource
[docs] def delete(self, flags: Optional[List[str]] = None):
"""Returns a ResourceOp which deletes the resource."""
if self.resource.action == "delete":
raise ValueError("This operation is already a resource deletion.")
if isinstance(self.k8s_resource, dict):
kind = self.k8s_resource["kind"]
else:
kind = self.k8s_resource.kind
return kubernetes_resource_delete_op(
name=self.outputs["name"],
kind=kind,
flags=flags or ["--wait=false"])
def kubernetes_resource_delete_op(
name: str,
kind: str,
namespace: str = None,
flags: Optional[List[str]] = None,
) -> ContainerOp:
"""Operation that deletes a Kubernetes resource.
Outputs:
name: The name of the deleted resource
"""
command = [
"kubectl", "delete",
str(kind),
str(name), "--ignore-not-found", "--output", "name"
]
if namespace:
command.extend(["--namespace", str(namespace)])
if flags:
command.extend(flags)
result = ContainerOp(
name="kubernetes_resource_delete",
image="gcr.io/cloud-builders/kubectl",
command=command,
)
return result