mqttim

MQTT ↔ Instant Messaging Bridge
git clone https://git.instinctive.eu/mqttim.git
Log | Files | Refs | README | LICENSE

main.go (13804B)


      1 package main
      2 
import (
	"database/sql"
	"errors"
	"fmt"
	"log"
	"os"
	"strings"
	"time"
	"unicode/utf8"

	_ "github.com/glebarez/go-sqlite"
	"github.com/go-mqtt/mqtt"
	"github.com/pelletier/go-toml/v2"
	"github.com/thoj/go-ircevent"
)
     17 
     18 type IrcConfig struct {
     19 	Channel    string
     20 	Server     string
     21 	Nick       string
     22 	CmdStart   string
     23 	CmdMid     string
     24 	CmdEnd     string
     25 	SendStart  string
     26 	SendMid    string
     27 	SendEnd    string
     28 	ShowStart  string
     29 	ShowMid    string
     30 	ShowEnd    string
     31 	MaxLine    int
     32 	ContSuffix string
     33 	ContPrefix string
     34 	Ignored    []string
     35 	Important  []string
     36 	Verbose    bool
     37 }
     38 
     39 type LogConfig struct {
     40 	SqlDriver     string
     41 	SqlConnection string
     42 }
     43 
     44 type MqttConfig struct {
     45 	Server    string
     46 	Session   string
     47 	UserName  string
     48 	Password  string
     49 	TLS       bool
     50 	Keepalive int
     51 	Topics    []string
     52 }
     53 
