Skip to content

Result

ExecutionResult dataclass

Bases: Generic[T, E]

The unified result container for piping runs.

Attributes:

Name Type Description
result Union[Result[T, E], List[Result[T, E]]]

Either a Result[T, E] (single pipeline) or List[Result[T, E]] (parallel pipelines).

trace Optional[Union[Trace[T, E], Traces[T, E]]]

Optional Trace[T, E] or Traces[T, E] if debug=True.

execution_time float

Elapsed time in seconds.

time_unit str

Always 's'.

Source code in neopipe/result.py
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
@dataclass
class ExecutionResult(Generic[T, E]):
    """
    The unified result container for piping runs.

    Attributes:
      result: Either a Result[T, E] (single pipeline) or
              List[Result[T, E]] (parallel pipelines).
      trace:  Optional Trace[T, E] or Traces[T, E] if debug=True.
      execution_time: Elapsed time in seconds.
      time_unit: Always 's'.
    """
    result: Union[Result[T, E], List[Result[T, E]]]
    trace: Optional[Union[Trace[T, E], Traces[T, E]]]
    execution_time: float
    time_unit: str = "s"

    def value(self) -> Union[T, List[T]]:
        """
        Extract the inner success value(s). If any entry is Err, raises.
        """
        if isinstance(self.result, list):
            return [r.unwrap() for r in self.result]
        return self.result.unwrap()

    def unwrap(self) -> Union[Result[T, E], List[Result[T, E]]]:
        if self.is_ok():
            return self.result
        raise UnwrapError(f"Called unwrap on Err: {self.result}")

    def is_ok(self) -> bool:
        if isinstance(self.result, list):
            return all(r.is_ok() for r in self.result)
        return self.result.is_ok()

    def is_err(self) -> bool:
        if isinstance(self.result, list):
            return any(r.is_err() for r in self.result)
        return self.result.is_err()


    def __len__(self) -> int:

        if isinstance(self.result, list):
            return len(self.result)
        return 1

    def __repr__(self) -> str:
        base = (
            f"ExecutionResult(result={self.result!r}, "
            f"execution_time={self.execution_time:.3f}{self.time_unit}"
        )
        if self.trace is not None:
            base += f", trace={self.trace!r}"
        base += ")"
        return base

    def __str__(self) -> str:
        return self.__repr__()

value()

Extract the inner success value(s). If any entry is Err, raises.

Source code in neopipe/result.py
342
343
344
345
346
347
348
def value(self) -> Union[T, List[T]]:
    """
    Extract the inner success value(s). If any entry is Err, raises.
    """
    if isinstance(self.result, list):
        return [r.unwrap() for r in self.result]
    return self.result.unwrap()

Result dataclass

Bases: Generic[T, E]

A Rust-style Result type for monadic error handling in Python.

