dora.builder

  1from __future__ import annotations
  2
  3import yaml
  4
  5
  6class DataflowBuilder:
  7    """A dora dataflow."""
  8
  9    def __init__(self, name: str = "dora-dataflow"):
 10        self.name = name
 11        self.nodes = []
 12
 13    def add_node(self, id: str, **kwargs) -> Node:
 14        """Adds a new node to the dataflow."""
 15        node = Node(id, **kwargs)
 16        self.nodes.append(node)
 17        return node
 18
 19    def to_yaml(self, path: str = None) -> str | None:
 20        """Generates the YAML representation of the dataflow."""
 21        dataflow_spec = {"nodes": [node.to_dict() for node in self.nodes]}
 22        if path:
 23            with open(path, "w") as f:
 24                yaml.dump(dataflow_spec, f)
 25            return None
 26        else:
 27            return yaml.dump(dataflow_spec)
 28
 29    def __enter__(self):
 30        return self
 31
 32    def __exit__(self, exc_type, exc_val, exc_tb):
 33        pass
 34
 35
 36class Output:
 37    """Represents an output from a node."""
 38
 39    def __init__(self, node: Node, output_id: str):
 40        self.node = node
 41        self.output_id = output_id
 42
 43    def __str__(self) -> str:
 44        return f"{self.node.id}/{self.output_id}"
 45
 46
 47class Node:
 48    """A node in a dora dataflow."""
 49
 50    def __init__(self, id: str, **kwargs):
 51        self.id = id
 52        self.config = kwargs
 53        self.config["id"] = id
 54        self.operators = []
 55
 56    def path(self, path: str) -> Node:
 57        """Sets the path to the executable or script."""
 58        self.config["path"] = path
 59        return self
 60
 61    def args(self, args: str) -> Node:
 62        """Sets the command-line arguments for the node."""
 63        self.config["args"] = args
 64        return self
 65
 66    def env(self, env: dict) -> Node:
 67        """Sets the environment variables for the node."""
 68        self.config["env"] = env
 69        return self
 70
 71    def build(self, build_command: str) -> Node:
 72        """Sets the build command for the node."""
 73        self.config["build"] = build_command
 74        return self
 75
 76    def git(
 77        self, url: str, branch: str = None, tag: str = None, rev: str = None
 78    ) -> Node:
 79        """Sets the Git repository for the node."""
 80        self.config["git"] = url
 81        if branch:
 82            self.config["branch"] = branch
 83        if tag:
 84            self.config["tag"] = tag
 85        if rev:
 86            self.config["rev"] = rev
 87        return self
 88
 89    def add_operator(self, operator: Operator) -> Node:
 90        """Adds an operator to this node."""
 91        self.operators.append(operator)
 92        return self
 93
 94    def add_output(self, output_id: str) -> Output:
 95        """Adds an output to the node and returns an Output object."""
 96        if "outputs" not in self.config:
 97            self.config["outputs"] = []
 98        if output_id not in self.config["outputs"]:
 99            self.config["outputs"].append(output_id)
