Temporal Patterns

Explore the Full Repository on GitHub

Pattern examples are organized as unit tests and Jest is used as the test runner. The main unit test for each suite is named index.test.ts. Each test includes some boilerplate beforeAll/afterAll code designed to reset the database; tests are otherwise focused on demonstrating setup and execution.

For example, the Composition unit test suite shows how to connect multiple workers (parent and child) when designing compositional workflows.

// patterns/composition/index.test.ts

//...

describe('Worker', () => {
  describe('create', () => {
    it('should create and run the PARENT workflow worker', async () => {
      const worker = await Worker.create({
        connection: { class: PostgresClient, options },
        taskQueue: 'parent-world',
        workflow: parentWorkflows.parentExample,
      });

      await worker.run();
      expect(worker).toBeDefined();
    });

    it('should create and run the CHILD workflow worker', async () => {
    const worker = await Worker.create({
        connection: { class: PostgresClient, options },
        taskQueue: 'child-world',
        workflow: childWorkflows.childExample,
      });

      await worker.run();
      expect(worker).toBeDefined();
    });
  });
});

//...

Pattern Example Test Suites

Collation

Suspends the main workflow and collects all responses, only awakening upon receiving all expected results from the promise.

Note how Promise.all is used to invoke the greet activity to ensure all activities run in parallel. A sleep activity is also included to demonstrate that anything can be collated, even sleep.

// src/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';
import * as activities from './activities';

const { greet } = workflow.proxyActivities<typeof activities>({
  activities,
});

export async function example(): Promise<[string, string, string, number]> {
  return await Promise.all([
  greet('1'),
  greet('2'),
  greet('3'),
  workflow.sleepFor('5 seconds'),
 ]);
}
View Example on GitHub

Composition

Uses child workflows to create modular and composable process structures, enhancing reusability.

Here is the PARENT Workflow.

// parent/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';

import * as activities from './activities';
import type * as activityTypes from './activities';

const { parentActivity } = workflow.proxyActivities<
  typeof activityTypes
>({
  activities,
});

export async function parentExample(
  name: string,
  signalIn: boolean,
): Promise<Record<string, string>> {
  const activityOutput = await parentActivity(name);
  //tests signal suppression within collated sets
  const [childWorkflowOutput] = await Promise.all([
    workflow.execChild<string>({
      args: [`${name} to CHILD`],
      taskQueue: 'child-world',
      workflowName: 'childExample',
      signalIn,
    }),
    workflow.execChild<string>({
      args: [`${name} to CHILD 2`],
      taskQueue: 'child-world',
      workflowName: 'childExample',
      signalIn,
    }),
  ]);
  return { activityOutput, childWorkflowOutput };
}

Here is the CHILD.

// child/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';

import * as activities from './activities';

const { childActivity } = workflow.proxyActivities<typeof activities>({
  activities,
});

export async function childExample(name: string): Promise<string> {
    return await childActivity(name);
}
View Example on GitHub

Error (Unknown)

Handles unknown errors gracefully, ensuring workflows can log or recover from unexpected issues.

This example purposefully throws an error to simulate an unforeseen issue but recovers after the 'nth' attempt.

// src/workflows.ts
const state = {
  count: 0,
};
  
async function example(count = 2): Promise<number> {
  if (state.count++ < count) {
    throw new Error('recurring-test-error');
  }
  return count;
}
    
export { example, state };
View Example on GitHub

Error (Fatal)

