mqttim

MQTT ↔ Instant Messaging Bridge
git clone https://git.instinctive.eu/mqttim.git
Log | Files | Refs | README | LICENSE

main.go (13876B)


      1 package main
      2 
      3 import (
      4 	"database/sql"
      5 	"errors"
      6 	"fmt"
      7 	"log"
      8 	"os"
      9 	"strings"
     10 	"time"
     11 
     12 	_ "github.com/glebarez/go-sqlite"
     13 	"github.com/go-mqtt/mqtt"
     14 	"github.com/pelletier/go-toml/v2"
     15 	"github.com/thoj/go-ircevent"
     16 )
     17 
     18 type IrcConfig struct {
     19 	Channel    string
     20 	Server     string
     21 	Nick       string
     22 	CmdStart   string
     23 	CmdMid     string
     24 	CmdEnd     string
     25 	SendStart  string
     26 	SendMid    string
     27 	SendEnd    string
     28 	ShowStart  string
     29 	ShowMid    string
     30 	ShowEnd    string
     31 	MaxLine    int
     32 	ContSuffix string
     33 	ContPrefix string
     34 	Ignored    []string
     35 	Important  []string
     36 	Verbose    bool
     37 }
     38 
     39 type LogConfig struct {
     40 	SqlDriver     string
     41 	SqlConnection string
     42 }
     43 
     44 type MqttConfig struct {
     45 	Server    string
     46 	Session   string
     47 	UserName  string
     48 	Password  string
     49 	TLS       bool
     50 	Keepalive int
     51 	Topics    []string
     52 }
     53 
