@neuralcontext/pivex
TypeScript icon, indicating that this package has built-in type declarations

0.3.0 • Public • Published

pivex

pivex helps you orchestrate pipelines and workflows that consist of complex asynchronous interactions. This is especially helpful in multi-agent AI pipelines, RAG, ETL, or anything where you need to orchestrate asyncronous (and synchronous) calls.

What does pivex do?

Makes it easier to

1. Create complicated pipelines in code
1. Handle coordination of asynchronous calls
1. Use steps across pipelines
1. Debug complicated pipelines
    - NOTE: Use 'inline breakpoints' for lambdas

Provides:

1. Declarative syntax for creating pipelines
1. Native async interface that also supports synchronous actions
1. Some type safety for pipelines and steps when using TypeScript

Examples

The syntax for creating pipelines leverages your IDE to help you understand how your pipeline works.

All pipelines expose an asynchronous start function but pipelines handle both synchronous and asynchronous steps correctly.

See the Important considerations section for more information.

Example 1 (sequential asynchronous flow)

const testPipeline = pipeline<number, number>()
    .sequential(
        run((x: number) => x + 1),
        run((x: number) => x + 2)
    )
const result = await testPipeline.start(1)
expect(result).eq(4)
flowchart TB
    start(("Input=2"))
        subgraph Step 1
            add1["AsyncAction<hr>Multiply by 3"]
            add2["AsyncAction<hr>Multiply by 5"]
    end
    stop(("Output=30"))
    start-->add1-->add2-->stop

Example 2 (parallel asynchronous flow)

const testPipeline = pipeline<number, number[]>()
    .parallel(
        defer((x: number) => Promise.resolve(x * 3)),
        defer((x: number) => Promise.resolve(x * 5))
    )
const result = await testPipeline.start(2)
expect(result[0]).eq(6)
expect(result[1]).eq(10)
flowchart TB
    start(("Input=2"))
    subgraph step1["Step 1 (Actions in Parallel)"]
        mult1["AsyncAction<hr>Multiply by 3"]
        mult2["AsyncAction<hr>Multiply by 5"]
    end
    step1Output["--pivex coalesces output--"]
    stop(("Output=[6, 10]"))
    start-->step1
    mult1-->step1Output
    mult2-->step1Output
    step1Output-->stop
    style step1Output fill:#FFFFFF

Example 3 (parallel and sequential flow)

const testPipeline = pipeline<number, number>()
    .sequential(
        parallel(
            defer((x: number) => Promise.resolve(x * 2)),
            defer((x: number) => Promise.resolve(x * 3))
        ),
        sequential(
            run((nums: number[]) => nums.reduce((p, c) => p + c)),
            run((x: number) => x / 2)
        )
    )
const result = await testPipeline.start(2)
expect(result).eq(5)
flowchart TB
    start(("Input=2"))
    subgraph step1["Step 1 (Actions in Parallel)"]
        mult1["AsyncAction<hr>Multiply by 3"]
        mult2["AsyncAction<hr>Multiply by 5"]
    end
    subgraph step2["Step 2 (Sequential actions)"]
        sum["Sum the numbers"]
        div["Divide the sum by 2"]
    end
    step1Output["--pivex coalesces output--"]
    stop(("Output=5"))
    start-->step1
    mult1-->step1Output
    mult2-->step1Output
    step1Output-->sum
    sum-->div
    div-->stop
    style step1Output fill:#FFFFFF

Naming and logging

You can provide names and enable logging for any step in the pipeline

       const testPipeline = pipeline<number, number>()
            .sequential(
                name('Step 1')
                    .log(true)
                    .run((x: number) => x + 1),
                name('Step 2')
                    .log(true)
                    .run((x: number) => x + 2)
            )
        const result = await testPipeline.start(1)
        expect(result).eq(4)

Design overview

Definitions

pipeline

  • Takes initial input
  • Contains steps
  • Will execute child steps in sequential order
  • Output from the first step is passed to the second, whose output is passed to the third, etc.
  • Returns output from the last step

step

  • Contains a single action or an array of child steps
  • An action can be either synchronous or asynchronous. Either way the step will not complete until the action has completed
  • Child steps can be executed in parallel or sequentially (in order)
  • If parallel, the step starts all its children and awaits all before coalescing the results and returning an array of the results
  • If sequential, executes all child steps in order, awaiting the results from each child step before passing results from previous step to the next step. The parent step returns the value returned from the last step.

action

  • Contains a single function
  • Can be synchronous or asynchronous

Important considerations

WARNING: pivex does not check for infinite loops. We recommmend you use the constructor-builder syntax (see code examples above) to ensure no infinite loops are created.

  • The pipeline start function is natively async but handles tasks with synchronous or asynchronous actions. If you have pipelines that are entirely synchronous this imposes a very small performance penalty (~65ms for 1M steps)

TODO

  1. Improve typings between steps
  2. Update typings for step.sync so they don't allow passing asynchronous calls
  3. Add a stop function to cancel the pipeline

Package Sidebar

Install

npm i @neuralcontext/pivex

Weekly Downloads

2

Version

0.3.0

License

MIT

Unpacked Size

15.8 kB

Total Files

14

Last publish

Collaborators

  • neuralcontext