commit 41db920dfc7261620d9c789e5d497fd41dec8bdf
parent 9c49acaed4653d2c0a6714df39399c52903bc3fd
Author: Natasha Kerensikova <natgh@instinctive.eu>
Date: Tue, 13 May 2025 17:58:06 +0000
mqttMonitor is cleaned up when client is closed
Diffstat:
1 file changed, 26 insertions(+), 11 deletions(-)
diff --git a/mqttagent.go b/mqttagent.go
@@ -117,6 +117,7 @@ func cleanupClients(L *lua.LState) {
L.ForEach(cnxTbl, func(key, value lua.LValue) {
cnx := value.(*lua.LTable)
client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
+ close(L.RawGetInt(cnx, keyCloseSig).(*lua.LUserData).Value.(chan struct{}))
if err := client.Disconnect(nil); err != nil {
log.Printf("cleanup client %s: %v", lua.LVAsString(key), err)
}
@@ -181,6 +182,7 @@ func processMsg(L *lua.LState, msg *mqttMessage) {
if tableIsEmpty(subTbl) {
client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
+ close(L.RawGetInt(cnx, keyCloseSig).(*lua.LUserData).Value.(chan struct{}))
if err := client.Disconnect(nil); err != nil {
log.Println("disconnect empty client:", err)
}
@@ -188,7 +190,7 @@ func processMsg(L *lua.LState, msg *mqttMessage) {
}
}
-func mqttMonitor(client *mqtt.Client, toLua chan<- mqttMessage, id int, keepAliveS uint16) {
+func mqttMonitor(client *mqtt.Client, toLua chan<- mqttMessage, id int, closeSig <-chan struct{}, keepAliveS uint16) {
var tickerCh <-chan time.Time
var ticker *time.Ticker = nil
defer func() {
@@ -200,7 +202,12 @@ func mqttMonitor(client *mqtt.Client, toLua chan<- mqttMessage, id int, keepAliv
keepAlive := time.Duration(keepAliveS) * time.Second
for {
- <-client.Online()
+ select {
+ case <-closeSig:
+ return
+ case <-client.Online():
+ }
+
log.Println("Online client", id)
toLua <- mqttMessage{
Timestamp: float64(time.Now().UnixMicro()) * 1.0e-6,
@@ -218,6 +225,8 @@ func mqttMonitor(client *mqtt.Client, toLua chan<- mqttMessage, id int, keepAliv
for online {
select {
+ case <-closeSig:
+ return
case <-client.Offline():
online = false
case <-tickerCh:
@@ -394,7 +403,9 @@ func statePartialCleanup(staleL, keptL *lua.LState) {
staleL.ForEach(staleCnxTable, func(key, value lua.LValue) {
clientId := int(key.(lua.LNumber))
if keptL.RawGetInt(keptCnxTable, clientId) == lua.LNil {
- client := staleL.RawGetInt(value.(*lua.LTable), keyClient).(*lua.LUserData).Value.(*mqtt.Client)
+ tbl := value.(*lua.LTable)
+ client := staleL.RawGetInt(tbl, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
+ close(staleL.RawGetInt(tbl, keyCloseSig).(*lua.LUserData).Value.(chan struct{}))
client.Close()
}
})
@@ -502,8 +513,9 @@ func requestReload(L *lua.LState) int {
const luaMqttClientTypeName = "mqttclient"
const keyClient = 1
-const keySubTable = 2
-const keyReused = 3
+const keyCloseSig = 2
+const keySubTable = 3
+const keyReused = 4
func registerMqttClientType(L *lua.LState) {
mt := L.NewTypeMetatable(luaMqttClientTypeName)
@@ -534,8 +546,9 @@ type mqttConfig struct {
}
type mqttClientEntry struct {
- client *mqtt.Client
- id int
+ client *mqtt.Client
+ closeSig chan struct{}
+ id int
}
type mqttConfigMap map[mqttConfig]mqttClientEntry
@@ -589,9 +602,10 @@ func newClient(config *mqttConfig, id string) (*mqtt.Client, error) {
return mqtt.VolatileSession(id, &processed_cfg)
}
-func registerClient(L *lua.LState, id int, client *mqtt.Client, reused bool) lua.LValue {
+func registerClient(L *lua.LState, id int, client *mqtt.Client, closeSig chan struct{}, reused bool) lua.LValue {
res := L.NewTable()
L.RawSetInt(res, keyClient, newUserData(L, client))
+ L.RawSetInt(res, keyCloseSig, newUserData(L, closeSig))
L.RawSetInt(res, keySubTable, L.NewTable())
if reused {
L.RawSetInt(res, keyReused, lua.LTrue)
@@ -626,7 +640,7 @@ func newMqttClient(L *lua.LState) int {
if oldCfgMap := stateOldCfgMap(L); oldCfgMap != nil {
if cfg, found := oldCfgMap[config]; found {
cfgMap[config] = cfg
- L.Push(registerClient(L, cfg.id, cfg.client, true))
+ L.Push(registerClient(L, cfg.id, cfg.client, cfg.closeSig, true))
return 1
}
}
@@ -639,12 +653,13 @@ func newMqttClient(L *lua.LState) int {
L.Push(lua.LString(err.Error()))
return 2
}
- go mqttMonitor(client, stateChanToLua(L), id, config.KeepAlive)
+ closeSig := make(chan struct{}, 1)
+ go mqttMonitor(client, stateChanToLua(L), id, closeSig, config.KeepAlive)
go mqttRead(client, stateChanToLua(L), id)
cfgMap[config] = mqttClientEntry{id: id, client: client}
- L.Push(registerClient(L, id, client, false))
+ L.Push(registerClient(L, id, client, closeSig, false))
return 1
}