Pipeline API

Pipeline is a declarative Python API for defining training workflows. Each Pipeline consists of Nodes connected through dependencies to form a DAG.

Architecture Overview

                         Pipeline Architecture
==============================================================================

+------------------+                      +------------------+
|    Pipeline      |     .build()         |   TaskGraph      |
|    (Builder)     | ------------------> |     (DAG)        |
+------------------+                      +------------------+
| - pipeline_id    |                      | - graph_id       |
| - description    |                      | - nodes: Dict    |
| - _nodes: Dict   |                      | - adj: Dict      |
+------------------+                      | - rev_adj: Dict  |
                                          +------------------+
                                                  |
                                                  | executed by
                                                  v
                                          +------------------+
                                          |   DAGWorker      |
                                          |   (per GPU)      |
                                          +------------------+

==============================================================================

Built-in Pipelines Comparison:

+----------+------------------------------------------------------------------+
| Pipeline | Nodes Flow                                                       |
+----------+------------------------------------------------------------------+
| GRPO     | rollout -> reward -> advantage -> old_log -> ref_log -> train    |
+----------+------------------------------------------------------------------+
| PPO      | rollout -> reward -> value -> advantage -> old_log -> ref_log    |
|          |         -> train_actor -> train_critic                           |
+----------+------------------------------------------------------------------+
| DAPO     | rollout -> reward -> dynamic_sampling -> advantage -> old_log    |
|          |         -> ref_log -> train                                      |
+----------+------------------------------------------------------------------+
| Embodied | rollout -> embodied_sampling -> reward -> advantage -> old_log   |
| SRPO     |         -> ref_log -> train                                      |
+----------+------------------------------------------------------------------+

Basic Usage

Creating a Pipeline

from siirl.execution.dag.pipeline import Pipeline
from siirl.execution.dag.node import NodeType, NodeRole

pipeline = Pipeline("my_pipeline", "Description")

# Add nodes (supports chaining)
pipeline.add_node(
    "node_id",
    func="module:function",  # or "module:Class.method"
    deps=["dependency_node_ids"],
    node_type=NodeType.COMPUTE,
    node_role=NodeRole.DEFAULT
).add_node(
    "next_node",
    func="module:another_function",
    deps=["node_id"],
    node_type=NodeType.MODEL_TRAIN,
    node_role=NodeRole.ACTOR
)

# Build TaskGraph
task_graph = pipeline.build()

Node Parameters

  • node_id: Unique identifier

  • func: Function path ("module:function" or "module:Class.method")

  • deps: List of dependency node IDs

  • node_type: MODEL_INFERENCE / MODEL_TRAIN / COMPUTE / DATA_LOAD

  • node_role: ROLLOUT / ACTOR / CRITIC / REFERENCE / REWARD / ADVANTAGE / DYNAMIC_SAMPLING / DEFAULT

  • only_forward_compute: Forward only (default False)

Built-in Pipelines

siiRL provides 4 built-in pipelines in siirl/execution/dag/builtin_pipelines.py:

GRPO Pipeline

Workflow: rollout → reward → advantage → old_log_prob → ref_log_prob → train_actor

Usage:

python -m siirl.main_dag \
  algorithm.adv_estimator=grpo

PPO Pipeline

Workflow: rollout → reward → critic_value → advantage → old_log_prob → ref_log_prob → train_actor → train_critic

Key Difference: Adds value function and critic training

Usage:

python -m siirl.main_dag \
  algorithm.adv_estimator=gae \
  critic.enable=true

DAPO Pipeline

Workflow: rollout → reward → dynamic_sampling → advantage → old_log_prob → ref_log_prob → train_actor

Key Feature: Filters zero-variance sample groups

Usage:

python -m siirl.main_dag \
  algorithm.workflow_type=DAPO \
  algorithm.filter_groups.enable=true

Embodied GRPO Pipeline

Workflow: rollout → embodied_sampling → reward → advantage → old_log_prob → ref_log_prob → train_actor

Key Feature: Embodied AI specific filtering

Usage:

python -m siirl.main_dag \
  algorithm.workflow_type=EMBODIED

Custom Pipeline Definition

Define Custom Pipeline

from siirl.execution.dag.pipeline import Pipeline
from siirl.execution.dag.task_graph import TaskGraph
from siirl.execution.dag.node import NodeType, NodeRole

def my_custom_pipeline() -> TaskGraph:
    pipeline = Pipeline("my_pipeline", "My workflow")

    pipeline.add_node(
        "rollout_actor",
        func="siirl.dag_worker.dagworker:DAGWorker.generate",
        deps=[],
        node_type=NodeType.MODEL_INFERENCE,
        node_role=NodeRole.ROLLOUT
    ).add_node(
        "my_custom_node",
        func="my_module:my_function",
        deps=["rollout_actor"],
        node_type=NodeType.COMPUTE,
        node_role=NodeRole.DEFAULT
    )

    return pipeline.build()

Custom Node Function

Node functions must follow this signature:

from siirl.dag_worker.data_structures import NodeOutput

def my_function(batch, config=None, **kwargs) -> NodeOutput:
    """
    Args:
        batch: Input data (TensorDict)
        config: Global configuration
        **kwargs: Additional arguments

    Returns:
        NodeOutput(batch=processed_batch, metrics={})
    """
    # Process batch
    processed_batch = process(batch)

    # Collect metrics
    metrics = {"metric_name": value}

    return NodeOutput(batch=processed_batch, metrics=metrics)

Use Custom Pipeline

Command Line:

python -m siirl.main_dag \
  dag.custom_pipeline_fn="my_module:my_custom_pipeline"