OpenAI Async Pipeline with Task Replication Example¶
This example demonstrates using NeoPipe's `replicate_task` method with `AsyncPipeline` to create multiple instances of the same OpenAI task and run them concurrently. We'll focus on task replication for efficient parallel processing.
Overview¶
We'll create an OpenAI summarization system that:

- Uses a single OpenAI async task configuration
- Replicates the task multiple times using the `replicate_task` method
- Demonstrates async task replication with concurrent execution
- Shows how replicated tasks can process different inputs simultaneously
- Compares the performance and efficiency of replicated async tasks (a minimal sketch of the core pattern follows; the complete example comes after)
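
Before diving into the full listing, here is a minimal sketch of the core pattern, assuming the `AsyncOpenAISummarizationTask` and `TextInput` types defined below and the `replicate_task`/`run_parallel` API used throughout this page:

```python
# Minimal sketch (inside an async function); assumes the task class and
# NeoPipe API from the complete example below.
task = AsyncOpenAISummarizationTask(system_prompt="Summarize the text.")

pipeline = AsyncPipeline[TextInput, str]()
replicas = pipeline.replicate_task(task, num_replicas=3)  # unique task_id per replica

# One single-task pipeline per replica, one input per pipeline
pipelines = [AsyncPipeline.from_tasks([t]) for t in replicas]
inputs = [Ok(TextInput(text=s, id=str(i))) for i, s in enumerate(["...", "...", "..."])]

results = await AsyncPipeline.run_parallel(pipelines, inputs)
```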
Complete Example¶

```python
import os
import asyncio
from typing import List
from dataclasses import dataclass

from openai import AsyncOpenAI
from dotenv import load_dotenv

from neopipe import Result, Ok, Err
from neopipe.task import ClassAsyncTask
from neopipe.async_pipeline import AsyncPipeline

# Load environment variables
load_dotenv()

@dataclass
class TextInput:
    """Simple input structure for text summarization."""
    text: str
    id: str = ""


@dataclass
class SummaryOutput:
    """Output structure for summarization results."""
    original_text: str
    summary: str
    tokens_used: int
    task_id: str

class AsyncOpenAISummarizationTask(ClassAsyncTask[TextInput, str]):
    """
    An async task that calls the OpenAI API to summarize text using a
    consistent system prompt.

    This demonstrates:
    - Async class-based task configuration
    - Non-blocking external API integration
    - Comprehensive async error handling
    - Result type usage in an async context
    """

    def __init__(self,
                 system_prompt: str,
                 model: str = "gpt-3.5-turbo",
                 retries: int = 3):
        super().__init__(retries=retries)
        self.system_prompt = system_prompt
        self.model = model
        self.client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    async def execute(self, input_result: Result[TextInput, str]) -> Result[SummaryOutput, str]:
        """Execute the async summarization task."""
        if input_result.is_err():
            return input_result

        try:
            text_data = input_result.unwrap()

            # Validate input
            if not text_data.text.strip():
                return Err(f"Empty content for text ID: {text_data.id}")

            # Call the OpenAI API asynchronously
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": self.system_prompt},
                    {"role": "user", "content": text_data.text}
                ],
                max_tokens=150,
                temperature=0.3
            )

            # Extract the summary
            summary = response.choices[0].message.content
            if not summary:
                return Err(f"No summary generated for text ID: {text_data.id}")

            # Build the result
            result = SummaryOutput(
                original_text=text_data.text,
                summary=summary.strip(),
                tokens_used=response.usage.total_tokens,
                task_id=self.task_id
            )
            return Ok(result)

        except Exception as e:
            return Err(f"OpenAI API error for text ID {text_data.id}: {str(e)} (Task: {self.task_id})")

def create_sample_texts() -> List[TextInput]:
    """Create sample texts for demonstration."""
    return [
        TextInput(
            text="""
            Artificial Intelligence is rapidly transforming various industries,
            from healthcare to finance. Machine learning algorithms are becoming
            more sophisticated, enabling computers to perform tasks that previously
            required human intelligence. Deep learning, a subset of machine learning,
            uses neural networks with multiple layers to process and analyze complex data.
            """,
            id="tech_article"
        ),
        TextInput(
            text="""
            Climate change continues to be a pressing global issue. Rising temperatures,
            melting ice caps, and extreme weather events are becoming more frequent.
            Scientists worldwide are working on innovative solutions including renewable
            energy technologies, carbon capture methods, and sustainable agriculture
            practices to mitigate the effects of global warming.
            """,
            id="climate_news"
        ),
        TextInput(
            text="""
            Space exploration has entered a new era with private companies joining
            government agencies in pursuing ambitious missions. Mars exploration,
            asteroid mining, and space tourism are no longer science fiction but
            achievable goals. Advanced rocket technology and international cooperation
            are driving unprecedented progress in our understanding of the cosmos.
            """,
            id="space_exploration"
        ),
        TextInput(
            text="""
            Recent medical breakthroughs in gene therapy and personalized medicine
            are revolutionizing healthcare. CRISPR technology allows precise genetic
            editing, while immunotherapy is showing remarkable success in cancer
            treatment. Telemedicine and AI-assisted diagnosis are making healthcare
            more accessible and accurate than ever before.
            """,
            id="medical_breakthrough"
        ),
        TextInput(
            text="""
            Quantum computing represents a paradigm shift in computational power.
            Unlike classical computers that use bits, quantum computers use qubits
            that can exist in multiple states simultaneously. This quantum superposition
            enables solving complex problems exponentially faster, with applications
            in cryptography, drug discovery, and financial modeling.
            """,
            id="quantum_computing"
        )
    ]

async def run_replicated_async_tasks():
    """
    Demonstrate async task replication using the AsyncPipeline.replicate_task method.

    This shows how to:
    - Create a single async task configuration
    - Replicate it multiple times with unique IDs
    - Run replicated tasks concurrently on different inputs
    - Track which task processed which input
    """
    print("🔄 ASYNC TASK REPLICATION DEMO")
    print("=" * 60)
    print("Replicating async OpenAI task and running concurrently...")

    # System prompt used for all summarizations
    system_prompt = """
    You are a professional summarizer. Create concise, informative summaries
    that capture the key points and main ideas of the given text. Focus on
    the most important information and maintain a clear, readable style.
    """

    # Create a single async summarization task
    summarization_task = AsyncOpenAISummarizationTask(
        system_prompt=system_prompt,
        model="gpt-3.5-turbo",
        retries=2
    )

    # Get sample texts
    texts = create_sample_texts()

    # Create a pipeline and replicate the task
    pipeline = AsyncPipeline[TextInput, str]()
    replicated_tasks = pipeline.replicate_task(summarization_task, num_replicas=len(texts))

    print(f"🔄 Replicated 1 async task into {len(replicated_tasks)} instances")
    print(f"Task IDs: {[task.task_id for task in replicated_tasks]}")
    print("-" * 60)

    # Show the task-to-input mapping
    for task, text in zip(replicated_tasks, texts):
        print(f"📝 Task {task.task_id} → Text '{text.id}'")

    print(f"\nStarting concurrent execution of {len(replicated_tasks)} replicated async tasks...")
    print("-" * 60)

    # Create an individual single-task pipeline for each replicated task
    pipelines = []
    inputs = []
    for task, text in zip(replicated_tasks, texts):
        pipelines.append(AsyncPipeline.from_tasks([task]))
        inputs.append(Ok(text))

    # Run the pipelines in parallel
    start_time = asyncio.get_event_loop().time()
    execution_results = await AsyncPipeline.run_parallel(
        pipelines,
        inputs,
        debug=True
    )
    end_time = asyncio.get_event_loop().time()

    print(f"Parallel async execution completed in {end_time - start_time:.2f}s")
    print("-" * 60)

    # Process results
    successful_summaries = []
    failed_summaries = []

    for i, exec_result in enumerate(execution_results):
        text_id = texts[i].id
        task_id = replicated_tasks[i].task_id

        if exec_result.result.is_ok():
            summary = exec_result.result.unwrap()
            successful_summaries.append(summary)
            print(f"✅ Successfully processed '{text_id}' with async task {task_id}:")
            print(f"   Original length: {len(texts[i].text)} chars")
            print(f"   Summary length: {len(summary.summary)} chars")
            print(f"   Tokens used: {summary.tokens_used}")
            print(f"   Task ID: {summary.task_id}")
            print(f"   Execution time: {exec_result.execution_time:.2f}s")
            if exec_result.trace:
                print(f"   Trace steps: {len(exec_result.trace.steps)}")
            print()
        else:
            error_msg = exec_result.result.unwrap_err()
            failed_summaries.append((text_id, error_msg))
            print(f"❌ Failed to process '{text_id}' with async task {task_id}:")
            print(f"   Error: {error_msg}")
            print(f"   Execution time: {exec_result.execution_time:.2f}s")
            print()

    # Summary statistics
    print("=" * 60)
    print("ASYNC TASK REPLICATION SUMMARY")
    print("=" * 60)
    print(f"Original async task replicated: {len(replicated_tasks)} times")
    print(f"Total texts processed: {len(texts)}")
    print(f"Successful summaries: {len(successful_summaries)}")
    print(f"Failed summaries: {len(failed_summaries)}")

    if successful_summaries:
        total_tokens = sum(s.tokens_used for s in successful_summaries)
        avg_tokens = total_tokens / len(successful_summaries)
        print(f"Total tokens used: {total_tokens}")
        print(f"Average tokens per summary: {avg_tokens:.1f}")

        # Show per-task token usage (token counts mostly reflect input length,
        # since all replicas share the same configuration)
        task_performance = [(s.task_id, s.tokens_used) for s in successful_summaries]
        task_performance.sort(key=lambda x: x[1])
        print(f"Lowest token usage: {task_performance[0][0]} ({task_performance[0][1]} tokens)")
        print(f"Highest token usage: {task_performance[-1][0]} ({task_performance[-1][1]} tokens)")

    # Execution times overlap under concurrency, so this is cumulative
    # task time, not wall-clock time
    total_time = sum(result.execution_time for result in execution_results)
    avg_time = total_time / len(execution_results)
    print(f"Cumulative task time: {total_time:.2f}s")
    print(f"Average time per replicated async task: {avg_time:.2f}s")
    print(f"Ideal speedup vs sequential (upper bound): ~{len(texts)}x")

    return execution_results

async def demonstrate_concurrent_vs_sequential():
    """
    Compare concurrent replicated tasks vs sequential processing.
    """
    print("\n🔍 CONCURRENT vs SEQUENTIAL COMPARISON")
    print("=" * 60)
    print("Comparing replicated async tasks vs sequential processing...")

    system_prompt = "Create a brief summary of the main points."

    # Create the base task
    summarization_task = AsyncOpenAISummarizationTask(
        system_prompt=system_prompt,
        model="gpt-3.5-turbo",
        retries=1
    )

    # Use the first 3 texts for comparison
    texts = create_sample_texts()[:3]

    print("\n1. CONCURRENT EXECUTION (Replicated Tasks)")
    print("-" * 50)

    # Concurrent execution with replicated tasks
    pipeline = AsyncPipeline[TextInput, str]()
    replicated_tasks = pipeline.replicate_task(summarization_task, num_replicas=len(texts))

    pipelines = [AsyncPipeline.from_tasks([task]) for task in replicated_tasks]
    inputs = [Ok(text) for text in texts]

    start_time = asyncio.get_event_loop().time()
    concurrent_results = await AsyncPipeline.run_parallel(pipelines, inputs, debug=False)
    concurrent_time = asyncio.get_event_loop().time() - start_time

    concurrent_success = sum(1 for r in concurrent_results if r.result.is_ok())
    print(f"   Time: {concurrent_time:.2f}s")
    print(f"   Success: {concurrent_success}/{len(texts)}")
    print(f"   Tasks used: {len(replicated_tasks)} replicated instances")

    print("\n2. SEQUENTIAL EXECUTION (Same Task)")
    print("-" * 50)

    # Sequential execution with the same task
    start_time = asyncio.get_event_loop().time()
    sequential_results = []
    for text in texts:
        pipeline = AsyncPipeline.from_tasks([summarization_task])
        result = await pipeline.run_sequence(Ok(text), debug=False)
        sequential_results.append(result)
    sequential_time = asyncio.get_event_loop().time() - start_time

    sequential_success = sum(1 for r in sequential_results if r.result.is_ok())
    print(f"   Time: {sequential_time:.2f}s")
    print(f"   Success: {sequential_success}/{len(texts)}")
    print("   Tasks used: 1 task instance (reused)")

    print("\n📊 PERFORMANCE ANALYSIS")
    print("-" * 50)
    speedup = sequential_time / concurrent_time if concurrent_time > 0 else 0
    print(f"Concurrent replicated tasks: {concurrent_time:.2f}s")
    print(f"Sequential single task: {sequential_time:.2f}s")
    print(f"Speedup with replication: {speedup:.1f}x faster")
    print(f"Efficiency gain: {((sequential_time - concurrent_time) / sequential_time * 100):.1f}%")

    return concurrent_results, sequential_results

async def demonstrate_async_task_benefits():
    """
    Demonstrate the benefits of async task replication.
    """
    print("\n⚡ ASYNC TASK REPLICATION BENEFITS")
    print("=" * 60)
    print("Demonstrating benefits of async task replication...")

    system_prompt = "Provide a concise summary of the key innovations."

    # Create the base async task
    base_task = AsyncOpenAISummarizationTask(
        system_prompt=system_prompt,
        model="gpt-3.5-turbo",
        retries=1
    )

    print(f"🔧 Base async task created with ID: {base_task.task_id}")

    # Create a pipeline and replicate the task
    pipeline = AsyncPipeline[TextInput, str]()
    replicated_tasks = pipeline.replicate_task(base_task, num_replicas=3)

    print(f"🔄 Async task replicated {len(replicated_tasks)} times:")
    for i, task in enumerate(replicated_tasks):
        print(f"   Replica {i+1}: {task.task_id}")
        print(f"   Same config: {task.system_prompt == base_task.system_prompt}")
        print(f"   Unique ID: {task.task_id != base_task.task_id}")
        print(f"   Async client: {type(task.client).__name__}")
        print()

    # Test with sample texts
    sample_texts = create_sample_texts()[:3]

    print("Testing replicated async tasks concurrently...")
    print("-" * 50)

    # Create a pipeline for each replicated task
    pipelines = []
    inputs = []
    for task, text in zip(replicated_tasks, sample_texts):
        pipelines.append(AsyncPipeline.from_tasks([task]))
        inputs.append(Ok(text))

    # Run all replicated tasks concurrently
    start_time = asyncio.get_event_loop().time()
    results = await AsyncPipeline.run_parallel(pipelines, inputs, debug=True)
    end_time = asyncio.get_event_loop().time()

    print(f"Concurrent async execution completed in {end_time - start_time:.2f}s")
    print("-" * 50)

    successful_count = 0
    for i, result in enumerate(results):
        if result.result.is_ok():
            summary = result.result.unwrap()
            successful_count += 1
            print(f"✅ Replica {i+1} (Task {replicated_tasks[i].task_id}):")
            print(f"   Text: {sample_texts[i].id}")
            print(f"   Tokens: {summary.tokens_used}")
            print(f"   Time: {result.execution_time:.2f}s")
            print(f"   Task ID in result: {summary.task_id}")
        else:
            print(f"❌ Replica {i+1} failed: {result.result.unwrap_err()}")
        print()

    print("📊 Async Replication Results:")
    print(f"   Successful executions: {successful_count}/{len(replicated_tasks)}")
    print(f"   Total concurrent time: {end_time - start_time:.2f}s")
    print(f"   Average time per task: {(end_time - start_time)/len(replicated_tasks):.2f}s")
    print("   Async advantage: non-blocking concurrent execution")

    return results

async def demonstrate_async_error_handling():
    """
    Demonstrate error handling in replicated async tasks.
    """
    print("\n🔍 ASYNC ERROR HANDLING WITH REPLICATION")
    print("=" * 60)

    system_prompt = "Summarize this text:"

    # Create the base task
    base_task = AsyncOpenAISummarizationTask(
        system_prompt=system_prompt,
        retries=1
    )

    # Create test cases - some will fail
    test_cases = [
        TextInput(text="This is a valid text for summarization that should work fine.", id="valid_text"),
        TextInput(text="", id="empty_text"),      # This will fail
        TextInput(text="Short", id="too_short"),  # This might fail
    ]

    # Replicate the task, one replica per test case
    pipeline = AsyncPipeline[TextInput, str]()
    replicated_tasks = pipeline.replicate_task(base_task, num_replicas=len(test_cases))

    print(f"Testing {len(test_cases)} scenarios with {len(replicated_tasks)} replicated async tasks:")
    for task, test_case in zip(replicated_tasks, test_cases):
        print(f"   Task {task.task_id} → {test_case.id}")

    # Create pipelines and run in parallel
    pipelines = [AsyncPipeline.from_tasks([task]) for task in replicated_tasks]
    inputs = [Ok(test_case) for test_case in test_cases]

    print("\nRunning error handling test...")
    print("-" * 50)

    start_time = asyncio.get_event_loop().time()
    results = await AsyncPipeline.run_parallel(pipelines, inputs, debug=True)
    end_time = asyncio.get_event_loop().time()

    print(f"Async error handling test completed in {end_time - start_time:.2f}s")
    print("-" * 50)

    for result, test_case, task in zip(results, test_cases, replicated_tasks):
        print(f"\n{test_case.id.upper()} (Task {task.task_id}):")
        if result.result.is_ok():
            summary = result.result.unwrap()
            print(f"   ✅ Success: {summary.tokens_used} tokens")
            print(f"   Summary: {summary.summary[:50]}...")
        else:
            print(f"   ❌ Error: {result.result.unwrap_err()}")
        print(f"   Execution time: {result.execution_time:.2f}s")
        if result.trace:
            print(f"   Trace steps: {len(result.trace.steps)}")

    # Show the error isolation benefit
    successful = sum(1 for r in results if r.result.is_ok())
    failed = len(results) - successful

    print("\n🛡️ Error Isolation Benefits:")
    print(f"   Total tasks: {len(results)}")
    print(f"   Successful: {successful}")
    print(f"   Failed: {failed}")
    print(f"   Success rate: {successful/len(results)*100:.1f}%")
    print("   Key benefit: failed tasks don't affect successful ones!")

    return results

async def async_replication_performance_analysis():
    """
    Analyze performance characteristics of async task replication.
    """
    print("\n📊 ASYNC TASK REPLICATION PERFORMANCE ANALYSIS")
    print("=" * 60)

    system_prompt = "Provide a brief summary of the main points."
    texts = create_sample_texts()[:4]  # Use 4 texts for analysis

    # Create the base task
    base_task = AsyncOpenAISummarizationTask(
        system_prompt=system_prompt,
        model="gpt-3.5-turbo",
        retries=1
    )

    print(f"Performance analysis with {len(texts)} texts...\n")

    # 1. Single task, sequential execution
    print("1. Single Async Task (Sequential Processing)")
    start_time = asyncio.get_event_loop().time()
    sequential_results = []
    for text in texts:
        pipeline = AsyncPipeline.from_tasks([base_task])
        result = await pipeline.run_sequence(Ok(text), debug=False)
        sequential_results.append(result)
    sequential_time = asyncio.get_event_loop().time() - start_time

    sequential_success = sum(1 for r in sequential_results if r.result.is_ok())
    print(f"   Time: {sequential_time:.2f}s")
    print(f"   Success: {sequential_success}/{len(texts)}")
    print("   Tasks used: 1 (reused)")

    # 2. Replicated tasks, parallel execution
    print("\n2. Replicated Async Tasks (Parallel Processing)")
    pipeline = AsyncPipeline[TextInput, str]()
    replicated_tasks = pipeline.replicate_task(base_task, num_replicas=len(texts))

    pipelines = [AsyncPipeline.from_tasks([task]) for task in replicated_tasks]
    inputs = [Ok(text) for text in texts]

    start_time = asyncio.get_event_loop().time()
    parallel_results = await AsyncPipeline.run_parallel(pipelines, inputs, debug=False)
    parallel_time = asyncio.get_event_loop().time() - start_time

    parallel_success = sum(1 for r in parallel_results if r.result.is_ok())
    print(f"   Time: {parallel_time:.2f}s")
    print(f"   Success: {parallel_success}/{len(texts)}")
    print(f"   Tasks used: {len(replicated_tasks)} (replicated)")

    # 3. Analysis
    print("\n📈 Performance Metrics:")
    speedup = sequential_time / parallel_time if parallel_time > 0 else 0
    efficiency = (sequential_time - parallel_time) / sequential_time * 100 if sequential_time > 0 else 0

    print(f"   Sequential time: {sequential_time:.2f}s")
    print(f"   Parallel time: {parallel_time:.2f}s")
    print(f"   Speedup: {speedup:.1f}x faster")
    print(f"   Efficiency gain: {efficiency:.1f}%")
    print("   Scalability: near-linear for I/O-bound work, up to API rate limits")

    # 4. Resource utilization
    print("\n💻 Resource Utilization:")
    print(f"   Memory overhead: {len(replicated_tasks)} task instances")
    print(f"   Concurrent connections: {len(replicated_tasks)} to the OpenAI API")
    print("   Task isolation: each replica is independent")
    print("   Error resilience: failures don't cascade")

    # 5. Recommendations
    print("\n💡 Recommendations:")
    if speedup > 2:
        print(f"   ✅ Excellent: task replication provides {speedup:.1f}x speedup")
        print("   ✅ Use replicated tasks for parallel processing")
    elif speedup > 1.5:
        print(f"   🟡 Good: task replication provides {speedup:.1f}x speedup")
        print("   🟡 Consider replicated tasks for time-sensitive operations")
    else:
        print(f"   🔴 Limited: only {speedup:.1f}x speedup - check for bottlenecks")
        print("   🔴 Sequential might be sufficient for this workload")

    return parallel_results, sequential_results

async def main():
    """Main demonstration function."""
    # Ensure the OpenAI API key is available
    if not os.getenv("OPENAI_API_KEY"):
        print("❌ Error: OPENAI_API_KEY environment variable not set.")
        print("Please set your OpenAI API key:")
        print("export OPENAI_API_KEY='your-api-key-here'")
        print("or create a .env file with: OPENAI_API_KEY=your-api-key-here")
        return

    print("🚀 NeoPipe OpenAI Async Pipeline Example")
    print("=" * 60)

    try:
        # Run all demonstrations
        await run_replicated_async_tasks()
        await demonstrate_concurrent_vs_sequential()
        await demonstrate_async_task_benefits()
        await demonstrate_async_error_handling()
        await async_replication_performance_analysis()

        print("\n✨ Async task replication example completed!")
        print("\nKey takeaways:")
        print("- replicate_task() creates multiple async task instances with unique IDs")
        print("- Async replication enables true concurrent processing with non-blocking I/O")
        print("- Each replicated async task operates independently")
        print("- Perfect for scaling API calls and I/O-bound operations")
        print("- Significant performance improvements over sequential execution")
        print("- Error isolation: failures in one replica don't affect others")
        print("- AsyncPipeline.run_parallel() efficiently manages concurrent execution")
        print("- Ideal for high-throughput, low-latency processing scenarios")

    except Exception as e:
        print(f"❌ Example failed: {str(e)}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    asyncio.run(main())
```
Key Features Demonstrated¶
1. Async Task Replication and Parallel Execution¶
Async Task Replication:

```python
# Create the base async task
base_task = AsyncOpenAISummarizationTask(system_prompt, model="gpt-3.5-turbo")

# Replicate the async task; each replica gets a unique ID
pipeline = AsyncPipeline[TextInput, str]()
replicated_tasks = pipeline.replicate_task(base_task, num_replicas=5)

# Each replica has a unique task_id but the same async configuration
print([task.task_id for task in replicated_tasks])
```
Parallel Execution of Replicated Async Tasks:

```python
# Create an individual pipeline for each replicated async task
pipelines = [AsyncPipeline.from_tasks([task]) for task in replicated_tasks]

# Run in parallel with concurrent async execution
execution_results = await AsyncPipeline.run_parallel(pipelines, inputs, debug=True)
```
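
Each entry in `execution_results` carries a `Result` plus timing and trace data. A short sketch of consuming them, mirroring the result-handling loop in the complete example above:

```python
# Consume the parallel results (same accessors as the full example)
for exec_result, text in zip(execution_results, texts):
    if exec_result.result.is_ok():
        summary = exec_result.result.unwrap()
        print(f"{text.id}: {summary.tokens_used} tokens in {exec_result.execution_time:.2f}s")
    else:
        print(f"{text.id} failed: {exec_result.result.unwrap_err()}")
```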
2. Async Task Tracking and Performance¶

```python
# Each replicated async task tags its output with its own ID
result = SummaryOutput(
    original_text=text_data.text,
    summary=summary.strip(),
    tokens_used=response.usage.total_tokens,
    task_id=self.task_id  # Unique async task identifier
)
```
3. Non-blocking Concurrent API Calls¶

```python
# Multiple async API calls execute concurrently
response = await self.client.chat.completions.create(
    model=self.model,
    messages=[
        {"role": "system", "content": self.system_prompt},
        {"role": "user", "content": text_data.text}
    ],
    max_tokens=150,
    temperature=0.3
)
```
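
The concurrency comes from the event loop interleaving awaited calls, not from threads. A minimal, NeoPipe-free illustration of the same effect using plain `asyncio.gather` (the `fake_api_call` coroutine is a hypothetical stand-in for a real request):

```python
import asyncio

async def fake_api_call(i: int) -> int:
    # Stands in for a non-blocking network request
    await asyncio.sleep(1.0)
    return i

async def main() -> None:
    # Three awaited calls interleave on the event loop: ~1s total, not ~3s
    results = await asyncio.gather(*(fake_api_call(i) for i in range(3)))
    print(results)

asyncio.run(main())
```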
4. Benefits of Async Task Replication¶
- Concurrency: overlapping, non-blocking execution of independent API calls
- Scalability: near-linear speedup with replicas for I/O-bound work (up to rate limits)
- Efficiency: non-blocking I/O keeps the event loop busy instead of waiting
- Isolation: independent async task instances
- Monitoring: individual task performance tracking
- Resilience: async error handling with fault isolation
5. Use Cases for Async Task Replication¶
- High-throughput API processing
- Concurrent data fetching from multiple sources
- Parallel machine learning inference
- Real-time data processing pipelines
- Scalable microservices orchestration
Setup Requirements¶

```bash
# Install dependencies
pip install neopipe openai python-dotenv

# Set up the environment
export OPENAI_API_KEY="your-api-key-here"
# or create a .env file with: OPENAI_API_KEY=your-api-key-here
```
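
A quick sanity check that the key is visible before running the example (a minimal sketch, assuming `python-dotenv` is installed and the key is in `.env` or exported):

```python
import os
from dotenv import load_dotenv

load_dotenv()  # reads .env from the current directory, if present
if not os.getenv("OPENAI_API_KEY"):
    raise SystemExit("OPENAI_API_KEY is not set; export it or add it to .env")
print("OPENAI_API_KEY found")
```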
When to Use Async Task Replication¶

| Scenario | Benefit | Performance Gain | Best Use Case |
|---|---|---|---|
| I/O-bound operations | Non-blocking concurrent execution | 3-10x speedup | API calls, file processing |
| Independent processing | Parallel execution without dependencies | Linear scaling | Multiple data sources |
| High-throughput systems | Maximize resource utilization | Significant | Real-time processing |
| Fault-tolerant systems | Error isolation between replicas | Same speedup, plus resilience | Production systems |
This example demonstrates NeoPipe's async task replication for building scalable, concurrent processing systems with efficient resource utilization and per-task monitoring.