BYODB

Bring your own database and take ownership of your data. Choose Postgres, Redis, NATS, ...

Support Matrix

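| Language   | Store           | Stream                | Events                | Search          |
|------------|-----------------|-----------------------|-----------------------|-----------------|
| TypeScript | Redis, Postgres | Redis, Postgres, NATS | Redis, Postgres, NATS | Redis, Postgres |
| Python     | Redis, Postgres | Redis, Postgres       | Redis, Postgres       | Redis, Postgres |
| Rust       | Redis, Postgres | Redis, Postgres       | Redis, Postgres       | Redis, Postgres |
| Go         | Redis, Postgres | Redis, Postgres       | Redis, Postgres       | Redis, Postgres |
| Java       | Redis, Postgres | Redis, Postgres       | Redis, Postgres       | Redis, Postgres |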

Any database provider can serve as the backend as long as it adheres to the HotMesh interface contracts (described below).

Getting Started with TypeScript

HotMesh can be deployed as a drop-in Temporal replacement with minimal modification. Just change the connection script to point to your database instead of the Temporal App Server.

Both the concise and the explicit (expanded) connection formats are shown below.

Client Initialization

import { Client } from '@hotmeshio/hotmesh';
import Redis from 'ioredis';
import { Client as Postgres } from 'pg';
import { connect as NATS } from 'nats';
//...other imports

// The three options below are alternatives; each declares `client`,
// so keep exactly one of them when copying this snippet into a module.

// Option 1a) Concise (connect to a single backend...redis)
const client = new Client({
  connection: {
    class: Redis,
    options: { host: 'redis', port: 6379 }
  }
});

// Option 1b) Concise (connect to a single backend...postgres)
const client = new Client({
  connection: {
    class: Postgres,
    options: { host: 'postgres', port: 5432 }
  }
});

// Option 2) Explicit (connect with different, purpose-driven backends)
// Each role (`store`, `stream`, `sub`) gets its own `class` + `options` pair.
const client = new Client({
  connections: {
    store: {
      class: Postgres,
      options: { host: 'postgres', port: 5432 }
    },
    stream: {
      class: Postgres,
      options: { host: 'postgres', port: 5432 }
    },
    sub: {
      class: NATS,
      options: { servers: ['nats:4222'] }
    }
  },
});

Worker Initialization

import { Worker } from '@hotmeshio/hotmesh';
import Redis from 'ioredis';
import { Client as Postgres } from 'pg';
import { connect as NATS } from 'nats';
import * as workflows from './workflows';

// The two options below are alternatives; each declares `worker`,
// so keep exactly one of them when copying this snippet into a module.

// Option 1) Concise
const worker = await Worker.create({
  connection: {
    class: Redis,
    options: { host: 'redis', port: 6379 },
  },
  taskQueue: 'default',
  workflow: workflows.example,
  options: {
    backoffCoefficient: 2,
    maximumAttempts: 1_000,
    maximumInterval: '5 seconds'
  },
});

// Option 2) Explicit (allows for multiple connections)
// Each role (`store`, `stream`, `sub`) gets its own `class` + `options` pair.
const worker = await Worker.create({
  connections: {
    store: {
      class: Postgres,
      options: { host: 'postgres', port: 5432 }
    },
    stream: {
      class: Postgres,
      options: { host: 'postgres', port: 5432 }
    },
    sub: {
      class: NATS,
      options: { servers: ['nats:4222'] }
    }
  },
  taskQueue: 'default',
  workflow: workflows.example,
  options: {
    backoffCoefficient: 2,
    maximumAttempts: 1_000,
    maximumInterval: '5 seconds'
  },
});

Interfaces

The pluggable backend is organized by transport type.

// streams.ts (Redis, Postgres, etc)
import { ILogger } from '../logger';
import {
  PublishMessageConfig,
  StreamConfig,
  StreamMessage,
  StreamStats,
} from '../../types/stream';
import { StringAnyType } from '../../types';
import { KeyStoreParams, KeyType } from '../../modules/key';
import { ProviderClient, ProviderTransaction } from '../../types/provider';

/**
 * Abstract contract that a stream backend must implement.
 *
 * The base class only stores the two clients and the config handed to the
 * constructor; every operation is abstract and left to the concrete provider.
 *
 * @typeParam ClientProvider - client type used for stream operations.
 * @typeParam TransactionProvider - transaction type some operations may
 *   resolve to instead of a plain result.
 */
export abstract class StreamService<
  ClientProvider extends ProviderClient,
  TransactionProvider extends ProviderTransaction,
> {
  // Not assigned anywhere in this base class.
  // NOTE(review): presumably populated by `init` in concrete providers — confirm.
  protected namespace: string;
  protected logger: ILogger;
  protected appId: string;

  /**
   * @param streamClient - client stored as `this.streamClient`.
   * @param storeClient - client stored as `this.storeClient`.
   * @param config - stream configuration; defaults to an empty object.
   */
  constructor(
    protected streamClient: ClientProvider,
    protected storeClient: ProviderClient,
    protected config: StreamConfig = {},
  ) {}

  /** Receives the namespace, app id and logger the provider should work with. */
  abstract init(namespace: string, appId: string, logger: ILogger): Promise<void>;

  /** Builds a key string from a key type and its parameters. */
  abstract mintKey(type: KeyType, params: KeyStoreParams): string;

  // ---- Core streaming operations ----

  abstract createStream(streamName: string): Promise<boolean>;

  abstract deleteStream(streamName: string): Promise<boolean>;

  // ---- Consumer group operations ----

  abstract createConsumerGroup(streamName: string, groupName: string): Promise<boolean>;

  abstract deleteConsumerGroup(streamName: string, groupName: string): Promise<boolean>;

  // ---- Message operations ----

  /**
   * Publishes string messages to a stream.
   *
   * @returns either a string array or a `ProviderTransaction`.
   *   NOTE(review): presumably message ids vs. a deferred transaction,
   *   selected through `options` — confirm against a provider.
   */
  abstract publishMessages(
    streamName: string,
    messages: string[],
    options?: PublishMessageConfig,
  ): Promise<string[] | ProviderTransaction>;

  /** Reads messages from a stream on behalf of a named consumer in a group. */
  abstract consumeMessages(
    streamName: string,
    groupName: string,
    consumerName: string,
    options?: { batchSize?: number; blockTimeout?: number; autoAck?: boolean },
  ): Promise<StreamMessage[]>;

  /** Returns a provider transaction object. */
  abstract transact(): ProviderTransaction;

  // ---- Message acknowledgment and deletion ----

  abstract ackAndDelete(
    streamName: string,
    groupName: string,
    messageIds: string[],
    options?: StringAnyType,
  ): Promise<number>;

  /** Resolves to a number or to the provider's transaction type. */
  abstract acknowledgeMessages(
    streamName: string,
    groupName: string,
    messageIds: string[],
    options?: StringAnyType,
  ): Promise<number | TransactionProvider>;

  /** Resolves to a number or to the provider's transaction type. */
  abstract deleteMessages(
    streamName: string,
    groupName: string,
    messageIds: string[],
    options?: StringAnyType,
  ): Promise<number | TransactionProvider>;

  // ---- Generic retry ----

  /** Retry entry point; every option is optional and resolves to stream messages. */
  abstract retryMessages(
    streamName: string,
    groupName: string,
    options?: {
      consumerName?: string;
      minIdleTime?: number;
      messageIds?: string[];
      delay?: number;
      maxRetries?: number;
      limit?: number;
    },
  ): Promise<StreamMessage[]>;

  // ---- Monitoring and maintenance ----

  abstract getStreamStats(streamName: string): Promise<StreamStats>;

  abstract getStreamDepth(streamName: string): Promise<number>;

  /**
   * Batch variant of `getStreamDepth`: takes a list of `{ stream }` entries
   * (the parameter is named `streamName` but is an array) and resolves to one
   * `{ stream, depth }` entry per result.
   */
  abstract getStreamDepths(
    streamName: { stream: string }[],
  ): Promise<{ stream: string; depth: number }[]>;

  abstract trimStream(
    streamName: string,
    options: { maxLen?: number; maxAge?: number; exactLimit?: boolean },
  ): Promise<number>;

  // ---- Provider-specific helpers ----

  /** Reports the provider's capability flags and size limits. */
  abstract getProviderSpecificFeatures(): {
    supportsBatching: boolean;
    supportsDeadLetterQueue: boolean;
    supportsOrdering: boolean;
    supportsTrimming: boolean;
    supportsRetry: boolean;
    maxMessageSize: number;
    maxBatchSize: number;
  };
}
// pubsub.ts (IO/Redis, NATS, Postgres, etc.)
import { KeyStoreParams, KeyType } from '../../modules/key';
import { ILogger } from '../logger';
import { SubscriptionCallback } from '../../types/quorum';

/**
 * Abstract contract that a pub/sub backend must implement.
 *
 * The base class only stores the two clients handed to the constructor;
 * every operation is abstract and left to the concrete provider.
 *
 * @typeParam Client - client type used for both the event and store clients.
 * @typeParam TransactionProvider - type returned by `transaction()`.
 */
abstract class SubService<Client, TransactionProvider> {
  protected eventClient: Client;
  protected storeClient: Client;
  // Not assigned anywhere in this base class.
  // NOTE(review): presumably populated by `init` in concrete providers — confirm.
  protected namespace: string;
  protected logger: ILogger;
  protected appId: string;

  /**
   * @param eventClient - client stored as `this.eventClient`.
   * @param storeClient - client stored as `this.storeClient`.
   */
  constructor(eventClient: Client, storeClient: Client) {
    this.eventClient = eventClient;
    this.storeClient = storeClient;
  }

  /** Receives the namespace, app id, engine id and logger the provider should work with. */
  abstract init(
    namespace: string,
    appId: string,
    engineId: string,
    logger: ILogger,
  ): Promise<void>;

  /** Returns a provider transaction object. */
  abstract transaction(): TransactionProvider;

  /** Builds a key string from a key type and its parameters. */
  abstract mintKey(type: KeyType, params: KeyStoreParams): string;

  // The four subscription methods accept only `KeyType.QUORUM` as key type.
  // NOTE(review): the `p`-prefixed pair presumably does pattern matching — confirm.

  abstract subscribe(
    keyType: KeyType.QUORUM,
    callback: SubscriptionCallback,
    appId: string,
    topic?: string,
  ): Promise<void>;

  abstract unsubscribe(
    keyType: KeyType.QUORUM,
    appId: string,
    topic?: string,
  ): Promise<void>;

  abstract psubscribe(
    keyType: KeyType.QUORUM,
    callback: SubscriptionCallback,
    appId: string,
    topic?: string,
  ): Promise<void>;

  abstract punsubscribe(
    keyType: KeyType.QUORUM,
    appId: string,
    topic?: string,
  ): Promise<void>;

  /**
   * Publishes a message object for the given key type, app and optional topic.
   *
   * `Record` requires two type arguments; the bare `Record` in the original
   * listing does not type-check. `Record<string, any>` is the permissive form.
   * NOTE(review): tighten the value type if the providers agree on one.
   */
  abstract publish(
    keyType: KeyType,
    message: Record<string, any>,
    appId: string,
    topic?: string,
  ): Promise<boolean>;
}
// store.ts (Redis, Postgres, etc)
import { ILogger } from '../logger';
import { SerializerService as Serializer } from '../serializer';
import { HotMeshSettings } from '../../types/hotmesh';
import { Cache } from './cache';
import { KeyStoreParams, KeyType } from '../../modules/key';
import { Consumes } from '../../types/activity';

import {
  StringAnyType,
  Symbols,
  StringStringType,
  SymbolSets,
} from '../../types/serializer';
import { IdsData, JobStatsRange, StatsType } from '../../types/stats';
import { AppVID } from '../../types/app';
import { HookRule, HookSignal } from '../../types/hook';
import { ThrottleOptions } from '../../types/quorum';
import { WorkListTaskType } from '../../types/task';

/**
 * Abstract contract that a store backend must implement.
 *
 * The base class only stores the client handed to the constructor; every
 * operation is abstract and left to the concrete provider.
 *
 * NOTE(review): the type arguments of every `Record` in the original listing
 * were missing (`Promise<Record;` does not parse). `Record<string, any>` is
 * used as the permissive stand-in; the hook-rule methods use
 * `Record<string, HookRule[]>` because `HookRule` is imported by this file and
 * otherwise unused — confirm both against the provider implementations.
 *
 * @typeParam Client - client type stored as `storeClient`.
 * @typeParam TransactionProvider - type returned by `transaction()` and
 *   accepted by the optional `transaction` parameters.
 */
abstract class StoreService<Client, TransactionProvider> {
  storeClient: Client;
  // Not assigned anywhere in this base class.
  // NOTE(review): presumably populated by concrete providers — confirm.
  namespace: string;
  appId: string;
  logger: ILogger;
  cache: Cache;
  serializer: Serializer;

  /** @param client - client stored as `this.storeClient`. */
  constructor(client: Client) {
    this.storeClient = client;
  }

  /** Returns a provider transaction object. */
  abstract transaction(): TransactionProvider;

  //domain-level methods

  /** Builds a key string from a key type and its parameters. */
  abstract mintKey(type: KeyType, params: KeyStoreParams): string;

  // ---- Settings and app versions ----
  abstract getSettings(bCreate?: boolean): Promise<HotMeshSettings>;
  abstract setSettings(manifest: HotMeshSettings): Promise<any>;
  abstract getApp(id: string, refresh?: boolean): Promise<any>;
  abstract setApp(id: string, version: string): Promise<any>;
  abstract activateAppVersion(id: string, version: string): Promise<boolean>;

  // ---- Scout roles ----
  abstract reserveScoutRole(
    scoutType: 'time' | 'signal' | 'activate',
    delay?: number,
  ): Promise<boolean>;
  abstract releaseScoutRole(
    scoutType: 'time' | 'signal' | 'activate',
  ): Promise<boolean>;

  // ---- Symbols ----
  abstract reserveSymbolRange(
    target: string,
    size: number,
    type: 'JOB' | 'ACTIVITY',
    tryCount?: number,
  ): Promise<[number, number, Symbols]>;
  abstract getSymbols(activityId: string): Promise<Symbols>;
  abstract addSymbols(activityId: string, symbols: Symbols): Promise<boolean>;
  abstract getSymbolValues(): Promise<Symbols>;
  abstract addSymbolValues(symvals: Symbols): Promise<boolean>;
  abstract getSymbolKeys(symbolNames: string[]): Promise<SymbolSets>;
  abstract getAllSymbols(): Promise<Symbols>;

  // ---- Stats ----
  abstract setStats(
    jobKey: string,
    jobId: string,
    dateTime: string,
    stats: StatsType,
    appVersion: AppVID,
    transaction?: TransactionProvider,
  ): Promise<any>;
  abstract getJobStats(jobKeys: string[]): Promise<JobStatsRange>;
  abstract getJobIds(
    indexKeys: string[],
    idRange: [number, number],
  ): Promise<IdsData>;

  // ---- Job status and state ----
  abstract setStatus(
    collationKeyStatus: number,
    jobId: string,
    appId: string,
    transaction?: TransactionProvider,
  ): Promise<any>;
  abstract getStatus(jobId: string, appId: string): Promise<number>;
  abstract setStateNX(
    jobId: string,
    appId: string,
    status?: number,
  ): Promise<boolean>;
  abstract setState(
    state: StringAnyType,
    status: number | null,
    jobId: string,
    symbolNames: string[],
    dIds: StringStringType,
    transaction?: TransactionProvider,
  ): Promise<string>;
  abstract getQueryState(
    jobId: string,
    fields: string[],
  ): Promise<StringAnyType>;
  abstract getState(
    jobId: string,
    consumes: Consumes,
    dIds: StringStringType,
  ): Promise<[StringAnyType, number] | undefined>;
  abstract getRaw(jobId: string): Promise<StringStringType>;

  // ---- Collation ----
  abstract collate(
    jobId: string,
    activityId: string,
    amount: number,
    dIds: StringStringType,
    transaction?: TransactionProvider,
  ): Promise<number>;
  abstract collateSynthetic(
    jobId: string,
    guid: string,
    amount: number,
    transaction?: TransactionProvider,
  ): Promise<number>;

  // ---- Schemas, subscriptions, transitions (per app version) ----
  abstract getSchema(activityId: string, appVersion: AppVID): Promise<any>;
  abstract getSchemas(appVersion: AppVID): Promise<Record<string, any>>;
  abstract setSchemas(
    schemas: Record<string, any>,
    appVersion: AppVID,
  ): Promise<any>;
  abstract setSubscriptions(
    subscriptions: Record<string, any>,
    appVersion: AppVID,
  ): Promise<boolean>;
  abstract getSubscriptions(appVersion: AppVID): Promise<Record<string, any>>;
  abstract getSubscription(
    topic: string,
    appVersion: AppVID,
  ): Promise<string | undefined>;
  abstract setTransitions(
    transitions: Record<string, any>,
    appVersion: AppVID,
  ): Promise<any>;
  abstract getTransitions(appVersion: AppVID): Promise<any>;

  // ---- Hooks ----
  abstract setHookRules(hookRules: Record<string, HookRule[]>): Promise<any>;
  abstract getHookRules(): Promise<Record<string, HookRule[]>>;
  abstract setHookSignal(
    hook: HookSignal,
    transaction?: TransactionProvider,
  ): Promise<any>;
  abstract getHookSignal(
    topic: string,
    resolved: string,
  ): Promise<string | undefined>;
  abstract deleteHookSignal(
    topic: string,
    resolved: string,
  ): Promise<number | undefined>;

  // ---- Task queues ----
  abstract addTaskQueues(keys: string[]): Promise<void>;
  abstract getActiveTaskQueue(): Promise<string | null>;
  abstract deleteProcessedTaskQueue(
    workItemKey: string,
    key: string,
    processedKey: string,
    scrub?: boolean,
  ): Promise<void>;
  abstract processTaskQueue(
    sourceKey: string,
    destinationKey: string,
  ): Promise<any>;

  // ---- Job lifecycle ----
  // `transaction` was named `redistransaction` in the original listing; it is
  // renamed to match every other method of this provider-agnostic contract.
  abstract expireJob(
    jobId: string,
    inSeconds: number,
    transaction?: TransactionProvider,
  ): Promise<void>;
  abstract getDependencies(jobId: string): Promise<string[]>;
  abstract delistSignalKey(key: string, target: string): Promise<void>;
  abstract registerTimeHook(
    jobId: string,
    gId: string,
    activityId: string,
    type: WorkListTaskType,
    deletionTime: number,
    dad: string,
    transaction?: TransactionProvider,
  ): Promise<void>;

  /** Resolves to a labeled five-element task tuple, or to a boolean. */
  abstract getNextTask(
    listKey?: string,
  ): Promise<
    | [
        listKey: string,
        jobId: string,
        gId: string,
        activityId: string,
        type: WorkListTaskType,
      ]
    | boolean
  >;
  abstract interrupt(
    topic: string,
    jobId: string,
    options: { [key: string]: any },
  ): Promise<void>;
  abstract scrub(jobId: string): Promise<void>;

  // ---- Search ----
  // Both finders resolve to a two-element tuple whose first element is a string.
  // NOTE(review): presumably the next cursor — confirm.
  abstract findJobs(
    queryString?: string,
    limit?: number,
    batchSize?: number,
    cursor?: string,
  ): Promise<[string, string[]]>;
  abstract findJobFields(
    jobId: string,
    fieldMatchPattern?: string,
    limit?: number,
    batchSize?: number,
    cursor?: string,
  ): Promise<[string, StringStringType]>;

  // ---- Throttling ----
  abstract setThrottleRate(options: ThrottleOptions): Promise<void>;
  abstract getThrottleRates(): Promise<StringStringType>;
  abstract getThrottleRate(topic: string): Promise<number>;
}