100        return Output(self, output_id)
101
102    def add_input(
103        self, input_id: str, source: str | Output, queue_size: int = None
104    ) -> Node:
105        """Adds a user-defined input to the node. Source can be a string or an Output object."""
106        if "inputs" not in self.config:
107            self.config["inputs"] = {}
108
109        if isinstance(source, Output):
110            source_str = str(source)
111            if queue_size is not None:
112                self.config["inputs"][input_id] = {
113                    "source": source_str,
114                    "queue_size": queue_size,
115                }
116            else:
117                self.config["inputs"][input_id] = source_str
118        else:
119            if queue_size is not None:
120                self.config["inputs"][input_id] = {
121                    "source": source,
122                    "queue_size": queue_size,
123                }
124            else:
125                self.config["inputs"][input_id] = source
126        return self
127
128    def to_dict(self) -> dict:
129        """Returns the dictionary representation of the node."""
130        config = self.config.copy()
131        if self.operators:
132            config["operators"] = [op.to_dict() for op in self.operators]
133        return config
134
135
class Operator:
    """An operator in a dora dataflow.

    The configuration always holds ``"id"``; each optional setting is added
    only when a truthy value is supplied.
    """

    def __init__(
        self,
        id: str,
        name: str | None = None,
        description: str | None = None,
        build: str | None = None,
        python: str | None = None,
        shared_library: str | None = None,
        send_stdout_as: str | None = None,
    ):
        """Create an operator.

        Args:
            id: Identifier of the operator.
            name: Stored under ``"name"``.
            description: Stored under ``"description"``.
            build: Stored under ``"build"``.
            python: Stored under ``"python"``.
            shared_library: Stored under the hyphenated key
                ``"shared-library"``.
            send_stdout_as: Stored under ``"send_stdout_as"``.
        """
        self.id = id
        self.config = {"id": id}
        optional = (
            ("name", name),
            ("description", description),
            ("build", build),
            ("python", python),
            ("shared-library", shared_library),
            ("send_stdout_as", send_stdout_as),
        )
        # Falsy values (None, "") are left out of the configuration.
        self.config.update({key: value for key, value in optional if value})

    def to_dict(self) -> dict:
        """Return a copy of the operator's configuration.

        A copy is returned so callers cannot change the operator by editing
        the result.
        """
        return dict(self.config)
class DataflowBuilder:
 7class DataflowBuilder:
 8    """A dora dataflow."""
 9
10    def __init__(self, name: str = "dora-dataflow"):
11        self.name = name
12        self.nodes = []
13
14    def add_node(self, id: str, **kwargs) -> Node:
15        """Adds a new node to the dataflow."""
16        node = Node(id, **kwargs)
17        self.nodes.append(node)
18        return node
19
20    def to_yaml(self, path: str = None) -> str | None:
21        """Generates the YAML representation of the dataflow."""
22        dataflow_spec = {"nodes": [node.to_dict() for node in self.nodes]}
23        if path:
24            with open(path, "w") as f:
25                yaml.dump(dataflow_spec, f)
26            return None
27        else:
28            return yaml.dump(dataflow_spec)
29
30    def __enter__(self):
31        return self
32
33    def __exit__(self, exc_type, exc_val, exc_tb):
34        pass

A dora dataflow.

DataflowBuilder(name: str = 'dora-dataflow')
10    def __init__(self, name: str = "dora-dataflow"):
11        self.name = name
12        self.nodes = []
name
nodes
def add_node(self, id: str, **kwargs) -> Node:
14    def add_node(self, id: str, **kwargs) -> Node:
15        """Adds a new node to the dataflow."""
16        node = Node(id, **kwargs)
17        self.nodes.append(node)
18        return node

Adds a new node to the dataflow.

def to_yaml(self, path: str = None) -> str | None:
20    def to_yaml(self, path: str = None) -> str | None:
21        """Generates the YAML representation of the dataflow."""
22        dataflow_spec = {"nodes": [node.to_dict() for node in self.nodes]}
23        if path:
24            with open(path, "w") as f:
25                yaml.dump(dataflow_spec, f)
26            return None
27        else:
28            return yaml.dump(dataflow_spec)

Generates the YAML representation of the dataflow.

class Output:
37class Output:
38    """Represents an output from a node."""
39
40    def __init__(self, node: Node, output_id: str):
41        self.node = node
42        self.output_id = output_id
43
44    def __str__(self) -> str:
45        return f"{self.node.id}/{self.output_id}"

Represents an output from a node.

