mqttagent

MQTT Lua Agent
git clone https://git.instinctive.eu/mqttagent.git
Log | Files | Refs | README | LICENSE

commit da737ac7c28b10da2693c8c76e9d5d25ba684d1b
parent 2e9d5a773fce033a3d2ac7e7b173c057e0d15a4f
Author: Natasha Kerensikova <natgh@instinctive.eu>
Date:   Tue,  7 Jan 2025 18:54:31 +0000

Code is shuffled around somewhat more logically
Diffstat:
Mmqttagent.go | 261++++++++++++++++++++++++++++++++++++++-----------------------------------------
1 file changed, 125 insertions(+), 136 deletions(-)

diff --git a/mqttagent.go b/mqttagent.go @@ -42,15 +42,110 @@ type MqttMessage struct { Message []byte } -type mqttConnection struct { - client *mqtt.Client - toLua chan MqttMessage +func Run(agent MqttAgent, main_script string) { + fromMqtt := make(chan MqttMessage) + + L := lua.NewState() + defer L.Close() + + agent.Setup(L) + defer agent.Teardown(L) + + hostname, err := os.Hostname() + if err != nil { + hostname = "<unknown>" + } + + registerMqttClientType(L) + registerState(L, fmt.Sprintf("mqttagent-%s-%d", hostname, os.Getpid()), &fromMqtt) + defer cleanupClients(L) + + if err := L.DoFile(main_script); err != nil { + panic(err) + } + + for { + msg, ok := <-fromMqtt + + if !ok { + break + } + + agent.Log(L, &msg) + + cnx := L.RawGetInt(stateCnxTable(L), msg.ClientId).(*lua.LTable) + subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable) + L.ForEach(subTbl, func(key, value lua.LValue) { dispatchMsg(L, &msg, cnx, key, value) }) + + if key, _ := subTbl.Next(lua.LNil); key == lua.LNil { + client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) + client.Disconnect(nil) + L.RawSetInt(stateCnxTable(L), msg.ClientId, lua.LNil) + } + + if stateCnxTable(L).Len() == 0 { + break + } + } } -func dup(src []byte) []byte { - res := make([]byte, len(src)) - copy(res, src) - return res +func cleanupClients(L *lua.LState) { + cnxTbl := stateCnxTable(L) + if cnxTbl == nil { + return + } + + L.ForEach(cnxTbl, func(key, value lua.LValue) { + cnx := value.(*lua.LTable) + client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) + client.Disconnect(nil) + }) +} + +func dispatchMsg(L *lua.LState, msg *MqttMessage, cnx, key, value lua.LValue) { + skey, ok := key.(lua.LString) + topic := string(msg.Topic) + + if ok && match(topic, string(skey)) { + err := L.CallByParam(lua.P{Fn: value, NRet: 0, Protect: true}, + cnx, + lua.LString(string(msg.Message)), + lua.LString(topic), + lua.LNumber(msg.Timestamp)) + if err != nil { + panic(err) + } + } +} + +func matchSliced(actual, filter []string) bool { + if len(filter) == 0 { + return len(actual) == 0 + } + + if filter[0] == "#" { + if len(filter) == 1 { + return true + } + + for i := range actual { + if matchSliced(actual[i:], filter[1:]) { + return true + } + } + + return false + } + + if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) { + return matchSliced(actual[1:], filter[1:]) + } + + return false +} + +func match(actual, filter string) bool { + return matchSliced(strings.Split(actual, "/"), strings.Split(filter, "/")) } func mqttRead(client *mqtt.Client, toLua chan MqttMessage, id int) error { @@ -77,28 +172,19 @@ func mqttRead(client *mqtt.Client, toLua chan MqttMessage, id int) error { } } -const luaMqttClientTypeName = "mqttclient" +func dup(src []byte) []byte { + res := make([]byte, len(src)) + copy(res, src) + return res +} + +/********** State Object in the Lua Interpreter **********/ -/* Per-LState global state */ const luaStateName = "_mqttagent" const keyChanToLua = 1 const keyClientPrefix = 2 const keyCnxTable = 3 -/* Per-Connection state */ -const keyClient = 1 -const keySubTable = 2 - -func registerMqttClientType(L *lua.LState) { - mt := L.NewTypeMetatable(luaMqttClientTypeName) - L.SetGlobal(luaMqttClientTypeName, mt) - L.SetField(mt, "new", L.NewFunction(newMqttClient)) - L.SetField(mt, "__gc", L.NewFunction(deleteMqttClient)) - L.SetField(mt, "__call", L.NewFunction(luaPublish)) - L.SetField(mt, "__index", L.NewFunction(luaQuery)) - L.SetField(mt, "__newindex", L.NewFunction(luaSubscribe)) -} - func registerState(L *lua.LState, clientPrefix string, toLua *chan MqttMessage) { ud := L.NewUserData() ud.Value = toLua @@ -110,13 +196,6 @@ func registerState(L *lua.LState, clientPrefix string, toLua *chan MqttMessage) L.SetGlobal(luaStateName, st) } -func stateUpdateValue(L *lua.LState, key int, newValue lua.LValue) lua.LValue { - st := L.GetGlobal(luaStateName).(*lua.LTable) - oldValue := L.RawGetInt(st, key) - L.RawSetInt(st, key, newValue) - return oldValue -} - func stateValue(L *lua.LState, key int) lua.LValue { st := L.GetGlobal(luaStateName) return L.RawGetInt(st.(*lua.LTable), key) @@ -135,6 +214,22 @@ func stateCnxTable(L *lua.LState) *lua.LTable { return stateValue(L, keyCnxTable).(*lua.LTable) } +/********** Lua Object for MQTT client **********/ + +const luaMqttClientTypeName = "mqttclient" +const keyClient = 1 +const keySubTable = 2 + +func registerMqttClientType(L *lua.LState) { + mt := L.NewTypeMetatable(luaMqttClientTypeName) + L.SetGlobal(luaMqttClientTypeName, mt) + L.SetField(mt, "new", L.NewFunction(newMqttClient)) + L.SetField(mt, "__gc", L.NewFunction(deleteMqttClient)) + L.SetField(mt, "__call", L.NewFunction(luaPublish)) + L.SetField(mt, "__index", L.NewFunction(luaQuery)) + L.SetField(mt, "__newindex", L.NewFunction(luaSubscribe)) +} + type mqttConfig struct { Connection string PauseTimeout string @@ -278,109 +373,3 @@ func luaSubscribe(L *lua.LState) int { return 1 } } - -func cleanupClients(L *lua.LState) { - cnxTbl := stateCnxTable(L) - if cnxTbl == nil { - return - } - - L.ForEach(cnxTbl, func(key, value lua.LValue) { - cnx := value.(*lua.LTable) - client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) - client.Disconnect(nil) - }) -} - -func Run(agent MqttAgent, main_script string) { - fromMqtt := make(chan MqttMessage) - - L := lua.NewState() - defer L.Close() - - agent.Setup(L) - defer agent.Teardown(L) - - hostname, err := os.Hostname() - if err != nil { - hostname = "<unknown>" - } - - registerMqttClientType(L) - registerState(L, fmt.Sprintf("mqttagent-%s-%d", hostname, os.Getpid()), &fromMqtt) - defer cleanupClients(L) - - if err := L.DoFile(main_script); err != nil { - panic(err) - } - - for { - msg, ok := <-fromMqtt - - if !ok { - break - } - - agent.Log(L, &msg) - - cnx := L.RawGetInt(stateCnxTable(L), msg.ClientId).(*lua.LTable) - subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable) - L.ForEach(subTbl, func(key, value lua.LValue) { dispatchMsg(L, &msg, cnx, key, value) }) - - if key, _ := subTbl.Next(lua.LNil); key == lua.LNil { - client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) - client.Disconnect(nil) - L.RawSetInt(stateCnxTable(L), msg.ClientId, lua.LNil) - } - - if stateCnxTable(L).Len() == 0 { - break - } - } -} - -func matchSliced(actual, filter []string) bool { - if len(filter) == 0 { - return len(actual) == 0 - } - - if filter[0] == "#" { - if len(filter) == 1 { - return true - } - - for i := range actual { - if matchSliced(actual[i:], filter[1:]) { - return true - } - } - - return false - } - - if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) { - return matchSliced(actual[1:], filter[1:]) - } - - return false -} - -func match(actual, filter string) bool { - return matchSliced(strings.Split(actual, "/"), strings.Split(filter, "/")) -} - -func dispatchMsg(L *lua.LState, msg *MqttMessage, cnx, key, value lua.LValue) { - skey, ok := key.(lua.LString) - topic := string(msg.Topic) - - if ok && match(topic, string(skey)) { - err := L.CallByParam(lua.P{Fn: value, NRet: 0, Protect: true}, - cnx, - lua.LString(string(msg.Message)), - lua.LString(topic), - lua.LNumber(msg.Timestamp)) - if err != nil { - panic(err) - } - } -}