RabbitMQ
Overview
The @opra/rabbitmq package provides RabbitmqAdapter, a platform adapter that runs an OPRA ApiDocument as a set of RabbitMQ consumers. The adapter connects over AMQP, subscribes to the queues derived from your operation channel declarations, decodes incoming messages against your schema (handling gzip, deflate, brotli, and base64 content encodings), dispatches them to your operation handlers, and manages the connection lifecycle. All of this is driven by the type declarations in your schema.
Installation
npm install @opra/rabbitmq
Setup
Create your ApiDocument with transport: 'mq' and platform: 'rabbitmq', pass it to RabbitmqAdapter, then call start().
import { RabbitmqAdapter } from '@opra/rabbitmq';
import { ApiDocumentFactory } from '@opra/common';
import { OrdersController } from './api/orders.controller.js';

const document = await ApiDocumentFactory.createDocument({
  info: { title: 'Order API', version: '1.0' },
  api: {
    transport: 'mq',
    platform: 'rabbitmq',
    controllers: [OrdersController],
  },
});

const adapter = new RabbitmqAdapter(document, {
  connection: 'amqp://guest:guest@localhost:5672',
});
await adapter.start();

// Close consumers and the connection before exiting.
process.on('SIGTERM', async () => {
  await adapter.close();
  process.exit(0);
});
RabbitmqAdapter establishes the AMQP connection, creates a consumer for each declared operation channel, and starts dispatching messages. No manual queue wiring is needed.
Adapter Options (RabbitmqAdapter.Config)
| Option | Type | Description |
|---|---|---|
| connection | string \| string[] \| ConnectionOptions | AMQP connection URL(s) or a ConnectionOptions object. |
| defaults | { consumer?: ConsumerConfig } | Default consumer options applied to all operations unless overridden. |
| scope | string | Validation scope applied during message decoding. |
| interceptors | (InterceptorFunction \| IRabbitmqInterceptor)[] | Interceptor chain executed on every message. |
| logExtra | boolean | Log additional diagnostic output from the AMQP client. |
ConnectionOptions
| Option | Type | Description |
|---|---|---|
| urls | string[] | List of AMQP broker URLs. |
| username | string | AMQP username. |
| password | string | AMQP password. |
| heartbeat | number | Heartbeat interval in seconds. |
| connectionTimeout | number | Connection timeout in milliseconds. |
| retryLow | number | Minimum retry delay in milliseconds. |
| retryHigh | number | Maximum retry delay in milliseconds. |
| tls | object | TLS options. |
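The units in this table are easy to mix up (heartbeat is in seconds, the other timings in milliseconds). The following is a minimal sketch of an object-form connection config plus a pre-flight check. `ConnectionOptionsLike` and `checkConnectionOptions` are hypothetical names introduced for illustration; they mirror the table above and are not part of @opra/rabbitmq.

```typescript
// Local mirror of the ConnectionOptions table (an assumption for this sketch;
// the real type comes from the package or its underlying AMQP client).
interface ConnectionOptionsLike {
  urls?: string[];
  username?: string;
  password?: string;
  heartbeat?: number; // seconds
  connectionTimeout?: number; // milliseconds
  retryLow?: number; // milliseconds
  retryHigh?: number; // milliseconds
}

// Hypothetical pre-flight check: returns a list of human-readable problems.
function checkConnectionOptions(opts: ConnectionOptionsLike): string[] {
  const problems: string[] = [];
  if (
    opts.retryLow !== undefined &&
    opts.retryHigh !== undefined &&
    opts.retryLow > opts.retryHigh
  ) {
    problems.push('retryLow exceeds retryHigh');
  }
  if (opts.heartbeat !== undefined && opts.heartbeat >= 1000) {
    problems.push('heartbeat looks like milliseconds; it is expressed in seconds');
  }
  return problems;
}

// Example values only; the broker host names are placeholders.
const connection: ConnectionOptionsLike = {
  urls: ['amqp://broker-1:5672', 'amqp://broker-2:5672'],
  username: 'guest',
  password: 'guest',
  heartbeat: 30,
  connectionTimeout: 10000,
  retryLow: 1000,
  retryHigh: 30000,
};
```

An object shaped like `connection` would be passed as the adapter's connection option in place of a URL string.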
Defining Operations
Use @MQOperation() on a controller method to declare an operation. The channel property maps to a RabbitMQ queue name. Import @opra/rabbitmq to unlock the .RabbitMQ() chained decorator for per-operation consumer configuration.
import { MQController, MQOperation, ApiField, ComplexType } from '@opra/common';
import '@opra/rabbitmq'; // augments MQOperationDecorator with .RabbitMQ()
import type { RabbitmqContext } from '@opra/rabbitmq';

@ComplexType()
class OrderPayload {
  @ApiField() declare orderId: string;
  @ApiField() declare amount: number;
}

@MQController()
export class OrdersController {
  @MQOperation({ channel: 'orders.created', type: OrderPayload })
  async onOrderCreated(ctx: RabbitmqContext) {
    const { content, headers, queue } = ctx;
    console.log('Received order:', content.orderId);
  }
}
Per-operation consumer config (.RabbitMQ())
Chain .RabbitMQ() on @MQOperation() to override consumer options for a specific operation.
@(MQOperation({ channel: 'payments.processed', type: PaymentPayload })
  .RabbitMQ({
    concurrency: 5,
    requeue: false,
    qos: { prefetchCount: 10 },
    queueOptions: { durable: true },
  }))
async onPayment(ctx: RabbitmqContext) { ... }
You can also pass a resolver function for dynamic configuration:
@(MQOperation({ channel: 'orders.created', type: OrderPayload })
  .RabbitMQ(async () => {
    const cfg = await loadConfig();
    return { concurrency: cfg.concurrency };
  }))
async onOrderCreated(ctx: RabbitmqContext) { ... }
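Because .RabbitMQ() accepts either a static object or a resolver function, whatever consumes the option has to treat both forms uniformly. The sketch below shows one way to do that. `ConsumerConfigLike`, `ConfigOrResolver`, and `resolveConsumerConfig` are hypothetical names, and allowing a synchronous resolver is an assumption; the adapter's own resolution logic is not shown in this document.

```typescript
// Local stand-in for the two ConsumerConfig fields used in this sketch.
interface ConsumerConfigLike {
  concurrency?: number;
  requeue?: boolean;
}

// Either a ready-made config or a function that produces one (sync or async).
type ConfigOrResolver =
  | ConsumerConfigLike
  | (() => ConsumerConfigLike | Promise<ConsumerConfigLike>);

// Normalize both forms to a Promise of the final config object.
async function resolveConsumerConfig(
  input: ConfigOrResolver,
): Promise<ConsumerConfigLike> {
  return typeof input === 'function' ? await input() : input;
}
```

A resolver is useful when values such as concurrency come from a remote config store or environment that is only available at startup.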
ConsumerConfig options
| Option | Type | Description |
|---|---|---|
| concurrency | number | Number of messages processed in parallel. |
| requeue | boolean | Whether to requeue messages on failure. |
| qos | { prefetchCount?: number } | Quality-of-service settings. |
| queueOptions | object | AMQP queue declaration options (e.g. durable, arguments). |
| exchanges | object[] | Exchange bindings to declare. |
| exchangeBindings | object[] | Additional exchange-to-queue bindings. |
| exclusive | boolean | Declare the queue as exclusive to this connection. |
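Adapter-level defaults.consumer values apply unless an operation overrides them. The sketch below illustrates that precedence with a shallow merge. The shallow merge is an assumption (the document does not say whether nested objects such as qos are merged deeply), and `ConsumerConfigLike` and `effectiveConsumerConfig` are hypothetical names.

```typescript
// Local mirror of a few ConsumerConfig fields from the table above.
interface ConsumerConfigLike {
  concurrency?: number;
  requeue?: boolean;
  qos?: { prefetchCount?: number };
  exclusive?: boolean;
}

// Shallow merge: per-operation values win, unset values fall back to defaults.
function effectiveConsumerConfig(
  defaults: ConsumerConfigLike | undefined,
  perOperation: ConsumerConfigLike | undefined,
): ConsumerConfigLike {
  return { ...defaults, ...perOperation };
}

const merged = effectiveConsumerConfig(
  { concurrency: 1, requeue: true },
  { concurrency: 5, qos: { prefetchCount: 10 } },
);
// merged: { concurrency: 5, requeue: true, qos: { prefetchCount: 10 } }
```

Under this model, a setting that should hold for every queue belongs in defaults.consumer, and .RabbitMQ() is reserved for the exceptions.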
RabbitmqContext
Every operation handler receives a RabbitmqContext as its first argument.
import { RabbitmqContext } from '@opra/rabbitmq';

async onOrderCreated(ctx: RabbitmqContext) {
  ctx.queue;    // queue name the message arrived on
  ctx.content;  // decoded message payload
  ctx.headers;  // decoded message headers as a plain object
  ctx.message;  // raw AsyncMessage from the AMQP client
  ctx.consumer; // the Consumer instance
  await ctx.reply({ status: 'ok' }); // send a reply (RPC pattern)
}
| Property | Type | Description |
|---|---|---|
| queue | string | The queue the message arrived on. |
| content | any | Decoded message payload (validated against the operation's type). Content-encoding (gzip, deflate, brotli, base64) and content-type (JSON, text) are handled automatically. |
| headers | Record<string, any> | Decoded message headers. |
| message | rabbit.AsyncMessage | The raw AMQP message object. |
| consumer | rabbit.Consumer | The consumer that received the message. |
| reply | ReplyFunction | Sends a reply message, used for RPC-style request/reply flows. |
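To make the automatic content handling concrete, here is a rough sketch of the two steps involved: undoing the content encoding, then parsing by content type. It uses only Node's zlib module and is not the adapter's actual decoder. The encoding tokens ('gzip', 'deflate', 'br', 'base64') and the 'application/json' check are assumptions about what appears in the message properties.

```typescript
import { brotliDecompressSync, gunzipSync, gzipSync, inflateSync } from 'node:zlib';

// Undo the transport encoding named by the message's content-encoding.
function decodeBody(body: Buffer, contentEncoding?: string): Buffer {
  switch (contentEncoding) {
    case 'gzip':
      return gunzipSync(body);
    case 'deflate':
      return inflateSync(body);
    case 'br': // assumed token for brotli
      return brotliDecompressSync(body);
    case 'base64':
      return Buffer.from(body.toString('utf8'), 'base64');
    default:
      return body; // no encoding, pass through unchanged
  }
}

// Parse the decoded bytes according to the message's content-type.
function parseBody(body: Buffer, contentType?: string): unknown {
  const text = body.toString('utf8');
  return contentType === 'application/json' ? JSON.parse(text) : text;
}

// Round trip: a gzip-compressed JSON body, as a publisher might send it.
const wire = gzipSync(JSON.stringify({ orderId: 'A-1', amount: 10 }));
const payload = parseBody(decodeBody(wire, 'gzip'), 'application/json');
```

In the adapter, the result of this stage is additionally validated against the operation's type before it is exposed as ctx.content.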
Request / Reply (RPC pattern)
If an operation returns a value, the adapter automatically calls reply() with the return value. You can also call ctx.reply() directly for more control.
@MQOperation({ channel: 'orders.get', type: OrderPayload })
async getOrder(ctx: RabbitmqContext) {
  const order = await this.service.findById(ctx.content.orderId);
  return order; // automatically sent as reply
}
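The auto-reply rule can be pictured as a thin wrapper around the handler. The sketch below is a simplified model, not the adapter's source: `ContextLike`, `Handler`, and `invokeWithAutoReply` are hypothetical names, and treating an undefined return value as "no reply" is an assumption drawn from the wording "if an operation returns a value".

```typescript
// Minimal stand-in for the parts of RabbitmqContext this sketch touches.
interface ContextLike {
  content: unknown;
  reply(body: unknown): Promise<void>;
}

type Handler = (ctx: ContextLike) => unknown;

// Invoke the handler and forward a defined return value as the reply.
async function invokeWithAutoReply(
  handler: Handler,
  ctx: ContextLike,
): Promise<void> {
  const result = await handler(ctx);
  if (result !== undefined) {
    await ctx.reply(result);
  }
}
```

Handlers that only consume messages (fire-and-forget) simply return nothing, so no reply is attempted in this model.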
Lifecycle
| Method | Description |
|---|---|
| adapter.start() | Establishes the AMQP connection and subscribes to all queues. |
| adapter.close() | Closes all consumers and the connection, resets to idle. |
| adapter.status | Current status: 'idle' \| 'starting' \| 'started'. |
| adapter.connection | The active rabbit.Connection instance, or undefined if not started. |
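The status and connection members are enough to build a simple readiness probe, for example behind a container health endpoint. This is a minimal sketch: `AdapterLike` and `isReady` are hypothetical names that model only the two members listed in the table.

```typescript
// The three status values listed in the Lifecycle table.
type AdapterStatus = 'idle' | 'starting' | 'started';

// Only the two adapter members the check needs.
interface AdapterLike {
  status: AdapterStatus;
  connection?: unknown;
}

// Ready means: start() has completed and a connection object exists.
function isReady(adapter: AdapterLike): boolean {
  return adapter.status === 'started' && adapter.connection !== undefined;
}
```

Note that this only reflects the adapter's own view of its state; it does not probe the broker.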
Interceptors
Interceptors run before the operation handler on every incoming message.
import { RabbitmqAdapter, RabbitmqContext } from '@opra/rabbitmq';

const adapter = new RabbitmqAdapter(document, {
  connection: 'amqp://localhost',
  interceptors: [
    async (ctx: RabbitmqContext, next) => {
      console.log('Message on queue:', ctx.queue);
      await next();
    },
  ],
});
Class form:
import { IRabbitmqInterceptor, RabbitmqAdapter, RabbitmqContext } from '@opra/rabbitmq';

class AuthInterceptor implements IRabbitmqInterceptor {
  async intercept(ctx: RabbitmqContext, next: () => Promise<any>) {
    // Reject the message before it reaches the handler.
    if (!ctx.headers['x-api-key']) throw new Error('Unauthorized');
    await next();
  }
}

const adapter = new RabbitmqAdapter(document, {
  connection: 'amqp://localhost',
  interceptors: [new AuthInterceptor()],
});
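Both interceptor forms take part in the same chain: each one runs, and calling next() hands control to the following interceptor and finally to the handler. The sketch below models that onion-style composition. It is an illustration of the pattern under the assumption that interceptors run in array order, not the adapter's implementation; `runChain` and the local types are hypothetical names.

```typescript
type Next = () => Promise<unknown>;
type InterceptorFn<C> = (ctx: C, next: Next) => Promise<unknown>;
interface InterceptorObj<C> {
  intercept(ctx: C, next: Next): Promise<unknown>;
}
type Interceptor<C> = InterceptorFn<C> | InterceptorObj<C>;

// Run interceptors in array order; the last next() call invokes the handler.
function runChain<C>(
  interceptors: Interceptor<C>[],
  ctx: C,
  handler: (ctx: C) => Promise<unknown>,
): Promise<unknown> {
  const step = (index: number): Promise<unknown> => {
    if (index === interceptors.length) return handler(ctx);
    const current = interceptors[index];
    const next: Next = () => step(index + 1);
    // Function form is called directly; class form goes through intercept().
    return typeof current === 'function'
      ? current(ctx, next)
      : current.intercept(ctx, next);
  };
  return step(0);
}
```

In this model, an interceptor that throws or never calls next() prevents the handler from running, which is what the AuthInterceptor example relies on.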
Error Handling
Throw any OpraException from a handler — the adapter catches it, emits an error event, and logs it without crashing the consumer.
import { OpraException } from '@opra/common';
import { RabbitmqContext } from '@opra/rabbitmq';

async onOrderCreated(ctx: RabbitmqContext) {
  if (!ctx.content.orderId) {
    throw new OpraException('Missing orderId');
  }
}
Listen to the adapter's error event to handle errors centrally:
adapter.on('error', (error, ctx) => {
  console.error('RabbitMQ error:', error.message, ctx?.queue);
});
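The "catch, emit, keep consuming" behaviour can be sketched with a plain Node.js EventEmitter standing in for the adapter. This is a simplified model rather than the adapter's code; `runIsolated` is a hypothetical helper, and whether the adapter itself extends EventEmitter is an assumption.

```typescript
import { EventEmitter } from 'node:events';

// Run a handler; on failure, report through the 'error' event instead of rethrowing.
async function runIsolated<C>(
  events: EventEmitter,
  ctx: C,
  handler: (ctx: C) => Promise<unknown>,
): Promise<void> {
  try {
    await handler(ctx);
  } catch (err) {
    // Normalize non-Error throwables so listeners always receive an Error.
    const error = err instanceof Error ? err : new Error(String(err));
    events.emit('error', error, ctx);
  }
}
```

One caveat of this model: a plain Node.js EventEmitter throws when 'error' is emitted with no listener attached, so registering an error listener before processing starts is a sensible precaution.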
Events
| Event | Payload | Description |
|---|---|---|
| message | rabbit.AsyncMessage, queue: string | Emitted when a raw message is received, before decoding. |
| execute | RabbitmqContext | Emitted just before the operation handler is called. |
| finish | RabbitmqContext, result | Emitted after the operation handler completes successfully. |
| error | Error, RabbitmqContext \| undefined | Emitted when an error occurs in a handler or interceptor. |
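These events lend themselves to lightweight metrics. The sketch below keeps running counters from the message, finish, and error events. It is written against a generic Node.js EventEmitter so it can run on its own; passing the adapter as the source assumes it exposes the same on() interface shown in the Error Handling section. `Counters` and `attachCounters` are hypothetical names.

```typescript
import { EventEmitter } from 'node:events';

interface Counters {
  received: number;
  finished: number;
  failed: number;
}

// Subscribe to the documented event names and keep running totals.
function attachCounters(source: EventEmitter): Counters {
  const counters: Counters = { received: 0, finished: 0, failed: 0 };
  source.on('message', () => { counters.received += 1; });
  source.on('finish', () => { counters.finished += 1; });
  source.on('error', () => { counters.failed += 1; });
  return counters;
}
```

Comparing received against finished plus failed gives a rough view of in-flight messages.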