Easily run distributed workflows with AWS Simple Workflow Service and Lambda
A minimum of node 10 is required
yarn add soflow
You can find implementation examples over at Skalar/soflow-examples
tasks.js
async function addOrderToDatabase(data) {
  // do your stuff
  return result
}

addOrderToDatabase.config = {
  concurrency: 100, // ReservedConcurrentExecutions in lambda context
  type: 'both', // deploy as lambda function and register activity type
  memorySize: 128, // only enforced by lambda
  scheduleToStartTimeout: 10, // only applies when run as activity
  startToCloseTimeout: 60,
  scheduleToCloseTimeout: 20, // only applies to activities
}

exports.addOrderToDatabase = addOrderToDatabase
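The workflow below also uses a sendOrderConfirmation task. A minimal sketch of how such a task might look in tasks.js (the sendEmail helper is a placeholder, not part of soflow):

async function sendOrderConfirmation(orderData) {
  // Placeholder: send a confirmation email for the order with your own mailer
  await sendEmail(orderData)
}

sendOrderConfirmation.config = {
  type: 'both', // deploy as lambda function and register activity type
  startToCloseTimeout: 60,
}

exports.sendOrderConfirmation = sendOrderConfirmation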
workflows.js
async function CreateOrder({
  input: {confirmationEmailRecipient, products, customerId},
  tasks: {addOrderToDatabase, sendOrderConfirmation},
}) {
  const orderData = await addOrderToDatabase(customerId, products)
  await sendOrderConfirmation(orderData)

  return orderData
}

CreateOrder.config = {
  startToCloseTimeout: 30,
  tasks: {
    addOrderToDatabase: {type: 'faas'},
    sendOrderConfirmation: {type: 'activity'},
  },
}

exports.CreateOrder = CreateOrder
const {SWF} = require('soflow')

const deployPromise = SWF.Orchestration.setup({
  progressIndicator: true, // default: false

  deciderEnvironment: {
    // environment variables available in lambda decider workflow functions
    MY_CUSTOM_ENVIRONMENT_VARIABLE: 'myvalue',
  },

  // File glob patterns to include in the lambda package.
  // Everything needed by your tasks must be included (including the soflow npm module).
  // Default: [`${tasksPath}/**`, `${workflowsPath}/**`]
  files: [
    [
      'stripped_node_modules/**',
      // The provided callback can return a new filename, or true/false for whether to include the file
      path => path.replace('stripped_node_modules', 'node_modules')
    ],
    'workflows/**',
    'tasks/**',
    'lib/**',
  ],
})
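Since setup returns a promise, a deploy script can simply wait for it and surface failures, for example:

// Wait for the deployment to finish and fail loudly if it does not
deployPromise
  .then(() => console.log('soflow resources deployed'))
  .catch(error => {
    console.error(error)
    process.exit(1)
  })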
This worker serves as the workflow decider/conductor.
#!/usr/bin/env node
const {SWF} = require('soflow')
const workflows = require('./workflows')
const tasks = require('./tasks')

const deciderWorker = new SWF.DeciderWorker({
  workflows,
  tasks,
  concurrency: 2,
})

deciderWorker.on('error', error => {
  console.error(error)
  process.exit(1)
})

deciderWorker.start()
This worker executes scheduled activity tasks.
#!/usr/bin/env node
const {SWF} = require('soflow')
const workflows = require('./workflows')
const tasks = require('./tasks')

const activityWorker = new SWF.ActivityWorker({
  workflows,
  tasks,
  concurrency: 2,
})

activityWorker.on('error', error => {
  console.error(error)
  process.exit(1)
})

activityWorker.start()
Soflow supports running SWF deciders as Lambda functions. Due to the nature of Lambda and SWF, the implementation has some important details.

When the lambda decider is enabled, soflow enables a scheduled CloudWatch event rule that triggers the decider lambda function every minute. Each invocation runs for 65-130 seconds, exiting when there is no longer enough time (60s + 5s of slack) to do an empty poll; this prevents decision tasks from getting temporarily "stuck". As a result, between 1 and 2 deciders are running at any given time, each able to handle multiple decision tasks concurrently. Note that it can take up to 1 minute for the first invocation to happen.
await SWF.Orchestration.Lambda.enableDecider()
Disables the CloudWatch event rule. It can take up to 2 minutes for all deciders to be shut down.
await SWF.Orchestration.Lambda.disableDecider()
May be used with enableDecider() to ensure a decider is running immediately, or to temporarily scale up the decider capacity.
await SWF.Orchestration.Lambda.invokeDecider()
await SWF.Orchestration.Lambda.shutdownDeciders()
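Putting the calls above together, a sketch that enables the scheduled rule, invokes a decider right away so the first decisions are not delayed by up to a minute, and later disables the rule again:

async function lambdaDeciderExample() {
  // Enable the scheduled CloudWatch rule, then invoke a decider immediately
  await SWF.Orchestration.Lambda.enableDecider()
  await SWF.Orchestration.Lambda.invokeDecider()

  // ... later, stop scheduling new decider invocations
  await SWF.Orchestration.Lambda.disableDecider()
}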
const {SWF} = require('soflow')

async function startCreateOrderWorkflow() {
  // Initiate workflow execution
  const execution = await SWF.executeWorkflow({
    type: 'CreateOrder',
    workflowId: 'CreateOrder-12345',
    input: {productIds: [1, 2, 3]},
  })

  // Optionally await workflow result
  const result = await execution.promise
}
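Tying this to the CreateOrder workflow defined earlier: the input object is what the workflow function destructures, so a call might look like the sketch below (the field values are purely illustrative):

const {SWF} = require('soflow')

async function createOrder() {
  const execution = await SWF.executeWorkflow({
    type: 'CreateOrder',
    workflowId: 'CreateOrder-12346',
    input: {
      customerId: 42, // illustrative values only
      products: [{id: 1, quantity: 2}],
      confirmationEmailRecipient: 'customer@example.com',
    },
  })

  // Resolves with the workflow's return value (orderData)
  return execution.promise
}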
const {SWF} = require('soflow')

async function terminationExample() {
  await SWF.terminateAllExecutions() // terminate ALL workflow executions within namespace
  await SWF.terminateExecution({workflowId: 'myid'})
}
Warning: this removes all AWS resources within the given namespace
const {SWF} = require('soflow')

async function teardownExample() {
  await SWF.Orchestration.teardown({
    removeBucket: true, // default: false
    progressIndicator: true, // default: false
  })
}
Soflow provides a limited LocalWorkflow backend, with the same API as the SWF backend.
This can be useful during development or testing, but be aware that it:
- runs all workflow (decider) functions in the current process
- does not enforce workflow timeouts
- only allows workflow signaling within the same process
- runs tasks in separate child processes, on the local node
- only enforces task startToCloseTimeout
- is not able to terminate workflow executions
const {LocalWorkflow} = require('soflow')

async function runWorkflowWithoutSWF() {
  const execution = await LocalWorkflow.executeWorkflow({
    type: 'CreateOrder',
    workflowId: 'order-1234',
    input: {},
    // ...
  })

  // Optionally await workflow result
  const result = await execution.promise
}
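Because LocalWorkflow exposes the same API, a test can exercise a workflow end-to-end without any AWS resources. A sketch using a Jest-style test (Jest is an assumption here, not something soflow requires):

const {LocalWorkflow} = require('soflow')

test('CreateOrder returns the stored order', async () => {
  const execution = await LocalWorkflow.executeWorkflow({
    type: 'CreateOrder',
    workflowId: 'test-order-1',
    input: {customerId: 42, products: [], confirmationEmailRecipient: 'test@example.com'},
  })

  // Assert on whatever your workflow returns
  const orderData = await execution.promise
  expect(orderData).toBeDefined()
})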
You can provide configuration as environment variables, via soflow.config, or as an argument passed to a soflow function.
const {SWF, config} = require('soflow')

config.update({
  namespace: 'mynamespace',
  swfDomain: 'MyDomain',
})

// above code must be required/invoked before your code that uses soflow
SWF.executeWorkflow({namespace: 'othernamespace', ...})
Variable name | ENV variable | Description |
---|---|---|
namespace | SOFLOW_NAMESPACE | Prefix for all AWS resources (globally unique). Default: undefined |
version | SOFLOW_WORKFLOWS_VERSION | Developer-specified workflows version to use. Default: undefined |
swfDomain | SOFLOW_SWF_DOMAIN | Which AWS SWF domain to operate under. Default: 'SoFlow' |
codeRoot | SOFLOW_CODE_ROOT | Path to the root directory of code to be deployed. Default: process.cwd() |
tasksPath | SOFLOW_TASKS_PATH | Requireable path to tasks, relative to codeRoot. Default: 'tasks' |
workflowsPath | SOFLOW_WORKFLOWS_PATH | Requireable path to workflows, relative to codeRoot. Default: 'workflows' |
soflowPath | SOFLOW_PATH | Requireable path to soflow, relative to codeRoot. Default: 'node_modules/soflow' |
s3Bucket | SOFLOW_S3_BUCKET | Name of the S3 bucket for lambda packages. Default: namespace |
s3Prefix | SOFLOW_S3_PREFIX | Key prefix for S3 objects. Default: 'soflow/' |
awsRegion | SOFLOW_AWS_REGION | Which AWS region to operate in. Default: 'eu-west-1' |
executionRetention | SOFLOW_EXECUTION_RETENTION | Number of days to keep workflow executions. Note: can only be set the first time an SWF domain is created, after which it is immutable. Default: 1 |
# Bring up a local DynamoDB and S3, and run linting every time the code changes.
docker-compose up --build
# Or you could use the tmux-session script:
ln -s $PWD/scripts/tmux-session ~/.bin/soflow
soflow # start or resume tmux dev session
# brings up linting, unit and integration tests with file watching
soflow clean # stops/cleans docker containers, tmux session
Requires the development environment to be running
# Unit and integration tests for all node targets
docker-compose exec dev scripts/test
# Unit tests with file watching and verbose output
docker-compose exec dev ash -c \
"scripts/unit-tests --watch --verbose"
# Integration tests with 'local' profile
docker-compose exec dev ash -c \
"PROFILES=local scripts/integration-tests --verbose"