commit 3955b1bc21d3ce048ecaf63100ac28181c1fbc79
parent 2a2b65e2effca87b95e59cba283a25231da2a609
Author: Natasha Kerensikova <natgh@instinctive.eu>
Date: Sat, 1 Mar 2025 22:55:11 +0000
MQTT subscriptions are paused during reload and compensated afterwards
Diffstat:
M | mqttagent.go | | | 60 | +++++++++++++++++++++++++++++++++++++++++++++++++++++------- |
1 file changed, 53 insertions(+), 7 deletions(-)
diff --git a/mqttagent.go b/mqttagent.go
@@ -317,6 +317,46 @@ func statePartialCleanup(staleL, keptL *lua.LState) {
client.Close()
}
})
+
+ keptL.ForEach(keptCnxTable, func(key, value lua.LValue) {
+ clientId := int(key.(lua.LNumber))
+ keptCnx := value.(*lua.LTable)
+ if lua.LVIsFalse(keptL.RawGetInt(keptCnx, keyReused)) {
+ return
+ }
+
+ client := keptL.RawGetInt(keptCnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
+ staleCnx := staleL.RawGetInt(staleCnxTable, clientId).(*lua.LTable)
+
+ if client != staleL.RawGetInt(staleCnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) {
+ panic("This shouldn't happen")
+ }
+
+ keptSub := keptL.RawGetInt(keptCnx, keySubTable).(*lua.LTable)
+ staleSub := staleL.RawGetInt(staleCnx, keySubTable).(*lua.LTable)
+
+ staleL.ForEach(staleSub, func(key, _ lua.LValue) {
+ topic := string(key.(lua.LString))
+ if keptL.GetField(keptSub, topic) == lua.LNil {
+ log.Println("Unsubscribing from stale topic", topic)
+ if err := client.Unsubscribe(nil, topic); err != nil {
+ log.Println("Failed to unsubscribe:", err)
+ }
+ }
+ })
+
+ keptL.ForEach(keptSub, func(key, _ lua.LValue) {
+ topic := string(key.(lua.LString))
+ if staleL.GetField(staleSub, topic) == lua.LNil {
+ log.Println("Subscribing to new topic", topic)
+ if err := client.Subscribe(nil, topic); err != nil {
+ log.Println("Failed to subscribe:", err)
+ }
+ }
+ })
+
+ keptL.RawSetInt(keptCnx, keyReused, lua.LNil)
+ })
}
func stateValue(L *lua.LState, key int) lua.LValue {
@@ -377,6 +417,7 @@ func requestReload(L *lua.LState) int {
const luaMqttClientTypeName = "mqttclient"
const keyClient = 1
const keySubTable = 2
+const keyReused = 3
func registerMqttClientType(L *lua.LState) {
mt := L.NewTypeMetatable(luaMqttClientTypeName)
@@ -453,10 +494,13 @@ func newClient(config *mqttConfig, id string) (*mqtt.Client, error) {
return mqtt.VolatileSession(id, &processed_cfg)
}
-func registerClient(L *lua.LState, id int, client *mqtt.Client) lua.LValue {
+func registerClient(L *lua.LState, id int, client *mqtt.Client, reused bool) lua.LValue {
res := L.NewTable()
L.RawSetInt(res, keyClient, newUserData(L, client))
L.RawSetInt(res, keySubTable, L.NewTable())
+ if reused {
+ L.RawSetInt(res, keyReused, lua.LTrue)
+ }
L.SetMetatable(res, L.GetTypeMetatable(luaMqttClientTypeName))
L.RawSetInt(stateCnxTable(L), id, res)
return res
@@ -487,7 +531,7 @@ func newMqttClient(L *lua.LState) int {
if oldCfgMap := stateOldCfgMap(L); oldCfgMap != nil {
if cfg, found := oldCfgMap[config]; found {
cfgMap[config] = cfg
- L.Push(registerClient(L, cfg.id, cfg.client))
+ L.Push(registerClient(L, cfg.id, cfg.client, true))
return 1
}
}
@@ -504,7 +548,7 @@ func newMqttClient(L *lua.LState) int {
cfgMap[config] = mqttClientEntry{id: id, client: client}
- L.Push(registerClient(L, id, client))
+ L.Push(registerClient(L, id, client, false))
return 1
}
@@ -555,10 +599,12 @@ func luaSubscribe(L *lua.LState) int {
_, is_new := L.GetField(tbl, topic).(*lua.LNilType)
- if callback == nil {
- err = client.Unsubscribe(nil, topic)
- } else if is_new {
- err = client.Subscribe(nil, topic)
+ if lua.LVIsFalse(L.RawGetInt(cnx, keyReused)) {
+ if callback == nil {
+ err = client.Unsubscribe(nil, topic)
+ } else if is_new {
+ err = client.Subscribe(nil, topic)
+ }
}
if err != nil {