mqttim

MQTT ↔ Instant Messaging Bridge
git clone https://git.instinctive.eu/mqttim.git
Log | Files | Refs | README | LICENSE

commit a1bfb58472f26b7070a12e263325c679dd1babef
parent 63f3f773c9662d8e5dace8678a355284bdef20c7
Author: Natasha Kerensikova <natgh@instinctive.eu>
Date:   Sat, 25 Jan 2025 19:25:35 +0000

Topic filtering is moved from MQTT side to IRC side
Diffstat:
Mmain.go | 74++++++++++++++++++++++++++++++++++++++------------------------------------
1 file changed, 38 insertions(+), 36 deletions(-)

diff --git a/main.go b/main.go @@ -135,7 +135,7 @@ func main() { log.Fatal("irc.Connect:", err) } go subscribeAll(m, ircQueue) - go mqtt2irc(m, l, createTopicFilter(&config), ircQueue, &config) + go mqttReader(m, l, ircQueue, &config) go ircSender(&config.Irc, i, ircQueue) i.Loop() } @@ -146,7 +146,7 @@ func dup(src []byte) []byte { return res } -func mqtt2irc(m *mqtt.Client, l *mqttLogger, f mqttTopicFilter, c chan<- Msg, config *Config) { +func mqttReader(m *mqtt.Client, l *mqttLogger, c chan<- Msg, config *Config) { var big *mqtt.BigMessage for { @@ -155,27 +155,23 @@ func mqtt2irc(m *mqtt.Client, l *mqttLogger, f mqttTopicFilter, c chan<- Msg, co case err == nil: msg := Msg{Topic: dup(topic), Message: dup(message)} logReceived(l, msg.Message, msg.Topic) - if !isFiltered(&f, topic) { - c <- msg - } + c <- msg case errors.As(err, &big): msg := Msg{Topic: dup(topic), Message: []byte("<Big Message>")} logReceived(l, msg.Message, msg.Topic) - if !isFiltered(&f, topic) { - c <- msg - } + c <- msg case errors.Is(err, mqtt.ErrClosed): - log.Println("mqtt2irc finishing:", err) + log.Println("mqttReader finishing:", err) return case mqtt.IsConnectionRefused(err): - c <- errMsg("mqtt2irc2", err) + c <- errMsg("mqttReader2", err) time.Sleep(5 * time.Minute) default: - c <- errMsg("mqtt2irc", err) + c <- errMsg("mqttReader", err) time.Sleep(2 * time.Second) } } @@ -183,31 +179,37 @@ func mqtt2irc(m *mqtt.Client, l *mqttLogger, f mqttTopicFilter, c chan<- Msg, co func ircSender(config *IrcConfig, i *irc.Connection, c <-chan Msg) { var buf bytes.Buffer + f := createTopicFilter(config) for { m := <-c + if !isFiltered(&f, m.Topic) { + ircSend(config, i, &m, &buf) + } + } +} - if len(m.Topic)+2+len(m.Message) < config.MaxLine { - i.Privmsgf(config.Channel, "%s: %s", m.Topic, m.Message) - } else { - for s := 0; s < len(m.Message); { - l := len(m.Message) - s - buf.Reset() - buf.Write(m.Topic) - buf.WriteString(": ") - if s > 0 { - buf.WriteString(config.ContPrefix) - } - if buf.Len()+l <= config.MaxLine { - buf.Write(m.Message[s:]) - } else { - l = config.MaxLine - buf.Len() - len(config.ContSuffix) - buf.Write(m.Message[s : s+l]) - buf.WriteString(config.ContSuffix) - } - i.Privmsg(config.Channel, buf.String()) - s += l +func ircSend(config *IrcConfig, i *irc.Connection, m *Msg, buf *bytes.Buffer) { + if len(m.Topic)+2+len(m.Message) < config.MaxLine { + i.Privmsgf(config.Channel, "%s: %s", m.Topic, m.Message) + } else { + for s := 0; s < len(m.Message); { + l := len(m.Message) - s + buf.Reset() + buf.Write(m.Topic) + buf.WriteString(": ") + if s > 0 { + buf.WriteString(config.ContPrefix) + } + if buf.Len()+l <= config.MaxLine { + buf.Write(m.Message[s:]) + } else { + l = config.MaxLine - buf.Len() - len(config.ContSuffix) + buf.Write(m.Message[s : s+l]) + buf.WriteString(config.ContSuffix) } + i.Privmsg(config.Channel, buf.String()) + s += l } } } @@ -375,17 +377,17 @@ type mqttTopicFilter struct { important [][]string } -func createTopicFilter(config *Config) mqttTopicFilter { +func createTopicFilter(config *IrcConfig) mqttTopicFilter { result := mqttTopicFilter{ - ignored: make([][]string, len(config.Irc.Ignored)), - important: make([][]string, len(config.Irc.Important)), + ignored: make([][]string, len(config.Ignored)), + important: make([][]string, len(config.Important)), } - for i, s := range config.Irc.Ignored { + for i, s := range config.Ignored { result.ignored[i] = strings.Split(s, "/") } - for i, s := range config.Irc.Important { + for i, s := range config.Important { result.important[i] = strings.Split(s, "/") }