// Config is the root of the decoded configuration file, with one section
// per subsystem.
type Config struct {
	// Irc configures the IRC connection and message formats.
	Irc  IrcConfig
	// Log configures the optional SQL message log.
	Log  LogConfig
	// Mqtt configures the MQTT broker connection and subscriptions.
	Mqtt MqttConfig
}
     59 
     60 type Msg struct {
     61 	Topic   []byte
     62 	Message []byte
     63 }
     64 
     65 func errMsg(context string, err error) Msg {
     66 	return Msg{
     67 		Topic:   []byte("$mqttim/" + context),
     68 		Message: []byte(err.Error()),
     69 	}
     70 }
     71 
     72 type command struct {
     73 	name string
     74 	arg  string
     75 }
     76 
     77 func readConfig(path string) Config {
     78 	config := Config{
     79 		Irc: IrcConfig{
     80 			Nick:     "mqttim",
     81 			CmdStart: "!",
     82 			CmdMid:   " ",
     83 			SendMid:  ": ",
     84 			ShowMid:  ": ",
     85 		},
     86 		Mqtt: MqttConfig{
     87 			Topics: []string{"#"},
     88 		},
     89 	}
     90 
     91 	f, err := os.Open("mqttim.toml")
     92 	if err != nil {
     93 		log.Fatal(err)
     94 	}
     95 	defer f.Close()
     96 
     97 	d := toml.NewDecoder(f)
     98 	err = d.Decode(&config)
     99 	if err != nil {
    100 		log.Fatal(err)
    101 	}
    102 
    103 	if config.Irc.MaxLine > 0 && len(config.Irc.ContPrefix)+len(config.Irc.ContSuffix) >= config.Irc.MaxLine {
    104 		config.Irc.ContPrefix = ""
    105 		config.Irc.ContSuffix = ""
    106 	}
    107 
    108 	return config
    109 }
    110 
    111 func dialer(config Config) mqtt.Dialer {
    112 	if config.Mqtt.TLS {
    113 		return mqtt.NewTLSDialer("tcp", config.Mqtt.Server, nil)
    114 	} else {
    115 		return mqtt.NewDialer("tcp", config.Mqtt.Server)
    116 	}
    117 }
    118 
    119 func main() {
    120 	var err error
    121 	var m *mqtt.Client
    122 	var l *mqttLogger
    123 
    124 	cmdQueue := make(chan command, 10)
    125 	ircQueue := make(chan Msg, 10)
    126 
    127 	config := readConfig("mqttim.toml")
    128 
    129 	if len(config.Log.SqlDriver) > 0 {
    130 		db, err := sql.Open(config.Log.SqlDriver, config.Log.SqlConnection)
    131 		if err != nil {
    132 			log.Fatal("sql.Open:", err)
    133 		}
    134 
    135 		l, err = logInit(db)
    136 		if err != nil {
    137 			log.Fatal("logInit:", err)
    138 		}
    139 		log.Println("Logging into", config.Log.SqlConnection)
    140 	}
    141 
    142 	m, err = mqtt.VolatileSession(config.Mqtt.Session, &mqtt.Config{
    143 		Dialer:       dialer(config),
    144 		PauseTimeout: 4 * time.Second,
    145 		UserName:     config.Mqtt.UserName,
    146 		Password:     []byte(config.Mqtt.Password),
    147 	})
    148 	if err != nil {
    149 		log.Fatal("mqtt.VolatileSession:", err)
    150 	}
    151 
    152 	i := irc.IRC(config.Irc.Nick, "mqttim")
    153 	if config.Irc.Verbose {
    154 		i.VerboseCallbackHandler = true
    155 		i.Debug = true
    156 	}
    157 	i.AddCallback("001", func(e *irc.Event) { i.Join(config.Irc.Channel) })
    158 	i.AddCallback("366", func(e *irc.Event) {})
    159 	i.AddCallback("PRIVMSG", func(e *irc.Event) {
    160 		msg := e.Message()
    161 		if strings.HasPrefix(msg, config.Irc.CmdStart) && strings.HasSuffix(msg, config.Irc.CmdEnd) {
    162 			msg = msg[len(config.Irc.CmdStart) : len(msg)-len(config.Irc.CmdEnd)]
    163 			name, arg, found := strings.Cut(msg, config.Irc.CmdMid)
    164 			if !found {
    165 				name = msg
    166 				arg = ""
    167 			}
    168 			if name == "send" {
    169 				msg = arg
    170 			} else {
    171 				cmdQueue <- command{name: name, arg: arg}
    172 				return
    173 			}
    174 		}
    175 		if !strings.HasPrefix(msg, config.Irc.SendStart) || !strings.HasSuffix(msg, config.Irc.SendEnd) {
    176 			return
    177 		}
    178 		msg = msg[len(config.Irc.SendStart) : len(msg)-len(config.Irc.SendEnd)]
    179 		topic, payload, found := strings.Cut(msg, config.Irc.SendMid)
    180 		if found {
    181 			logSent(l, []byte(payload), []byte(topic))
    182 			if err := m.Publish(nil, []byte(payload), topic); err != nil {
    183 				ircQueue <- errMsg("Publish", err)
    184 			}
    185 		}
    186 	})
    187 	err = i.Connect(config.Irc.Server)
    188 	if err != nil {
    189 		log.Fatal("irc.Connect:", err)
    190 	}
    191 	go subscribeAll(m, ircQueue, config.Mqtt.Topics)
    192 	go mqttReader(m, l, ircQueue, &config)
    193 	go ircSender(&config.Irc, i, ircQueue, cmdQueue)
    194 	go mqttKeepalive(m, ircQueue, config.Mqtt.Keepalive)
    195 	i.Loop()
    196 }
    197 
    198 func dup(src []byte) []byte {
    199 	res := make([]byte, len(src))
    200 	copy(res, src)
    201 	return res
    202 }
    203 
    204 func mqttKeepalive(m *mqtt.Client, c chan<- Msg, keepalive int) {
    205 	if keepalive <= 0 {
    206 		return
    207 	}
    208 
    209 	period := time.Duration(keepalive) * time.Second
    210 
    211 	for {
    212 		time.Sleep(period)
    213 
    214 		if err := m.Ping(nil); err != nil {
    215 			c <- errMsg("mqttPing", err)
    216 		}
    217 	}
    218 }
    219 
    220 func mqttReader(m *mqtt.Client, l *mqttLogger, c chan<- Msg, config *Config) {
    221 	var big *mqtt.BigMessage
    222 
    223 	for {
    224 		message, topic, err := m.ReadSlices()
    225 		switch {
    226 		case err == nil:
    227 			msg := Msg{Topic: dup(topic), Message: dup(message)}
    228 			logReceived(l, msg.Message, msg.Topic)
    229 			c <- msg
    230 
    231 		case errors.As(err, &big):
    232 			msg := Msg{Topic: dup(topic), Message: []byte("<Big Message>")}
    233 			logReceived(l, msg.Message, msg.Topic)
    234 			c <- msg
    235 
    236 		case errors.Is(err, mqtt.ErrClosed):
    237 			log.Println("mqttReader finishing:", err)
    238 			return
    239 
    240 		case mqtt.IsConnectionRefused(err):
    241 			c <- errMsg("mqttReader2", err)
    242 			time.Sleep(5 * time.Minute)
    243 
    244 		default:
    245 			c <- errMsg("mqttReader", err)
    246 			time.Sleep(2 * time.Second)
    247 		}
    248 	}
    249 }
    250 
    251 func ircSender(config *IrcConfig, i *irc.Connection, cm <-chan Msg, cc <-chan command) {
    252 	var buf strings.Builder
    253 	f := createTopicFilter(config)
    254 
    255 	for {
    256 		select {
    257 		case m := <-cm:
    258 			if !isFiltered(&f, m.Topic) {
    259 				str := config.ShowStart +
    260 					string(m.Topic) +
    261 					config.ShowMid +
    262 					string(m.Message) +
    263 					config.ShowEnd
    264 				ircSend(config, i, str, &buf)
    265 			}
    266 		case cmd := <-cc:
    267 			switch cmd.name {
    268 			case "filters":
    269 				ircSendFilters(config, i, &f, &buf)
    270 			case "help":
    271 				ircSend(config, i, "Command line:", &buf)
    272 				ircSendHelp(config, i, &buf, "filters", "")
    273 				ircSendHelp(config, i, &buf, "help", "")
    274 				ircSendHelp(config, i, &buf, "ignore", "<topic>")
    275 				ircSendHelp(config, i, &buf, "important", "<topic>")
    276 				ircSendHelp(config, i, &buf, "quit", "[message]")
    277 				ircSendHelp(config, i, &buf, "unignore", "<topic>")
    278 				ircSendHelp(config, i, &buf, "unimportant", "<topic>")
    279 			case "ignore":
    280 				filterAddIgnored(&f, cmd.arg)
    281 			case "important":
    282 				filterAddImportant(&f, cmd.arg)
    283 			case "quit":
    284 				log.Println("Quit command", cmd.arg)
    285 				i.QuitMessage = cmd.arg
    286 				i.Quit()
    287 			case "unignore":
    288 				filterDelIgnored(&f, cmd.arg)
    289 			case "unimportant":
    290 				filterDelImportant(&f, cmd.arg)
    291 			default:
    292 				ircSend(config, i, "Unknown command: "+cmd.name, &buf)
    293 			}
    294 		}
    295 	}
    296 }
    297 
    298 func ircSend(config *IrcConfig, i *irc.Connection, s string, buf *strings.Builder) {
    299 	if config.MaxLine <= 0 || len(s) < config.MaxLine {
    300 		i.Privmsg(config.Channel, s)
    301 	} else {
    302 		for offset := 0; offset < len(s); {
    303 			l := len(s) - offset
    304 			buf.Reset()
    305 			if offset > 0 {
    306 				buf.WriteString(config.ContPrefix)
    307 			}
    308 
    309 			if buf.Len()+l <= config.MaxLine {
    310 				buf.WriteString(s[offset:])
    311 			} else {
    312 				l = config.MaxLine - buf.Len() - len(config.ContSuffix)
    313 				buf.WriteString(s[offset : offset+l])
    314 				buf.WriteString(config.ContSuffix)
    315 			}
    316 
    317 			i.Privmsg(config.Channel, buf.String())
    318 			offset += l
    319 		}
    320 	}
    321 }
    322 
    323 func ircSendTopicList(config *IrcConfig, i *irc.Connection, name string, topics [][]string, buf *strings.Builder) {
    324 	if len(topics) >= 2 {
    325 		ircSend(config, i, name+" = [", buf)
    326 		for index, topic := range topics {
    327 			suffix := ","
    328 			if index == len(topics)-1 {
    329 				suffix = " ]"
    330 			}
    331 			ircSend(config, i, fmt.Sprintf("  %q%s", strings.Join(topic, "/"), suffix), buf)
    332 		}
    333 	} else if len(topics) == 1 {
    334 		ircSend(config, i, fmt.Sprintf("%s = [%q]", name, strings.Join(topics[0], "/")), buf)
    335 	} else {
    336 		ircSend(config, i, name+" = []", buf)
    337 	}
    338 }
    339 