Shows how to throw a fatal error in a proxied activity, catch the error in the workflow and rethrow (as it's fatal).

// src/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';

import * as activities from './activities';

const { myFatalActivity } = workflow.proxyActivities<
    typeof activities
>({ activities });

async function example({ name }: Record<'name', string>): Promise<void> {
  try {
    return await myFatalActivity(name);
  } catch (error) {
    //this error is thrown to reveal the error / stack trace feature
    // when activity execution fails on a remote host
    console.error('rethrowing error >', error);
    throw error;
  }
}

export default { example };
View Example on GitHub

Everything

This example combines multiple Temporal workflow patterns to test suspension, collation, and replay principles:

  • proxyActivity: suspend and await an idempotent activity
  • startChild: suspend and await confirmation
  • executeChild: suspend and await the execution result
  • Promise.all: suspend and await all before reawakening
  • signal/waitForSignal: suspend and await outside signal/s
  • sleep: suspend and awaken after a duration
  • random: return a deterministic random number
// workflows.ts
import { workflow } from '@hotmeshio/hotmesh';
import * as activities from './activities';

//...

const { greet } = workflow.proxyActivities<ActivitiesType>({
    activities,
});
                
export async function example(name: string): Promise<responseType> {
  //deterministic random number
  const random1 = workflow.random();

  //suspend and await proxyActivities result
  const proxyGreeting = await greet(name);

  //suspend and await proxyActivities result
  await greet(`${name}2`)

  //deterministic random number
  const random2 = workflow.random();

  //suspend and await workflow.sleepFor completion
  await workflow.sleepFor('2 seconds');

  //suspend and await workflow.execChild completion
  await workflow.execChild<void>({
    workflowName: 'childExample',
    args: [name],
    taskQueue: 'everything-world',
  });

  //...               
};

export async function childExample(name: string): Promise<void> {
  //test the 'void' return type
  return;
}
View Example on GitHub

Transactional Hook

Executes a subprocess ('hook function') on a already-running workflow. Hook functions have access the same core data record in the backend datastore as the main workflow function. But they are thread safe and can run in parallel. They are functionally equivalent to the main workflow but do not return a value.

// src/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';

import * as activities from './activities';

const { greet, bye } = workflow.proxyActivities<typeof activities>({
    activities,
});

/**
 * This is the main workflow function that will be executed. The workflow
 * will end and self-clean once the final statement executes. While it
 * is active other hook functions may also run in parallel. Once it ends
 * no hook functions may run.
 *
 * @param {string} name
 * @returns {Promise<string>}
 */
export async function example(name: string): Promise<string> {
  //create a search session and add some job data (this is NOT the same as job state)
  const search = await workflow.search();
  await search.set('custom1', 'meshflow');
  await search.set('custom2', '55');
  //note: `exampleHook` function will change to 'jackson'
  await search.set('jimbo', 'jones');
  await search.incr('counter', 10);
  await search.incr('counter', 1);
  await search.mult('multer', 12);
  await search.mult('multer', 10);

  //start a child workflow and wait for the result
  await workflow.execChild<string>({
  args: [`${name} to CHILD`],
  taskQueue: 'child-world',
  workflowName: 'childExample',
  });

  //start a child workflow and only confirm it started (don't wait for result)
  await workflow.startChild({
  args: [`${name} to CHILD`],
  taskQueue: 'child-world',
  workflowName: 'childExample',
  });

  //call a few activities in parallel (proxyActivities)
  const [hello, goodbye] = await Promise.all([greet(name), bye(name)]);

  //wait for the `abcdefg` signal ('exampleHook' will send it)
  await workflow.waitFor('abcdefg');

  //sleep for 5
  await workflow.sleepFor('5 seconds');

  //return the result (the job state)
  return `${hello} - ${goodbye}`;
}

/**
 * This is a hook function that can be used to update the shared workflow state
 * The function will run in a separate thread and has no blocking effect
 * on the main thread outside of the signal command that is being used in
 * this test. This function is called by the test runner a few seconds
 * after it starts the main workflow.
 * @param {string} name
 */
export async function exampleHook(name: string): Promise<void> {
  //update shared job state (the workflow HASH)
  const search = await workflow.search();
  await search.incr('counter', 100);
  await search.set('jimbo', 'jackson');

  //Promise.all: call in parallel; use a sleepFor
  // and compare to an activity that uses a standard
  const [greeting, _timeInSeconds] = await Promise.all([
    bye(name, 1_000),
    workflow.sleepFor('1 second'),
  ]);

  //start a child workflow and wait for the result
  await workflow.execChild<string>({
    args: [`${name} to CHILD`],
    taskQueue: 'child-world',
    workflowName: 'childExample',
  });

  //start a child workflow and only confirm it started (don't wait for result)
  await workflow.startChild({
    args: [`${name} to CHILD`],
    taskQueue: 'child-world',
    workflowName: 'childExample',
  });

  //test out sleeping
  await workflow.sleepFor('2 seconds');

  //awake the parent/main thread by sending the 'abcdefg' signal
  await workflow.signal('abcdefg', { data: greeting });
}
View Example on GitHub

Idempotency

Ensures unique workflow IDs to prevent accidental duplication in execution. Useful in scenarios where workflows must be isolated by ID.

// workflows.ts
import { workflow } from '@hotmeshio/hotmesh';

import * as activities from './activities';

//NOTE: when `./activities` exports a `default` function,
//      it is imported as shown here (using the type)
import type greetFunctionType from './activities';
type ActivitiesType = {
  greet: typeof greetFunctionType;
};

const { greet } = workflow.workflow.proxyActivities<ActivitiesType>({
  activities,
});

export async function example(name: string): Promise<string> {
  const greet1 = await greet(name);
  await workflow.workflow.execChild<string>({
    args: ['Howdy!'],
    taskQueue: 'idempotency-world',
    workflowName: 'childExample',
    workflowId: 'idempotency-child', //the parent is already named this,
    expire: 10_000,
  });

  //this line will never be reached (workflow id collision)
  return greet1;
}

export async function childExample(name: string): Promise<string> {
  return await greet(name);
}

const STATE = {
  count: 0,
};

export async function fixableExample(badCount: number): Promise<string> {
  //add unique suffix to workflowId after `badCount` failures
  const uniqueSuffix = STATE.count++ > badCount ? '-fixed' : '';

  const childOutput = await workflow.workflow.execChild<string>({
    args: ['FIXED'],
    taskQueue: 'idempotency-world',
    workflowName: 'childExample',
    workflowId: `fixable-idempotency-child${uniqueSuffix}`,
  });

  return childOutput;
}
View Example on GitHub

Interrupt

Interrupts workflow execution under specific conditions, useful for adaptive workflows that react to real-time events. The example shows how to interrupt a workflow from another workflow.NOTE: If no target ID is provided, this method also works for targeting the current (self) workflow.

// parent/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';

export async function parentExample(
  name: string,
): Promise<Record<string, string>> {
  const workflowId1 = 'jimbo1';
  const workflowId2 = 'jimbo2';

  //wait for this workflow to complete (execChild)
  const childWorkflowOutput1 = await workflow.execChild<string>({
    args: [`${name} to CHILD`],
    taskQueue: 'child-world',
    workflowName: 'childExample',
    workflowId: workflowId1,
  });

  //just start this workflow (startChild)
  const childWorkflowOutput2 = await workflow.startChild({
    args: [`${name} to CHILD`],
    taskQueue: 'child-world',
    workflowName: 'childExample',
    workflowId: workflowId2,
    expire: 600, //don't expire immediately once complete
  });

  //interrupt the second workflow using its ID
  (await workflow.interrupt(workflowId2, {
    throw: false,
    expire: 600,
  })) as string;

  //return the results; the parent will test various aspects of the child workflows
  return {
    childWorkflowOutput: childWorkflowOutput1,
    cancelledWorkflowId: childWorkflowOutput2,
  };
}
View Example on GitHub

Random

Returns a deterministic random number, ensuring proper replay when designing workflows. Use instead of Math.random(). Does not suspend workflow execution, but does move the replay ledger forward.

// src/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';

import * as activities from './activities';

//NOTE: when `./activities` exports a `default` function,
//      it is imported as shown here (using the type)
import type greetFunctionType from './activities';
type ActivitiesType = {
  greet: typeof greetFunctionType;
};

const { greet } = workflow.proxyActivities<ActivitiesType>({
  activities,
});

export async function example(name: string): Promise<string> {
  //generate a random number (implicit seed is 1)
  const random1 = workflow.random();

  //perform an idempotent activity
  const proxyGreeting = await greet(name);

  //generate a second random number (implicit seed is 3)
  const random2 = workflow.random();
  return `${random1} ${proxyGreeting} ${random2}`;
}
View Example on GitHub

Retry

Automatically retries workflows or tasks based on predefined policies, improving fault tolerance. Note the configuration for the activity retry policy in the workflow.

// src/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';

import * as activities from './activities';

const { count } = workflow.proxyActivities<typeof activities>({
  activities,

  //retry policy for the proxied activity
  retryPolicy: {
    maximumAttempts: 2,    //the succesful test retries twice.
    maximumInterval: '1s', //keep short for testing
    backoffCoefficient: 1, //keep short for testing
  },
});

//workflow function
async function example({ amount }): Promise<number> {
  return await count(amount);
}

export default { example };
                

The proxied activity is designed to fail 'n' times and then succeed (to simulate a flaky service). The source for the activity is here.

// src/activities.ts
import { state } from './state';

export async function count(limit: number): Promise<number> {
  state.counter = state.counter + 1;
  if (state.counter < limit) {
    throw new Error('retry');
  } else {
    return state.counter;
  }
}
View Example on GitHub

Signal

Allows workflows to respond dynamically to external events through signal handling.Note how the workflow waits for signals 'abcdefg' and 'hijklmnop' before returning the final greeting.

// src/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';

import * as activities from './activities';
import type greetFunctionType from './activities';
type ActivitiesType = {
  greet: typeof greetFunctionType;
};

const { greet } = workflow.proxyActivities<ActivitiesType>({
  activities,
});

export async function example(
  name: string,
): Promise<[string, Record<any, any>, Record<any, any>, string]> {
  const strangerGreeting = await greet('stranger');

  const [signal1, signal2] = await Promise.all([
    workflow.waitFor<Record<any, any>>('abcdefg'),
    workflow.waitFor<Record<any, any>>('hijklmnop'),
  ]);

  return [strangerGreeting, signal1, signal2, await greet(name)];
}

The signals are emitted as part of the unit test to verify that workflows can pause and await external data before continuing.

// index.test.ts

//...

describe('WorkflowHandle', () => {
  describe('result', () => {
    it('should return the workflow execution result', async () => {
      //signal using the original client handle
      await sleepFor(3_000);
      await handle.signal('abcdefg', { name: 'WarmMash' });

      //signal by instancing a new client connection
      await sleepFor(1_000);
      const client = new Client({ connection: { class: PostgresClient, options } });
      await client.workflow.signal('hijklmnop', { name: 'WarnCrash' });

      const result = await handle.result();
      expect(result).toEqual([
        'Hello, stranger!',
        { name: 'WarmMash' },
        { name: 'WarnCrash' },
        'Hello, ColdMush!',
      ]);
    }, 15_000);
  });
});

//...
View Example on GitHub

Sleep / Interrupt

Pauses workflow execution for a specified duration, enabling timed or delayed processing. Also shows interruption from an outside caller.

// src/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';

import * as activities from './activities';
import type greetFunctionType from './activities';
type ActivitiesType = {
  greet: typeof greetFunctionType;
};

const { greet } = workflow.proxyActivities<ActivitiesType>({
  activities,
});

//test workflow that runs and suspends 3 times to test replay, sleep, etc
export async function example(name: string): Promise<string> {
  //run a proxy activity (runs once)
  const yo = await greet(name);

  //sleep for 1 second (runs once)
  await workflow.sleepFor('1 seconds');

  //sleep for 2 seconds more to test replay integrity (runs once)
  await workflow.sleepFor('2 seconds');

  return yo; //this line is only reached on the 3rd execution cycle and is a `cached replay value`
}
View Example on GitHub