
Lightweight MQTT tooling for easy connect/subscribe/publish with JSON serialization, Flow-based message streams, automatic reconnects and dynamic broker reconfiguration via config flows.
A library for MQTT easy plug and play usage, based on ktor-mqtt library. This library provides a simple, Kotlin Multiplatform-ready client for interacting with MQTT brokers.
Add the dependency to your build.gradle.kts file:
implementation("io.github.hussein-al-zuhile:kmp-mqtt:1.0.2")First, you need to create a class that inherits from BaseMQTTClient.
import com.hussein.mqtt.BaseMQTTClient
class MyMQTTClient(
ip: String,
port: Int,
clientIdentifier: String = "my-unique-client-id"
) : BaseMQTTClient(ip, port, clientIdentifier) {
// You can add custom logic here if needed
}Define your topics and the data models for the messages. The library uses Kotlinx Serialization for JSON conversion.
import com.hussein.mqtt.MQTTTopic
import kotlinx.serialization.Serializable
@Serializable
data class MyMessage(val content: String)
val myTopic = MQTTTopic<MyMessage>("my/awesome/topic")Instantiate your client and connect to the MQTT broker.
val client = MyMQTTClient("broker.emqx.io", 1883)
// To connect once
val connectionResult = client.connect()
// Or, to automatically handle reconnects
client.connectOrTryReconnect()You can easily subscribe to a topic and get a Flow of messages. The library will handle JSON decoding for you.
import com.hussein.mqtt.utils.subscribeAndListenWhenConnected
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
// ... inside a CoroutineScope
client.subscribeAndListenWhenConnected(myTopic)
.onEach { message: MyMessage ->
println("Received message: ${message.content}")
}
.launchIn(CoroutineScope(Dispatchers.Default))Publishing a message is just as simple. The library handles the JSON encoding.
import de.kempmobil.ktor.mqtt.QoS
// ... inside a suspend function or CoroutineScope
val messageToSend = MyMessage(content = "Hello, MQTT!")
client.publish(
topic = myTopic,
payload = messageToSend,
desiredQoS = QoS.AT_LEAST_ONCE
)To disconnect from the broker:
client.disconnect()If you need to dynamically change the MQTT broker's IP address or port, you can use MqttConfigurationManager. This is useful when the connection details are not known at compile time or can change during the application's lifecycle.
First, create a class that inherits from MqttConfigurationManager:
import com.hussein.mqtt.MqttConfigurationManager
import kotlinx.coroutines.flow.Flow
class MyMqttConfigManager(
ipFlow: Flow<String>,
portFlow: Flow<Int>
) : MqttConfigurationManager<MyMQTTClient>(
ipFlow = ipFlow,
portFlow = portFlow,
initiator = { ip, port -> MyMQTTClient(ip, port) }
) Then, you can use it like this:
import kotlinx.coroutines.flow.MutableStateFlow
// Create flows for your dynamic configuration
val ipFlow = MutableStateFlow("broker.emqx.io")
val portFlow = MutableStateFlow(1883)
// Initialize the manager
val configManager = MyMqttConfigManager(ipFlow, portFlow)
// You can then collect the client and use it
configManager.currentClientStateFlow.collect { client ->
if (client != null) {
// You can now use the client to subscribe, publish, etc.
}
}
// To change the configuration, just emit a new value to the flows
ipFlow.value = "new.broker.address"
portFlow.value = 8883A library for MQTT easy plug and play usage, based on ktor-mqtt library. This library provides a simple, Kotlin Multiplatform-ready client for interacting with MQTT brokers.
Add the dependency to your build.gradle.kts file:
implementation("io.github.hussein-al-zuhile:kmp-mqtt:1.0.2")First, you need to create a class that inherits from BaseMQTTClient.
import com.hussein.mqtt.BaseMQTTClient
class MyMQTTClient(
ip: String,
port: Int,
clientIdentifier: String = "my-unique-client-id"
) : BaseMQTTClient(ip, port, clientIdentifier) {
// You can add custom logic here if needed
}Define your topics and the data models for the messages. The library uses Kotlinx Serialization for JSON conversion.
import com.hussein.mqtt.MQTTTopic
import kotlinx.serialization.Serializable
@Serializable
data class MyMessage(val content: String)
val myTopic = MQTTTopic<MyMessage>("my/awesome/topic")Instantiate your client and connect to the MQTT broker.
val client = MyMQTTClient("broker.emqx.io", 1883)
// To connect once
val connectionResult = client.connect()
// Or, to automatically handle reconnects
client.connectOrTryReconnect()You can easily subscribe to a topic and get a Flow of messages. The library will handle JSON decoding for you.
import com.hussein.mqtt.utils.subscribeAndListenWhenConnected
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
// ... inside a CoroutineScope
client.subscribeAndListenWhenConnected(myTopic)
.onEach { message: MyMessage ->
println("Received message: ${message.content}")
}
.launchIn(CoroutineScope(Dispatchers.Default))Publishing a message is just as simple. The library handles the JSON encoding.
import de.kempmobil.ktor.mqtt.QoS
// ... inside a suspend function or CoroutineScope
val messageToSend = MyMessage(content = "Hello, MQTT!")
client.publish(
topic = myTopic,
payload = messageToSend,
desiredQoS = QoS.AT_LEAST_ONCE
)To disconnect from the broker:
client.disconnect()If you need to dynamically change the MQTT broker's IP address or port, you can use MqttConfigurationManager. This is useful when the connection details are not known at compile time or can change during the application's lifecycle.
First, create a class that inherits from MqttConfigurationManager:
import com.hussein.mqtt.MqttConfigurationManager
import kotlinx.coroutines.flow.Flow
class MyMqttConfigManager(
ipFlow: Flow<String>,
portFlow: Flow<Int>
) : MqttConfigurationManager<MyMQTTClient>(
ipFlow = ipFlow,
portFlow = portFlow,
initiator = { ip, port -> MyMQTTClient(ip, port) }
) Then, you can use it like this:
import kotlinx.coroutines.flow.MutableStateFlow
// Create flows for your dynamic configuration
val ipFlow = MutableStateFlow("broker.emqx.io")
val portFlow = MutableStateFlow(1883)
// Initialize the manager
val configManager = MyMqttConfigManager(ipFlow, portFlow)
// You can then collect the client and use it
configManager.currentClientStateFlow.collect { client ->
if (client != null) {
// You can now use the client to subscribe, publish, etc.
}
}
// To change the configuration, just emit a new value to the flows
ipFlow.value = "new.broker.address"
portFlow.value = 8883