Skip to content

Task Module

The Sync Task system provides a uniform, retryable wrapper around functions and classes that operate on Result[T, E]. Instead of raising exceptions, every task consumes a Result and returns a new Result, enabling safe, composable pipelines.


Overview

  • BaseSyncTask
    Abstract base that handles:
  • Retry with exponential backoff
  • Task identification (task_id, task_name)
  • Logging on attempts, success, and failure

FunctionSyncTask

Decorator wrapper for free functions: ```python @FunctionSyncTask.decorator(retries=2) def my_task(res: Result[int, str]) -> Result[int, str]: if res.is_ok(): return Ok(res.unwrap() + 1) return res

## ClassSyncTask
Subclass for stateful or multi‑step logic:

```python
class MultiplyTask(ClassSyncTask[int, str]):
    def __init__(self, multiplier: int):
        super().__init__(retries=3)
        self.multiplier = multiplier

    def execute(self, res: Result[int, str]) -> Result[int, str]:
        if res.is_ok():
            return Ok(res.unwrap() * self.multiplier)
        return res

BaseSyncTask

from neopipe.task import BaseSyncTask
from neopipe.result import Result, Ok, Err

class EchoTask(BaseSyncTask[str, str]):
    def execute(self, res: Result[str, str]) -> Result[str, str]:
        return Ok(res.unwrap()) if res.is_ok() else res

task = EchoTask()
print(task(Ok("hello")))  # Ok("hello")

FunctionSyncTask

from neopipe.task import FunctionSyncTask
from neopipe.result import Result, Ok, Err

@FunctionSyncTask.decorator(retries=2)
def add_ten(res: Result[int, str]) -> Result[int, str]:
    if res.is_ok():
        return Ok(res.unwrap() + 10)
    return res

print(add_ten(Ok(5)))  # Ok(15)

ClassSyncTask

from neopipe.task import ClassSyncTask
from neopipe.result import Result, Ok

class SquareTask(ClassSyncTask[int, str]):
    def execute(self, res: Result[int, str]) -> Result[int, str]:
        if res.is_ok():
            return Ok(res.unwrap() ** 2)
        return res

task = SquareTask()
print(task(Ok(3)))  # Ok(9)

Read the Pipeline Basics to see how to chain Sync Tasks.

FunctionAsyncTask

from neopipe.async_task import FunctionAsyncTask
from neopipe.result import Result, Ok, Err

@FunctionAsyncTask.decorator(retries=2)
async def shout(res: Result[str, str]) -> Result[str, str]:
    if res.is_ok():
        return Ok(res.unwrap().upper())
    return res

print(await shout(Ok("hello")))  # Ok("HELLO")

ClassAsyncTask

from neopipe.async_task import ClassAsyncTask
from neopipe.result import Result, Ok

class ReverseTask(ClassAsyncTask[str, str]):
    async def execute(self, res: Result[str, str]) -> Result[str, str]:
        if res.is_ok():
            return Ok(res.unwrap()[::-1])
        return res

task = ReverseTask()
print(await task(Ok("abc")))  # Ok("cba")