BYODB
Bring your own database and take ownership of your data. Choose Postgres, Redis, NATS, ...
Support Matrix
| | Store | Stream | Events | Search |
|---|---|---|---|---|
| TypeScript | Redis, Postgres | Redis, Postgres, NATS | Redis, Postgres, NATS | Redis, Postgres |
| Python | Redis, Postgres | Redis, Postgres | Redis, Postgres | Redis, Postgres |
| Rust | Redis, Postgres | Redis, Postgres | Redis, Postgres | Redis, Postgres |
| Go | Redis, Postgres | Redis, Postgres | Redis, Postgres | Redis, Postgres |
| Java | Redis, Postgres | Redis, Postgres | Redis, Postgres | Redis, Postgres |
Any database provider can serve as the backend as long as it adheres to the HotMesh interface contracts (described below).
Getting Started with TypeScript
HotMesh can be deployed as a drop-in Temporal replacement with minimal modification. Just change the connection script to point to your database instead of the Temporal App Server.
Both the concise and the explicit connection formats are shown below.
Client Initialization
import { Client } from '@hotmeshio/hotmesh';
import Redis from 'ioredis';
import { Client as Postgres } from 'pg';
import { connect as NATS } from 'nats';
//...other imports

// NOTE: the options below are alternatives. Pick ONE of them; they are not
// meant to be pasted into the same scope (each declares `client`).

// Option 1a) Concise (connect to a single backend...redis)
const client = new Client({
  connection: {
    class: Redis,
    options: { host: 'redis', port: 6379 },
  },
});

// Option 1b) Concise (connect to a single backend...postgres)
const client = new Client({
  connection: {
    class: Postgres,
    options: { host: 'postgres', port: 5432 },
  },
});

// Option 2) Explicit (connect with different, purpose-driven backends)
// Each purpose (`store`, `stream`, `sub`) names its own provider class and
// connection options; here Postgres backs `store` and `stream`, NATS backs `sub`.
const client = new Client({
  connections: {
    store: {
      class: Postgres,
      options: { host: 'postgres', port: 5432 },
    },
    stream: {
      class: Postgres,
      options: { host: 'postgres', port: 5432 },
    },
    sub: {
      class: NATS,
      options: { servers: ['nats:4222'] },
    },
  },
});
Worker Initialization
import { Worker } from '@hotmeshio/hotmesh';
import Redis from 'ioredis';
import { Client as Postgres } from 'pg';
import { connect as NATS } from 'nats';
import * as workflows from './workflows';

// NOTE: the options below are alternatives. Pick ONE of them; they are not
// meant to be pasted into the same scope (each declares `worker`).

// Option 1) Concise
const worker = await Worker.create({
  connection: {
    class: Redis,
    options: { host: 'redis', port: 6379 },
  },
  taskQueue: 'default',
  workflow: workflows.example,
  options: {
    backoffCoefficient: 2,
    maximumAttempts: 1_000,
    maximumInterval: '5 seconds',
  },
});

// Option 2) Explicit (allows for multiple connections)
// Same worker settings as Option 1; only the connection section differs.
const worker = await Worker.create({
  connections: {
    store: {
      class: Postgres,
      options: { host: 'postgres', port: 5432 },
    },
    stream: {
      class: Postgres,
      options: { host: 'postgres', port: 5432 },
    },
    sub: {
      class: NATS,
      options: { servers: ['nats:4222'] },
    },
  },
  taskQueue: 'default',
  workflow: workflows.example,
  options: {
    backoffCoefficient: 2,
    maximumAttempts: 1_000,
    maximumInterval: '5 seconds',
  },
});
Interfaces
The pluggable backend is organized by transport type.
// streams.ts (Redis, Postgres, etc)
import { ILogger } from '../logger';
import {
PublishMessageConfig,
StreamConfig,
StreamMessage,
StreamStats,
} from '../../types/stream';
import { StringAnyType } from '../../types';
import { KeyStoreParams, KeyType } from '../../modules/key';
import { ProviderClient, ProviderTransaction } from '../../types/provider';
/**
 * Abstract contract that a stream backend must implement.
 *
 * The base class only stores the two clients and the configuration handed to
 * the constructor; every operation is declared abstract and left to the
 * concrete provider.
 *
 * @typeParam ClientProvider - provider client type used for streaming.
 * @typeParam TransactionProvider - provider transaction type that some
 *   operations may resolve to instead of a plain result.
 */
