dora.builder
1from __future__ import annotations 2 3import yaml 4 5 6class DataflowBuilder: 7 """A dora dataflow.""" 8 9 def __init__(self, name: str = "dora-dataflow"): 10 self.name = name 11 self.nodes = [] 12 13 def add_node(self, id: str, **kwargs) -> Node: 14 """Adds a new node to the dataflow.""" 15 node = Node(id, **kwargs) 16 self.nodes.append(node) 17 return node 18 19 def to_yaml(self, path: str = None) -> str | None: 20 """Generates the YAML representation of the dataflow.""" 21 dataflow_spec = {"nodes": [node.to_dict() for node in self.nodes]} 22 if path: 23 with open(path, "w") as f: 24 yaml.dump(dataflow_spec, f) 25 return None 26 else: 27 return yaml.dump(dataflow_spec) 28 29 def __enter__(self): 30 return self 31 32 def __exit__(self, exc_type, exc_val, exc_tb): 33 pass 34 35 36class Output: 37 """Represents an output from a node.""" 38 39 def __init__(self, node: Node, output_id: str): 40 self.node = node 41 self.output_id = output_id 42 43 def __str__(self) -> str: 44 return f"{self.node.id}/{self.output_id}" 45 46 47class Node: 48 """A node in a dora dataflow.""" 49 50 def __init__(self, id: str, **kwargs): 51 self.id = id 52 self.config = kwargs 53 self.config["id"] = id 54 self.operators = [] 55 56 def path(self, path: str) -> Node: 57 """Sets the path to the executable or script.""" 58 self.config["path"] = path 59 return self 60 61 def args(self, args: str) -> Node: 62 """Sets the command-line arguments for the node.""" 63 self.config["args"] = args 64 return self 65 66 def env(self, env: dict) -> Node: 67 """Sets the environment variables for the node.""" 68 self.config["env"] = env 69 return self 70 71 def build(self, build_command: str) -> Node: 72 """Sets the build command for the node.""" 73 self.config["build"] = build_command 74 return self 75 76 def git( 77 self, url: str, branch: str = None, tag: str = None, rev: str = None 78 ) -> Node: 79 """Sets the Git repository for the node.""" 80 self.config["git"] = url 81 if branch: 82 
self.config["branch"] = branch 83 if tag: 84 self.config["tag"] = tag 85 if rev: 86 self.config["rev"] = rev 87 return self 88 89 def add_operator(self, operator: Operator) -> Node: 90 """Adds an operator to this node.""" 91 self.operators.append(operator) 92 return self 93 94 def add_output(self, output_id: str) -> Output: 95 """Adds an output to the node and returns an Output object.""" 96 if "outputs" not in self.config: 97 self.config["outputs"] = [] 98 if output_id not in self.config["outputs"]: 99 self.config["outputs"].append(output_id) 100 return Output(self, output_id) 101 102 def add_input( 103 self, input_id: str, source: str | Output, queue_size: int = None 104 ) -> Node: 105 """Adds a user-defined input to the node. Source can be a string or an Output object.""" 106 if "inputs" not in self.config: 107 self.config["inputs"] = {} 108 109 if isinstance(source, Output): 110 source_str = str(source) 111 if queue_size is not None: 112 self.config["inputs"][input_id] = { 113 "source": source_str, 114 "queue_size": queue_size, 115 } 116 else: 117 self.config["inputs"][input_id] = source_str 118 else: 119 if queue_size is not None: 120 self.config["inputs"][input_id] = { 121 "source": source, 122 "queue_size": queue_size, 123 } 124 else: 125 self.config["inputs"][input_id] = source 126 return self 127 128 def to_dict(self) -> dict: 129 """Returns the dictionary representation of the node.""" 130 config = self.config.copy() 131 if self.operators: 132 config["operators"] = [op.to_dict() for op in self.operators] 133 return config 134 135 136class Operator: 137 """An operator in a dora dataflow.""" 138 139 def __init__( 140 self, 141 id: str, 142 name: str = None, 143 description: str = None, 144 build: str = None, 145 python: str = None, 146 shared_library: str = None, 147 send_stdout_as: str = None, 148 ): 149 self.id = id 150 self.config = {"id": id} 151 if name: 152 self.config["name"] = name 153 if description: 154 self.config["description"] = description 155 
if build: 156 self.config["build"] = build 157 if python: 158 self.config["python"] = python 159 if shared_library: 160 self.config["shared-library"] = shared_library 161 if send_stdout_as: 162 self.config["send_stdout_as"] = send_stdout_as 163 164 def to_dict(self) -> dict: 165 """Returns the dictionary representation of the operator.""" 166 return self.config
class
DataflowBuilder:
class DataflowBuilder:
    """Container that gathers nodes and renders them as a dora dataflow YAML.

    The document produced by ``to_yaml`` is a mapping with one ``nodes``
    key; ``name`` is kept on the instance only.
    """

    def __init__(self, name: str = "dora-dataflow"):
        self.nodes = []
        self.name = name

    def add_node(self, id: str, **kwargs) -> Node:
        """Build a ``Node`` from the arguments, store it and hand it back."""
        self.nodes.append(Node(id, **kwargs))
        return self.nodes[-1]

    def to_yaml(self, path: str = None) -> str | None:
        """Serialise the collected nodes to YAML.

        With a truthy ``path`` the document is written to that file and
        ``None`` is returned; otherwise the YAML text itself is returned.
        """
        spec = {"nodes": [member.to_dict() for member in self.nodes]}
        if not path:
            return yaml.dump(spec)
        with open(path, "w") as handle:
            yaml.dump(spec, handle)
        return None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # No cleanup needed; returning None lets exceptions propagate.
        return None
