# Source code for kfp.dsl.task_config

"""Definition for TaskConfig."""

import dataclasses
from typing import Any, Dict, List, Optional


# TaskConfig needs to stay aligned with the TaskConfig in backend/src/v2/driver/driver.go.
@dataclasses.dataclass
class TaskConfig:
    """Configurations for a task.

    Annotate a component parameter with this type when you want the task's
    runtime configuration to be forwarded to an external workload and
    optionally applied to the task's own pod. This is useful when the task
    launches another Kubernetes resource (for example, a Kubeflow Trainer
    job).

    This is an empty value by default and is populated if the component is
    annotated with ``task_config_passthroughs``.

    All fields are optional and default to ``None``. Each holds a Python
    dictionary/list that conforms to the Kubernetes JSON schema of the
    corresponding field. ``affinity``, ``tolerations``, ``node_selector`` and
    ``volumes`` are fragments of the Pod spec. ``env``, ``volume_mounts`` and
    ``resources`` are fragments of a Container spec.

    Attributes:
        affinity: Pod spec ``affinity`` (an ``Affinity`` object).
        tolerations: Pod spec ``tolerations`` (a list of ``Toleration``
            objects).
        node_selector: Pod spec ``nodeSelector`` (label key to label value).
        env: Container ``env`` (a list of ``EnvVar`` objects).
        volumes: Pod spec ``volumes`` (a list of ``Volume`` objects).
        volume_mounts: Container ``volumeMounts`` (a list of ``VolumeMount``
            objects).
        resources: Container ``resources`` (a ``ResourceRequirements``
            object).

    Example:
      ::

        @dsl.component(
            packages_to_install=["kubernetes"],
            task_config_passthroughs=[
                dsl.TaskConfigField.RESOURCES,
                dsl.TaskConfigField.KUBERNETES_TOLERATIONS,
                dsl.TaskConfigField.KUBERNETES_NODE_SELECTOR,
                dsl.TaskConfigField.KUBERNETES_AFFINITY,
                dsl.TaskConfigPassthrough(
                    field=dsl.TaskConfigField.ENV, apply_to_task=True
                ),
                dsl.TaskConfigPassthrough(
                    field=dsl.TaskConfigField.KUBERNETES_VOLUMES,
                    apply_to_task=True,
                ),
            ],
        )
        def train(
            num_nodes: int,
            workspace_path: str,
            output_model: dsl.Output[dsl.Model],
            task_config: dsl.TaskConfig,
        ):
            import os
            import shutil

            from kubernetes import client as k8s_client, config

            config.load_incluster_config()
            with open(
                "/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r"
            ) as ns_file:
                namespace = ns_file.readline()

            train_job_script = (
                "with open('/kfp-workspace/model', 'w') as f: f.write('hello')"
            )

            dataset_path = os.path.join(workspace_path, "dataset")
            with open(dataset_path, "w") as f:
                f.write("Prepare dataset here...")

            train_job = {
                "apiVersion": "trainer.kubeflow.org/v1alpha1",
                "kind": "TrainJob",
                "metadata": {"name": "kfp-train-job", "namespace": namespace},
                "spec": {
                    "runtimeRef": {"name": "torch-distributed"},
                    "trainer": {
                        "numNodes": num_nodes,
                        "resourcesPerNode": task_config.resources,
                        "env": task_config.env,
                        "command": ["python", "-c", train_job_script],
                    },
                    "podSpecOverrides": [
                        {
                            "targetJobs": [{"name": "node"}],
                            "volumes": task_config.volumes,
                            "containers": [
                                {
                                    "name": "node",
                                    "volumeMounts": task_config.volume_mounts,
                                }
                            ],
                            "nodeSelector": task_config.node_selector,
                            "tolerations": task_config.tolerations,
                        }
                    ],
                },
            }
            print(train_job)

            api_client = k8s_client.ApiClient()
            custom_objects_api = k8s_client.CustomObjectsApi(api_client)
            response = custom_objects_api.create_namespaced_custom_object(
                group="trainer.kubeflow.org",
                version="v1alpha1",
                namespace=namespace,
                plural="trainjobs",
                body=train_job,
            )
            job_name = response["metadata"]["name"]
            print(f"TrainJob {job_name} created successfully")

            print("Polling train job code goes here...")

            print("Copying output model")
            shutil.copy(os.path.join(workspace_path, "model"), output_model.path)


        @dsl.pipeline
        def example_task_config():
            train_task = train(
                num_nodes=1, workspace_path=dsl.WORKSPACE_PATH_PLACEHOLDER
            )
            train_task.set_cpu_request("1")
            train_task.set_memory_request("20Gi")
            train_task.set_cpu_limit("2")
            train_task.set_memory_limit("50Gi")
            train_task.set_accelerator_type("nvidia.com/gpu")
            train_task.set_accelerator_limit("1")
    """

    # Field order is part of the generated __init__ signature (positional
    # arguments), so it must not be changed.
    affinity: Optional[Dict[str, Any]] = None
    tolerations: Optional[List[Dict[str, Any]]] = None
    node_selector: Optional[Dict[str, str]] = None
    env: Optional[List[Dict[str, Any]]] = None
    volumes: Optional[List[Dict[str, Any]]] = None
    volume_mounts: Optional[List[Dict[str, Any]]] = None
    resources: Optional[Dict[str, Any]] = None