mqttagent

MQTT Lua Agent
git clone https://git.instinctive.eu/mqttagent.git
Log | Files | Refs | README | LICENSE

main.go (3865B)


      1 /*
      2  * Copyright (c) 2025, Natacha Porté
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 package main
     18 
     19 import (
     20 	"database/sql"
     21 	"log"
     22 	"os"
     23 
     24 	"instinctive.eu/go/mqttagent"
     25 	_ "github.com/glebarez/go-sqlite"
     26 	luajson "github.com/layeh/gopher-json"
     27 	"github.com/yuin/gopher-lua"
     28 )
     29 
     30 type fullMqttAgent struct {
     31 	db             *sql.DB
     32 	insertTopic    *sql.Stmt
     33 	insertReceived *sql.Stmt
     34 }
     35 
     36 func (agent *fullMqttAgent) Setup(L *lua.LState) {
     37 	luajson.Preload(L)
     38 
     39 	L.SetGlobal("sqlog", L.NewFunction(func(L *lua.LState) int {
     40 		arg := L.CheckString(1)
     41 		if err := agent.connect(arg); err != nil {
     42 			log.Println(err)
     43 			L.Push(lua.LNil)
     44 			L.Push(lua.LString(err.Error()))
     45 			return 2
     46 		} else {
     47 			L.Push(lua.LTrue)
     48 			return 1
     49 		}
     50 	}))
     51 }
     52 
     53 func (agent *fullMqttAgent) Log(L *lua.LState, msg *mqttagent.MqttMessage) {
     54 	if agent.insertTopic == nil || agent.insertReceived == nil {
     55 		return
     56 	}
     57 
     58 	if _, err := agent.insertTopic.Exec(msg.Topic); err != nil {
     59 		log.Println(err)
     60 		return
     61 	}
     62 
     63 	if _, err := agent.insertReceived.Exec((msg.Timestamp/86400.0)+2440587.5, msg.Topic, msg.Message); err != nil {
     64 		log.Println(err)
     65 		return
     66 	}
     67 }
     68 
     69 func (agent *fullMqttAgent) Teardown(L *lua.LState) {
     70 	if agent.insertReceived != nil {
     71 		agent.insertReceived.Close()
     72 	}
     73 
     74 	if agent.insertTopic != nil {
     75 		agent.insertTopic.Close()
     76 	}
     77 
     78 	if agent.db != nil {
     79 		agent.db.Close()
     80 	}
     81 }
     82 
     83 func (agent *fullMqttAgent) connect(connectionString string) error {
     84 	if agent.insertReceived != nil {
     85 		agent.insertReceived.Close()
     86 		agent.insertReceived = nil
     87 	}
     88 
     89 	if agent.insertTopic != nil {
     90 		agent.insertTopic.Close()
     91 		agent.insertTopic = nil
     92 	}
     93 
     94 	if agent.db != nil {
     95 		agent.db.Close()
     96 		agent.db = nil
     97 	}
     98 
     99 	db, err := sql.Open("sqlite", connectionString)
    100 	if err != nil {
    101 		return err
    102 	}
    103 
    104 	_, err = db.Exec(`
    105 CREATE TABLE IF NOT EXISTS
    106   topics(id INTEGER PRIMARY KEY AUTOINCREMENT,
    107          name TEXT NOT NULL);
    108 `)
    109 	if err != nil {
    110 		db.Close()
    111 		return err
    112 	}
    113 
    114 	_, err = db.Exec("CREATE INDEX IF NOT EXISTS i_topics ON topics(name);")
    115 	if err != nil {
    116 		db.Close()
    117 		return err
    118 	}
    119 
    120 	_, err = db.Exec(`
    121 CREATE TABLE IF NOT EXISTS
    122   received(timestamp INTEGER NOT NULL DEFAULT CURRENT_TIMESTAMP,
    123            topic_id INTEGER NOT NULL,
    124            message TEXT NOT NULL,
    125            FOREIGN KEY (topic_id) REFERENCES topics (id));
    126 `)
    127 	if err != nil {
    128 		db.Close()
    129 		return err
    130 	}
    131 
    132 	_, err = db.Exec("CREATE INDEX IF NOT EXISTS i_time ON received(timestamp);")
    133 	if err != nil {
    134 		db.Close()
    135 		return err
    136 	}
    137 
    138 	_, err = db.Exec("CREATE INDEX IF NOT EXISTS i_topicid ON received(topic_id);")
    139 	if err != nil {
    140 		db.Close()
    141 		return err
    142 	}
    143 
    144 	s1, err := db.Prepare("INSERT OR IGNORE INTO topics(name) VALUES (?);")
    145 	if err != nil {
    146 		db.Close()
    147 		return err
    148 	}
    149 
    150 	s2, err := db.Prepare(`
    151 INSERT INTO received (timestamp, topic_id, message)
    152 VALUES (?, (SELECT id FROM topics WHERE name = ?), ?);
    153 `)
    154 	if err != nil {
    155 		s1.Close()
    156 		db.Close()
    157 		return err
    158 	}
    159 
    160 	agent.db = db
    161 	agent.insertTopic = s1
    162 	agent.insertReceived = s2
    163 	return nil
    164 }
    165 
    166 func main() {
    167 	var agent fullMqttAgent
    168 
    169 	main_script := "mqttagent.lua"
    170 	if len(os.Args) > 1 {
    171 		main_script = os.Args[1]
    172 	}
    173 
    174 	mqttagent.Run(&agent, main_script)
    175 
    176 	os.Exit(0)
    177 }