export abstract class StreamService<
  ClientProvider extends ProviderClient,
  TransactionProvider extends ProviderTransaction,
> {
  // Declared here, never assigned by this base class.
  protected namespace: string;
  protected logger: ILogger;
  protected appId: string;

  /**
   * @param streamClient - client kept as `this.streamClient`.
   * @param storeClient - client kept as `this.storeClient`.
   * @param config - stream configuration; defaults to an empty object.
   */
  constructor(
    protected streamClient: ClientProvider,
    protected storeClient: ProviderClient,
    protected config: StreamConfig = {},
  ) {}

  abstract init(namespace: string, appId: string, logger: ILogger): Promise<void>;

  abstract mintKey(type: KeyType, params: KeyStoreParams): string;

  /* ---------- Core streaming operations ---------- */

  abstract createStream(streamName: string): Promise<boolean>;

  abstract deleteStream(streamName: string): Promise<boolean>;

  /* ---------- Consumer group operations ---------- */

  abstract createConsumerGroup(streamName: string, groupName: string): Promise<boolean>;

  abstract deleteConsumerGroup(streamName: string, groupName: string): Promise<boolean>;

  /* ---------- Message operations ---------- */

  /** Resolves to either a list of strings or a provider transaction. */
  abstract publishMessages(
    streamName: string,
    messages: string[],
    options?: PublishMessageConfig,
  ): Promise<string[] | ProviderTransaction>;

  abstract consumeMessages(
    streamName: string,
    groupName: string,
    consumerName: string,
    options?: { batchSize?: number; blockTimeout?: number; autoAck?: boolean },
  ): Promise<StreamMessage[]>;

  abstract transact(): ProviderTransaction;

  /* ---------- Message acknowledgment and deletion ---------- */

  abstract ackAndDelete(
    streamName: string,
    groupName: string,
    messageIds: string[],
    options?: StringAnyType,
  ): Promise<number>;

  /** Resolves to either a number or the provider's transaction object. */
  abstract acknowledgeMessages(
    streamName: string,
    groupName: string,
    messageIds: string[],
    options?: StringAnyType,
  ): Promise<number | TransactionProvider>;

  /** Resolves to either a number or the provider's transaction object. */
  abstract deleteMessages(
    streamName: string,
    groupName: string,
    messageIds: string[],
    options?: StringAnyType,
  ): Promise<number | TransactionProvider>;

  /* ---------- Generic retry method ---------- */

  abstract retryMessages(
    streamName: string,
    groupName: string,
    options?: {
      consumerName?: string;
      minIdleTime?: number;
      messageIds?: string[];
      delay?: number;
      maxRetries?: number;
      limit?: number;
    },
  ): Promise<StreamMessage[]>;

  /* ---------- Monitoring and maintenance ---------- */

  abstract getStreamStats(streamName: string): Promise<StreamStats>;

  abstract getStreamDepth(streamName: string): Promise<number>;

  /** Batch form: takes a list of `{ stream }` entries, resolves to `{ stream, depth }` entries. */
  abstract getStreamDepths(
    streamName: { stream: string }[],
  ): Promise<{ stream: string; depth: number }[]>;

  abstract trimStream(
    streamName: string,
    options: { maxLen?: number; maxAge?: number; exactLimit?: boolean },
  ): Promise<number>;

  /* ---------- Provider-specific helpers ---------- */

  /** Synchronously reports the provider's capability flags and size limits. */
  abstract getProviderSpecificFeatures(): {
    supportsBatching: boolean;
    supportsDeadLetterQueue: boolean;
    supportsOrdering: boolean;
    supportsTrimming: boolean;
    supportsRetry: boolean;
    maxMessageSize: number;
    maxBatchSize: number;
  };
}
// pubsub.ts (IO/Redis, Nats, Postgres, etc)
import { KeyStoreParams, KeyType } from '../../modules/key';
import { ILogger } from '../logger';
import { SubscriptionCallback } from '../../types/quorum';
/**
 * Abstract contract that a pub/sub backend must implement.
 *
 * The base class only stores the two clients handed to the constructor;
 * every operation is declared abstract and left to the concrete provider.
 *
 * @typeParam Client - provider client type (used for both clients).
 * @typeParam TransactionProvider - type returned by `transaction()`.
 */
