commit 6c047ebd64007ecf1a1f1e4569ef7e634c99bfce
parent 75c71528fa1dcae8f0e9b61d543f283c15cca6d6
Author: Natasha Kerensikova <natgh@instinctive.eu>
Date: Fri, 30 May 2025 18:34:22 +0000
Internal topics don't reach the broker
Diffstat:
1 file changed, 12 insertions(+), 5 deletions(-)
diff --git a/mqttagent.go b/mqttagent.go
@@ -49,6 +49,13 @@ type mqttMessage struct {
Message []byte
}
+const internalOffline = "$SYS/self/offline"
+const internalOnline = "$SYS/self/online"
+
+func isInternal(topic string) bool {
+ return strings.HasPrefix(topic, "$SYS/self")
+}
+
func Run(agent MqttAgent, main_script string, capacity int) {
fromMqtt := make(chan mqttMessage, capacity)
@@ -212,7 +219,7 @@ func mqttMonitor(client *mqtt.Client, toLua chan<- mqttMessage, id int, closeSig
toLua <- mqttMessage{
Timestamp: float64(time.Now().UnixMicro()) * 1.0e-6,
ClientId: id,
- Topic: []byte("$SYS/self/online"),
+ Topic: []byte(internalOnline),
Message: []byte{},
}
@@ -246,7 +253,7 @@ func mqttMonitor(client *mqtt.Client, toLua chan<- mqttMessage, id int, closeSig
toLua <- mqttMessage{
Timestamp: float64(time.Now().UnixMicro()) * 1.0e-6,
ClientId: id,
- Topic: []byte("$SYS/self/offline"),
+ Topic: []byte(internalOffline),
Message: []byte{},
}
}
@@ -431,7 +438,7 @@ func statePartialCleanup(staleL, keptL *lua.LState) {
staleL.ForEach(staleSub, func(key, _ lua.LValue) {
topic := string(key.(lua.LString))
- if keptL.GetField(keptSub, topic) == lua.LNil {
+ if !isInternal(topic) && keptL.GetField(keptSub, topic) == lua.LNil {
log.Println("Unsubscribing from stale topic", topic)
if err := client.Unsubscribe(nil, topic); err != nil {
log.Println("Failed to unsubscribe:", err)
@@ -441,7 +448,7 @@ func statePartialCleanup(staleL, keptL *lua.LState) {
keptL.ForEach(keptSub, func(key, _ lua.LValue) {
topic := string(key.(lua.LString))
- if staleL.GetField(staleSub, topic) == lua.LNil {
+ if !isInternal(topic) && staleL.GetField(staleSub, topic) == lua.LNil {
log.Println("Subscribing to new topic", topic)
if err := client.Subscribe(nil, topic); err != nil {
log.Println("Failed to subscribe:", err)
@@ -712,7 +719,7 @@ func luaSubscribe(L *lua.LState) int {
_, is_new := L.GetField(tbl, topic).(*lua.LNilType)
- if lua.LVIsFalse(L.RawGetInt(cnx, keyReused)) {
+ if !isInternal(topic) && lua.LVIsFalse(L.RawGetInt(cnx, keyReused)) {
if callback == nil {
err = client.Unsubscribe(nil, topic)
} else if is_new {