// Config is the complete configuration of the bridge, as decoded from the
// TOML file by readConfig.
type Config struct {
	// Irc configures the IRC connection, message formats and topic filters.
	Irc  IrcConfig
	// Log configures the optional SQL log of MQTT traffic.
	Log  LogConfig
	// Mqtt configures the MQTT client and its subscriptions.
	Mqtt MqttConfig
}
     59 
     60 type Msg struct {
     61 	Topic   []byte
     62 	Message []byte
     63 }
     64 
     65 func errMsg(context string, err error) Msg {
     66 	return Msg{
     67 		Topic:   []byte("$mqttim/" + context),
     68 		Message: []byte(err.Error()),
     69 	}
     70 }
     71 
     72 type command struct {
     73 	name string
     74 	arg  string
     75 }
     76 
     77 func readConfig(path string) Config {
     78 	config := Config{
     79 		Irc: IrcConfig{
     80 			Nick:     "mqttim",
     81 			CmdStart: "!",
     82 			CmdMid:   " ",
     83 			SendMid:  ": ",
     84 			ShowMid:  ": ",
     85 		},
     86 		Mqtt: MqttConfig{
     87 			Topics: []string{"#"},
     88 		},
     89 	}
     90 
     91 	f, err := os.Open(path)
     92 	if err != nil {
     93 		log.Fatal(err)
     94 	}
     95 	defer f.Close()
     96 
     97 	d := toml.NewDecoder(f)
     98 	err = d.Decode(&config)
     99 	if err != nil {
    100 		log.Fatal(err)
    101 	}
    102 
    103 	if config.Irc.MaxLine > 0 && len(config.Irc.ContPrefix)+len(config.Irc.ContSuffix) >= config.Irc.MaxLine {
    104 		config.Irc.ContPrefix = ""
    105 		config.Irc.ContSuffix = ""
    106 	}
    107 
    108 	return config
    109 }
    110 
    111 func dialer(config Config) mqtt.Dialer {
    112 	if config.Mqtt.TLS {
    113 		return mqtt.NewTLSDialer("tcp", config.Mqtt.Server, nil)
    114 	} else {
    115 		return mqtt.NewDialer("tcp", config.Mqtt.Server)
    116 	}
    117 }
    118 
    119 func main() {
    120 	var err error
    121 	var m *mqtt.Client
    122 	var l *mqttLogger
    123 
    124 	cmdQueue := make(chan command, 10)
    125 	ircQueue := make(chan Msg, 10)
    126 
    127 	config_file := "mqttim.toml"
    128 	if len(os.Args) > 1 {
    129 		config_file = os.Args[1]
    130 	}
    131 	config := readConfig(config_file)
    132 
    133 	if len(config.Log.SqlDriver) > 0 {
    134 		db, err := sql.Open(config.Log.SqlDriver, config.Log.SqlConnection)
    135 		if err != nil {
    136 			log.Fatal("sql.Open:", err)
    137 		}
    138 
    139 		l, err = logInit(db)
    140 		if err != nil {
    141 			log.Fatal("logInit:", err)
    142 		}
    143 		log.Println("Logging into", config.Log.SqlConnection)
    144 	}
    145 
    146 	m, err = mqtt.VolatileSession(config.Mqtt.Session, &mqtt.Config{
    147 		Dialer:       dialer(config),
    148 		PauseTimeout: 4 * time.Second,
    149 		UserName:     config.Mqtt.UserName,
    150 		Password:     []byte(config.Mqtt.Password),
    151 	})
    152 	if err != nil {
    153 		log.Fatal("mqtt.VolatileSession:", err)
    154 	}
    155 
    156 	i := irc.IRC(config.Irc.Nick, "mqttim")
    157 	if config.Irc.Verbose {
    158 		i.VerboseCallbackHandler = true
    159 		i.Debug = true
    160 	}
    161 	i.AddCallback("001", func(e *irc.Event) { i.Join(config.Irc.Channel) })
    162 	i.AddCallback("366", func(e *irc.Event) {})
    163 	i.AddCallback("PRIVMSG", func(e *irc.Event) {
    164 		msg := e.Message()
    165 		if strings.HasPrefix(msg, config.Irc.CmdStart) && strings.HasSuffix(msg, config.Irc.CmdEnd) {
    166 			msg = msg[len(config.Irc.CmdStart) : len(msg)-len(config.Irc.CmdEnd)]
    167 			name, arg, found := strings.Cut(msg, config.Irc.CmdMid)
    168 			if !found {
    169 				name = msg
    170 				arg = ""
    171 			}
    172 			if name == "send" {
    173 				msg = arg
    174 			} else {
    175 				cmdQueue <- command{name: name, arg: arg}
    176 				return
    177 			}
    178 		}
    179 		if !strings.HasPrefix(msg, config.Irc.SendStart) || !strings.HasSuffix(msg, config.Irc.SendEnd) {
    180 			return
    181 		}
    182 		msg = msg[len(config.Irc.SendStart) : len(msg)-len(config.Irc.SendEnd)]
    183 		topic, payload, found := strings.Cut(msg, config.Irc.SendMid)
    184 		if found {
    185 			logSent(l, []byte(payload), []byte(topic))
    186 			if err := m.Publish(nil, []byte(payload), topic); err != nil {
    187 				ircQueue <- errMsg("Publish", err)
    188 			}
    189 		}
    190 	})
    191 	err = i.Connect(config.Irc.Server)
    192 	if err != nil {
    193 		log.Fatal("irc.Connect:", err)
    194 	}
    195 	go subscribeAll(m, ircQueue, config.Mqtt.Topics)
    196 	go mqttReader(m, l, ircQueue, &config)
    197 	go ircSender(&config.Irc, i, ircQueue, cmdQueue)
    198 	go mqttKeepalive(m, ircQueue, config.Mqtt.Keepalive)
    199 	i.Loop()
    200 }
    201 
    202 func dup(src []byte) []byte {
    203 	res := make([]byte, len(src))
    204 	copy(res, src)
    205 	return res
    206 }
    207 
    208 func mqttKeepalive(m *mqtt.Client, c chan<- Msg, keepalive int) {
    209 	if keepalive <= 0 {
    210 		return
    211 	}
    212 
    213 	period := time.Duration(keepalive) * time.Second
    214 
    215 	for {
    216 		time.Sleep(period)
    217 
    218 		if err := m.Ping(nil); err != nil {
    219 			c <- errMsg("mqttPing", err)
    220 		}
    221 	}
    222 }
    223 
    224 func mqttReader(m *mqtt.Client, l *mqttLogger, c chan<- Msg, config *Config) {
    225 	var big *mqtt.BigMessage
    226 
    227 	for {
    228 		message, topic, err := m.ReadSlices()
    229 		switch {
    230 		case err == nil:
    231 			msg := Msg{Topic: dup(topic), Message: dup(message)}
    232 			logReceived(l, msg.Message, msg.Topic)
    233 			c <- msg
    234 
    235 		case errors.As(err, &big):
    236 			msg := Msg{Topic: dup(topic), Message: []byte("<Big Message>")}
    237 			logReceived(l, msg.Message, msg.Topic)
    238 			c <- msg
    239 
    240 		case errors.Is(err, mqtt.ErrClosed):
    241 			log.Println("mqttReader finishing:", err)
    242 			return
    243 
    244 		case mqtt.IsConnectionRefused(err):
    245 			c <- errMsg("mqttReader2", err)
    246 			time.Sleep(5 * time.Minute)
    247 
    248 		default:
    249 			c <- errMsg("mqttReader", err)
    250 			time.Sleep(2 * time.Second)
    251 		}
    252 	}
    253 }
    254 
    255 func ircSender(config *IrcConfig, i *irc.Connection, cm <-chan Msg, cc <-chan command) {
    256 	var buf strings.Builder
    257 	f := createTopicFilter(config)
    258 
    259 	for {
    260 		select {
    261 		case m := <-cm:
    262 			if !isFiltered(&f, m.Topic) {
    263 				str := config.ShowStart +
    264 					string(m.Topic) +
    265 					config.ShowMid +
    266 					string(m.Message) +
    267 					config.ShowEnd
    268 				ircSend(config, i, str, &buf)
    269 			}
    270 		case cmd := <-cc:
    271 			switch cmd.name {
    272 			case "filters":
    273 				ircSendFilters(config, i, &f, &buf)
    274 			case "help":
    275 				ircSend(config, i, "Command line:", &buf)
    276 				ircSendHelp(config, i, &buf, "filters", "")
    277 				ircSendHelp(config, i, &buf, "help", "")
    278 				ircSendHelp(config, i, &buf, "ignore", "<topic>")
    279 				ircSendHelp(config, i, &buf, "important", "<topic>")
    280 				ircSendHelp(config, i, &buf, "quit", "[message]")
    281 				ircSendHelp(config, i, &buf, "unignore", "<topic>")
    282 				ircSendHelp(config, i, &buf, "unimportant", "<topic>")
    283 			case "ignore":
    284 				filterAddIgnored(&f, cmd.arg)
    285 			case "important":
    286 				filterAddImportant(&f, cmd.arg)
    287 			case "quit":
    288 				log.Println("Quit command", cmd.arg)
    289 				i.QuitMessage = cmd.arg
    290 				i.Quit()
    291 			case "unignore":
    292 				filterDelIgnored(&f, cmd.arg)
    293 			case "unimportant":
    294 				filterDelImportant(&f, cmd.arg)
    295 			default:
    296 				ircSend(config, i, "Unknown command: "+cmd.name, &buf)
    297 			}
    298 		}
    299 	}
    300 }
    301 
    302 func ircSend(config *IrcConfig, i *irc.Connection, s string, buf *strings.Builder) {
    303 	if config.MaxLine <= 0 || len(s) < config.MaxLine {
    304 		i.Privmsg(config.Channel, s)
    305 	} else {
    306 		for offset := 0; offset < len(s); {
    307 			l := len(s) - offset
    308 			buf.Reset()
    309 			if offset > 0 {
    310 				buf.WriteString(config.ContPrefix)
    311 			}
    312 
    313 			if buf.Len()+l <= config.MaxLine {
    314 				buf.WriteString(s[offset:])
    315 			} else {
    316 				l = config.MaxLine - buf.Len() - len(config.ContSuffix)
    317 				buf.WriteString(s[offset : offset+l])
    318 				buf.WriteString(config.ContSuffix)
    319 			}
    320 
    321 			i.Privmsg(config.Channel, buf.String())
    322 			offset += l
    323 		}
    324 	}
    325 }
    326 
    327 func ircSendTopicList(config *IrcConfig, i *irc.Connection, name string, topics [][]string, buf *strings.Builder) {
    328 	if len(topics) >= 2 {
    329 		ircSend(config, i, name+" = [", buf)
    330 		for index, topic := range topics {
    331 			suffix := ","
    332 			if index == len(topics)-1 {
    333 				suffix = " ]"
    334 			}
    335 			ircSend(config, i, fmt.Sprintf("  %q%s", strings.Join(topic, "/"), suffix), buf)
    336 		}
    337 	} else if len(topics) == 1 {
    338 		ircSend(config, i, fmt.Sprintf("%s = [%q]", name, strings.Join(topics[0], "/")), buf)
    339 	} else {
    340 		ircSend(config, i, name+" = []", buf)
    341 	}
    342 }
    343 
    344 func ircSendFilters(config *IrcConfig, i *irc.Connection, f *mqttTopicFilter, buf *strings.Builder) {
    345 	ircSendTopicList(config, i, "important", f.important, buf)
    346 	ircSendTopicList(config, i, "ignored", f.ignored, buf)
    347 }
    348 
    349 func ircSendHelp(config *IrcConfig, i *irc.Connection, buf *strings.Builder, cmd, arg string) {
    350 	if arg == "" {
    351 		ircSend(config, i, fmt.Sprintf("- %q", config.CmdStart+cmd+config.CmdEnd), buf)
    352 	} else {
    353 		ircSend(config, i, fmt.Sprintf("- %q", config.CmdStart+cmd+config.CmdMid+arg+config.CmdEnd), buf)
    354 	}
    355 }
    356 
    357 func subscribeAll(m *mqtt.Client, ircQueue chan<- Msg, topics []string) {
    358 	for _, topic := range topics {
    359 		for {
    360 			err := m.Subscribe(nil, topic)
    361 
    362 			if err != nil {
    363 				ircQueue <- errMsg("Subscribe", err)
    364 				time.Sleep(1 * time.Minute)
    365 			} else {
    366 				break
    367 			}
    368 		}
    369 	}
    370 }
    371 
    372 /**************** MQTT Logger Into SQL ****************/
    373 
    374 type mqttLogger struct {
    375 	db             *sql.DB
    376 	getTopic       *sql.Stmt
    377 	insertReceived *sql.Stmt
    378 	insertSent     *sql.Stmt
    379 	insertTopic    *sql.Stmt
    380 }
    381 
    382 func logClose(l *mqttLogger) {
    383 	if l.insertTopic != nil {
    384 		l.insertTopic.Close()
    385 		l.insertTopic = nil
    386 	}
    387 	if l.insertSent != nil {
    388 		l.insertSent.Close()
    389 		l.insertSent = nil
    390 	}
    391 	if l.insertReceived != nil {
    392 		l.insertReceived.Close()
    393 		l.insertReceived = nil
    394 	}
    395 	if l.getTopic != nil {
    396 		l.getTopic.Close()
    397 		l.getTopic = nil
    398 	}
    399 	if l.db != nil {
    400 		l.db.Close()
    401 		l.db = nil
    402 	}
    403 }
    404 
    405 func logInit(db *sql.DB) (*mqttLogger, error) {
    406 	var err error
    407 	result := mqttLogger{db: db}
    408 
    409 	for _, cmd := range []string{
    410 		"CREATE TABLE IF NOT EXISTS topics" +
    411 			"(id INTEGER PRIMARY KEY AUTOINCREMENT," +
    412 			" name TEXT NOT NULL);",
    413 		"CREATE UNIQUE INDEX IF NOT EXISTS i_topics ON topics(name);",
    414 		"CREATE TABLE IF NOT EXISTS received" +
    415 			"(timestamp REAL NOT NULL," +
    416 			" topic_id INTEGER NOT NULL," +
    417 			" message TEXT NOT NULL," +
    418 			" FOREIGN KEY (topic_id) REFERENCES topics (id));",
    419 		"CREATE TABLE IF NOT EXISTS sent" +
    420 			"(timestamp REAL NOT NULL," +
    421 			" topic_id INTEGER NOT NULL," +
    422 			" message TEXT NOT NULL," +
    423 			" FOREIGN KEY (topic_id) REFERENCES topics (id));",
    424 		"CREATE INDEX IF NOT EXISTS i_rtime ON received(timestamp);",
    425 		"CREATE INDEX IF NOT EXISTS i_rtopicid ON received(topic_id);",
    426 		"CREATE INDEX IF NOT EXISTS i_stime ON sent(timestamp);",
    427 		"CREATE INDEX IF NOT EXISTS i_stopicid ON sent(topic_id);",
    428 	} {
    429 		if _, err = result.db.Exec(cmd); err != nil {
    430 			logClose(&result)
    431 			return nil, err
    432 		}
    433 	}
    434 
    435 	result.getTopic, err = db.Prepare("SELECT id FROM topics WHERE name=?;")
    436 	if err != nil {
    437 		logClose(&result)
    438 		return nil, err
    439 	}
    440 
    441 	result.insertTopic, err = db.Prepare("INSERT OR IGNORE INTO topics(name) VALUES (?);")
    442 	if err != nil {
    443 		logClose(&result)
    444 		return nil, err
    445 	}
    446 
    447 	result.insertSent, err = db.Prepare("INSERT INTO sent (timestamp, topic_id, message) VALUES (?, ?, ?);")
    448 	if err != nil {
    449 		logClose(&result)
    450 		return nil, err
    451 	}
    452 
    453 	result.insertReceived, err = db.Prepare("INSERT INTO received (timestamp, topic_id, message) VALUES (?, ?, ?);")
    454 	if err != nil {
    455 		logClose(&result)
    456 		return nil, err
    457 	}
    458 
    459 	return &result, nil
    460 }
    461 
    462 func logMessage(l *mqttLogger, stmt *sql.Stmt, message, topic []byte) {
    463 	t := float64(time.Now().UnixNano())/8.64e13 + 2440587.5
    464 
    465 	id, err := logTopic(l, topic)
    466 	if err != nil {
    467 		log.Println("logTopic:", err)
    468 		logClose(l)
    469 		return
    470 	}
    471 
    472 	_, err = stmt.Exec(t, id, message)
    473 	if err != nil {
    474 		log.Println("logMessage:", err)
    475 		logClose(l)
    476 		return
    477 	}
    478 }
    479 
    480 func logReceived(l *mqttLogger, message, topic []byte) {
    481 	if l == nil || l.db == nil {
    482 		return
    483 	}
    484 
    485 	logMessage(l, l.insertReceived, message, topic)
    486 }
    487 
    488 func logSent(l *mqttLogger, message, topic []byte) {
    489 	if l == nil || l.db == nil {
    490 		return
    491 	}
    492 
    493 	logMessage(l, l.insertSent, message, topic)
    494 }
    495 
    496 func logTopic(l *mqttLogger, topic []byte) (int64, error) {
    497 	var id int64
    498 	err := l.getTopic.QueryRow(topic).Scan(&id)
    499 
    500 	if err == sql.ErrNoRows {
    501 		res, err := l.insertTopic.Exec(topic)
    502 
    503 		if err != nil {
    504 			return 0, err
    505 		}
    506 
    507 		return res.LastInsertId()
    508 	} else if err != nil {
    509 		return 0, err
    510 	}
    511 
    512 	return id, nil
    513 }
    514 
    515 /**************** MQTT Topic Filter ****************/
    516 
    517 type mqttTopicFilter struct {
    518 	ignored   [][]string
    519 	important [][]string
    520 }
    521 
    522 func addPattern(topics [][]string, newPattern string) [][]string {
    523 	return append(topics, strings.Split(newPattern, "/"))
    524 }
    525 
    526 func delPattern(topics [][]string, toRemove string) [][]string {
    527 	var result [][]string
    528 
    529 	for _, pat := range topics {
    530 		if strings.Join(pat, "/") != toRemove {
    531 			result = append(result, pat)
    532 		}
    533 	}
    534 
    535 	return result
    536 }
    537 
    538 func createTopicFilter(config *IrcConfig) mqttTopicFilter {
    539 	result := mqttTopicFilter{
    540 		ignored:   make([][]string, len(config.Ignored)),
    541 		important: make([][]string, len(config.Important)),
    542 	}
    543 
    544 	for i, s := range config.Ignored {
    545 		result.ignored[i] = strings.Split(s, "/")
    546 	}
    547 
    548 	for i, s := range config.Important {
    549 		result.important[i] = strings.Split(s, "/")
    550 	}
    551 
    552 	return result
    553 }
    554 
    555 func filterAddIgnored(filter *mqttTopicFilter, pattern string) {
    556 	filter.ignored = addPattern(filter.ignored, pattern)
    557 }
    558 
    559 func filterAddImportant(filter *mqttTopicFilter, pattern string) {
    560 	filter.important = addPattern(filter.important, pattern)
    561 }
    562 
    563 func filterDelIgnored(filter *mqttTopicFilter, pattern string) {
    564 	filter.ignored = delPattern(filter.ignored, pattern)
    565 }
    566 
    567 func filterDelImportant(filter *mqttTopicFilter, pattern string) {
    568 	filter.important = delPattern(filter.important, pattern)
    569 }
    570 
    571 func isFiltered(filter *mqttTopicFilter, topic []byte) bool {
    572 	t := strings.Split(string(topic), "/")
    573 
    574 	for _, pattern := range filter.important {
    575 		if topicMatch(t, pattern) {
    576 			return false
    577 		}
    578 	}
    579 
    580 	for _, pattern := range filter.ignored {
    581 		if topicMatch(t, pattern) {
    582 			return true
    583 		}
    584 	}
    585 
    586 	return false
    587 }
    588 
    589 func topicMatch(actual, filter []string) bool {
    590 	if len(filter) == 0 {
    591 		return len(actual) == 0
    592 	}
    593 
    594 	if filter[0] == "#" {
    595 		if len(filter) == 1 {
    596 			return true
    597 		}
    598 
    599 		for i := range actual {
    600 			if topicMatch(actual[i:], filter[1:]) {
    601 				return true
    602 			}
    603 		}
    604 
    605 		return false
    606 	}
    607 
    608 	if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) {
    609 		return topicMatch(actual[1:], filter[1:])
    610 	}
    611 
    612 	return false
    613 }