Output(node: Node, output_id: str)
40    def __init__(self, node: Node, output_id: str):
41        self.node = node
42        self.output_id = output_id
node
output_id
class Node:
 48class Node:
 49    """A node in a dora dataflow."""
 50
 51    def __init__(self, id: str, **kwargs):
 52        self.id = id
 53        self.config = kwargs
 54        self.config["id"] = id
 55        self.operators = []
 56
 57    def path(self, path: str) -> Node:
 58        """Sets the path to the executable or script."""
 59        self.config["path"] = path
 60        return self
 61
 62    def args(self, args: str) -> Node:
 63        """Sets the command-line arguments for the node."""
 64        self.config["args"] = args
 65        return self
 66
 67    def env(self, env: dict) -> Node:
 68        """Sets the environment variables for the node."""
 69        self.config["env"] = env
 70        return self
 71
 72    def build(self, build_command: str) -> Node:
 73        """Sets the build command for the node."""
 74        self.config["build"] = build_command
 75        return self
 76
 77    def git(
 78        self, url: str, branch: str = None, tag: str = None, rev: str = None
 79    ) -> Node:
 80        """Sets the Git repository for the node."""
 81        self.config["git"] = url
 82        if branch:
 83            self.config["branch"] = branch
 84        if tag:
 85            self.config["tag"] = tag
 86        if rev:
 87            self.config["rev"] = rev
 88        return self
 89
 90    def add_operator(self, operator: Operator) -> Node:
 91        """Adds an operator to this node."""
 92        self.operators.append(operator)
 93        return self
 94
 95    def add_output(self, output_id: str) -> Output:
 96        """Adds an output to the node and returns an Output object."""
 97        if "outputs" not in self.config:
 98            self.config["outputs"] = []
 99        if output_id not in self.config["outputs"]:
100            self.config["outputs"].append(output_id)
101        return Output(self, output_id)
102
103    def add_input(
104        self, input_id: str, source: str | Output, queue_size: int = None
105    ) -> Node:
106        """Adds a user-defined input to the node. Source can be a string or an Output object."""
107        if "inputs" not in self.config:
108            self.config["inputs"] = {}
109
110        if isinstance(source, Output):
111            source_str = str(source)
112            if queue_size is not None:
113                self.config["inputs"][input_id] = {
114                    "source": source_str,
115                    "queue_size": queue_size,
116                }
117            else:
118                self.config["inputs"][input_id] = source_str
119        else:
120            if queue_size is not None:
121                self.config["inputs"][input_id] = {
122                    "source": source,
123                    "queue_size": queue_size,
124                }
125            else:
126                self.config["inputs"][input_id] = source
127        return self
128
129    def to_dict(self) -> dict:
130        """Returns the dictionary representation of the node."""
131        config = self.config.copy()
132        if self.operators:
133            config["operators"] = [op.to_dict() for op in self.operators]
134        return config

A node in a dora dataflow.

Node(id: str, **kwargs)
51    def __init__(self, id: str, **kwargs):
52        self.id = id
53        self.config = kwargs
54        self.config["id"] = id
55        self.operators = []
id
config
operators
def path(self, path: str) -> Node:
57    def path(self, path: str) -> Node:
58        """Sets the path to the executable or script."""
59        self.config["path"] = path
60        return self

Sets the path to the executable or script.

def args(self, args: str) -> Node:
62    def args(self, args: str) -> Node:
63        """Sets the command-line arguments for the node."""
64        self.config["args"] = args
65        return self

Sets the command-line arguments for the node.

def env(self, env: dict) -> Node:
67    def env(self, env: dict) -> Node:
68        """Sets the environment variables for the node."""
69        self.config["env"] = env
70        return self

Sets the environment variables for the node.

def build(self, build_command: str) -> Node:
72    def build(self, build_command: str) -> Node:
73        """Sets the build command for the node."""
74        self.config["build"] = build_command
75        return self

Sets the build command for the node.

