
OpenAI Sync Pipeline with Task Replication Example

This example demonstrates how to use NeoPipe's replicate_task method to create multiple instances of the same OpenAI task and run them in parallel with SyncPipeline.run_parallel.

Overview

We'll create an OpenAI summarization system that:

  • Uses a single OpenAI task configuration
  • Replicates the task 5 times with the replicate_task method
  • Runs the replicated tasks in parallel, each on a different text
  • Shows how task replication scales one operation across many inputs
  • Handles errors explicitly with Result types

Complete Example

import os
from typing import List, Dict, Any
from dataclasses import dataclass
from openai import OpenAI
from dotenv import load_dotenv

from neopipe import Result, Ok, Err
from neopipe.task import ClassSyncTask
from neopipe.pipeline import SyncPipeline

# Load environment variables
load_dotenv()

@dataclass
class TextInput:
    """Simple input structure for text summarization."""
    text: str
    id: str = ""

@dataclass
class SummaryOutput:
    """Output structure for summarization results."""
    original_text: str
    summary: str
    tokens_used: int
    task_id: str

class OpenAISummarizationTask(ClassSyncTask[TextInput, str]):
    """
    A task that calls OpenAI API to summarize text using a consistent system prompt.

    This demonstrates:
    - Class-based task configuration
    - External API integration
    - Comprehensive error handling
    - Result type usage
    """

    def __init__(self,
                 system_prompt: str,
                 model: str = "gpt-3.5-turbo",
                 retries: int = 3):
        super().__init__(retries=retries)
        self.system_prompt = system_prompt
        self.model = model
        self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    def execute(self, input_result: Result[TextInput, str]) -> Result[SummaryOutput, str]:
        """Execute the summarization task."""
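        # Short-circuit: pass upstream errors through unchanged
        if input_result.is_err():
            return input_result

        # Unwrap before the try block so text_data is always bound in the except handler
        text_data = input_result.unwrap()

        try: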

            # Validate input
            if not text_data.text.strip():
                return Err(f"Empty content for text ID: {text_data.id}")

            # Call OpenAI API
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": self.system_prompt},
                    {"role": "user", "content": text_data.text}
                ],
                max_tokens=150,
                temperature=0.3
            )

            # Extract summary
            summary = response.choices[0].message.content
            if not summary:
                return Err(f"No summary generated for text ID: {text_data.id}")

            # Create result
            result = SummaryOutput(
                original_text=text_data.text,
                summary=summary.strip(),
                tokens_used=response.usage.total_tokens,
                task_id=self.task_id
            )

            return Ok(result)

        except Exception as e:
            return Err(f"OpenAI API error for text ID {text_data.id}: {str(e)} (Task: {self.task_id})")

def create_sample_texts() -> List[TextInput]:
    """Create sample texts for demonstration."""
    return [
        TextInput(
            text="""
            Artificial Intelligence is rapidly transforming various industries,
            from healthcare to finance. Machine learning algorithms are becoming
            more sophisticated, enabling computers to perform tasks that previously
            required human intelligence. Deep learning, a subset of machine learning,
            uses neural networks with multiple layers to process and analyze complex data.
            """,
            id="tech_article"
        ),
        TextInput(
            text="""
            Climate change continues to be a pressing global issue. Rising temperatures,
            melting ice caps, and extreme weather events are becoming more frequent.
            Scientists worldwide are working on innovative solutions including renewable
            energy technologies, carbon capture methods, and sustainable agriculture
            practices to mitigate the effects of global warming.
            """,
            id="climate_news"
        ),
        TextInput(
            text="""
            Space exploration has entered a new era with private companies joining
            government agencies in pursuing ambitious missions. Mars exploration,
            asteroid mining, and space tourism are no longer science fiction but
            achievable goals. Advanced rocket technology and international cooperation
            are driving unprecedented progress in our understanding of the cosmos.
            """,
            id="space_exploration"
        ),
        TextInput(
            text="""
            Recent medical breakthroughs in gene therapy and personalized medicine
            are revolutionizing healthcare. CRISPR technology allows precise genetic
            editing, while immunotherapy is showing remarkable success in cancer
            treatment. Telemedicine and AI-assisted diagnosis are making healthcare
            more accessible and accurate than ever before.
            """,
            id="medical_breakthrough"
        ),
        TextInput(
            text="""
            Quantum computing represents a paradigm shift in computational power.
            Unlike classical computers that use bits, quantum computers use qubits
            that can exist in multiple states simultaneously. This quantum superposition
            enables solving complex problems exponentially faster, with applications
            in cryptography, drug discovery, and financial modeling.
            """,
            id="quantum_computing"
        )
    ]

def run_replicated_task_demo():
    """
    Demonstrate task replication using SyncPipeline.replicate_task method.

    This shows how to:
    - Create a single task configuration
    - Replicate it multiple times with unique IDs
    - Run replicated tasks in parallel
    - Track which task processed which input
    """

    # System prompt used for all summarizations
    system_prompt = """
    You are a professional summarizer. Create concise, informative summaries
    that capture the key points and main ideas of the given text. Focus on
    the most important information and maintain a clear, readable style.
    """

    # Create sample texts
    texts = create_sample_texts()

    # Create a single summarization task
    summarization_task = OpenAISummarizationTask(
        system_prompt=system_prompt,
        model="gpt-3.5-turbo",
        retries=2
    )

    # Create a pipeline and replicate the task 5 times
    pipeline = SyncPipeline[TextInput, str]()
    replicated_tasks = pipeline.replicate_task(summarization_task, num_replicas=5)

    print(f"🔄 Replicated 1 task into {len(replicated_tasks)} instances")
    print(f"Task IDs: {[task.task_id for task in replicated_tasks]}")
    print("-" * 60)

    # Create individual pipelines for each replicated task
    pipelines = []
    initial_inputs = []

    for i, (task, text) in enumerate(zip(replicated_tasks, texts)):
        # Each pipeline contains one replicated task
        pipeline = SyncPipeline.from_tasks([task])
        pipelines.append(pipeline)
        initial_inputs.append(Ok(text))
        print(f"📝 Pipeline {i+1}: Task {task.task_id} → Text '{text.id}'")

    print(f"\nStarting parallel execution of {len(replicated_tasks)} replicated tasks...")
    print("-" * 60)

    # Run pipelines in parallel
    try:
        # SyncPipeline.run_parallel executes multiple pipelines concurrently
        execution_results = SyncPipeline.run_parallel(
            pipelines,
            initial_inputs,
            debug=True  # Enable tracing for debugging
        )

        # Process results
        successful_summaries = []
        failed_summaries = []

        for i, exec_result in enumerate(execution_results):
            text_id = texts[i].id
            task_id = replicated_tasks[i].task_id

            if exec_result.result.is_ok():
                summary = exec_result.result.unwrap()
                successful_summaries.append(summary)

                print(f"✅ Successfully processed '{text_id}' with task {task_id}:")
                print(f"   Original length: {len(texts[i].text)} chars")
                print(f"   Summary length: {len(summary.summary)} chars")
                print(f"   Tokens used: {summary.tokens_used}")
                print(f"   Task ID: {summary.task_id}")
                print(f"   Execution time: {exec_result.execution_time:.2f}s")
                if exec_result.trace:
                    print(f"   Trace steps: {len(exec_result.trace.steps)}")
                print()
            else:
                error_msg = exec_result.result.unwrap_err()
                failed_summaries.append((text_id, error_msg))

                print(f"❌ Failed to process '{text_id}' with task {task_id}:")
                print(f"   Error: {error_msg}")
                print(f"   Execution time: {exec_result.execution_time:.2f}s")
                print()

        # Summary statistics
        print("=" * 60)
        print("TASK REPLICATION SUMMARY")
        print("=" * 60)
        print(f"Original task replicated: {len(replicated_tasks)} times")
        print(f"Total texts processed: {len(texts)}")
        print(f"Successful summaries: {len(successful_summaries)}")
        print(f"Failed summaries: {len(failed_summaries)}")

        if successful_summaries:
            total_tokens = sum(s.tokens_used for s in successful_summaries)
            avg_tokens = total_tokens / len(successful_summaries)
            print(f"Total tokens used: {total_tokens}")
            print(f"Average tokens per summary: {avg_tokens:.1f}")

            # All replicas share one configuration, so token differences reflect the input texts
            task_performance = [(s.task_id, s.tokens_used) for s in successful_summaries]
            task_performance.sort(key=lambda x: x[1])
            print(f"Fewest tokens: task {task_performance[0][0]} ({task_performance[0][1]} tokens)")
            print(f"Most tokens: task {task_performance[-1][0]} ({task_performance[-1][1]} tokens)")

        # Sum of per-pipeline times: cumulative work, not elapsed wall-clock time
        total_time = sum(result.execution_time for result in execution_results)
        print(f"Cumulative execution time (all replicas): {total_time:.2f}s")
        print(f"Average time per replicated task: {total_time/len(execution_results):.2f}s")

        # Display actual summaries
        if successful_summaries:
            print("\n" + "=" * 60)
            print("SUMMARIES BY REPLICATED TASKS")
            print("=" * 60)

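            # Pair each text with its own result so labels stay aligned even if some runs failed
            for text, exec_result in zip(texts, execution_results):
                if not exec_result.result.is_ok():
                    continue
                summary = exec_result.result.unwrap()
                print(f"\n📝 TEXT: {text.id.upper()} | TASK: {summary.task_id}")
                print("-" * 50)
                print(summary.summary)
                print(f"(Tokens: {summary.tokens_used})")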

        return execution_results

    except Exception as e:
        print(f"❌ Pipeline execution failed: {str(e)}")
        return None

def demonstrate_task_replication_benefits():
    """
    Demonstrate the benefits and use cases of task replication.
    """
    print("\n" + "=" * 60)
    print("TASK REPLICATION BENEFITS DEMONSTRATION")
    print("=" * 60)

    system_prompt = "Summarize the following text:"

    # Create base task
    base_task = OpenAISummarizationTask(
        system_prompt=system_prompt,
        retries=1
    )

    print(f"🔧 Base task created with ID: {base_task.task_id}")

    # Create pipeline and replicate the task
    pipeline = SyncPipeline[TextInput, str]()
    replicated_tasks = pipeline.replicate_task(base_task, num_replicas=3)

    print(f"🔄 Task replicated {len(replicated_tasks)} times:")
    for i, task in enumerate(replicated_tasks):
        print(f"   Replica {i+1}: {task.task_id}")
        print(f"   Same config: {task.system_prompt == base_task.system_prompt}")
        print(f"   Unique ID: {task.task_id != base_task.task_id}")
        print()

    # Test with sample text
    sample_text = TextInput(
        text="Task replication allows scaling the same operation across multiple inputs efficiently.",
        id="replication_demo"
    )

    print("Testing replicated tasks with same input...")

    for i, task in enumerate(replicated_tasks[:2]):  # Test first 2 replicas
        pipeline = SyncPipeline.from_tasks([task])
        result = pipeline.run(Ok(sample_text), debug=True)

        if result.result.is_ok():
            summary = result.result.unwrap()
            print(f"✅ Replica {i+1} (Task {task.task_id}):")
            print(f"   Tokens: {summary.tokens_used}")
            print(f"   Time: {result.execution_time:.2f}s")
        else:
            print(f"❌ Replica {i+1} failed: {result.result.unwrap_err()}")

if __name__ == "__main__":
    # Ensure OpenAI API key is available
    if not os.getenv("OPENAI_API_KEY"):
        print("❌ Error: OPENAI_API_KEY environment variable not set.")
        print("Please set your OpenAI API key:")
        print("export OPENAI_API_KEY='your-api-key-here'")
        print("or create a .env file with: OPENAI_API_KEY=your-api-key-here")
        raise SystemExit(1)

    print("🚀 NeoPipe OpenAI Sync Pipeline Example")
    print("=" * 60)

    # Run the main demonstration
    results = run_replicated_task_demo()

    # Demonstrate task replication benefits
    demonstrate_task_replication_benefits()

    print("\n✨ Task replication example completed!")
    print("\nKey takeaways:")
    print("- replicate_task() creates multiple instances with unique IDs")
    print("- Replicated tasks share configuration but have independent execution")
    print("- Perfect for scaling identical operations across multiple inputs")
    print("- Each replica can be tracked and monitored separately")
    print("- Enables efficient parallel processing with minimal setup")
    print("- SyncPipeline.run_parallel handles concurrent execution seamlessly")
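The execute method relies on a short-circuit pattern. If the incoming Result is already an Err, it is returned unchanged, so one failure travels to the end of the pipeline without triggering the API call. Below is a toy, self-contained version of that pattern. The Ok and Err classes here are a minimal stand-in written for illustration, not NeoPipe's implementation, and summarize_step is a hypothetical function that keeps only the first sentence instead of calling OpenAI.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar, Union

T = TypeVar("T")
E = TypeVar("E")


@dataclass
class Ok(Generic[T]):
    """Toy success variant (not NeoPipe's class)."""
    value: T

    def is_err(self) -> bool:
        return False


@dataclass
class Err(Generic[E]):
    """Toy error variant (not NeoPipe's class)."""
    error: E

    def is_err(self) -> bool:
        return True


Result = Union[Ok[T], Err[E]]


def summarize_step(input_result: "Result[str, str]") -> "Result[str, str]":
    # Same guard as execute(): an upstream error is passed through untouched
    if input_result.is_err():
        return input_result
    text = input_result.value
    if not text.strip():
        return Err("Empty content")
    # Stand-in for the API call: keep only the first sentence
    return Ok(text.split(".")[0] + ".")


print(summarize_step(Ok("First sentence. Second sentence.")))  # Ok(value='First sentence.')
print(summarize_step(Err("upstream failure")))                 # Err(error='upstream failure')
print(summarize_step(Ok("   ")))                               # Err(error='Empty content')
```

Because errors are values rather than exceptions, every step only needs this one guard to stay safe when an earlier step has failed.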

Key Features Demonstrated

1. Task Replication

# Create base task
summarization_task = OpenAISummarizationTask(system_prompt, model="gpt-3.5-turbo")

# Replicate task 5 times with unique IDs
pipeline = SyncPipeline[TextInput, str]()
replicated_tasks = pipeline.replicate_task(summarization_task, num_replicas=5)

# Each replica has unique task_id but same configuration
print([task.task_id for task in replicated_tasks])
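This page does not show how replicate_task copies a task internally. The sketch below illustrates the idea under one assumption: each replica is a deep copy of the base task that receives a fresh ID. DemoTask and replicate are hypothetical names, not NeoPipe API. A real task that holds an API client may need a different copying strategy.

```python
import copy
import uuid
from dataclasses import dataclass, field
from typing import List


@dataclass
class DemoTask:
    """Hypothetical stand-in for a configured task (not a NeoPipe class)."""
    system_prompt: str
    model: str = "gpt-3.5-turbo"
    task_id: str = field(default_factory=lambda: uuid.uuid4().hex)


def replicate(task: DemoTask, num_replicas: int) -> List[DemoTask]:
    """Return independent copies of `task`, each with a fresh task_id."""
    replicas = []
    for _ in range(num_replicas):
        clone = copy.deepcopy(task)        # copy every configuration field
        clone.task_id = uuid.uuid4().hex   # give the copy its own identity
        replicas.append(clone)
    return replicas


base = DemoTask(system_prompt="Summarize the following text:")
replicas = replicate(base, num_replicas=5)

print(all(r.system_prompt == base.system_prompt for r in replicas))  # True
print(len({r.task_id for r in replicas} | {base.task_id}))           # 6
```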

2. Parallel Execution of Replicated Tasks

# Create individual pipelines for each replicated task
pipelines = [SyncPipeline.from_tasks([task]) for task in replicated_tasks]

# One initial input per pipeline, in the same order as the pipelines
inputs = [Ok(text) for text in texts]

# Run in parallel with different inputs
execution_results = SyncPipeline.run_parallel(pipelines, inputs, debug=True)
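This page does not describe how SyncPipeline.run_parallel schedules work. As a rough mental model only, the sketch below runs independent callables on a standard-library thread pool (concurrent.futures, not NeoPipe) and keeps results aligned with their inputs. It also shows that summing per-job execution times gives cumulative work, which for parallel runs exceeds the elapsed wall-clock time. run_all_in_parallel and slow_upper are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Tuple


def run_all_in_parallel(
    jobs: List[Callable[[str], str]], inputs: List[str]
) -> Tuple[List[str], float, float]:
    """Run jobs[i](inputs[i]) concurrently; return ordered results, wall-clock and cumulative time."""
    if len(jobs) != len(inputs) or not jobs:
        raise ValueError("need one input per job, and at least one job")
    durations = [0.0] * len(jobs)

    def timed(i: int) -> str:
        start = time.perf_counter()
        try:
            return jobs[i](inputs[i])
        finally:
            # Record this job's own elapsed time, even if it raised
            durations[i] = time.perf_counter() - start

    wall_start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
        # Executor.map yields results in submission order, not completion order
        results = list(pool.map(timed, range(len(jobs))))
    wall_clock = time.perf_counter() - wall_start
    return results, wall_clock, sum(durations)


def slow_upper(text: str) -> str:
    time.sleep(0.2)  # stand-in for an I/O-bound API call
    return text.upper()


results, wall, cumulative = run_all_in_parallel([slow_upper] * 5, ["a", "b", "c", "d", "e"])
print(results)  # ['A', 'B', 'C', 'D', 'E']
print(f"wall-clock: {wall:.2f}s, cumulative: {cumulative:.2f}s")
```

With five 0.2 s jobs on five threads, the wall-clock time should land close to 0.2 s, while the cumulative time is about 1.0 s. Threads suit this workload because API calls spend most of their time waiting on the network.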

3. Task Tracking and Identification

# Each replicated task has unique ID for tracking
result = SummaryOutput(
    original_text=text_data.text,
    summary=summary.strip(),
    tokens_used=response.usage.total_tokens,
    task_id=self.task_id  # Unique task identifier
)
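SummaryOutput records the task_id but not the ID of the input text, so matching summaries back to texts depends on list position. If you want each output to be self-describing, one option is to carry the input's ID alongside the replica's ID. A minimal sketch, with TrackedOutput and audit as hypothetical names and placeholder IDs:

```python
from collections import Counter
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class TrackedOutput:
    """Hypothetical output record carrying both the input ID and the replica ID."""
    text_id: str
    task_id: str


def audit(outputs: List[TrackedOutput]) -> Dict[str, str]:
    """Map each input ID to the ID of the replica that processed it."""
    return {o.text_id: o.task_id for o in outputs}


# Placeholder IDs for illustration only
outputs = [
    TrackedOutput("tech_article", "replica-1"),
    TrackedOutput("climate_news", "replica-2"),
    TrackedOutput("space_exploration", "replica-3"),
]
assignment = audit(outputs)
print(assignment["climate_news"])     # replica-2
print(Counter(assignment.values()))   # inputs handled per replica
```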

4. Benefits of Task Replication

  • Scalability: Easily scale identical operations across more inputs
  • Traceability: Track which task processed which input
  • Independence: Each replica is a separate task instance
  • Fault Isolation: A failure in one replica surfaces as that replica's own Err result; the others still complete
  • Performance Monitoring: Compare execution time and token usage across replicas
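Fault isolation holds when each replica's failure is captured in its own result instead of propagating. The execute method above does this by returning Err(...) rather than raising. Below is a standard-library sketch of the same idea. It uses plain ("ok", value) / ("err", message) tuples as a hypothetical stand-in for NeoPipe's Result type, and summarize_stub is a made-up job that fails on blank input.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Tuple

Outcome = Tuple[str, str]  # ("ok", value) or ("err", message)


def isolated(job: Callable[[str], str], item: str) -> Outcome:
    """Run one job and convert any exception into an error outcome."""
    try:
        return ("ok", job(item))
    except Exception as exc:  # the failure stays inside this job's outcome
        return ("err", f"{item}: {exc}")


def summarize_stub(text: str) -> str:
    if not text.strip():
        raise ValueError("empty content")
    return text[:10]


items = ["first document", "   ", "third document"]
with ThreadPoolExecutor(max_workers=3) as pool:
    outcomes: List[Outcome] = list(pool.map(lambda t: isolated(summarize_stub, t), items))

for outcome in outcomes:
    print(outcome)
# ('ok', 'first docu')
# ('err', '   : empty content')
# ('ok', 'third docu')
```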

5. Use Cases for Task Replication

  • Processing multiple similar inputs with same logic
  • A/B testing different configurations
  • Load balancing across task instances
  • Parallel processing of independent data sets
  • Scaling API calls efficiently
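The complete example pairs replicas with texts using zip(), which silently stops at the shorter list. With 3 replicas and 7 texts, only 3 texts would be processed. For load balancing across a fixed number of replicas, a round-robin assignment covers every input. Below is a minimal sketch with itertools.cycle. assign_round_robin is a hypothetical helper, not NeoPipe API, and the sketch assumes a replica can be run more than once.

```python
from itertools import cycle
from typing import Dict, List


def assign_round_robin(replica_ids: List[str], input_ids: List[str]) -> Dict[str, List[str]]:
    """Distribute every input across replicas in turn; no input is dropped."""
    if not replica_ids:
        raise ValueError("need at least one replica")
    assignment: Dict[str, List[str]] = {rid: [] for rid in replica_ids}
    # cycle() repeats the replicas forever; zip() stops when the inputs run out
    for rid, item in zip(cycle(replica_ids), input_ids):
        assignment[rid].append(item)
    return assignment


replicas = ["replica-1", "replica-2", "replica-3"]
docs = [f"doc-{n}" for n in range(1, 8)]

print(len(list(zip(replicas, docs))))  # 3: plain zip drops doc-4 to doc-7
print(assign_round_robin(replicas, docs))
# {'replica-1': ['doc-1', 'doc-4', 'doc-7'], 'replica-2': ['doc-2', 'doc-5'], 'replica-3': ['doc-3', 'doc-6']}
```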

Setup Requirements

To run this example, you'll need:

# Install dependencies
pip install neopipe openai python-dotenv

# Set up environment
export OPENAI_API_KEY="your-api-key-here"
# or create .env file with: OPENAI_API_KEY=your-api-key-here

Expected Output

The example will show:

  • Creation of 5 replicated tasks from a single base task
  • Parallel processing with unique task identification
  • Task performance comparison across replicas
  • Generated summaries with task traceability
  • Benefits of task replication for scaling

This demonstrates NeoPipe's replicate_task method for efficiently scaling identical operations across multiple inputs with full traceability and monitoring.