mqttim

MQTT ↔ Instant Messaging Bridge
git clone https://git.instinctive.eu/mqttim.git
Log | Files | Refs | README | LICENSE

commit 21e23eb948b68e18a69089f516c609648428f57d
parent 980a7c8cdf611f8b786d23d816068816725a1110
Author: Natasha Kerensikova <natgh@instinctive.eu>
Date:   Mon, 20 Jan 2025 18:29:52 +0000

Logging into a SQLite database
Diffstat:
Mgo.mod | 10++++++++++
Mgo.sum | 23+++++++++++++++++++++++
Mmain.go | 187++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
3 files changed, 213 insertions(+), 7 deletions(-)

diff --git a/go.mod b/go.mod @@ -3,12 +3,22 @@ module instinctive.eu/go/mqttim go 1.21.9 require ( + github.com/glebarez/go-sqlite v1.22.0 github.com/go-mqtt/mqtt v0.0.0-20210702165922-b33ea0451b0b github.com/pelletier/go-toml/v2 v2.2.3 github.com/thoj/go-ircevent v0.0.0-20210723090443-73e444401d64 ) require ( + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/google/uuid v1.5.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect + golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.3.6 // indirect + modernc.org/libc v1.37.6 // indirect + modernc.org/mathutil v1.6.0 // indirect + modernc.org/memory v1.7.2 // indirect + modernc.org/sqlite v1.28.0 // indirect ) diff --git a/go.sum b/go.sum @@ -1,11 +1,23 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/glebarez/go-sqlite v1.22.0 h1:uAcMJhaA6r3LHMTFgP0SifzgXg46yJkgxqyuyec+ruQ= +github.com/glebarez/go-sqlite v1.22.0/go.mod h1:PlBIdHe0+aUEFn+r2/uthrWq4FxbzugL0L8Li6yQJbc= github.com/go-mqtt/mqtt v0.0.0-20210702165922-b33ea0451b0b h1:vm2f0/jmLkfNt4Dmni++I0mi5/2xNB4Ye2/Jj9Wao9o= github.com/go-mqtt/mqtt v0.0.0-20210702165922-b33ea0451b0b/go.mod h1:ayzudw2gSvvoYMzWZAx74WLzqCXQ+g4QoaoFGQye+aE= +github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ= +github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= +github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/thoj/go-ircevent v0.0.0-20210723090443-73e444401d64 h1:l/T7dYuJEQZOwVOpjIXr1180aM9PZL/d1MnMVIxefX4= @@ -14,9 +26,20 @@ golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GE golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/libc v1.37.6 h1:orZH3c5wmhIQFTXF+Nt+eeauyd+ZIt2BX6ARe+kD+aw= +modernc.org/libc v1.37.6/go.mod h1:YAXkAZ8ktnkCKaN9sw/UDeUVkGYJ/YquGO4FTi5nmHE= +modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= +modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= +modernc.org/memory v1.7.2 h1:Klh90S215mmH8c9gO98QxQFsY+W451E8AnzjoE2ee1E= +modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E= +modernc.org/sqlite v1.28.0 h1:Zx+LyDDmXczNnEQdvPuEfcFVA2ZPyaD7UCZDjef3BHQ= +modernc.org/sqlite v1.28.0/go.mod h1:Qxpazz0zH8Z1xCFyi5GSL3FzbtZ3fvbjmywNogldEW0= diff --git a/main.go b/main.go @@ -2,15 +2,18 @@ package main import ( "bytes" + "database/sql" "errors" "fmt" - "github.com/go-mqtt/mqtt" - "github.com/pelletier/go-toml/v2" - "github.com/thoj/go-ircevent" "log" "os" "strings" "time" + + _ "github.com/glebarez/go-sqlite" + "github.com/go-mqtt/mqtt" + "github.com/pelletier/go-toml/v2" + "github.com/thoj/go-ircevent" ) type IrcConfig struct { @@ -26,6 +29,11 @@ type IrcConfig struct { Verbose bool } +type LogConfig struct { + SqlDriver string + SqlConnection string +} + type MqttConfig struct { Server string Session string @@ -35,6 +43,7 @@ type MqttConfig struct { type Config struct { Irc IrcConfig + Log LogConfig Mqtt MqttConfig } @@ -65,6 +74,7 @@ func main() { var err error var config Config var m *mqtt.Client + var l *mqttLogger ircQueue := make(chan Msg) @@ -73,6 +83,21 @@ func main() { return } + if len(config.Log.SqlDriver) > 0 { + db, err := sql.Open(config.Log.SqlDriver, config.Log.SqlConnection) + if err != nil { + log.Fatal(err) + return + } + + l, err = logInit(db) + if err != nil { + log.Fatal(err) + return + } + log.Println("Logger ready") + } + m, err = mqtt.VolatileSession(config.Mqtt.Session, &mqtt.Config{ Dialer: mqtt.NewDialer("tcp", config.Mqtt.Server), PauseTimeout: 4 * time.Second, @@ -99,6 +124,7 @@ func main() { } topic, payload, found := strings.Cut(msg, config.Irc.CmdMid) if found { + logSent(l, []byte(payload), []byte(topic)) go m.Publish(nil, []byte(payload), topic) } }) @@ -107,7 +133,7 @@ func main() { fmt.Printf("Err %s", err) return } - go mqtt2irc(m, ircQueue, &config) + go mqtt2irc(m, l, ircQueue, &config) go ircSender(&config.Irc, i, ircQueue) i.Loop() } @@ -118,16 +144,20 @@ func dup(src []byte) []byte { return res } -func mqtt2irc(m *mqtt.Client, c chan Msg, config *Config) error { +func mqtt2irc(m *mqtt.Client, l *mqttLogger, c chan Msg, config *Config) error { var big *mqtt.BigMessage for { message, topic, err := m.ReadSlices() switch { case err == nil: - c <- Msg{Topic: dup(topic), Message: dup(message)} + msg := Msg{Topic: dup(topic), Message: dup(message)} + logReceived(l, msg.Message, msg.Topic) + c <- msg case errors.As(err, &big): - c <- Msg{Topic: dup(topic), Message: []byte("<Big Message>")} + msg := Msg{Topic: dup(topic), Message: []byte("<Big Message>")} + logReceived(l, msg.Message, msg.Topic) + c <- msg default: log.Print(err) return err @@ -165,3 +195,146 @@ func ircSender(config *IrcConfig, i *irc.Connection, c chan Msg) error { } } } + +/**************** MQTT Logger Into SQL ****************/ + +type mqttLogger struct { + db *sql.DB + getTopic *sql.Stmt + insertReceived *sql.Stmt + insertSent *sql.Stmt + insertTopic *sql.Stmt +} + +func logClose(l *mqttLogger) { + if l.insertTopic != nil { + l.insertTopic.Close() + l.insertTopic = nil + } + if l.insertSent != nil { + l.insertSent.Close() + l.insertSent = nil + } + if l.insertReceived != nil { + l.insertReceived.Close() + l.insertReceived = nil + } + if l.getTopic != nil { + l.getTopic.Close() + l.getTopic = nil + } + if l.db != nil { + l.db.Close() + l.db = nil + } +} + +func logInit(db *sql.DB) (*mqttLogger, error) { + var err error + result := mqttLogger{db: db} + + for _, cmd := range []string{ + "CREATE TABLE IF NOT EXISTS topics" + + "(id INTEGER PRIMARY KEY AUTOINCREMENT," + + " name TEXT NOT NULL);", + "CREATE UNIQUE INDEX IF NOT EXISTS i_topics ON topics(name);", + "CREATE TABLE IF NOT EXISTS received" + + "(timestamp REAL NOT NULL," + + " topic_id INTEGER NOT NULL," + + " message TEXT NOT NULL," + + " FOREIGN KEY (topic_id) REFERENCES topics (id));", + "CREATE TABLE IF NOT EXISTS sent" + + "(timestamp REAL NOT NULL," + + " topic_id INTEGER NOT NULL," + + " message TEXT NOT NULL," + + " FOREIGN KEY (topic_id) REFERENCES topics (id));", + "CREATE INDEX IF NOT EXISTS i_rtime ON received(timestamp);", + "CREATE INDEX IF NOT EXISTS i_rtopicid ON received(topic_id);", + "CREATE INDEX IF NOT EXISTS i_stime ON sent(timestamp);", + "CREATE INDEX IF NOT EXISTS i_stopicid ON sent(topic_id);", + } { + if _, err = result.db.Exec(cmd); err != nil { + logClose(&result) + return nil, err + } + } + + result.getTopic, err = db.Prepare("SELECT id FROM topics WHERE name=?;") + if err != nil { + logClose(&result) + return nil, err + } + + result.insertTopic, err = db.Prepare("INSERT OR IGNORE INTO topics(name) VALUES (?);") + if err != nil { + logClose(&result) + return nil, err + } + + result.insertSent, err = db.Prepare("INSERT INTO sent (timestamp, topic_id, message) VALUES (?, ?, ?);") + if err != nil { + logClose(&result) + return nil, err + } + + result.insertReceived, err = db.Prepare("INSERT INTO received (timestamp, topic_id, message) VALUES (?, ?, ?);") + if err != nil { + logClose(&result) + return nil, err + } + + return &result, nil +} + +func logMessage(l *mqttLogger, stmt *sql.Stmt, message, topic []byte) { + t := float64(time.Now().UnixNano())/8.64e13 + 2440587.5 + + id, err := logTopic(l, topic) + if err != nil { + log.Println(err) + logClose(l) + return + } + + _, err = stmt.Exec(t, id, message) + if err != nil { + log.Println(err) + logClose(l) + return + } +} + +func logReceived(l *mqttLogger, message, topic []byte) { + if l == nil || l.db == nil { + return + } + + logMessage(l, l.insertReceived, message, topic) +} + +func logSent(l *mqttLogger, message, topic []byte) { + if l == nil || l.db == nil { + return + } + + logMessage(l, l.insertSent, message, topic) +} + +func logTopic(l *mqttLogger, topic []byte) (int64, error) { + var id int64 + err := l.getTopic.QueryRow(topic).Scan(&id) + + if err == sql.ErrNoRows { + res, err := l.insertTopic.Exec(topic) + + if err != nil { + return 0, err + } + + return res.LastInsertId() + } else if err != nil { + return 0, err + } + + return id, nil +}