mqttagent

MQTT Lua Agent
git clone https://git.instinctive.eu/mqttagent.git
Log | Files | Refs | README | LICENSE

main.go (7142B)


      1 /*
      2  * Copyright (c) 2025, Natacha Porté
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 package main
     18 
     19 import (
     20 	"database/sql"
     21 	"log"
     22 	"os"
     23 	"time"
     24 
     25 	_ "github.com/glebarez/go-sqlite"
     26 	luajson "github.com/layeh/gopher-json"
     27 	"github.com/yuin/gopher-lua"
     28 	"instinctive.eu/go/mqttagent"
     29 )
     30 
     31 type fullMqttAgent struct {
     32 	loggers    map[string]*sqlogger
     33 	oldLoggers map[string]*sqlogger
     34 }
     35 
     36 func (agent *fullMqttAgent) Setup(L *lua.LState) {
     37 	luajson.Preload(L)
     38 	agent.loggers = make(map[string]*sqlogger)
     39 
     40 	mt := L.NewTypeMetatable("sqlogger")
     41 	L.SetGlobal("sqlogger", mt)
     42 	L.SetField(mt, "new", L.NewFunction(func(L *lua.LState) int {
     43 		return luaSqloggerNew(L, agent)
     44 	}))
     45 	L.SetField(mt, "__index", L.SetFuncs(L.NewTable(), luaSqloggerMethods))
     46 }
     47 
     48 func (agent *fullMqttAgent) Teardown(L *lua.LState) {
     49 	if agent.oldLoggers != nil {
     50 		panic("Unexpected state")
     51 	}
     52 	for _, logger := range agent.loggers {
     53 		logger.Close()
     54 	}
     55 	agent.loggers = nil
     56 }
     57 
     58 func (agent *fullMqttAgent) ReloadBegin(oldL, newL *lua.LState) {
     59 	if agent.oldLoggers != nil {
     60 		panic("Unexpected state")
     61 	}
     62 	agent.oldLoggers = agent.loggers
     63 	agent.Setup(newL)
     64 }
     65 
     66 func (agent *fullMqttAgent) ReloadAbort(oldL, newL *lua.LState) {
     67 	for key, logger := range agent.loggers {
     68 		if _, found := agent.oldLoggers[key]; !found {
     69 			logger.Close()
     70 		}
     71 	}
     72 	agent.loggers = agent.oldLoggers
     73 	agent.oldLoggers = nil
     74 }
     75 
     76 func (agent *fullMqttAgent) ReloadEnd(oldL, newL *lua.LState) {
     77 	for key, logger := range agent.oldLoggers {
     78 		if _, found := agent.loggers[key]; !found {
     79 			logger.Close()
     80 		}
     81 	}
     82 	agent.oldLoggers = nil
     83 }
     84 
     85 type sqlogger struct {
     86 	db             *sql.DB
     87 	insertReceived *sql.Stmt
     88 	insertSent     *sql.Stmt
     89 }
     90 
     91 func (logger *sqlogger) Close() {
     92 	if logger == nil {
     93 		return
     94 	}
     95 
     96 	if logger.insertReceived != nil {
     97 		logger.insertReceived.Close()
     98 	}
     99 
    100 	if logger.insertSent != nil {
    101 		logger.insertSent.Close()
    102 	}
    103 
    104 	if logger.db != nil {
    105 		logger.db.Close()
    106 	}
    107 }
    108 
    109 func connect(connectionString string) (*sqlogger, error) {
    110 	db, err := sql.Open("sqlite", connectionString)
    111 	if err != nil {
    112 		return nil, err
    113 	}
    114 
    115 	for _, cmd := range []string{
    116 		"CREATE TABLE IF NOT EXISTS topics" +
    117 			"(id INTEGER PRIMARY KEY AUTOINCREMENT," +
    118 			" name TEXT NOT NULL);",
    119 		"CREATE UNIQUE INDEX IF NOT EXISTS i_topics ON topics(name);",
    120 		"CREATE TABLE IF NOT EXISTS received" +
    121 			"(timestamp REAL NOT NULL," +
    122 			" topic_id INTEGER NOT NULL," +
    123 			" message TEXT NOT NULL," +
    124 			" FOREIGN KEY (topic_id) REFERENCES topics (id));",
    125 		"CREATE TABLE IF NOT EXISTS sent" +
    126 			"(timestamp REAL NOT NULL," +
    127 			" topic_id INTEGER NOT NULL," +
    128 			" message TEXT NOT NULL," +
    129 			" FOREIGN KEY (topic_id) REFERENCES topics (id));",
    130 		"CREATE INDEX IF NOT EXISTS i_rtime ON received(timestamp);",
    131 		"CREATE INDEX IF NOT EXISTS i_rtopicid ON received(topic_id);",
    132 		"CREATE INDEX IF NOT EXISTS i_stime ON received(timestamp);",
    133 		"CREATE INDEX IF NOT EXISTS i_stopicid ON received(topic_id);",
    134 		"CREATE VIEW IF NOT EXISTS receivedf" +
    135 			"(timestamp,topic,message)" +
    136 			" AS SELECT datetime(timestamp),topics.name,message" +
    137 			" FROM received LEFT OUTER JOIN topics" +
    138 			" ON topics.id = topic_id;",
    139 		"CREATE VIEW IF NOT EXISTS sentf" +
    140 			"(timestamp,topic,message)" +
    141 			" AS SELECT datetime(timestamp),topics.name,message" +
    142 			" FROM sent LEFT OUTER JOIN topics" +
    143 			" ON topics.id = topic_id;",
    144 		"CREATE TRIGGER IF NOT EXISTS insert_received" +
    145 			" INSTEAD OF INSERT ON receivedf BEGIN" +
    146 			" INSERT INTO topics(name)" +
    147 			" SELECT NEW.topic WHERE NOT EXISTS" +
    148 			" (SELECT 1 FROM topics WHERE name = NEW.topic);" +
    149 			" INSERT INTO received(timestamp,topic_id,message)" +
    150 			" VALUES (NEW.timestamp," +
    151 			" (SELECT id FROM topics WHERE name = NEW.topic)," +
    152 			" NEW.message); END;",
    153 		"CREATE TRIGGER IF NOT EXISTS insert_sent" +
    154 			" INSTEAD OF INSERT ON sentf BEGIN" +
    155 			" INSERT INTO topics(name)" +
    156 			" SELECT NEW.topic WHERE NOT EXISTS" +
    157 			" (SELECT 1 FROM topics WHERE name = NEW.topic);" +
    158 			" INSERT INTO sent(timestamp,topic_id,message)" +
    159 			" VALUES (NEW.timestamp," +
    160 			" (SELECT id FROM topics WHERE name = NEW.topic)," +
    161 			" NEW.message); END;",
    162 	} {
    163 		if _, err = db.Exec(cmd); err != nil {
    164 			db.Close()
    165 			return nil, err
    166 		}
    167 	}
    168 
    169 	s1, err := db.Prepare("INSERT INTO receivedf(timestamp,topic,message)" +
    170 		" VALUES (?,?,?);")
    171 	if err != nil {
    172 		db.Close()
    173 		return nil, err
    174 	}
    175 
    176 	s2, err := db.Prepare("INSERT INTO sentf(timestamp,topic,message)" +
    177 		" VALUES (?,?,?);")
    178 	if err != nil {
    179 		s1.Close()
    180 		db.Close()
    181 		return nil, err
    182 	}
    183 
    184 	return &sqlogger{db: db, insertReceived: s1, insertSent: s2}, nil
    185 }
    186 
    187 func checkSqlogger(L *lua.LState, index int) *sqlogger {
    188 	ud := L.CheckUserData(index)
    189 	if v, ok := ud.Value.(*sqlogger); ok {
    190 		return v
    191 	}
    192 	L.ArgError(index, "sqlogger expected")
    193 	return nil
    194 }
    195 
    196 func luaSqloggerNew(L *lua.LState, agent *fullMqttAgent) int {
    197 	arg := L.CheckString(1)
    198 	if logger, found := agent.loggers[arg]; found {
    199 		ud := L.NewUserData()
    200 		ud.Value = logger
    201 		L.SetMetatable(ud, L.GetTypeMetatable("sqlogger"))
    202 		L.Push(ud)
    203 		return 1
    204 	} else if logger, found := agent.oldLoggers[arg]; found {
    205 		agent.loggers[arg] = logger
    206 		ud := L.NewUserData()
    207 		ud.Value = logger
    208 		L.SetMetatable(ud, L.GetTypeMetatable("sqlogger"))
    209 		L.Push(ud)
    210 		return 1
    211 	} else if logger, err := connect(arg); err != nil {
    212 		log.Println(err)
    213 		L.Push(lua.LNil)
    214 		L.Push(lua.LString(err.Error()))
    215 		return 2
    216 	} else {
    217 		agent.loggers[arg] = logger
    218 		ud := L.NewUserData()
    219 		ud.Value = logger
    220 		L.SetMetatable(ud, L.GetTypeMetatable("sqlogger"))
    221 		L.Push(ud)
    222 		return 1
    223 	}
    224 }
    225 
    226 func luaSqloggerInsert(L *lua.LState, stmt *sql.Stmt) int {
    227 	message := L.CheckString(2)
    228 	topic := L.CheckString(3)
    229 	timestamp := L.OptNumber(4, lua.LNumber(time.Now().UnixMicro())*1.0e-6)
    230 	julian := (float64(timestamp) / 86400.0) + 2440587.5
    231 
    232 	if _, err := stmt.Exec(julian, topic, message); err != nil {
    233 		log.Println(err)
    234 	}
    235 
    236 	return 0
    237 }
    238 
    239 func luaSqloggerReceived(L *lua.LState) int {
    240 	logger := checkSqlogger(L, 1)
    241 	return luaSqloggerInsert(L, logger.insertReceived)
    242 }
    243 
    244 func luaSqloggerSent(L *lua.LState) int {
    245 	logger := checkSqlogger(L, 1)
    246 	return luaSqloggerInsert(L, logger.insertSent)
    247 }
    248 
    249 var luaSqloggerMethods = map[string]lua.LGFunction{
    250 	"received": luaSqloggerReceived,
    251 	"sent":     luaSqloggerSent,
    252 }
    253 
    254 func main() {
    255 	var agent fullMqttAgent
    256 
    257 	main_script := "mqttagent.lua"
    258 	if len(os.Args) > 1 {
    259 		main_script = os.Args[1]
    260 	}
    261 
    262 	mqttagent.Run(&agent, main_script, 10)
    263 
    264 	os.Exit(0)
    265 }