MQ Controllers

MQ controllers handle message queue operations for transports such as Kafka and RabbitMQ. Each method maps to a channel or topic via @MQOperation decorators. OPRA validates incoming message payloads and encodes outgoing responses automatically.


Defining a Controller

Decorate the class with @MQController(). The controller name defaults to the class name with the Controller suffix stripped.

import { MQController } from '@opra/common';

@MQController()
export class CustomersController {}
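
As a rough illustration of the naming rule above, the sketch below derives a default name from a class name. `defaultControllerName` is a hypothetical helper written for this example, not a function exported by `@opra/common`; the library's actual implementation may differ.

```typescript
// Hypothetical helper, not part of @opra/common: mirrors the documented
// default of dropping a trailing "Controller" from the class name.
function defaultControllerName(className: string): string {
  const suffix = 'Controller';
  // Strip the suffix only when something remains in front of it.
  return className.endsWith(suffix) && className.length > suffix.length
    ? className.slice(0, -suffix.length)
    : className;
}

class CustomersController {}

console.log(defaultControllerName(CustomersController.name)); // "Customers"
```

Under this assumption, a class named `CustomersController` would be registered as `Customers`, while a class without the suffix would keep its name unchanged.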

@MQController() applies @Injectable() automatically in a NestJS application.

@MQController() returns a chainable builder with the following methods:

.Header()

Declares a message header shared across all operations in the controller.

@MQController()
.Header('x-tenant-id', { type: 'string', required: true })
export class CustomersController {}
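
To make the effect of `required: true` concrete, here is a minimal sketch of the kind of check a declared header implies. The `HeaderSpec` shape and the `checkHeaders` function are assumptions made for this example; OPRA performs its own validation internally, and real transports may deliver header values in other forms (for example as buffers).

```typescript
// Hypothetical shapes for illustration only; OPRA's real metadata differs.
interface HeaderSpec {
  name: string;
  type: 'string' | 'number';
  required?: boolean;
}

// Returns a list of human-readable problems; an empty list means the
// incoming headers satisfy every declared spec.
function checkHeaders(specs: HeaderSpec[], headers: Record<string, unknown>): string[] {
  const errors: string[] = [];
  for (const spec of specs) {
    const value = headers[spec.name];
    if (value === undefined) {
      if (spec.required) errors.push(`missing required header "${spec.name}"`);
      continue;
    }
    if (typeof value !== spec.type) {
      errors.push(`header "${spec.name}" must be a ${spec.type}`);
    }
  }
  return errors;
}

const controllerHeaders: HeaderSpec[] = [
  { name: 'x-tenant-id', type: 'string', required: true },
];

console.log(checkHeaders(controllerHeaders, { 'x-tenant-id': 'acme' })); // no errors
console.log(checkHeaders(controllerHeaders, {})); // one error: the header is missing
```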

.UseType()

Registers types scoped to this controller without exposing them globally.

import { CustomerStatus } from '../models/enum/customer-status.js';

@MQController()
.UseType(CustomerStatus)
export class CustomersController {}

Defining Operations

Each operation is declared with @MQOperation. The first argument is the payload type. The channel name defaults to the method name.

import { MQController, MQOperation } from '@opra/common';
import { MQContext } from '@opra/kafka'; // or @opra/rabbitmq
import { Customer, CustomerCreate } from '../models/customer.js';

@MQController()
export class CustomersController {
  @MQOperation(CustomerCreate)
  async create(ctx: MQContext) { ... }

  @MQOperation(Customer)
  .Response(Customer)
  async update(ctx: MQContext) { ... }
}
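
The default channel naming can be pictured with the sketch below, which lists a class's method names and treats each one as its channel. Both `defaultChannels` and `channelFor` are hypothetical helpers, and the `channel` override option is an assumed name used only to show what "defaults to" implies; check the OPRA API reference for the real option.

```typescript
// Stand-in controller with the same method names as the example above.
class CustomersController {
  create(): void {}
  update(): void {}
}

// Hypothetical helper: collects method names from the prototype, which is
// what the channels would be called if every operation used the default.
function defaultChannels(controller: new () => object): string[] {
  return Object.getOwnPropertyNames(controller.prototype).filter(
    (name) => name !== 'constructor' && typeof controller.prototype[name] === 'function',
  );
}

// Hypothetical helper: an explicit channel (assumed option name) wins,
// otherwise the method name is used.
function channelFor(methodName: string, options: { channel?: string } = {}): string {
  return options.channel ?? methodName;
}

console.log(defaultChannels(CustomersController)); // the two method names: create, update
console.log(channelFor('create', { channel: 'customers.create' })); // "customers.create"
```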

.Header()

Declares a header specific to this operation.

@MQOperation(CustomerCreate)
.Header('x-idempotency-key', { type: 'string', required: true })
async create(ctx: MQContext) { ... }

.Response()

Declares the response payload type sent back to the caller (RPC pattern). After .Response(), .Header() can be chained to declare response-specific headers.

@MQOperation(CustomerCreate)
.Response(Customer)
.Header('x-request-id', { type: 'string' })
async create(ctx: MQContext) { ... }

.UseType()

Registers types scoped to this operation without exposing them globally.

@MQOperation(CustomerCreate)
.UseType(CustomerStatus)
.Response(Customer)
async create(ctx: MQContext) { ... }

NestJS Integration

Register the controller and its dependencies in the transport module:

// Kafka
import { OpraKafkaModule } from '@opra/nestjs-kafka';

OpraKafkaModule.forRoot({
  controllers: [CustomersController],
  providers: [CustomersController, CustomersService],
})

// RabbitMQ
import { OpraRabbitmqModule } from '@opra/nestjs-rabbitmq';

OpraRabbitmqModule.forRoot({
  controllers: [CustomersController],
  providers: [CustomersController, CustomersService],
})