Source code in neopipe/result.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
@dataclass(frozen=True)
class Result(Generic[T, E]):
    """A Rust-style Result type for monadic error handling in Python."""

    _is_ok: bool
    _value: Union[T, E]

    @staticmethod
    def Ok(value: T) -> Self:
        """
        Create a Result representing a successful value.

        Args:
            value: The success value.

        Returns:
            Result[T, E]: An Ok variant.
        """
        return Result(True, value)

    @staticmethod
    def Err(error: E) -> Self:
        """
        Create a Result representing an error.

        Args:
            error: The error value.

        Returns:
            Result[T, E]: An Err variant.
        """
        return Result(False, error)

    def is_ok(self) -> bool:
        """
        Check if the result is Ok.

        Returns:
            bool: True if Ok, False otherwise.
        """
        return self._is_ok

    def is_err(self) -> bool:
        """
        Check if the result is Err.

        Returns:
            bool: True if Err, False otherwise.
        """
        return not self._is_ok

    def ok(self) -> Union[T, None]:
        """
        Get the success value if available.

        Returns:
            T | None: The Ok value or None.
        """
        return self._value if self._is_ok else None

    def err(self) -> Union[E, None]:
        """
        Get the error value if available.

        Returns:
            E | None: The Err value or None.
        """
        return self._value if not self._is_ok else None

    def map(self, op: Callable[[T], U]) -> Result[U, E]:
        """
        Apply a function to the Ok value.

        Args:
            op: A function that transforms the success value.

        Returns:
            Result[U, E]: A new Result with transformed success or the original error.
        """
        if self._is_ok:
            return Result.Ok(op(self._value))  # type: ignore
        return Result.Err(self._value)  # type: ignore

    async def map_async(self, op: Callable[[T], Awaitable[U]]) -> Result[U, E]:
        """
        Asynchronously apply a function to the Ok value.

        Args:
            op: An async function that transforms the success value.

        Returns:
            Result[U, E]: A new Result with transformed success or the original error.
        """
        if self._is_ok:
            return Result.Ok(await op(self._value))  # type: ignore
        return Result.Err(self._value)  # type: ignore

    def map_err(self, op: Callable[[E], U]) -> Result[T, U]:
        """
        Apply a function to the Err value.

        Args:
            op: A function that transforms the error value.

        Returns:
            Result[T, U]: A new Result with transformed error or the original success.
        """
        if self._is_ok:
            return Result.Ok(self._value)  # type: ignore
        return Result.Err(op(self._value))  # type: ignore

    async def map_err_async(self, op: Callable[[E], Awaitable[U]]) -> Result[T, U]:
        """
        Asynchronously apply a function to the Err value.

        Args:
            op: An async function that transforms the error value.

        Returns:
            Result[T, U]: A new Result with transformed error or the original success.
        """
        if self._is_ok:
            return Result.Ok(self._value)  # type: ignore
        return Result.Err(await op(self._value))  # type: ignore

    def and_then(self, op: Callable[[T], Result[U, E]]) -> Result[U, E]:
        """
        Chain another Result-returning function if current is Ok.

        Args:
            op: A function that takes a success value and returns a new Result.

        Returns:
            Result[U, E]: The new chained result, or the original error.
        """
        if self._is_ok:
            return op(self._value)  # type: ignore
        return Result.Err(self._value)  # type: ignore

    async def and_then_async(
        self, op: Callable[[T], Awaitable[Result[U, E]]]
    ) -> Result[U, E]:
        """
        Asynchronously chain another Result-returning function if current is Ok.

        Args:
            op: An async function that takes a success value and returns a new Result.

        Returns:
            Result[U, E]: The new chained result, or the original error.
        """
        if self._is_ok:
            return await op(self._value)  # type: ignore
        return Result.Err(self._value)  # type: ignore

    def unwrap(self) -> T:
        """
        Extract the success value or raise an error.

        Returns:
            T: The Ok value.

        Raises:
            UnwrapError: If the result is Err.
        """
        if self._is_ok:
            return self._value  # type: ignore
        raise UnwrapError(f"Called unwrap on Err: {self._value}")

    def unwrap_or(self, default: T) -> T:
        """
        Return the success value or a default.

        Args:
            default: The fallback value.

        Returns:
            T: The Ok value or the default.
        """
        return self._value if self._is_ok else default  # type: ignore

    def unwrap_or_else(self, op: Callable[[E], T]) -> T:
        """
        Return the success value or a value generated from the error.

        Args:
            op: A function that maps the error to a fallback value.

        Returns:
            T: The Ok value or a fallback derived from the error.
        """
        return self._value if self._is_ok else op(self._value)  # type: ignore

    def expect(self, msg: str) -> T:
        """
        Extract the success value or raise with a custom message.

        Args:
            msg: The message to include in the exception.

        Returns:
            T: The Ok value.

        Raises:
            UnwrapError: If the result is Err.
        """
        if self._is_ok:
            return self._value  # type: ignore
        raise UnwrapError(f"{msg}: {self._value}")

    def match(self, ok_fn: Callable[[T], U], err_fn: Callable[[E], U]) -> U:
        """
        Pattern match to handle both Ok and Err branches.

        Args:
            ok_fn: Function to handle Ok.
            err_fn: Function to handle Err.

        Returns:
            U: Result of executing the appropriate handler.
        """
        if self._is_ok:
            return ok_fn(self._value)  # type: ignore
        return err_fn(self._value)  # type: ignore

    def to_dict(self) -> dict:
        """Converts the Result to a dictionary.

        Returns:
            dict: The Result as a dictionary
        """
        return asdict(self)

    def to_json(self) -> str:
        """Converts the Result to a JSON string.

        Returns:
            str: The Result as a JSON string
        """
        return json.dumps(self.to_dict())

    def __repr__(self) -> str:
        """
        Return a string representation of the Result.

        Returns:
            str: Ok(value) or Err(error).
        """
        variant = "Ok" if self._is_ok else "Err"
        return f"{variant}({self._value!r})"

Err(error) staticmethod

Create a Result representing an error.

Parameters:

Name Type Description Default
error E

The error value.

required

Returns:

Type Description
Self

Result[T, E]: An Err variant.

Source code in neopipe/result.py
37
38
39
40
41
42
43
44
45
46
47
48
@staticmethod
def Err(error: E) -> Self:
    """
    Create a Result representing an error.

    Args:
        error: The error value.

    Returns:
        Result[T, E]: An Err variant.
    """
    return Result(False, error)

