Filter Interface
The Filter interface provides dynamic sampling and data filtering for Pipelines.
Location: siirl/user_interface/filter_interface/
Architecture Overview
Filter Interface Architecture
==============================================================================
+------------------+     +-------------------+     +------------------+
|  Previous Node   |     |    Filter Node    |     |    Next Node     |
|  (e.g. Reward)   |---->|  (COMPUTE type)   |---->| (e.g. Advantage) |
+------------------+     +-------------------+     +------------------+
                                   |
                                   v
                           +---------------+
                           | Filter Logic  |
                           +---------------+
                           | 1. Get batch  |
                           | 2. Compute    |
                           |    mask       |
                           | 3. Apply      |
                           |    filter     |
                           | 4. Return     |
                           |    NodeOutput |
                           +---------------+
==============================================================================
Filter Execution Flow:
 Input Batch           Filter Function             Output
+-----------+          +-------------+          +-----------+
| samples   |          |             |          | filtered  |
| [0,1,2,3, | -------> | mask =      | -------> | samples   |
|  4,5,6,7] |          | [T,T,F,T,   |          | [0,1,3,5] |
+-----------+          |  F,T,F,F]   |          +-----------+
                       +-------------+
                              |
                              v
                       +-------------+
                       | Metrics:    |
                       | kept_ratio  |
                       | kept_groups |
                       +-------------+
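The flow above can be sketched in plain Python. This is only an illustration of the mask-then-select idea using lists; the function name `apply_mask` is hypothetical and is not part of siirl, which operates on tensor batches.

```python
from typing import List, Sequence, Tuple


def apply_mask(samples: Sequence[int], mask: Sequence[bool]) -> Tuple[List[int], float]:
    """Keep the samples whose mask entry is True and report the kept ratio."""
    if len(samples) != len(mask):
        raise ValueError("samples and mask must have the same length")
    kept = [s for s, keep in zip(samples, mask) if keep]
    # Guard against an empty batch so the ratio never divides by zero.
    kept_ratio = len(kept) / len(samples) if samples else 0.0
    return kept, kept_ratio


# Same values as in the diagram: T,T,F,T,F,T,F,F
samples = [0, 1, 2, 3, 4, 5, 6, 7]
mask = [True, True, False, True, False, True, False, False]
kept, kept_ratio = apply_mask(samples, mask)
print(kept, kept_ratio)  # [0, 1, 3, 5] 0.5
```

The empty-batch guard is a design choice of this sketch; a filter that can remove every sample may want the same protection when it computes its ratio metric.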
Built-in Filters
DAPO Dynamic Sampling
Location: siirl/user_interface/filter_interface/dapo.py
Function: dynamic_sampling()
Filters out zero-variance sample groups, i.e. groups in which every response received the same reward (all correct or all incorrect).
Flow Diagram:
Input: Batch with rewards grouped by uid (prompt)
+------------------------------------------------------+
| uid=0: [1.0, 1.0, 1.0, 1.0] -> std=0 -> FILTER OUT   |
| uid=1: [1.0, 0.0, 1.0, 0.0] -> std>0 -> KEEP         |
| uid=2: [0.0, 0.0, 0.0, 0.0] -> std=0 -> FILTER OUT   |
| uid=3: [0.5, 0.8, 0.2, 0.9] -> std>0 -> KEEP         |
+------------------------------------------------------+
Output: Only uid=1 and uid=3 samples remain
How it works:
Group samples by uid (prompt)
Calculate variance for each group
Filter out every group whose variance is 0 and keep the rest
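The three steps can be sketched with the standard library alone. This is a minimal illustration, not the code in dapo.py: the helper name `zero_variance_mask` is hypothetical, and plain lists stand in for the batch tensors.

```python
from collections import defaultdict
from statistics import pvariance
from typing import Dict, Hashable, List, Sequence


def zero_variance_mask(uids: Sequence[Hashable], rewards: Sequence[float]) -> List[bool]:
    """Return a per-sample mask that is False for samples in zero-variance groups."""
    # Step 1: group rewards by uid (one uid per prompt).
    groups: Dict[Hashable, List[float]] = defaultdict(list)
    for uid, reward in zip(uids, rewards):
        groups[uid].append(reward)
    # Step 2: compute the (population) variance of each group.
    # Step 3: keep a group only if its variance is greater than 0.
    keep_group = {uid: pvariance(rs) > 0 for uid, rs in groups.items()}
    return [keep_group[uid] for uid in uids]


# Same rewards as in the flow diagram above.
uids = [0] * 4 + [1] * 4 + [2] * 4 + [3] * 4
rewards = [1.0, 1.0, 1.0, 1.0,
           1.0, 0.0, 1.0, 0.0,
           0.0, 0.0, 0.0, 0.0,
           0.5, 0.8, 0.2, 0.9]
mask = zero_variance_mask(uids, rewards)
kept_uids = sorted({u for u, keep in zip(uids, mask) if keep})
print(kept_uids)  # [1, 3]
```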
Configuration:
python -m siirl.main_dag \
    algorithm.workflow_type=DAPO \
    algorithm.filter_groups.enable=true \
    algorithm.filter_groups.metric=seq_final_reward
Usage in Pipeline:
pipeline.add_node(
    "dynamic_sampling",
    func="siirl.user_interface.filter_interface.dapo:dynamic_sampling",
    deps=["function_reward"],
    node_type=NodeType.COMPUTE,
    node_role=NodeRole.DYNAMIC_SAMPLING
)
Returned Metrics:
dapo_sampling/kept_trajectories_ratio
dapo_sampling/kept_groups
dapo_sampling/total_groups
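The source lists only the metric names (dapo_sampling/kept_trajectories_ratio, dapo_sampling/kept_groups, dapo_sampling/total_groups). The sketch below shows one plausible way such values could be derived from a per-sample keep mask. The definitions are an assumption inferred from the names, and `sampling_metrics` is a hypothetical helper, not a siirl function.

```python
from typing import Dict, Hashable, Sequence, Union


def sampling_metrics(uids: Sequence[Hashable], mask: Sequence[bool]) -> Dict[str, Union[int, float]]:
    """Derive group and trajectory counts from a per-sample keep mask (assumed definitions)."""
    total_groups = len(set(uids))
    kept_groups = len({uid for uid, keep in zip(uids, mask) if keep})
    # Assumed meaning: fraction of trajectories (samples) that survive the filter.
    kept_ratio = sum(mask) / len(mask) if mask else 0.0
    return {
        "dapo_sampling/kept_trajectories_ratio": kept_ratio,
        "dapo_sampling/kept_groups": kept_groups,
        "dapo_sampling/total_groups": total_groups,
    }


# Four prompts with four samples each; groups 1 and 3 are kept.
example_uids = [0] * 4 + [1] * 4 + [2] * 4 + [3] * 4
example_mask = [False] * 4 + [True] * 4 + [False] * 4 + [True] * 4
example_metrics = sampling_metrics(example_uids, example_mask)
print(example_metrics)
```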
Embodied AI Sampling
Location: siirl/user_interface/filter_interface/embodied.py
Function: embodied_local_rank_sampling()
Filters Embodied AI data based on task completion and accuracy.
Flow Diagram:
Input: Embodied rollout batch
+----------------------------------------------------------------------+
|                                                                      |
| Step 1: verify() - Compute accuracy from 'complete' field            |
| +------------------------------------------------------------------+ |
| | Sample 0: complete=True  -> acc=1.0                              | |
| | Sample 1: complete=False -> acc=0.0                              | |
| | ...                                                              | |
| +------------------------------------------------------------------+ |
|                                                                      |
| Step 2: _filter_batch() - Apply filters                              |
| +------------------------------------------------------------------+ |
| | Accuracy Filter (per prompt group):                              | |
| |   prompt_mean_acc >= lower_bound (0.1) AND                       | |
| |   prompt_mean_acc <= upper_bound (0.9)                           | |
| |                                                                  | |
| | Truncation Filter:                                               | |
| |   finish_step < max_steps (not truncated)                        | |
| +------------------------------------------------------------------+ |
|                                                                      |
+----------------------------------------------------------------------+
Output: Filtered batch (only "learnable" samples)
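The two steps in the diagram can be sketched as follows. This is a simplified illustration under stated assumptions: the function name `embodied_keep_mask` and its argument names are hypothetical, plain lists replace the rollout batch, and the two filters are assumed to combine with a logical AND.

```python
from collections import defaultdict
from typing import Dict, Hashable, List, Sequence


def embodied_keep_mask(
    prompt_ids: Sequence[Hashable],
    complete: Sequence[bool],
    finish_step: Sequence[int],
    max_steps: int,
    lower_bound: float = 0.1,
    upper_bound: float = 0.9,
) -> List[bool]:
    """Keep samples from 'learnable' prompt groups that were not truncated."""
    # Step 1: accuracy is 1.0 for a completed task, 0.0 otherwise.
    acc = [1.0 if done else 0.0 for done in complete]

    # Mean accuracy per prompt group.
    sums: Dict[Hashable, float] = defaultdict(float)
    counts: Dict[Hashable, int] = defaultdict(int)
    for pid, a in zip(prompt_ids, acc):
        sums[pid] += a
        counts[pid] += 1

    # Step 2: accuracy filter (per group) AND truncation filter (per sample).
    mask = []
    for pid, step in zip(prompt_ids, finish_step):
        mean_acc = sums[pid] / counts[pid]
        in_bounds = lower_bound <= mean_acc <= upper_bound
        not_truncated = step < max_steps
        mask.append(in_bounds and not_truncated)
    return mask


prompt_ids = ["a", "a", "a", "a", "b", "b"]
complete = [True, False, True, False, True, True]
finish_step = [10, 100, 20, 30, 15, 25]
embodied_mask = embodied_keep_mask(prompt_ids, complete, finish_step, max_steps=100)
print(embodied_mask)  # [True, False, True, True, False, False]
```

In this sketch, prompt "a" has mean accuracy 0.5 and passes the accuracy filter, but its second sample hits max_steps and is dropped as truncated. Prompt "b" has mean accuracy 1.0, which exceeds the upper bound, so the whole group is dropped. Setting the bounds to 0.0 and 1.0 would let every group pass the accuracy check.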
Features:
Task verification
Accuracy-based filtering
Truncated trajectory filtering
Configuration:
python -m siirl.main_dag \
    algorithm.workflow_type=EMBODIED \
    algorithm.embodied_sampling.filter_accuracy=true \
    algorithm.embodied_sampling.filter_truncated=true \
    algorithm.embodied_sampling.accuracy_lower_bound=0.0 \
    algorithm.embodied_sampling.accuracy_upper_bound=1.0 \
    actor_rollout_ref.embodied.env.max_steps=100
Usage in Pipeline:
pipeline.add_node(
    "dynamic_sampling",
    func="siirl.user_interface.filter_interface.embodied:embodied_local_rank_sampling",
    deps=["rollout_actor"],
    node_type=NodeType.COMPUTE,
    node_role=NodeRole.DYNAMIC_SAMPLING
)
Custom Filter
Basic Template
import torch

from siirl.params import SiiRLArguments
from siirl.dag_worker.data_structures import NodeOutput
from siirl.data_coordinator.sample import filter_tensordict


def my_custom_filter(
    config: SiiRLArguments,
    batch,
    **kwargs
) -> NodeOutput:
    """Custom filter function."""
    # Get data
    rewards = batch.batch["rewards"]

    # Create filter mask. The threshold is a placeholder value; set it for
    # your task or read it from config.
    threshold = 0.0
    mask = rewards > threshold  # Boolean tensor

    # Apply filter
    filtered_batch = filter_tensordict(batch, mask)

    # Collect metrics
    metrics = {
        "filter/kept_ratio": mask.sum().item() / len(mask)
    }
    return NodeOutput(batch=filtered_batch, metrics=metrics)
Example: Reward Threshold Filter
from siirl.params import SiiRLArguments
from siirl.dag_worker.data_structures import NodeOutput
from siirl.data_coordinator.sample import filter_tensordict


def reward_threshold_filter(
    config: SiiRLArguments,
    batch,
    **kwargs
) -> NodeOutput:
    """Keep only samples whose reward exceeds the configured threshold."""
    rewards = batch.batch["rewards"]
    threshold = config.algorithm.filter_threshold

    # Create mask: True for samples strictly above the threshold
    mask = rewards > threshold

    # Apply filter
    filtered_batch = filter_tensordict(batch, mask)

    # Metrics
    metrics = {
        "filter/kept_ratio": mask.sum().item() / len(mask),
        "filter/threshold": threshold
    }
    return NodeOutput(batch=filtered_batch, metrics=metrics)
Configuration:
python -m siirl.main_dag \
    algorithm.filter_threshold=0.5
Usage in Pipeline:
pipeline.add_node(
    "reward_filter",
    func="my_module:reward_threshold_filter",
    deps=["function_reward"],
    node_type=NodeType.COMPUTE,
    node_role=NodeRole.DYNAMIC_SAMPLING
)
Example: Group Variance Filter
from collections import defaultdict

import torch

from siirl.params import SiiRLArguments
from siirl.dag_worker.data_structures import NodeOutput
from siirl.data_coordinator.sample import filter_tensordict


def group_variance_filter(
    config: SiiRLArguments,
    batch,
    **kwargs
) -> NodeOutput:
    """Drop groups whose reward standard deviation is below min_group_std."""
    rewards = batch.batch["rewards"]
    uids = batch.batch["uid"]

    def uid_key(uid):
        # Convert tensor/NumPy scalars to int so they hash by value
        return int(uid) if hasattr(uid, "item") else uid

    # Group rewards by uid
    uid_to_rewards = defaultdict(list)
    for i, uid in enumerate(uids):
        uid_to_rewards[uid_key(uid)].append(rewards[i].item())

    # Keep groups whose std reaches the threshold. torch.std defaults to the
    # unbiased estimator, so a single-sample group yields NaN and is dropped.
    min_std = config.algorithm.min_group_std
    kept_uids = {
        uid for uid, r in uid_to_rewards.items()
        if torch.std(torch.tensor(r)).item() >= min_std
    }

    # Create a per-sample mask
    mask = torch.tensor(
        [uid_key(uid) in kept_uids for uid in uids],
        dtype=torch.bool,
    )

    # Apply filter
    filtered_batch = filter_tensordict(batch, mask)

    metrics = {
        "filter/kept_groups": len(kept_uids),
        "filter/total_groups": len(uid_to_rewards)
    }
    return NodeOutput(batch=filtered_batch, metrics=metrics)