natsim

NATS ↔ Instant Messaging Bridge
git clone https://git.instinctive.eu/natsim.git
Log | Files | Refs | README | LICENSE

commit 39b66a1bdbc3721736b7d505b9526ad832ae40ae
parent 48e0bacfb50769e3d5a58f0d2e004e0dc39b12c3
Author: Natasha Kerensikova <natgh@instinctive.eu>
Date:   Sat, 21 Jun 2025 17:32:08 +0000

Sqlite logging
Diffstat:
Mgo.mod | 9+++++++++
Mgo.sum | 21+++++++++++++++++++++
Ainit.sql | 53+++++++++++++++++++++++++++++++++++++++++++++++++++++
Mmain.go | 136++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
4 files changed, 214 insertions(+), 5 deletions(-)

diff --git a/go.mod b/go.mod @@ -5,17 +5,26 @@ go 1.23.0 toolchain go1.23.9 require ( + github.com/glebarez/go-sqlite v1.22.0 github.com/nats-io/nats.go v1.42.0 github.com/pelletier/go-toml/v2 v2.2.4 github.com/thoj/go-ircevent v0.0.0-20210723090443-73e444401d64 ) require ( + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/google/uuid v1.5.0 // indirect github.com/klauspost/compress v1.18.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/nats-io/nkeys v0.4.11 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect golang.org/x/crypto v0.37.0 // indirect golang.org/x/net v0.21.0 // indirect golang.org/x/sys v0.32.0 // indirect golang.org/x/text v0.24.0 // indirect + modernc.org/libc v1.37.6 // indirect + modernc.org/mathutil v1.6.0 // indirect + modernc.org/memory v1.7.2 // indirect + modernc.org/sqlite v1.28.0 // indirect ) diff --git a/go.sum b/go.sum @@ -1,5 +1,15 @@ +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/glebarez/go-sqlite v1.22.0 h1:uAcMJhaA6r3LHMTFgP0SifzgXg46yJkgxqyuyec+ruQ= +github.com/glebarez/go-sqlite v1.22.0/go.mod h1:PlBIdHe0+aUEFn+r2/uthrWq4FxbzugL0L8Li6yQJbc= +github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ= +github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= +github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/nats-io/nats.go v1.42.0 h1:ynIMupIOvf/ZWH/b2qda6WGKGNSjwOUutTpWRvAmhaM= github.com/nats-io/nats.go v1.42.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= @@ -8,6 +18,8 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/thoj/go-ircevent v0.0.0-20210723090443-73e444401d64 h1:l/T7dYuJEQZOwVOpjIXr1180aM9PZL/d1MnMVIxefX4= github.com/thoj/go-ircevent v0.0.0-20210723090443-73e444401d64/go.mod h1:Q1NAJOuRdQCqN/VIWdnaaEhV8LpeO2rtlBP7/iDJNII= golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= @@ -17,6 +29,7 @@ golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -24,3 +37,11 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +modernc.org/libc v1.37.6 h1:orZH3c5wmhIQFTXF+Nt+eeauyd+ZIt2BX6ARe+kD+aw= +modernc.org/libc v1.37.6/go.mod h1:YAXkAZ8ktnkCKaN9sw/UDeUVkGYJ/YquGO4FTi5nmHE= +modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= +modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= +modernc.org/memory v1.7.2 h1:Klh90S215mmH8c9gO98QxQFsY+W451E8AnzjoE2ee1E= +modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E= +modernc.org/sqlite v1.28.0 h1:Zx+LyDDmXczNnEQdvPuEfcFVA2ZPyaD7UCZDjef3BHQ= +modernc.org/sqlite v1.28.0/go.mod h1:Qxpazz0zH8Z1xCFyi5GSL3FzbtZ3fvbjmywNogldEW0= diff --git a/init.sql b/init.sql @@ -0,0 +1,53 @@ +CREATE TABLE subjects + (id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL); +CREATE UNIQUE INDEX i_subjects ON subjects(name); + +CREATE TABLE received + (timestamp REAL NOT NULL, + subject_id INTEGER NOT NULL, + data TEXT NOT NULL, + FOREIGN KEY (subject_id) REFERENCES subjects (id)); +CREATE INDEX i_rtime ON received(timestamp); +CREATE INDEX i_rsubjectid ON received(subject_id); + +CREATE TABLE sent + (timestamp REAL NOT NULL, + subject_id INTEGER NOT NULL, + data TEXT NOT NULL, + FOREIGN KEY (subject_id) REFERENCES subjects (id)); +CREATE INDEX i_stime ON received(timestamp); +CREATE INDEX i_ssubjectid ON received(subject_id); + +CREATE VIEW received_view (timestamp,subject,data) + AS SELECT datetime(timestamp),subjects.name,data + FROM received LEFT OUTER JOIN subjects + ON subjects.id = subject_id; + +CREATE VIEW sent_view (timestamp,subject,data) + AS SELECT datetime(timestamp),subjects.name,data + FROM sent LEFT OUTER JOIN subjects ON subjects.id = subject_id; + +CREATE TRIGGER insert_received INSTEAD OF INSERT ON received_view +BEGIN + INSERT INTO subjects(name) + SELECT NEW.subject WHERE NOT EXISTS + (SELECT 1 FROM subjects WHERE name = NEW.subject); + INSERT INTO received(timestamp,subject_id,data) + VALUES (NEW.timestamp, + (SELECT id FROM subjects WHERE name = NEW.subject), + NEW.data); +END; + +CREATE TRIGGER insert_sent INSTEAD OF INSERT ON sent_view +BEGIN + INSERT INTO subjects(name) + SELECT NEW.subject WHERE NOT EXISTS + (SELECT 1 FROM subjects WHERE name = NEW.subject); + INSERT INTO sent(timestamp,subject_id,data) + VALUES (NEW.timestamp, + (SELECT id FROM subjects WHERE name = NEW.subject), + NEW.data); +END; + +PRAGMA user_version = 1; diff --git a/main.go b/main.go @@ -17,6 +17,8 @@ package main import ( + "database/sql" + "embed" "errors" "fmt" "log" @@ -28,6 +30,7 @@ import ( "sync/atomic" "time" + _ "github.com/glebarez/go-sqlite" "github.com/nats-io/nats.go" "github.com/pelletier/go-toml/v2" "github.com/thoj/go-ircevent" @@ -83,6 +86,12 @@ type IrcConfig struct { AntiFlood antiflood } +type LogConfig struct { + SqlDriver string + SqlConnection string + Filter []FilterElement +} + type NatsConfig struct { Server string NkeySeed string @@ -92,13 +101,17 @@ type NatsConfig struct { type NatsIM struct { Irc IrcConfig + Log LogConfig Nats NatsConfig - irc *irc.Connection - nc *nats.Conn - cmdQueue chan command - ircQueue chan string - dropped atomic.Uint32 + irc *irc.Connection + nc *nats.Conn + db *sql.DB + insertReceived *sql.Stmt + insertSent *sql.Stmt + cmdQueue chan command + ircQueue chan string + dropped atomic.Uint32 } func NewNatsIM(configPath string) (*NatsIM, error) { @@ -144,6 +157,13 @@ func NewNatsIM(configPath string) (*NatsIM, error) { natsim.Irc.ContSuffix = "" } + if natsim.Log.SqlDriver != "" { + if err := natsim.logInit(); err != nil { + natsim.Close() + return nil, err + } + } + natsim.cmdQueue = make(chan command, 10) natsim.ircQueue = make(chan string, 10) @@ -175,6 +195,27 @@ func (natsim *NatsIM) Close() { natsim.nc = nil } + if natsim.insertReceived != nil { + if err := natsim.insertReceived.Close(); err != nil { + log.Println("Close insertReceived:", err) + } + natsim.insertReceived = nil + } + + if natsim.insertSent != nil { + if err := natsim.insertSent.Close(); err != nil { + log.Println("Close insertSent:", err) + } + natsim.insertSent = nil + } + + if natsim.db != nil { + if err := natsim.db.Close(); err != nil { + log.Println("Close log DB:", err) + } + natsim.db = nil + } + close(natsim.cmdQueue) close(natsim.ircQueue) } @@ -241,6 +282,8 @@ func (natsim *NatsIM) ircReceive(e *irc.Event) { } else if subject, data, found := unpackMark(natsim.Irc.Send, msg, false); found { if err := natsim.nc.Publish(subject, []byte(data)); err != nil { natsim.ircSendError("Publish", err) + } else { + natsim.logSent(subject, data) } } } @@ -363,6 +406,8 @@ func (natsim *NatsIM) natsReceive(m *nats.Msg) { return } + natsim.logReceived(m) + var sb strings.Builder sb.WriteString(packMark(natsim.Irc.Show, m.Subject, string(m.Data))) @@ -391,6 +436,87 @@ func (natsim *NatsIM) natsReconnectErr(c *nats.Conn, err error) { natsim.ircSendError("Reconnect", err) } +/**************** Log to Database ****************/ + +//go:embed init.sql +var embeddedSQL embed.FS + +func (natsim *NatsIM) logInit() error { + if natsim.Log.SqlDriver == "" { + return nil + } + + var err error + + natsim.db, err = sql.Open(natsim.Log.SqlDriver, natsim.Log.SqlConnection) + if err != nil { + log.Println("sql.Open:", err) + return err + } + + var version int + if err = natsim.db.QueryRow("PRAGMA user_version").Scan(&version); err != nil { + log.Println("query user_verison", err) + return err + } + + switch version { + case 0: + initSQL, err := embeddedSQL.ReadFile("init.sql") + if err != nil { + log.Println("embedded.ReadFile:", err) + return err + } + + if _, err = natsim.db.Exec(string(initSQL)); err != nil { + log.Println("Init log DB:", err) + return err + } + + case 1: + + default: + log.Println("Unsupported database version:", version) + return errors.New("unsupported database version") + } + + natsim.insertReceived, err = natsim.db.Prepare("INSERT INTO received_view(timestamp,subject,data) VALUES (?,?,?);") + if err != nil { + log.Println("Prepare insertReceived:", err) + return err + } + + natsim.insertSent, err = natsim.db.Prepare("INSERT INTO sent_view(timestamp,subject,data) VALUES (?,?,?);") + if err != nil { + log.Println("Prepare insertSent:", err) + return err + } + + return nil +} + +func (natsim *NatsIM) logReceived(msg *nats.Msg) { + if natsim.db == nil || natsim.insertReceived == nil { + return + } + + t := float64(time.Now().UnixNano())/8.64e13 + 2440587.5 + if _, err := natsim.insertReceived.Exec(t, msg.Subject, msg.Data); err != nil { + natsim.ircSendError("insertReceived.Exec", err) + } +} + +func (natsim *NatsIM) logSent(subject, data string) { + if natsim.db == nil || natsim.insertSent == nil { + return + } + + t := float64(time.Now().UnixNano())/8.64e13 + 2440587.5 + if _, err := natsim.insertSent.Exec(t, subject, data); err != nil { + natsim.ircSendError("insertSent.Exec", err) + } +} + /**************** Filters ****************/ type FilterElement struct {