mqttim

MQTT ↔ Instant Messaging Bridge
git clone https://git.instinctive.eu/mqttim.git
Log | Files | Refs | README | LICENSE

main.go (14651B)


      1 /*
      2  * Copyright (c) 2025, Natacha Porté
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 package main
     18 
     19 import (
     20 	"database/sql"
     21 	"errors"
     22 	"fmt"
     23 	"log"
     24 	"os"
     25 	"strings"
     26 	"time"
     27 
     28 	_ "github.com/glebarez/go-sqlite"
     29 	"github.com/go-mqtt/mqtt"
     30 	"github.com/pelletier/go-toml/v2"
     31 	"github.com/thoj/go-ircevent"
     32 )
     33 
     34 type IrcConfig struct {
     35 	Channel    string
     36 	Server     string
     37 	Nick       string
     38 	CmdStart   string
     39 	CmdMid     string
     40 	CmdEnd     string
     41 	SendStart  string
     42 	SendMid    string
     43 	SendEnd    string
     44 	ShowStart  string
     45 	ShowMid    string
     46 	ShowEnd    string
     47 	MaxLine    int
     48 	ContSuffix string
     49 	ContPrefix string
     50 	Ignored    []string
     51 	Important  []string
     52 	Verbose    bool
     53 }
     54 
     55 type LogConfig struct {
     56 	SqlDriver     string
     57 	SqlConnection string
     58 }
     59 
     60 type MqttConfig struct {
     61 	Server    string
     62 	Session   string
     63 	UserName  string
     64 	Password  string
     65 	TLS       bool
     66 	Keepalive int
     67 	Topics    []string
     68 }
     69 
     70 type Config struct {
     71 	Irc  IrcConfig
     72 	Log  LogConfig
     73 	Mqtt MqttConfig
     74 }
     75 
     76 type Msg struct {
     77 	Topic   []byte
     78 	Message []byte
     79 }
     80 
     81 func errMsg(context string, err error) Msg {
     82 	return Msg{
     83 		Topic:   []byte("$mqttim/" + context),
     84 		Message: []byte(err.Error()),
     85 	}
     86 }
     87 
     88 type command struct {
     89 	name string
     90 	arg  string
     91 }
     92 
     93 func readConfig(path string) Config {
     94 	config := Config{
     95 		Irc: IrcConfig{
     96 			Nick:     "mqttim",
     97 			CmdStart: "!",
     98 			CmdMid:   " ",
     99 			SendMid:  ": ",
    100 			ShowMid:  ": ",
    101 		},
    102 		Mqtt: MqttConfig{
    103 			Topics: []string{"#"},
    104 		},
    105 	}
    106 
    107 	f, err := os.Open(path)
    108 	if err != nil {
    109 		log.Fatal(err)
    110 	}
    111 	defer f.Close()
    112 
    113 	d := toml.NewDecoder(f)
    114 	err = d.Decode(&config)
    115 	if err != nil {
    116 		log.Fatal(err)
    117 	}
    118 
    119 	if config.Irc.MaxLine > 0 && len(config.Irc.ContPrefix)+len(config.Irc.ContSuffix) >= config.Irc.MaxLine {
    120 		config.Irc.ContPrefix = ""
    121 		config.Irc.ContSuffix = ""
    122 	}
    123 
    124 	return config
    125 }
    126 
    127 func dialer(config Config) mqtt.Dialer {
    128 	if config.Mqtt.TLS {
    129 		return mqtt.NewTLSDialer("tcp", config.Mqtt.Server, nil)
    130 	} else {
    131 		return mqtt.NewDialer("tcp", config.Mqtt.Server)
    132 	}
    133 }
    134 
    135 func main() {
    136 	var err error
    137 	var m *mqtt.Client
    138 	var l *mqttLogger
    139 
    140 	cmdQueue := make(chan command, 10)
    141 	ircQueue := make(chan Msg, 10)
    142 
    143 	config_file := "mqttim.toml"
    144 	if len(os.Args) > 1 {
    145 		config_file = os.Args[1]
    146 	}
    147 	config := readConfig(config_file)
    148 
    149 	if len(config.Log.SqlDriver) > 0 {
    150 		db, err := sql.Open(config.Log.SqlDriver, config.Log.SqlConnection)
    151 		if err != nil {
    152 			log.Fatal("sql.Open:", err)
    153 		}
    154 
    155 		l, err = logInit(db)
    156 		if err != nil {
    157 			log.Fatal("logInit:", err)
    158 		}
    159 		log.Println("Logging into", config.Log.SqlConnection)
    160 	}
    161 
    162 	m, err = mqtt.VolatileSession(config.Mqtt.Session, &mqtt.Config{
    163 		Dialer:       dialer(config),
    164 		PauseTimeout: 4 * time.Second,
    165 		UserName:     config.Mqtt.UserName,
    166 		Password:     []byte(config.Mqtt.Password),
    167 	})
    168 	if err != nil {
    169 		log.Fatal("mqtt.VolatileSession:", err)
    170 	}
    171 
    172 	i := irc.IRC(config.Irc.Nick, "mqttim")
    173 	if config.Irc.Verbose {
    174 		i.VerboseCallbackHandler = true
    175 		i.Debug = true
    176 	}
    177 	i.AddCallback("001", func(e *irc.Event) { i.Join(config.Irc.Channel) })
    178 	i.AddCallback("366", func(e *irc.Event) {})
    179 	i.AddCallback("PRIVMSG", func(e *irc.Event) {
    180 		msg := e.Message()
    181 		if strings.HasPrefix(msg, config.Irc.CmdStart) && strings.HasSuffix(msg, config.Irc.CmdEnd) {
    182 			msg = msg[len(config.Irc.CmdStart) : len(msg)-len(config.Irc.CmdEnd)]
    183 			name, arg, found := strings.Cut(msg, config.Irc.CmdMid)
    184 			if !found {
    185 				name = msg
    186 				arg = ""
    187 			}
    188 			if name == "send" {
    189 				msg = arg
    190 			} else {
    191 				cmdQueue <- command{name: name, arg: arg}
    192 				return
    193 			}
    194 		}
    195 		if !strings.HasPrefix(msg, config.Irc.SendStart) || !strings.HasSuffix(msg, config.Irc.SendEnd) {
    196 			return
    197 		}
    198 		msg = msg[len(config.Irc.SendStart) : len(msg)-len(config.Irc.SendEnd)]
    199 		topic, payload, found := strings.Cut(msg, config.Irc.SendMid)
    200 		if found {
    201 			logSent(l, []byte(payload), []byte(topic))
    202 			if err := m.Publish(nil, []byte(payload), topic); err != nil {
    203 				ircQueue <- errMsg("Publish", err)
    204 			}
    205 		}
    206 	})
    207 	err = i.Connect(config.Irc.Server)
    208 	if err != nil {
    209 		log.Fatal("irc.Connect:", err)
    210 	}
    211 	go subscribeAll(m, ircQueue, config.Mqtt.Topics)
    212 	go mqttReader(m, l, ircQueue, &config)
    213 	go ircSender(&config.Irc, i, ircQueue, cmdQueue)
    214 	go mqttKeepalive(m, ircQueue, config.Mqtt.Keepalive)
    215 	i.Loop()
    216 }
    217 
    218 func dup(src []byte) []byte {
    219 	res := make([]byte, len(src))
    220 	copy(res, src)
    221 	return res
    222 }
    223 
    224 func mqttKeepalive(m *mqtt.Client, c chan<- Msg, keepalive int) {
    225 	if keepalive <= 0 {
    226 		return
    227 	}
    228 
    229 	period := time.Duration(keepalive) * time.Second
    230 
    231 	for {
    232 		time.Sleep(period)
    233 
    234 		if err := m.Ping(nil); err != nil {
    235 			c <- errMsg("mqttPing", err)
    236 		}
    237 	}
    238 }
    239 
    240 func mqttReader(m *mqtt.Client, l *mqttLogger, c chan<- Msg, config *Config) {
    241 	var big *mqtt.BigMessage
    242 
    243 	for {
    244 		message, topic, err := m.ReadSlices()
    245 		switch {
    246 		case err == nil:
    247 			msg := Msg{Topic: dup(topic), Message: dup(message)}
    248 			logReceived(l, msg.Message, msg.Topic)
    249 			c <- msg
    250 
    251 		case errors.As(err, &big):
    252 			msg := Msg{Topic: dup(topic), Message: []byte("<Big Message>")}
    253 			logReceived(l, msg.Message, msg.Topic)
    254 			c <- msg
    255 
    256 		case errors.Is(err, mqtt.ErrClosed):
    257 			log.Println("mqttReader finishing:", err)
    258 			return
    259 
    260 		case mqtt.IsConnectionRefused(err):
    261 			c <- errMsg("mqttReader2", err)
    262 			time.Sleep(5 * time.Minute)
    263 
    264 		default:
    265 			c <- errMsg("mqttReader", err)
    266 			time.Sleep(2 * time.Second)
    267 		}
    268 	}
    269 }
    270 
    271 func ircSender(config *IrcConfig, i *irc.Connection, cm <-chan Msg, cc <-chan command) {
    272 	var buf strings.Builder
    273 	f := createTopicFilter(config)
    274 
    275 	for {
    276 		select {
    277 		case m := <-cm:
    278 			if !isFiltered(&f, m.Topic) {
    279 				str := config.ShowStart +
    280 					string(m.Topic) +
    281 					config.ShowMid +
    282 					string(m.Message) +
    283 					config.ShowEnd
    284 				ircSend(config, i, str, &buf)
    285 			}
    286 		case cmd := <-cc:
    287 			switch cmd.name {
    288 			case "filters":
    289 				ircSendFilters(config, i, &f, &buf)
    290 			case "help":
    291 				ircSend(config, i, "Command line:", &buf)
    292 				ircSendHelp(config, i, &buf, "filters", "")
    293 				ircSendHelp(config, i, &buf, "help", "")
    294 				ircSendHelp(config, i, &buf, "ignore", "<topic>")
    295 				ircSendHelp(config, i, &buf, "important", "<topic>")
    296 				ircSendHelp(config, i, &buf, "quit", "[message]")
    297 				ircSendHelp(config, i, &buf, "unignore", "<topic>")
    298 				ircSendHelp(config, i, &buf, "unimportant", "<topic>")
    299 			case "ignore":
    300 				filterAddIgnored(&f, cmd.arg)
    301 			case "important":
    302 				filterAddImportant(&f, cmd.arg)
    303 			case "quit":
    304 				log.Println("Quit command", cmd.arg)
    305 				i.QuitMessage = cmd.arg
    306 				i.Quit()
    307 			case "unignore":
    308 				filterDelIgnored(&f, cmd.arg)
    309 			case "unimportant":
    310 				filterDelImportant(&f, cmd.arg)
    311 			default:
    312 				ircSend(config, i, "Unknown command: "+cmd.name, &buf)
    313 			}
    314 		}
    315 	}
    316 }
    317 
    318 func ircSend(config *IrcConfig, i *irc.Connection, s string, buf *strings.Builder) {
    319 	if config.MaxLine <= 0 || len(s) < config.MaxLine {
    320 		i.Privmsg(config.Channel, s)
    321 	} else {
    322 		for offset := 0; offset < len(s); {
    323 			l := len(s) - offset
    324 			buf.Reset()
    325 			if offset > 0 {
    326 				buf.WriteString(config.ContPrefix)
    327 			}
    328 
    329 			if buf.Len()+l <= config.MaxLine {
    330 				buf.WriteString(s[offset:])
    331 			} else {
    332 				l = config.MaxLine - buf.Len() - len(config.ContSuffix)
    333 				buf.WriteString(s[offset : offset+l])
    334 				buf.WriteString(config.ContSuffix)
    335 			}
    336 
    337 			i.Privmsg(config.Channel, buf.String())
    338 			offset += l
    339 		}
    340 	}
    341 }
    342 
    343 func ircSendTopicList(config *IrcConfig, i *irc.Connection, name string, topics [][]string, buf *strings.Builder) {
    344 	if len(topics) >= 2 {
    345 		ircSend(config, i, name+" = [", buf)
    346 		for index, topic := range topics {
    347 			suffix := ","
    348 			if index == len(topics)-1 {
    349 				suffix = " ]"
    350 			}
    351 			ircSend(config, i, fmt.Sprintf("  %q%s", strings.Join(topic, "/"), suffix), buf)
    352 		}
    353 	} else if len(topics) == 1 {
    354 		ircSend(config, i, fmt.Sprintf("%s = [%q]", name, strings.Join(topics[0], "/")), buf)
    355 	} else {
    356 		ircSend(config, i, name+" = []", buf)
    357 	}
    358 }
    359 
    360 func ircSendFilters(config *IrcConfig, i *irc.Connection, f *mqttTopicFilter, buf *strings.Builder) {
    361 	ircSendTopicList(config, i, "important", f.important, buf)
    362 	ircSendTopicList(config, i, "ignored", f.ignored, buf)
    363 }
    364 
    365 func ircSendHelp(config *IrcConfig, i *irc.Connection, buf *strings.Builder, cmd, arg string) {
    366 	if arg == "" {
    367 		ircSend(config, i, fmt.Sprintf("- %q", config.CmdStart+cmd+config.CmdEnd), buf)
    368 	} else {
    369 		ircSend(config, i, fmt.Sprintf("- %q", config.CmdStart+cmd+config.CmdMid+arg+config.CmdEnd), buf)
    370 	}
    371 }
    372 
    373 func subscribeAll(m *mqtt.Client, ircQueue chan<- Msg, topics []string) {
    374 	for _, topic := range topics {
    375 		for {
    376 			err := m.Subscribe(nil, topic)
    377 
    378 			if err != nil {
    379 				ircQueue <- errMsg("Subscribe", err)
    380 				time.Sleep(1 * time.Minute)
    381 			} else {
    382 				break
    383 			}
    384 		}
    385 	}
    386 }
    387 
    388 /**************** MQTT Logger Into SQL ****************/
    389 
    390 type mqttLogger struct {
    391 	db             *sql.DB
    392 	getTopic       *sql.Stmt
    393 	insertReceived *sql.Stmt
    394 	insertSent     *sql.Stmt
    395 	insertTopic    *sql.Stmt
    396 }
    397 
    398 func logClose(l *mqttLogger) {
    399 	if l.insertTopic != nil {
    400 		l.insertTopic.Close()
    401 		l.insertTopic = nil
    402 	}
    403 	if l.insertSent != nil {
    404 		l.insertSent.Close()
    405 		l.insertSent = nil
    406 	}
    407 	if l.insertReceived != nil {
    408 		l.insertReceived.Close()
    409 		l.insertReceived = nil
    410 	}
    411 	if l.getTopic != nil {
    412 		l.getTopic.Close()
    413 		l.getTopic = nil
    414 	}
    415 	if l.db != nil {
    416 		l.db.Close()
    417 		l.db = nil
    418 	}
    419 }
    420 
    421 func logInit(db *sql.DB) (*mqttLogger, error) {
    422 	var err error
    423 	result := mqttLogger{db: db}
    424 
    425 	for _, cmd := range []string{
    426 		"CREATE TABLE IF NOT EXISTS topics" +
    427 			"(id INTEGER PRIMARY KEY AUTOINCREMENT," +
    428 			" name TEXT NOT NULL);",
    429 		"CREATE UNIQUE INDEX IF NOT EXISTS i_topics ON topics(name);",
    430 		"CREATE TABLE IF NOT EXISTS received" +
    431 			"(timestamp REAL NOT NULL," +
    432 			" topic_id INTEGER NOT NULL," +
    433 			" message TEXT NOT NULL," +
    434 			" FOREIGN KEY (topic_id) REFERENCES topics (id));",
    435 		"CREATE TABLE IF NOT EXISTS sent" +
    436 			"(timestamp REAL NOT NULL," +
    437 			" topic_id INTEGER NOT NULL," +
    438 			" message TEXT NOT NULL," +
    439 			" FOREIGN KEY (topic_id) REFERENCES topics (id));",
    440 		"CREATE INDEX IF NOT EXISTS i_rtime ON received(timestamp);",
    441 		"CREATE INDEX IF NOT EXISTS i_rtopicid ON received(topic_id);",
    442 		"CREATE INDEX IF NOT EXISTS i_stime ON sent(timestamp);",
    443 		"CREATE INDEX IF NOT EXISTS i_stopicid ON sent(topic_id);",
    444 	} {
    445 		if _, err = result.db.Exec(cmd); err != nil {
    446 			logClose(&result)
    447 			return nil, err
    448 		}
    449 	}
    450 
    451 	result.getTopic, err = db.Prepare("SELECT id FROM topics WHERE name=?;")
    452 	if err != nil {
    453 		logClose(&result)
    454 		return nil, err
    455 	}
    456 
    457 	result.insertTopic, err = db.Prepare("INSERT OR IGNORE INTO topics(name) VALUES (?);")
    458 	if err != nil {
    459 		logClose(&result)
    460 		return nil, err
    461 	}
    462 
    463 	result.insertSent, err = db.Prepare("INSERT INTO sent (timestamp, topic_id, message) VALUES (?, ?, ?);")
    464 	if err != nil {
    465 		logClose(&result)
    466 		return nil, err
    467 	}
    468 
    469 	result.insertReceived, err = db.Prepare("INSERT INTO received (timestamp, topic_id, message) VALUES (?, ?, ?);")
    470 	if err != nil {
    471 		logClose(&result)
    472 		return nil, err
    473 	}
    474 
    475 	return &result, nil
    476 }
    477 
    478 func logMessage(l *mqttLogger, stmt *sql.Stmt, message, topic []byte) {
    479 	t := float64(time.Now().UnixNano())/8.64e13 + 2440587.5
    480 
    481 	id, err := logTopic(l, topic)
    482 	if err != nil {
    483 		log.Println("logTopic:", err)
    484 		logClose(l)
    485 		return
    486 	}
    487 
    488 	_, err = stmt.Exec(t, id, message)
    489 	if err != nil {
    490 		log.Println("logMessage:", err)
    491 		logClose(l)
    492 		return
    493 	}
    494 }
    495 
    496 func logReceived(l *mqttLogger, message, topic []byte) {
    497 	if l == nil || l.db == nil {
    498 		return
    499 	}
    500 
    501 	logMessage(l, l.insertReceived, message, topic)
    502 }
    503 
    504 func logSent(l *mqttLogger, message, topic []byte) {
    505 	if l == nil || l.db == nil {
    506 		return
    507 	}
    508 
    509 	logMessage(l, l.insertSent, message, topic)
    510 }
    511 
    512 func logTopic(l *mqttLogger, topic []byte) (int64, error) {
    513 	var id int64
    514 	err := l.getTopic.QueryRow(topic).Scan(&id)
    515 
    516 	if err == sql.ErrNoRows {
    517 		res, err := l.insertTopic.Exec(topic)
    518 
    519 		if err != nil {
    520 			return 0, err
    521 		}
    522 
    523 		return res.LastInsertId()
    524 	} else if err != nil {
    525 		return 0, err
    526 	}
    527 
    528 	return id, nil
    529 }
    530 
    531 /**************** MQTT Topic Filter ****************/
    532 
    533 type mqttTopicFilter struct {
    534 	ignored   [][]string
    535 	important [][]string
    536 }
    537 
    538 func addPattern(topics [][]string, newPattern string) [][]string {
    539 	return append(topics, strings.Split(newPattern, "/"))
    540 }
    541 
    542 func delPattern(topics [][]string, toRemove string) [][]string {
    543 	var result [][]string
    544 
    545 	for _, pat := range topics {
    546 		if strings.Join(pat, "/") != toRemove {
    547 			result = append(result, pat)
    548 		}
    549 	}
    550 
    551 	return result
    552 }
    553 
    554 func createTopicFilter(config *IrcConfig) mqttTopicFilter {
    555 	result := mqttTopicFilter{
    556 		ignored:   make([][]string, len(config.Ignored)),
    557 		important: make([][]string, len(config.Important)),
    558 	}
    559 
    560 	for i, s := range config.Ignored {
    561 		result.ignored[i] = strings.Split(s, "/")
    562 	}
    563 
    564 	for i, s := range config.Important {
    565 		result.important[i] = strings.Split(s, "/")
    566 	}
    567 
    568 	return result
    569 }
    570 
    571 func filterAddIgnored(filter *mqttTopicFilter, pattern string) {
    572 	filter.ignored = addPattern(filter.ignored, pattern)
    573 }
    574 
    575 func filterAddImportant(filter *mqttTopicFilter, pattern string) {
    576 	filter.important = addPattern(filter.important, pattern)
    577 }
    578 
    579 func filterDelIgnored(filter *mqttTopicFilter, pattern string) {
    580 	filter.ignored = delPattern(filter.ignored, pattern)
    581 }
    582 
    583 func filterDelImportant(filter *mqttTopicFilter, pattern string) {
    584 	filter.important = delPattern(filter.important, pattern)
    585 }
    586 
    587 func isFiltered(filter *mqttTopicFilter, topic []byte) bool {
    588 	t := strings.Split(string(topic), "/")
    589 
    590 	for _, pattern := range filter.important {
    591 		if topicMatch(t, pattern) {
    592 			return false
    593 		}
    594 	}
    595 
    596 	for _, pattern := range filter.ignored {
    597 		if topicMatch(t, pattern) {
    598 			return true
    599 		}
    600 	}
    601 
    602 	return false
    603 }
    604 
    605 func topicMatch(actual, filter []string) bool {
    606 	if len(filter) == 0 {
    607 		return len(actual) == 0
    608 	}
    609 
    610 	if filter[0] == "#" {
    611 		if len(filter) == 1 {
    612 			return true
    613 		}
    614 
    615 		for i := range actual {
    616 			if topicMatch(actual[i:], filter[1:]) {
    617 				return true
    618 			}
    619 		}
    620 
    621 		return false
    622 	}
    623 
    624 	if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) {
    625 		return topicMatch(actual[1:], filter[1:])
    626 	}
    627 
    628 	return false
    629 }