
Ktor plugin integrating RabbitMQ clients with coroutine-aware dispatcher, connection/channel management, DSL-based publish/consume, direct Java-client interop, serialization fallback, dead-letter handling, and custom coroutine-scope support.
The plugin supports the com.rabbitmq:amqp-client and
dev.kourier:amqp-client libraries. This library is available in multiple distributions. Choose the one that best fits your needs:
The ktor-server-rabbitmq distribution is an alias for the Java Client distribution on the JVM platform, and for the Kourier Client distribution on Kotlin Native platforms.
dependencies {
implementation("io.github.damirdenis-tudor:ktor-server-rabbitmq:<version>")
}
The ktor-server-rabbitmq-java distribution uses the official RabbitMQ Java client library
(com.rabbitmq:amqp-client) under the hood, and is available only for the JVM platform.
dependencies {
implementation("io.github.damirdenis-tudor:ktor-server-rabbitmq-java:<version>")
}
The ktor-server-rabbitmq-kourier distribution uses the pure Kotlin Kourier client library
(dev.kourier:amqp-client) under the hood, and is available for both JVM and Kotlin Native platforms.
dependencies {
implementation("io.github.damirdenis-tudor:ktor-server-rabbitmq-kourier:<version>")
}install(RabbitMQ) {
uri = "amqp://<user>:<password>@<address>:<port>"
defaultConnectionName = "<default_connection>"
connectionAttempts = 20
attemptDelay = 10
dispatcherThreadPollSize = 4
tlsEnabled = false
}rabbitmq {
queueBind {
queue = "demo-queue"
exchange = "demo-exchange"
routingKey = "demo-routing-key"
queueDeclare {
queue = "demo-queue"
durable = true
}
exchangeDeclare {
exchange = "demo-exchange"
type = "direct"
}
}
}rabbitmq {
repeat(10) {
basicPublish {
exchange = "demo-exchange"
routingKey = "demo-routing-key"
message { "Hello World!" }
}
}
}rabbitmq {
basicConsume {
autoAck = true
queue = "demo-queue"
deliverCallback<String> { message ->
logger.info("Received message: $message")
}
}
}rabbitmq {
// Consumes "demo-queue" on the connection identified as "consume", with the
// consumer configured for Dispatchers.IO and coroutinePollSize = 1_000.
connection(id = "consume") {
    basicConsume {
        autoAck = true
        queue = "demo-queue"
        // Spelling fixed: `dispatcher` / `Dispatchers` (kotlinx.coroutines), matching
        // the high-throughput consumer example in this document.
        dispatcher = Dispatchers.IO
        coroutinePollSize = 1_000
        deliverCallback<String> { message ->
            logger.info("Received message: $message")
            delay(30)
        }
    }
}
}rabbitmq {
// Consumes "demo-queue" on the connection identified as "consume", with the
// consumer configured for Dispatchers.IO and coroutinePollSize = 1_000.
connection(id = "consume") {
    basicConsume {
        autoAck = true
        queue = "demo-queue"
        // Spelling fixed: `dispatcher` / `Dispatchers` (kotlinx.coroutines), matching
        // the high-throughput consumer example in this document.
        dispatcher = Dispatchers.IO
        coroutinePollSize = 1_000
        deliverCallback<String> { message ->
            logger.info("Received message: $message")
            delay(30)
        }
    }
}
}rabbitmq {
// Direct interop with the RabbitMQ Java client through channel 2: publishes one
// message and registers a DefaultConsumer on "demo-queue" with auto-ack enabled.
libChannel(id = 2) {
    // The first argument of Channel.basicPublish is the exchange, not the queue;
    // "demo-exchange" is the exchange bound to "demo-queue" with this routing key.
    basicPublish("demo-exchange", "demo-routing-key", null, "Hello!".toByteArray())
    // NOTE(review): `channel` is not declared in this snippet - presumably it
    // resolves from the enclosing scope; confirm.
    val consumer = object : DefaultConsumer(channel) {
        override fun handleDelivery(
            consumerTag: String?,
            envelope: Envelope?,
            properties: AMQP.BasicProperties?,
            body: ByteArray?
        ) {
            // Left empty in this example; handle the delivery here.
        }
    }
    basicConsume("demo-queue", true, consumer)
}
}rabbitmq {
// Direct interop with the RabbitMQ Java client through the "lib-connection"
// connection: creates a channel, publishes one message and registers a
// DefaultConsumer on "demo-queue" with auto-ack enabled.
libConnection(id = "lib-connection") {
    val channel = createChannel()
    // The first argument of Channel.basicPublish is the exchange, not the queue;
    // "demo-exchange" is the exchange bound to "demo-queue" with this routing key.
    channel.basicPublish("demo-exchange", "demo-routing-key", null, "Hello!".toByteArray())
    val consumer = object : DefaultConsumer(channel) {
        override fun handleDelivery(
            consumerTag: String?,
            envelope: Envelope?,
            properties: AMQP.BasicProperties?,
            body: ByteArray?
        ) {
            // Left empty in this example; handle the delivery here.
        }
    }
    channel.basicConsume("demo-queue", true, consumer)
}
}fun Application.module() {
// Production RabbitMQ cluster
install(RabbitMQ(instanceName = "production")) {
uri = "amqp://prod-user:prod-pass@prod-rabbitmq:5672"
dispatcherThreadPollSize = 8
}
// Analytics RabbitMQ cluster
install(RabbitMQ(instanceName = "analytics")) {
uri = "amqp://analytics-user:analytics-pass@analytics-rabbitmq:5672"
dispatcherThreadPollSize = 4
}
// Setup production queues
rabbitmq(instanceName = "production") {
queueBind {
queue = "orders"
exchange = "orders-exchange"
routingKey = "order.created"
queueDeclare {
queue = "orders"
durable = true
}
exchangeDeclare {
exchange = "orders-exchange"
type = "direct"
}
}
}
// Setup analytics queues
rabbitmq(instanceName = "analytics") {
queueBind {
queue = "events"
exchange = "analytics-exchange"
routingKey = "user.action"
queueDeclare {
queue = "events"
durable = false
}
exchangeDeclare {
exchange = "analytics-exchange"
type = "topic"
}
}
}
// Process critical orders
rabbitmq(instanceName = "production") {
basicConsume {
queue = "orders"
autoAck = false
deliverCallback<String> { message ->
// Process order
processOrder(message.body)
basicAck {
deliveryTag = message.envelope.deliveryTag
}
// Send analytics event
rabbitmq(instanceName = "analytics") {
basicPublish {
exchange = "analytics-exchange"
routingKey = "user.action"
message { "Order processed: ${message.body}" }
}
}
}
}
}
}fun Application.module() {
install(RabbitMQ) {
uri = "amqp://guest:guest@localhost:5672"
defaultConnectionName = "default"
}
rabbitmq {
// Setup queues and exchanges
queueBind {
queue = "orders-queue"
exchange = "orders-exchange"
routingKey = "order.created"
queueDeclare {
queue = "orders-queue"
durable = true
}
exchangeDeclare {
exchange = "orders-exchange"
type = "direct"
}
}
}
// Producer connection
rabbitmq {
connection(id = "producer") {
repeat(100) {
basicPublish {
exchange = "orders-exchange"
routingKey = "order.created"
message { "Order #$it created" }
}
}
}
}
// Consumer connection with high throughput
rabbitmq {
connection(id = "consumer") {
basicConsume {
queue = "orders-queue"
autoAck = false
dispatcher = Dispatchers.IO
coroutinePollSize = 10
deliverCallback<String> { message ->
// Process order
println("Processing: ${message.body}")
delay(100) // Simulate processing time
basicAck {
deliveryTag = message.envelope.deliveryTag
}
}
}
}
}
}val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
println("ExceptionHandler got $throwable")
}
val rabbitMQScope = CoroutineScope(SupervisorJob() + exceptionHandler)
// ...
install(RabbitMQ) {
connectionAttempts = 3
attemptDelay = 10
uri = rabbitMQContainer.amqpUrl
scope = rabbitMQScope
}
// ...
// Consumer whose delivery callback throws on every message.
// NOTE(review): presumably the exception surfaces in the CoroutineExceptionHandler
// of the custom scope installed above - confirm against the plugin.
rabbitmq {
    connection(id = "consume") {
        basicConsume {
            autoAck = true
            queue = "demo-queue"
            // Spelling fixed: `dispatcher` / `Dispatchers` (kotlinx.coroutines), matching
            // the high-throughput consumer example in this document.
            dispatcher = Dispatchers.IO
            coroutinePollSize = 1_000
            deliverCallback<String> { message ->
                throw Exception("business logic exception")
            }
        }
    }
}
/**
 * Example payload for the serialization fallback snippet: published through
 * `basicPublish { message { ... } }` and consumed with `deliverCallback<Message>`.
 *
 * @property content text carried by the message.
 */
