mqttagent

MQTT Lua Agent
git clone https://git.instinctive.eu/mqttagent.git
Log | Files | Refs

commit ffc9c24a0843e7929b461332c30f08d9e157c26a
parent 7d28c3c245dd95dcf41fd18cb18823c9cb5cd58b
Author: Natasha Kerensikova <natgh@instinctive.eu>
Date:   Fri,  3 Jan 2025 16:05:05 +0000

First draft of the project
Diffstat:
Acmd/mqttagent-full/main.go | 177+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acmd/mqttagent-lite/main.go | 43+++++++++++++++++++++++++++++++++++++++++++
Ago.mod | 22++++++++++++++++++++++
Amqttagent.go | 340+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 582 insertions(+), 0 deletions(-)

diff --git a/cmd/mqttagent-full/main.go b/cmd/mqttagent-full/main.go @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2025, Natacha Porté + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +package main + +import ( + "database/sql" + "log" + "os" + + "instinctive.eu/go/mqttagent" + _ "github.com/glebarez/go-sqlite" + luajson "github.com/layeh/gopher-json" + "github.com/yuin/gopher-lua" +) + +type fullMqttAgent struct { + db *sql.DB + insertTopic *sql.Stmt + insertReceived *sql.Stmt +} + +func (agent *fullMqttAgent) Setup(L *lua.LState) { + luajson.Preload(L) + + L.SetGlobal("sqlog", L.NewFunction(func(L *lua.LState) int { + arg := L.CheckString(1) + if err := agent.connect(arg); err != nil { + log.Println(err) + L.Push(lua.LNil) + L.Push(lua.LString(err.Error())) + return 2 + } else { + L.Push(lua.LTrue) + return 1 + } + })) +} + +func (agent *fullMqttAgent) Log(L *lua.LState, msg *mqttagent.MqttMessage) { + if agent.insertTopic == nil || agent.insertReceived == nil { + return + } + + if _, err := agent.insertTopic.Exec(msg.Topic); err != nil { + log.Println(err) + return + } + + if _, err := agent.insertReceived.Exec((msg.Timestamp/86400.0)+2440587.5, msg.Topic, msg.Message); err != nil { + log.Println(err) + return + } +} + +func (agent *fullMqttAgent) Teardown(L *lua.LState) { + if agent.insertReceived != nil { + agent.insertReceived.Close() + } + + if agent.insertTopic != nil { + agent.insertTopic.Close() + } + + if agent.db != nil { + agent.db.Close() + } +} + +func (agent *fullMqttAgent) connect(connectionString string) error { + if agent.insertReceived != nil { + agent.insertReceived.Close() + agent.insertReceived = nil + } + + if agent.insertTopic != nil { + agent.insertTopic.Close() + agent.insertTopic = nil + } + + if agent.db != nil { + agent.db.Close() + agent.db = nil + } + + db, err := sql.Open("sqlite", connectionString) + if err != nil { + return err + } + + _, err = db.Exec(` +CREATE TABLE IF NOT EXISTS + topics(id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL); +`) + if err != nil { + db.Close() + return err + } + + _, err = db.Exec("CREATE INDEX IF NOT EXISTS i_topics ON topics(name);") + if err != nil { + db.Close() + return err + } + + _, err = db.Exec(` +CREATE TABLE IF NOT EXISTS + received(timestamp INTEGER NOT NULL DEFAULT CURRENT_TIMESTAMP, + topic_id INTEGER NOT NULL, + message TEXT NOT NULL, + FOREIGN KEY (topic_id) REFERENCES topics (id)); +`) + if err != nil { + db.Close() + return err + } + + _, err = db.Exec("CREATE INDEX IF NOT EXISTS i_time ON received(timestamp);") + if err != nil { + db.Close() + return err + } + + _, err = db.Exec("CREATE INDEX IF NOT EXISTS i_topicid ON received(topic_id);") + if err != nil { + db.Close() + return err + } + + s1, err := db.Prepare("INSERT OR IGNORE INTO topics(name) VALUES (?);") + if err != nil { + db.Close() + return err + } + + s2, err := db.Prepare(` +INSERT INTO received (timestamp, topic_id, message) +VALUES (?, (SELECT id FROM topics WHERE name = ?), ?); +`) + if err != nil { + s1.Close() + db.Close() + return err + } + + agent.db = db + agent.insertTopic = s1 + agent.insertReceived = s2 + return nil +} + +func main() { + var agent fullMqttAgent + + main_script := "mqttagent.lua" + if len(os.Args) > 1 { + main_script = os.Args[1] + } + + mqttagent.Run(&agent, main_script) + + os.Exit(0) +} diff --git a/cmd/mqttagent-lite/main.go b/cmd/mqttagent-lite/main.go @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2025, Natacha Porté + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +package main + +import ( + "os" + + "instinctive.eu/go/mqttagent" + "github.com/yuin/gopher-lua" +) + +type liteMqttAgent struct{} + +func (agent liteMqttAgent) Setup(L *lua.LState) {} +func (agent liteMqttAgent) Log(L *lua.LState, msg *mqttagent.MqttMessage) {} +func (agent liteMqttAgent) Teardown(L *lua.LState) {} + +func main() { + var agent liteMqttAgent + + main_script := "mqttagent.lua" + if len(os.Args) > 1 { + main_script = os.Args[1] + } + + mqttagent.Run(agent, main_script) + + os.Exit(0) +} diff --git a/go.mod b/go.mod @@ -0,0 +1,22 @@ +module instinctive.eu/go/mqttagent + +go 1.21.9 + +require ( + github.com/glebarez/go-sqlite v1.22.0 + github.com/go-mqtt/mqtt v0.0.0-20210702165922-b33ea0451b0b + github.com/layeh/gopher-json v0.0.0-20201124131017-552bb3c4c3bf + github.com/yuin/gopher-lua v1.1.1 +) + +require ( + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/google/uuid v1.5.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + golang.org/x/sys v0.15.0 // indirect + modernc.org/libc v1.37.6 // indirect + modernc.org/mathutil v1.6.0 // indirect + modernc.org/memory v1.7.2 // indirect + modernc.org/sqlite v1.28.0 // indirect +) diff --git a/mqttagent.go b/mqttagent.go @@ -0,0 +1,340 @@ +/* + * Copyright (c) 2025, Natacha Porté + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +package mqttagent + +import ( + "errors" + "fmt" + "log" + "os" + "strings" + "time" + + "github.com/go-mqtt/mqtt" + "github.com/yuin/gopher-lua" +) + +type MqttAgent interface { + Setup(L *lua.LState) + Log(L *lua.LState, msg *MqttMessage) + Teardown(L *lua.LState) +} + +type MqttMessage struct { + Timestamp float64 + ClientId int + Topic []byte + Message []byte +} + +type mqttConnection struct { + client *mqtt.Client + toLua chan MqttMessage +} + +func dup(src []byte) []byte { + res := make([]byte, len(src)) + copy(res, src) + return res +} + +func mqttRead(client *mqtt.Client, toLua chan MqttMessage, id int) error { + var big *mqtt.BigMessage + + for { + message, topic, err := client.ReadSlices() + t := float64(time.Now().UnixMicro()) * 1.0e-6 + + switch { + case err == nil: + toLua <- MqttMessage{Timestamp: t, ClientId: id, Topic: dup(topic), Message: dup(message)} + case errors.As(err, &big): + data, err := big.ReadAll() + if err != nil { + log.Println(err) + } else { + toLua <- MqttMessage{Timestamp: t, ClientId: id, Topic: dup(topic), Message: data} + } + default: + log.Println(err) + return err + } + } +} + +const luaMqttClientTypeName = "mqttclient" + +/* Per-LState global state */ +const luaStateName = "_mqttagent" +const keyChanToLua = 1 +const keyClientPrefix = 2 +const keyCnxTable = 3 + +/* Per-Connection state */ +const keyClient = 1 +const keySubTable = 2 + +func registerMqttClientType(L *lua.LState) { + mt := L.NewTypeMetatable(luaMqttClientTypeName) + L.SetGlobal(luaMqttClientTypeName, mt) + L.SetField(mt, "new", L.NewFunction(newMqttClient)) + L.SetField(mt, "__gc", L.NewFunction(deleteMqttClient)) + L.SetField(mt, "__call", L.NewFunction(luaPublish)) + L.SetField(mt, "__index", L.NewFunction(luaQuery)) + L.SetField(mt, "__newindex", L.NewFunction(luaSubscribe)) +} + +func registerState(L *lua.LState, clientPrefix string, toLua *chan MqttMessage) { + ud := L.NewUserData() + ud.Value = toLua + + st := L.NewTable() + L.RawSetInt(st, keyChanToLua, ud) + L.RawSetInt(st, keyClientPrefix, lua.LString(clientPrefix)) + L.RawSetInt(st, keyCnxTable, L.NewTable()) + L.SetGlobal(luaStateName, st) +} + +func stateUpdateValue(L *lua.LState, key int, newValue lua.LValue) lua.LValue { + st := L.GetGlobal(luaStateName).(*lua.LTable) + oldValue := L.RawGetInt(st, key) + L.RawSetInt(st, key, newValue) + return oldValue +} + +func stateValue(L *lua.LState, key int) lua.LValue { + st := L.GetGlobal(luaStateName) + return L.RawGetInt(st.(*lua.LTable), key) +} + +func stateChanToLua(L *lua.LState) *chan MqttMessage { + ud := stateValue(L, keyChanToLua) + return ud.(*lua.LUserData).Value.(*chan MqttMessage) +} + +func stateClientPrefix(L *lua.LState) string { + return lua.LVAsString(stateValue(L, keyClientPrefix)) +} + +func stateCnxTable(L *lua.LState) *lua.LTable { + return stateValue(L, keyCnxTable).(*lua.LTable) +} + +func newMqttClient(L *lua.LState) int { + server := L.CheckString(1) + user := L.CheckString(2) + pass := L.CheckString(3) + to := time.Duration(L.OptNumber(4, lua.LNumber(1.0))) * time.Second + mt := L.GetTypeMetatable(luaMqttClientTypeName) + id := stateCnxTable(L).Len() + 1 + + idString := fmt.Sprintf("%s-%d", stateClientPrefix(L), id) + + client, err := mqtt.VolatileSession(idString, &mqtt.Config{ + Dialer: mqtt.NewDialer("tcp", server), + PauseTimeout: to, + UserName: user, + Password: []byte(pass), + }) + if err != nil { + log.Println(err) + L.Push(lua.LNil) + L.Push(lua.LString(err.Error())) + return 2 + } + go mqttRead(client, *stateChanToLua(L), id) + + ud := L.NewUserData() + ud.Value = client + + res := L.NewTable() + L.RawSetInt(res, keyClient, ud) + L.RawSetInt(res, keySubTable, L.NewTable()) + L.SetMetatable(res, mt) + L.RawSetInt(stateCnxTable(L), id, res) + L.Push(res) + return 1 +} + +func deleteMqttClient(L *lua.LState) int { + log.Println("deleteMqttClient: TODO") + return 0 +} + +func luaPublish(L *lua.LState) int { + cnx := L.CheckTable(1) + message := L.CheckString(2) + topic := L.CheckString(3) + client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) + + err := client.Publish(nil, []byte(message), topic) + + if err != nil { + L.Push(lua.LNil) + L.Push(lua.LString(err.Error())) + return 2 + } else { + L.Push(lua.LTrue) + return 1 + } +} + +func luaQuery(L *lua.LState) int { + cnx := L.CheckTable(1) + topic := L.CheckString(2) + subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable) + L.Push(L.GetField(subTbl, topic)) + return 1 +} + +func luaSubscribe(L *lua.LState) int { + var err error + cnx := L.CheckTable(1) + topic := L.CheckString(2) + callback := L.OptFunction(3, nil) + client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) + + if callback == nil { + err = client.Unsubscribe(nil, topic) + } else { + err = client.Subscribe(nil, topic) + } + + if err != nil { + log.Println(err) + L.Push(lua.LNil) + L.Push(lua.LString(err.Error())) + return 2 + } else { + tbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable) + + if callback == nil { + L.SetField(tbl, topic, lua.LNil) + } else { + L.SetField(tbl, topic, callback) + } + + L.Push(lua.LTrue) + return 1 + } +} + +func cleanupClients(L *lua.LState) { + cnxTbl := stateCnxTable(L) + if cnxTbl == nil { + return + } + + L.ForEach(cnxTbl, func(key, value lua.LValue) { + cnx := value.(*lua.LTable) + client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) + client.Disconnect(nil) + }) +} + +func Run(agent MqttAgent, main_script string) { + fromMqtt := make(chan MqttMessage) + + L := lua.NewState() + defer L.Close() + + agent.Setup(L) + defer agent.Teardown(L) + + hostname, err := os.Hostname() + if err != nil { + hostname = "<unknown>" + } + + registerMqttClientType(L) + registerState(L, fmt.Sprintf("mqttagent-%s-%d", hostname, os.Getpid()), &fromMqtt) + defer cleanupClients(L) + + if err := L.DoFile(main_script); err != nil { + panic(err) + } + + for { + msg, ok := <-fromMqtt + + if !ok { + break + } + + agent.Log(L, &msg) + + cnx := L.RawGetInt(stateCnxTable(L), msg.ClientId).(*lua.LTable) + subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable) + L.ForEach(subTbl, func(key, value lua.LValue) { dispatchMsg(L, &msg, cnx, key, value) }) + + if key, _ := subTbl.Next(lua.LNil); key == lua.LNil { + client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) + client.Disconnect(nil) + L.RawSetInt(stateCnxTable(L), msg.ClientId, lua.LNil) + } + + if stateCnxTable(L).Len() == 0 { + break + } + } +} + +func matchSliced(actual, filter []string) bool { + if len(filter) == 0 { + return len(actual) == 0 + } + + if filter[0] == "#" { + if len(filter) == 1 { + return true + } + + for i := range actual { + if matchSliced(actual[i:], filter[1:]) { + return true + } + } + + return false + } + + if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) { + return matchSliced(actual[1:], filter[1:]) + } + + return false +} + +func match(actual, filter string) bool { + return matchSliced(strings.Split(actual, "/"), strings.Split(filter, "/")) +} + +func dispatchMsg(L *lua.LState, msg *MqttMessage, cnx, key, value lua.LValue) { + skey, ok := key.(lua.LString) + topic := string(msg.Topic) + + if ok && match(topic, string(skey)) { + err := L.CallByParam(lua.P{Fn: value, NRet: 0, Protect: true}, + cnx, + lua.LString(string(msg.Message)), + lua.LString(topic), + lua.LNumber(msg.Timestamp)) + if err != nil { + panic(err) + } + } +}