commit 843fe7b869909148418c763711c16e3e6ec78462
parent 3e0a3f9a95f1fd4de72ebf0347845dfcc81db47d
Author: Natasha Kerensikova <natgh@instinctive.eu>
Date: Tue, 23 Sep 2025 17:22:29 +0000
Subscriptions are tracked by their configuration
Diffstat:
M | natsbot.go | | | 51 | +++++++++++++++++++++++++++++++++++++++++++++++++++ |
1 file changed, 51 insertions(+), 0 deletions(-)
diff --git a/natsbot.go b/natsbot.go
@@ -98,6 +98,7 @@ func Loop(cb NatsBot, mainScript string, capacity int) {
tbl, idx := stateSubsTable(L)
L.RawSetInt(tbl, idx[s], lua.LNil)
delete(idx, s)
+ deleteFromSubsCfgMap(L, s)
} else {
log.Printf("Subscription %q is still valid", s.Subject)
}
@@ -217,6 +218,7 @@ const (
keyMsgChan
keyCfgMap
keyConnTable
+ keySubsCfgMap
keySubsTable
keyTimerTable
keyReloadRequest
@@ -229,6 +231,7 @@ func registerState(L *lua.LState, evtChan chan *internalEvent, msgChan chan *nat
L.RawSetInt(st, keyMsgChan, newUserData(L, msgChan))
L.RawSetInt(st, keyCfgMap, newUserData(L, make(natsConfigMap)))
L.RawSetInt(st, keyConnTable, newConnTbl(L))
+ L.RawSetInt(st, keySubsCfgMap, newUserData(L, make(subsCfgMap)))
L.RawSetInt(st, keySubsTable, newSubsTbl(L))
L.RawSetInt(st, keyTimerTable, L.NewTable())
stateSet(L, st)
@@ -328,6 +331,11 @@ func stateConnTable(L *lua.LState) (*lua.LTable, connMap) {
return tbl, idx
}
+func stateSubsCfgMap(L *lua.LState) subsCfgMap {
+ ud := stateValue(L, keySubsCfgMap)
+ return ud.(*lua.LUserData).Value.(subsCfgMap)
+}
+
func stateSubsTable(L *lua.LState) (*lua.LTable, subsMap) {
tbl := stateValue(L, keySubsTable).(*lua.LTable)
idx := L.RawGetInt(tbl, keyIndex).(*lua.LUserData).Value.(subsMap)
@@ -360,6 +368,40 @@ func requestReload(L *lua.LState) int {
return 0
}
+func addToSubsCfgMap(L *lua.LState, nc *nats.Conn, s *nats.Subscription) {
+ cfgMap := stateSubsCfgMap(L)
+ subsKey := subsCfg{subject: s.Subject, queue: s.Queue}
+ cmap, found := cfgMap[subsKey]
+ if !found {
+ cmap = make(map[*nats.Conn][]*nats.Subscription)
+ cfgMap[subsKey] = cmap
+ }
+ cmap[nc] = append(cmap[nc], s)
+}
+
+func deleteFromSubsCfgMap(L *lua.LState, s *nats.Subscription) {
+ wholeMap := stateSubsCfgMap(L)
+ subsKey := subsCfg{subject: s.Subject, queue: s.Queue}
+ cMap := wholeMap[subsKey]
+ for nc, subsArray := range cMap {
+ n := 0
+ for _, ns := range subsArray {
+ if ns != s {
+ subsArray[n] = s
+ n++
+ }
+ }
+ if n > 0 {
+ cMap[nc] = subsArray[:n]
+ } else {
+ delete(cMap, nc)
+ }
+ }
+ if len(cMap) == 0 {
+ delete(wholeMap, subsKey)
+ }
+}
+
/********** NATS Connection Configuration **********/
type natsConfig struct {
@@ -703,6 +745,13 @@ type natsSubs struct {
type subsMap map[*nats.Subscription]int
+type subsCfg struct {
+ subject string
+ queue string
+}
+
+type subsCfgMap map[subsCfg]map[*nats.Conn][]*nats.Subscription
+
func newSubsTbl(L *lua.LState) lua.LValue {
subsTbl := L.NewTable()
L.RawSetInt(subsTbl, keyIndex, newUserData(L, make(subsMap)))
@@ -717,6 +766,8 @@ func wrapSubs(L *lua.LState, fn lua.LValue, ns *nats.Subscription, nc *nats.Conn
subsIdx[ns] = id
L.RawSetInt(tbl, id, luaSub)
+ addToSubsCfgMap(L, nc, ns)
+
index := L.NewTable()
L.SetField(index, "callback", fn)
L.SetField(index, "id", lua.LNumber(id))