@Serializable
data class Message(var content: String)
fun Application.module() {
install(RabbitMQ) {
uri = "amqp://guest:guest@localhost:5672"
dispatcherThreadPollSize = 3
}
rabbitmq {
queueBind {
queue = "test-queue"
exchange = "test-exchange"
queueDeclare {
queue = "test-queue"
arguments = mapOf(
"x-dead-letter-exchange" to "dlx",
"x-dead-letter-routing-key" to "dlq-dlx"
)
}
exchangeDeclare {
exchange = "test-exchange"
type = "fanout"
}
}
}
rabbitmq {
repeat(10) {
basicPublish {
exchange = "test-exchange"
message {
Message(content = "Hello world!")
}
}
}
repeat(10) {
basicPublish {
exchange = "test-exchange"
message { "Hello world!" }
}
}
}
rabbitmq {
basicConsume {
queue = "test-queue"
autoAck = false
deliverCallback<Message> { message ->
println("Received as Message: ${message.body}")
}
deliverFailureCallback { message ->
println("Could not serialize, received as ByteArray: ${message.body}")
}
}
}
}@Serializable
data class Message(
var content: String
)
fun Application.module() {
install(RabbitMQ) {
uri = "amqp://guest:guest@localhost:5672"
dispatcherThreadPollSize = 3
}
rabbitmq {
queueBind {
queue = "dlq"
exchange = "dlx"
routingKey = "dlq-dlx"
queueDeclare {
queue = "dlq"
durable = true
}
exchangeDeclare {
exchange = "dlx"
type = "direct"
}
}
queueBind {
queue = "test-queue"
exchange = "test-exchange"
queueDeclare {
queue = "test-queue"
arguments = mapOf(
"x-dead-letter-exchange" to "dlx",
"x-dead-letter-routing-key" to "dlq-dlx"
)
}
exchangeDeclare {
exchange = "test-exchange"
type = "fanout"
}
}
}
rabbitmq {
repeat(100) {
basicPublish {
exchange = "test-exchange"
message {
Message(content = "Hello world!")
}
}
}
}
rabbitmq {
basicConsume {
queue = "test-queue"
autoAck = false
deliverCallback<Message> { message ->
basicReject {
deliveryTag = message.envelope.deliveryTag
requeue = false
}
}
}
basicConsume {
queue = "dlq"
autoAck = true
deliverCallback<Message> { message ->
println("Received message in dead letter queue: ${message.body}")
}
}
}
}
To enable debug logging, add the following to your logback.xml file:
<logger name="io.github.damir.denis.tudor.ktor.server.rabbitmq" level="DEBUG"/>
The plugin supports the com.rabbitmq:amqp-client and
dev.kourier:amqp-client libraries. This library is available in multiple distributions. Choose the one that best fits your needs:
The ktor-server-rabbitmq distribution is an alias for the Java Client distribution on the JVM platform, and for the Kourier Client distribution on Kotlin Native platforms.
dependencies {
implementation("io.github.damirdenis-tudor:ktor-server-rabbitmq:<version>")
}
The ktor-server-rabbitmq-java distribution uses the official RabbitMQ Java client library
(com.rabbitmq:amqp-client) under the hood, and is available only for the JVM platform.
dependencies {
implementation("io.github.damirdenis-tudor:ktor-server-rabbitmq-java:<version>")
}
The ktor-server-rabbitmq-kourier distribution uses the pure Kotlin Kourier client library
(dev.kourier:amqp-client) under the hood, and is available for both JVM and Kotlin Native platforms.
dependencies {
implementation("io.github.damirdenis-tudor:ktor-server-rabbitmq-kourier:<version>")
}install(RabbitMQ) {
uri = "amqp://<user>:<password>@<address>:<port>"
defaultConnectionName = "<default_connection>"
connectionAttempts = 20
attemptDelay = 10
dispatcherThreadPollSize = 4
tlsEnabled = false
}rabbitmq {
queueBind {
queue = "demo-queue"
exchange = "demo-exchange"
routingKey = "demo-routing-key"
queueDeclare {
queue = "demo-queue"
durable = true
}
exchangeDeclare {
exchange = "demo-exchange"
type = "direct"
}
}
}rabbitmq {
repeat(10) {
basicPublish {
exchange = "demo-exchange"
routingKey = "demo-routing-key"
message { "Hello World!" }
}
}
}rabbitmq {
basicConsume {
autoAck = true
queue = "demo-queue"
deliverCallback<String> { message ->
logger.info("Received message: $message")
}
}
}rabbitmq {
// Consumes "demo-queue" on the connection identified as "consume", with the
// consumer configured for Dispatchers.IO and coroutinePollSize = 1_000.
connection(id = "consume") {
    basicConsume {
        autoAck = true
        queue = "demo-queue"
        // Spelling fixed: `dispatcher` / `Dispatchers` (kotlinx.coroutines), matching
        // the high-throughput consumer example in this document.
        dispatcher = Dispatchers.IO
        coroutinePollSize = 1_000
        deliverCallback<String> { message ->
            logger.info("Received message: $message")
            delay(30)
        }
    }
}
}rabbitmq {
// Consumes "demo-queue" on the connection identified as "consume", with the
// consumer configured for Dispatchers.IO and coroutinePollSize = 1_000.
connection(id = "consume") {
    basicConsume {
        autoAck = true
        queue = "demo-queue"
        // Spelling fixed: `dispatcher` / `Dispatchers` (kotlinx.coroutines), matching
        // the high-throughput consumer example in this document.
        dispatcher = Dispatchers.IO
        coroutinePollSize = 1_000
        deliverCallback<String> { message ->
            logger.info("Received message: $message")
            delay(30)
        }
    }
}
}rabbitmq {
// Direct interop with the RabbitMQ Java client through channel 2: publishes one
// message and registers a DefaultConsumer on "demo-queue" with auto-ack enabled.
libChannel(id = 2) {
    // The first argument of Channel.basicPublish is the exchange, not the queue;
    // "demo-exchange" is the exchange bound to "demo-queue" with this routing key.
    basicPublish("demo-exchange", "demo-routing-key", null, "Hello!".toByteArray())
    // NOTE(review): `channel` is not declared in this snippet - presumably it
    // resolves from the enclosing scope; confirm.
    val consumer = object : DefaultConsumer(channel) {
        override fun handleDelivery(
            consumerTag: String?,
            envelope: Envelope?,
            properties: AMQP.BasicProperties?,
            body: ByteArray?
        ) {
            // Left empty in this example; handle the delivery here.
        }
    }
    basicConsume("demo-queue", true, consumer)
}
}rabbitmq {
// Direct interop with the RabbitMQ Java client through the "lib-connection"
// connection: creates a channel, publishes one message and registers a
// DefaultConsumer on "demo-queue" with auto-ack enabled.
libConnection(id = "lib-connection") {
    val channel = createChannel()
    // The first argument of Channel.basicPublish is the exchange, not the queue;
    // "demo-exchange" is the exchange bound to "demo-queue" with this routing key.
    channel.basicPublish("demo-exchange", "demo-routing-key", null, "Hello!".toByteArray())
    val consumer = object : DefaultConsumer(channel) {
        override fun handleDelivery(
            consumerTag: String?,
            envelope: Envelope?,
            properties: AMQP.BasicProperties?,
            body: ByteArray?
        ) {
            // Left empty in this example; handle the delivery here.
        }
    }
    channel.basicConsume("demo-queue", true, consumer)
}
}fun Application.module() {
// Production RabbitMQ cluster
install(RabbitMQ(instanceName = "production")) {
uri = "amqp://prod-user:prod-pass@prod-rabbitmq:5672"
dispatcherThreadPollSize = 8
}
// Analytics RabbitMQ cluster
install(RabbitMQ(instanceName = "analytics")) {
uri = "amqp://analytics-user:analytics-pass@analytics-rabbitmq:5672"
dispatcherThreadPollSize = 4
}
// Setup production queues
rabbitmq(instanceName = "production") {
queueBind {
queue = "orders"
exchange = "orders-exchange"
routingKey = "order.created"
queueDeclare {
queue = "orders"
durable = true
}
exchangeDeclare {
exchange = "orders-exchange"
type = "direct"
}
}
}
// Setup analytics queues
rabbitmq(instanceName = "analytics") {
queueBind {
queue = "events"
exchange = "analytics-exchange"
routingKey = "user.action"
queueDeclare {
queue = "events"
durable = false
}
exchangeDeclare {
exchange = "analytics-exchange"
type = "topic"
}
}
}
// Process critical orders
rabbitmq(instanceName = "production") {
basicConsume {
queue = "orders"
autoAck = false
deliverCallback<String> { message ->
// Process order
processOrder(message.body)
basicAck {
deliveryTag = message.envelope.deliveryTag
}
// Send analytics event
rabbitmq(instanceName = "analytics") {
basicPublish {
exchange = "analytics-exchange"
routingKey = "user.action"
message { "Order processed: ${message.body}" }
}
}
}
}
}
}fun Application.module() {
install(RabbitMQ) {
uri = "amqp://guest:guest@localhost:5672"
defaultConnectionName = "default"
}
rabbitmq {
// Setup queues and exchanges
queueBind {
queue = "orders-queue"
exchange = "orders-exchange"
routingKey = "order.created"
queueDeclare {
queue = "orders-queue"
durable = true
}
exchangeDeclare {
exchange = "orders-exchange"
type = "direct"
}
}
}
// Producer connection
rabbitmq {
connection(id = "producer") {
repeat(100) {
basicPublish {
exchange = "orders-exchange"
routingKey = "order.created"
message { "Order #$it created" }
}
}
}
}
// Consumer connection with high throughput
rabbitmq {
connection(id = "consumer") {
basicConsume {
queue = "orders-queue"
autoAck = false
dispatcher = Dispatchers.IO
coroutinePollSize = 10
deliverCallback<String> { message ->
// Process order
println("Processing: ${message.body}")
delay(100) // Simulate processing time
basicAck {
deliveryTag = message.envelope.deliveryTag
}
}
}
}
}
}val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
println("ExceptionHandler got $throwable")
}
val rabbitMQScope = CoroutineScope(SupervisorJob() + exceptionHandler)
// ...
install(RabbitMQ) {
connectionAttempts = 3
attemptDelay = 10
uri = rabbitMQContainer.amqpUrl
scope = rabbitMQScope
}
// ...
// Consumer whose delivery callback throws on every message.
// NOTE(review): presumably the exception surfaces in the CoroutineExceptionHandler
// of the custom scope installed above - confirm against the plugin.
rabbitmq {
    connection(id = "consume") {
        basicConsume {
            autoAck = true
            queue = "demo-queue"
            // Spelling fixed: `dispatcher` / `Dispatchers` (kotlinx.coroutines), matching
            // the high-throughput consumer example in this document.
            dispatcher = Dispatchers.IO
            coroutinePollSize = 1_000
            deliverCallback<String> { message ->
                throw Exception("business logic exception")
            }
        }
    }
}
/**
 * Example payload for the serialization fallback snippet: published through
 * `basicPublish { message { ... } }` and consumed with `deliverCallback<Message>`.
 *
 * @property content text carried by the message.
 */
