natsbot

NATS bot
git clone https://git.instinctive.eu/natsbot.git
Log | Files | Refs | README | LICENSE

commit 11b3993073836cde41eccd31206922b663789e5d
parent 843fe7b869909148418c763711c16e3e6ec78462
Author: Natasha Kerensikova <natgh@instinctive.eu>
Date:   Wed, 24 Sep 2025 18:07:16 +0000

Subscriptions are reused on reload
Diffstat:
Mnatsbot.go | 51++++++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 50 insertions(+), 1 deletion(-)

diff --git a/natsbot.go b/natsbot.go @@ -223,6 +223,7 @@ const ( keyTimerTable keyReloadRequest keyOldCfgMap + keyOldSubsCfgMap ) func registerState(L *lua.LState, evtChan chan *internalEvent, msgChan chan *nats.Msg) { @@ -243,9 +244,11 @@ func stateReloadBegin(oldL, newL *lua.LState) { evtChan := stateEvtChan(oldL) msgChan := stateMsgChan(oldL) cfgMap := stateCfgMap(oldL) + subsCfgMap := stateSubsCfgMap(oldL) registerState(newL, evtChan, msgChan) newL.RawSetInt(stateGet(newL), keyOldCfgMap, newUserData(newL, cfgMap)) + newL.RawSetInt(stateGet(newL), keyOldSubsCfgMap, newUserData(newL, subsCfgMap)) } func stateReloadAbort(oldL, newL *lua.LState) { @@ -255,18 +258,22 @@ func stateReloadAbort(oldL, newL *lua.LState) { func stateReloadEnd(oldL, newL *lua.LState) { stateClean(oldL, newL) newL.RawSetInt(stateGet(newL), keyOldCfgMap, lua.LNil) + newL.RawSetInt(stateGet(newL), keyOldSubsCfgMap, lua.LNil) } func stateClean(L, keptL *lua.LState) { _, connIdx := stateConnTable(L) + _, subsIdx := stateSubsTable(L) st := stateGet(L) L.RawSetInt(st, keyConnTable, newConnTbl(L)) L.RawSetInt(st, keySubsTable, newSubsTbl(L)) var keptConn connMap + var keptSubs subsMap if keptL != nil { _, keptConn = stateConnTable(keptL) + _, keptSubs = stateSubsTable(keptL) } for nc := range connIdx { @@ -281,6 +288,18 @@ func stateClean(L, keptL *lua.LState) { nc.SetReconnectHandler(nil) nc.Close() } + + for ns := range subsIdx { + if _, found := keptSubs[ns]; found { + continue + } + + if ns.IsValid() { + if err := ns.Unsubscribe(); err != nil { + log.Println("Unsubscribe:", err) + } + } + } } func stateUncheckedGet(L *lua.LState) *lua.LTable { @@ -402,6 +421,33 @@ func deleteFromSubsCfgMap(L *lua.LState, s *nats.Subscription) { } } +func findInOldSubsCfgMap(L *lua.LState, nc *nats.Conn, key subsCfg) *nats.Subscription { + ud := stateValue(L, keyOldSubsCfgMap) + if ud == lua.LNil { + return nil + } + + cMap, found := ud.(*lua.LUserData).Value.(subsCfgMap)[key] + if !found { + return nil + } + + subsArray, found := cMap[nc] + if !found { + return nil + } + + _, knownSubsMap := stateSubsTable(L) + for _, s := range subsArray { + _, found = knownSubsMap[s] + if !found { + return s + } + } + + return nil +} + /********** NATS Connection Configuration **********/ type natsConfig struct { @@ -724,7 +770,10 @@ func natsSubscribe(L *lua.LState) int { subject := L.CheckString(2) fn := L.CheckFunction(3) - if s, err := nc.ChanSubscribe(subject, stateMsgChan(L)); err != nil { + if s := findInOldSubsCfgMap(L, nc, subsCfg{subject: subject}); s != nil { + L.Push(wrapSubs(L, fn, s, nc)) + return 1 + } else if s, err := nc.ChanSubscribe(subject, stateMsgChan(L)); err != nil { log.Println("Subscribe:", err) L.Push(lua.LNil) L.Push(lua.LString(err.Error()))