Pipeline API
Pipeline is a declarative Python API for defining training workflows. Each Pipeline consists of Nodes connected by dependencies, which together form a directed acyclic graph (DAG).
Architecture Overview
Pipeline Architecture
==============================================================================
+------------------+                     +------------------+
|     Pipeline     |      .build()       |    TaskGraph     |
|    (Builder)     | ------------------> |      (DAG)       |
+------------------+                     +------------------+
| - pipeline_id    |                     | - graph_id       |
| - description    |                     | - nodes: Dict    |
| - _nodes: Dict   |                     | - adj: Dict      |
+------------------+                     | - rev_adj: Dict  |
                                         +------------------+
                                                  |
                                                  | executed by
                                                  v
                                         +------------------+
                                         |    DAGWorker     |
                                         |    (per GPU)     |
                                         +------------------+
==============================================================================
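To make the builder-to-DAG step concrete, here is a minimal sketch of how a chained builder could produce the adj and rev_adj maps shown in the diagram and derive an execution order from them. SketchPipeline and SketchGraph are hypothetical stand-ins written for this illustration; they are not siiRL's Pipeline or TaskGraph classes, and the real implementation may differ.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class SketchGraph:
    """Hypothetical stand-in for TaskGraph (not the siiRL class)."""
    graph_id: str
    nodes: Dict[str, List[str]]                                   # node_id -> dependency ids
    adj: Dict[str, List[str]] = field(default_factory=dict)      # node_id -> dependents
    rev_adj: Dict[str, List[str]] = field(default_factory=dict)  # node_id -> dependencies

    def topological_order(self) -> List[str]:
        """Kahn's algorithm: emit a node once all of its dependencies are emitted."""
        indegree = {n: len(self.rev_adj[n]) for n in self.nodes}
        ready = deque(n for n in self.nodes if indegree[n] == 0)
        order: List[str] = []
        while ready:
            node = ready.popleft()
            order.append(node)
            for child in self.adj[node]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    ready.append(child)
        if len(order) != len(self.nodes):
            raise ValueError("cycle detected: not a DAG")
        return order


class SketchPipeline:
    """Hypothetical stand-in for the Pipeline builder (not the siiRL class)."""

    def __init__(self, pipeline_id: str, description: str = ""):
        self.pipeline_id = pipeline_id
        self.description = description
        self._nodes: Dict[str, List[str]] = {}

    def add_node(self, node_id: str, deps: Optional[List[str]] = None) -> "SketchPipeline":
        if node_id in self._nodes:
            raise ValueError(f"duplicate node id: {node_id}")
        self._nodes[node_id] = list(deps or [])
        return self  # returning self is what enables chaining

    def build(self) -> SketchGraph:
        adj: Dict[str, List[str]] = {n: [] for n in self._nodes}
        rev_adj: Dict[str, List[str]] = {n: [] for n in self._nodes}
        for node_id, deps in self._nodes.items():
            for dep in deps:
                if dep not in self._nodes:
                    raise ValueError(f"unknown dependency: {dep}")
                adj[dep].append(node_id)
                rev_adj[node_id].append(dep)
        return SketchGraph(self.pipeline_id, dict(self._nodes), adj, rev_adj)


graph = (
    SketchPipeline("demo")
    .add_node("rollout")
    .add_node("reward", deps=["rollout"])
    .add_node("advantage", deps=["reward"])
    .build()
)
order = graph.topological_order()
```

Keeping both adj and rev_adj lets a worker answer "what runs after this node" and "what must finish before this node" without scanning the whole graph.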
Built-in Pipelines Comparison:
+----------+------------------------------------------------------------------+
| Pipeline | Nodes Flow                                                       |
+----------+------------------------------------------------------------------+
| GRPO     | rollout -> reward -> advantage -> old_log -> ref_log -> train    |
+----------+------------------------------------------------------------------+
| PPO      | rollout -> reward -> value -> advantage -> old_log -> ref_log    |
|          | -> train_actor -> train_critic                                   |
+----------+------------------------------------------------------------------+
| DAPO     | rollout -> reward -> dynamic_sampling -> advantage -> old_log    |
|          | -> ref_log -> train                                              |
+----------+------------------------------------------------------------------+
| Embodied | rollout -> embodied_sampling -> reward -> advantage -> old_log   |
| SRPO     | -> ref_log -> train                                              |
+----------+------------------------------------------------------------------+
Basic Usage
Creating a Pipeline
from siirl.execution.dag.pipeline import Pipeline
from siirl.execution.dag.node import NodeType, NodeRole

pipeline = Pipeline("my_pipeline", "Description")

# Add nodes (supports chaining)
pipeline.add_node(
    "node_id",
    func="module:function",  # or "module:Class.method"
    deps=["dependency_node_ids"],
    node_type=NodeType.COMPUTE,
    node_role=NodeRole.DEFAULT
).add_node(
    "next_node",
    func="module:another_function",
    deps=["node_id"],
    node_type=NodeType.MODEL_TRAIN,
    node_role=NodeRole.ACTOR
)

# Build TaskGraph
task_graph = pipeline.build()
Node Parameters
- node_id: Unique identifier
- func: Function path ("module:function" or "module:Class.method")
- deps: List of dependency node IDs
- node_type: MODEL_INFERENCE / MODEL_TRAIN / COMPUTE / DATA_LOAD
- node_role: ROLLOUT / ACTOR / CRITIC / REFERENCE / REWARD / ADVANTAGE / DYNAMIC_SAMPLING / DEFAULT
- only_forward_compute: Forward only (default False)
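The func path format above ("module:function" or "module:Class.method") can be resolved with the standard library alone. The sketch below shows one plausible way to do it; resolve_func is a hypothetical helper named for this example, and siiRL's own resolution logic is not shown in this document and may differ.

```python
import importlib
from typing import Any, Callable


def resolve_func(path: str) -> Callable[..., Any]:
    """Resolve "module:function" or "module:Class.method" to a callable.

    Hypothetical helper for illustration; not part of siiRL.
    """
    module_name, sep, attr_path = path.partition(":")
    if not sep or not module_name or not attr_path:
        raise ValueError(f"expected 'module:attr', got {path!r}")
    # Import the module, then walk the dotted attribute path inside it.
    target: Any = importlib.import_module(module_name)
    for part in attr_path.split("."):
        target = getattr(target, part)
    if not callable(target):
        raise TypeError(f"{path!r} does not resolve to a callable")
    return target


# Standard-library targets are used here so the example runs anywhere.
dumps = resolve_func("json:dumps")
fromkeys = resolve_func("collections:OrderedDict.fromkeys")
```

Resolving paths lazily like this keeps a pipeline definition importable even when the modules it names are only available on the worker side.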
Built-in Pipelines
siiRL provides 4 built-in pipelines in siirl/execution/dag/builtin_pipelines.py:
GRPO Pipeline
Workflow: rollout → reward → advantage → old_log_prob → ref_log_prob → train_actor
Usage:
python -m siirl.main_dag \
    algorithm.adv_estimator=grpo
PPO Pipeline
Workflow: rollout → reward → critic_value → advantage → old_log_prob → ref_log_prob → train_actor → train_critic
Key Difference: Adds a value-function node (critic_value) before advantage estimation and a critic training node (train_critic) after actor training
Usage:
python -m siirl.main_dag \
    algorithm.adv_estimator=gae \
    critic.enable=true
DAPO Pipeline
Workflow: rollout → reward → dynamic_sampling → advantage → old_log_prob → ref_log_prob → train_actor
Key Feature: Filters zero-variance sample groups
Usage:
python -m siirl.main_dag \
    algorithm.workflow_type=DAPO \
    algorithm.filter_groups.enable=true
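To illustrate what filtering zero-variance sample groups means, the sketch below drops every group whose samples all received the same reward, since such a group carries no group-relative learning signal. This is a simplified, pure-Python illustration under the assumption that samples are grouped by a prompt identifier and filtered on raw reward; filter_zero_variance_groups is a hypothetical name and this is not siiRL's dynamic_sampling implementation.

```python
from collections import defaultdict
from typing import Dict, List, Sequence


def filter_zero_variance_groups(
    group_ids: Sequence[str], rewards: Sequence[float]
) -> List[int]:
    """Return sorted indices of samples whose group has non-identical rewards.

    Hypothetical helper for illustration; not part of siiRL.
    """
    if len(group_ids) != len(rewards):
        raise ValueError("group_ids and rewards must have the same length")
    # Collect sample indices per group.
    by_group: Dict[str, List[int]] = defaultdict(list)
    for idx, gid in enumerate(group_ids):
        by_group[gid].append(idx)
    keep: List[int] = []
    for indices in by_group.values():
        # A group has zero variance exactly when all of its rewards are equal.
        if len({rewards[i] for i in indices}) > 1:
            keep.extend(indices)
    return sorted(keep)


group_ids = ["p0", "p0", "p1", "p1", "p2", "p2"]
rewards = [1.0, 0.0, 1.0, 1.0, 0.0, 0.0]
kept = filter_zero_variance_groups(group_ids, rewards)
```

Here only group p0 survives, because p1 is all-correct and p2 is all-wrong. With continuous rewards, a tolerance-based comparison may be more appropriate than exact equality.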
Embodied GRPO Pipeline
Workflow: rollout → embodied_sampling → reward → advantage → old_log_prob → ref_log_prob → train_actor
Key Feature: Adds an embodied_sampling node that applies embodied-AI-specific filtering between rollout and reward
Usage:
python -m siirl.main_dag \
    algorithm.workflow_type=EMBODIED
Custom Pipeline Definition
Define Custom Pipeline
from siirl.execution.dag.pipeline import Pipeline
from siirl.execution.dag.task_graph import TaskGraph
from siirl.execution.dag.node import NodeType, NodeRole

def my_custom_pipeline() -> TaskGraph:
    pipeline = Pipeline("my_pipeline", "My workflow")
    pipeline.add_node(
        "rollout_actor",
        func="siirl.dag_worker.dagworker:DAGWorker.generate",
        deps=[],
        node_type=NodeType.MODEL_INFERENCE,
        node_role=NodeRole.ROLLOUT
    ).add_node(
        "my_custom_node",
        func="my_module:my_function",
        deps=["rollout_actor"],
        node_type=NodeType.COMPUTE,
        node_role=NodeRole.DEFAULT
    )
    return pipeline.build()
Custom Node Function
Node functions must follow this signature:
from siirl.dag_worker.data_structures import NodeOutput

def my_function(batch, config=None, **kwargs) -> NodeOutput:
    """
    Args:
        batch: Input data (TensorDict)
        config: Global configuration
        **kwargs: Additional arguments
    Returns:
        NodeOutput(batch=processed_batch, metrics={})
    """
    # Process the batch (placeholder: replace with your own transformation)
    processed_batch = batch
    # Collect metrics (placeholder value: replace with a real measurement)
    metrics = {"metric_name": 0.0}
    return NodeOutput(batch=processed_batch, metrics=metrics)
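As a slightly more concrete illustration of the signature above, the sketch below implements a node that centers rewards and reports their mean. So that it runs without siiRL installed, it defines a local stand-in NodeOutput dataclass and uses a plain dict in place of a TensorDict; both are assumptions made for this example, as are the reward_stats_node name and the "rewards" key.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class NodeOutput:
    """Local stand-in mirroring NodeOutput(batch=..., metrics=...); not the siiRL class."""
    batch: Any
    metrics: Dict[str, float] = field(default_factory=dict)


def reward_stats_node(batch, config=None, **kwargs) -> NodeOutput:
    """Hypothetical custom node: center rewards and report their mean."""
    rewards = list(batch["rewards"])
    mean_reward = sum(rewards) / len(rewards) if rewards else 0.0
    # Copy the batch so the caller's input is not mutated.
    processed = dict(batch)
    processed["centered_rewards"] = [r - mean_reward for r in rewards]
    return NodeOutput(batch=processed, metrics={"reward/mean": mean_reward})


out = reward_stats_node({"rewards": [1.0, 0.0, 1.0, 0.0]})
```

In a real pipeline the function would import NodeOutput from siirl.dag_worker.data_structures and operate on tensors, but the shape of the contract is the same: take a batch, return the processed batch together with a metrics dictionary.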
Use Custom Pipeline
Command Line:
python -m siirl.main_dag \
    dag.custom_pipeline_fn="my_module:my_custom_pipeline"