@Serializable
data class Message(var content: String)
fun Application.module() {
install(RabbitMQ) {
uri = "amqp://guest:guest@localhost:5672"
dispatcherThreadPollSize = 3
}
rabbitmq {
queueBind {
queue = "test-queue"
exchange = "test-exchange"
queueDeclare {
queue = "test-queue"
arguments = mapOf(
"x-dead-letter-exchange" to "dlx",
"x-dead-letter-routing-key" to "dlq-dlx"
)
}
exchangeDeclare {
exchange = "test-exchange"
type = "fanout"
}
}
}
rabbitmq {
repeat(10) {
basicPublish {
exchange = "test-exchange"
message {
Message(content = "Hello world!")
}
}
}
repeat(10) {
basicPublish {
exchange = "test-exchange"
message { "Hello world!" }
}
}
}
rabbitmq {
basicConsume {
queue = "test-queue"
autoAck = false
deliverCallback<Message> { message ->
println("Received as Message: ${message.body}")
}
deliverFailureCallback { message ->
println("Could not serialize, received as ByteArray: ${message.body}")
}
}
}
}@Serializable
data class Message(
var content: String
)
fun Application.module() {
install(RabbitMQ) {
uri = "amqp://guest:guest@localhost:5672"
dispatcherThreadPollSize = 3
}
rabbitmq {
queueBind {
queue = "dlq"
exchange = "dlx"
routingKey = "dlq-dlx"
queueDeclare {
queue = "dlq"
durable = true
}
exchangeDeclare {
exchange = "dlx"
type = "direct"
}
}
queueBind {
queue = "test-queue"
exchange = "test-exchange"
queueDeclare {
queue = "test-queue"
arguments = mapOf(
"x-dead-letter-exchange" to "dlx",
"x-dead-letter-routing-key" to "dlq-dlx"
)
}
exchangeDeclare {
exchange = "test-exchange"
type = "fanout"
}
}
}
rabbitmq {
repeat(100) {
basicPublish {
exchange = "test-exchange"
message {
Message(content = "Hello world!")
}
}
}
}
rabbitmq {
basicConsume {
queue = "test-queue"
autoAck = false
deliverCallback<Message> { message ->
basicReject {
deliveryTag = message.envelope.deliveryTag
requeue = false
}
}
}
basicConsume {
queue = "dlq"
autoAck = true
deliverCallback<Message> { message ->
println("Received message in dead letter queue: ${message.body}")
}
}
}
}
To enable debug logging, add the following to your logback.xml file:
<logger name="io.github.damir.denis.tudor.ktor.server.rabbitmq" level="DEBUG"/>