commit 34d01919b74ac4ad99d0e8a8359fcc9cdbe4b416
parent 68177be8b9e610181b6ae5e5bcf744db29671670
Author: Natasha Kerensikova <natgh@instinctive.eu>
Date: Thu, 28 Aug 2025 18:56:43 +0000
NATS subscription binding
Diffstat:
M | natsbot.go | | | 99 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- |
1 file changed, 96 insertions(+), 3 deletions(-)
diff --git a/natsbot.go b/natsbot.go
@@ -90,17 +90,24 @@ const (
keyMsgChan
keyCfgMap
keyConnTable
+ keySubsTable
keyTimerTable
)
-func registerState(L *lua.LState, msgChan chan<- *nats.Msg) {
+type subsMap map[*nats.Subscription]int
+
+func registerState(L *lua.LState, msgChan chan *nats.Msg) {
conns := L.NewTable()
L.RawSetInt(conns, 1, newUserData(L, make(connMap)))
+ subs := L.NewTable()
+ L.RawSetInt(subs, 1, newUserData(L, make(subsMap)))
+
st := L.NewTable()
L.RawSetInt(st, keyMsgChan, newUserData(L, msgChan))
L.RawSetInt(st, keyCfgMap, newUserData(L, make(natsConfigMap)))
L.RawSetInt(st, keyConnTable, conns)
+ L.RawSetInt(st, keySubsTable, subs)
L.RawSetInt(st, keyTimerTable, L.NewTable())
stateSet(L, st)
}
@@ -133,9 +140,9 @@ func stateValue(L *lua.LState, key int) lua.LValue {
return L.RawGetInt(stateGet(L), key)
}
-func stateMsgChan(L *lua.LState) chan<- *nats.Msg {
+func stateMsgChan(L *lua.LState) chan *nats.Msg {
ud := stateValue(L, keyMsgChan)
- return ud.(*lua.LUserData).Value.(chan<- *nats.Msg)
+ return ud.(*lua.LUserData).Value.(chan *nats.Msg)
}
func stateCfgMap(L *lua.LState) natsConfigMap {
@@ -146,6 +153,10 @@ func stateConnTable(L *lua.LState) *lua.LTable {
return stateValue(L, keyConnTable).(*lua.LTable)
}
+func stateSubsTable(L *lua.LState) *lua.LTable {
+ return stateValue(L, keySubsTable).(*lua.LTable)
+}
+
func stateTimerTable(L *lua.LState) *lua.LTable {
return stateValue(L, keyTimerTable).(*lua.LTable)
}
@@ -285,6 +296,7 @@ type connMap map[*nats.Conn]int
func registerConnType(L *lua.LState) {
index := L.NewTable()
L.SetField(index, "publish", L.NewFunction(natsPublish))
+ L.SetField(index, "subscribe", L.NewFunction(natsSubscribe))
mt := L.NewTypeMetatable(luaNatsConnTypeName)
L.SetField(mt, "__index", index)
@@ -350,6 +362,10 @@ func wrapConn(L *lua.LState, nc *nats.Conn) lua.LValue {
func checkConn(L *lua.LState, index int) *nats.Conn {
ud := L.CheckUserData(index)
+ if v, ok := ud.Value.(*natsSubs); ok {
+ return v.nc
+ }
+
if v, ok := ud.Value.(*nats.Conn); ok {
return v
}
@@ -374,6 +390,83 @@ func natsPublish(L *lua.LState) int {
}
}
+func natsSubscribe(L *lua.LState) int {
+ nc := checkConn(L, 1)
+ subject := L.CheckString(2)
+ fn := L.CheckFunction(3)
+
+ if s, err := nc.ChanSubscribe(subject, stateMsgChan(L)); err != nil {
+ log.Println("Subscribe:", err)
+ L.Push(lua.LNil)
+ L.Push(lua.LString(err.Error()))
+ return 2
+ } else {
+ L.Push(wrapSubs(L, fn, s, nc))
+ return 1
+ }
+}
+
+/********** Lua Object for NATS subscription **********/
+
+type natsSubs struct {
+ id int
+ nc *nats.Conn
+ subs *nats.Subscription
+}
+
+func wrapSubs(L *lua.LState, fn lua.LValue, ns *nats.Subscription, nc *nats.Conn) lua.LValue {
+ tbl := stateSubsTable(L)
+ id := tbl.Len() + 1
+ luaSub := newUserData(L, &natsSubs{id: id, nc: nc, subs: ns})
+
+ L.RawGetInt(tbl, keyIndex).(*lua.LUserData).Value.(subsMap)[ns] = id
+ L.RawSetInt(tbl, id, luaSub)
+
+ index := L.NewTable()
+ L.SetField(index, "callback", fn)
+ L.SetField(index, "id", lua.LNumber(id))
+ L.SetField(index, "subject", lua.LString(string(ns.Subject)))
+
+ L.SetField(index, "publish", L.NewFunction(natsPublish))
+ L.SetField(index, "subscribe", L.NewFunction(natsSubscribe))
+
+ mt := L.NewTable()
+ L.SetField(mt, "__call", fn)
+ L.SetField(mt, "__index", index)
+ L.SetField(mt, "__newindex", L.NewFunction(subsNewIndex))
+
+ L.SetMetatable(luaSub, mt)
+ return luaSub
+}
+
+func subsNewIndex(L *lua.LState) int {
+ _ = checkSubs(L, 1)
+ mt := L.GetMetatable(L.Get(1))
+ key := L.Get(2)
+
+ if s, ok := key.(lua.LString); !ok || string(s) != "callback" {
+ L.RaiseError("attempt to change bad subscription field")
+ return 0
+ }
+
+ fn := L.CheckFunction(3)
+ L.SetField(mt, "__call", fn)
+ index := L.GetField(mt, "__index").(*lua.LTable)
+ L.SetField(index, "callback", fn)
+ return 0
+}
+
+func checkSubs(L *lua.LState, index int) *natsSubs {
+ ud := L.CheckUserData(index)
+
+ if v, ok := ud.Value.(*natsSubs); ok {
+ return v
+ }
+
+ L.ArgError(index, "subscription expected")
+ return nil
+}
+
/********** Lua Object for timers **********/
const luaTimerTypeName = "timer"