A dora dataflow.
def add_node(self, id: str, **kwargs) -> Node:
    """Build a ``Node`` from the arguments, store it and hand it back."""
    self.nodes.append(Node(id, **kwargs))
    return self.nodes[-1]
Adds a new node to the dataflow.
def
to_yaml(self, path: str = None) -> str | None:
def to_yaml(self, path: str = None) -> str | None:
    """Serialise the collected nodes to YAML.

    With a truthy ``path`` the document is written to that file and
    ``None`` is returned; otherwise the YAML text itself is returned.
    """
    spec = {"nodes": [member.to_dict() for member in self.nodes]}
    if not path:
        return yaml.dump(spec)
    with open(path, "w") as handle:
        yaml.dump(spec, handle)
    return None
Generates the YAML representation of the dataflow.
class
Output:
class Output:
    """Handle pairing a node with the id of one of its outputs."""

    def __init__(self, node: Node, output_id: str):
        self.output_id = output_id
        self.node = node

    def __str__(self) -> str:
        # Rendered as "<node id>/<output id>".
        return "{}/{}".format(self.node.id, self.output_id)
Represents an output from a node.
Output(node: Node, output_id: str)
class
Node:
class Node:
    """A node in a dora dataflow.

    Configuration is accumulated in ``self.config``; the setter-style
    methods return ``self`` so calls can be chained.
    """

    def __init__(self, id: str, **kwargs):
        self.id = id
        self.config = kwargs
        self.config["id"] = id
        self.operators = []

    def path(self, path: str) -> Node:
        """Sets the path to the executable or script."""
        self.config["path"] = path
        return self

    def args(self, args: str) -> Node:
        """Sets the command-line arguments for the node."""
        self.config["args"] = args
        return self

    def env(self, env: dict) -> Node:
        """Sets the environment variables for the node."""
        self.config["env"] = env
        return self

    def build(self, build_command: str) -> Node:
        """Sets the build command for the node."""
        self.config["build"] = build_command
        return self

    def git(
        self,
        url: str,
        branch: str | None = None,
        tag: str | None = None,
        rev: str | None = None,
    ) -> Node:
        """Sets the Git repository for the node.

        ``branch``, ``tag`` and ``rev`` are only written when truthy.
        """
        self.config["git"] = url
        if branch:
            self.config["branch"] = branch
        if tag:
            self.config["tag"] = tag
        if rev:
            self.config["rev"] = rev
        return self

    def add_operator(self, operator: Operator) -> Node:
        """Adds an operator to this node."""
        self.operators.append(operator)
        return self

    def add_output(self, output_id: str) -> Output:
        """Adds an output to the node and returns an Output object.

        Registering the same ``output_id`` twice does not duplicate it.
        """
        outputs = self.config.setdefault("outputs", [])
        if output_id not in outputs:
            outputs.append(output_id)
        return Output(self, output_id)

    def add_input(
        self, input_id: str, source: str | Output, queue_size: int | None = None
    ) -> Node:
        """Adds a user-defined input to the node.

        Args:
            input_id: Key under which the input is stored.
            source: Either a source string or an ``Output``, which is
                converted with ``str()``.
            queue_size: When given, the input is stored as a mapping with
                ``source`` and ``queue_size`` keys instead of a bare string.
        """
        resolved = str(source) if isinstance(source, Output) else source
        inputs = self.config.setdefault("inputs", {})
        if queue_size is None:
            inputs[input_id] = resolved
        else:
            inputs[input_id] = {"source": resolved, "queue_size": queue_size}
        return self

    def to_dict(self) -> dict:
        """Returns the dictionary representation of the node.

        The result is a deep copy, so editing the returned ``outputs`` list
        or ``inputs`` mapping never alters the node itself.
        """
        # Local import: the module's import block is outside this class.
        from copy import deepcopy

        config = deepcopy(self.config)
        if self.operators:
            config["operators"] = [op.to_dict() for op in self.operators]
        return config
A node in a dora dataflow.
def path(self, path: str) -> Node:
    """Record the executable or script path under the ``path`` config key."""
    self.config.update(path=path)
    return self
