Code Structure
This document describes the code structure and architecture of siiRL.
Directory Structure
siirl/
├── main_dag.py # Main entry point
├── dag_worker/ # DAG Worker implementation
├── execution/ # Execution engine
├── engine/ # Model engine
├── data_coordinator/ # Data coordination
├── params/ # Configuration parameters
├── environment/ # Environment abstraction
└── user_interface/ # User interface
Core Modules
dag_worker/
DAG execution unit, one worker per GPU.
dag_worker/
├── dagworker.py # Core Worker class (~1320 lines)
├── core_algos.py # RL algorithm implementations
├── dag_utils.py # Utility functions
├── checkpoint_manager.py # Checkpoint management
├── metrics_collector.py # Metrics collection
├── metric_aggregator.py # Metrics aggregation
├── validator.py # Validation logic
├── constants.py # Constants
└── data_structures.py # Data structures
Responsibilities:
Execute TaskGraph nodes
Manage model Workers (Actor/Critic/Rollout/Reference/Reward)
Data flow and caching
Metrics collection and reporting
Checkpoint saving and loading
execution/
Execution engine for DAG definition, scheduling, and metrics aggregation.
execution/
├── dag/ # DAG definition
│ ├── task_graph.py # TaskGraph class
│ ├── node.py # Node class
│ ├── builtin_pipelines.py # Built-in Pipelines
│ ├── pipeline.py # Pipeline Builder API
│ ├── config_loader.py # Configuration loader
│ └── task_loader.py # Task loader
├── scheduler/ # Task scheduling
│ ├── task_scheduler.py # Task scheduler
│ ├── launch.py # Ray launcher
│ ├── process_group_manager.py # Process group manager
│ ├── graph_updater.py # Graph updater
│ ├── reward.py # Reward scheduler
│ ├── enums.py # Enum definitions
│ └── resource_manager.py # Resource manager
├── metric_worker/ # Distributed metrics aggregation
│ ├── metric_worker.py # MetricWorker
│ └── utils.py
└── rollout_flow/ # Rollout flow
  ├── multi_agent/ # Multi-agent support
  └── multiturn/ # Multi-turn interaction
Responsibilities:
DAG definition and validation
Task scheduling and resource allocation
Distributed metrics collection
Multi-agent/multi-turn interaction flow
engine/
Model execution engine containing all model workers.
engine/
├── actor/ # Actor models
│ ├── base.py
│ ├── dp_actor.py # FSDP Actor
│ ├── megatron_actor.py # Megatron Actor
│ └── embodied_actor.py # Embodied Actor
├── critic/ # Critic models
│ ├── base.py
│ ├── dp_critic.py
│ └── megatron_critic.py
├── rollout/ # Rollout engine
│ ├── base.py
│ ├── vllm_rollout/ # vLLM backend
│ ├── sglang_rollout/ # SGLang backend
│ ├── hf_rollout.py # HuggingFace backend
│ └── embodied_rollout.py # Embodied Rollout
├── reward_model/ # Reward models
├── reward_manager/ # Reward managers
│ ├── naive.py # Simple reward
│ ├── batch.py # Batch Reward Model
│ ├── parallel.py # Parallel Reward Model
│ ├── dapo.py # DAPO Reward
│ └── embodied.py # Embodied Reward
├── sharding_manager/ # Sharding management
├── base_worker/ # Worker base classes
├── fsdp_workers.py # FSDP Worker
└── megatron_workers.py # Megatron Worker
Responsibilities:
Training and inference for Actor/Critic/Rollout/Reference/Reward models
Support for FSDP and Megatron backends
Support for vLLM/SGLang/HuggingFace inference backends
data_coordinator/
Data coordinator for distributed data management.
data_coordinator/
├── data_buffer.py # Distributed data buffer
├── dataloader/ # Data loading
│ ├── data_loader_node.py
│ ├── partitioned_dataset.py
│ ├── embodied_preprocess.py
│ └── vision_utils.py
├── protocol.py # Data protocol
└── sample.py # Sampling logic
Responsibilities:
Distributed data buffering (per-server)
Data loading (per-GPU)
Data redistribution and load balancing
params/
Parameter configuration using Hydra.
params/
├── __init__.py # SiiRLArguments
├── parser.py # Configuration parser
├── data_args.py # Data parameters
├── model_args.py # Model parameters
├── training_args.py # Training parameters
├── dag_args.py # DAG parameters
├── embodied_args.py # Embodied parameters
└── profiler_args.py # Profiler parameters
environment/
Environment abstraction for Embodied AI and multi-agent systems.
environment/
└── embodied/
  ├── base.py # Environment base class
  ├── venv.py # Vectorized environment
  └── adapters/ # Environment adapters
    └── libero.py # Libero adapter
user_interface/
User-defined interfaces.
user_interface/
├── filter_interface/
│ ├── dapo.py # DAPO dynamic sampling
│ └── embodied.py # Embodied data filtering
└── rewards_interface/
└── custom_gsm8k_reward.py # Custom reward example
Purpose: Provides interfaces for user-defined node functions.
Data Structures
NodeOutput
Return value from node execution.
@dataclass
class NodeOutput:
    batch: Any # Data batch
    metrics: Dict = None # Metrics
    info: Dict = None # Additional info
Node
DAG node definition.
@dataclass
class Node:
    node_id: str # Node ID
    node_type: NodeType # Node type
    node_role: NodeRole # Node role
    dependencies: List[str] # Dependency nodes
    executable: Callable # Executable function
    executable_ref: str # Function path
    only_forward_compute: bool # Forward only
Enumerations
NodeType:
class NodeType(Enum):
    MODEL_INFERENCE = "model_inference"
    MODEL_TRAIN = "model_train"
    COMPUTE = "compute"
    DATA_LOAD = "data_load"
NodeRole:
class NodeRole(Enum):
    ROLLOUT = "rollout"
    ACTOR = "actor"
    CRITIC = "critic"
    REFERENCE = "reference"
    REWARD = "reward"
    ADVANTAGE = "advantage"
    DYNAMIC_SAMPLING = "dynamic_sampling"
    DEFAULT = "default"
AdvantageEstimator:
class AdvantageEstimator(Enum):
    GRPO = "grpo"
    GAE = "gae"
    CPGD = "cpgd"
    GSPO = "gspo"
WorkflowType:
class WorkflowType(Enum):
    DEFAULT = "DEFAULT"
    DAPO = "DAPO"
    EMBODIED = "EMBODIED"
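Because every member value in these enumerations is a plain string, a configuration string such as `grpo` can be turned into a member by value lookup. The sketch below re-declares one of the enums so that it runs on its own; the `parse_estimator` helper is hypothetical and is not claimed to exist in siiRL.

```python
from enum import Enum


class AdvantageEstimator(Enum):
    # Re-declared from the listing above so the example is self-contained.
    GRPO = "grpo"
    GAE = "gae"
    CPGD = "cpgd"
    GSPO = "gspo"


def parse_estimator(name: str) -> AdvantageEstimator:
    """Hypothetical helper: map a config string to an enum member."""
    try:
        # Enum(value) looks a member up by its value, not by its name.
        return AdvantageEstimator(name.lower())
    except ValueError:
        valid = ", ".join(member.value for member in AdvantageEstimator)
        raise ValueError(
            f"unknown adv_estimator {name!r}; expected one of: {valid}"
        ) from None


estimator = parse_estimator("grpo")
```

Failing early with the list of valid values tends to be easier to debug than a bare `ValueError` raised deep inside the training loop.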
Execution Flow
Startup Flow (main_dag.py)
1. Parse configuration (parse_config)
2. Load Pipeline (load_pipeline)
3. Initialize DataBuffer (init_data_coordinator)
4. Initialize MetricWorker
5. Task scheduling (TaskScheduler)
6. Launch Ray cluster (RayTrainer)
7. Create DAGWorker (one per GPU)
8. Execute training (DAGWorker.execute_task_graph)
DAGWorker Execution Flow
1. Initialize Workers (Actor/Critic/Rollout/Reference/Reward)
2. Initialize DataLoader
3. Initialize Validator
4. Load Checkpoint (if exists)
5. Training loop:
   - Load data
   - Execute nodes in topological order
   - Collect metrics
   - Save Checkpoint
   - Validate (if needed)
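The ordering inside the training loop can be sketched as follows. This is an illustration, not DAGWorker's code: `training_loop`, `run_graph`, `val_freq`, and `start_step` are hypothetical names, and the sketch assumes that `save_freq` counts training steps and that resuming from a checkpoint restores the step counter.

```python
from typing import Any, Callable, Dict, Iterable, List, Tuple


def training_loop(
    batches: Iterable[Any],
    run_graph: Callable[[Any], Dict[str, float]],
    save_freq: int,
    val_freq: int = 0,
    start_step: int = 0,
) -> List[Tuple]:
    """Return a log of (event, step[, payload]) tuples in the order they occur."""
    log: List[Tuple] = []
    # start_step > 0 models resuming from a previously saved checkpoint.
    for step, batch in enumerate(batches, start=start_step + 1):
        metrics = run_graph(batch)  # stands in for executing all nodes in order
        log.append(("metrics", step, metrics))
        if save_freq > 0 and step % save_freq == 0:
            log.append(("save", step))
        if val_freq > 0 and step % val_freq == 0:
            log.append(("validate", step))
    return log


log = training_loop(
    batches=[[1, 2], [3, 4], [5, 6], [7, 8]],
    run_graph=lambda batch: {"batch_sum": sum(batch)},
    save_freq=2,
    val_freq=4,
)
```

With these inputs, checkpoints are recorded at steps 2 and 4 and validation at step 4, each after that step's metrics.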
Node Execution Flow
1. DAGWorker gets node's executable function
2. Call function with current batch
3. Function processes data, returns NodeOutput
4. Update batch, pass to next node
5. Collect node metrics
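The five steps above can be sketched as a small loop that threads the batch through each node and gathers metrics under a per-node prefix. `NodeOutput` here is a stand-in that mirrors the documented fields; `run_nodes` and the two toy node functions are hypothetical, and a plain list is used as the batch only for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple


@dataclass
class NodeOutput:
    """Stand-in mirroring the NodeOutput fields documented earlier."""
    batch: Any
    metrics: Optional[Dict] = None
    info: Optional[Dict] = None


def run_nodes(
    nodes: List[Tuple[str, Callable[[Any], NodeOutput]]], batch: Any
) -> Tuple[Any, Dict[str, Any]]:
    """Run (node_id, executable) pairs in the given order."""
    collected: Dict[str, Any] = {}
    for node_id, executable in nodes:
        output = executable(batch)  # steps 1-3: call the node's function
        batch = output.batch        # step 4: the result feeds the next node
        # Step 5: metrics may be None, so guard before iterating.
        for key, value in (output.metrics or {}).items():
            collected[f"{node_id}/{key}"] = value
    return batch, collected


def double(batch):
    return NodeOutput(batch=[x * 2 for x in batch], metrics={"size": len(batch)})


def drop_small(batch):
    kept = [x for x in batch if x > 2]
    return NodeOutput(batch=kept, metrics={"kept": len(kept)})


final_batch, metrics = run_nodes([("double", double), ("filter", drop_small)], [1, 2, 3])
# final_batch == [4, 6]
# metrics == {"double/size": 3, "filter/kept": 2}
```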
Key Concepts
TaskGraph
A directed acyclic graph (DAG) representing the training workflow.
Core Methods:
add_node(): Add node
build_adjacency_lists(): Build adjacency lists
validate_graph(): Validate DAG
get_execution_order(): Get topological sort
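To make the roles of these methods concrete, here is a simplified stand-in written from scratch. `MiniTaskGraph` is hypothetical and is not siiRL's TaskGraph; it uses Kahn's algorithm for the topological sort, which may differ from the real implementation, and the node names are illustrative only.

```python
from collections import deque
from typing import Dict, List, Sequence


class MiniTaskGraph:
    """Hypothetical, simplified stand-in for TaskGraph."""

    def __init__(self) -> None:
        self.dependencies: Dict[str, List[str]] = {}

    def add_node(self, node_id: str, dependencies: Sequence[str] = ()) -> None:
        if node_id in self.dependencies:
            raise ValueError(f"duplicate node: {node_id}")
        self.dependencies[node_id] = list(dependencies)

    def build_adjacency_lists(self) -> Dict[str, List[str]]:
        """Map each node to the nodes that depend on it."""
        children: Dict[str, List[str]] = {n: [] for n in self.dependencies}
        for node_id, deps in self.dependencies.items():
            for dep in deps:
                if dep not in children:
                    raise ValueError(f"{node_id} depends on unknown node {dep}")
                children[dep].append(node_id)
        return children

    def get_execution_order(self) -> List[str]:
        """Kahn's algorithm; raises ValueError if the graph has a cycle."""
        children = self.build_adjacency_lists()
        pending = {n: len(deps) for n, deps in self.dependencies.items()}
        ready = deque(n for n, count in pending.items() if count == 0)
        order: List[str] = []
        while ready:
            node_id = ready.popleft()
            order.append(node_id)
            for child in children[node_id]:
                pending[child] -= 1
                if pending[child] == 0:
                    ready.append(child)
        if len(order) != len(self.dependencies):
            raise ValueError("graph contains a cycle")
        return order


graph = MiniTaskGraph()
graph.add_node("rollout")
graph.add_node("reward", ["rollout"])
graph.add_node("reference", ["rollout"])
graph.add_node("advantage", ["reward", "reference"])
graph.add_node("train_actor", ["advantage"])
order = graph.get_execution_order()
# order == ["rollout", "reward", "reference", "advantage", "train_actor"]
```

In this sketch, validation falls out of the sort itself: unknown dependencies are rejected while building the adjacency lists, and a cycle leaves some nodes unscheduled.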
Pipeline
A declarative API for building a TaskGraph.
Core Methods:
add_node(): Add node (supports chaining)
build(): Build and validate TaskGraph
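Chaining works when `add_node()` returns the builder itself. The sketch below shows that pattern with a hypothetical `MiniPipeline` class; its constructor, the `func` and `deps` parameter names, and the dictionary returned by `build()` are assumptions for illustration and are not taken from siiRL's Pipeline API.

```python
from typing import Callable, Dict, List, Sequence


class MiniPipeline:
    """Hypothetical builder illustrating chaining; not siiRL's Pipeline class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._nodes: Dict[str, dict] = {}

    def add_node(
        self, node_id: str, func: Callable, deps: Sequence[str] = ()
    ) -> "MiniPipeline":
        if node_id in self._nodes:
            raise ValueError(f"duplicate node id: {node_id}")
        self._nodes[node_id] = {"func": func, "deps": list(deps)}
        return self  # returning self is what enables chaining

    def build(self) -> Dict[str, List[str]]:
        """Validate dependencies and return a node_id -> dependencies mapping."""
        for node_id, spec in self._nodes.items():
            missing = [d for d in spec["deps"] if d not in self._nodes]
            if missing:
                raise ValueError(f"{node_id} depends on undefined nodes: {missing}")
        return {node_id: spec["deps"] for node_id, spec in self._nodes.items()}


def noop(batch):
    """Placeholder node function used only to populate the pipeline."""
    return batch


graph = (
    MiniPipeline("toy_pipeline")
    .add_node("rollout", noop)
    .add_node("reward", noop, deps=["rollout"])
    .add_node("advantage", noop, deps=["reward"])
    .add_node("train_actor", noop, deps=["advantage"])
    .build()
)
```

Deferring validation to `build()` lets nodes be declared in any order, since a dependency only needs to exist by the time the graph is finalized.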
DAGWorker Class
Execution unit per GPU.
Core Methods:
generate(): Rollout generation
compute_reward(): Compute reward
compute_advantage(): Compute advantage
compute_old_log_prob(): Old policy log prob
compute_ref_log_prob(): Reference model log prob
compute_value(): Value function (PPO)
train_actor(): Train actor
train_critic(): Train critic (PPO)
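As background for the advantage step when the GRPO estimator is selected, the commonly described group-relative formulation normalizes each response's reward against the other responses sampled for the same prompt. The sketch below is a plain-Python illustration of that idea, not the code in core_algos.py; the function name, the use of the population standard deviation, the `eps` value, and the assumption that a group's rewards are stored contiguously are all choices made for this example.

```python
from statistics import mean, pstdev
from typing import List


def group_relative_advantages(
    rewards: List[float], group_size: int, eps: float = 1e-6
) -> List[float]:
    """Normalize each reward by the mean and std of its own group.

    Assumes rewards for the same prompt are contiguous, so that
    rewards[0:group_size] form the first group, and so on.
    """
    if group_size <= 0 or len(rewards) % group_size != 0:
        raise ValueError("rewards must split evenly into groups of group_size")
    advantages: List[float] = []
    for start in range(0, len(rewards), group_size):
        group = rewards[start:start + group_size]
        mu = mean(group)
        sigma = pstdev(group)  # population std; an assumption of this sketch
        # eps keeps the division finite when every reward in a group is equal.
        advantages.extend((r - mu) / (sigma + eps) for r in group)
    return advantages


advantages = group_relative_advantages(
    [1.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0], group_size=4
)
```

In the second group every reward is identical, so all four advantages are zero and those samples contribute no learning signal.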
Configuration Parameters
Main Configuration Groups
algorithm:
  adv_estimator: grpo # grpo/gae/cpgd/gspo
  workflow_type: DEFAULT # DEFAULT/DAPO/EMBODIED
data:
  train_files: /path/to/train.parquet
  train_batch_size: 512
  max_prompt_length: 2048
  max_response_length: 4096
actor_rollout_ref:
  model:
    path: /path/to/model
  actor:
    optim:
      lr: 1e-6
    ppo_mini_batch_size: 256
  rollout:
    name: vllm # vllm/sglang/hf
    tensor_model_parallel_size: 2
    n: 8 # GRPO group size
trainer:
  n_gpus_per_node: 8
  nnodes: 1
  total_epochs: 30
  save_freq: 10
dag:
  custom_pipeline_fn: null # Custom Pipeline
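Hydra lets individual values in such a nested configuration be overridden with dotted `key.subkey=value` strings. The sketch below imitates that behavior on a plain dictionary to show how a dotted key addresses a nested group; `apply_overrides` and `_parse_scalar` are hypothetical helpers written for this example and are far simpler than Hydra's own parsing.

```python
from typing import Any, Dict, List


def _parse_scalar(raw: str) -> Any:
    """Try int, then float; otherwise keep the raw string."""
    for cast in (int, float):
        try:
            return cast(raw)
        except ValueError:
            continue
    return raw


def apply_overrides(config: Dict[str, Any], overrides: List[str]) -> Dict[str, Any]:
    """Apply 'a.b.c=value' overrides to a nested dict, modifying it in place."""
    for item in overrides:
        dotted_key, sep, raw = item.partition("=")
        if not sep:
            raise ValueError(f"override must look like key=value: {item!r}")
        *parents, leaf = dotted_key.split(".")
        node = config
        for key in parents:
            # Create intermediate groups that do not exist yet.
            node = node.setdefault(key, {})
        node[leaf] = _parse_scalar(raw)
    return config


config = {
    "algorithm": {"adv_estimator": "grpo"},
    "trainer": {"n_gpus_per_node": 8, "nnodes": 1},
}
apply_overrides(
    config,
    [
        "trainer.nnodes=2",
        "algorithm.adv_estimator=gae",
        "actor_rollout_ref.actor.optim.lr=1e-6",
    ],
)
```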
Extension Points
Custom Pipeline
Add new functions in siirl/execution/dag/builtin_pipelines.py.
Custom Node Functions
Implement functions following the signature:
def my_node(batch, config=None, **kwargs) -> NodeOutput:
    return NodeOutput(batch=batch, metrics={})
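A slightly fuller node function following the same signature might look like the sketch below. It is illustrative only: `NodeOutput` is re-declared as a stand-in, the batch is assumed to be a dictionary with a `rewards` list (the real batch type may differ), and `clip_rewards_node` with its `low` and `high` config keys is hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class NodeOutput:
    """Stand-in mirroring the documented NodeOutput fields."""
    batch: Any
    metrics: Optional[Dict] = None
    info: Optional[Dict] = None


def clip_rewards_node(batch, config=None, **kwargs) -> NodeOutput:
    """Hypothetical node: clip per-sample rewards into [low, high]."""
    config = config or {}
    low = config.get("low", -1.0)
    high = config.get("high", 1.0)
    rewards = batch["rewards"]
    clipped = [min(max(r, low), high) for r in rewards]
    changed = sum(1 for r, c in zip(rewards, clipped) if r != c)
    # Return a new dict instead of mutating the input batch.
    new_batch = {**batch, "rewards": clipped}
    fraction = changed / len(rewards) if rewards else 0.0
    return NodeOutput(batch=new_batch, metrics={"reward/clip_fraction": fraction})


output = clip_rewards_node({"rewards": [-3.0, 0.5, 2.0]})
# output.batch["rewards"] == [-1.0, 0.5, 1.0]
```

Returning a fresh batch rather than editing the input in place makes the node easier to test in isolation, at the cost of a shallow copy.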
Custom Reward Manager
Add new classes in siirl/engine/reward_manager/.
Custom Environment
Add new environment classes in siirl/environment/.