# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import logging
import os
import re
import subprocess
import tempfile
import warnings
from collections import deque
from typing import Any, Callable, Dict, List, Mapping, Optional, Union, cast
from . import dsl
from .compiler.compiler import sanitize_k8s_name
class _Dag:
"""DAG stands for Direct Acyclic Graph.
DAG here is used to decide the order to execute pipeline ops.
For more information on DAG, please refer to `wiki <https://en.wikipedia.org/wiki/Directed_acyclic_graph>`_.
"""
def __init__(self, nodes: List[str]) -> None:
"""
Args::
nodes: List of DAG nodes, each node is identified by an unique name.
"""
self._graph = {node: [] for node in nodes}
self._reverse_graph = {node: [] for node in nodes}
@property
def graph(self):
return self._graph
@property
def reverse_graph(self):
return self._reverse_graph
def add_edge(self, edge_source: str, edge_target: str) -> None:
"""Add an edge between DAG nodes.
Args::
edge_source: the source node of the edge
edge_target: the target node of the edge
"""
self._graph[edge_source].append(edge_target)
self._reverse_graph[edge_target].append(edge_source)
def get_follows(self, source_node: str) -> List[str]:
"""Get all target nodes start from the specified source node.
Args::
source_node: the source node
"""
return self._graph.get(source_node, [])
def get_dependencies(self, target_node: str) -> List[str]:
"""Get all source nodes end with the specified target node.
Args::
target_node: the target node
"""
return self._reverse_graph.get(target_node, [])
def topological_sort(self) -> List[str]:
"""List DAG nodes in topological order."""
in_degree = {node: 0 for node in self._graph.keys()}
for i in self._graph:
for j in self._graph[i]:
in_degree[j] += 1
queue = deque()
for node, degree in in_degree.items():
if degree == 0:
queue.append(node)
sorted_nodes = []
while queue:
u = queue.popleft()
sorted_nodes.append(u)
for node in self._graph[u]:
in_degree[node] -= 1
if in_degree[node] == 0:
queue.append(node)
return sorted_nodes
def _extract_pipeline_param(param: str) -> dsl.PipelineParam:
    """Parse a serialized PipelineParam placeholder back into a PipelineParam.

    Placeholders look like ``{{pipelineparam:op=<op name>;name=<output name>}}``.
    """
    matches = re.findall(r"{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+)}}",
                         param)
    # Only the first placeholder in the string is considered.
    op_name, output_name = matches[0]
    return dsl.PipelineParam(output_name, op_name)
def _get_op(ops: List[dsl.ContainerOp],
            op_name: str) -> Union[dsl.ContainerOp, None]:
    """Get the first op with specified op name, or None if absent."""
    for op in ops:
        if op.name == op_name:
            return op
    return None
def _get_subgroup(groups: List[dsl.OpsGroup],
                  group_name: str) -> Union[dsl.OpsGroup, None]:
    """Get the first OpsGroup with specified group name, or None if absent."""
    for group in groups:
        if group.name == group_name:
            return group
    return None
class LocalClient:

    class ExecutionMode:
        """Configuration to decide whether the client executes a component in
        docker or in local process."""

        DOCKER = "docker"
        LOCAL = "local"

        def __init__(
            self,
            mode: str = DOCKER,
            images_to_exclude: Optional[List[str]] = None,
            ops_to_exclude: Optional[List[str]] = None,
            docker_options: Optional[List[str]] = None,
        ) -> None:
            """Constructor.

            Args:
                mode: Default execution mode, default 'docker'
                images_to_exclude: If the image of op is in images_to_exclude, the op is
                    executed in the mode different from default_mode.
                ops_to_exclude: If the name of op is in ops_to_exclude, the op is
                    executed in the mode different from default_mode.
                docker_options: Docker options used in docker mode,
                    e.g. docker_options=["-e", "foo=bar"].

            Raises:
                Exception: if ``mode`` is neither 'docker' nor 'local'.
            """
            if mode not in [self.DOCKER, self.LOCAL]:
                # Message fixed: previously read "docker of local".
                raise Exception(
                    "Invalid execution mode, must be docker or local")
            self._mode = mode
            # `None` defaults (instead of `=[]`) avoid sharing one mutable
            # list object across every instance of ExecutionMode.
            self._images_to_exclude = images_to_exclude or []
            self._ops_to_exclude = ops_to_exclude or []
            self._docker_options = docker_options or []

        @property
        def mode(self) -> str:
            return self._mode

        @property
        def images_to_exclude(self) -> List[str]:
            return self._images_to_exclude

        @property
        def ops_to_exclude(self) -> List[str]:
            return self._ops_to_exclude

        @property
        def docker_options(self) -> List[str]:
            return self._docker_options
