Collation
Suspends the main workflow and collects all responses, only awakening upon receiving all expected results from the promise.
Note how Promise.all is used to invoke the greet activity so that all activities run in parallel. A sleep call (workflow.sleepFor) is also included to demonstrate that anything can be collated, even sleep.
// src/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';
import * as activities from './activities';
const { greet } = workflow.proxyActivities<typeof activities>({
activities,
});
export async function example(): Promise<[string, string, string, number]> {
return await Promise.all([
greet('1'),
greet('2'),
greet('3'),
workflow.sleepFor('5 seconds'),
]);
}
View Example on GitHub
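The collation behavior can be tried outside of HotMesh with plain promises. The following is a minimal sketch, not HotMesh code: `delayedGreet` and `collate` are hypothetical stand-ins for a proxied activity and a workflow function. It only illustrates that Promise.all starts every task before awaiting any of them and returns the results in input order.

```typescript
// Hypothetical stand-in for a proxied activity: resolves after `ms` milliseconds.
function delayedGreet(name: string, ms: number): Promise<string> {
  return new Promise((resolve) =>
    setTimeout(() => resolve(`Hello, ${name}!`), ms),
  );
}

// Starts all three tasks at once and resumes only after the slowest one settles.
async function collate(): Promise<string[]> {
  return Promise.all([
    delayedGreet('1', 30),
    delayedGreet('2', 10),
    delayedGreet('3', 20),
  ]);
}

// Results keep the input order even though task '2' finishes first.
collate().then((results) => console.log(results));
```

The total wait is roughly that of the slowest task (30 ms here), not the sum of all three.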
Composition
Uses child workflows to create modular and composable process structures, enhancing reusability.
Here is the PARENT Workflow.
// parent/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';
import * as activities from './activities';
import type * as activityTypes from './activities';
const { parentActivity } = workflow.proxyActivities<
typeof activityTypes
>({
activities,
});
export async function parentExample(
name: string,
signalIn: boolean,
): Promise<Record<string, string>> {
const activityOutput = await parentActivity(name);
//tests signal suppression within collated sets
const [childWorkflowOutput] = await Promise.all([
workflow.execChild<string>({
args: [`${name} to CHILD`],
taskQueue: 'child-world',
workflowName: 'childExample',
signalIn,
}),
workflow.execChild<string>({
args: [`${name} to CHILD 2`],
taskQueue: 'child-world',
workflowName: 'childExample',
signalIn,
}),
]);
return { activityOutput, childWorkflowOutput };
}
Here is the CHILD.
// child/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';
import * as activities from './activities';
const { childActivity } = workflow.proxyActivities<typeof activities>({
activities,
});
export async function childExample(name: string): Promise<string> {
return await childActivity(name);
}
View Example on GitHub
Error (Unknown)
Handles unknown errors gracefully, ensuring workflows can log or recover from unexpected issues.
This example purposefully throws an error to simulate an unforeseen issue but recovers after the 'nth' attempt.
// src/workflows.ts
const state = {
count: 0,
};
async function example(count = 2): Promise<number> {
if (state.count++ < count) {
throw new Error('recurring-test-error');
}
return count;
}
export { example, state };
View Example on GitHub
Error (Fatal)
Shows how to throw a fatal error in a proxied activity, catch the error in the workflow and rethrow (as it's fatal).
// src/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';
import * as activities from './activities';
const { myFatalActivity } = workflow.proxyActivities<
typeof activities
>({ activities });
async function example({ name }: Record<'name', string>): Promise<void> {
try {
return await myFatalActivity(name);
} catch (error) {
//this error is thrown to reveal the error / stack trace feature
// when activity execution fails on a remote host
console.error('rethrowing error >', error);
throw error;
}
}
export default { example };
View Example on GitHub
Everything
This example combines multiple Temporal workflow patterns to test suspension, collation, and replay principles:
- proxyActivity: suspend and await an idempotent activity
- startChild: suspend and await confirmation
- executeChild: suspend and await the execution result
- Promise.all: suspend and await all before reawakening
- signal/waitForSignal: suspend and await outside signal/s
- sleep: suspend and awaken after a duration
- random: return a deterministic random number
// workflows.ts
import { workflow } from '@hotmeshio/hotmesh';
import * as activities from './activities';
//...
const { greet } = workflow.proxyActivities<ActivitiesType>({
activities,
});
export async function example(name: string): Promise<responseType> {
//deterministic random number
const random1 = workflow.random();
//suspend and await proxyActivities result
const proxyGreeting = await greet(name);
//suspend and await proxyActivities result
await greet(`${name}2`);
//deterministic random number
const random2 = workflow.random();
//suspend and await workflow.sleepFor completion
await workflow.sleepFor('2 seconds');
//suspend and await workflow.execChild completion
await workflow.execChild<void>({
workflowName: 'childExample',
args: [name],
taskQueue: 'everything-world',
});
//...
}
export async function childExample(name: string): Promise<void> {
//test the 'void' return type
return;
}
View Example on GitHub
Transactional Hook
Executes a subprocess ('hook function') on an already-running workflow. Hook functions have access to the same core data record in the backend datastore as the main workflow function, but they are thread safe and can run in parallel. They are functionally equivalent to the main workflow but do not return a value.
// src/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';
import * as activities from './activities';
const { greet, bye } = workflow.proxyActivities<typeof activities>({
activities,
});
/**
* This is the main workflow function that will be executed. The workflow
* will end and self-clean once the final statement executes. While it
* is active other hook functions may also run in parallel. Once it ends
* no hook functions may run.
*
* @param {string} name
* @returns {Promise<string>}
*/
export async function example(name: string): Promise<string> {
//create a search session and add some job data (this is NOT the same as job state)
const search = await workflow.search();
await search.set('custom1', 'meshflow');
await search.set('custom2', '55');
//note: `exampleHook` function will change to 'jackson'
await search.set('jimbo', 'jones');
await search.incr('counter', 10);
await search.incr('counter', 1);
await search.mult('multer', 12);
await search.mult('multer', 10);
//start a child workflow and wait for the result
await workflow.execChild<string>({
args: [`${name} to CHILD`],
taskQueue: 'child-world',
workflowName: 'childExample',
});
//start a child workflow and only confirm it started (don't wait for result)
await workflow.startChild({
args: [`${name} to CHILD`],
taskQueue: 'child-world',
workflowName: 'childExample',
});
//call a few activities in parallel (proxyActivities)
const [hello, goodbye] = await Promise.all([greet(name), bye(name)]);
//wait for the `abcdefg` signal ('exampleHook' will send it)
await workflow.waitFor('abcdefg');
//sleep for 5
await workflow.sleepFor('5 seconds');
//return the result (the job state)
return `${hello} - ${goodbye}`;
}
/**
* This is a hook function that can be used to update the shared workflow state
* The function will run in a separate thread and has no blocking effect
* on the main thread outside of the signal command that is being used in
* this test. This function is called by the test runner a few seconds
* after it starts the main workflow.
* @param {string} name
*/
export async function exampleHook(name: string): Promise<void> {
//update shared job state (the workflow HASH)
const search = await workflow.search();
await search.incr('counter', 100);
await search.set('jimbo', 'jackson');
//Promise.all: call in parallel; use a sleepFor
// and compare to an activity that uses a standard
const [greeting, _timeInSeconds] = await Promise.all([
bye(name, 1_000),
workflow.sleepFor('1 second'),
]);
//start a child workflow and wait for the result
await workflow.execChild<string>({
args: [`${name} to CHILD`],
taskQueue: 'child-world',
workflowName: 'childExample',
});
//start a child workflow and only confirm it started (don't wait for result)
await workflow.startChild({
args: [`${name} to CHILD`],
taskQueue: 'child-world',
workflowName: 'childExample',
});
//test out sleeping
await workflow.sleepFor('2 seconds');
//awake the parent/main thread by sending the 'abcdefg' signal
await workflow.signal('abcdefg', { data: greeting });
}
View Example on GitHub
Idempotency
Ensures unique workflow IDs to prevent accidental duplication in execution. Useful in scenarios where workflows must be isolated by ID.
// workflows.ts
import { workflow } from '@hotmeshio/hotmesh';
import * as activities from './activities';
//NOTE: when `./activities` exports a `default` function,
// it is imported as shown here (using the type)
import type greetFunctionType from './activities';
type ActivitiesType = {
greet: typeof greetFunctionType;
};
const { greet } = workflow.proxyActivities<ActivitiesType>({
activities,
});
export async function example(name: string): Promise<string> {
const greet1 = await greet(name);
await workflow.execChild<string>({
args: ['Howdy!'],
taskQueue: 'idempotency-world',
workflowName: 'childExample',
workflowId: 'idempotency-child', //the parent is already named this
expire: 10_000,
});
//this line will never be reached (workflow id collision)
return greet1;
}
export async function childExample(name: string): Promise<string> {
return await greet(name);
}
const STATE = {
count: 0,
};
export async function fixableExample(badCount: number): Promise<string> {
//add unique suffix to workflowId after `badCount` failures
const uniqueSuffix = STATE.count++ > badCount ? '-fixed' : '';
const childOutput = await workflow.execChild<string>({
args: ['FIXED'],
taskQueue: 'idempotency-world',
workflowName: 'childExample',
workflowId: `fixable-idempotency-child${uniqueSuffix}`,
});
return childOutput;
}
View Example on GitHub
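The collision rule can be pictured with a small in-memory model. This is a minimal sketch under stated assumptions: `WorkflowRegistry` and `startWithFallback` are hypothetical names, not HotMesh APIs, and a real engine would track IDs in its backend datastore. The fallback loosely mirrors the idea behind `fixableExample`: once the plain ID collides, a suffixed ID succeeds.

```typescript
// Hypothetical in-memory registry of active workflow IDs.
class WorkflowRegistry {
  private readonly active = new Set<string>();

  // Registers a workflow ID, rejecting any ID that is already in use.
  start(workflowId: string): void {
    if (this.active.has(workflowId)) {
      throw new Error(`Duplicate workflow ID: ${workflowId}`);
    }
    this.active.add(workflowId);
  }
}

// Tries the plain ID first; on a collision, retries once with a '-fixed' suffix.
function startWithFallback(registry: WorkflowRegistry, baseId: string): string {
  try {
    registry.start(baseId);
    return baseId;
  } catch {
    const fixedId = `${baseId}-fixed`;
    registry.start(fixedId);
    return fixedId;
  }
}
```

Calling `startWithFallback` twice with the same base ID returns the plain ID the first time and the suffixed ID the second time.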
Interrupt
Interrupts workflow execution under specific conditions, useful for adaptive workflows that react to real-time events. The example shows how to interrupt a workflow from another workflow.
NOTE: If no target ID is provided, this method also works for targeting the current (self) workflow.
// parent/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';
export async function parentExample(
name: string,
): Promise<Record<string, string>> {
const workflowId1 = 'jimbo1';
const workflowId2 = 'jimbo2';
//wait for this workflow to complete (execChild)
const childWorkflowOutput1 = await workflow.execChild<string>({
args: [`${name} to CHILD`],
taskQueue: 'child-world',
workflowName: 'childExample',
workflowId: workflowId1,
});
//just start this workflow (startChild)
const childWorkflowOutput2 = await workflow.startChild({
args: [`${name} to CHILD`],
taskQueue: 'child-world',
workflowName: 'childExample',
workflowId: workflowId2,
expire: 600, //don't expire immediately once complete
});
//interrupt the second workflow using its ID
(await workflow.interrupt(workflowId2, {
throw: false,
expire: 600,
})) as string;
//return the results; the parent will test various aspects of the child workflows
return {
childWorkflowOutput: childWorkflowOutput1,
cancelledWorkflowId: childWorkflowOutput2,
};
}
View Example on GitHub
Random
Returns a deterministic random number, ensuring proper replay when designing workflows. Use instead of Math.random(). Does not suspend workflow execution, but does move the replay ledger forward.
// src/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';
import * as activities from './activities';
//NOTE: when `./activities` exports a `default` function,
// it is imported as shown here (using the type)
import type greetFunctionType from './activities';
type ActivitiesType = {
greet: typeof greetFunctionType;
};
const { greet } = workflow.proxyActivities<ActivitiesType>({
activities,
});
export async function example(name: string): Promise<string> {
//generate a random number (implicit seed is 1)
const random1 = workflow.random();
//perform an idempotent activity
const proxyGreeting = await greet(name);
//generate a second random number (implicit seed is 3)
const random2 = workflow.random();
return `${random1} ${proxyGreeting} ${random2}`;
}
View Example on GitHub
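Why a position-based seed gives replay-safe randomness can be shown with a toy model. This is a minimal sketch, assuming the seed is simply the current position in the replay ledger (as the "implicit seed" comments above suggest). `ReplayContext`, `step`, and `runWorkflow` are hypothetical, and the mulberry32 mixer is swapped in for illustration only; it is not claimed to be the generator HotMesh uses.

```typescript
// One round of the mulberry32 mixer: maps an integer seed to a number in [0, 1).
function mulberry32(seed: number): number {
  let t = (seed + 0x6d2b79f5) | 0;
  t = Math.imul(t ^ (t >>> 15), t | 1);
  t ^= t + Math.imul(t ^ (t >>> 7), t | 61);
  return ((t ^ (t >>> 14)) >>> 0) / 4294967296;
}

// Hypothetical context that tracks the position in the replay ledger.
class ReplayContext {
  private position = 0;

  // Advances the ledger and derives the value from the new position only.
  random(): number {
    this.position += 1;
    return mulberry32(this.position);
  }

  // Stand-in for any other durable step (e.g., an activity call).
  step(): void {
    this.position += 1;
  }
}

// Same shape as the example above: random (seed 1), activity, random (seed 3).
function runWorkflow(): [number, number] {
  const ctx = new ReplayContext();
  const random1 = ctx.random();
  ctx.step();
  const random2 = ctx.random();
  return [random1, random2];
}
```

Because each value depends only on its ledger position, every re-execution of `runWorkflow` yields the same pair, which is the property Math.random() cannot provide.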
Retry
Automatically retries workflows or tasks based on predefined policies, improving fault tolerance. Note the configuration for the activity retry policy in the workflow.
// src/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';
import * as activities from './activities';
const { count } = workflow.proxyActivities<typeof activities>({
activities,
//retry policy for the proxied activity
retryPolicy: {
maximumAttempts: 2, //the successful test retries twice.
maximumInterval: '1s', //keep short for testing
backoffCoefficient: 1, //keep short for testing
},
});
//workflow function
async function example({ amount }: { amount: number }): Promise<number> {
return await count(amount);
}
export default { example };
The proxied activity is designed to fail 'n' times and then succeed (to simulate a flaky service). The source for the activity is here.
// src/activities.ts
import { state } from './state';
export async function count(limit: number): Promise<number> {
state.counter = state.counter + 1;
if (state.counter < limit) {
throw new Error('retry');
} else {
return state.counter;
}
}
View Example on GitHub
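The way such a policy shapes the retry loop can be sketched in plain TypeScript. This is an illustration under stated assumptions, not HotMesh's implementation: `withRetry`, `makeFlaky`, and the `initialIntervalMs` field are hypothetical, intervals are plain milliseconds, and `maximumAttempts` here counts every call including the first (the engine's exact semantics may differ).

```typescript
// Hypothetical policy shape, loosely modeled on the retryPolicy fields above.
interface RetryPolicy {
  maximumAttempts: number; // total calls allowed in this sketch
  initialIntervalMs: number; // assumed starting delay
  maximumIntervalMs: number; // upper bound for any single delay
  backoffCoefficient: number; // multiplier applied after each failure
}

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Calls `fn` until it succeeds or the attempt budget is spent.
async function withRetry<T>(
  fn: () => Promise<T>,
  policy: RetryPolicy,
): Promise<T> {
  let delay = policy.initialIntervalMs;
  for (let attempt = 1; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (attempt >= policy.maximumAttempts) throw error;
      await sleep(Math.min(delay, policy.maximumIntervalMs));
      delay *= policy.backoffCoefficient;
    }
  }
}

// Flaky stand-in: fails `failures` times, then returns the call count.
function makeFlaky(failures: number): () => Promise<number> {
  let calls = 0;
  return async () => {
    calls += 1;
    if (calls <= failures) throw new Error('retry');
    return calls;
  };
}
```

With a `backoffCoefficient` of 1 the delay stays constant, which is why the example above uses it to keep tests short.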
Search
Enables querying workflow records based on various criteria, providing insight into historical data. Aggregation is also supported when the backend supports it. For Redis implementations, the RediSearch module (FT.SEARCH) provides indexing and aggregation, while Postgres uses SQL.
// workflows.ts
import { workflow } from '@hotmeshio/hotmesh';
import * as activities from './activities';
const { greet, bye } = workflow.proxyActivities<typeof activities>({
activities,
});
export async function example(name: string): Promise<string> {
//set values (they're added to the workflow HASH AND are indexed)
//(`custom1` and `custom2` were added to the 'bye-bye' index)
const search = await workflow.search();
await search.set('custom1', 'meshflow');
await search.set('custom2', '55');
//'jimbo' is not indexed (but it can be retrieved)
await search.set('jimbo', 'jones');
const [hello, goodbye] = await Promise.all([greet(name), bye(name)]);
//all data is available to the workflow
await search.get('jimbo');
await search.incr('counter', 10); //increment
await search.get('jimbo');
await search.mget('jimbo');
await search.incr('counter', 1);
await search.get('counter');
await search.mult('multer', 12); //multiply
//val4 is 120.00000000009 (rounding error due to logarithmic math)
await search.mult('multer', 10);
await workflow.waitFor<{ data: string }>('abcdefg');
return `${hello} - ${goodbye}`;
}
/**
* This is an example of a hook that can be called from another workflow
* or via an external call to the HotMesh API. When a hook
* is invoked, it does not spawn a new workflow; rather
* it runs in the context of an existing, target workflow.
*
* This example updates shared job data (counter)
*/
export async function exampleHook(name: string): Promise<void> {
const search = await workflow.search();
await search.incr('counter', 100);
await workflow.sleepFor('1 second');
await workflow.signal('abcdefg', { data: 'hello' });
}
View Example on GitHub
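The comment about a rounding error "due to logarithmic math" can be reproduced in isolation. This is a minimal sketch of one plausible reading, assuming a product is stored as a running sum of logarithms; the source does not show how `search.mult` is implemented, and `LogProduct` is a hypothetical name. The sketch only works for positive factors.

```typescript
// Accumulates a product as a running sum of natural logarithms.
class LogProduct {
  private logSum = 0;

  // Adds ln(factor) to the sum and returns the product reconstructed so far.
  mult(factor: number): number {
    this.logSum += Math.log(factor);
    return Math.exp(this.logSum);
  }
}

const multer = new LogProduct();
multer.mult(12);
const product = multer.mult(10);

// Close to 120, but floating-point error may leave it slightly off.
console.log(product);
```

One reason a system might prefer this form is that addition can be applied as an atomic increment in any order, at the cost of small rounding differences.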
Signal
Allows workflows to respond dynamically to external events through signal handling. Note how the workflow waits for signals 'abcdefg' and 'hijklmnop' before returning the final greeting.
// src/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';
import * as activities from './activities';
import type greetFunctionType from './activities';
type ActivitiesType = {
greet: typeof greetFunctionType;
};
const { greet } = workflow.proxyActivities<ActivitiesType>({
activities,
});
export async function example(
name: string,
): Promise<[string, Record<any, any>, Record<any, any>, string]> {
const strangerGreeting = await greet('stranger');
const [signal1, signal2] = await Promise.all([
workflow.waitFor<Record<any, any>>('abcdefg'),
workflow.waitFor<Record<any, any>>('hijklmnop'),
]);
return [strangerGreeting, signal1, signal2, await greet(name)];
}
The signals are emitted as part of the unit test to verify that workflows can pause and await external data before continuing.
// index.test.ts
//...
describe('WorkflowHandle', () => {
describe('result', () => {
it('should return the workflow execution result', async () => {
//signal using the original client handle
await sleepFor(3_000);
await handle.signal('abcdefg', { name: 'WarmMash' });
//signal by instancing a new client connection
await sleepFor(1_000);
const client = new Client({ connection: { class: PostgresClient, options } });
await client.workflow.signal('hijklmnop', { name: 'WarnCrash' });
const result = await handle.result();
expect(result).toEqual([
'Hello, stranger!',
{ name: 'WarmMash' },
{ name: 'WarnCrash' },
'Hello, ColdMush!',
]);
}, 15_000);
});
});
//...
View Example on GitHub
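The wait-then-signal handshake can be modeled with plain promises. This is a minimal in-memory sketch, not the HotMesh implementation: `SignalHub` and `awaitBoth` are hypothetical, and a real engine persists pending signals so that they survive process restarts.

```typescript
// Hypothetical in-memory hub that pairs waiters with incoming signals.
class SignalHub {
  private readonly waiters = new Map<string, Array<(payload: unknown) => void>>();
  private readonly pending = new Map<string, unknown>();

  // Resolves once `signalId` is delivered (or immediately if it arrived early).
  waitFor<T>(signalId: string): Promise<T> {
    if (this.pending.has(signalId)) {
      const payload = this.pending.get(signalId) as T;
      this.pending.delete(signalId);
      return Promise.resolve(payload);
    }
    return new Promise<T>((resolve) => {
      const list = this.waiters.get(signalId) ?? [];
      list.push((payload) => resolve(payload as T));
      this.waiters.set(signalId, list);
    });
  }

  // Wakes every waiter for `signalId`, or stores the payload for a later waiter.
  signal(signalId: string, payload: unknown): void {
    const list = this.waiters.get(signalId);
    if (list && list.length > 0) {
      this.waiters.delete(signalId);
      list.forEach((resolve) => resolve(payload));
    } else {
      this.pending.set(signalId, payload);
    }
  }
}

// Same shape as the workflow above: resume only after both signals arrive.
async function awaitBoth(hub: SignalHub): Promise<[unknown, unknown]> {
  return Promise.all([hub.waitFor('abcdefg'), hub.waitFor('hijklmnop')]);
}
```

The order in which the two signals arrive does not matter; the returned tuple follows the order of the `waitFor` calls.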
Sleep / Interrupt
Pauses workflow execution for a specified duration, enabling timed or delayed processing. Also shows interruption from an outside caller.
// src/workflows.ts
import { workflow } from '@hotmeshio/hotmesh';
import * as activities from './activities';
import type greetFunctionType from './activities';
type ActivitiesType = {
greet: typeof greetFunctionType;
};
const { greet } = workflow.proxyActivities<ActivitiesType>({
activities,
});
//test workflow that runs and suspends 3 times to test replay, sleep, etc
export async function example(name: string): Promise<string> {
//run a proxy activity (runs once)
const yo = await greet(name);
//sleep for 1 second (runs once)
await workflow.sleepFor('1 second');
//sleep for 2 seconds more to test replay integrity (runs once)
await workflow.sleepFor('2 seconds');
return yo; //this line is only reached on the 3rd execution cycle and is a `cached replay value`
}
View Example on GitHub
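The "runs once" and "cached replay value" comments describe replay: the workflow function is re-executed from the top after each suspension, and steps that already ran return their recorded results. The following toy model is a minimal sketch of that idea; `Suspended`, `ReplayLedger`, and `execute` are hypothetical names, and every step ends a cycle here, so the cycle count is an artifact of the sketch rather than a statement about HotMesh.

```typescript
// Sentinel thrown to unwind the workflow function when it must suspend.
class Suspended {}

class ReplayLedger {
  private readonly results: unknown[] = [];
  private cursor = 0;

  // Rewinds to the first step before each execution cycle.
  rewind(): void {
    this.cursor = 0;
  }

  // Returns the recorded value for a step that already ran; otherwise runs
  // the step, records its result, and suspends the current cycle.
  step<T>(run: () => T): T {
    const index = this.cursor++;
    if (index < this.results.length) {
      return this.results[index] as T;
    }
    this.results.push(run());
    throw new Suspended();
  }
}

// Re-executes the workflow function from the top until it completes.
function execute<T>(
  workflowFn: (ledger: ReplayLedger) => T,
): { result: T; cycles: number } {
  const ledger = new ReplayLedger();
  for (let cycles = 1; ; cycles++) {
    ledger.rewind();
    try {
      return { result: workflowFn(ledger), cycles };
    } catch (error) {
      if (!(error instanceof Suspended)) throw error;
    }
  }
}

let greetCalls = 0;
const outcome = execute((ledger) => {
  const yo = ledger.step(() => {
    greetCalls += 1; // side effect happens only on the first cycle
    return 'Hello, world!';
  });
  ledger.step(() => 'slept 1 second'); // stands in for a durable sleep
  ledger.step(() => 'slept 2 seconds');
  return yo; // a recorded value by the time this line is reached
});
```

Even though the function body runs several times, `greetCalls` ends at 1 because later cycles read the greeting from the ledger instead of calling the step again.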