mqttim

MQTT ↔ Instant Messaging Bridge
git clone https://git.instinctive.eu/mqttim.git
Log | Files | Refs | README | LICENSE

main.go (12395B)


      1 package main
      2 
import (
	"database/sql"
	"errors"
	"fmt"
	"log"
	"os"
	"strings"
	"time"
	"unicode/utf8"

	_ "github.com/glebarez/go-sqlite"
	"github.com/go-mqtt/mqtt"
	"github.com/pelletier/go-toml/v2"
	"github.com/thoj/go-ircevent"
)
     17 
     18 type IrcConfig struct {
     19 	Channel    string
     20 	Server     string
     21 	Nick       string
     22 	CmdStart   string
     23 	CmdMid     string
     24 	CmdEnd     string
     25 	SendStart  string
     26 	SendMid    string
     27 	SendEnd    string
     28 	ShowStart  string
     29 	ShowMid    string
     30 	ShowEnd    string
     31 	MaxLine    int
     32 	ContSuffix string
     33 	ContPrefix string
     34 	Ignored    []string
     35 	Important  []string
     36 	Verbose    bool
     37 }
     38 
     39 type LogConfig struct {
     40 	SqlDriver     string
     41 	SqlConnection string
     42 }
     43 
     44 type MqttConfig struct {
     45 	Server   string
     46 	Session  string
     47 	UserName string
     48 	Password string
     49 }
     50 
