Kafka
Overview
The @opra/kafka package provides KafkaAdapter — a platform adapter that integrates an OPRA ApiDocument into a Kafka consumer setup. The adapter subscribes to topics derived from your operation channel declarations, decodes incoming messages against your schema, dispatches to your operation handlers, and manages consumer lifecycles — all driven by the type declarations you made in your schema.
Installation
npm install @opra/kafka
Setup
Create an ApiDocument whose api section sets transport: 'mq' and platform: 'kafka', pass it to KafkaAdapter, then call start().
import { KafkaAdapter } from '@opra/kafka';
import { ApiDocumentFactory } from '@opra/common';
import { OrdersController } from './api/orders.controller.js';

// Build the API document that describes the message-queue operations.
const document = await ApiDocumentFactory.createDocument({
  info: { title: 'Order API', version: '1.0' },
  api: {
    transport: 'mq',
    platform: 'kafka',
    controllers: [OrdersController],
  },
});

const adapter = new KafkaAdapter(document, {
  client: {
    bootstrapBrokers: ['localhost:9092'],
  },
});

await adapter.start();

// Close the consumers before the process exits.
process.on('SIGTERM', async () => {
  await adapter.close();
  process.exit(0);
});
KafkaAdapter connects to the brokers, lists available topics, subscribes to matching channels, and starts dispatching messages — no manual consumer wiring needed.
Adapter Options (KafkaAdapter.Config)
| Option | Type | Description |
|---|---|---|
| client | ClientOptions | Kafka client connection options. Must include bootstrapBrokers. |
| consumers | Record<string, ConsumerOptions> | Named consumer group configurations. Operations can reference a group by name. |
| defaults | { consumer?, subscribe? } | Default consumer and subscription options applied to all operations unless overridden. |
| scope | string | Validation scope applied during message decoding. |
| interceptors | (InterceptorFunction \| IKafkaInterceptor)[] | Interceptor chain executed on every message. |
| logExtra | boolean | Log additional diagnostic output from the Kafka client. |
Defining Operations
Use @MQOperation() on a controller method to declare an operation. The channel property maps to one or more Kafka topics. Import @opra/kafka to unlock the .Kafka() chained decorator for per-operation consumer configuration.
import { MQController, MQOperation, ApiField, ComplexType } from '@opra/common';
import type { KafkaContext } from '@opra/kafka';
import '@opra/kafka'; // augments MQOperationDecorator with .Kafka()

@ComplexType()
class OrderPayload {
  @ApiField() declare orderId: string;
  @ApiField() declare amount: number;
}

@MQController()
export class OrdersController {
  @MQOperation({ channel: 'orders.created', type: OrderPayload })
  async onOrderCreated(ctx: KafkaContext) {
    const { payload, key, headers, topic, partition } = ctx;
    console.log('Received order:', payload.orderId);
  }
}
Channel patterns
The channel field accepts a string, a RegExp, or an array of either. Regex patterns are matched against the list of available topics fetched from the Kafka admin at startup.
@MQOperation({ channel: /^orders\..+/, type: OrderPayload })
async onAnyOrder(ctx: KafkaContext) { ... }
@MQOperation({ channel: ['orders.created', 'orders.updated'], type: OrderPayload })
async onOrderChange(ctx: KafkaContext) { ... }
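To make the matching rule concrete, here is a minimal sketch of how a channel declaration could be resolved against the topic list. It is not the adapter's actual code: `resolveTopics` and `ChannelPattern` are hypothetical names, and the sketch assumes that string channels are used verbatim while RegExp channels are tested only against the topics known at startup.

```typescript
// Illustrative sketch only; not the actual @opra/kafka implementation.
type ChannelPattern = string | RegExp;

/**
 * Resolves a channel declaration to concrete topic names.
 * Assumption: string channels are taken as-is, RegExp channels are
 * tested against the list of topics fetched at startup.
 */
function resolveTopics(
  channel: ChannelPattern | ChannelPattern[],
  availableTopics: string[],
): string[] {
  const patterns = Array.isArray(channel) ? channel : [channel];
  const resolved = new Set<string>();
  for (const pattern of patterns) {
    if (typeof pattern === 'string') {
      resolved.add(pattern);
      continue;
    }
    for (const topic of availableTopics) {
      // Reset lastIndex so global or sticky regexes test every topic from the start.
      pattern.lastIndex = 0;
      if (pattern.test(topic)) resolved.add(topic);
    }
  }
  return Array.from(resolved);
}

const topics = ['orders.created', 'orders.updated', 'payments.processed'];
console.log(resolveTopics(/^orders\..+/, topics));
// [ 'orders.created', 'orders.updated' ]
console.log(resolveTopics(['payments.processed', /^orders\.c/], topics));
// [ 'payments.processed', 'orders.created' ]
```

If regex channels are indeed resolved only once at startup, a topic created later would presumably not be picked up until the adapter restarts.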
Per-operation Kafka config (.Kafka())
Chain .Kafka() on @MQOperation() to override the consumer group or subscription options for a specific operation.
@(MQOperation({ channel: 'payments.processed', type: PaymentPayload })
  .Kafka({
    consumer: { groupId: 'payments-group', bootstrapBrokers: ['localhost:9092'] },
    subscribe: { mode: 'latest' },
  }))
async onPayment(ctx: KafkaContext) { ... }
You can also pass a resolver function for dynamic configuration:
@(MQOperation({ channel: 'orders.created', type: OrderPayload })
  .Kafka(async () => {
    const cfg = await loadConfig();
    return { consumer: { groupId: cfg.groupId, bootstrapBrokers: cfg.brokers } };
  }))
async onOrderCreated(ctx: KafkaContext) { ... }
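The interplay between adapter-level defaults and a per-operation config (static or resolver) can be pictured with the sketch below. The names `OperationKafkaConfig`, `ConfigOrResolver`, and `resolveOperationConfig` are hypothetical, and the shallow merge in which the operation's values win is an assumption; the adapter's real merge rules may differ.

```typescript
// Illustrative sketch only; type and function names are hypothetical.
interface OperationKafkaConfig {
  consumer?: Record<string, unknown>;
  subscribe?: Record<string, unknown>;
}

type ConfigOrResolver =
  | OperationKafkaConfig
  | (() => OperationKafkaConfig | Promise<OperationKafkaConfig>);

/**
 * Resolves a static or dynamic per-operation config and layers it over
 * the defaults. Assumption: a shallow merge where the operation wins.
 */
async function resolveOperationConfig(
  defaults: OperationKafkaConfig,
  perOperation?: ConfigOrResolver,
): Promise<OperationKafkaConfig> {
  const resolved =
    typeof perOperation === 'function' ? await perOperation() : perOperation ?? {};
  return {
    consumer: { ...defaults.consumer, ...resolved.consumer },
    subscribe: { ...defaults.subscribe, ...resolved.subscribe },
  };
}

// A resolver overrides the group id; the default subscribe options are kept.
resolveOperationConfig(
  { consumer: { groupId: 'default-group' }, subscribe: { mode: 'latest' } },
  async () => ({ consumer: { groupId: 'payments-group' } }),
).then((cfg) => console.log(cfg));
// { consumer: { groupId: 'payments-group' }, subscribe: { mode: 'latest' } }
```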
KafkaContext
Every operation handler receives a KafkaContext as its first argument.
import { KafkaContext } from '@opra/kafka';

async onOrderCreated(ctx: KafkaContext) {
  ctx.topic       // Kafka topic name
  ctx.partition   // partition number
  ctx.key         // decoded message key
  ctx.payload     // decoded message payload
  ctx.headers     // decoded message headers as a plain object
  ctx.rawMessage  // raw Kafka message object
}
| Property | Type | Description |
|---|---|---|
| topic | string | The topic the message arrived on. |
| partition | number | The partition number. |
| key | any | Decoded message key (validated against the operation's keyType). |
| payload | any | Decoded message payload (validated against the operation's type). |
| headers | Record<string, any> | Decoded message headers. |
| rawMessage | KafkaAdapter.Message | The raw message object from the Kafka client. |
Lifecycle
| Method | Description |
|---|---|
| adapter.initialize() | Connects to brokers and creates all consumers. Called automatically by start(). |
| adapter.start() | Initializes consumers and begins subscribing to topics. |
| adapter.close(force?) | Closes all consumers and resets the adapter to idle. Pass true to force-close. |
| adapter.status | Current status: 'idle' \| 'starting' \| 'started' \| 'closing'. |
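One way to combine the graceful and forced variants of close() is a shutdown helper with a grace period, sketched below. `Closable` and `shutdown` are hypothetical names introduced for illustration; the sketch relies only on the close(force?) signature from the table, and it assumes that calling close(true) while an earlier close() is still pending is acceptable, which is not confirmed here.

```typescript
// Illustrative sketch only; relies solely on the documented close(force?) method.
interface Closable {
  close(force?: boolean): Promise<void>;
}

/**
 * Tries a graceful close first and falls back to a forced close
 * when the grace period elapses.
 */
async function shutdown(
  adapter: Closable,
  graceMs: number,
): Promise<'graceful' | 'forced'> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<'timeout'>((resolve) => {
    timer = setTimeout(() => resolve('timeout'), graceMs);
  });
  const outcome = await Promise.race([
    adapter.close().then(() => 'closed' as const),
    timeout,
  ]);
  clearTimeout(timer);
  if (outcome === 'closed') return 'graceful';
  // Assumption: a forced close may be issued while the graceful one is pending.
  await adapter.close(true);
  return 'forced';
}

// Stand-in adapter so the sketch runs without a broker.
const fakeAdapter: Closable = { close: async () => {} };
shutdown(fakeAdapter, 1000).then((outcome) => console.log(outcome)); // graceful

// With a real adapter this could be wired to a signal handler, for example:
// process.on('SIGTERM', () => shutdown(adapter, 10_000).then(() => process.exit(0)));
```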
Interceptors
Interceptors run before the operation handler on every incoming message. Configure them once on the adapter.
import { KafkaAdapter, KafkaContext } from '@opra/kafka';

const adapter = new KafkaAdapter(document, {
  client: { bootstrapBrokers: ['localhost:9092'] },
  interceptors: [
    // Function form
    async (ctx: KafkaContext, next) => {
      console.log('Received on topic:', ctx.topic);
      await next();
    },
  ],
});
Use the class form for interceptors that need shared state or dependency injection:
import { IKafkaInterceptor, KafkaAdapter, KafkaContext } from '@opra/kafka';

class LoggingInterceptor implements IKafkaInterceptor {
  async intercept(ctx: KafkaContext, next: () => Promise<any>) {
    console.log(`[${ctx.topic}] partition=${ctx.partition}`);
    await next();
  }
}

const adapter = new KafkaAdapter(document, {
  client: { bootstrapBrokers: ['localhost:9092'] },
  interceptors: [new LoggingInterceptor()],
});
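The way next() hands control down the chain can be illustrated with a small, self-contained sketch. The types below (`Ctx`, `Next`, `Interceptor`) and the `runChain` function are simplified, hypothetical stand-ins rather than the package's own types, and the onion-style ordering shown is an assumption about how such chains are usually composed.

```typescript
// Illustrative sketch only; simplified stand-ins for the real interceptor types.
interface Ctx { topic: string }
type Next = () => Promise<unknown>;
type InterceptorFn = (ctx: Ctx, next: Next) => Promise<unknown>;
interface InterceptorObject { intercept(ctx: Ctx, next: Next): Promise<unknown> }
type Interceptor = InterceptorFn | InterceptorObject;

/** Runs interceptors in declaration order, then the handler, onion-style. */
function runChain(
  interceptors: Interceptor[],
  ctx: Ctx,
  handler: (ctx: Ctx) => Promise<unknown>,
): Promise<unknown> {
  const dispatch = (index: number): Promise<unknown> => {
    if (index >= interceptors.length) return handler(ctx);
    const current = interceptors[index];
    const next: Next = () => dispatch(index + 1);
    // Accept both the function form and the class form.
    return typeof current === 'function'
      ? current(ctx, next)
      : current.intercept(ctx, next);
  };
  return dispatch(0);
}

async function demo(): Promise<string[]> {
  const trace: string[] = [];
  const timing: InterceptorFn = async (_ctx, next) => {
    trace.push('fn:before');
    const result = await next();
    trace.push('fn:after');
    return result;
  };
  const audit: InterceptorObject = {
    async intercept(ctx, next) {
      trace.push(`audit:${ctx.topic}`);
      return next();
    },
  };
  await runChain([timing, audit], { topic: 'orders.created' }, async () => {
    trace.push('handler');
    return 'ok';
  });
  return trace;
}

demo().then((trace) => console.log(trace.join(' > ')));
// fn:before > audit:orders.created > handler > fn:after
```

In this sketch an interceptor that never calls next() short-circuits the chain, so the handler does not run. Whether the adapter behaves the same way is worth verifying before relying on it.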
Error Handling
Throw an OpraException, or any subclass of it, from a handler. The adapter catches it, emits an error event, and logs it without crashing the consumer.
import { OpraException } from '@opra/common';
async onOrderCreated(ctx: KafkaContext) {
  if (!ctx.payload.orderId) {
    throw new OpraException('Missing orderId');
  }
}
Listen to the adapter's error event to handle errors centrally:
adapter.on('error', (error, ctx) => {
  console.error('Kafka error:', error.message, ctx?.topic);
});
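The catch-and-emit behaviour described above can be approximated with Node's EventEmitter, as in the sketch below. `MiniDispatcher` is a hypothetical stand-in and not the adapter's dispatch code; it only illustrates why a throwing handler need not stop later messages from being processed.

```typescript
import { EventEmitter } from 'node:events';

// Illustrative sketch only; not the adapter's actual dispatch code.
interface Ctx { topic: string; payload: unknown }

class MiniDispatcher extends EventEmitter {
  /** Runs a handler; failures become 'error' events instead of rejections. */
  async dispatch(ctx: Ctx, handler: (ctx: Ctx) => Promise<unknown>): Promise<void> {
    let result: unknown;
    try {
      result = await handler(ctx);
    } catch (error) {
      // Node's EventEmitter throws on 'error' when nobody listens,
      // so this sketch emits only if a listener is registered.
      if (this.listenerCount('error') > 0) this.emit('error', error, ctx);
      else console.error('Unhandled handler error:', error);
      return;
    }
    this.emit('finish', ctx, result);
  }
}

async function demoErrors(): Promise<string[]> {
  const dispatcher = new MiniDispatcher();
  const seen: string[] = [];
  dispatcher.on('error', (error: Error, ctx: Ctx) => {
    seen.push(`${ctx.topic}: ${error.message}`);
  });
  dispatcher.on('finish', (ctx: Ctx) => {
    seen.push(`${ctx.topic}: ok`);
  });
  await dispatcher.dispatch({ topic: 'orders.created', payload: {} }, async () => {
    throw new Error('Missing orderId');
  });
  // The dispatcher keeps working after a failed message.
  await dispatcher.dispatch({ topic: 'orders.created', payload: { orderId: '42' } }, async () => 'done');
  return seen;
}

demoErrors().then((seen) => console.log(seen));
// [ 'orders.created: Missing orderId', 'orders.created: ok' ]
```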
Events
| Event | Payload | Description |
|---|---|---|
| message | KafkaAdapter.Message | Emitted when a raw message is received, before decoding. |
| execute | KafkaContext | Emitted just before the operation handler is called. |
| finish | KafkaContext, result | Emitted after the operation handler completes successfully. |
| error | Error, KafkaContext \| undefined | Emitted when an error occurs in a handler or interceptor. |
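As a usage sketch, the four events can feed simple counters for monitoring. `EventSource`, `Counters`, and `attachCounters` are hypothetical names; the only assumption about the adapter is that it exposes an on(event, listener) method for the events listed in the table. A plain EventEmitter stands in for the adapter so the example runs without a broker.

```typescript
import { EventEmitter } from 'node:events';

// Minimal shape assumed for anything that emits the events listed above.
interface EventSource {
  on(event: string, listener: (...args: any[]) => void): unknown;
}

interface Counters {
  received: number;
  executed: number;
  finished: number;
  failed: number;
}

/** Subscribes to the four documented events and tallies them. */
function attachCounters(source: EventSource): Counters {
  const counters: Counters = { received: 0, executed: 0, finished: 0, failed: 0 };
  source.on('message', () => { counters.received++; });
  source.on('execute', () => { counters.executed++; });
  source.on('finish', () => { counters.finished++; });
  source.on('error', () => { counters.failed++; });
  return counters;
}

// Stand-in emitter simulating one message whose handler fails.
const fakeSource = new EventEmitter();
const counters = attachCounters(fakeSource);
fakeSource.emit('message', {});
fakeSource.emit('execute', {});
fakeSource.emit('error', new Error('boom'), undefined);
console.log(counters);
// { received: 1, executed: 1, finished: 0, failed: 1 }
```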