commit 2f3d07ffafae5fb14c6631601b9bb8f8373cea73
parent 54e0cac233b1ee7f2668b5014bfdc4ff9e5ecdee
Author: Natasha Kerensikova <natgh@instinctive.eu>
Date: Thu, 24 Apr 2025 19:06:19 +0000
sent method for SQL logger
Diffstat:
1 file changed, 59 insertions(+), 27 deletions(-)
diff --git a/cmd/mqttagent-full/main.go b/cmd/mqttagent-full/main.go
@@ -82,20 +82,10 @@ func (agent *fullMqttAgent) ReloadEnd(oldL, newL *lua.LState) {
agent.oldLoggers = nil
}
-func (logger *sqlogger) Received(msg *mqttagent.MqttMessage) {
- if logger == nil || logger.insert == nil {
- return
- }
-
- if _, err := logger.insert.Exec((msg.Timestamp/86400.0)+2440587.5, msg.Topic, msg.Message); err != nil {
- log.Println(err)
- return
- }
-}
-
type sqlogger struct {
- db *sql.DB
- insert *sql.Stmt
+ db *sql.DB
+ insertReceived *sql.Stmt
+ insertSent *sql.Stmt
}
func (logger *sqlogger) Close() {
@@ -103,8 +93,12 @@ func (logger *sqlogger) Close() {
return
}
- if logger.insert != nil {
- logger.insert.Close()
+ if logger.insertReceived != nil {
+ logger.insertReceived.Close()
+ }
+
+ if logger.insertSent != nil {
+ logger.insertSent.Close()
}
if logger.db != nil {
@@ -128,13 +122,25 @@ func connect(connectionString string) (*sqlogger, error) {
" topic_id INTEGER NOT NULL," +
" message TEXT NOT NULL," +
" FOREIGN KEY (topic_id) REFERENCES topics (id));",
- "CREATE INDEX IF NOT EXISTS i_time ON received(timestamp);",
- "CREATE INDEX IF NOT EXISTS i_topicid ON received(topic_id);",
+ "CREATE TABLE IF NOT EXISTS sent" +
+ "(timestamp REAL NOT NULL," +
+ " topic_id INTEGER NOT NULL," +
+ " message TEXT NOT NULL," +
+ " FOREIGN KEY (topic_id) REFERENCES topics (id));",
+ "CREATE INDEX IF NOT EXISTS i_rtime ON received(timestamp);",
+ "CREATE INDEX IF NOT EXISTS i_rtopicid ON received(topic_id);",
+ "CREATE INDEX IF NOT EXISTS i_stime ON received(timestamp);",
+ "CREATE INDEX IF NOT EXISTS i_stopicid ON received(topic_id);",
"CREATE VIEW IF NOT EXISTS receivedf" +
"(timestamp,topic,message)" +
" AS SELECT datetime(timestamp),topics.name,message" +
" FROM received LEFT OUTER JOIN topics" +
" ON topics.id = topic_id;",
+ "CREATE VIEW IF NOT EXISTS sentf" +
+ "(timestamp,topic,message)" +
+ " AS SELECT datetime(timestamp),topics.name,message" +
+ " FROM sent LEFT OUTER JOIN topics" +
+ " ON topics.id = topic_id;",
"CREATE TRIGGER IF NOT EXISTS insert_received" +
" INSTEAD OF INSERT ON receivedf BEGIN" +
" INSERT INTO topics(name)" +
@@ -144,6 +150,15 @@ func connect(connectionString string) (*sqlogger, error) {
" VALUES (NEW.timestamp," +
" (SELECT id FROM topics WHERE name = NEW.topic)," +
" NEW.message); END;",
+ "CREATE TRIGGER IF NOT EXISTS insert_sent" +
+ " INSTEAD OF INSERT ON sentf BEGIN" +
+ " INSERT INTO topics(name)" +
+ " SELECT NEW.topic WHERE NOT EXISTS" +
+ " (SELECT 1 FROM topics WHERE name = NEW.topic);" +
+ " INSERT INTO sent(timestamp,topic_id,message)" +
+ " VALUES (NEW.timestamp," +
+ " (SELECT id FROM topics WHERE name = NEW.topic)," +
+ " NEW.message); END;",
} {
if _, err = db.Exec(cmd); err != nil {
db.Close()
@@ -151,14 +166,22 @@ func connect(connectionString string) (*sqlogger, error) {
}
}
- s, err := db.Prepare("INSERT INTO receivedf(timestamp,topic,message)" +
+ s1, err := db.Prepare("INSERT INTO receivedf(timestamp,topic,message)" +
+ " VALUES (?,?,?);")
+ if err != nil {
+ db.Close()
+ return nil, err
+ }
+
+ s2, err := db.Prepare("INSERT INTO sentf(timestamp,topic,message)" +
" VALUES (?,?,?);")
if err != nil {
+ s1.Close()
db.Close()
return nil, err
}
- return &sqlogger{db: db, insert: s}, nil
+ return &sqlogger{db: db, insertReceived: s1, insertSent: s2}, nil
}
func checkSqlogger(L *lua.LState, index int) *sqlogger {
@@ -200,23 +223,32 @@ func luaSqloggerNew(L *lua.LState, agent *fullMqttAgent) int {
}
}
-func luaSqloggerReceived(L *lua.LState) int {
- logger := checkSqlogger(L, 1)
+func luaSqloggerInsert(L *lua.LState, stmt *sql.Stmt) int {
message := L.CheckString(2)
topic := L.CheckString(3)
timestamp := L.OptNumber(4, lua.LNumber(time.Now().UnixMicro())*1.0e-6)
+ julian := (float64(timestamp) / 86400.0) + 2440587.5
+
+ if _, err := stmt.Exec(julian, topic, message); err != nil {
+ log.Println(err)
+ }
- logger.Received(&mqttagent.MqttMessage{
- Timestamp: float64(timestamp),
- ClientId: -1,
- Message: []byte(message),
- Topic: []byte(topic),
- })
return 0
}
+func luaSqloggerReceived(L *lua.LState) int {
+ logger := checkSqlogger(L, 1)
+ return luaSqloggerInsert(L, logger.insertReceived)
+}
+
+func luaSqloggerSent(L *lua.LState) int {
+ logger := checkSqlogger(L, 1)
+ return luaSqloggerInsert(L, logger.insertSent)
+}
+
var luaSqloggerMethods = map[string]lua.LGFunction{
"received": luaSqloggerReceived,
+ "sent": luaSqloggerSent,
}
func main() {