Pipeline

Pipeline module providing unified access to sync and async pipelines.

This module imports and re-exports pipeline classes from their respective modules to maintain backward compatibility while keeping sync and async implementations separate.

AsyncPipeline

Bases: Generic[T, E]

An asynchronous pipeline supporting three modes:

  1. run: Concurrent execution of registered tasks, one-to-one with inputs.
  2. run_sequence: Sequential chaining of tasks.
  3. run_parallel: Concurrent execution of multiple pipelines, returning PipelineResult objects.

Each method supports an optional debug flag to capture per-task traces.

Tasks must consume and return a Result[T, E].
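
For orientation, here is a minimal usage sketch. The import paths for Result/Ok/BaseAsyncTask, the AddOne task, and the unwrap() accessor are illustrative assumptions; only AsyncPipeline and its methods are taken from this page.

import asyncio

from neopipe.result import Ok, Result        # assumed import path
from neopipe.task import BaseAsyncTask       # assumed import path
from neopipe.async_pipeline import AsyncPipeline


class AddOne(BaseAsyncTask[int, str]):       # hypothetical task
    async def execute(self, input_result: Result[int, str]) -> Result[int, str]:
        # Tasks consume and return a Result; pass errors through untouched.
        if input_result.is_err():
            return input_result
        return Ok(input_result.unwrap() + 1)  # unwrap() is an assumed accessor


async def main() -> None:
    pipeline = AsyncPipeline.from_tasks([AddOne(), AddOne()], name="demo")
    exec_res = await pipeline.run_sequence(Ok(1), debug=True)
    print(exec_res.result)          # Ok(3)
    print(exec_res.execution_time)  # elapsed seconds
    print(exec_res.trace)           # per-task steps, because debug=True


asyncio.run(main())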

Source code in neopipe/async_pipeline.py
class AsyncPipeline(Generic[T, E]):
    """
    An asynchronous pipeline supporting three modes:

    1. run: Concurrent execution of registered tasks, one-to-one with inputs.
    2. run_sequence: Sequential chaining of tasks.
    3. run_parallel: Concurrent execution of multiple pipelines, returning PipelineResult objects.

    Each method supports an optional debug flag to capture per-task traces.

    Tasks must consume and return a Result[T, E].
    """

    def __init__(self, name: Optional[str] = None):
        self.tasks: List[BaseAsyncTask[T, E]] = []
        self.pipeline_id = uuid.uuid4()
        self.name = name or f"AsyncPipeline-{self.pipeline_id}"

    @classmethod
    def from_tasks(
        cls, tasks: List[BaseAsyncTask[T, E]], name: Optional[str] = None
    ) -> "AsyncPipeline[T, E]":
        pipeline = cls(name)
        for task in tasks:
            pipeline.add_task(task)
        return pipeline

    def add_task(self, task: BaseAsyncTask[T, E]) -> None:
        self._validate_task(task)
        self.tasks.append(task)

    def replicate_task(self, task: BaseAsyncTask[T, E], num_replicas: int) -> List[BaseAsyncTask[T, E]]:
        """
        Replicate a task multiple times with unique task IDs.

        Args:
            task: The async task to replicate
            num_replicas: Number of replicas to create

        Returns:
            List[BaseAsyncTask[T, E]]: List of replicated tasks with unique IDs
        """
        replicas = []
        for _ in range(num_replicas):
            # Create a deep copy of the task
            replica = copy.deepcopy(task)
            # Assign a new unique task ID
            replica.task_id = uuid.uuid4()
            replicas.append(replica)
        return replicas

    def _validate_task(self, task: BaseAsyncTask[T, E]) -> None:
        if not isinstance(task, BaseAsyncTask):
            raise TypeError(f"Only BaseAsyncTask instances allowed, got {type(task)}")
        sig = inspect.signature(task.execute)
        params = [p for p in sig.parameters.values() if p.name != "self"]
        if len(params) != 1:
            raise TypeError(
                f"Task '{task.task_name}' must have exactly one input parameter"
            )
        origin = getattr(params[0].annotation, "__origin__", None)
        if origin is not Result:
            raise TypeError(
                f"Task '{task.task_name}' first arg must be Result[T, E], got {params[0].annotation}"
            )

    async def run(
        self,
        inputs: List[Result[T, E]],
        debug: bool = False
    ) -> ExecutionResult[List[Result[U, E]], E]:
        """
        Execute each task concurrently (1:1 to inputs).
        Returns:
          - .result: List[Result[U,E]]
          - .trace:  None or Trace[List[Result],E]
        """
        if len(inputs) != len(self.tasks):
            return Err("Number of inputs must match number of tasks")

        start = time.perf_counter()
        coros = [task(inp) for task, inp in zip(self.tasks, inputs)]
        try:
            results: List[Result[U, E]] = await asyncio.gather(*coros)
        except Exception as e:
            logger.exception(f"[{self.name}] run() exception")
            return Err(str(e))

        steps: List[Tuple[str, Result[U, E]]] = []
        for task, res in zip(self.tasks, results):
            if debug:
                steps.append((task.task_name, res))
        elapsed = time.perf_counter() - start

        return ExecutionResult(
            result=results,
            trace=Trace(steps=steps) if debug else None,
            execution_time=elapsed
        )

    async def run_sequence(
        self,
        input_result: Result[T, E],
        debug: bool = False
    ) -> ExecutionResult[Result[U, E], E]:
        """
        Run tasks in order, passing Result→Result.
        Returns:
          - .result: the final Result[U,E]
          - .trace:  None or Trace[Result,U,E] of each step
        """
        start = time.perf_counter()
        steps: List[Tuple[str, Result[Any, E]]] = []
        current = input_result

        for task in self.tasks:
            try:
                current = await task(current)
            except Exception as e:
                logger.exception(f"[{self.name}] run_sequence exception in {task.task_name}")
                current = Err(str(e))
            if debug:
                steps.append((task.task_name, current))
            if current.is_err() and not debug:
                break

        elapsed = time.perf_counter() - start
        return ExecutionResult(
            result=current,
            trace=Trace(steps=steps) if debug else None,
            execution_time=elapsed
        )

    @staticmethod
    async def run_parallel(
        pipelines: List["AsyncPipeline[T, E]"],
        inputs: List[Result[T, E]],
        debug: bool = False
    ) -> ExecutionResult[List[Result[U, E]], E]:
        """
        Execute several pipelines concurrently, each with a single input.
        Returns:
          - .result: List[Result[U,E]]
          - .trace:  None or Traces[List[Result],E]
        """
        if len(pipelines) != len(inputs):
            raise AssertionError("Each pipeline needs a corresponding input Result")

        start = time.perf_counter()
        coros = [p.run_sequence(inp, debug) for p, inp in zip(pipelines, inputs)]
        gathered = await asyncio.gather(*coros, return_exceptions=True)

        results: List[Result[U, E]] = []
        traces: List[Trace[U, E]] = []

        for pipeline, exec_res in zip(pipelines, gathered):
            if isinstance(exec_res, Exception):
                # unexpected exception
                err = Err(f"Exception in pipeline '{pipeline.name}': {exec_res}")
                results.append(err)
                if debug:
                    traces.append(Trace(steps=[(pipeline.name, err)]))
                continue

            # exec_res is ExecutionResult[Result[U,E],E]
            assert isinstance(exec_res, ExecutionResult)
            results.append(exec_res.result)
            if debug and exec_res.trace is not None:
                traces.append(exec_res.trace)  # a Trace[T,E]

        elapsed = time.perf_counter() - start
        return ExecutionResult(
            result=results,
            trace=Traces(pipelines=traces) if debug else None,
            execution_time=elapsed
        )

    def __str__(self) -> str:
        names = ", ".join(t.task_name for t in self.tasks)
        return f"{self.name}([{names}])"

    def __repr__(self) -> str:
        return self.__str__()

replicate_task(task, num_replicas)

Replicate a task multiple times with unique task IDs.

Parameters:

  task (BaseAsyncTask[T, E], required): The async task to replicate.
  num_replicas (int, required): Number of replicas to create.

Returns:

  List[BaseAsyncTask[T, E]]: List of replicated tasks with unique IDs.
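
A short sketch of how replicate_task can feed a fan-out pipeline, continuing the assumed AddOne task from the sketch above (call from inside an async function):

fanout = AsyncPipeline(name="fanout")
for replica in fanout.replicate_task(AddOne(), num_replicas=3):
    fanout.add_task(replica)   # replicate_task only returns copies; it does not register them

exec_res = await fanout.run([Ok(2), Ok(3), Ok(4)])
# exec_res.result -> [Ok(3), Ok(4), Ok(5)]; each replica carries its own task_id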

Source code in neopipe/async_pipeline.py
def replicate_task(self, task: BaseAsyncTask[T, E], num_replicas: int) -> List[BaseAsyncTask[T, E]]:
    """
    Replicate a task multiple times with unique task IDs.

    Args:
        task: The async task to replicate
        num_replicas: Number of replicas to create

    Returns:
        List[BaseAsyncTask[T, E]]: List of replicated tasks with unique IDs
    """
    replicas = []
    for _ in range(num_replicas):
        # Create a deep copy of the task
        replica = copy.deepcopy(task)
        # Assign a new unique task ID
        replica.task_id = uuid.uuid4()
        replicas.append(replica)
    return replicas

run(inputs, debug=False) async

Execute each task concurrently (1:1 to inputs).

Returns:

  - .result: List[Result[U, E]]
  - .trace: None or Trace[List[Result], E]
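
A hedged example of run(), continuing the assumed AddOne task from the class-level sketch: the number of inputs must equal the number of registered tasks, and each task receives the input at its own position.

# Call from inside an async function.
pipeline = AsyncPipeline.from_tasks([AddOne(), AddOne()], name="batch")
exec_res = await pipeline.run([Ok(1), Ok(10)], debug=True)
# exec_res.result is one Result per task, e.g. [Ok(2), Ok(11)]
# exec_res.trace holds (task_name, result) pairs because debug=True
# A mismatched number of inputs returns an Err instead of an ExecutionResult.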

Source code in neopipe/async_pipeline.py
async def run(
    self,
    inputs: List[Result[T, E]],
    debug: bool = False
) -> ExecutionResult[List[Result[U, E]], E]:
    """
    Execute each task concurrently (1:1 to inputs).
    Returns:
      - .result: List[Result[U,E]]
      - .trace:  None or Trace[List[Result],E]
    """
    if len(inputs) != len(self.tasks):
        return Err("Number of inputs must match number of tasks")

    start = time.perf_counter()
    coros = [task(inp) for task, inp in zip(self.tasks, inputs)]
    try:
        results: List[Result[U, E]] = await asyncio.gather(*coros)
    except Exception as e:
        logger.exception(f"[{self.name}] run() exception")
        return Err(str(e))

    steps: List[Tuple[str, Result[U, E]]] = []
    for task, res in zip(self.tasks, results):
        if debug:
            steps.append((task.task_name, res))
    elapsed = time.perf_counter() - start

    return ExecutionResult(
        result=results,
        trace=Trace(steps=steps) if debug else None,
        execution_time=elapsed
    )

run_parallel(pipelines, inputs, debug=False) async staticmethod

Execute several pipelines concurrently, each with a single input.

Returns:

  - .result: List[Result[U, E]]
  - .trace: None or Traces[List[Result], E]
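
A sketch of run_parallel, which awaits each pipeline's run_sequence over its matching input concurrently (pipeline contents use the assumed AddOne task from the sketch above):

# Call from inside an async function.
p1 = AsyncPipeline.from_tasks([AddOne()], name="p1")
p2 = AsyncPipeline.from_tasks([AddOne(), AddOne()], name="p2")

exec_res = await AsyncPipeline.run_parallel([p1, p2], [Ok(1), Ok(10)], debug=True)
# exec_res.result -> [Ok(2), Ok(12)], one final Result per pipeline
# exec_res.trace  -> Traces(...) collecting each pipeline's per-task Trace
# A mismatch between pipelines and inputs raises AssertionError.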

Source code in neopipe/async_pipeline.py
@staticmethod
async def run_parallel(
    pipelines: List["AsyncPipeline[T, E]"],
    inputs: List[Result[T, E]],
    debug: bool = False
) -> ExecutionResult[List[Result[U, E]], E]:
    """
    Execute several pipelines concurrently, each with a single input.
    Returns:
      - .result: List[Result[U,E]]
      - .trace:  None or Traces[List[Result],E]
    """
    if len(pipelines) != len(inputs):
        raise AssertionError("Each pipeline needs a corresponding input Result")

    start = time.perf_counter()
    coros = [p.run_sequence(inp, debug) for p, inp in zip(pipelines, inputs)]
    gathered = await asyncio.gather(*coros, return_exceptions=True)

    results: List[Result[U, E]] = []
    traces: List[Trace[U, E]] = []

    for pipeline, exec_res in zip(pipelines, gathered):
        if isinstance(exec_res, Exception):
            # unexpected exception
            err = Err(f"Exception in pipeline '{pipeline.name}': {exec_res}")
            results.append(err)
            if debug:
                traces.append(Trace(steps=[(pipeline.name, err)]))
            continue

        # exec_res is ExecutionResult[Result[U,E],E]
        assert isinstance(exec_res, ExecutionResult)
        results.append(exec_res.result)
        if debug and exec_res.trace is not None:
            traces.append(exec_res.trace)  # a Trace[T,E]

    elapsed = time.perf_counter() - start
    return ExecutionResult(
        result=results,
        trace=Traces(pipelines=traces) if debug else None,
        execution_time=elapsed
    )

run_sequence(input_result, debug=False) async

Run tasks in order, passing Result→Result.

Returns:

  - .result: the final Result[U, E]
  - .trace: None or Trace of each step
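
A sketch of run_sequence, again using the assumed AddOne task: each task's output Result is fed to the next, and with debug=False the chain stops at the first Err.

# Call from inside an async function.
pipeline = AsyncPipeline.from_tasks([AddOne(), AddOne(), AddOne()], name="chain")

exec_res = await pipeline.run_sequence(Ok(0))
# exec_res.result -> Ok(3); an Err from any task short-circuits the remaining tasks
# With debug=True the remaining tasks are still invoked so the trace covers the whole chain.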

Source code in neopipe/async_pipeline.py
async def run_sequence(
    self,
    input_result: Result[T, E],
    debug: bool = False
) -> ExecutionResult[Result[U, E], E]:
    """
    Run tasks in order, passing Result→Result.
    Returns:
      - .result: the final Result[U,E]
      - .trace:  None or Trace[Result,U,E] of each step
    """
    start = time.perf_counter()
    steps: List[Tuple[str, Result[Any, E]]] = []
    current = input_result

    for task in self.tasks:
        try:
            current = await task(current)
        except Exception as e:
            logger.exception(f"[{self.name}] run_sequence exception in {task.task_name}")
            current = Err(str(e))
        if debug:
            steps.append((task.task_name, current))
        if current.is_err() and not debug:
            break

    elapsed = time.perf_counter() - start
    return ExecutionResult(
        result=current,
        trace=Trace(steps=steps) if debug else None,
        execution_time=elapsed
    )

SyncPipeline

Bases: Generic[T, E]

A pipeline that executes BaseSyncTasks sequentially, passing Result[T, E] through each step.

Attributes:

  tasks (List[BaseSyncTask]): Registered tasks.
  pipeline_id (UUID): Unique ID for the pipeline.
  name (str): Optional name for logging/debugging.
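
A minimal synchronous counterpart to the async sketch above; the import paths, the Double task, and unwrap() are illustrative assumptions.

from neopipe.result import Ok, Result      # assumed import path
from neopipe.task import BaseSyncTask      # assumed import path
from neopipe.sync_pipeline import SyncPipeline


class Double(BaseSyncTask):                # hypothetical task
    def execute(self, input_result: Result[int, str]) -> Result[int, str]:
        if input_result.is_err():
            return input_result
        return Ok(input_result.unwrap() * 2)  # unwrap() is an assumed accessor


pipeline = SyncPipeline.from_tasks([Double(), Double()], name="sync-demo")
exec_res = pipeline.run(Ok(3), debug=True)
print(exec_res.result)          # Ok(12)
print(exec_res.execution_time)  # elapsed seconds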

Source code in neopipe/sync_pipeline.py
class SyncPipeline(Generic[T, E]):
    """
    A pipeline that executes BaseSyncTasks sequentially, passing Result[T, E] through each step.

    Attributes:
        tasks (List[BaseSyncTask]): Registered tasks.
        pipeline_id (UUID): Unique ID for the pipeline.
        name (str): Optional name for logging/debugging.
    """

    def __init__(self, name: Optional[str] = None):
        self.tasks: List[BaseSyncTask] = []
        self.pipeline_id = uuid.uuid4()
        self.name = name or f"SyncPipeline-{self.pipeline_id}"

    @classmethod
    def from_tasks(
        cls, tasks: List[BaseSyncTask], name: Optional[str] = None
    ) -> "SyncPipeline":
        pipeline = cls(name)
        for task in tasks:
            pipeline.add_task(task)
        return pipeline

    def add_task(self, task: BaseSyncTask) -> None:
        if not isinstance(task, BaseSyncTask):
            raise TypeError(
                f"Only BaseSyncTask instances can be added. Got {type(task)}"
            )

        sig = inspect.signature(task.execute)
        params = list(sig.parameters.values())
        non_self_params = [p for p in params if p.name != "self"]

        if len(non_self_params) < 1:
            raise TypeError(
                f"Task '{task.task_name}' must define an 'execute(self, input_result: Result)' method "
                "with at least one input parameter."
            )

        param = non_self_params[0]
        if get_origin(param.annotation) is not Result:
            raise TypeError(
                f"Task '{task.task_name}' first argument must be of type Result[T, E]. Found: {param.annotation}"
            )

        self.tasks.append(task)

    def replicate_task(self, task: BaseSyncTask, num_replicas: int) -> List[BaseSyncTask]:
        """
        Replicate a task multiple times with unique task IDs.

        Args:
            task: The task to replicate
            num_replicas: Number of replicas to create

        Returns:
            List[BaseSyncTask]: List of replicated tasks with unique IDs
        """
        replicas = []
        for _ in range(num_replicas):
            # Create a deep copy of the task
            replica = copy.deepcopy(task)
            # Assign a new unique task ID
            replica.task_id = uuid.uuid4()
            replicas.append(replica)
        return replicas

    def run(
        self,
        input_result: Result[T, E],
        debug: bool = False
    ) -> ExecutionResult[U, E]:
        """
        Run tasks sequentially. Always returns an ExecutionResult whose
        .result is a Result[T, E], and .trace is a Trace if debug=True.
        """
        start = time.perf_counter()
        steps: List[Tuple[str, Result[T, E]]] = []
        result: Result[T, E] = input_result

        if debug:
            steps.append((self.name, result))

        for task in self.tasks:
            name = task.task_name
            try:
                result = task(result)
            except Exception as ex:
                logger.exception(f"[{self.name}] Exception in {name}")
                result = Err(f"Exception in task {name}: {ex}")

            if debug:
                steps.append((name, result))
            # note: we continue even if Err, to record full trace

            if result.is_err() and not debug:
                break

        elapsed = time.perf_counter() - start
        return ExecutionResult(
            result=result,
            trace=Trace(steps=steps) if debug else None,
            execution_time=elapsed
        )

    @staticmethod
    def run_parallel(
        pipelines: List["SyncPipeline[T, E]"],
        inputs: List[Result[T, E]],
        max_workers: int = 4,
        debug: bool = False
    ) -> ExecutionResult[List[Result[U, E]], E]:
        """
        Execute multiple pipelines concurrently. Returns ExecutionResult where
        .result is List[Result[T, E]] (one per pipeline), and .trace is
        Traces(...) if debug=True.
        """
        if len(pipelines) != len(inputs):
            raise AssertionError("Each pipeline needs a corresponding input Result")

        start = time.perf_counter()
        results: List[Result[T, E]] = [None] * len(pipelines)
        pipeline_traces: List[Trace[T, E]] = []

        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = {
                pool.submit(p.run, inp, debug): idx
                for idx, (p, inp) in enumerate(zip(pipelines, inputs))
            }

            for fut in as_completed(futures):
                idx = futures[fut]
                pipe = pipelines[idx]
                try:
                    exec_res = fut.result()  # ExecutionResult[T, E]
                except Exception as ex:
                    logger.exception(f"[{pipe.name}] Unhandled exception")
                    results[idx] = Err(f"Exception in pipeline '{pipe.name}': {ex}")
                    if not debug:
                        break
                    exec_res = ExecutionResult(
                        result=results[idx],
                        trace=Trace(steps=[(pipe.name, results[idx])]),
                        execution_time=0.0
                    )

                results[idx] = exec_res.result
                if debug and exec_res.trace is not None:
                    # exec_res.trace is a Trace[T,E]
                    pipeline_traces.append(exec_res.trace)

                if results[idx].is_err() and not debug:
                    break

        elapsed = time.perf_counter() - start
        return ExecutionResult(
            result=results,
            trace=Traces(pipelines=pipeline_traces) if debug else None,
            execution_time=elapsed
        )

    def __str__(self) -> str:
        task_list = "\n  ".join(task.task_name for task in self.tasks)
        return f"{self.name} with {len(self.tasks)} task(s):\n  {task_list}"

    def __repr__(self) -> str:
        return self.__str__()

replicate_task(task, num_replicas)

Replicate a task multiple times with unique task IDs.

Parameters:

  task (BaseSyncTask, required): The task to replicate.
  num_replicas (int, required): Number of replicas to create.

Returns:

  List[BaseSyncTask]: List of replicated tasks with unique IDs.

Source code in neopipe/sync_pipeline.py
def replicate_task(self, task: BaseSyncTask, num_replicas: int) -> List[BaseSyncTask]:
    """
    Replicate a task multiple times with unique task IDs.

    Args:
        task: The task to replicate
        num_replicas: Number of replicas to create

    Returns:
        List[BaseSyncTask]: List of replicated tasks with unique IDs
    """
    replicas = []
    for _ in range(num_replicas):
        # Create a deep copy of the task
        replica = copy.deepcopy(task)
        # Assign a new unique task ID
        replica.task_id = uuid.uuid4()
        replicas.append(replica)
    return replicas

run(input_result, debug=False)

Run tasks sequentially. Always returns an ExecutionResult whose .result is a Result[T, E], and .trace is a Trace if debug=True.
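
Continuing the assumed Double sketch above, a note on the debug flag: with debug=True the trace begins with the pipeline's own name and the initial input, and execution continues past an Err so the full trace is recorded.

pipeline = SyncPipeline.from_tasks([Double(), Double()], name="traced")
exec_res = pipeline.run(Ok(5), debug=True)
for step_name, step_result in exec_res.trace.steps:
    # First step is (pipeline name, initial input), then one (task_name, result) per task.
    print(step_name, step_result)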

Source code in neopipe/sync_pipeline.py
def run(
    self,
    input_result: Result[T, E],
    debug: bool = False
) -> ExecutionResult[U, E]:
    """
    Run tasks sequentially. Always returns an ExecutionResult whose
    .result is a Result[T, E], and .trace is a Trace if debug=True.
    """
    start = time.perf_counter()
    steps: List[Tuple[str, Result[T, E]]] = []
    result: Result[T, E] = input_result

    if debug:
        steps.append((self.name, result))

    for task in self.tasks:
        name = task.task_name
        try:
            result = task(result)
        except Exception as ex:
            logger.exception(f"[{self.name}] Exception in {name}")
            result = Err(f"Exception in task {name}: {ex}")

        if debug:
            steps.append((name, result))
        # note: we continue even if Err, to record full trace

        if result.is_err() and not debug:
            break

    elapsed = time.perf_counter() - start
    return ExecutionResult(
        result=result,
        trace=Trace(steps=steps) if debug else None,
        execution_time=elapsed
    )

run_parallel(pipelines, inputs, max_workers=4, debug=False) staticmethod

Execute multiple pipelines concurrently. Returns ExecutionResult where .result is List[Result[T, E]] (one per pipeline), and .trace is Traces(...) if debug=True.
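
A sketch of the threaded run_parallel, using the assumed Double task from above; each pipeline runs in a ThreadPoolExecutor worker against its matching input.

p1 = SyncPipeline.from_tasks([Double()], name="p1")
p2 = SyncPipeline.from_tasks([Double(), Double()], name="p2")

exec_res = SyncPipeline.run_parallel([p1, p2], [Ok(1), Ok(2)], max_workers=2, debug=True)
# exec_res.result -> [Ok(2), Ok(8)], one final Result per pipeline, in input order
# exec_res.trace  -> Traces(...) with one Trace per pipeline when debug=True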

Source code in neopipe/sync_pipeline.py
@staticmethod
def run_parallel(
    pipelines: List["SyncPipeline[T, E]"],
    inputs: List[Result[T, E]],
    max_workers: int = 4,
    debug: bool = False
) -> ExecutionResult[List[Result[U, E]], E]:
    """
    Execute multiple pipelines concurrently. Returns ExecutionResult where
    .result is List[Result[T, E]] (one per pipeline), and .trace is
    Traces(...) if debug=True.
    """
    if len(pipelines) != len(inputs):
        raise AssertionError("Each pipeline needs a corresponding input Result")

    start = time.perf_counter()
    results: List[Result[T, E]] = [None] * len(pipelines)
    pipeline_traces: List[Trace[T, E]] = []

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            pool.submit(p.run, inp, debug): idx
            for idx, (p, inp) in enumerate(zip(pipelines, inputs))
        }

        for fut in as_completed(futures):
            idx = futures[fut]
            pipe = pipelines[idx]
            try:
                exec_res = fut.result()  # ExecutionResult[T, E]
            except Exception as ex:
                logger.exception(f"[{pipe.name}] Unhandled exception")
                results[idx] = Err(f"Exception in pipeline '{pipe.name}': {ex}")
                if not debug:
                    break
                exec_res = ExecutionResult(
                    result=results[idx],
                    trace=Trace(steps=[(pipe.name, results[idx])]),
                    execution_time=0.0
                )

            results[idx] = exec_res.result
            if debug and exec_res.trace is not None:
                # exec_res.trace is a Trace[T,E]
                pipeline_traces.append(exec_res.trace)

            if results[idx].is_err() and not debug:
                break

    elapsed = time.perf_counter() - start
    return ExecutionResult(
        result=results,
        trace=Traces(pipelines=pipeline_traces) if debug else None,
        execution_time=elapsed
    )