Skip to content

Async Pipeline — Basic Usage

The AsyncPipeline lets you run independent BaseAsyncTask instances concurrently, matching each task 1:1 with its own input Result. You can also capture a full execution trace by enabling debug mode.


Installation

pip install neopipe

Imports

import asyncio
from neopipe.result import Result, Ok, Err
from neopipe.async_task import FunctionAsyncTask, ClassAsyncTask
from neopipe.async_pipeline import AsyncPipeline

1. Define Your Async Tasks

1.1 Function‑based Tasks

@FunctionAsyncTask.decorator(retries=2)
async def fetch_user(res: Result[int, str]) -> Result[dict, str]:
    """Turn an ``Ok`` user ID into an ``Ok`` user dict.

    An input that is not ``Ok`` is returned unchanged. Otherwise the
    result is ``Ok({"id": <id>, "name": "User<id>"})``.
    """
    if not res.is_ok():
        return res
    # Stand-in for a real asynchronous I/O call.
    await asyncio.sleep(0.1)
    uid = res.unwrap()
    return Ok({"id": uid, "name": f"User{uid}"})
@FunctionAsyncTask.decorator()
async def validate_user(res: Result[dict, str]) -> Result[dict, str]:
    """Check that the user dict has truthy ``id`` and ``name`` values.

    An input that is not ``Ok`` is returned unchanged. A user failing
    the check becomes ``Err("Invalid user data")``; otherwise the user
    is re-wrapped in ``Ok``.
    """
    if not res.is_ok():
        return res
    candidate = res.unwrap()
    # "id" is tested first; "name" is only looked up when "id" is truthy.
    if candidate["id"] and candidate["name"]:
        return Ok(candidate)
    return Err("Invalid user data")

1.2 Class‑based Tasks

class EnrichUserTask(ClassAsyncTask[dict, str]):
    """Class-based task that adds a computed ``active`` flag to a user dict."""

    async def execute(self, res: Result[dict, str]) -> Result[dict, str]:
        """Return the user with an ``active`` field added.

        An input that is not ``Ok`` is returned unchanged. For an ``Ok``
        input, ``active`` is ``True`` when the user's ``id`` is even.
        """
        if res.is_ok():
            user = res.unwrap()
            # Build a new dict instead of assigning into ``user``: the dict
            # inside ``res`` belongs to the caller, and mutating it would
            # silently change the input the caller still holds.
            return Ok({**user, "active": user["id"] % 2 == 0})
        return res

2. Build the Pipeline

# Bundle the two function tasks and one class task into a named pipeline.
pipeline = AsyncPipeline.from_tasks(
    [fetch_user, validate_user, EnrichUserTask()],
    name="UserPipeline",
)

3. Run the Pipeline

3.1 Simple Concurrent Run

async def main():
    """Run ``pipeline`` over three ``Ok`` inputs and print the outcome."""
    # NOTE(review): this page describes run() as pairing inputs 1:1 with
    # tasks, while the expected output shows every input passing through
    # all three tasks — confirm which behavior applies.
    inputs = [Ok(1), Ok(2), Ok(3)]  # three independent runs
    outcome = await pipeline.run(inputs)

    if not outcome.is_ok():
        print("Pipeline failed:", outcome.err())
        return

    print("Enriched users:", outcome.unwrap())

asyncio.run(main())

Expected Output:

Enriched users: [
  {"id":1,"name":"User1","active":False},
  {"id":2,"name":"User2","active":True},
  {"id":3,"name":"User3","active":False}
]

3.2 Debug Mode (Trace)

async def debug_main():
    """Run ``pipeline`` with ``debug=True`` and print outputs and trace."""
    inputs = [Ok(10), Ok(20)]
    debug_result = await pipeline.run(inputs, debug=True)

    if not debug_result.is_ok():
        print("Pipeline error:", debug_result.err())
        return

    # With debug enabled the Ok payload unpacks into (outputs, trace).
    outputs, trace = debug_result.unwrap()
    print("Final outputs:", outputs)
    print("Trace details:")
    for name, outcome in trace:
        print(f" - {name}: {outcome}")

asyncio.run(debug_main())

4. Async Pipeline Parallel Execution

You can run multiple AsyncPipeline instances concurrently with run_parallel(). Each pipeline executes its sequence of tasks (via run_sequence) with its own input, and you get back a list of PipelineResult objects, plus a full PipelineTrace in debug mode.


4.1. Define your pipelines

import asyncio
from neopipe.result import Result, Ok, Err
from neopipe.async_task import FunctionAsyncTask, ClassAsyncTask
from neopipe.async_pipeline import AsyncPipeline

# — Sequential pipeline: load → filter → extract names
@FunctionAsyncTask.decorator()
async def load_users(res: Result[None, str]) -> Result[list[dict], str]:
    """Return a fixed list of three sample users; ``res`` is not inspected."""
    rows = (
        (1, "Alice", True),
        (2, "Bob", False),
        (3, "Carol", True),
    )
    return Ok([
        {"id": uid, "name": name, "active": active}
        for uid, name, active in rows
    ])

@FunctionAsyncTask.decorator()
async def filter_active(res: Result[list[dict], str]) -> Result[list[dict], str]:
    """Keep only the users whose ``active`` value is truthy.

    An ``Err`` input is returned unchanged.
    """
    if res.is_err():
        return res
    active_users = []
    for user in res.unwrap():
        if user["active"]:
            active_users.append(user)
    return Ok(active_users)

