跳过主要内容

Python 交互

  1. 建立一个新的数据流(dataflow)

    # 建立一个新的基于 Python 的数据流并导行至项目目录
    dora new conversation_py --lang python
    cd conversation_py

    这将创建以下 conversation_py 目录

    .
    ├── dataflow.yml
    ├── node_1
    │ └── node_1.py
    ├── op_1
    │ └── op_1.py
    └── op_2
    └── op_2.py

    您可以删除名为 op_1 和 op_2 的两个运算符,但保留 node_1 因为我们稍后会用到它。

  2. 继续将另一个节点添加到工作区

     dora new --kind custom-node talker --lang python

    现在在您的文本编辑器中打开 talker.py 文件。

  3. 默认节点如何工作

    您的节点现在非常简陋,但这里是默认情况下其中发生的事情的解释。

    • 本节导入并初始化节点。
    from dora import Node

    node = Node()
    • 这部分代码检查节点是否收到了任何输入,如果有,它将打印出一些与输入相关的数据。
    event = node.next()
    if event["type"] == "INPUT":
    print(
    f"""Node received:
    id: {event["id"]},
    value: {event["value"]},
    metadata: {event["metadata"]}"""
    )
  4. 调整 talker 节点

    由于这是 talker 节点,我们将让它将输出发送到另一个节点。 为此,我们只需要添加一行。

      from dora import Node
    import pyarrow as pa

    node = Node()

    event = node.next()
    if event["type"] == "INPUT":
    print(
    f"""Node received:
    id: {event["id"]},
    value: {event["value"]},
    metadata: {event["metadata"]}"""
    )
    node.send_output("speech", pa.array(["Hello World"])) # 添加此行

    这条线看起来很复杂,所以让我们分解一下。

    • 我们使用 send_output 方法将字符串作为箭头数组发送到侦听器节点。
    • 第一个参数是我们想要在数据流中稍后引用的输出ID。
    • 第二个参数,pa.array(["Hello World"]),使用 Apache Arrow 处理数据。 在这里,pa.array 从列表["Hello World"]中创建一个箭头数组。
    • 此处省略了元数据的第三个参数,这表明本教程不需要有关传输的其他数据。
  5. 调整侦听器节点

    将node_1的名称更改为 listener。

      from dora import Node
    import pyarrow as pa

    node = Node()

    event = node.next()
    if event["type"] == "INPUT":
    message = event["value"][0].as_py()
    print(
    f"""I heard {message}"""
    )

    让我们分割这个脚本中的关键行。

    • event["value"] 包含一个 Apache Arrow 数组,这是一个高效处理复杂数据的结构性方法。 通过访问 [0],我们检索了这个数组的第一个元素。
    • .as_py()方法将箭头元素直接转换为原生的 Python 数据类型。
  6. 运行 数据流(dataflow)

    在我们运行数据流之前,我们必须先更改它

    nodes:
    - id: talker
    custom:
    source: talker/talker.py
    inputs:
    tick: dora/timer/secs/1
    outputs:
    - speech

    - id: listener
    custom:
    source: listener/listener.py
    inputs:
    speech: talker/speech

    在我们运行数据流之前,让我们快速回顾一下。

    • talker 节点将每秒发送一次输入,然后使其发送输出。
    • listener 节点将收到来自 talker 节点的输入,然后打印出它监听到的内容。
    • talker 输出的名称对应于talker 节点中设置的id。

    现在运行数据流。

    • 在终端运行
    dora up
    • 然后运行
    dora start dataflow.yml --name conversation
    • 数据流将运行一个循环,然后停止。 现在您将运行
    dora logs conversation listener
    • 您应该会看到 listener 节点打印出消息 “I heard Hello World”
  7. 结论

    本教程将完美结束! 您已经学会了创建和运行自定义 Dora 数据流,并集成了 talker 和 listener 节点。 此设置为更复杂的数据流奠定了基础。 如需进一步探索,请考虑尝试不同的数据类型或探索 Dora 的高级功能。 更多教程即将到来!