abstract class SubService<Client, TransactionProvider> {
  protected eventClient: Client;
  protected storeClient: Client;
  // Declared here, never assigned by this base class.
  protected namespace: string;
  protected logger: ILogger;
  protected appId: string;

  /**
   * @param eventClient - client kept as `this.eventClient`.
   * @param storeClient - client kept as `this.storeClient`.
   */
  constructor(eventClient: Client, storeClient: Client) {
    this.eventClient = eventClient;
    this.storeClient = storeClient;
  }

  abstract init(
    namespace: string,
    appId: string,
    engineId: string,
    logger: ILogger,
  ): Promise<void>;

  abstract transaction(): TransactionProvider;

  abstract mintKey(type: KeyType, params: KeyStoreParams): string;

  // The four subscription methods only accept the QUORUM key type.
  abstract subscribe(
    keyType: KeyType.QUORUM,
    callback: SubscriptionCallback,
    appId: string,
    topic?: string,
  ): Promise<void>;

  abstract unsubscribe(
    keyType: KeyType.QUORUM,
    appId: string,
    topic?: string,
  ): Promise<void>;

  abstract psubscribe(
    keyType: KeyType.QUORUM,
    callback: SubscriptionCallback,
    appId: string,
    topic?: string,
  ): Promise<void>;

  abstract punsubscribe(
    keyType: KeyType.QUORUM,
    appId: string,
    topic?: string,
  ): Promise<void>;

  /**
   * Unlike the subscription methods, `publish` accepts any `KeyType`.
   *
   * @param message - string-keyed payload; the value type is left open
   *   because the message schema is not visible from this class.
   */
  abstract publish(
    keyType: KeyType,
    message: Record<string, any>,
    appId: string,
    topic?: string,
  ): Promise<boolean>;
}
// store.ts (Redis, Postgres, etc)
import { ILogger } from '../logger';
import { SerializerService as Serializer } from '../serializer';
import { HotMeshSettings } from '../../types/hotmesh';
import { Cache } from './cache';
import { KeyStoreParams, KeyType } from '../../modules/key';
import { Consumes } from '../../types/activity';
import {
StringAnyType,
Symbols,
StringStringType,
SymbolSets,
} from '../../types/serializer';
import { IdsData, JobStatsRange, StatsType } from '../../types/stats';
import { AppVID } from '../../types/app';
import { HookRule, HookSignal } from '../../types/hook';
import { ThrottleOptions } from '../../types/quorum';
import { WorkListTaskType } from '../../types/task';
/**
 * Abstract contract that a store backend must implement.
 *
 * The base class only stores the client handed to the constructor; every
 * operation is declared abstract and left to the concrete provider. Methods
 * that take an optional trailing `transaction` accept the same type that
 * `transaction()` returns.
 *
 * @typeParam Client - provider client type.
 * @typeParam TransactionProvider - type returned by `transaction()`.
 */
abstract class StoreService<Client, TransactionProvider> {
  storeClient: Client;
  // Declared here, never assigned by this base class.
  namespace: string;
  appId: string;
  logger: ILogger;
  cache: Cache;
  serializer: Serializer;

  /**
   * @param client - client kept as `this.storeClient`.
   */
  constructor(client: Client) {
    this.storeClient = client;
  }

  abstract transaction(): TransactionProvider;

  //domain-level methods
  abstract mintKey(type: KeyType, params: KeyStoreParams): string;

  // --- settings / app versions ---
  abstract getSettings(bCreate?: boolean): Promise<HotMeshSettings>;
  abstract setSettings(manifest: HotMeshSettings): Promise<any>;
  abstract getApp(id: string, refresh?: boolean): Promise<any>;
  abstract setApp(id: string, version: string): Promise<any>;
  abstract activateAppVersion(id: string, version: string): Promise<boolean>;