// ircSendFilters prints the current topic filter on IRC: first the
// "important" patterns, then the "ignored" ones.
func ircSendFilters(config *IrcConfig, i *irc.Connection, f *mqttTopicFilter, buf *strings.Builder) {
	ircSendTopicList(config, i, "important", f.important, buf)
	ircSendTopicList(config, i, "ignored", f.ignored, buf)
}
    344 
    345 func ircSendHelp(config *IrcConfig, i *irc.Connection, buf *strings.Builder, cmd, arg string) {
    346 	if arg == "" {
    347 		ircSend(config, i, fmt.Sprintf("- %q", config.CmdStart+cmd+config.CmdEnd), buf)
    348 	} else {
    349 		ircSend(config, i, fmt.Sprintf("- %q", config.CmdStart+cmd+config.CmdMid+arg+config.CmdEnd), buf)
    350 	}
    351 }
    352 
    353 func subscribeAll(m *mqtt.Client, ircQueue chan<- Msg, topics []string) {
    354 	for _, topic := range topics {
    355 		for {
    356 			err := m.Subscribe(nil, topic)
    357 
    358 			if err != nil {
    359 				ircQueue <- errMsg("Subscribe", err)
    360 				time.Sleep(1 * time.Minute)
    361 			} else {
    362 				break
    363 			}
    364 		}
    365 	}
    366 }
    367 
    368 /**************** MQTT Logger Into SQL ****************/
    369 
    370 type mqttLogger struct {
    371 	db             *sql.DB
    372 	getTopic       *sql.Stmt
    373 	insertReceived *sql.Stmt
    374 	insertSent     *sql.Stmt
    375 	insertTopic    *sql.Stmt
    376 }
    377 
    378 func logClose(l *mqttLogger) {
    379 	if l.insertTopic != nil {
    380 		l.insertTopic.Close()
    381 		l.insertTopic = nil
    382 	}
    383 	if l.insertSent != nil {
    384 		l.insertSent.Close()
    385 		l.insertSent = nil
    386 	}
    387 	if l.insertReceived != nil {
    388 		l.insertReceived.Close()
    389 		l.insertReceived = nil
    390 	}
    391 	if l.getTopic != nil {
    392 		l.getTopic.Close()
    393 		l.getTopic = nil
    394 	}
    395 	if l.db != nil {
    396 		l.db.Close()
    397 		l.db = nil
    398 	}
    399 }
    400 
    401 func logInit(db *sql.DB) (*mqttLogger, error) {
    402 	var err error
    403 	result := mqttLogger{db: db}
    404 
    405 	for _, cmd := range []string{
    406 		"CREATE TABLE IF NOT EXISTS topics" +
    407 			"(id INTEGER PRIMARY KEY AUTOINCREMENT," +
    408 			" name TEXT NOT NULL);",
    409 		"CREATE UNIQUE INDEX IF NOT EXISTS i_topics ON topics(name);",
    410 		"CREATE TABLE IF NOT EXISTS received" +
    411 			"(timestamp REAL NOT NULL," +
    412 			" topic_id INTEGER NOT NULL," +
    413 			" message TEXT NOT NULL," +
    414 			" FOREIGN KEY (topic_id) REFERENCES topics (id));",
    415 		"CREATE TABLE IF NOT EXISTS sent" +
    416 			"(timestamp REAL NOT NULL," +
    417 			" topic_id INTEGER NOT NULL," +
    418 			" message TEXT NOT NULL," +
    419 			" FOREIGN KEY (topic_id) REFERENCES topics (id));",
    420 		"CREATE INDEX IF NOT EXISTS i_rtime ON received(timestamp);",
    421 		"CREATE INDEX IF NOT EXISTS i_rtopicid ON received(topic_id);",
    422 		"CREATE INDEX IF NOT EXISTS i_stime ON sent(timestamp);",
    423 		"CREATE INDEX IF NOT EXISTS i_stopicid ON sent(topic_id);",
    424 	} {
    425 		if _, err = result.db.Exec(cmd); err != nil {
    426 			logClose(&result)
    427 			return nil, err
    428 		}
    429 	}
    430 
    431 	result.getTopic, err = db.Prepare("SELECT id FROM topics WHERE name=?;")
    432 	if err != nil {
    433 		logClose(&result)
    434 		return nil, err
    435 	}
    436 
    437 	result.insertTopic, err = db.Prepare("INSERT OR IGNORE INTO topics(name) VALUES (?);")
    438 	if err != nil {
    439 		logClose(&result)
    440 		return nil, err
    441 	}
    442 
    443 	result.insertSent, err = db.Prepare("INSERT INTO sent (timestamp, topic_id, message) VALUES (?, ?, ?);")
    444 	if err != nil {
    445 		logClose(&result)
    446 		return nil, err
    447 	}
    448 
    449 	result.insertReceived, err = db.Prepare("INSERT INTO received (timestamp, topic_id, message) VALUES (?, ?, ?);")
    450 	if err != nil {
    451 		logClose(&result)
    452 		return nil, err
    453 	}
    454 
    455 	return &result, nil
    456 }
    457 
    458 func logMessage(l *mqttLogger, stmt *sql.Stmt, message, topic []byte) {
    459 	t := float64(time.Now().UnixNano())/8.64e13 + 2440587.5
    460 
    461 	id, err := logTopic(l, topic)
    462 	if err != nil {
    463 		log.Println("logTopic:", err)
    464 		logClose(l)
    465 		return
    466 	}
    467 
    468 	_, err = stmt.Exec(t, id, message)
    469 	if err != nil {
    470 		log.Println("logMessage:", err)
    471 		logClose(l)
    472 		return
    473 	}
    474 }
    475 
    476 func logReceived(l *mqttLogger, message, topic []byte) {
    477 	if l == nil || l.db == nil {
    478 		return
    479 	}
    480 
    481 	logMessage(l, l.insertReceived, message, topic)
    482 }
    483 
    484 func logSent(l *mqttLogger, message, topic []byte) {
    485 	if l == nil || l.db == nil {
    486 		return
    487 	}
    488 
    489 	logMessage(l, l.insertSent, message, topic)
    490 }
    491 
    492 func logTopic(l *mqttLogger, topic []byte) (int64, error) {
    493 	var id int64
    494 	err := l.getTopic.QueryRow(topic).Scan(&id)
    495 
    496 	if err == sql.ErrNoRows {
    497 		res, err := l.insertTopic.Exec(topic)
    498 
    499 		if err != nil {
    500 			return 0, err
    501 		}
    502 
    503 		return res.LastInsertId()
    504 	} else if err != nil {
    505 		return 0, err
    506 	}
    507 
    508 	return id, nil
    509 }
    510 
    511 /**************** MQTT Topic Filter ****************/
    512 
    513 type mqttTopicFilter struct {
    514 	ignored   [][]string
    515 	important [][]string
    516 }
    517 
    518 func addPattern(topics [][]string, newPattern string) [][]string {
    519 	return append(topics, strings.Split(newPattern, "/"))
    520 }
    521 
    522 func delPattern(topics [][]string, toRemove string) [][]string {
    523 	var result [][]string
    524 
    525 	for _, pat := range topics {
    526 		if strings.Join(pat, "/") != toRemove {
    527 			result = append(result, pat)
    528 		}
    529 	}
    530 
    531 	return result
    532 }
    533 
    534 func createTopicFilter(config *IrcConfig) mqttTopicFilter {
    535 	result := mqttTopicFilter{
    536 		ignored:   make([][]string, len(config.Ignored)),
    537 		important: make([][]string, len(config.Important)),
    538 	}
    539 
    540 	for i, s := range config.Ignored {
    541 		result.ignored[i] = strings.Split(s, "/")
    542 	}
    543 
    544 	for i, s := range config.Important {
    545 		result.important[i] = strings.Split(s, "/")
    546 	}
    547 
    548 	return result
    549 }
    550 