// Config is the root of the configuration file decoded by readConfig,
// with one section per subsystem of the bridge.
type Config struct {
	Irc  IrcConfig  // IRC connection, message syntax and topic filter
	Log  LogConfig  // optional SQL log of the MQTT traffic
	Mqtt MqttConfig // MQTT broker connection
}
     56 
     57 type Msg struct {
     58 	Topic   []byte
     59 	Message []byte
     60 }
     61 
     62 func errMsg(context string, err error) Msg {
     63 	return Msg{
     64 		Topic:   []byte("$mqttim/" + context),
     65 		Message: []byte(err.Error()),
     66 	}
     67 }
     68 
     69 type command struct {
     70 	name string
     71 	arg  string
     72 }
     73 
     74 func readConfig(path string) Config {
     75 	config := Config{
     76 		Irc: IrcConfig{
     77 			Nick:     "mqttim",
     78 			CmdStart: "!",
     79 			CmdMid:   " ",
     80 			SendMid:  ": ",
     81 			ShowMid:  ": ",
     82 		},
     83 	}
     84 
     85 	f, err := os.Open("mqttim.toml")
     86 	if err != nil {
     87 		log.Fatal(err)
     88 	}
     89 	defer f.Close()
     90 
     91 	d := toml.NewDecoder(f)
     92 	err = d.Decode(&config)
     93 	if err != nil {
     94 		log.Fatal(err)
     95 	}
     96 
     97 	if config.Irc.MaxLine > 0 && len(config.Irc.ContPrefix)+len(config.Irc.ContSuffix) >= config.Irc.MaxLine {
     98 		config.Irc.ContPrefix = ""
     99 		config.Irc.ContSuffix = ""
    100 	}
    101 
    102 	return config
    103 }
    104 
    105 func main() {
    106 	var err error
    107 	var m *mqtt.Client
    108 	var l *mqttLogger
    109 
    110 	cmdQueue := make(chan command, 10)
    111 	ircQueue := make(chan Msg, 10)
    112 
    113 	config := readConfig("mqttim.toml")
    114 
    115 	if len(config.Log.SqlDriver) > 0 {
    116 		db, err := sql.Open(config.Log.SqlDriver, config.Log.SqlConnection)
    117 		if err != nil {
    118 			log.Fatal("sql.Open:", err)
    119 		}
    120 
    121 		l, err = logInit(db)
    122 		if err != nil {
    123 			log.Fatal("logInit:", err)
    124 		}
    125 		log.Println("Logging into", config.Log.SqlConnection)
    126 	}
    127 
    128 	m, err = mqtt.VolatileSession(config.Mqtt.Session, &mqtt.Config{
    129 		Dialer:       mqtt.NewDialer("tcp", config.Mqtt.Server),
    130 		PauseTimeout: 4 * time.Second,
    131 		UserName:     config.Mqtt.UserName,
    132 		Password:     []byte(config.Mqtt.Password),
    133 	})
    134 	if err != nil {
    135 		log.Fatal("mqtt.VolatileSession:", err)
    136 	}
    137 
    138 	i := irc.IRC(config.Irc.Nick, "mqttim")
    139 	if config.Irc.Verbose {
    140 		i.VerboseCallbackHandler = true
    141 		i.Debug = true
    142 	}
    143 	i.AddCallback("001", func(e *irc.Event) { i.Join(config.Irc.Channel) })
    144 	i.AddCallback("366", func(e *irc.Event) {})
    145 	i.AddCallback("PRIVMSG", func(e *irc.Event) {
    146 		msg := e.Message()
    147 		if strings.HasPrefix(msg, config.Irc.CmdStart) && strings.HasSuffix(msg, config.Irc.CmdEnd) {
    148 			msg = msg[len(config.Irc.CmdStart) : len(msg)-len(config.Irc.CmdEnd)]
    149 			name, arg, found := strings.Cut(msg, config.Irc.CmdMid)
    150 			if !found {
    151 				name = msg
    152 				arg = ""
    153 			}
    154 			if name == "send" {
    155 				msg = arg
    156 			} else {
    157 				cmdQueue <- command{name: name, arg: arg}
    158 				return
    159 			}
    160 		}
    161 		if !strings.HasPrefix(msg, config.Irc.SendStart) || !strings.HasSuffix(msg, config.Irc.SendEnd) {
    162 			return
    163 		}
    164 		msg = msg[len(config.Irc.SendStart) : len(msg)-len(config.Irc.SendEnd)]
    165 		topic, payload, found := strings.Cut(msg, config.Irc.SendMid)
    166 		if found {
    167 			logSent(l, []byte(payload), []byte(topic))
    168 			if err := m.Publish(nil, []byte(payload), topic); err != nil {
    169 				ircQueue <- errMsg("Publish", err)
    170 			}
    171 		}
    172 	})
    173 	err = i.Connect(config.Irc.Server)
    174 	if err != nil {
    175 		log.Fatal("irc.Connect:", err)
    176 	}
    177 	go subscribeAll(m, ircQueue)
    178 	go mqttReader(m, l, ircQueue, &config)
    179 	go ircSender(&config.Irc, i, ircQueue, cmdQueue)
    180 	i.Loop()
    181 }
    182 
    183 func dup(src []byte) []byte {
    184 	res := make([]byte, len(src))
    185 	copy(res, src)
    186 	return res
    187 }
    188 
    189 func mqttReader(m *mqtt.Client, l *mqttLogger, c chan<- Msg, config *Config) {
    190 	var big *mqtt.BigMessage
    191 
    192 	for {
    193 		message, topic, err := m.ReadSlices()
    194 		switch {
    195 		case err == nil:
    196 			msg := Msg{Topic: dup(topic), Message: dup(message)}
    197 			logReceived(l, msg.Message, msg.Topic)
    198 			c <- msg
    199 
    200 		case errors.As(err, &big):
    201 			msg := Msg{Topic: dup(topic), Message: []byte("<Big Message>")}
    202 			logReceived(l, msg.Message, msg.Topic)
    203 			c <- msg
    204 
    205 		case errors.Is(err, mqtt.ErrClosed):
    206 			log.Println("mqttReader finishing:", err)
    207 			return
    208 
    209 		case mqtt.IsConnectionRefused(err):
    210 			c <- errMsg("mqttReader2", err)
    211 			time.Sleep(5 * time.Minute)
    212 
    213 		default:
    214 			c <- errMsg("mqttReader", err)
    215 			time.Sleep(2 * time.Second)
    216 		}
    217 	}
    218 }
    219 
    220 func ircSender(config *IrcConfig, i *irc.Connection, cm <-chan Msg, cc <-chan command) {
    221 	var buf strings.Builder
    222 	f := createTopicFilter(config)
    223 
    224 	for {
    225 		select {
    226 		case m := <-cm:
    227 			if !isFiltered(&f, m.Topic) {
    228 				str := config.ShowStart +
    229 					string(m.Topic) +
    230 					config.ShowMid +
    231 					string(m.Message) +
    232 					config.ShowEnd
    233 				ircSend(config, i, str, &buf)
    234 			}
    235 		case cmd := <-cc:
    236 			switch cmd.name {
    237 			case "filters":
    238 				ircSendFilters(config, i, &f, &buf)
    239 			case "ignore":
    240 				filterAddIgnored(&f, cmd.arg)
    241 			case "important":
    242 				filterAddImportant(&f, cmd.arg)
    243 			case "quit":
    244 				log.Println("Quit command", cmd.arg)
    245 				i.QuitMessage = cmd.arg
    246 				i.Quit()
    247 			case "unignore":
    248 				filterDelIgnored(&f, cmd.arg)
    249 			case "unimportant":
    250 				filterDelImportant(&f, cmd.arg)
    251 			default:
    252 				ircSend(config, i, "Unknown command: "+cmd.name, &buf)
    253 			}
    254 		}
    255 	}
    256 }
    257 
    258 func ircSend(config *IrcConfig, i *irc.Connection, s string, buf *strings.Builder) {
    259 	if config.MaxLine <= 0 || len(s) < config.MaxLine {
    260 		i.Privmsg(config.Channel, s)
    261 	} else {
    262 		for offset := 0; offset < len(s); {
    263 			l := len(s) - offset
    264 			buf.Reset()
    265 			if offset > 0 {
    266 				buf.WriteString(config.ContPrefix)
    267 			}
    268 
    269 			if buf.Len()+l <= config.MaxLine {
    270 				buf.WriteString(s[offset:])
    271 			} else {
    272 				l = config.MaxLine - buf.Len() - len(config.ContSuffix)
    273 				buf.WriteString(s[offset : offset+l])
    274 				buf.WriteString(config.ContSuffix)
    275 			}
    276 
    277 			i.Privmsg(config.Channel, buf.String())
    278 			offset += l
    279 		}
    280 	}
    281 }
    282 
    283 func ircSendTopicList(config *IrcConfig, i *irc.Connection, name string, topics [][]string, buf *strings.Builder) {
    284 	if len(topics) >= 2 {
    285 		ircSend(config, i, name+" = [", buf)
    286 		for index, topic := range topics {
    287 			suffix := ","
    288 			if index == len(topics)-1 {
    289 				suffix = " ]"
    290 			}
    291 			ircSend(config, i, fmt.Sprintf("  %q%s", strings.Join(topic, "/"), suffix), buf)
    292 		}
    293 	} else if len(topics) == 1 {
    294 		ircSend(config, i, fmt.Sprintf("%s = [%q]", name, strings.Join(topics[0], "/")), buf)
    295 	} else {
    296 		ircSend(config, i, name+" = []", buf)
    297 	}
    298 }
    299 
    300 func ircSendFilters(config *IrcConfig, i *irc.Connection, f *mqttTopicFilter, buf *strings.Builder) {
    301 	ircSendTopicList(config, i, "important", f.important, buf)
    302 	ircSendTopicList(config, i, "ignored", f.ignored, buf)
    303 }
    304 
    305 func subscribeAll(m *mqtt.Client, ircQueue chan<- Msg) {
    306 	for {
    307 		err := m.Subscribe(nil, "#")
    308 
    309 		if err != nil {
    310 			ircQueue <- errMsg("Subscribe", err)
    311 			time.Sleep(1 * time.Minute)
    312 		} else {
    313 			return
    314 		}
    315 	}
    316 }
    317 
    318 /**************** MQTT Logger Into SQL ****************/
    319 
    320 type mqttLogger struct {
    321 	db             *sql.DB
    322 	getTopic       *sql.Stmt
    323 	insertReceived *sql.Stmt
    324 	insertSent     *sql.Stmt
    325 	insertTopic    *sql.Stmt
    326 }
    327 
    328 func logClose(l *mqttLogger) {
    329 	if l.insertTopic != nil {
    330 		l.insertTopic.Close()
    331 		l.insertTopic = nil
    332 	}
    333 	if l.insertSent != nil {
    334 		l.insertSent.Close()
    335 		l.insertSent = nil
    336 	}
    337 	if l.insertReceived != nil {
    338 		l.insertReceived.Close()
    339 		l.insertReceived = nil
    340 	}
    341 	if l.getTopic != nil {
    342 		l.getTopic.Close()
    343 		l.getTopic = nil
    344 	}
    345 	if l.db != nil {
    346 		l.db.Close()
    347 		l.db = nil
    348 	}
    349 }
    350 
    351 func logInit(db *sql.DB) (*mqttLogger, error) {
    352 	var err error
    353 	result := mqttLogger{db: db}
    354 
    355 	for _, cmd := range []string{
    356 		"CREATE TABLE IF NOT EXISTS topics" +
    357 			"(id INTEGER PRIMARY KEY AUTOINCREMENT," +
    358 			" name TEXT NOT NULL);",
    359 		"CREATE UNIQUE INDEX IF NOT EXISTS i_topics ON topics(name);",
    360 		"CREATE TABLE IF NOT EXISTS received" +
    361 			"(timestamp REAL NOT NULL," +
    362 			" topic_id INTEGER NOT NULL," +
    363 			" message TEXT NOT NULL," +
    364 			" FOREIGN KEY (topic_id) REFERENCES topics (id));",
    365 		"CREATE TABLE IF NOT EXISTS sent" +
    366 			"(timestamp REAL NOT NULL," +
    367 			" topic_id INTEGER NOT NULL," +
    368 			" message TEXT NOT NULL," +
    369 			" FOREIGN KEY (topic_id) REFERENCES topics (id));",
    370 		"CREATE INDEX IF NOT EXISTS i_rtime ON received(timestamp);",
    371 		"CREATE INDEX IF NOT EXISTS i_rtopicid ON received(topic_id);",
    372 		"CREATE INDEX IF NOT EXISTS i_stime ON sent(timestamp);",
    373 		"CREATE INDEX IF NOT EXISTS i_stopicid ON sent(topic_id);",
    374 	} {
    375 		if _, err = result.db.Exec(cmd); err != nil {
    376 			logClose(&result)
    377 			return nil, err
    378 		}
    379 	}
    380 
    381 	result.getTopic, err = db.Prepare("SELECT id FROM topics WHERE name=?;")
    382 	if err != nil {
    383 		logClose(&result)
    384 		return nil, err
    385 	}
    386 
    387 	result.insertTopic, err = db.Prepare("INSERT OR IGNORE INTO topics(name) VALUES (?);")
    388 	if err != nil {
    389 		logClose(&result)
    390 		return nil, err
    391 	}
    392 
    393 	result.insertSent, err = db.Prepare("INSERT INTO sent (timestamp, topic_id, message) VALUES (?, ?, ?);")
    394 	if err != nil {
    395 		logClose(&result)
    396 		return nil, err
    397 	}
    398 
    399 	result.insertReceived, err = db.Prepare("INSERT INTO received (timestamp, topic_id, message) VALUES (?, ?, ?);")
    400 	if err != nil {
    401 		logClose(&result)
    402 		return nil, err
    403 	}
    404 
    405 	return &result, nil
    406 }
    407 
    408 func logMessage(l *mqttLogger, stmt *sql.Stmt, message, topic []byte) {
    409 	t := float64(time.Now().UnixNano())/8.64e13 + 2440587.5
    410 
    411 	id, err := logTopic(l, topic)
    412 	if err != nil {
    413 		log.Println("logTopic:", err)
    414 		logClose(l)
    415 		return
    416 	}
    417 
    418 	_, err = stmt.Exec(t, id, message)
    419 	if err != nil {
    420 		log.Println("logMessage:", err)
    421 		logClose(l)
    422 		return
    423 	}
    424 }
    425 
    426 func logReceived(l *mqttLogger, message, topic []byte) {
    427 	if l == nil || l.db == nil {
    428 		return
    429 	}
    430 
    431 	logMessage(l, l.insertReceived, message, topic)
    432 }
    433 
    434 func logSent(l *mqttLogger, message, topic []byte) {
    435 	if l == nil || l.db == nil {
    436 		return
    437 	}
    438 
    439 	logMessage(l, l.insertSent, message, topic)
    440 }
    441 
    442 func logTopic(l *mqttLogger, topic []byte) (int64, error) {
    443 	var id int64
    444 	err := l.getTopic.QueryRow(topic).Scan(&id)
    445 
    446 	if err == sql.ErrNoRows {
    447 		res, err := l.insertTopic.Exec(topic)
    448 
    449 		if err != nil {
    450 			return 0, err
    451 		}
    452 
    453 		return res.LastInsertId()
    454 	} else if err != nil {
    455 		return 0, err
    456 	}
    457 
    458 	return id, nil
    459 }
    460 
    461 /**************** MQTT Topic Filter ****************/
    462 
    463 type mqttTopicFilter struct {
    464 	ignored   [][]string
    465 	important [][]string
    466 }
    467 
    468 func addPattern(topics [][]string, newPattern string) [][]string {
    469 	return append(topics, strings.Split(newPattern, "/"))
    470 }
    471 
    472 func delPattern(topics [][]string, toRemove string) [][]string {
    473 	var result [][]string
    474 
    475 	for _, pat := range topics {
    476 		if strings.Join(pat, "/") != toRemove {
    477 			result = append(result, pat)
    478 		}
    479 	}
    480 
    481 	return result
    482 }
    483 
    484 func createTopicFilter(config *IrcConfig) mqttTopicFilter {
    485 	result := mqttTopicFilter{
    486 		ignored:   make([][]string, len(config.Ignored)),
    487 		important: make([][]string, len(config.Important)),
    488 	}
    489 
    490 	for i, s := range config.Ignored {
    491 		result.ignored[i] = strings.Split(s, "/")
    492 	}
    493 
    494 	for i, s := range config.Important {
    495 		result.important[i] = strings.Split(s, "/")
    496 	}
    497 
    498 	return result
    499 }
    500 
    501 func filterAddIgnored(filter *mqttTopicFilter, pattern string) {
    502 	filter.ignored = addPattern(filter.ignored, pattern)
    503 }
    504 
    505 func filterAddImportant(filter *mqttTopicFilter, pattern string) {
    506 	filter.important = addPattern(filter.important, pattern)
    507 }
    508 
    509 func filterDelIgnored(filter *mqttTopicFilter, pattern string) {
    510 	filter.ignored = delPattern(filter.ignored, pattern)
    511 }
    512 
    513 func filterDelImportant(filter *mqttTopicFilter, pattern string) {
    514 	filter.important = delPattern(filter.important, pattern)
    515 }
    516 
    517 func isFiltered(filter *mqttTopicFilter, topic []byte) bool {
    518 	t := strings.Split(string(topic), "/")
    519 
    520 	for _, pattern := range filter.important {
    521 		if topicMatch(t, pattern) {
    522 			return false
    523 		}
    524 	}
    525 
    526 	for _, pattern := range filter.ignored {
    527 		if topicMatch(t, pattern) {
    528 			return true
    529 		}
    530 	}
    531 
    532 	return false
    533 }
    534 
    535 func topicMatch(actual, filter []string) bool {
    536 	if len(filter) == 0 {
    537 		return len(actual) == 0
    538 	}
    539 
    540 	if filter[0] == "#" {
    541 		if len(filter) == 1 {
    542 			return true
    543 		}
    544 
    545 		for i := range actual {
    546 			if topicMatch(actual[i:], filter[1:]) {
    547 				return true
    548 			}
    549 		}
    550 
    551 		return false
    552 	}
    553 
    554 	if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) {
    555 		return topicMatch(actual[1:], filter[1:])
    556 	}
    557 
    558 	return false
    559 }