commit eaa4e79e526229b84ff7accdfc8f076a1601006a
parent 21e23eb948b68e18a69089f516c609648428f57d
Author: Natasha Kerensikova <natgh@instinctive.eu>
Date: Tue, 21 Jan 2025 19:33:38 +0000
Topic filtering
Diffstat:
M | main.go | | | 82 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---- |
1 file changed, 78 insertions(+), 4 deletions(-)
diff --git a/main.go b/main.go
@@ -26,6 +26,8 @@ type IrcConfig struct {
MaxLine int
ContSuffix string
ContPrefix string
+ Ignored []string
+ Important []string
Verbose bool
}
@@ -133,7 +135,7 @@ func main() {
fmt.Printf("Err %s", err)
return
}
- go mqtt2irc(m, l, ircQueue, &config)
+ go mqtt2irc(m, l, createTopicFilter(&config), ircQueue, &config)
go ircSender(&config.Irc, i, ircQueue)
i.Loop()
}
@@ -144,7 +146,7 @@ func dup(src []byte) []byte {
return res
}
-func mqtt2irc(m *mqtt.Client, l *mqttLogger, c chan Msg, config *Config) error {
+func mqtt2irc(m *mqtt.Client, l *mqttLogger, f mqttTopicFilter, c chan Msg, config *Config) error {
var big *mqtt.BigMessage
for {
@@ -153,11 +155,15 @@ func mqtt2irc(m *mqtt.Client, l *mqttLogger, c chan Msg, config *Config) error {
case err == nil:
msg := Msg{Topic: dup(topic), Message: dup(message)}
logReceived(l, msg.Message, msg.Topic)
- c <- msg
+ if !isFiltered(&f, topic) {
+ c <- msg
+ }
case errors.As(err, &big):
msg := Msg{Topic: dup(topic), Message: []byte("<Big Message>")}
logReceived(l, msg.Message, msg.Topic)
- c <- msg
+ if !isFiltered(&f, topic) {
+ c <- msg
+ }
default:
log.Print(err)
return err
@@ -338,3 +344,71 @@ func logTopic(l *mqttLogger, topic []byte) (int64, error) {
return id, nil
}
+
+/**************** MQTT Topic Filter ****************/
+
+type mqttTopicFilter struct {
+ ignored [][]string
+ important [][]string
+}
+
+func createTopicFilter(config *Config) mqttTopicFilter {
+ result := mqttTopicFilter{
+ ignored: make([][]string, len(config.Irc.Ignored)),
+ important: make([][]string, len(config.Irc.Important)),
+ }
+
+ for i, s := range config.Irc.Ignored {
+ result.ignored[i] = strings.Split(s, "/")
+ }
+
+ for i, s := range config.Irc.Important {
+ result.important[i] = strings.Split(s, "/")
+ }
+
+ return result
+}
+
+func isFiltered(filter *mqttTopicFilter, topic []byte) bool {
+ t := strings.Split(string(topic), "/")
+
+ for _, pattern := range filter.important {
+ if topicMatch(t, pattern) {
+ return false
+ }
+ }
+
+ for _, pattern := range filter.ignored {
+ if topicMatch(t, pattern) {
+ return true
+ }
+ }
+
+ return false
+}
+
+func topicMatch(actual, filter []string) bool {
+ if len(filter) == 0 {
+ return len(actual) == 0
+ }
+
+ if filter[0] == "#" {
+ if len(filter) == 1 {
+ return true
+ }
+
+ for i := range actual {
+ if topicMatch(actual[i:], filter[1:]) {
+ return true
+ }
+ }
+
+ return false
+ }
+
+ if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) {
+ return topicMatch(actual[1:], filter[1:])
+ }
+
+ return false
+}