main.go (7142B)
1 /* 2 * Copyright (c) 2025, Natacha Porté 3 * 4 * Permission to use, copy, modify, and distribute this software for any 5 * purpose with or without fee is hereby granted, provided that the above 6 * copyright notice and this permission notice appear in all copies. 7 * 8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 15 */ 16 17 package main 18 19 import ( 20 "database/sql" 21 "log" 22 "os" 23 "time" 24 25 _ "github.com/glebarez/go-sqlite" 26 luajson "github.com/layeh/gopher-json" 27 "github.com/yuin/gopher-lua" 28 "instinctive.eu/go/mqttagent" 29 ) 30 31 type fullMqttAgent struct { 32 loggers map[string]*sqlogger 33 oldLoggers map[string]*sqlogger 34 } 35 36 func (agent *fullMqttAgent) Setup(L *lua.LState) { 37 luajson.Preload(L) 38 agent.loggers = make(map[string]*sqlogger) 39 40 mt := L.NewTypeMetatable("sqlogger") 41 L.SetGlobal("sqlogger", mt) 42 L.SetField(mt, "new", L.NewFunction(func(L *lua.LState) int { 43 return luaSqloggerNew(L, agent) 44 })) 45 L.SetField(mt, "__index", L.SetFuncs(L.NewTable(), luaSqloggerMethods)) 46 } 47 48 func (agent *fullMqttAgent) Teardown(L *lua.LState) { 49 if agent.oldLoggers != nil { 50 panic("Unexpected state") 51 } 52 for _, logger := range agent.loggers { 53 logger.Close() 54 } 55 agent.loggers = nil 56 } 57 58 func (agent *fullMqttAgent) ReloadBegin(oldL, newL *lua.LState) { 59 if agent.oldLoggers != nil { 60 panic("Unexpected state") 61 } 62 agent.oldLoggers = agent.loggers 63 agent.Setup(newL) 64 } 65 66 func (agent *fullMqttAgent) ReloadAbort(oldL, newL *lua.LState) { 67 for key, logger := range agent.loggers { 68 if _, found := agent.oldLoggers[key]; !found { 69 logger.Close() 70 } 71 } 72 agent.loggers = agent.oldLoggers 73 agent.oldLoggers = nil 74 } 75 76 func (agent *fullMqttAgent) ReloadEnd(oldL, newL *lua.LState) { 77 for key, logger := range agent.oldLoggers { 78 if _, found := agent.loggers[key]; !found { 79 logger.Close() 80 } 81 } 82 agent.oldLoggers = nil 83 } 84 85 type sqlogger struct { 86 db *sql.DB 87 insertReceived *sql.Stmt 88 insertSent *sql.Stmt 89 } 90 91 func (logger *sqlogger) Close() { 92 if logger == nil { 93 return 94 } 95 96 if logger.insertReceived != nil { 97 logger.insertReceived.Close() 98 } 99 100 if logger.insertSent != nil { 101 logger.insertSent.Close() 102 } 103 104 if logger.db != nil { 105 logger.db.Close() 106 } 107 } 108 109 func connect(connectionString string) (*sqlogger, error) { 110 db, err := sql.Open("sqlite", connectionString) 111 if err != nil { 112 return nil, err 113 } 114 115 for _, cmd := range []string{ 116 "CREATE TABLE IF NOT EXISTS topics" + 117 "(id INTEGER PRIMARY KEY AUTOINCREMENT," + 118 " name TEXT NOT NULL);", 119 "CREATE UNIQUE INDEX IF NOT EXISTS i_topics ON topics(name);", 120 "CREATE TABLE IF NOT EXISTS received" + 121 "(timestamp REAL NOT NULL," + 122 " topic_id INTEGER NOT NULL," + 123 " message TEXT NOT NULL," + 124 " FOREIGN KEY (topic_id) REFERENCES topics (id));", 125 "CREATE TABLE IF NOT EXISTS sent" + 126 "(timestamp REAL NOT NULL," + 127 " topic_id INTEGER NOT NULL," + 128 " message TEXT NOT NULL," + 129 " FOREIGN KEY (topic_id) REFERENCES topics (id));", 130 "CREATE INDEX IF NOT EXISTS i_rtime ON received(timestamp);", 131 "CREATE INDEX IF NOT EXISTS i_rtopicid ON received(topic_id);", 132 "CREATE INDEX IF NOT EXISTS i_stime ON received(timestamp);", 133 "CREATE INDEX IF NOT EXISTS i_stopicid ON received(topic_id);", 134 "CREATE VIEW IF NOT EXISTS receivedf" + 135 "(timestamp,topic,message)" + 136 " AS SELECT datetime(timestamp),topics.name,message" + 137 " FROM received LEFT OUTER JOIN topics" + 138 " ON topics.id = topic_id;", 139 "CREATE VIEW IF NOT EXISTS sentf" + 140 "(timestamp,topic,message)" + 141 " AS SELECT datetime(timestamp),topics.name,message" + 142 " FROM sent LEFT OUTER JOIN topics" + 143 " ON topics.id = topic_id;", 144 "CREATE TRIGGER IF NOT EXISTS insert_received" + 145 " INSTEAD OF INSERT ON receivedf BEGIN" + 146 " INSERT INTO topics(name)" + 147 " SELECT NEW.topic WHERE NOT EXISTS" + 148 " (SELECT 1 FROM topics WHERE name = NEW.topic);" + 149 " INSERT INTO received(timestamp,topic_id,message)" + 150 " VALUES (NEW.timestamp," + 151 " (SELECT id FROM topics WHERE name = NEW.topic)," + 152 " NEW.message); END;", 153 "CREATE TRIGGER IF NOT EXISTS insert_sent" + 154 " INSTEAD OF INSERT ON sentf BEGIN" + 155 " INSERT INTO topics(name)" + 156 " SELECT NEW.topic WHERE NOT EXISTS" + 157 " (SELECT 1 FROM topics WHERE name = NEW.topic);" + 158 " INSERT INTO sent(timestamp,topic_id,message)" + 159 " VALUES (NEW.timestamp," + 160 " (SELECT id FROM topics WHERE name = NEW.topic)," + 161 " NEW.message); END;", 162 } { 163 if _, err = db.Exec(cmd); err != nil { 164 db.Close() 165 return nil, err 166 } 167 } 168 169 s1, err := db.Prepare("INSERT INTO receivedf(timestamp,topic,message)" + 170 " VALUES (?,?,?);") 171 if err != nil { 172 db.Close() 173 return nil, err 174 } 175 176 s2, err := db.Prepare("INSERT INTO sentf(timestamp,topic,message)" + 177 " VALUES (?,?,?);") 178 if err != nil { 179 s1.Close() 180 db.Close() 181 return nil, err 182 } 183 184 return &sqlogger{db: db, insertReceived: s1, insertSent: s2}, nil 185 } 186 187 func checkSqlogger(L *lua.LState, index int) *sqlogger { 188 ud := L.CheckUserData(index) 189 if v, ok := ud.Value.(*sqlogger); ok { 190 return v 191 } 192 L.ArgError(index, "sqlogger expected") 193 return nil 194 } 195 196 func luaSqloggerNew(L *lua.LState, agent *fullMqttAgent) int { 197 arg := L.CheckString(1) 198 if logger, found := agent.loggers[arg]; found { 199 ud := L.NewUserData() 200 ud.Value = logger 201 L.SetMetatable(ud, L.GetTypeMetatable("sqlogger")) 202 L.Push(ud) 203 return 1 204 } else if logger, found := agent.oldLoggers[arg]; found { 205 agent.loggers[arg] = logger 206 ud := L.NewUserData() 207 ud.Value = logger 208 L.SetMetatable(ud, L.GetTypeMetatable("sqlogger")) 209 L.Push(ud) 210 return 1 211 } else if logger, err := connect(arg); err != nil { 212 log.Println(err) 213 L.Push(lua.LNil) 214 L.Push(lua.LString(err.Error())) 215 return 2 216 } else { 217 agent.loggers[arg] = logger 218 ud := L.NewUserData() 219 ud.Value = logger 220 L.SetMetatable(ud, L.GetTypeMetatable("sqlogger")) 221 L.Push(ud) 222 return 1 223 } 224 } 225 226 func luaSqloggerInsert(L *lua.LState, stmt *sql.Stmt) int { 227 message := L.CheckString(2) 228 topic := L.CheckString(3) 229 timestamp := L.OptNumber(4, lua.LNumber(time.Now().UnixMicro())*1.0e-6) 230 julian := (float64(timestamp) / 86400.0) + 2440587.5 231 232 if _, err := stmt.Exec(julian, topic, message); err != nil { 233 log.Println(err) 234 } 235 236 return 0 237 } 238 239 func luaSqloggerReceived(L *lua.LState) int { 240 logger := checkSqlogger(L, 1) 241 return luaSqloggerInsert(L, logger.insertReceived) 242 } 243 244 func luaSqloggerSent(L *lua.LState) int { 245 logger := checkSqlogger(L, 1) 246 return luaSqloggerInsert(L, logger.insertSent) 247 } 248 249 var luaSqloggerMethods = map[string]lua.LGFunction{ 250 "received": luaSqloggerReceived, 251 "sent": luaSqloggerSent, 252 } 253 254 func main() { 255 var agent fullMqttAgent 256 257 main_script := "mqttagent.lua" 258 if len(os.Args) > 1 { 259 main_script = os.Args[1] 260 } 261 262 mqttagent.Run(&agent, main_script, 10) 263 264 os.Exit(0) 265 }