main.go (14651B)
1 /* 2 * Copyright (c) 2025, Natacha Porté 3 * 4 * Permission to use, copy, modify, and distribute this software for any 5 * purpose with or without fee is hereby granted, provided that the above 6 * copyright notice and this permission notice appear in all copies. 7 * 8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 15 */ 16 17 package main 18 19 import ( 20 "database/sql" 21 "errors" 22 "fmt" 23 "log" 24 "os" 25 "strings" 26 "time" 27 28 _ "github.com/glebarez/go-sqlite" 29 "github.com/go-mqtt/mqtt" 30 "github.com/pelletier/go-toml/v2" 31 "github.com/thoj/go-ircevent" 32 ) 33 34 type IrcConfig struct { 35 Channel string 36 Server string 37 Nick string 38 CmdStart string 39 CmdMid string 40 CmdEnd string 41 SendStart string 42 SendMid string 43 SendEnd string 44 ShowStart string 45 ShowMid string 46 ShowEnd string 47 MaxLine int 48 ContSuffix string 49 ContPrefix string 50 Ignored []string 51 Important []string 52 Verbose bool 53 } 54 55 type LogConfig struct { 56 SqlDriver string 57 SqlConnection string 58 } 59 60 type MqttConfig struct { 61 Server string 62 Session string 63 UserName string 64 Password string 65 TLS bool 66 Keepalive int 67 Topics []string 68 } 69 70 type Config struct { 71 Irc IrcConfig 72 Log LogConfig 73 Mqtt MqttConfig 74 } 75 76 type Msg struct { 77 Topic []byte 78 Message []byte 79 } 80 81 func errMsg(context string, err error) Msg { 82 return Msg{ 83 Topic: []byte("$mqttim/" + context), 84 Message: []byte(err.Error()), 85 } 86 } 87 88 type command struct { 89 name string 90 arg string 91 } 92 93 func readConfig(path string) Config { 94 config := Config{ 95 Irc: IrcConfig{ 96 Nick: "mqttim", 97 CmdStart: "!", 98 CmdMid: " ", 99 SendMid: ": ", 100 ShowMid: ": ", 101 }, 102 Mqtt: MqttConfig{ 103 Topics: []string{"#"}, 104 }, 105 } 106 107 f, err := os.Open(path) 108 if err != nil { 109 log.Fatal(err) 110 } 111 defer f.Close() 112 113 d := toml.NewDecoder(f) 114 err = d.Decode(&config) 115 if err != nil { 116 log.Fatal(err) 117 } 118 119 if config.Irc.MaxLine > 0 && len(config.Irc.ContPrefix)+len(config.Irc.ContSuffix) >= config.Irc.MaxLine { 120 config.Irc.ContPrefix = "" 121 config.Irc.ContSuffix = "" 122 } 123 124 return config 125 } 126 127 func dialer(config Config) mqtt.Dialer { 128 if config.Mqtt.TLS { 129 return mqtt.NewTLSDialer("tcp", config.Mqtt.Server, nil) 130 } else { 131 return mqtt.NewDialer("tcp", config.Mqtt.Server) 132 } 133 } 134 135 func main() { 136 var err error 137 var m *mqtt.Client 138 var l *mqttLogger 139 140 cmdQueue := make(chan command, 10) 141 ircQueue := make(chan Msg, 10) 142 143 config_file := "mqttim.toml" 144 if len(os.Args) > 1 { 145 config_file = os.Args[1] 146 } 147 config := readConfig(config_file) 148 149 if len(config.Log.SqlDriver) > 0 { 150 db, err := sql.Open(config.Log.SqlDriver, config.Log.SqlConnection) 151 if err != nil { 152 log.Fatal("sql.Open:", err) 153 } 154 155 l, err = logInit(db) 156 if err != nil { 157 log.Fatal("logInit:", err) 158 } 159 log.Println("Logging into", config.Log.SqlConnection) 160 } 161 162 m, err = mqtt.VolatileSession(config.Mqtt.Session, &mqtt.Config{ 163 Dialer: dialer(config), 164 PauseTimeout: 4 * time.Second, 165 UserName: config.Mqtt.UserName, 166 Password: []byte(config.Mqtt.Password), 167 }) 168 if err != nil { 169 log.Fatal("mqtt.VolatileSession:", err) 170 } 171 172 i := irc.IRC(config.Irc.Nick, "mqttim") 173 if config.Irc.Verbose { 174 i.VerboseCallbackHandler = true 175 i.Debug = true 176 } 177 i.AddCallback("001", func(e *irc.Event) { i.Join(config.Irc.Channel) }) 178 i.AddCallback("366", func(e *irc.Event) {}) 179 i.AddCallback("PRIVMSG", func(e *irc.Event) { 180 msg := e.Message() 181 if strings.HasPrefix(msg, config.Irc.CmdStart) && strings.HasSuffix(msg, config.Irc.CmdEnd) { 182 msg = msg[len(config.Irc.CmdStart) : len(msg)-len(config.Irc.CmdEnd)] 183 name, arg, found := strings.Cut(msg, config.Irc.CmdMid) 184 if !found { 185 name = msg 186 arg = "" 187 } 188 if name == "send" { 189 msg = arg 190 } else { 191 cmdQueue <- command{name: name, arg: arg} 192 return 193 } 194 } 195 if !strings.HasPrefix(msg, config.Irc.SendStart) || !strings.HasSuffix(msg, config.Irc.SendEnd) { 196 return 197 } 198 msg = msg[len(config.Irc.SendStart) : len(msg)-len(config.Irc.SendEnd)] 199 topic, payload, found := strings.Cut(msg, config.Irc.SendMid) 200 if found { 201 logSent(l, []byte(payload), []byte(topic)) 202 if err := m.Publish(nil, []byte(payload), topic); err != nil { 203 ircQueue <- errMsg("Publish", err) 204 } 205 } 206 }) 207 err = i.Connect(config.Irc.Server) 208 if err != nil { 209 log.Fatal("irc.Connect:", err) 210 } 211 go subscribeAll(m, ircQueue, config.Mqtt.Topics) 212 go mqttReader(m, l, ircQueue, &config) 213 go ircSender(&config.Irc, i, ircQueue, cmdQueue) 214 go mqttKeepalive(m, ircQueue, config.Mqtt.Keepalive) 215 i.Loop() 216 } 217 218 func dup(src []byte) []byte { 219 res := make([]byte, len(src)) 220 copy(res, src) 221 return res 222 } 223 224 func mqttKeepalive(m *mqtt.Client, c chan<- Msg, keepalive int) { 225 if keepalive <= 0 { 226 return 227 } 228 229 period := time.Duration(keepalive) * time.Second 230 231 for { 232 time.Sleep(period) 233 234 if err := m.Ping(nil); err != nil { 235 c <- errMsg("mqttPing", err) 236 } 237 } 238 } 239 240 func mqttReader(m *mqtt.Client, l *mqttLogger, c chan<- Msg, config *Config) { 241 var big *mqtt.BigMessage 242 243 for { 244 message, topic, err := m.ReadSlices() 245 switch { 246 case err == nil: 247 msg := Msg{Topic: dup(topic), Message: dup(message)} 248 logReceived(l, msg.Message, msg.Topic) 249 c <- msg 250 251 case errors.As(err, &big): 252 msg := Msg{Topic: dup(topic), Message: []byte("<Big Message>")} 253 logReceived(l, msg.Message, msg.Topic) 254 c <- msg 255 256 case errors.Is(err, mqtt.ErrClosed): 257 log.Println("mqttReader finishing:", err) 258 return 259 260 case mqtt.IsConnectionRefused(err): 261 c <- errMsg("mqttReader2", err) 262 time.Sleep(5 * time.Minute) 263 264 default: 265 c <- errMsg("mqttReader", err) 266 time.Sleep(2 * time.Second) 267 } 268 } 269 } 270 271 func ircSender(config *IrcConfig, i *irc.Connection, cm <-chan Msg, cc <-chan command) { 272 var buf strings.Builder 273 f := createTopicFilter(config) 274 275 for { 276 select { 277 case m := <-cm: 278 if !isFiltered(&f, m.Topic) { 279 str := config.ShowStart + 280 string(m.Topic) + 281 config.ShowMid + 282 string(m.Message) + 283 config.ShowEnd 284 ircSend(config, i, str, &buf) 285 } 286 case cmd := <-cc: 287 switch cmd.name { 288 case "filters": 289 ircSendFilters(config, i, &f, &buf) 290 case "help": 291 ircSend(config, i, "Command line:", &buf) 292 ircSendHelp(config, i, &buf, "filters", "") 293 ircSendHelp(config, i, &buf, "help", "") 294 ircSendHelp(config, i, &buf, "ignore", "<topic>") 295 ircSendHelp(config, i, &buf, "important", "<topic>") 296 ircSendHelp(config, i, &buf, "quit", "[message]") 297 ircSendHelp(config, i, &buf, "unignore", "<topic>") 298 ircSendHelp(config, i, &buf, "unimportant", "<topic>") 299 case "ignore": 300 filterAddIgnored(&f, cmd.arg) 301 case "important": 302 filterAddImportant(&f, cmd.arg) 303 case "quit": 304 log.Println("Quit command", cmd.arg) 305 i.QuitMessage = cmd.arg 306 i.Quit() 307 case "unignore": 308 filterDelIgnored(&f, cmd.arg) 309 case "unimportant": 310 filterDelImportant(&f, cmd.arg) 311 default: 312 ircSend(config, i, "Unknown command: "+cmd.name, &buf) 313 } 314 } 315 } 316 } 317 318 func ircSend(config *IrcConfig, i *irc.Connection, s string, buf *strings.Builder) { 319 if config.MaxLine <= 0 || len(s) < config.MaxLine { 320 i.Privmsg(config.Channel, s) 321 } else { 322 for offset := 0; offset < len(s); { 323 l := len(s) - offset 324 buf.Reset() 325 if offset > 0 { 326 buf.WriteString(config.ContPrefix) 327 } 328 329 if buf.Len()+l <= config.MaxLine { 330 buf.WriteString(s[offset:]) 331 } else { 332 l = config.MaxLine - buf.Len() - len(config.ContSuffix) 333 buf.WriteString(s[offset : offset+l]) 334 buf.WriteString(config.ContSuffix) 335 } 336 337 i.Privmsg(config.Channel, buf.String()) 338 offset += l 339 } 340 } 341 } 342 343 func ircSendTopicList(config *IrcConfig, i *irc.Connection, name string, topics [][]string, buf *strings.Builder) { 344 if len(topics) >= 2 { 345 ircSend(config, i, name+" = [", buf) 346 for index, topic := range topics { 347 suffix := "," 348 if index == len(topics)-1 { 349 suffix = " ]" 350 } 351 ircSend(config, i, fmt.Sprintf(" %q%s", strings.Join(topic, "/"), suffix), buf) 352 } 353 } else if len(topics) == 1 { 354 ircSend(config, i, fmt.Sprintf("%s = [%q]", name, strings.Join(topics[0], "/")), buf) 355 } else { 356 ircSend(config, i, name+" = []", buf) 357 } 358 } 359 360 func ircSendFilters(config *IrcConfig, i *irc.Connection, f *mqttTopicFilter, buf *strings.Builder) { 361 ircSendTopicList(config, i, "important", f.important, buf) 362 ircSendTopicList(config, i, "ignored", f.ignored, buf) 363 } 364 365 func ircSendHelp(config *IrcConfig, i *irc.Connection, buf *strings.Builder, cmd, arg string) { 366 if arg == "" { 367 ircSend(config, i, fmt.Sprintf("- %q", config.CmdStart+cmd+config.CmdEnd), buf) 368 } else { 369 ircSend(config, i, fmt.Sprintf("- %q", config.CmdStart+cmd+config.CmdMid+arg+config.CmdEnd), buf) 370 } 371 } 372 373 func subscribeAll(m *mqtt.Client, ircQueue chan<- Msg, topics []string) { 374 for _, topic := range topics { 375 for { 376 err := m.Subscribe(nil, topic) 377 378 if err != nil { 379 ircQueue <- errMsg("Subscribe", err) 380 time.Sleep(1 * time.Minute) 381 } else { 382 break 383 } 384 } 385 } 386 } 387 388 /**************** MQTT Logger Into SQL ****************/ 389 390 type mqttLogger struct { 391 db *sql.DB 392 getTopic *sql.Stmt 393 insertReceived *sql.Stmt 394 insertSent *sql.Stmt 395 insertTopic *sql.Stmt 396 } 397 398 func logClose(l *mqttLogger) { 399 if l.insertTopic != nil { 400 l.insertTopic.Close() 401 l.insertTopic = nil 402 } 403 if l.insertSent != nil { 404 l.insertSent.Close() 405 l.insertSent = nil 406 } 407 if l.insertReceived != nil { 408 l.insertReceived.Close() 409 l.insertReceived = nil 410 } 411 if l.getTopic != nil { 412 l.getTopic.Close() 413 l.getTopic = nil 414 } 415 if l.db != nil { 416 l.db.Close() 417 l.db = nil 418 } 419 } 420 421 func logInit(db *sql.DB) (*mqttLogger, error) { 422 var err error 423 result := mqttLogger{db: db} 424 425 for _, cmd := range []string{ 426 "CREATE TABLE IF NOT EXISTS topics" + 427 "(id INTEGER PRIMARY KEY AUTOINCREMENT," + 428 " name TEXT NOT NULL);", 429 "CREATE UNIQUE INDEX IF NOT EXISTS i_topics ON topics(name);", 430 "CREATE TABLE IF NOT EXISTS received" + 431 "(timestamp REAL NOT NULL," + 432 " topic_id INTEGER NOT NULL," + 433 " message TEXT NOT NULL," + 434 " FOREIGN KEY (topic_id) REFERENCES topics (id));", 435 "CREATE TABLE IF NOT EXISTS sent" + 436 "(timestamp REAL NOT NULL," + 437 " topic_id INTEGER NOT NULL," + 438 " message TEXT NOT NULL," + 439 " FOREIGN KEY (topic_id) REFERENCES topics (id));", 440 "CREATE INDEX IF NOT EXISTS i_rtime ON received(timestamp);", 441 "CREATE INDEX IF NOT EXISTS i_rtopicid ON received(topic_id);", 442 "CREATE INDEX IF NOT EXISTS i_stime ON sent(timestamp);", 443 "CREATE INDEX IF NOT EXISTS i_stopicid ON sent(topic_id);", 444 } { 445 if _, err = result.db.Exec(cmd); err != nil { 446 logClose(&result) 447 return nil, err 448 } 449 } 450 451 result.getTopic, err = db.Prepare("SELECT id FROM topics WHERE name=?;") 452 if err != nil { 453 logClose(&result) 454 return nil, err 455 } 456 457 result.insertTopic, err = db.Prepare("INSERT OR IGNORE INTO topics(name) VALUES (?);") 458 if err != nil { 459 logClose(&result) 460 return nil, err 461 } 462 463 result.insertSent, err = db.Prepare("INSERT INTO sent (timestamp, topic_id, message) VALUES (?, ?, ?);") 464 if err != nil { 465 logClose(&result) 466 return nil, err 467 } 468 469 result.insertReceived, err = db.Prepare("INSERT INTO received (timestamp, topic_id, message) VALUES (?, ?, ?);") 470 if err != nil { 471 logClose(&result) 472 return nil, err 473 } 474 475 return &result, nil 476 } 477 478 func logMessage(l *mqttLogger, stmt *sql.Stmt, message, topic []byte) { 479 t := float64(time.Now().UnixNano())/8.64e13 + 2440587.5 480 481 id, err := logTopic(l, topic) 482 if err != nil { 483 log.Println("logTopic:", err) 484 logClose(l) 485 return 486 } 487 488 _, err = stmt.Exec(t, id, message) 489 if err != nil { 490 log.Println("logMessage:", err) 491 logClose(l) 492 return 493 } 494 } 495 496 func logReceived(l *mqttLogger, message, topic []byte) { 497 if l == nil || l.db == nil { 498 return 499 } 500 501 logMessage(l, l.insertReceived, message, topic) 502 } 503 504 func logSent(l *mqttLogger, message, topic []byte) { 505 if l == nil || l.db == nil { 506 return 507 } 508 509 logMessage(l, l.insertSent, message, topic) 510 } 511 512 func logTopic(l *mqttLogger, topic []byte) (int64, error) { 513 var id int64 514 err := l.getTopic.QueryRow(topic).Scan(&id) 515 516 if err == sql.ErrNoRows { 517 res, err := l.insertTopic.Exec(topic) 518 519 if err != nil { 520 return 0, err 521 } 522 523 return res.LastInsertId() 524 } else if err != nil { 525 return 0, err 526 } 527 528 return id, nil 529 } 530 531 /**************** MQTT Topic Filter ****************/ 532 533 type mqttTopicFilter struct { 534 ignored [][]string 535 important [][]string 536 } 537 538 func addPattern(topics [][]string, newPattern string) [][]string { 539 return append(topics, strings.Split(newPattern, "/")) 540 } 541 542 func delPattern(topics [][]string, toRemove string) [][]string { 543 var result [][]string 544 545 for _, pat := range topics { 546 if strings.Join(pat, "/") != toRemove { 547 result = append(result, pat) 548 } 549 } 550 551 return result 552 } 553 554 func createTopicFilter(config *IrcConfig) mqttTopicFilter { 555 result := mqttTopicFilter{ 556 ignored: make([][]string, len(config.Ignored)), 557 important: make([][]string, len(config.Important)), 558 } 559 560 for i, s := range config.Ignored { 561 result.ignored[i] = strings.Split(s, "/") 562 } 563 564 for i, s := range config.Important { 565 result.important[i] = strings.Split(s, "/") 566 } 567 568 return result 569 } 570 571 func filterAddIgnored(filter *mqttTopicFilter, pattern string) { 572 filter.ignored = addPattern(filter.ignored, pattern) 573 } 574 575 func filterAddImportant(filter *mqttTopicFilter, pattern string) { 576 filter.important = addPattern(filter.important, pattern) 577 } 578 579 func filterDelIgnored(filter *mqttTopicFilter, pattern string) { 580 filter.ignored = delPattern(filter.ignored, pattern) 581 } 582 583 func filterDelImportant(filter *mqttTopicFilter, pattern string) { 584 filter.important = delPattern(filter.important, pattern) 585 } 586 587 func isFiltered(filter *mqttTopicFilter, topic []byte) bool { 588 t := strings.Split(string(topic), "/") 589 590 for _, pattern := range filter.important { 591 if topicMatch(t, pattern) { 592 return false 593 } 594 } 595 596 for _, pattern := range filter.ignored { 597 if topicMatch(t, pattern) { 598 return true 599 } 600 } 601 602 return false 603 } 604 605 func topicMatch(actual, filter []string) bool { 606 if len(filter) == 0 { 607 return len(actual) == 0 608 } 609 610 if filter[0] == "#" { 611 if len(filter) == 1 { 612 return true 613 } 614 615 for i := range actual { 616 if topicMatch(actual[i:], filter[1:]) { 617 return true 618 } 619 } 620 621 return false 622 } 623 624 if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) { 625 return topicMatch(actual[1:], filter[1:]) 626 } 627 628 return false 629 }