class ExtractNamesTask(ClassAsyncTask[list[dict], str]):
    """Class-based task that reduces a list of user dicts to their names."""

    async def execute(self, res: Result[list[dict], str]) -> Result[list[str], str]:
        """Return ``Ok`` with each user's ``name``; pass an ``Err`` through."""
        if res.is_err():
            return res
        names = [entry["name"] for entry in res.unwrap()]
        return Ok(names)

# Chain the three steps above into one named pipeline.
seq_pipeline = AsyncPipeline.from_tasks(
    [
        load_users,
        filter_active,
        ExtractNamesTask(),
    ],
    name="UserSeq",
)

# — Independent pipelines: fetch and validate
@FunctionAsyncTask.decorator()
async def fetch_user(res: Result[int, str]) -> Result[dict, str]:
    """Turn an ``Ok`` user ID into ``Ok({"id": <id>, "name": "User<id>"})``.

    An input that is not ``Ok`` is returned unchanged.
    """
    if not res.is_ok():
        return res
    # Short pause standing in for an asynchronous lookup.
    await asyncio.sleep(0.05)
    user_id = res.unwrap()
    return Ok({"id": user_id, "name": f"User{user_id}"})

@FunctionAsyncTask.decorator()
async def validate_user(res: Result[dict, str]) -> Result[dict, str]:
    """Require truthy ``id`` and ``name`` entries on the user dict.

    An input that is not ``Ok`` is returned unchanged. A missing or
    falsy ``id``/``name`` yields ``Err("Invalid")``; otherwise the user
    is re-wrapped in ``Ok``.
    """
    if not res.is_ok():
        return res
    candidate = res.unwrap()
    # .get() means an absent key is treated the same as a falsy value.
    if candidate.get("id") and candidate.get("name"):
        return Ok(candidate)
    return Err("Invalid")

# Two single-task pipelines, each wrapping one of the tasks above.
fetch_pipeline = AsyncPipeline.from_tasks([fetch_user], name="FetchUser")
validate_pipeline = AsyncPipeline.from_tasks([validate_user], name="ValidateUser")

4.2. Run the pipelines in parallel

async def main():
    """Run the three pipelines in parallel and print one line per result.

    Each pipeline receives the input at the same position in ``inputs``.
    On success every ``PipelineResult`` is printed as ``<name> → <result>``;
    on failure the error is printed instead.
    """
    inputs = [
        Ok(None),                     # seq_pipeline: needs None
        Ok(42),                       # fetch_pipeline: user ID
        Ok({"id": 99, "name": "X"}),  # validate_pipeline: user dict
    ]

    # debug=False ⇒ Ok[List[PipelineResult]]
    res = await AsyncPipeline.run_parallel(
        [seq_pipeline, fetch_pipeline, validate_pipeline],
        inputs
    )

    if res.is_ok():
        for pr in res.unwrap():
            # Pad the name and add the arrow so the columns line up as in
            # the documented output; without a separator the name and the
            # result would run together.
            print(f"{pr.name!s:<12} → {pr.result}")
    else:
        print("Error:", res.err())

asyncio.run(main())

Expected Output:

UserSeq      → ['Alice', 'Carol']
FetchUser    → {'id':42, 'name':'User42'}
ValidateUser → {'id':99, 'name':'X'}

4.3. Run the pipelines in parallel (debug mode)

async def debug_main():
    """Run the three pipelines in parallel with ``debug=True`` and print all.

    On success the payload unpacks into ``(results, trace)``: the final
    result of every pipeline is printed first, followed by a per-pipeline
    list of ``(task_name, result)`` entries. On failure the error is
    printed instead.
    """
    inputs = [Ok(None), Ok(7), Ok({"id": 7, "name": "User7"})]

    # debug=True ⇒ Ok((List[PipelineResult], PipelineTrace))
    res = await AsyncPipeline.run_parallel(
        [seq_pipeline, fetch_pipeline, validate_pipeline],
        inputs,
        debug=True
    )

    if res.is_ok():
        results, trace = res.unwrap()

        # Print final results; names are padded so the results form a
        # column instead of running straight into the name.
        for pr in results:
            print(f"{pr.name!s:<14}{pr.result}")

        # Print per-pipeline, per-task trace
        for single in trace.pipelines:
            print(f"\nTrace for {single.name}:")
            for task_name, task_res in single.tasks:
                print(f"  {task_name!s:<18}{task_res}")
    else:
        print("Error:", res.err())

asyncio.run(debug_main())

Expected Output:

UserSeq       ['Alice', 'Carol']
FetchUser     {'id':7, 'name':'User7'}
ValidateUser  {'id':7, 'name':'User7'}

Trace for UserSeq:
  load_users        Ok([...])
  filter_active     Ok([...])
  ExtractNamesTask  Ok(['Alice','Carol'])

Trace for FetchUser:
  fetch_user        Ok({'id':7,'name':'User7'})

Trace for ValidateUser:
  validate_user     Ok({'id':7,'name':'User7'})

5. Notes

  • 1:1 matching: The inputs list must be the same length as your tasks list.
  • Short‑circuit: The first Err stops the entire pipeline (unless you inspect partial trace).
  • Retries: Any task annotated with retries > 1 will automatically retry on exceptions.
  • Trace: In debug mode you get a list of (task_name, Result) in the order tasks were run.