Basic Usage: Sync Pipeline¶
The SyncPipeline class ties together multiple BaseSyncTask instances, both function-based and class-based, into a single sequential workflow. Each task consumes a Result[T, E] and returns a new Result[U, E]. The pipeline stops on the first failure and can optionally emit a full trace for debugging.
Installation¶
pip install neopipe
Imports¶
from neopipe.result import Result, Ok, Err
from neopipe.task import FunctionSyncTask, ClassSyncTask
from neopipe.sync_pipeline import SyncPipeline
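Before wiring tasks together, it helps to see the Result contract in isolation. A minimal sketch using only the constructors and accessors that appear on this page (Ok, Err, is_ok, is_err, unwrap, err):

# A successful value and a failure, sharing the same Result[int, str] type
ok_val: Result[int, str] = Ok(42)
bad: Result[int, str] = Err("something went wrong")

# Ok carries the value; Err carries the error
assert ok_val.is_ok() and ok_val.unwrap() == 42
assert bad.is_err() and bad.err() == "something went wrong"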
1. Define Your Tasks¶
1.1 Function‑based Tasks¶
@FunctionSyncTask.decorator(retries=2)
def load_users(res: Result[None, str]) -> Result[list[dict], str]:
    # Simulate loading from a database
    return Ok([
        {"id": 1, "name": "Alice", "active": True},
        {"id": 2, "name": "Bob", "active": False},
        {"id": 3, "name": "Carol", "active": True},
    ])
@FunctionSyncTask.decorator()
def filter_active(res: Result[list[dict], str]) -> Result[list[dict], str]:
    # Pass an upstream error through unchanged
    if res.is_err():
        return res
    active = [u for u in res.unwrap() if u["active"]]
    return Ok(active)
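A task can also fail explicitly by returning Err, which is what makes the pipeline stop at that step. A minimal sketch of a hypothetical guard task (require_nonempty is not part of neopipe; it just follows the same pass-the-error-through pattern as filter_active):

@FunctionSyncTask.decorator()
def require_nonempty(res: Result[list[dict], str]) -> Result[list[dict], str]:
    # Pass an upstream error through unchanged
    if res.is_err():
        return res
    users = res.unwrap()
    # Fail the pipeline explicitly when there is nothing left to process
    return Ok(users) if users else Err("no users to process")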
1.2 Class‑based Tasks¶
class ExtractNamesTask(ClassSyncTask[list[dict], str]):
    def execute(self, res: Result[list[dict], str]) -> Result[list[str], str]:
        if res.is_err():
            return res
        names = [u["name"] for u in res.unwrap()]
        return Ok(names)
class JoinNamesTask(ClassSyncTask[list[str], str]):
    def execute(self, res: Result[list[str], str]) -> Result[str, str]:
        if res.is_err():
            return res
        return Ok(", ".join(res.unwrap()))
2. Build the Pipeline¶
pipeline = SyncPipeline.from_tasks([
    load_users,
    filter_active,
    ExtractNamesTask(),
    JoinNamesTask()
], name="UserNamePipeline")
3. Run the Pipeline¶
3.1 Simple Run¶
result = pipeline.run(Ok(None))

if result.is_ok():
    print("Final output:", result.unwrap())
else:
    print("Pipeline failed:", result.err())
Expected Output:
Final output: Alice, Carol
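To see the stop-on-first-failure behaviour from the introduction, you can splice a deliberately failing task into the same pipeline. always_fail below is a contrived example, not part of neopipe:

@FunctionSyncTask.decorator()
def always_fail(res: Result[list[dict], str]) -> Result[list[dict], str]:
    # Fail unconditionally so the downstream tasks never see an Ok
    return Err("simulated failure")

failing = SyncPipeline.from_tasks([
    load_users,
    always_fail,
    ExtractNamesTask(),
    JoinNamesTask()
], name="FailingPipeline")

result = failing.run(Ok(None))
print("Pipeline failed:", result.err())  # Pipeline failed: simulated failure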
3.2 Debug Mode (Trace)¶
debug_result = pipeline.run(Ok(None), debug=True)

if debug_result.is_ok():
    final, trace = debug_result.unwrap()
    print("Final:", final)
    print("Trace:")
    for task_name, step_res in trace:
        print(f"  - {task_name}: {step_res}")
else:
    print("Pipeline failed:", debug_result.err())
Sample Trace:
Final: Alice, Carol
Trace:
- load_users: Ok([{'id':1,...}, ...])
- filter_active: Ok([{'id':1,...}, {'id':3,...}])
- ExtractNamesTask: Ok(['Alice','Carol'])
- JoinNamesTask: Ok('Alice, Carol')
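Because the trace is just a list of (task_name, Result) pairs, debug mode also works well in tests, where you can assert on intermediate steps rather than only the final value. A small sketch, assuming a successful run and the task names shown in the trace above:

final, trace = pipeline.run(Ok(None), debug=True).unwrap()

# One (task_name, Result) pair per executed task, in execution order
step_names = [name for name, _ in trace]
assert step_names == ["load_users", "filter_active", "ExtractNamesTask", "JoinNamesTask"]
assert final == "Alice, Carol"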
4. Chaining Pipelines in Parallel¶
You can execute multiple SyncPipeline instances concurrently with run_parallel. Each pipeline runs in its own thread, and the call returns a list of PipelineResult objects (plus, in debug mode, a PipelineTrace with per-task details).
4.1 Define your pipelines¶
from neopipe.result import Result, Ok
from neopipe.task import FunctionSyncTask, ClassSyncTask
from neopipe.sync_pipeline import SyncPipeline
@FunctionSyncTask.decorator()
def compute_length(res: Result[str, str]) -> Result[int, str]:
    # Returns the length of the input string
    return Ok(len(res.unwrap())) if res.is_ok() else res


class MultiplyTask(ClassSyncTask[int, str]):
    def __init__(self, factor: int):
        super().__init__()
        self.factor = factor

    def execute(self, res: Result[int, str]) -> Result[int, str]:
        # Multiplies the integer by the given factor
        return Ok(res.unwrap() * self.factor) if res.is_ok() else res
# Pipeline A: compute length → multiply by 2
pA = SyncPipeline.from_tasks(
    [compute_length, MultiplyTask(2)],
    name="LengthX2"
)

# Pipeline B: compute length → multiply by 3
pB = SyncPipeline.from_tasks(
    [compute_length, MultiplyTask(3)],
    name="LengthX3"
)
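Each of these is still an ordinary SyncPipeline, so you can sanity-check one on its own before running both in parallel:

single = pA.run(Ok("hello"))
print(single.unwrap())  # len("hello") * 2 == 10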
4.2 Run in parallel (non‑debug)¶
You can run the pipelines in parallel by passing a list of inputs, one for each pipeline:
inputs = [Ok("hello"), Ok("world!")]  # one input per pipeline

result = SyncPipeline.run_parallel([pA, pB], inputs)

if result.is_ok():
    pipeline_results = result.unwrap()
    # pipeline_results is a List[PipelineResult]:
    # [
    #     PipelineResult(name="LengthX2", result=10),
    #     PipelineResult(name="LengthX3", result=18)
    # ]
    for pr in pipeline_results:
        print(f"{pr.name} → {pr.result}")
else:
    print("Error:", result.err())
4.3 Run in parallel (debug mode)¶
Capture a full per‑task trace alongside the final results:
res_debug = SyncPipeline.run_parallel([pA, pB], inputs, debug=True)

if res_debug.is_ok():
    pipeline_results, trace = res_debug.unwrap()
    # pipeline_results is the same as above;
    # trace is a PipelineTrace(pipelines=[SinglePipelineTrace(...), ...])

    # Print the final results
    for pr in pipeline_results:
        print(f"{pr.name} final → {pr.result}")

    # Inspect the per-task trace
    for single in trace.pipelines:
        print(f"\nTrace for {single.name}:")
        for task_name, task_res in single.tasks:
            print(f"  {task_name} → {task_res}")
else:
    print("Error:", res_debug.err())