Ok(value) staticmethod

Create a Result representing a successful value.

Parameters:

Name Type Description Default
value T

The success value.

required

Returns:

Type Description
Self

Result[T, E]: An Ok variant.

Source code in neopipe/result.py
24
25
26
27
28
29
30
31
32
33
34
35
@staticmethod
def Ok(value: T) -> Self:
    """
    Create a Result representing a successful value.

    Args:
        value: The success value.

    Returns:
        Result[T, E]: An Ok variant.
    """
    return Result(True, value)

__repr__()

Return a string representation of the Result.

Returns:

Name Type Description
str str

Ok(value) or Err(error).

Source code in neopipe/result.py
258
259
260
261
262
263
264
265
266
def __repr__(self) -> str:
    """
    Return a string representation of the Result.

    Returns:
        str: Ok(value) or Err(error).
    """
    variant = "Ok" if self._is_ok else "Err"
    return f"{variant}({self._value!r})"

and_then(op)

Chain another Result-returning function if current is Ok.

Parameters:

Name Type Description Default
op Callable[[T], Result[U, E]]

A function that takes a success value and returns a new Result.

required

Returns:

Type Description
Result[U, E]

Result[U, E]: The new chained result, or the original error.

Source code in neopipe/result.py
142
143
144
145
146
147
148
149
150
151
152
153
154
def and_then(self, op: Callable[[T], Result[U, E]]) -> Result[U, E]:
    """
    Chain another Result-returning function if current is Ok.

    Args:
        op: A function that takes a success value and returns a new Result.

    Returns:
        Result[U, E]: The new chained result, or the original error.
    """
    if self._is_ok:
        return op(self._value)  # type: ignore
    return Result.Err(self._value)  # type: ignore

and_then_async(op) async

Asynchronously chain another Result-returning function if current is Ok.

Parameters:

Name Type Description Default
op Callable[[T], Awaitable[Result[U, E]]]

An async function that takes a success value and returns a new Result.

required

Returns:

Type Description
Result[U, E]

Result[U, E]: The new chained result, or the original error.

Source code in neopipe/result.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
async def and_then_async(
    self, op: Callable[[T], Awaitable[Result[U, E]]]
) -> Result[U, E]:
    """
    Asynchronously chain another Result-returning function if current is Ok.

    Args:
        op: An async function that takes a success value and returns a new Result.

    Returns:
        Result[U, E]: The new chained result, or the original error.
    """
    if self._is_ok:
        return await op(self._value)  # type: ignore
    return Result.Err(self._value)  # type: ignore

err()

Get the error value if available.

Returns:

Type Description
Union[E, None]

E | None: The Err value or None.

Source code in neopipe/result.py
77
78
79
80
81
82
83
84
def err(self) -> Union[E, None]:
    """
    Get the error value if available.

    Returns:
        E | None: The Err value or None.
    """
    return self._value if not self._is_ok else None

expect(msg)

Extract the success value or raise with a custom message.

Parameters:

Name Type Description Default
msg str

The message to include in the exception.

required

Returns:

Name Type Description
T T

The Ok value.

Raises:

Type Description
UnwrapError

If the result is Err.

Source code in neopipe/result.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def expect(self, msg: str) -> T:
    """
    Extract the success value or raise with a custom message.

    Args:
        msg: The message to include in the exception.

    Returns:
        T: The Ok value.

    Raises:
        UnwrapError: If the result is Err.
    """
    if self._is_ok:
        return self._value  # type: ignore
    raise UnwrapError(f"{msg}: {self._value}")

is_err()

Check if the result is Err.

Returns:

Name Type Description
bool bool

True if Err, False otherwise.

Source code in neopipe/result.py
59
60
61
62
63
64
65
66
def is_err(self) -> bool:
    """
    Check if the result is Err.

    Returns:
        bool: True if Err, False otherwise.
    """
    return not self._is_ok

is_ok()

Check if the result is Ok.

Returns:

Name Type Description
bool bool

True if Ok, False otherwise.

Source code in neopipe/result.py
50
51
52
53
54
55
56
57
def is_ok(self) -> bool:
    """
    Check if the result is Ok.

    Returns:
        bool: True if Ok, False otherwise.
    """
    return self._is_ok

map(op)

Apply a function to the Ok value.

Parameters:

Name Type Description Default
op Callable[[T], U]

A function that transforms the success value.

required

Returns:

Type Description
Result[U, E]

Result[U, E]: A new Result with transformed success or the original error.

