mqttagent

MQTT Lua Agent
git clone https://git.instinctive.eu/mqttagent.git
Log | Files | Refs | README | LICENSE

commit 2a2b65e2effca87b95e59cba283a25231da2a609
parent 0e124a7a07ccef32b7c2f8e5694c3839c5727c16
Author: Natasha Kerensikova <natgh@instinctive.eu>
Date:   Wed, 26 Feb 2025 22:12:25 +0000

MQTT clients are re-used across reloads
Diffstat:
Mmqttagent.go | 63+++++++++++++++++++++++++++++++++++++++++++++++++++++++--------
1 file changed, 55 insertions(+), 8 deletions(-)

diff --git a/mqttagent.go b/mqttagent.go @@ -228,12 +228,12 @@ func reload(oldL *lua.LState, agent MqttAgent, main_script string) *lua.LState { if err := newL.DoFile(main_script); err != nil { log.Println("Reload failed:", err) - cleanupClients(newL) + stateReloadAbort(oldL, newL) agent.Teardown(newL) newL.Close() return oldL } else { - cleanupClients(oldL) + stateReloadEnd(oldL, newL) agent.Teardown(oldL) oldL.Close() log.Println("Reload successful") @@ -263,6 +263,7 @@ const keyCfgMap = 4 const keyCnxTable = 5 const keyTimerTable = 6 const keyReloadRequest = 7 +const keyOldCfgMap = 8 func registerState(L *lua.LState, clientPrefix string, toLua chan<- MqttMessage) { st := L.NewTable() @@ -281,6 +282,7 @@ func stateReloadBegin(oldL, newL *lua.LState) { toLua := oldL.RawGetInt(oldSt, keyChanToLua).(*lua.LUserData).Value.(chan<- MqttMessage) clientPrefix := oldL.RawGetInt(oldSt, keyClientPrefix) nextId := oldL.RawGetInt(oldSt, keyClientNextId) + cfgMap := oldL.RawGetInt(oldSt, keyCfgMap).(*lua.LUserData).Value.(mqttConfigMap) st := newL.NewTable() newL.RawSetInt(st, keyChanToLua, newUserData(newL, toLua)) @@ -289,10 +291,34 @@ func stateReloadBegin(oldL, newL *lua.LState) { newL.RawSetInt(st, keyCfgMap, newUserData(newL, make(mqttConfigMap))) newL.RawSetInt(st, keyCnxTable, newL.NewTable()) newL.RawSetInt(st, keyTimerTable, newL.NewTable()) + newL.RawSetInt(st, keyOldCfgMap, newUserData(newL, cfgMap)) newL.SetGlobal(luaStateName, st) newL.SetGlobal("reload", newL.NewFunction(requestReload)) } +func stateReloadAbort(oldL, newL *lua.LState) { + statePartialCleanup(newL, oldL) +} + +func stateReloadEnd(oldL, newL *lua.LState) { + statePartialCleanup(oldL, newL) + newSt := newL.GetGlobal(luaStateName).(*lua.LTable) + newL.RawSetInt(newSt, keyOldCfgMap, lua.LNil) +} + +func statePartialCleanup(staleL, keptL *lua.LState) { + staleCnxTable := stateCnxTable(staleL) + keptCnxTable := stateCnxTable(keptL) + + staleL.ForEach(staleCnxTable, func(key, value lua.LValue) { + clientId := int(key.(lua.LNumber)) + if keptL.RawGetInt(keptCnxTable, clientId) == lua.LNil { + client := staleL.RawGetInt(value.(*lua.LTable), keyClient).(*lua.LUserData).Value.(*mqtt.Client) + client.Close() + } + }) +} + func stateValue(L *lua.LState, key int) lua.LValue { st := L.GetGlobal(luaStateName) return L.RawGetInt(st.(*lua.LTable), key) @@ -332,6 +358,15 @@ func stateRequestReload(L *lua.LState, v lua.LValue) { L.RawSetInt(st, keyReloadRequest, v) } +func stateOldCfgMap(L *lua.LState) mqttConfigMap { + val := stateValue(L, keyOldCfgMap) + if val == lua.LNil { + return nil + } else { + return val.(*lua.LUserData).Value.(mqttConfigMap) + } +} + func requestReload(L *lua.LState) int { stateRequestReload(L, lua.LTrue) return 0 @@ -418,6 +453,15 @@ func newClient(config *mqttConfig, id string) (*mqtt.Client, error) { return mqtt.VolatileSession(id, &processed_cfg) } +func registerClient(L *lua.LState, id int, client *mqtt.Client) lua.LValue { + res := L.NewTable() + L.RawSetInt(res, keyClient, newUserData(L, client)) + L.RawSetInt(res, keySubTable, L.NewTable()) + L.SetMetatable(res, L.GetTypeMetatable(luaMqttClientTypeName)) + L.RawSetInt(stateCnxTable(L), id, res) + return res +} + func newMqttClient(L *lua.LState) int { var config mqttConfig if err := gluamapper.Map(L.CheckTable(1), &config); err != nil { @@ -440,6 +484,14 @@ func newMqttClient(L *lua.LState) int { return 1 } + if oldCfgMap := stateOldCfgMap(L); oldCfgMap != nil { + if cfg, found := oldCfgMap[config]; found { + cfgMap[config] = cfg + L.Push(registerClient(L, cfg.id, cfg.client)) + return 1 + } + } + id, idString := stateClientNextId(L) client, err := newClient(&config, idString) if err != nil { @@ -452,12 +504,7 @@ func newMqttClient(L *lua.LState) int { cfgMap[config] = mqttClientEntry{id: id, client: client} - res := L.NewTable() - L.RawSetInt(res, keyClient, newUserData(L, client)) - L.RawSetInt(res, keySubTable, L.NewTable()) - L.SetMetatable(res, L.GetTypeMetatable(luaMqttClientTypeName)) - L.RawSetInt(stateCnxTable(L), id, res) - L.Push(res) + L.Push(registerClient(L, id, client)) return 1 }