  // --- scout roles ---
  abstract reserveScoutRole(
    scoutType: 'time' | 'signal' | 'activate',
    delay?: number,
  ): Promise<boolean>;
  abstract releaseScoutRole(
    scoutType: 'time' | 'signal' | 'activate',
  ): Promise<boolean>;

  // --- symbols ---
  abstract reserveSymbolRange(
    target: string,
    size: number,
    type: 'JOB' | 'ACTIVITY',
    tryCount?: number,
  ): Promise<[number, number, Symbols]>;
  abstract getSymbols(activityId: string): Promise<Symbols>;
  abstract addSymbols(activityId: string, symbols: Symbols): Promise<boolean>;
  abstract getSymbolValues(): Promise<Symbols>;
  abstract addSymbolValues(symvals: Symbols): Promise<boolean>;
  abstract getSymbolKeys(symbolNames: string[]): Promise<SymbolSets>;
  abstract getAllSymbols(): Promise<Symbols>;

  // --- stats ---
  abstract setStats(
    jobKey: string,
    jobId: string,
    dateTime: string,
    stats: StatsType,
    appVersion: AppVID,
    transaction?: TransactionProvider,
  ): Promise<any>;
  abstract getJobStats(jobKeys: string[]): Promise<JobStatsRange>;
  abstract getJobIds(
    indexKeys: string[],
    idRange: [number, number],
  ): Promise<IdsData>;

  // --- job status / state ---
  abstract setStatus(
    collationKeyStatus: number,
    jobId: string,
    appId: string,
    transaction?: TransactionProvider,
  ): Promise<any>;
  abstract getStatus(jobId: string, appId: string): Promise<number>;
  abstract setStateNX(jobId: string, appId: string, status?: number): Promise<boolean>;
  abstract setState(
    state: StringAnyType,
    status: number | null,
    jobId: string,
    symbolNames: string[],
    dIds: StringStringType,
    transaction?: TransactionProvider,
  ): Promise<string>;
  abstract getQueryState(jobId: string, fields: string[]): Promise<StringAnyType>;
  /** Resolves to a `[state, number]` pair, or `undefined`. */
  abstract getState(
    jobId: string,
    consumes: Consumes,
    dIds: StringStringType,
  ): Promise<[StringAnyType, number] | undefined>;
  abstract getRaw(jobId: string): Promise<StringStringType>;

  // --- collation ---
  abstract collate(
    jobId: string,
    activityId: string,
    amount: number,
    dIds: StringStringType,
    transaction?: TransactionProvider,
  ): Promise<number>;
  abstract collateSynthetic(
    jobId: string,
    guid: string,
    amount: number,
    transaction?: TransactionProvider,
  ): Promise<number>;

  // --- schemas / subscriptions / transitions ---
  // NOTE(review): the value types of the string-keyed maps below are left
  // open (`any`) where the concrete type is not imported by this file;
  // tighten them if a dedicated type exists.
  abstract getSchema(
    activityId: string,
    appVersion: AppVID,
  ): Promise<any>;
  abstract getSchemas(appVersion: AppVID): Promise<Record<string, any>>;
  abstract setSchemas(
    schemas: Record<string, any>,
    appVersion: AppVID,
  ): Promise<any>;
  abstract setSubscriptions(
    subscriptions: Record<string, any>,
    appVersion: AppVID,
  ): Promise<boolean>;
  // String values mirror `getSubscription`, which resolves to a single string.
  abstract getSubscriptions(appVersion: AppVID): Promise<Record<string, string>>;
  abstract getSubscription(
    topic: string,
    appVersion: AppVID,
  ): Promise<string | undefined>;
  abstract setTransitions(
    transitions: Record<string, any>,
    appVersion: AppVID,
  ): Promise<any>;
  abstract getTransitions(appVersion: AppVID): Promise<any>;

  // --- hooks ---
  // NOTE(review): `HookRule[]` as the map value is inferred from the
  // otherwise-unused `HookRule` import — confirm against implementations.
  abstract setHookRules(hookRules: Record<string, HookRule[]>): Promise<any>;
  abstract getHookRules(): Promise<Record<string, HookRule[]>>;
  abstract setHookSignal(hook: HookSignal, transaction?: TransactionProvider): Promise<any>;
  abstract getHookSignal(
    topic: string,
    resolved: string,
  ): Promise<string | undefined>;
  abstract deleteHookSignal(
    topic: string,
    resolved: string,
  ): Promise<number | undefined>;