Source code in neopipe/result.py
86
87
88
89
90
91
92
93
94
95
96
97
98
def map(self, op: Callable[[T], U]) -> Result[U, E]:
    """
    Apply a function to the Ok value.

    Args:
        op: A function that transforms the success value.

    Returns:
        Result[U, E]: A new Result with transformed success or the original error.
    """
    if self._is_ok:
        return Result.Ok(op(self._value))  # type: ignore
    return Result.Err(self._value)  # type: ignore

map_async(op) async

Asynchronously apply a function to the Ok value.

Parameters:

Name Type Description Default
op Callable[[T], Awaitable[U]]

An async function that transforms the success value.

required

Returns:

Type Description
Result[U, E]

Result[U, E]: A new Result with transformed success or the original error.

Source code in neopipe/result.py
100
101
102
103
104
105
106
107
108
109
110
111
112
async def map_async(self, op: Callable[[T], Awaitable[U]]) -> Result[U, E]:
    """
    Asynchronously apply a function to the Ok value.

    Args:
        op: An async function that transforms the success value.

    Returns:
        Result[U, E]: A new Result with transformed success or the original error.
    """
    if self._is_ok:
        return Result.Ok(await op(self._value))  # type: ignore
    return Result.Err(self._value)  # type: ignore

map_err(op)

Apply a function to the Err value.

Parameters:

Name Type Description Default
op Callable[[E], U]

A function that transforms the error value.

required

Returns:

Type Description
Result[T, U]

Result[T, U]: A new Result with transformed error or the original success.

Source code in neopipe/result.py
114
115
116
117
118
119
120
121
122
123
124
125
126
def map_err(self, op: Callable[[E], U]) -> Result[T, U]:
    """
    Apply a function to the Err value.

    Args:
        op: A function that transforms the error value.

    Returns:
        Result[T, U]: A new Result with transformed error or the original success.
    """
    if self._is_ok:
        return Result.Ok(self._value)  # type: ignore
    return Result.Err(op(self._value))  # type: ignore

map_err_async(op) async

Asynchronously apply a function to the Err value.

Parameters:

Name Type Description Default
op Callable[[E], Awaitable[U]]

An async function that transforms the error value.

required

Returns:

Type Description
Result[T, U]

Result[T, U]: A new Result with transformed error or the original success.

Source code in neopipe/result.py
128
129
130
131
132
133
134
135
136
137
138
139
140
async def map_err_async(self, op: Callable[[E], Awaitable[U]]) -> Result[T, U]:
    """
    Asynchronously apply a function to the Err value.

    Args:
        op: An async function that transforms the error value.

    Returns:
        Result[T, U]: A new Result with transformed error or the original success.
    """
    if self._is_ok:
        return Result.Ok(self._value)  # type: ignore
    return Result.Err(await op(self._value))  # type: ignore

match(ok_fn, err_fn)

Pattern match to handle both Ok and Err branches.

Parameters:

Name Type Description Default
ok_fn Callable[[T], U]

Function to handle Ok.

required
err_fn Callable[[E], U]

Function to handle Err.

required

Returns:

Name Type Description
U U

Result of executing the appropriate handler.

Source code in neopipe/result.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
def match(self, ok_fn: Callable[[T], U], err_fn: Callable[[E], U]) -> U:
    """
    Pattern match to handle both Ok and Err branches.

    Args:
        ok_fn: Function to handle Ok.
        err_fn: Function to handle Err.

    Returns:
        U: Result of executing the appropriate handler.
    """
    if self._is_ok:
        return ok_fn(self._value)  # type: ignore
    return err_fn(self._value)  # type: ignore

ok()

Get the success value if available.

Returns:

Type Description
Union[T, None]

T | None: The Ok value or None.

Source code in neopipe/result.py
68
69
70
71
72
73
74
75
def ok(self) -> Union[T, None]:
    """
    Get the success value if available.

    Returns:
        T | None: The Ok value or None.
    """
    return self._value if self._is_ok else None

to_dict()

Converts the Result to a dictionary.

Returns:

Name Type Description
dict dict

The Result as a dictionary

Source code in neopipe/result.py
242
243
244
245
246
247
248
def to_dict(self) -> dict:
    """Converts the Result to a dictionary.

    Returns:
        dict: The Result as a dictionary
    """
    return asdict(self)

to_json()

Converts the Result to a JSON string.

Returns:

Name Type Description
str str

The Result as a JSON string

Source code in neopipe/result.py
250
251
252
253
254
255
256
def to_json(self) -> str:
    """Converts the Result to a JSON string.

    Returns:
        str: The Result as a JSON string
    """
    return json.dumps(self.to_dict())

unwrap()