def __init__(self, pipeline_root: Optional[str] = None) -> None:
"""Construct the instance of LocalClient.
Args:
pipeline_root: The root directory where the output artifact of component
will be saved.
"""
warnings.warn(
'LocalClient is an Alpha[1] feature. It may be deprecated in the future.\n'
'[1] https://github.com/kubeflow/pipelines/blob/master/docs/release/feature-stages.md#alpha',
category=FutureWarning,
)
pipeline_root = pipeline_root or tempfile.tempdir
self._pipeline_root = pipeline_root
def _find_base_group(self, groups: List[dsl.OpsGroup],
op_name: str) -> Union[dsl.OpsGroup, None]:
"""Find the base group of op in candidate group list."""
if groups is None or len(groups) == 0:
return None
for group in groups:
if _get_op(group.ops, op_name):
return group
else:
_parent_group = self._find_base_group(group.groups, op_name)
if _parent_group:
return group
return None
def _create_group_dag(self, pipeline_dag: _Dag,
group: dsl.OpsGroup) -> _Dag:
"""Create DAG within current group, it's a DAG of direct ops and direct
subgroups.
Each node of the DAG is either an op or a subgroup. For each
node in current group, if one of its DAG follows is also an op
in current group, add an edge to this follow op, otherwise, if
this follow belongs to subgroups, add an edge to its subgroup.
If this node has dependency from subgroups, then add an edge
from this subgroup to current node.
"""
group_dag = _Dag([op.name for op in group.ops] +
[g.name for g in group.groups])
for op in group.ops:
for follow in pipeline_dag.get_follows(op.name):
if _get_op(group.ops, follow) is not None:
# add edge between direct ops
group_dag.add_edge(op.name, follow)
else:
_base_group = self._find_base_group(group.groups, follow)
if _base_group:
# add edge to direct subgroup
group_dag.add_edge(op.name, _base_group.name)
for dependency in pipeline_dag.get_dependencies(op.name):
if _get_op(group.ops, dependency) is None:
_base_group = self._find_base_group(group.groups,
dependency)
if _base_group:
# add edge from direct subgroup
group_dag.add_edge(_base_group.name, op.name)
return group_dag
def _create_op_dag(self, p: dsl.Pipeline) -> _Dag:
"""Create the DAG of the pipeline ops."""
dag = _Dag(p.ops.keys())
for op in p.ops.values():
# dependencies defined by inputs
for input_value in op.inputs:
if isinstance(input_value, dsl.PipelineParam):
input_param = _extract_pipeline_param(input_value.pattern)
if input_param.op_name:
dag.add_edge(input_param.op_name, op.name)
else:
logging.debug("%s depend on pipeline param", op.name)
# explicit dependencies of current op
for dependent in op.dependent_names:
dag.add_edge(dependent, op.name)
return dag
def _make_output_file_path_unique(self, run_name: str, op_name: str,
output_file: str) -> str:
"""Alter the file path of output artifact to make sure it's unique in
local runner.
kfp compiler will bound a tmp file for each component output,
which is unique in kfp runtime, but not unique in local runner.
We alter the file path of the name of current run and op, to
make it unique in local runner.
"""
if not output_file.startswith("/tmp/"):
return output_file
return f'{self._pipeline_root}/{run_name}/{op_name.lower()}/{output_file[len("/tmp/"):]}'
def _get_output_file_path(
self,
run_name: str,
pipeline: dsl.Pipeline,
op_name: str,
output_name: str = None,
) -> str:
"""Get the file path of component output."""
op_dependency = pipeline.ops[op_name]
if output_name is None and len(op_dependency.file_outputs) == 1:
output_name = next(iter(op_dependency.file_outputs.keys()))
output_file = op_dependency.file_outputs[output_name]
unique_output_file = self._make_output_file_path_unique(
run_name, op_name, output_file)
return unique_output_file
    def _generate_cmd_for_subprocess_execution(
        self,
        run_name: str,
        pipeline: dsl.Pipeline,
        op: dsl.ContainerOp,
        stack: Dict[str, Any],
    ) -> List[str]:
        """Generate shell command to run the op locally.

        Rewrites the op's command line (command + arguments) in place so it
        can run outside the cluster: loop-argument placeholders are replaced
        with the current values from ``stack``, output file paths are rebased
        under the local pipeline root (and their parent directories created),
        and input artifact paths are redirected to the producing op's local
        output file.
        """
        cmd = op.command + op.arguments
        # In debug mode, for `python -c cmd` format command, pydev will insert code before
        # `cmd`, but there is no newline at the end of the inserted code, which will cause
        # syntax error, so we add newline before `cmd`.
        for i in range(len(cmd)):
            if cmd[i] == "-c":
                cmd[i + 1] = "\n" + cmd[i + 1]

        for index, cmd_item in enumerate(cmd):
            if cmd_item in stack:  # Argument is LoopArguments item
                cmd[index] = str(stack[cmd_item])
            elif cmd_item in op.file_outputs.values(
            ):  # Argument is output file
                # Recover the output's name from its path, then rebase the
                # path under the local pipeline root and ensure its directory
                # exists before the subprocess tries to write to it.
                output_name = next(
                    filter(lambda item: item[1] == cmd_item,
                           op.file_outputs.items()))[0]
                output_param = op.outputs[output_name]
                output_file = cmd_item
                output_file = self._make_output_file_path_unique(
                    run_name, output_param.op_name, output_file)
                os.makedirs(os.path.dirname(output_file), exist_ok=True)
                cmd[index] = output_file
            elif (cmd_item in op.input_artifact_paths.values()
                 ):  # Argument is input artifact file
                # Resolve the artifact argument's placeholder back to the
                # upstream op, then point this argument at that op's local
                # output file.
                input_name = next(
                    filter(
                        lambda item: item[1] == cmd_item,
                        op.input_artifact_paths.items(),
                    ))[0]
                input_param_pattern = op.artifact_arguments[input_name]
                pipeline_param = _extract_pipeline_param(input_param_pattern)
                input_file = self._get_output_file_path(run_name, pipeline,
                                                        pipeline_param.op_name,
                                                        pipeline_param.name)
                cmd[index] = input_file

        return cmd