  // --- task queues ---
  abstract addTaskQueues(keys: string[]): Promise<void>;
  abstract getActiveTaskQueue(): Promise<string | null>;
  abstract deleteProcessedTaskQueue(
    workItemKey: string,
    key: string,
    processedKey: string,
    scrub?: boolean,
  ): Promise<void>;
  abstract processTaskQueue(
    sourceKey: string,
    destinationKey: string,
  ): Promise<any>;

  // --- job lifecycle ---
  abstract expireJob(
    jobId: string,
    inSeconds: number,
    transaction?: TransactionProvider,
  ): Promise<void>;
  abstract getDependencies(jobId: string): Promise<string[]>;
  abstract delistSignalKey(key: string, target: string): Promise<void>;
  abstract registerTimeHook(
    jobId: string,
    gId: string,
    activityId: string,
    type: WorkListTaskType,
    deletionTime: number,
    dad: string,
    transaction?: TransactionProvider,
  ): Promise<void>;
  /** Resolves to a labeled five-element task tuple, or to a boolean. */
  abstract getNextTask(
    listKey?: string,
  ): Promise<
    | [
        listKey: string,
        jobId: string,
        gId: string,
        activityId: string,
        type: WorkListTaskType,
      ]
    | boolean
  >;
  abstract interrupt(
    topic: string,
    jobId: string,
    options: { [key: string]: any },
  ): Promise<void>;
  abstract scrub(jobId: string): Promise<void>;

  // --- lookups (both resolve to a `[string, results]` pair) ---
  abstract findJobs(
    queryString?: string,
    limit?: number,
    batchSize?: number,
    cursor?: string,
  ): Promise<[string, string[]]>;
  abstract findJobFields(
    jobId: string,
    fieldMatchPattern?: string,
    limit?: number,
    batchSize?: number,
    cursor?: string,
  ): Promise<[string, StringStringType]>;

  // --- throttling ---
  abstract setThrottleRate(options: ThrottleOptions): Promise<void>;
  abstract getThrottleRates(): Promise<StringStringType>;
  abstract getThrottleRate(topic: string): Promise<number>;
}
// search.ts (Redis, Postgres, etc)
import { ILogger } from '../logger';
/**
 * Abstract contract that a search backend must implement.
 *
 * The base class only stores the clients handed to the constructor; every
 * operation is declared abstract and left to the concrete provider.
 *
 * @typeParam Client - provider client type (used for both clients).
 */
abstract class SearchService<Client> {
  protected searchClient: Client;
  protected storeClient: Client;
  // Declared here, never assigned by this base class.
  protected namespace: string;
  protected logger: ILogger;
  protected appId: string;

  /**
   * @param searchClient - client kept as `this.searchClient`.
   * @param storeClient - optional; when omitted, `this.storeClient` is
   *   left `undefined`.
   */
  constructor(searchClient: Client, storeClient?: Client) {
    this.searchClient = searchClient;
    this.storeClient = storeClient;
  }

  abstract init(namespace: string, appId: string, logger: ILogger): Promise<void>;

  // Index management
  abstract createSearchIndex(
    indexName: string,
    prefixes: string[],
    schema: string[],
  ): Promise<void>;
  abstract listSearchIndexes(): Promise<string[]>;

  // Field operations
  // Field maps use string values, matching `getField`/`getFields`, which
  // resolve to strings.
  abstract setFields(key: string, fields: Record<string, string>): Promise<number>;
  abstract getField(key: string, field: string): Promise<string>;
  abstract getFields(key: string, fields: string[]): Promise<string[]>;
  abstract getAllFields(key: string): Promise<Record<string, string>>;
  abstract deleteFields(key: string, fields: string[]): Promise<number>;
  abstract incrementFieldByFloat(
    key: string,
    field: string,
    increment: number,
  ): Promise<number>;

  // Query operations
  abstract sendQuery(query: any): Promise<any>;
  abstract sendIndexedQuery(index: string, query: any[]): Promise<any>;
}