commit 696c2d6a40dc7103dcc9a921a9b63cd4793079de
parent 788fdab0fd2f3084d164dd8de7992a4d7a0bd9e0
Author: Natasha Kerensikova <natgh@instinctive.eu>
Date: Mon, 7 Jul 2025 17:27:51 +0000
Full NATS objects are published
Diffstat:
M | init.sql | | | 29 | ++++++++++++++++++++++++++--- |
M | main.go | | | 44 | ++++++++++++++++++++++++++++---------------- |
2 files changed, 54 insertions(+), 19 deletions(-)
diff --git a/init.sql b/init.sql
@@ -15,14 +15,15 @@ CREATE INDEX i_rtime ON received(timestamp);
CREATE INDEX i_rsubjectid ON received(subject_id);
CREATE TABLE sent
- (timestamp REAL NOT NULL,
+ (id INTEGER PRIMARY KEY AUTOINCREMENT,
+ timestamp REAL NOT NULL,
subject_id INTEGER NOT NULL,
data TEXT NOT NULL,
reply_subject_id INTEGER,
FOREIGN KEY (subject_id) REFERENCES subjects (id),
FOREIGN KEY (reply_subject_id) REFERENCES subjects (id));
-CREATE INDEX i_stime ON received(timestamp);
-CREATE INDEX i_ssubjectid ON received(subject_id);
+CREATE INDEX i_stime ON sent(timestamp);
+CREATE INDEX i_ssubjectid ON sent(subject_id);
CREATE VIEW received_view (timestamp,subject,reply_subject,data)
AS SELECT datetime(timestamp),subjects.name,rsub.name,data
@@ -115,4 +116,26 @@ BEGIN
(SELECT id FROM headers_view WHERE key = NEW.key AND value = NEW.value));
END;
+CREATE TABLE sent_headers
+ (msg_id INTEGER NOT NULL,
+ header_id INTEGER NOT NULL,
+ FOREIGN KEY (msg_id) REFERENCES sent (id),
+ FOREIGN KEY (header_id) REFERENCES headers (id));
+
+CREATE VIEW sent_headers_view (msg_id,key,value)
+ AS SELECT msg_id,header_keys.name,headers.value
+ FROM sent_headers
+ LEFT OUTER JOIN headers ON headers.id = header_id
+ LEFT OUTER JOIN header_keys ON header_keys.id = headers.key_id;
+
+CREATE TRIGGER insert_sent_header INSTEAD OF INSERT ON sent_headers_view
+BEGIN
+ INSERT INTO headers_view(key,value)
+ SELECT NEW.key,NEW.value WHERE NOT EXISTS
+ (SELECT 1 FROM headers_view WHERE key = NEW.key AND value = NEW.value);
+ INSERT INTO sent_headers(msg_id,header_id)
+ VALUES (NEW.msg_id,
+ (SELECT id FROM headers_view WHERE key = NEW.key AND value = NEW.value));
+END;
+
PRAGMA user_version = 1;
diff --git a/main.go b/main.go
@@ -111,6 +111,7 @@ type NatsIM struct {
insertReceived *sql.Stmt
insertRHeader *sql.Stmt
insertSent *sql.Stmt
+ insertSHeader *sql.Stmt
cmdQueue chan command
ircQueue chan string
dropped atomic.Uint32
@@ -226,6 +227,13 @@ func (natsim *NatsIM) Close() {
natsim.insertSent = nil
}
+ if natsim.insertSHeader != nil {
+ if err := natsim.insertSHeader.Close(); err != nil {
+ log.Println("Close insertSHeader:", err)
+ }
+ natsim.insertSHeader = nil
+ }
+
if natsim.db != nil {
if err := natsim.db.Close(); err != nil {
log.Println("Close log DB:", err)
@@ -465,10 +473,11 @@ func (natsim *NatsIM) ircReceive(e *irc.Event) {
if name, arg, found := unpackMark(natsim.Irc.Cmd, msg, true); found {
natsim.cmdQueue <- command{name: name, arg: arg}
} else if subject, data, found := unpackMark(natsim.Irc.Send, msg, false); found {
- if err := natsim.nc.Publish(subject, []byte(data)); err != nil {
+ nMsg := nats.Msg{Subject: subject, Data: []byte(data)}
+ if err := natsim.nc.PublishMsg(&nMsg); err != nil {
natsim.ircSendError("Publish", err)
} else {
- natsim.logSent(subject, data)
+ natsim.logSent(&nMsg)
}
}
}
@@ -699,16 +708,22 @@ func (natsim *NatsIM) logInit() error {
return err
}
- natsim.insertSent, err = natsim.db.Prepare("INSERT INTO sent_view(timestamp,subject,data) VALUES (?,?,?);")
+ natsim.insertSent, err = natsim.db.Prepare("INSERT INTO sent(timestamp,subject_id,reply_subject_id,data) VALUES (?, (SELECT id FROM subjects WHERE name = ?), (SELECT id FROM subjects WHERE name = ?), ?);")
if err != nil {
log.Println("Prepare insertSent:", err)
return err
}
+ natsim.insertSHeader, err = natsim.db.Prepare("INSERT INTO sent_headers_view(msg_id,key,value) VALUES (?,?,?);")
+ if err != nil {
+ log.Println("Prepare insertSHeader:", err)
+ return err
+ }
+
return nil
}
-func (natsim *NatsIM) logReceived(msg *nats.Msg) {
+func (natsim *NatsIM) logMsg(msg *nats.Msg, insertMsg, insertHeader *sql.Stmt) {
if natsim.db == nil || natsim.insertReceived == nil {
return
}
@@ -728,8 +743,8 @@ func (natsim *NatsIM) logReceived(msg *nats.Msg) {
}
t := float64(time.Now().UnixNano())/8.64e13 + 2440587.5
- if r, err := natsim.insertReceived.Exec(t, msg.Subject, reply, msg.Data); err != nil {
- natsim.ircSendError("insertReceived.Exec", err)
+ if r, err := insertMsg.Exec(t, msg.Subject, reply, msg.Data); err != nil {
+ natsim.ircSendError("insertMsg.Exec", err)
} else if id, err := r.LastInsertId(); err != nil {
natsim.ircSendError("LastInsertId", err)
} else if id <= 0 {
@@ -737,23 +752,20 @@ func (natsim *NatsIM) logReceived(msg *nats.Msg) {
} else {
for key, values := range msg.Header {
for _, value := range values {
- if _, err := natsim.insertRHeader.Exec(id, key, value); err != nil {
- natsim.ircSendf("insertRHeader(%q, %q): %s", key, value, err)
+ if _, err := insertHeader.Exec(id, key, value); err != nil {
+ natsim.ircSendf("insertHeader(%q, %q): %s", key, value, err)
}
}
}
}
}
-func (natsim *NatsIM) logSent(subject, data string) {
- if natsim.db == nil || natsim.insertSent == nil {
- return
- }
+func (natsim *NatsIM) logReceived(msg *nats.Msg) {
+ natsim.logMsg(msg, natsim.insertReceived, natsim.insertRHeader)
+}
- t := float64(time.Now().UnixNano())/8.64e13 + 2440587.5
- if _, err := natsim.insertSent.Exec(t, subject, data); err != nil {
- natsim.ircSendError("insertSent.Exec", err)
- }
+func (natsim *NatsIM) logSent(msg *nats.Msg) {
+ natsim.logMsg(msg, natsim.insertSent, natsim.insertSHeader)
}
/**************** Filters ****************/