main.go (13876B)
1 package main 2 3 import ( 4 "database/sql" 5 "errors" 6 "fmt" 7 "log" 8 "os" 9 "strings" 10 "time" 11 12 _ "github.com/glebarez/go-sqlite" 13 "github.com/go-mqtt/mqtt" 14 "github.com/pelletier/go-toml/v2" 15 "github.com/thoj/go-ircevent" 16 ) 17 18 type IrcConfig struct { 19 Channel string 20 Server string 21 Nick string 22 CmdStart string 23 CmdMid string 24 CmdEnd string 25 SendStart string 26 SendMid string 27 SendEnd string 28 ShowStart string 29 ShowMid string 30 ShowEnd string 31 MaxLine int 32 ContSuffix string 33 ContPrefix string 34 Ignored []string 35 Important []string 36 Verbose bool 37 } 38 39 type LogConfig struct { 40 SqlDriver string 41 SqlConnection string 42 } 43 44 type MqttConfig struct { 45 Server string 46 Session string 47 UserName string 48 Password string 49 TLS bool 50 Keepalive int 51 Topics []string 52 } 53 54 type Config struct { 55 Irc IrcConfig 56 Log LogConfig 57 Mqtt MqttConfig 58 } 59 60 type Msg struct { 61 Topic []byte 62 Message []byte 63 } 64 65 func errMsg(context string, err error) Msg { 66 return Msg{ 67 Topic: []byte("$mqttim/" + context), 68 Message: []byte(err.Error()), 69 } 70 } 71 72 type command struct { 73 name string 74 arg string 75 } 76 77 func readConfig(path string) Config { 78 config := Config{ 79 Irc: IrcConfig{ 80 Nick: "mqttim", 81 CmdStart: "!", 82 CmdMid: " ", 83 SendMid: ": ", 84 ShowMid: ": ", 85 }, 86 Mqtt: MqttConfig{ 87 Topics: []string{"#"}, 88 }, 89 } 90 91 f, err := os.Open(path) 92 if err != nil { 93 log.Fatal(err) 94 } 95 defer f.Close() 96 97 d := toml.NewDecoder(f) 98 err = d.Decode(&config) 99 if err != nil { 100 log.Fatal(err) 101 } 102 103 if config.Irc.MaxLine > 0 && len(config.Irc.ContPrefix)+len(config.Irc.ContSuffix) >= config.Irc.MaxLine { 104 config.Irc.ContPrefix = "" 105 config.Irc.ContSuffix = "" 106 } 107 108 return config 109 } 110 111 func dialer(config Config) mqtt.Dialer { 112 if config.Mqtt.TLS { 113 return mqtt.NewTLSDialer("tcp", config.Mqtt.Server, nil) 114 } else { 115 return mqtt.NewDialer("tcp", config.Mqtt.Server) 116 } 117 } 118 119 func main() { 120 var err error 121 var m *mqtt.Client 122 var l *mqttLogger 123 124 cmdQueue := make(chan command, 10) 125 ircQueue := make(chan Msg, 10) 126 127 config_file := "mqttim.toml" 128 if len(os.Args) > 1 { 129 config_file = os.Args[1] 130 } 131 config := readConfig(config_file) 132 133 if len(config.Log.SqlDriver) > 0 { 134 db, err := sql.Open(config.Log.SqlDriver, config.Log.SqlConnection) 135 if err != nil { 136 log.Fatal("sql.Open:", err) 137 } 138 139 l, err = logInit(db) 140 if err != nil { 141 log.Fatal("logInit:", err) 142 } 143 log.Println("Logging into", config.Log.SqlConnection) 144 } 145 146 m, err = mqtt.VolatileSession(config.Mqtt.Session, &mqtt.Config{ 147 Dialer: dialer(config), 148 PauseTimeout: 4 * time.Second, 149 UserName: config.Mqtt.UserName, 150 Password: []byte(config.Mqtt.Password), 151 }) 152 if err != nil { 153 log.Fatal("mqtt.VolatileSession:", err) 154 } 155 156 i := irc.IRC(config.Irc.Nick, "mqttim") 157 if config.Irc.Verbose { 158 i.VerboseCallbackHandler = true 159 i.Debug = true 160 } 161 i.AddCallback("001", func(e *irc.Event) { i.Join(config.Irc.Channel) }) 162 i.AddCallback("366", func(e *irc.Event) {}) 163 i.AddCallback("PRIVMSG", func(e *irc.Event) { 164 msg := e.Message() 165 if strings.HasPrefix(msg, config.Irc.CmdStart) && strings.HasSuffix(msg, config.Irc.CmdEnd) { 166 msg = msg[len(config.Irc.CmdStart) : len(msg)-len(config.Irc.CmdEnd)] 167 name, arg, found := strings.Cut(msg, config.Irc.CmdMid) 168 if !found { 169 name = msg 170 arg = "" 171 } 172 if name == "send" { 173 msg = arg 174 } else { 175 cmdQueue <- command{name: name, arg: arg} 176 return 177 } 178 } 179 if !strings.HasPrefix(msg, config.Irc.SendStart) || !strings.HasSuffix(msg, config.Irc.SendEnd) { 180 return 181 } 182 msg = msg[len(config.Irc.SendStart) : len(msg)-len(config.Irc.SendEnd)] 183 topic, payload, found := strings.Cut(msg, config.Irc.SendMid) 184 if found { 185 logSent(l, []byte(payload), []byte(topic)) 186 if err := m.Publish(nil, []byte(payload), topic); err != nil { 187 ircQueue <- errMsg("Publish", err) 188 } 189 } 190 }) 191 err = i.Connect(config.Irc.Server) 192 if err != nil { 193 log.Fatal("irc.Connect:", err) 194 } 195 go subscribeAll(m, ircQueue, config.Mqtt.Topics) 196 go mqttReader(m, l, ircQueue, &config) 197 go ircSender(&config.Irc, i, ircQueue, cmdQueue) 198 go mqttKeepalive(m, ircQueue, config.Mqtt.Keepalive) 199 i.Loop() 200 } 201 202 func dup(src []byte) []byte { 203 res := make([]byte, len(src)) 204 copy(res, src) 205 return res 206 } 207 208 func mqttKeepalive(m *mqtt.Client, c chan<- Msg, keepalive int) { 209 if keepalive <= 0 { 210 return 211 } 212 213 period := time.Duration(keepalive) * time.Second 214 215 for { 216 time.Sleep(period) 217 218 if err := m.Ping(nil); err != nil { 219 c <- errMsg("mqttPing", err) 220 } 221 } 222 } 223 224 func mqttReader(m *mqtt.Client, l *mqttLogger, c chan<- Msg, config *Config) { 225 var big *mqtt.BigMessage 226 227 for { 228 message, topic, err := m.ReadSlices() 229 switch { 230 case err == nil: 231 msg := Msg{Topic: dup(topic), Message: dup(message)} 232 logReceived(l, msg.Message, msg.Topic) 233 c <- msg 234 235 case errors.As(err, &big): 236 msg := Msg{Topic: dup(topic), Message: []byte("<Big Message>")} 237 logReceived(l, msg.Message, msg.Topic) 238 c <- msg 239 240 case errors.Is(err, mqtt.ErrClosed): 241 log.Println("mqttReader finishing:", err) 242 return 243 244 case mqtt.IsConnectionRefused(err): 245 c <- errMsg("mqttReader2", err) 246 time.Sleep(5 * time.Minute) 247 248 default: 249 c <- errMsg("mqttReader", err) 250 time.Sleep(2 * time.Second) 251 } 252 } 253 } 254 255 func ircSender(config *IrcConfig, i *irc.Connection, cm <-chan Msg, cc <-chan command) { 256 var buf strings.Builder 257 f := createTopicFilter(config) 258 259 for { 260 select { 261 case m := <-cm: 262 if !isFiltered(&f, m.Topic) { 263 str := config.ShowStart + 264 string(m.Topic) + 265 config.ShowMid + 266 string(m.Message) + 267 config.ShowEnd 268 ircSend(config, i, str, &buf) 269 } 270 case cmd := <-cc: 271 switch cmd.name { 272 case "filters": 273 ircSendFilters(config, i, &f, &buf) 274 case "help": 275 ircSend(config, i, "Command line:", &buf) 276 ircSendHelp(config, i, &buf, "filters", "") 277 ircSendHelp(config, i, &buf, "help", "") 278 ircSendHelp(config, i, &buf, "ignore", "<topic>") 279 ircSendHelp(config, i, &buf, "important", "<topic>") 280 ircSendHelp(config, i, &buf, "quit", "[message]") 281 ircSendHelp(config, i, &buf, "unignore", "<topic>") 282 ircSendHelp(config, i, &buf, "unimportant", "<topic>") 283 case "ignore": 284 filterAddIgnored(&f, cmd.arg) 285 case "important": 286 filterAddImportant(&f, cmd.arg) 287 case "quit": 288 log.Println("Quit command", cmd.arg) 289 i.QuitMessage = cmd.arg 290 i.Quit() 291 case "unignore": 292 filterDelIgnored(&f, cmd.arg) 293 case "unimportant": 294 filterDelImportant(&f, cmd.arg) 295 default: 296 ircSend(config, i, "Unknown command: "+cmd.name, &buf) 297 } 298 } 299 } 300 } 301 302 func ircSend(config *IrcConfig, i *irc.Connection, s string, buf *strings.Builder) { 303 if config.MaxLine <= 0 || len(s) < config.MaxLine { 304 i.Privmsg(config.Channel, s) 305 } else { 306 for offset := 0; offset < len(s); { 307 l := len(s) - offset 308 buf.Reset() 309 if offset > 0 { 310 buf.WriteString(config.ContPrefix) 311 } 312 313 if buf.Len()+l <= config.MaxLine { 314 buf.WriteString(s[offset:]) 315 } else { 316 l = config.MaxLine - buf.Len() - len(config.ContSuffix) 317 buf.WriteString(s[offset : offset+l]) 318 buf.WriteString(config.ContSuffix) 319 } 320 321 i.Privmsg(config.Channel, buf.String()) 322 offset += l 323 } 324 } 325 } 326 327 func ircSendTopicList(config *IrcConfig, i *irc.Connection, name string, topics [][]string, buf *strings.Builder) { 328 if len(topics) >= 2 { 329 ircSend(config, i, name+" = [", buf) 330 for index, topic := range topics { 331 suffix := "," 332 if index == len(topics)-1 { 333 suffix = " ]" 334 } 335 ircSend(config, i, fmt.Sprintf(" %q%s", strings.Join(topic, "/"), suffix), buf) 336 } 337 } else if len(topics) == 1 { 338 ircSend(config, i, fmt.Sprintf("%s = [%q]", name, strings.Join(topics[0], "/")), buf) 339 } else { 340 ircSend(config, i, name+" = []", buf) 341 } 342 } 343 344 func ircSendFilters(config *IrcConfig, i *irc.Connection, f *mqttTopicFilter, buf *strings.Builder) { 345 ircSendTopicList(config, i, "important", f.important, buf) 346 ircSendTopicList(config, i, "ignored", f.ignored, buf) 347 } 348 349 func ircSendHelp(config *IrcConfig, i *irc.Connection, buf *strings.Builder, cmd, arg string) { 350 if arg == "" { 351 ircSend(config, i, fmt.Sprintf("- %q", config.CmdStart+cmd+config.CmdEnd), buf) 352 } else { 353 ircSend(config, i, fmt.Sprintf("- %q", config.CmdStart+cmd+config.CmdMid+arg+config.CmdEnd), buf) 354 } 355 } 356 357 func subscribeAll(m *mqtt.Client, ircQueue chan<- Msg, topics []string) { 358 for _, topic := range topics { 359 for { 360 err := m.Subscribe(nil, topic) 361 362 if err != nil { 363 ircQueue <- errMsg("Subscribe", err) 364 time.Sleep(1 * time.Minute) 365 } else { 366 break 367 } 368 } 369 } 370 } 371 372 /**************** MQTT Logger Into SQL ****************/ 373 374 type mqttLogger struct { 375 db *sql.DB 376 getTopic *sql.Stmt 377 insertReceived *sql.Stmt 378 insertSent *sql.Stmt 379 insertTopic *sql.Stmt 380 } 381 382 func logClose(l *mqttLogger) { 383 if l.insertTopic != nil { 384 l.insertTopic.Close() 385 l.insertTopic = nil 386 } 387 if l.insertSent != nil { 388 l.insertSent.Close() 389 l.insertSent = nil 390 } 391 if l.insertReceived != nil { 392 l.insertReceived.Close() 393 l.insertReceived = nil 394 } 395 if l.getTopic != nil { 396 l.getTopic.Close() 397 l.getTopic = nil 398 } 399 if l.db != nil { 400 l.db.Close() 401 l.db = nil 402 } 403 } 404 405 func logInit(db *sql.DB) (*mqttLogger, error) { 406 var err error 407 result := mqttLogger{db: db} 408 409 for _, cmd := range []string{ 410 "CREATE TABLE IF NOT EXISTS topics" + 411 "(id INTEGER PRIMARY KEY AUTOINCREMENT," + 412 " name TEXT NOT NULL);", 413 "CREATE UNIQUE INDEX IF NOT EXISTS i_topics ON topics(name);", 414 "CREATE TABLE IF NOT EXISTS received" + 415 "(timestamp REAL NOT NULL," + 416 " topic_id INTEGER NOT NULL," + 417 " message TEXT NOT NULL," + 418 " FOREIGN KEY (topic_id) REFERENCES topics (id));", 419 "CREATE TABLE IF NOT EXISTS sent" + 420 "(timestamp REAL NOT NULL," + 421 " topic_id INTEGER NOT NULL," + 422 " message TEXT NOT NULL," + 423 " FOREIGN KEY (topic_id) REFERENCES topics (id));", 424 "CREATE INDEX IF NOT EXISTS i_rtime ON received(timestamp);", 425 "CREATE INDEX IF NOT EXISTS i_rtopicid ON received(topic_id);", 426 "CREATE INDEX IF NOT EXISTS i_stime ON sent(timestamp);", 427 "CREATE INDEX IF NOT EXISTS i_stopicid ON sent(topic_id);", 428 } { 429 if _, err = result.db.Exec(cmd); err != nil { 430 logClose(&result) 431 return nil, err 432 } 433 } 434 435 result.getTopic, err = db.Prepare("SELECT id FROM topics WHERE name=?;") 436 if err != nil { 437 logClose(&result) 438 return nil, err 439 } 440 441 result.insertTopic, err = db.Prepare("INSERT OR IGNORE INTO topics(name) VALUES (?);") 442 if err != nil { 443 logClose(&result) 444 return nil, err 445 } 446 447 result.insertSent, err = db.Prepare("INSERT INTO sent (timestamp, topic_id, message) VALUES (?, ?, ?);") 448 if err != nil { 449 logClose(&result) 450 return nil, err 451 } 452 453 result.insertReceived, err = db.Prepare("INSERT INTO received (timestamp, topic_id, message) VALUES (?, ?, ?);") 454 if err != nil { 455 logClose(&result) 456 return nil, err 457 } 458 459 return &result, nil 460 } 461 462 func logMessage(l *mqttLogger, stmt *sql.Stmt, message, topic []byte) { 463 t := float64(time.Now().UnixNano())/8.64e13 + 2440587.5 464 465 id, err := logTopic(l, topic) 466 if err != nil { 467 log.Println("logTopic:", err) 468 logClose(l) 469 return 470 } 471 472 _, err = stmt.Exec(t, id, message) 473 if err != nil { 474 log.Println("logMessage:", err) 475 logClose(l) 476 return 477 } 478 } 479 480 func logReceived(l *mqttLogger, message, topic []byte) { 481 if l == nil || l.db == nil { 482 return 483 } 484 485 logMessage(l, l.insertReceived, message, topic) 486 } 487 488 func logSent(l *mqttLogger, message, topic []byte) { 489 if l == nil || l.db == nil { 490 return 491 } 492 493 logMessage(l, l.insertSent, message, topic) 494 } 495 496 func logTopic(l *mqttLogger, topic []byte) (int64, error) { 497 var id int64 498 err := l.getTopic.QueryRow(topic).Scan(&id) 499 500 if err == sql.ErrNoRows { 501 res, err := l.insertTopic.Exec(topic) 502 503 if err != nil { 504 return 0, err 505 } 506 507 return res.LastInsertId() 508 } else if err != nil { 509 return 0, err 510 } 511 512 return id, nil 513 } 514 515 /**************** MQTT Topic Filter ****************/ 516 517 type mqttTopicFilter struct { 518 ignored [][]string 519 important [][]string 520 } 521 522 func addPattern(topics [][]string, newPattern string) [][]string { 523 return append(topics, strings.Split(newPattern, "/")) 524 } 525 526 func delPattern(topics [][]string, toRemove string) [][]string { 527 var result [][]string 528 529 for _, pat := range topics { 530 if strings.Join(pat, "/") != toRemove { 531 result = append(result, pat) 532 } 533 } 534 535 return result 536 } 537 538 func createTopicFilter(config *IrcConfig) mqttTopicFilter { 539 result := mqttTopicFilter{ 540 ignored: make([][]string, len(config.Ignored)), 541 important: make([][]string, len(config.Important)), 542 } 543 544 for i, s := range config.Ignored { 545 result.ignored[i] = strings.Split(s, "/") 546 } 547 548 for i, s := range config.Important { 549 result.important[i] = strings.Split(s, "/") 550 } 551 552 return result 553 } 554 555 func filterAddIgnored(filter *mqttTopicFilter, pattern string) { 556 filter.ignored = addPattern(filter.ignored, pattern) 557 } 558 559 func filterAddImportant(filter *mqttTopicFilter, pattern string) { 560 filter.important = addPattern(filter.important, pattern) 561 } 562 563 func filterDelIgnored(filter *mqttTopicFilter, pattern string) { 564 filter.ignored = delPattern(filter.ignored, pattern) 565 } 566 567 func filterDelImportant(filter *mqttTopicFilter, pattern string) { 568 filter.important = delPattern(filter.important, pattern) 569 } 570 571 func isFiltered(filter *mqttTopicFilter, topic []byte) bool { 572 t := strings.Split(string(topic), "/") 573 574 for _, pattern := range filter.important { 575 if topicMatch(t, pattern) { 576 return false 577 } 578 } 579 580 for _, pattern := range filter.ignored { 581 if topicMatch(t, pattern) { 582 return true 583 } 584 } 585 586 return false 587 } 588 589 func topicMatch(actual, filter []string) bool { 590 if len(filter) == 0 { 591 return len(actual) == 0 592 } 593 594 if filter[0] == "#" { 595 if len(filter) == 1 { 596 return true 597 } 598 599 for i := range actual { 600 if topicMatch(actual[i:], filter[1:]) { 601 return true 602 } 603 } 604 605 return false 606 } 607 608 if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) { 609 return topicMatch(actual[1:], filter[1:]) 610 } 611 612 return false 613 }