orka

2.0.0 • Public • Published

The module provides configuration-based task management.

This module differs from other modules that provide a similar API, such as async or orchestrator. The main difference is how a task is executed in relation to the tasks it depends on. With those modules, if task A depends on tasks C and D, A is executed only after C and D are complete. orka, by contrast, may let A run until it actually needs results from C or D. If A's logic turns out not to need data from C or D, A runs without waiting for them (mapped dependency).

Tasks communicate through cacheable callbacks based on the publisher/subscriber pattern. A task that needs data from other tasks subscribes to specific topics by declaring them in its '@in' dependencies. The output (the task's '@out' section) is always a single result that can be published to multiple topics. To split the output into multiple published topics, use separate tasks that subscribe to it and process or format it into separate topics.
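
The publish/subscribe behaviour described above can be illustrated with a minimal sketch. It is not orka's implementation: createTopicBus, publish and subscribe are hypothetical names, and the sketch assumes that a cached callback means a late subscriber still receives values that were published earlier.

```javascript
// Illustrative sketch only, NOT orka's internals.
// Assumption: a topic remembers what was published, so a subscriber that
// arrives late still receives the earlier values.
function createTopicBus() {
    var cache = {};       // topic -> array of published [err, result] pairs
    var listeners = {};   // topic -> array of subscriber callbacks

    return {
        publish: function (topic, err, result) {
            (cache[topic] = cache[topic] || []).push([err, result]);
            (listeners[topic] || []).forEach(function (cb) {
                cb(err, result);
            });
        },
        subscribe: function (topic, cb) {
            // replay cached events first, then keep listening
            (cache[topic] || []).forEach(function (event) {
                cb(event[0], event[1]);
            });
            (listeners[topic] = listeners[topic] || []).push(cb);
        }
    };
}

var bus = createTopicBus();
var received = [];

// Task B publishes before anyone is listening.
bus.publish('topicB', null, 'early');
// Task A subscribes later and still gets the cached value.
bus.subscribe('topicB', function (err, result) {
    received.push(result);
});
// Later publications are delivered as they happen.
bus.publish('topicB', null, 'late');

console.log(received); // [ 'early', 'late' ]
```

Because the value is cached per topic, the order in which tasks start does not matter: a consumer that asks late gets the same data as one that was already waiting.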

Installation

npm install orka --save

Configuration

The configuration is called an 'execution plan'. It describes a single level of the dependency tree: each task specifies only its direct dependencies.

Task

A task is a simple unit of execution.

Properties:

  • '@in' specifies the topics that the task may listen to.
    • If '@in' is missing, the task is considered a leaf task with no dependencies on other topics.
  • '@out' specifies the topics the task will publish results to.
    • If '@out' is missing, the task publishes under its own name.
  • '@after' specifies the topics the task will wait for before it starts executing.

API

The task is expected to have the following execution API:

function taskA(input, output) {
    // waiting for data if needed
    input.get('input', function(err, result, complete) {
        // handle input data
        // publish result to output names
        if (err)
            output.set('error', new Error('Error'));
        else if (result)
            output.set('result', 'some result');
        else
            output.set('empty', 'default result');
    });
}

Behaviors:

  • A task starts immediately unless '@after' is specified.
  • A task can publish to its topics multiple times.
  • Different tasks can publish to the same topic.
  • A task can receive multiple events from the topics it subscribes to.
  • A task never depends on another task directly, only via a topic.

Execution plan

There are a few use cases:

  • Task A does not depend on task B: both start and execute in parallel.

  • Task A directly depends on task B: B executes first, and A starts only after B completes. This is useful, for example, for caching, where an expensive call should not start until the cache is found to have no data.

  • Task A does not directly depend on task B but uses data published by B: both start, and if A needs data published by B, it waits in a callback at that point.

  • One task

// A will publish results under topic 'A'
var plan = ['A'];
// or A will publish results under topic 'topicA'
var plan = {
    A: {
        '@out': 'topicA'
    }
};
// or multiple topics
var plan = {
    A: {
        '@out': ['topicA', 'topicA+']
    }
};
// decoupling by mapping output to specific topics
var plan = {
    A: {
        '@out': {
            result: 'topicA',
            error: 'topicA-error'
        }
    }
};
  • Two parallel tasks
// these will execute in parallel and publish results under topics 'A' and 'B'
var plan = ['A', 'B'];
// or with empty attributes
var plan = {
    A: {},
    B: {}
};
// or when A and B publish to specific topics
var plan = {
    A: {
        '@out': 'topicA'
    },
    B: {
        '@out': ['topicB', 'topicB+']
    }
};
  • Group of tasks, where one task indirectly depends on another via the topics it publishes to
// B uses results from A
var plan = {
    B: 'A'
};
// or
var plan = {
    B: {
        '@in': 'A'
    }
};
// or decoupling by mapping B to the topic published by A
var plan = {
    B: {
        '@in': 'topicA'
    },
    A: {
        '@out': 'topicA'
    }
};
// or more decoupling by mapping an input of B to the topic published by A
var plan = {
    B: {
        '@in': {
            inputB: 'topicA'
        }
    },
    A: {
        '@out': 'topicA'
    }
};
  • Multiple dependencies
// C depends on A and B, i.e. uses data published under 'A' and 'B'
var plan = {
    C: ['A', 'B']
};
// or
var plan = {
    C: {
        '@in': ['A', 'B']
    }
};
// or complete decoupling by mapping inputs to task topics
var plan = {
    C: {
        '@in': {
            inputA: 'A',
            inputB: 'B'
        }
    }
};
// or even more decoupling by mapping inputs to defined topics
var plan = {
    C: {
        '@in': {
            inputA: 'topicA',
            inputB: 'topicB'
        }
    },
    A: {
        '@out': 'topicA'
    },
    B: {
        '@out': 'topicB'
    }
};
  • Mix of parallel and series tasks
var plan = {
    B: ['A', 'C'],
    C: ['A']
};
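
The shorthand forms above can be summarised by expanding them into one explicit shape. The following is a sketch under stated assumptions, not orka's parser: normalizePlan, toList and isMapping are hypothetical helpers, the output shape is chosen for illustration, and only tasks that appear as keys in the plan are expanded.

```javascript
// Illustrative sketch only: expands the shorthand forms shown above into
// one explicit shape. normalizePlan and its output shape are hypothetical.
function toList(value) {
    if (value === undefined) { return []; }
    return Array.isArray(value) ? value : [value];
}

function isMapping(value) {
    return typeof value === 'object' && value !== null && !Array.isArray(value);
}

function normalizePlan(plan) {
    var normalized = {};
    if (Array.isArray(plan)) {
        // ['A', 'B'] is shorthand for { A: {}, B: {} }
        plan.forEach(function (name) { normalized[name] = {}; });
    } else {
        Object.keys(plan).forEach(function (name) {
            var config = plan[name];
            // B: 'A' and C: ['A', 'B'] are shorthand for an '@in' section
            if (typeof config === 'string' || Array.isArray(config)) {
                config = { '@in': config };
            }
            normalized[name] = config;
        });
    }
    Object.keys(normalized).forEach(function (name) {
        var config = normalized[name];
        normalized[name] = {
            // keep name-to-topic mappings as they are, wrap the rest in arrays
            '@in': isMapping(config['@in']) ? config['@in'] : toList(config['@in']),
            // a task without '@out' publishes under its own name
            '@out': config['@out'] === undefined ? [name]
                : isMapping(config['@out']) ? config['@out'] : toList(config['@out']),
            '@after': toList(config['@after'])
        };
    });
    return normalized;
}

console.log(JSON.stringify(normalizePlan({ B: ['A', 'C'], C: ['A'] })));
// {"B":{"@in":["A","C"],"@out":["B"],"@after":[]},"C":{"@in":["A"],"@out":["C"],"@after":[]}}
```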

Conditional execution with '@after'

Unless explicitly specified otherwise, a task executes immediately. To delay execution, use the '@after' section, which lists the topics that must complete before the task can start.

var plan = {
    A: {
        '@after': ['D', 'B']
    }
};
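
The gating that '@after' describes can be sketched with Node's built-in EventEmitter. This is an illustration rather than orka's scheduler: startAfter is a hypothetical helper, and the sketch assumes a topic counts as complete on its first publication.

```javascript
var EventEmitter = require('events');

// Illustrative sketch only, NOT orka's scheduler.
// Assumption: a topic counts as complete on its first publication.
function startAfter(topics, emitter, start) {
    var pending = topics.length;
    if (pending === 0) {
        start();            // no '@after': start immediately
        return;
    }
    topics.forEach(function (topic) {
        emitter.once(topic, function () {
            pending -= 1;
            if (pending === 0) {
                start();    // every awaited topic has completed
            }
        });
    });
}

var topicEvents = new EventEmitter();
var log = [];

// A waits for D and B; C has no '@after' and starts right away.
startAfter(['D', 'B'], topicEvents, function () { log.push('A started'); });
startAfter([], topicEvents, function () { log.push('C started'); });

topicEvents.emit('D');
log.push('D done');
topicEvents.emit('B');
log.push('B done');

console.log(log); // [ 'C started', 'D done', 'A started', 'B done' ]
```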

Use cases

Series execution

This will execute tasks sequentially.

// A then B then C
var plan = {
    A: {},
    B: { '@after': 'A' },
    C: { '@after': 'B' }
};
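
For longer chains, a plan of this shape can be generated instead of written by hand. seriesPlan below is a hypothetical helper, not part of orka; it only builds the plan object shown above.

```javascript
// Illustrative helper (hypothetical, not part of orka): builds a plan in
// which every task waits, via '@after', for the task listed before it.
function seriesPlan(taskNames) {
    var plan = {};
    taskNames.forEach(function (name, index) {
        plan[name] = index === 0 ? {} : { '@after': taskNames[index - 1] };
    });
    return plan;
}

console.log(JSON.stringify(seriesPlan(['A', 'B', 'C'])));
// {"A":{},"B":{"@after":"A"},"C":{"@after":"B"}}
```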

Caching example

Task without caching

Task A uses the result published by task B.

var plan = {
    A: {
        '@in': 'TOPIC'
    },
    B: {
        '@out': 'TOPIC'
    }
};

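Task with caching

Task C performs the cache lookup. On a hit it publishes the result to 'TOPIC', which A is listening to. On a miss it publishes to 'CACHE_ERROR', and only then does B start and publish to 'TOPIC'.

var plan = {
    A: {
        '@in': 'TOPIC'
    },
    C: {
        '@out': {
            result: 'TOPIC',
            error: 'CACHE_ERROR'
        }
    },
    B: {
        '@after': 'CACHE_ERROR',
        '@out': 'TOPIC'
    }
};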
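
A cache task such as C could look roughly like the sketch below. The (input, output) signature and the output names 'result' and 'error' follow the task API and the '@out' mapping shown in this document; createCacheTask, the in-memory store and recordingOutput are hypothetical and exist only to make the example self-contained.

```javascript
// Illustrative sketch only. createCacheTask and the in-memory store are
// hypothetical; the (input, output) signature follows the task API above.
function createCacheTask(store, key) {
    return function taskC(input, output) {
        if (Object.prototype.hasOwnProperty.call(store, key)) {
            // cache hit: 'result' is mapped to 'TOPIC' by '@out'
            output.set('result', store[key]);
        } else {
            // cache miss: 'error' is mapped to 'CACHE_ERROR' by '@out'
            output.set('error', new Error('cache miss for ' + key));
        }
    };
}

// Minimal stand-in for the output object, used here only to observe calls.
function recordingOutput() {
    var calls = [];
    return {
        calls: calls,
        set: function (name, value) { calls.push({ name: name, value: value }); }
    };
}

var hit = recordingOutput();
createCacheTask({ user42: 'cached profile' }, 'user42')({}, hit);

var miss = recordingOutput();
createCacheTask({}, 'user42')({}, miss);

console.log(hit.calls[0].name, miss.calls[0].name); // result error
```

Keeping the task unaware of 'TOPIC' and 'CACHE_ERROR' is the point of the mapping: the plan decides where 'result' and 'error' go, so the same task can be rewired without code changes.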

Task loading

Given the caching example:

var executionPlan = {
    A: {
        '@in': 'TOPIC'
    },
    C: {
        '@out': {
            result: 'TOPIC',
            error: 'CACHE_ERROR'
        }
    },
    B: {
        '@after': 'CACHE_ERROR',
        '@out': 'TOPIC'
    }
};

Tasks can be loaded via the options.load function; by default they are loaded with require:

var orka = require('orka');
orka.start(executionPlan, {
    load: function (taskName) {
        return someRegistry[taskName];
    }
}, function (err, output) {
    output.get('A', function (err, result) {
        console.log('Task A err:', err);
        console.log('Task A result:', result);
    });
    output.get('B', function (err, result) {
        console.log('Task B err:', err);
        console.log('Task B result:', result);
    });
    output.get('C', function (err, result) {
        console.log('Task C err:', err);
        console.log('Task C result:', result);
    });
    output.get('TOPIC', function (err, result) {
        console.log('TOPIC err:', err);
        console.log('TOPIC result:', result);
    });
});
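
A load function that combines both approaches might look like the sketch below. createLoader is a hypothetical helper, and the fallback of passing the task name straight to require is an assumption made for illustration; how orka resolves names by default is not specified here.

```javascript
// Illustrative sketch only; createLoader is a hypothetical helper.
// Assumption: load(taskName) must return the task function.
function createLoader(registry) {
    return function load(taskName) {
        if (Object.prototype.hasOwnProperty.call(registry, taskName)) {
            return registry[taskName];
        }
        // Not registered: resolve the name as a module (assumed fallback).
        return require(taskName);
    };
}

var load = createLoader({
    A: function taskA(input, output) { output.set('result', 'from A'); }
});

console.log(typeof load('A')); // function
```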

Examples

Single task execution

var orka = require('orka');
var executionPlan = {
    A: []
};
orka.start(executionPlan, {
    load: function (taskName) {
        return someRegistry[taskName];
    }
}, function (err, results) {
    results.get('A', function (err, result) {
        console.log('Task A err:', err);
        console.log('Task A result:', result);
    });
});

Two independent tasks

var orka = require('orka');
var executionPlan = ['A', 'B'];
orka.start(executionPlan, {
    load: function (taskName) {
        return someRegistry[taskName];
    }
}, function (err, results) {
    results.get('A', function (err, result) {
        console.log('Task A err:', err);
        console.log('Task A result:', result);
    });
    results.get('B', function (err, result) {
        console.log('Task B err:', err);
        console.log('Task B result:', result);
    });
});

Two dependent tasks

var orka = require('orka');
var executionPlan = {
    A: {
        '@in': {
            data: 'B'
        }
    }
};
orka.start(executionPlan, {
    load: function (taskName) {
        return someRegistry[taskName];
    }
}, function (err, results) {
    results.get('A', function (err, result) {
        console.log('Task A err:', err);
        console.log('Task A result:', result);
    });
    results.get('B', function (err, result) {
        console.log('Task B err:', err);
        console.log('Task B result:', result);
    });
});
// example of task A that depends on B
function taskA(input, callback) {
    input.get('data', function (err, data) {
        // process the data published by task B
        // complete task A
        callback(err, {
            // some results from task A and B
        });
    });
}

You can also cancel tasks if they take too long. Tasks that have already completed keep their results.

var orka = require('orka');
var executionPlan = {
    A: ['B']
};
var control = orka.start(executionPlan, {
    load: function (taskName) {
        return someRegistry[taskName];
    }
}, function (err, results) {
    results.get('A', function (err, result) {
        console.log('Task A err:', err);
        console.log('Task A result:', result);
    });
    results.get('B', function (err, result) {
        console.log('Task B err:', err);
        console.log('Task B result:', result);
    });
});

control.stop(new Error('The tasks have been canceled'));

You can cancel tasks gracefully, with a default result.

var orka = require('orka');
var executionPlan = {
    A: ['B']
};
var control = orka.start(executionPlan, {
    load: function (taskName) {
        return someRegistry[taskName];
    }
}, function (err, results) {
    results.get('A', function (err, result) {
        console.log('Task A err:', err);
        console.log('Task A result:', result);
    });
    results.get('B', function (err, result) {
        console.log('Task B err:', err);
        console.log('Task B result:', result);
    });
});

control.stop({
    tasks: {
        A: {
            result: 'default result for A' // placeholder value
        },
        B: {
            result: 'default result for B' // placeholder value
        }
    }
});

As the API above shows, you can also cancel specific tasks while the rest keep running.

control.stop({
    tasks: {
        A: {
            err: new Error('Only task A has been stopped')
        }
    }
});
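
The options object passed to control.stop can be built from a plain map of defaults. gracefulStop below is a hypothetical helper, not part of orka; it only produces the { tasks: { name: { result } } } shape used in the graceful cancellation example.

```javascript
// Illustrative helper (hypothetical, not part of orka): builds the argument
// for control.stop() from a map of task names to default results.
function gracefulStop(defaults) {
    var tasks = {};
    Object.keys(defaults).forEach(function (name) {
        tasks[name] = { result: defaults[name] };
    });
    return { tasks: tasks };
}

var stopOptions = gracefulStop({ A: 'fallback A', B: 'fallback B' });
console.log(JSON.stringify(stopOptions));
// {"tasks":{"A":{"result":"fallback A"},"B":{"result":"fallback B"}}}
```

With a control object returned by orka.start, this could be combined with a timer, for example setTimeout(function () { control.stop(stopOptions); }, 5000), where the 5000 ms limit is an arbitrary choice.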

To do:

  • If a task has a single input and a single output, allow mapping any input and output
var plan = {
    A: {
        "@in": "T1",
        "@out": "T2"
    }
}
  • Validate input/output declared parameters vs what is passed to the task

License

Apache License v2.0

Collaborators

  • dimichgh