commit a117cb30e7c06d8c0fbe1175d79ef5d682ca9bae
parent c65c57c60558e9b3d1cc04167d608dfbc26d9abd
Author: Natasha Kerensikova <natgh@instinctive.eu>
Date: Sat, 10 May 2025 18:29:44 +0000
Online and offline syntheic messages
Diffstat:
2 files changed, 36 insertions(+), 0 deletions(-)
diff --git a/README.md b/README.md
@@ -73,6 +73,19 @@ twice, each callback will be called twice).
Since QoS is not supported yet, callbacks should always be ready to handle
multiple and missing messages anyway.
+### Internal event callbacks
+
+When the connection of a client comes up or down, Lua callback is triggered
+as if an empty message has been sent on topic `$SYS/self/online` or
+`$SYS/self/offline`.
+
+Note that due to being offline, the client should not be used in the offline
+callback.
+
+Also note that mqttagent assumes without checking that subscriptions are
+preserved by the broker when coming back online.
+The online callback is a good place to resubscribe if needed.
+
### MQTT message sending
The client has a function-like API to send messages:
diff --git a/mqttagent.go b/mqttagent.go
@@ -188,6 +188,28 @@ func processMsg(L *lua.LState, msg *mqttMessage) {
}
}
+func mqttMonitor(client *mqtt.Client, toLua chan<- mqttMessage, id int) {
+ for {
+ <-client.Online()
+ log.Println("Online client", id)
+ toLua <- mqttMessage{
+ Timestamp: float64(time.Now().UnixMicro()) * 1.0e-6,
+ ClientId: id,
+ Topic: []byte("$SYS/self/online"),
+ Message: []byte{},
+ }
+
+ <-client.Offline()
+ log.Println("Offline client", id)
+ toLua <- mqttMessage{
+ Timestamp: float64(time.Now().UnixMicro()) * 1.0e-6,
+ ClientId: id,
+ Topic: []byte("$SYS/self/offline"),
+ Message: []byte{},
+ }
+ }
+}
+
func mqttRead(client *mqtt.Client, toLua chan<- mqttMessage, id int) {
var big *mqtt.BigMessage
@@ -584,6 +606,7 @@ func newMqttClient(L *lua.LState) int {
L.Push(lua.LString(err.Error()))
return 2
}
+ go mqttMonitor(client, stateChanToLua(L), id)
go mqttRead(client, stateChanToLua(L), id)
cfgMap[config] = mqttClientEntry{id: id, client: client}