def git(self, url: str, branch: str = None, tag: str = None, rev: str = None) -> Node:
77    def git(
78        self, url: str, branch: str = None, tag: str = None, rev: str = None
79    ) -> Node:
80        """Sets the Git repository for the node."""
81        self.config["git"] = url
82        if branch:
83            self.config["branch"] = branch
84        if tag:
85            self.config["tag"] = tag
86        if rev:
87            self.config["rev"] = rev
88        return self

Sets the Git repository for the node.

def add_operator(self, operator: Operator) -> Node:
90    def add_operator(self, operator: Operator) -> Node:
91        """Adds an operator to this node."""
92        self.operators.append(operator)
93        return self

Adds an operator to this node.

def add_output(self, output_id: str) -> Output:
 95    def add_output(self, output_id: str) -> Output:
 96        """Adds an output to the node and returns an Output object."""
 97        if "outputs" not in self.config:
 98            self.config["outputs"] = []
 99        if output_id not in self.config["outputs"]:
100            self.config["outputs"].append(output_id)
101        return Output(self, output_id)

Adds an output to the node and returns an Output object.

def add_input(self, input_id: str, source: str | Output, queue_size: int = None) -> Node:
103    def add_input(
104        self, input_id: str, source: str | Output, queue_size: int = None
105    ) -> Node:
106        """Adds a user-defined input to the node. Source can be a string or an Output object."""
107        if "inputs" not in self.config:
108            self.config["inputs"] = {}
109
110        if isinstance(source, Output):
111            source_str = str(source)
112            if queue_size is not None:
113                self.config["inputs"][input_id] = {
114                    "source": source_str,
115                    "queue_size": queue_size,
116                }
117            else:
118                self.config["inputs"][input_id] = source_str
119        else:
120            if queue_size is not None:
121                self.config["inputs"][input_id] = {
122                    "source": source,
123                    "queue_size": queue_size,
124                }
125            else:
126                self.config["inputs"][input_id] = source
127        return self

Adds a user-defined input to the node. Source can be a string or an Output object.

def to_dict(self) -> dict:
129    def to_dict(self) -> dict:
130        """Returns the dictionary representation of the node."""
131        config = self.config.copy()
132        if self.operators:
133            config["operators"] = [op.to_dict() for op in self.operators]
134        return config

Returns the dictionary representation of the node.

class Operator:
137class Operator:
138    """An operator in a dora dataflow."""
139
140    def __init__(
141        self,
142        id: str,
143        name: str = None,
144        description: str = None,
145        build: str = None,
146        python: str = None,
147        shared_library: str = None,
148        send_stdout_as: str = None,
149    ):
150        self.id = id
151        self.config = {"id": id}
152        if name:
153            self.config["name"] = name
154        if description:
155            self.config["description"] = description
156        if build:
157            self.config["build"] = build
158        if python:
159            self.config["python"] = python
160        if shared_library:
161            self.config["shared-library"] = shared_library
162        if send_stdout_as:
163            self.config["send_stdout_as"] = send_stdout_as
164
165    def to_dict(self) -> dict:
166        """Returns the dictionary representation of the operator."""
167        return self.config

An operator in a dora dataflow.

Operator(id: str, name: str = None, description: str = None, build: str = None, python: str = None, shared_library: str = None, send_stdout_as: str = None)
140    def __init__(
141        self,
142        id: str,
143        name: str = None,
144        description: str = None,
145        build: str = None,
146        python: str = None,
147        shared_library: str = None,
148        send_stdout_as: str = None,
149    ):
150        self.id = id
151        self.config = {"id": id}
152        if name:
153            self.config["name"] = name
154        if description:
155            self.config["description"] = description
156        if build:
157            self.config["build"] = build
158        if python:
159            self.config["python"] = python
160        if shared_library:
161            self.config["shared-library"] = shared_library
162        if send_stdout_as:
163            self.config["send_stdout_as"] = send_stdout_as
id
config
def to_dict(self) -> dict:
165    def to_dict(self) -> dict:
166        """Returns the dictionary representation of the operator."""
167        return self.config

Returns the dictionary representation of the operator.