@effect/workflow
TypeScript icon, indicating that this package has built-in type declarations

0.8.1 • Public • Published

@effect/workflow

Build and run durable workflows in TypeScript with Effect.

Example

import { ClusterWorkflowEngine } from "@effect/cluster"
import { NodeClusterRunnerSocket, NodeRuntime } from "@effect/platform-node"
import { PgClient } from "@effect/sql-pg"
import {
  Activity,
  DurableClock,
  DurableDeferred,
  Workflow
} from "@effect/workflow"
import { Effect, Layer, Redacted, Schema } from "effect"

// Define a custom error for the SendEmail activity
class SendEmailError extends Schema.TaggedError<SendEmailError>(
  "SendEmailError"
)("SendEmailError", {
  message: Schema.String
}) {}

// Define a workflow using the `Workflow.make` api.
const EmailWorkflow = Workflow.make({
  // Every workflow needs a unique name
  name: "EmailWorkflow",
  // Add a success schema. You can omit this to use the default value `Schema.Void`
  success: Schema.Void,
  // Add an error schema. You can omit this to use the default value `Schema.Never`
  error: SendEmailError,
  // Define the payload for the workflow
  payload: {
    id: Schema.String,
    to: Schema.String
  },
  // Define the idempotency key for the workflow. This is used to ensure that
  // the workflow is not duplicated if it is retried.
  idempotencyKey: ({ id }) => id
})

// Once you have defined the workflow, you can create a layer for by providing
// the implementation.
const EmailWorkflowLayer = EmailWorkflow.toLayer(
  Effect.fn(function* (payload, executionId) {
    // An `Activity` represents an unit of work in the workflow.
    // They will only ever be executed once, unless you use `Activity.retry`.
    yield* Activity.make({
      name: "SendEmail",
      error: SendEmailError,
      execute: Effect.gen(function* () {
        // You can access the current attempt number of the activity.
        const attempt = yield* Activity.CurrentAttempt

        yield* Effect.annotateLogs(Effect.log(`Sending email`), {
          id: payload.id,
          executionId,
          attempt
        })

        if (attempt !== 5) {
          return yield* new SendEmailError({
            message: `Failed to send email for ${payload.id} on attempt ${attempt}`
          })
        }
      })
    }).pipe(
      Activity.retry({ times: 5 }),
      EmailWorkflow.withCompensation(
        Effect.fn(function* (value, cause) {
          // This is a compensation finalizer that will be executed if the workflow
          // fails.
          //
          // You can use the success `value` of the wrapped effect, as well as the
          // Cause of the workflow failure.
          yield* Effect.log(`Compensating activity SendEmail`)
        })
      )
    )

    // Use the `DurableClock` to sleep for a specified duration.
    // The workflow will pause execution for the specified duration.
    //
    // You can sleep for as long as you want - when the workflow pauses it
    // consumes no resources.
    yield* Effect.log("Sleeping for 10 seconds")
    yield* DurableClock.sleep({
      name: "Some sleep",
      duration: "10 seconds"
    })
    yield* Effect.log("Woke up")

    // You can use `DurableDeferred` to create a signal that can be awaited later.
    const EmailTrigger = DurableDeferred.make("EmailTrigger")

    // You can use the `DurableDeferred.token` api to acquire the token that can
    // later be used with `DurableDeferred.done / succeed / fail`
    const token = yield* DurableDeferred.token(EmailTrigger)

    // You then use the token to send a result to the deferred.
    //
    // This doesn't need to be done inside the workflow, it just needs access to
    // the `WorkflowEngine` service.
    yield* DurableDeferred.succeed(EmailTrigger, {
      token,
      value: void 0
    }).pipe(
      Effect.delay("1 second"), // Simulate some delay before completing the deferred
      Effect.forkDaemon
    )

    // Finally, you can await the deferred to get the result.
    //
    // It will pause the workflow until the deferred is completed.
    yield* DurableDeferred.await(EmailTrigger)
  })
)

// To integrate with @effect/cluster, you can use the
// `ClusterWorkflowEngine.layer` Layer, and provide it with your cluster Runner
// layer.
const WorkflowEngineLayer = ClusterWorkflowEngine.layer.pipe(
  Layer.provideMerge(NodeClusterRunnerSocket.layer({ storage: "sql" })),
  Layer.provideMerge(
    PgClient.layer({
      database: "effect_cluster",
      username: "cluster",
      password: Redacted.make("cluster")
    })
  )
)

const EnvLayer = Layer.mergeAll(
  EmailWorkflowLayer
  // You can add any other cluster entities or workflow layers here
).pipe(Layer.provide(WorkflowEngineLayer))

// Finally, you can execute a workflow using the `.execute` method.
EmailWorkflow.execute({ id: "123", to: "hello@timsmart.co" }).pipe(
  Effect.provide(EnvLayer),
  NodeRuntime.runMain
)

Readme

Keywords

none

Package Sidebar

Install

npm i @effect/workflow

Weekly Downloads

139,134

Version

0.8.1

License

MIT

Unpacked Size

274 kB

Total Files

74

Last publish

Collaborators

  • schickling
  • michael.arnaldi
  • effect-bot