mqttagent

MQTT Lua Agent
git clone https://git.instinctive.eu/mqttagent.git
Log | Files | Refs | README | LICENSE

commit 0a8bcf3ccfdef779af12c60577021f202fc22809
parent 015b3fd77d812041196e1bcd082c1bbc0b62699b
Author: Natasha Kerensikova <natgh@instinctive.eu>
Date:   Wed, 16 Apr 2025 21:29:47 +0000

SQL logger is extracted from full agent
Diffstat:
Mcmd/mqttagent-full/main.go | 73++++++++++++++++++++++++++++++++++++++-----------------------------------
1 file changed, 38 insertions(+), 35 deletions(-)

diff --git a/cmd/mqttagent-full/main.go b/cmd/mqttagent-full/main.go @@ -21,16 +21,14 @@ import ( "log" "os" - "instinctive.eu/go/mqttagent" _ "github.com/glebarez/go-sqlite" luajson "github.com/layeh/gopher-json" "github.com/yuin/gopher-lua" + "instinctive.eu/go/mqttagent" ) type fullMqttAgent struct { - db *sql.DB - insertTopic *sql.Stmt - insertReceived *sql.Stmt + logger *sqlogger } func (agent *fullMqttAgent) Setup(L *lua.LState) { @@ -38,12 +36,14 @@ func (agent *fullMqttAgent) Setup(L *lua.LState) { L.SetGlobal("sqlog", L.NewFunction(func(L *lua.LState) int { arg := L.CheckString(1) - if err := agent.connect(arg); err != nil { + if logger, err := connect(arg); err != nil { log.Println(err) L.Push(lua.LNil) L.Push(lua.LString(err.Error())) return 2 } else { + agent.logger.Close() + agent.logger = logger L.Push(lua.LTrue) return 1 } @@ -51,54 +51,60 @@ func (agent *fullMqttAgent) Setup(L *lua.LState) { } func (agent *fullMqttAgent) Log(L *lua.LState, msg *mqttagent.MqttMessage) { - if agent.insertTopic == nil || agent.insertReceived == nil { + if agent.logger != nil { + agent.logger.Received(msg) + } +} + +func (logger *sqlogger) Received(msg *mqttagent.MqttMessage) { + if logger.insertTopic == nil || logger.insertReceived == nil { return } - if _, err := agent.insertTopic.Exec(msg.Topic); err != nil { + if _, err := logger.insertTopic.Exec(msg.Topic); err != nil { log.Println(err) return } - if _, err := agent.insertReceived.Exec((msg.Timestamp/86400.0)+2440587.5, msg.Topic, msg.Message); err != nil { + if _, err := logger.insertReceived.Exec((msg.Timestamp/86400.0)+2440587.5, msg.Topic, msg.Message); err != nil { log.Println(err) return } } func (agent *fullMqttAgent) Teardown(L *lua.LState) { - if agent.insertReceived != nil { - agent.insertReceived.Close() - } + agent.logger.Close() + agent.logger = nil +} - if agent.insertTopic != nil { - agent.insertTopic.Close() - } +type sqlogger struct { + db *sql.DB + insertTopic *sql.Stmt + insertReceived *sql.Stmt +} - if agent.db != nil { - agent.db.Close() +func (logger *sqlogger) Close() { + if logger == nil { + return } -} -func (agent *fullMqttAgent) connect(connectionString string) error { - if agent.insertReceived != nil { - agent.insertReceived.Close() - agent.insertReceived = nil + if logger.insertReceived != nil { + logger.insertReceived.Close() } - if agent.insertTopic != nil { - agent.insertTopic.Close() - agent.insertTopic = nil + if logger.insertTopic != nil { + logger.insertTopic.Close() } - if agent.db != nil { - agent.db.Close() - agent.db = nil + if logger.db != nil { + logger.db.Close() } +} +func connect(connectionString string) (*sqlogger, error) { db, err := sql.Open("sqlite", connectionString) if err != nil { - return err + return nil, err } for _, cmd := range []string{ @@ -116,14 +122,14 @@ func (agent *fullMqttAgent) connect(connectionString string) error { } { if _, err = db.Exec(cmd); err != nil { db.Close() - return err + return nil, err } } s1, err := db.Prepare("INSERT OR IGNORE INTO topics(name) VALUES (?);") if err != nil { db.Close() - return err + return nil, err } s2, err := db.Prepare(` @@ -133,13 +139,10 @@ VALUES (?, (SELECT id FROM topics WHERE name = ?), ?); if err != nil { s1.Close() db.Close() - return err + return nil, err } - agent.db = db - agent.insertTopic = s1 - agent.insertReceived = s2 - return nil + return &sqlogger{db: db, insertTopic: s1, insertReceived: s2}, nil } func main() {