Task Replication Example¶
This example demonstrates how to use the replicate_task functionality to create multiple instances of a task for parallel processing.
Overview¶
The replicate_task method allows you to create multiple copies of a task with unique IDs, which is useful for:
- Processing multiple data items in parallel
- Load balancing across multiple task instances
- Scaling specific parts of your pipeline
Basic Example¶
import asyncio
import time

from neopipe import Ok, Err, SyncPipeline, AsyncPipeline
from neopipe.task import FunctionSyncTask, FunctionAsyncTask


# Define a data processing task
@FunctionSyncTask.decorator()
def process_data(result):
    """Simulate data processing that takes some time."""
    if result.is_err():
        return result

    data = result.unwrap()
    # Simulate processing time
    time.sleep(0.1)
    return Ok(f"processed_{data}")


# Create a pipeline and replicate the task
pipeline = SyncPipeline(name="DataProcessingPipeline")

# Replicate the task 3 times for parallel processing
replicated_tasks = pipeline.replicate_task(process_data, num_replicas=3)

print(f"Created {len(replicated_tasks)} task replicas")
for i, task in enumerate(replicated_tasks):
    print(f"Task {i + 1} ID: {task.task_id}")
Parallel Processing Example¶
# Use replicated tasks for parallel processing
def parallel_processing_example():
    # Create multiple pipelines with replicated tasks
    pipelines = []
    for i in range(3):
        pipeline = SyncPipeline(name=f"Pipeline_{i + 1}")
        # Each pipeline gets its own replica of the task
        replicated_tasks = pipeline.replicate_task(process_data, num_replicas=1)
        pipeline.add_task(replicated_tasks[0])
        pipelines.append(pipeline)

    # Input data to process
    inputs = [Ok("data_1"), Ok("data_2"), Ok("data_3")]

    # Process in parallel
    results = SyncPipeline.run_parallel(
        pipelines=pipelines,
        inputs=inputs,
        max_workers=3,
        debug=True,
    )

    print("Parallel processing results:")
    for i, result in enumerate(results.result):
        if result.is_ok():
            print(f"Pipeline {i + 1}: {result.unwrap()}")
        else:
            print(f"Pipeline {i + 1}: Error - {result.unwrap_err()}")

    print(f"Total execution time: {results.execution_time:.2f}s")


# Run the example
parallel_processing_example()
Async Task Replication¶
@FunctionAsyncTask.decorator()
async def async_process_data(result):
    """Async version of data processing."""
    if result.is_err():
        return result

    data = result.unwrap()
    # Simulate async processing
    await asyncio.sleep(0.1)
    return Ok(f"async_processed_{data}")


async def async_replication_example():
    # Create an async pipeline with replicated tasks
    pipeline = AsyncPipeline(name="AsyncDataPipeline")

    # Replicate the async task
    replicated_tasks = pipeline.replicate_task(async_process_data, num_replicas=2)

    # Add tasks to the pipeline
    for task in replicated_tasks:
        pipeline.add_task(task)

    # Process data through the pipeline
    result = await pipeline.run_sequence(Ok("async_data"), debug=True)

    print(f"Async processing result: {result.result.unwrap()}")
    print(f"Execution time: {result.execution_time:.2f}s")

    # Show trace information
    if result.trace:
        print("\nExecution trace:")
        for step_name, step_result in result.trace.steps:
            status = "✅" if step_result.is_ok() else "❌"
            print(f"  {status} {step_name}")


# Run the async example
asyncio.run(async_replication_example())
Use Cases¶
1. Load Balancing¶
# Replicate a heavy computation task for load balancing
heavy_task = create_heavy_computation_task()
replicas = pipeline.replicate_task(heavy_task, num_replicas=5)

# Distribute work across replicas
for i, replica in enumerate(replicas):
    worker_pipeline = SyncPipeline(name=f"Worker_{i + 1}")
    worker_pipeline.add_task(replica)
2. Fault Tolerance¶
# Create multiple replicas for redundancy
unreliable_task = create_network_request_task()
backup_tasks = pipeline.replicate_task(unreliable_task, num_replicas=3)

# Try each replica until one succeeds
for task in backup_tasks:
    result = task(input_data)
    if result.is_ok():
        break
3. A/B Testing¶
# Replicate a task with different configurations
base_task = create_ml_model_task()
variants = pipeline.replicate_task(base_task, num_replicas=2)

# Configure different model parameters
variants[0].config = {"model": "version_a"}
variants[1].config = {"model": "version_b"}

# Compare results
results_a = variants[0](test_data)
results_b = variants[1](test_data)
Key Benefits¶
- Unique IDs: Each replicated task gets a unique task_id for tracking (see the sketch after this list)
- Independent State: Deep copying ensures replicas don't share mutable state
- Flexible Usage: Can be used in both sync and async pipelines
- Scalability: Easy to scale specific bottleneck tasks
- Debugging: Each replica can be traced independently
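The snippet below is a minimal sketch of the first two points, reusing the process_data task and pipeline from the Basic Example above. It assumes, as this page describes, that replicate_task returns independent copies whose task_id values are unique; the sample payloads and the assertion are purely illustrative and not part of the neopipe API.

# Minimal sketch (assumes the Basic Example above has been run)
replicas = pipeline.replicate_task(process_data, num_replicas=3)

# Unique IDs: every replica is expected to carry its own task_id
assert len({task.task_id for task in replicas}) == len(replicas)

# Independent State: each replica is invoked on its own input, and each
# result can be attributed to a replica via its task_id (payloads are illustrative)
for task, payload in zip(replicas, ["a", "b", "c"]):
    result = task(Ok(payload))
    label = result.unwrap() if result.is_ok() else result.unwrap_err()
    print(f"{task.task_id}: {label}")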
Best Practices¶
- Resource Management: Be mindful of memory usage when creating many replicas
- Task Design: Ensure tasks are stateless or properly handle state isolation
- Error Handling: Use debug mode to trace issues across replicated tasks
- Performance: Measure actual performance gains from replication
- Monitoring: Track individual replica performance using unique task IDs
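As a minimal sketch of the Monitoring practice, the loop below times each replica call and keys the measurement by its task_id. The timings dict, the sample payload, and the loop itself are illustrative additions built on this page's examples, not part of the neopipe API.

# Time each replica call and record it under the replica's unique task_id
import time

timings = {}
for task in replicas:  # `replicas` as created in the sketch above
    start = time.perf_counter()
    result = task(Ok("sample_item"))
    timings[task.task_id] = time.perf_counter() - start

for task_id, elapsed in timings.items():
    print(f"{task_id}: {elapsed:.3f}s")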