// filterAddIgnored adds pattern to the ignored list of filter.
func filterAddIgnored(filter *mqttTopicFilter, pattern string) {
	filter.ignored = addPattern(filter.ignored, pattern)
}
    554 
// filterAddImportant adds pattern to the important list of filter.
func filterAddImportant(filter *mqttTopicFilter, pattern string) {
	filter.important = addPattern(filter.important, pattern)
}
    558 
// filterDelIgnored removes every occurrence of pattern from the ignored
// list of filter.
func filterDelIgnored(filter *mqttTopicFilter, pattern string) {
	filter.ignored = delPattern(filter.ignored, pattern)
}
    562 
// filterDelImportant removes every occurrence of pattern from the
// important list of filter.
func filterDelImportant(filter *mqttTopicFilter, pattern string) {
	filter.important = delPattern(filter.important, pattern)
}
    566 
    567 func isFiltered(filter *mqttTopicFilter, topic []byte) bool {
    568 	t := strings.Split(string(topic), "/")
    569 
    570 	for _, pattern := range filter.important {
    571 		if topicMatch(t, pattern) {
    572 			return false
    573 		}
    574 	}
    575 
    576 	for _, pattern := range filter.ignored {
    577 		if topicMatch(t, pattern) {
    578 			return true
    579 		}
    580 	}
    581 
    582 	return false
    583 }
    584 
    585 func topicMatch(actual, filter []string) bool {
    586 	if len(filter) == 0 {
    587 		return len(actual) == 0
    588 	}
    589 
    590 	if filter[0] == "#" {
    591 		if len(filter) == 1 {
    592 			return true
    593 		}
    594 
    595 		for i := range actual {
    596 			if topicMatch(actual[i:], filter[1:]) {
    597 				return true
    598 			}
    599 		}
    600 
    601 		return false
    602 	}
    603 
    604 	if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) {
    605 		return topicMatch(actual[1:], filter[1:])
    606 	}
    607 
    608 	return false
    609 }