Async Pipeline

AsyncPipeline

Bases: Generic[T, E]

An asynchronous pipeline supporting three modes:

  1. run: Concurrent execution of registered tasks, one-to-one with inputs.
  2. run_sequence: Sequential chaining of tasks.
  3. run_parallel: Concurrent execution of multiple pipelines, returning PipelineResult objects.

Each method supports an optional debug flag to capture per-task traces.

Tasks must consume and return a Result[T, E].
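
For orientation, a minimal sketch of the sequential mode is shown below. Only from_tasks, run_sequence, and the ExecutionResult fields (.result, .execution_time) come from the source on this page; the import paths for Result and BaseAsyncTask, the subclassing pattern, and the Ok/unwrap/is_ok helpers are assumptions made purely for illustration.

# Illustrative sketch only. Import paths, the BaseAsyncTask subclassing
# pattern, and the Ok/unwrap/is_ok helpers are assumptions; from_tasks,
# run_sequence, .result and .execution_time are taken from the source below.
import asyncio

from neopipe.async_pipeline import AsyncPipeline   # module path confirmed by this page
from neopipe.result import Result, Ok              # assumed import path
from neopipe.task import BaseAsyncTask             # assumed import path


class AddOne(BaseAsyncTask[int, str]):
    async def execute(self, res: Result[int, str]) -> Result[int, str]:
        # Propagate an upstream Err unchanged; otherwise increment the value.
        return Ok(res.unwrap() + 1) if res.is_ok() else res


class Double(BaseAsyncTask[int, str]):
    async def execute(self, res: Result[int, str]) -> Result[int, str]:
        return Ok(res.unwrap() * 2) if res.is_ok() else res


async def main() -> None:
    pipeline = AsyncPipeline.from_tasks([AddOne(), Double()], name="demo")
    outcome = await pipeline.run_sequence(Ok(1), debug=True)
    print(outcome.result)           # Ok(4) when both steps succeed
    print(outcome.execution_time)   # wall-clock seconds for the whole run


asyncio.run(main())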

Source code in neopipe/async_pipeline.py
class AsyncPipeline(Generic[T, E]):
    """
    An asynchronous pipeline supporting three modes:

    1. run: Concurrent execution of registered tasks, one-to-one with inputs.
    2. run_sequence: Sequential chaining of tasks.
    3. run_parallel: Concurrent execution of multiple pipelines, returning PipelineResult objects.

    Each method supports an optional debug flag to capture per-task traces.

    Tasks must consume and return a Result[T, E].
    """

    def __init__(self, name: Optional[str] = None):
        self.tasks: List[BaseAsyncTask[T, E]] = []
        self.pipeline_id = uuid.uuid4()
        self.name = name or f"AsyncPipeline-{self.pipeline_id}"

    @classmethod
    def from_tasks(
        cls, tasks: List[BaseAsyncTask[T, E]], name: Optional[str] = None
    ) -> "AsyncPipeline[T, E]":
        pipeline = cls(name)
        for task in tasks:
            pipeline.add_task(task)
        return pipeline

    def add_task(self, task: BaseAsyncTask[T, E]) -> None:
        self._validate_task(task)
        self.tasks.append(task)

    def replicate_task(self, task: BaseAsyncTask[T, E], num_replicas: int) -> List[BaseAsyncTask[T, E]]:
        """
        Replicate a task multiple times with unique task IDs.

        Args:
            task: The async task to replicate
            num_replicas: Number of replicas to create

        Returns:
            List[BaseAsyncTask[T, E]]: List of replicated tasks with unique IDs
        """
        replicas = []
        for _ in range(num_replicas):
            # Create a deep copy of the task
            replica = copy.deepcopy(task)
            # Assign a new unique task ID
            replica.task_id = uuid.uuid4()
            replicas.append(replica)
        return replicas

    def _validate_task(self, task: BaseAsyncTask[T, E]) -> None:
        if not isinstance(task, BaseAsyncTask):
            raise TypeError(f"Only BaseAsyncTask instances allowed, got {type(task)}")
        sig = inspect.signature(task.execute)
        params = [p for p in sig.parameters.values() if p.name != "self"]
        if len(params) != 1:
            raise TypeError(
                f"Task '{task.task_name}' must have exactly one input parameter"
            )
        origin = getattr(params[0].annotation, "__origin__", None)
        if origin is not Result:
            raise TypeError(
                f"Task '{task.task_name}' first arg must be Result[T, E], got {params[0].annotation}"
            )

    async def run(
        self,
        inputs: List[Result[T, E]],
        debug: bool = False
    ) -> ExecutionResult[List[Result[U, E]], E]:
        """
        Execute each task concurrently (1:1 to inputs).
        Returns:
          - .result: List[Result[U,E]]
          - .trace:  None or Trace[List[Result],E]
        """
        if len(inputs) != len(self.tasks):
            return Err("Number of inputs must match number of tasks")

        start = time.perf_counter()
        coros = [task(inp) for task, inp in zip(self.tasks, inputs)]
        try:
            results: List[Result[U, E]] = await asyncio.gather(*coros)
        except Exception as e:
            logger.exception(f"[{self.name}] run() exception")
            return Err(str(e))

        steps: List[Tuple[str, Result[U, E]]] = []
        for task, res in zip(self.tasks, results):
            if debug:
                steps.append((task.task_name, res))
        elapsed = time.perf_counter() - start

        return ExecutionResult(
            result=results,
            trace=Trace(steps=steps) if debug else None,
            execution_time=elapsed
        )

    async def run_sequence(
        self,
        input_result: Result[T, E],
        debug: bool = False
    ) -> ExecutionResult[Result[U, E], E]:
        """
        Run tasks in order, passing Result→Result.
        Returns:
          - .result: the final Result[U,E]
          - .trace:  None or Trace[Result,U,E] of each step
        """
        start = time.perf_counter()
        steps: List[Tuple[str, Result[Any, E]]] = []
        current = input_result

        for task in self.tasks:
            try:
                current = await task(current)
            except Exception as e:
                logger.exception(f"[{self.name}] run_sequence exception in {task.task_name}")
                current = Err(str(e))
            if debug:
                steps.append((task.task_name, current))
            if current.is_err() and not debug:
                break

        elapsed = time.perf_counter() - start
        return ExecutionResult(
            result=current,
            trace=Trace(steps=steps) if debug else None,
            execution_time=elapsed
        )

    @staticmethod
    async def run_parallel(
        pipelines: List["AsyncPipeline[T, E]"],
        inputs: List[Result[T, E]],
        debug: bool = False
    ) -> ExecutionResult[List[Result[U, E]], E]:
        """
        Execute several pipelines concurrently, each with a single input.
        Returns:
          - .result: List[Result[U,E]]
          - .trace:  None or Traces[List[Result],E]
        """
        if len(pipelines) != len(inputs):
            raise AssertionError("Each pipeline needs a corresponding input Result")

        start = time.perf_counter()
        coros = [p.run_sequence(inp, debug) for p, inp in zip(pipelines, inputs)]
        gathered = await asyncio.gather(*coros, return_exceptions=True)

        results: List[Result[U, E]] = []
        traces: List[Trace[U, E]] = []

        for pipeline, exec_res in zip(pipelines, gathered):
            if isinstance(exec_res, Exception):
                # unexpected exception
                err = Err(f"Exception in pipeline '{pipeline.name}': {exec_res}")
                results.append(err)
                if debug:
                    traces.append(Trace(steps=[(pipeline.name, err)]))
                continue

            # exec_res is ExecutionResult[Result[U,E],E]
            assert isinstance(exec_res, ExecutionResult)
            results.append(exec_res.result)
            if debug and exec_res.trace is not None:
                traces.append(exec_res.trace)  # a Trace[T,E]

        elapsed = time.perf_counter() - start
        return ExecutionResult(
            result=results,
            trace=Traces(pipelines=traces) if debug else None,
            execution_time=elapsed
        )

    def __str__(self) -> str:
        names = ", ".join(t.task_name for t in self.tasks)
        return f"{self.name}([{names}])"

    def __repr__(self) -> str:
        return self.__str__()

replicate_task(task, num_replicas)

Replicate a task multiple times with unique task IDs.

Parameters:

    task (BaseAsyncTask[T, E]): The async task to replicate. Required.
    num_replicas (int): Number of replicas to create. Required.

Returns:

    List[BaseAsyncTask[T, E]]: List of replicated tasks with unique IDs.

Source code in neopipe/async_pipeline.py
def replicate_task(self, task: BaseAsyncTask[T, E], num_replicas: int) -> List[BaseAsyncTask[T, E]]:
    """
    Replicate a task multiple times with unique task IDs.

    Args:
        task: The async task to replicate
        num_replicas: Number of replicas to create

    Returns:
        List[BaseAsyncTask[T, E]]: List of replicated tasks with unique IDs
    """
    replicas = []
    for _ in range(num_replicas):
        # Create a deep copy of the task
        replica = copy.deepcopy(task)
        # Assign a new unique task ID
        replica.task_id = uuid.uuid4()
        replicas.append(replica)
    return replicas
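
As a usage sketch (not taken from the library's own examples), replicate_task pairs naturally with run to fan the same work out over several inputs; fetch_task and inputs are placeholders supplied by the caller.

# Sketch: fan one task out over N inputs, one replica per input Result.
# fetch_task and inputs are placeholders supplied by the caller.
async def fan_out(fetch_task, inputs):
    pipeline = AsyncPipeline(name="fan-out")
    for replica in pipeline.replicate_task(fetch_task, num_replicas=len(inputs)):
        pipeline.add_task(replica)            # each replica gets a fresh task_id
    outcome = await pipeline.run(inputs)      # run() requires len(inputs) == len(tasks)
    return outcome.result                     # List[Result[...]], one per replica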

run(inputs, debug=False) async

Execute each task concurrently (1:1 to inputs).

Returns:

    .result: List[Result[U, E]]
    .trace: None or Trace[List[Result], E]

Source code in neopipe/async_pipeline.py
async def run(
    self,
    inputs: List[Result[T, E]],
    debug: bool = False
) -> ExecutionResult[List[Result[U, E]], E]:
    """
    Execute each task concurrently (1:1 to inputs).
    Returns:
      - .result: List[Result[U,E]]
      - .trace:  None or Trace[List[Result],E]
    """
    if len(inputs) != len(self.tasks):
        return Err("Number of inputs must match number of tasks")

    start = time.perf_counter()
    coros = [task(inp) for task, inp in zip(self.tasks, inputs)]
    try:
        results: List[Result[U, E]] = await asyncio.gather(*coros)
    except Exception as e:
        logger.exception(f"[{self.name}] run() exception")
        return Err(str(e))

    steps: List[Tuple[str, Result[U, E]]] = []
    for task, res in zip(self.tasks, results):
        if debug:
            steps.append((task.task_name, res))
    elapsed = time.perf_counter() - start

    return ExecutionResult(
        result=results,
        trace=Trace(steps=steps) if debug else None,
        execution_time=elapsed
    )
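
A small sketch of reading the per-task debug trace that run produces; the two tasks and input Results are placeholders, and accessing the trace via its steps attribute is an assumption based on the Trace(steps=...) constructor call above.

# Sketch: run two tasks concurrently and print the per-task debug trace.
# task_a, task_b, in_a, in_b are placeholders supplied by the caller.
async def run_with_trace(task_a, task_b, in_a, in_b):
    pipeline = AsyncPipeline.from_tasks([task_a, task_b], name="pair")
    outcome = await pipeline.run([in_a, in_b], debug=True)
    if outcome.trace is not None:
        for task_name, step_result in outcome.trace.steps:
            print(task_name, step_result)
    return outcome.result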

run_parallel(pipelines, inputs, debug=False) async staticmethod

Execute several pipelines concurrently, each with a single input.

Returns:

    .result: List[Result[U, E]]
    .trace: None or Traces[List[Result], E]

Source code in neopipe/async_pipeline.py
@staticmethod
async def run_parallel(
    pipelines: List["AsyncPipeline[T, E]"],
    inputs: List[Result[T, E]],
    debug: bool = False
) -> ExecutionResult[List[Result[U, E]], E]:
    """
    Execute several pipelines concurrently, each with a single input.
    Returns:
      - .result: List[Result[U,E]]
      - .trace:  None or Traces[List[Result],E]
    """
    if len(pipelines) != len(inputs):
        raise AssertionError("Each pipeline needs a corresponding input Result")

    start = time.perf_counter()
    coros = [p.run_sequence(inp, debug) for p, inp in zip(pipelines, inputs)]
    gathered = await asyncio.gather(*coros, return_exceptions=True)

    results: List[Result[U, E]] = []
    traces: List[Trace[U, E]] = []

    for pipeline, exec_res in zip(pipelines, gathered):
        if isinstance(exec_res, Exception):
            # unexpected exception
            err = Err(f"Exception in pipeline '{pipeline.name}': {exec_res}")
            results.append(err)
            if debug:
                traces.append(Trace(steps=[(pipeline.name, err)]))
            continue

        # exec_res is ExecutionResult[Result[U,E],E]
        assert isinstance(exec_res, ExecutionResult)
        results.append(exec_res.result)
        if debug and exec_res.trace is not None:
            traces.append(exec_res.trace)  # a Trace[T,E]

    elapsed = time.perf_counter() - start
    return ExecutionResult(
        result=results,
        trace=Traces(pipelines=traces) if debug else None,
        execution_time=elapsed
    )
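
For example, two independent pipelines can be executed concurrently as follows; the pipelines and their input Results are placeholders.

# Sketch: execute two independent pipelines concurrently, one input each.
async def run_both(pipeline_a, pipeline_b, in_a, in_b):
    outcome = await AsyncPipeline.run_parallel(
        [pipeline_a, pipeline_b],
        [in_a, in_b],
    )
    # outcome.result holds one final Result per pipeline, in input order.
    return outcome.result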

run_sequence(input_result, debug=False) async

Run tasks in order, passing Result→Result.

Returns:

    .result: the final Result[U, E]
    .trace: None or Trace[Result, U, E] of each step

Source code in neopipe/async_pipeline.py
async def run_sequence(
    self,
    input_result: Result[T, E],
    debug: bool = False
) -> ExecutionResult[Result[U, E], E]:
    """
    Run tasks in order, passing Result→Result.
    Returns:
      - .result: the final Result[U,E]
      - .trace:  None or Trace[Result,U,E] of each step
    """
    start = time.perf_counter()
    steps: List[Tuple[str, Result[Any, E]]] = []
    current = input_result

    for task in self.tasks:
        try:
            current = await task(current)
        except Exception as e:
            logger.exception(f"[{self.name}] run_sequence exception in {task.task_name}")
            current = Err(str(e))
        if debug:
            steps.append((task.task_name, current))
        if current.is_err() and not debug:
            break

    elapsed = time.perf_counter() - start
    return ExecutionResult(
        result=current,
        trace=Trace(steps=steps) if debug else None,
        execution_time=elapsed
    )
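
As the loop above shows, a failing step short-circuits the chain when debug is False; with debug=True the remaining tasks still run and every step is recorded. A sketch of checking for that outcome, using the is_err accessor that appears in the source (pipeline and first_input are placeholders):

# Sketch: with debug=False, run_sequence stops at the first Err and skips
# the remaining tasks; with debug=True it keeps going and records each step.
async def demo_short_circuit(pipeline, first_input):
    outcome = await pipeline.run_sequence(first_input)
    if outcome.result.is_err():
        print(f"pipeline failed after {outcome.execution_time:.3f}s:", outcome.result)
    return outcome.result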

AsyncWorkflow

Bases: AsyncPipeline[T, E]

A workflow class that inherits from AsyncPipeline, providing the same functionality under a different namespace for workflow-oriented use cases.

Source code in neopipe/async_pipeline.py
class AsyncWorkflow(AsyncPipeline[T, E]):
    """
    A workflow class that inherits from AsyncPipeline, providing the same functionality
    under a different namespace for workflow-oriented use cases.
    """
    pass
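
Since AsyncWorkflow adds no behaviour of its own, it is used exactly like AsyncPipeline; a brief sketch (tasks and first_input are placeholders):

# AsyncWorkflow is a drop-in alias for AsyncPipeline.
async def run_workflow(tasks, first_input):
    workflow = AsyncWorkflow.from_tasks(tasks, name="nightly-etl")
    return await workflow.run_sequence(first_input)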