Extract the success value or raise an error.

Returns:

Name Type Description
T T

The Ok value.

Raises:

Type Description
UnwrapError

If the result is Err.

Source code in neopipe/result.py
172
173
174
175
176
177
178
179
180
181
182
183
184
def unwrap(self) -> T:
    """
    Extract the success value or raise an error.

    Returns:
        T: The Ok value.

    Raises:
        UnwrapError: If the result is Err.
    """
    if self._is_ok:
        return self._value  # type: ignore
    raise UnwrapError(f"Called unwrap on Err: {self._value}")

unwrap_or(default)

Return the success value or a default.

Parameters:

Name Type Description Default
default T

The fallback value.

required

Returns:

Name Type Description
T T

The Ok value or the default.

Source code in neopipe/result.py
186
187
188
189
190
191
192
193
194
195
196
def unwrap_or(self, default: T) -> T:
    """
    Return the success value or a default.

    Args:
        default: The fallback value.

    Returns:
        T: The Ok value or the default.
    """
    return self._value if self._is_ok else default  # type: ignore

unwrap_or_else(op)

Return the success value or a value generated from the error.

Parameters:

Name Type Description Default
op Callable[[E], T]

A function that maps the error to a fallback value.

required

Returns:

Name Type Description
T T

The Ok value or a fallback derived from the error.

Source code in neopipe/result.py
198
199
200
201
202
203
204
205
206
207
208
def unwrap_or_else(self, op: Callable[[E], T]) -> T:
    """
    Return the success value or a value generated from the error.

    Args:
        op: A function that maps the error to a fallback value.

    Returns:
        T: The Ok value or a fallback derived from the error.
    """
    return self._value if self._is_ok else op(self._value)  # type: ignore

Trace dataclass

Bases: Generic[T, E]

A sequential trace of one pipeline: steps is a list of (task_name, Result[T, E]).

Source code in neopipe/result.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
@dataclass
class Trace(Generic[T, E]):
    """
    A sequential trace of one pipeline:
    steps is a list of (task_name, Result[T, E]).
    """
    steps: List[Tuple[str, Result[T, E]]]

    def __getitem__(self, index: int) -> Tuple[str, Result[T, E]]:
        """Get a step by index."""
        return self.steps[index]

    def __iter__(self):
        """Iterate over the steps."""
        return iter(self.steps)

    def __len__(self) -> int:
        return len(self.steps)

    def __repr__(self):
        return f"Trace(steps={self.steps!r})"

__getitem__(index)

Get a step by index.

Source code in neopipe/result.py
287
288
289
def __getitem__(self, index: int) -> Tuple[str, Result[T, E]]:
    """Get a step by index."""
    return self.steps[index]

__iter__()

Iterate over the steps.

Source code in neopipe/result.py
291
292
293
def __iter__(self):
    """Iterate over the steps."""
    return iter(self.steps)

Traces dataclass

Bases: Generic[T, E]

A collection of per-pipeline traces. pipelines is a list of Trace[T, E].

Source code in neopipe/result.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
@dataclass
class Traces(Generic[T, E]):
    """
    A collection of per-pipeline traces.
    pipelines is a list of Trace[T, E].
    """
    pipelines: List[Trace[T, E]]

    def __getitem__(self, index: int) -> Trace[T, E]:
        """Get a trace by index."""
        return self.pipelines[index]

    def __iter__(self):
        """Iterate over the traces."""
        return iter(self.pipelines)

    def __len__(self) -> int:
        return len(self.pipelines)

    def __repr__(self):
        return f"Traces(pipelines={self.pipelines!r})"

__getitem__(index)

Get a trace by index.

Source code in neopipe/result.py
310
311
312
def __getitem__(self, index: int) -> Trace[T, E]:
    """Get a trace by index."""
    return self.pipelines[index]

__iter__()

Iterate over the traces.

Source code in neopipe/result.py
314
315
316
def __iter__(self):
    """Iterate over the traces."""
    return iter(self.pipelines)

UnwrapError

Bases: Exception

Raised when unwrap is called on an Err value.

Source code in neopipe/result.py
12
13
14
class UnwrapError(Exception):
    """Raised when unwrap is called on an Err value."""
    pass

Err(error)

Creates an Err Result with the given error.

Source code in neopipe/result.py
274
275
276
def Err(error: E) -> Result[None, E]:
    """Creates an Err Result with the given error."""
    return Result(False, error)

Ok(value)

Creates an Ok Result with the given value.

Source code in neopipe/result.py
269
270
271
def Ok(value: T) -> Result[T, None]:
    """Creates an Ok Result with the given value."""
    return Result(True, value)