Sets the path to the executable or script.
def args(self, args: str) -> Node:
    """Record the command-line arguments under the ``args`` config key."""
    self.config.update(args=args)
    return self
Sets the command-line arguments for the node.
def env(self, env: dict) -> Node:
    """Record the environment mapping under the ``env`` config key.

    The mapping is stored as-is (not copied).
    """
    self.config.update(env=env)
    return self
Sets the environment variables for the node.
def build(self, build_command: str) -> Node:
    """Record the build command under the ``build`` config key."""
    self.config.update(build=build_command)
    return self
Sets the build command for the node.
def git(
    self, url: str, branch: str = None, tag: str = None, rev: str = None
) -> Node:
    """Record the Git repository URL and any truthy ref selectors.

    ``url`` always goes under ``git``; ``branch``, ``tag`` and ``rev`` are
    written under their own keys only when they are truthy.
    """
    self.config["git"] = url
    for key, ref in (("branch", branch), ("tag", tag), ("rev", rev)):
        if ref:
            self.config[key] = ref
    return self
Sets the Git repository for the node.
def add_operator(self, operator: Operator) -> Node:
    """Append ``operator`` to this node's operator list and return the node."""
    self.operators += [operator]
    return self
Adds an operator to this node.
def add_output(self, output_id: str) -> Output:
    """Register ``output_id`` on the node and return an ``Output`` for it.

    The ``outputs`` list is created on first use, and an id that is already
    present is not appended again.
    """
    declared = self.config.setdefault("outputs", [])
    if output_id not in declared:
        declared.append(output_id)
    return Output(self, output_id)
Adds an output to the node and returns an Output object.
def add_input(
    self, input_id: str, source: str | Output, queue_size: int = None
) -> Node:
    """Register an input on the node.

    ``source`` may be a string or an ``Output``; an ``Output`` is converted
    with ``str()``. Without ``queue_size`` the source is stored directly
    under ``input_id``; with it, a mapping holding ``source`` and
    ``queue_size`` is stored instead.
    """
    inputs = self.config.setdefault("inputs", {})
    resolved = str(source) if isinstance(source, Output) else source
    if queue_size is None:
        inputs[input_id] = resolved
    else:
        inputs[input_id] = {"source": resolved, "queue_size": queue_size}
    return self
Adds a user-defined input to the node. Source can be a string or an Output object.
def
to_dict(self) -> dict:
def to_dict(self) -> dict:
    """Returns the dictionary representation of the node.

    The config is deep-copied so that nested containers in the result
    (such as the ``outputs`` list or ``inputs`` mapping) are independent
    of the node's own state. When the node has operators, their
    ``to_dict()`` results are added under the ``operators`` key.
    """
    # Local import: the module's import block is outside this method.
    from copy import deepcopy

    config = deepcopy(self.config)
    if self.operators:
        config["operators"] = [op.to_dict() for op in self.operators]
    return config
Returns the dictionary representation of the node.
class
Operator:
class Operator:
    """An operator in a dora dataflow.

    The constructor gathers the operator's settings into ``self.config``;
    ``to_dict`` exposes a copy of them.
    """

    def __init__(
        self,
        id: str,
        name: str | None = None,
        description: str | None = None,
        build: str | None = None,
        python: str | None = None,
        shared_library: str | None = None,
        send_stdout_as: str | None = None,
    ):
        """Store ``id`` and every truthy option in the config mapping.

        Options left as ``None`` (or passed as an empty string) are omitted
        from the config altogether.
        """
        self.id = id
        self.config = {"id": id}
        # Note the hyphenated "shared-library" key versus the underscored
        # "send_stdout_as" key; both spellings are kept as-is.
        if name:
            self.config["name"] = name
        if description:
            self.config["description"] = description
        if build:
            self.config["build"] = build
        if python:
            self.config["python"] = python
        if shared_library:
            self.config["shared-library"] = shared_library
        if send_stdout_as:
            self.config["send_stdout_as"] = send_stdout_as

    def to_dict(self) -> dict:
        """Returns the dictionary representation of the operator.

        A new dict is returned on every call so that callers editing the
        result cannot change the operator's own config.
        """
        return dict(self.config)
An operator in a dora dataflow.
Operator( id: str, name: str = None, description: str = None, build: str = None, python: str = None, shared_library: str = None, send_stdout_as: str = None)
140 def __init__( 141 self, 142 id: str, 143 name: str = None, 144 description: str = None, 145 build: str = None, 146 python: str = None, 147 shared_library: str = None, 148 send_stdout_as: str = None, 149 ): 150 self.id = id 151 self.config = {"id": id} 152 if name: 153 self.config["name"] = name 154 if description: 155 self.config["description"] = description 156 if build: 157 self.config["build"] = build 158 if python: 159 self.config["python"] = python 160 if shared_library: 161 self.config["shared-library"] = shared_library 162 if send_stdout_as: 163 self.config["send_stdout_as"] = send_stdout_as