def _generate_cmd_for_docker_execution(
self,
run_name: str,
pipeline: dsl.Pipeline,
op: dsl.ContainerOp,
stack: Dict[str, Any],
docker_options: List[str] = []
) -> List[str]:
"""Generate the command to run the op in docker locally."""
cmd = self._generate_cmd_for_subprocess_execution(
run_name, pipeline, op, stack)
docker_cmd = [
"docker",
"run",
*docker_options,
"-v",
"{pipeline_root}:{pipeline_root}".format(
pipeline_root=self._pipeline_root),
op.image,
] + cmd
return docker_cmd
def _run_group_dag(
self,
run_name: str,
pipeline: dsl.Pipeline,
pipeline_dag: _Dag,
current_group: dsl.OpsGroup,
stack: Dict[str, Any],
execution_mode: ExecutionMode,
) -> bool:
"""Run ops in current group in topological order.
Args:
pipeline: kfp.dsl.Pipeline
pipeline_dag: DAG of pipeline ops
current_group: current ops group
stack: stack to trace `LoopArguments`
execution_mode: Configuration to decide whether the client executes
component in docker or in local process.
Returns:
True if succeed to run the group dag.
"""
group_dag = self._create_group_dag(pipeline_dag, current_group)
for node in group_dag.topological_sort():
subgroup = _get_subgroup(current_group.groups, node)
if subgroup is not None: # Node of DAG is subgroup
success = self._run_group(run_name, pipeline, pipeline_dag, subgroup,
stack, execution_mode)
if not success:
return False
else: # Node of DAG is op
op = _get_op(current_group.ops, node)
execution_mode = (
execution_mode
if execution_mode else LocalClient.ExecutionMode())
can_run_locally = execution_mode.mode == LocalClient.ExecutionMode.LOCAL
exclude = (
op.image in execution_mode.images_to_exclude or
op.name in execution_mode.ops_to_exclude)
if exclude:
can_run_locally = not can_run_locally
if can_run_locally:
cmd = self._generate_cmd_for_subprocess_execution(
run_name, pipeline, op, stack)
else:
cmd = self._generate_cmd_for_docker_execution(
run_name, pipeline, op, stack, execution_mode.docker_options)
process = subprocess.Popen(
cmd,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
)
# TODO support async process
logging.info("start task:%s", op.name)
stdout, stderr = process.communicate()
if stdout:
logging.info(stdout)
if stderr:
logging.error(stderr)
if process.returncode != 0:
logging.error(cmd)
return False
def _run_group(
self,
run_name: str,
pipeline: dsl.Pipeline,
pipeline_dag: _Dag,
current_group: dsl.OpsGroup,
stack: Dict[str, Any],
execution_mode: ExecutionMode,
) -> bool:
"""Run all ops in current group.
Args:
run_name: str, the name of this run, can be used to query the run result
pipeline: kfp.dsl.Pipeline
pipeline_dag: DAG of pipeline ops
current_group: current ops group
stack: stack to trace `LoopArguments`
execution_mode: Configuration to decide whether the client executes
component in docker or in local process.
Returns:
True if succeed to run the group.
"""
if current_group.type == dsl.ParallelFor.TYPE_NAME:
current_group = cast(dsl.ParallelFor, current_group)
if current_group.items_is_pipeline_param:
_loop_args = current_group.loop_args
_param_name = _loop_args.name[:-len(_loop_args
.LOOP_ITEM_NAME_BASE) - 1]
_op_dependency = pipeline.ops[_loop_args.op_name]
_list_file = _op_dependency.file_outputs[_param_name]
_altered_list_file = self._make_output_file_path_unique(
run_name, _loop_args.op_name, _list_file)
with open(_altered_list_file, "r") as f:
_param_values = json.load(f)
for index, _param_value in enumerate(_param_values):
if isinstance(_param_values, (dict, list)):
_param_value = json.dumps(_param_value)
stack[_loop_args.pattern] = _param_value
loop_run_name = "{run_name}/{loop_index}".format(
run_name=run_name, loop_index=index)
success = self._run_group_dag(
loop_run_name,
pipeline,
pipeline_dag,
current_group,
stack,
execution_mode,
)
del stack[_loop_args.pattern]
if not success:
return False
return True
else:
raise Exception("Not implemented")
else:
return self._run_group_dag(run_name, pipeline, pipeline_dag, current_group,
stack, execution_mode)
[docs] def create_run_from_pipeline_func(
self,
pipeline_func: Callable,
arguments: Mapping[str, str],
execution_mode: ExecutionMode = ExecutionMode(),
):
"""Runs a pipeline locally, either using Docker or in a local process.
Parameters:
pipeline_func: pipeline function
arguments: Arguments to the pipeline function provided as a dict, reference
to `kfp.client.create_run_from_pipeline_func`
execution_mode: Configuration to decide whether the client executes component
in docker or in local process.
"""
class RunPipelineResult:
def __init__(self, client: LocalClient, pipeline: dsl.Pipeline,
run_id: str, success: bool):
self._client = client
self._pipeline = pipeline
self.run_id = run_id
self._success = success
def get_output_file(self, op_name: str, output: str = None):
return self._client._get_output_file_path(
self.run_id, self._pipeline, op_name, output)
def success(self) -> bool:
return self._success
def __repr__(self):
return "RunPipelineResult(run_id={})".format(self.run_id)
pipeline_name = sanitize_k8s_name(
getattr(pipeline_func, "_component_human_name", None) or
pipeline_func.__name__)
with dsl.Pipeline(pipeline_name) as pipeline:
pipeline_func(**arguments)
run_version = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
run_name = pipeline.name.replace(" ", "_").lower() + "_" + run_version
pipeline_dag = self._create_op_dag(pipeline)
success = self._run_group(run_name, pipeline, pipeline_dag, pipeline.groups[0],
{}, execution_mode)
return RunPipelineResult(self, pipeline, run_name, success=success)