main.go (13804B)
1 package main 2 3 import ( 4 "database/sql" 5 "errors" 6 "fmt" 7 "log" 8 "os" 9 "strings" 10 "time" 11 12 _ "github.com/glebarez/go-sqlite" 13 "github.com/go-mqtt/mqtt" 14 "github.com/pelletier/go-toml/v2" 15 "github.com/thoj/go-ircevent" 16 ) 17 18 type IrcConfig struct { 19 Channel string 20 Server string 21 Nick string 22 CmdStart string 23 CmdMid string 24 CmdEnd string 25 SendStart string 26 SendMid string 27 SendEnd string 28 ShowStart string 29 ShowMid string 30 ShowEnd string 31 MaxLine int 32 ContSuffix string 33 ContPrefix string 34 Ignored []string 35 Important []string 36 Verbose bool 37 } 38 39 type LogConfig struct { 40 SqlDriver string 41 SqlConnection string 42 } 43 44 type MqttConfig struct { 45 Server string 46 Session string 47 UserName string 48 Password string 49 TLS bool 50 Keepalive int 51 Topics []string 52 } 53 54 type Config struct { 55 Irc IrcConfig 56 Log LogConfig 57 Mqtt MqttConfig 58 } 59 60 type Msg struct { 61 Topic []byte 62 Message []byte 63 } 64 65 func errMsg(context string, err error) Msg { 66 return Msg{ 67 Topic: []byte("$mqttim/" + context), 68 Message: []byte(err.Error()), 69 } 70 } 71 72 type command struct { 73 name string 74 arg string 75 } 76 77 func readConfig(path string) Config { 78 config := Config{ 79 Irc: IrcConfig{ 80 Nick: "mqttim", 81 CmdStart: "!", 82 CmdMid: " ", 83 SendMid: ": ", 84 ShowMid: ": ", 85 }, 86 Mqtt: MqttConfig{ 87 Topics: []string{"#"}, 88 }, 89 } 90 91 f, err := os.Open("mqttim.toml") 92 if err != nil { 93 log.Fatal(err) 94 } 95 defer f.Close() 96 97 d := toml.NewDecoder(f) 98 err = d.Decode(&config) 99 if err != nil { 100 log.Fatal(err) 101 } 102 103 if config.Irc.MaxLine > 0 && len(config.Irc.ContPrefix)+len(config.Irc.ContSuffix) >= config.Irc.MaxLine { 104 config.Irc.ContPrefix = "" 105 config.Irc.ContSuffix = "" 106 } 107 108 return config 109 } 110 111 func dialer(config Config) mqtt.Dialer { 112 if config.Mqtt.TLS { 113 return mqtt.NewTLSDialer("tcp", config.Mqtt.Server, nil) 114 } else { 115 return mqtt.NewDialer("tcp", config.Mqtt.Server) 116 } 117 } 118 119 func main() { 120 var err error 121 var m *mqtt.Client 122 var l *mqttLogger 123 124 cmdQueue := make(chan command, 10) 125 ircQueue := make(chan Msg, 10) 126 127 config := readConfig("mqttim.toml") 128 129 if len(config.Log.SqlDriver) > 0 { 130 db, err := sql.Open(config.Log.SqlDriver, config.Log.SqlConnection) 131 if err != nil { 132 log.Fatal("sql.Open:", err) 133 } 134 135 l, err = logInit(db) 136 if err != nil { 137 log.Fatal("logInit:", err) 138 } 139 log.Println("Logging into", config.Log.SqlConnection) 140 } 141 142 m, err = mqtt.VolatileSession(config.Mqtt.Session, &mqtt.Config{ 143 Dialer: dialer(config), 144 PauseTimeout: 4 * time.Second, 145 UserName: config.Mqtt.UserName, 146 Password: []byte(config.Mqtt.Password), 147 }) 148 if err != nil { 149 log.Fatal("mqtt.VolatileSession:", err) 150 } 151 152 i := irc.IRC(config.Irc.Nick, "mqttim") 153 if config.Irc.Verbose { 154 i.VerboseCallbackHandler = true 155 i.Debug = true 156 } 157 i.AddCallback("001", func(e *irc.Event) { i.Join(config.Irc.Channel) }) 158 i.AddCallback("366", func(e *irc.Event) {}) 159 i.AddCallback("PRIVMSG", func(e *irc.Event) { 160 msg := e.Message() 161 if strings.HasPrefix(msg, config.Irc.CmdStart) && strings.HasSuffix(msg, config.Irc.CmdEnd) { 162 msg = msg[len(config.Irc.CmdStart) : len(msg)-len(config.Irc.CmdEnd)] 163 name, arg, found := strings.Cut(msg, config.Irc.CmdMid) 164 if !found { 165 name = msg 166 arg = "" 167 } 168 if name == "send" { 169 msg = arg 170 } else { 171 cmdQueue <- command{name: name, arg: arg} 172 return 173 } 174 } 175 if !strings.HasPrefix(msg, config.Irc.SendStart) || !strings.HasSuffix(msg, config.Irc.SendEnd) { 176 return 177 } 178 msg = msg[len(config.Irc.SendStart) : len(msg)-len(config.Irc.SendEnd)] 179 topic, payload, found := strings.Cut(msg, config.Irc.SendMid) 180 if found { 181 logSent(l, []byte(payload), []byte(topic)) 182 if err := m.Publish(nil, []byte(payload), topic); err != nil { 183 ircQueue <- errMsg("Publish", err) 184 } 185 } 186 }) 187 err = i.Connect(config.Irc.Server) 188 if err != nil { 189 log.Fatal("irc.Connect:", err) 190 } 191 go subscribeAll(m, ircQueue, config.Mqtt.Topics) 192 go mqttReader(m, l, ircQueue, &config) 193 go ircSender(&config.Irc, i, ircQueue, cmdQueue) 194 go mqttKeepalive(m, ircQueue, config.Mqtt.Keepalive) 195 i.Loop() 196 } 197 198 func dup(src []byte) []byte { 199 res := make([]byte, len(src)) 200 copy(res, src) 201 return res 202 } 203 204 func mqttKeepalive(m *mqtt.Client, c chan<- Msg, keepalive int) { 205 if keepalive <= 0 { 206 return 207 } 208 209 period := time.Duration(keepalive) * time.Second 210 211 for { 212 time.Sleep(period) 213 214 if err := m.Ping(nil); err != nil { 215 c <- errMsg("mqttPing", err) 216 } 217 } 218 } 219 220 func mqttReader(m *mqtt.Client, l *mqttLogger, c chan<- Msg, config *Config) { 221 var big *mqtt.BigMessage 222 223 for { 224 message, topic, err := m.ReadSlices() 225 switch { 226 case err == nil: 227 msg := Msg{Topic: dup(topic), Message: dup(message)} 228 logReceived(l, msg.Message, msg.Topic) 229 c <- msg 230 231 case errors.As(err, &big): 232 msg := Msg{Topic: dup(topic), Message: []byte("<Big Message>")} 233 logReceived(l, msg.Message, msg.Topic) 234 c <- msg 235 236 case errors.Is(err, mqtt.ErrClosed): 237 log.Println("mqttReader finishing:", err) 238 return 239 240 case mqtt.IsConnectionRefused(err): 241 c <- errMsg("mqttReader2", err) 242 time.Sleep(5 * time.Minute) 243 244 default: 245 c <- errMsg("mqttReader", err) 246 time.Sleep(2 * time.Second) 247 } 248 } 249 } 250 251 func ircSender(config *IrcConfig, i *irc.Connection, cm <-chan Msg, cc <-chan command) { 252 var buf strings.Builder 253 f := createTopicFilter(config) 254 255 for { 256 select { 257 case m := <-cm: 258 if !isFiltered(&f, m.Topic) { 259 str := config.ShowStart + 260 string(m.Topic) + 261 config.ShowMid + 262 string(m.Message) + 263 config.ShowEnd 264 ircSend(config, i, str, &buf) 265 } 266 case cmd := <-cc: 267 switch cmd.name { 268 case "filters": 269 ircSendFilters(config, i, &f, &buf) 270 case "help": 271 ircSend(config, i, "Command line:", &buf) 272 ircSendHelp(config, i, &buf, "filters", "") 273 ircSendHelp(config, i, &buf, "help", "") 274 ircSendHelp(config, i, &buf, "ignore", "<topic>") 275 ircSendHelp(config, i, &buf, "important", "<topic>") 276 ircSendHelp(config, i, &buf, "quit", "[message]") 277 ircSendHelp(config, i, &buf, "unignore", "<topic>") 278 ircSendHelp(config, i, &buf, "unimportant", "<topic>") 279 case "ignore": 280 filterAddIgnored(&f, cmd.arg) 281 case "important": 282 filterAddImportant(&f, cmd.arg) 283 case "quit": 284 log.Println("Quit command", cmd.arg) 285 i.QuitMessage = cmd.arg 286 i.Quit() 287 case "unignore": 288 filterDelIgnored(&f, cmd.arg) 289 case "unimportant": 290 filterDelImportant(&f, cmd.arg) 291 default: 292 ircSend(config, i, "Unknown command: "+cmd.name, &buf) 293 } 294 } 295 } 296 } 297 298 func ircSend(config *IrcConfig, i *irc.Connection, s string, buf *strings.Builder) { 299 if config.MaxLine <= 0 || len(s) < config.MaxLine { 300 i.Privmsg(config.Channel, s) 301 } else { 302 for offset := 0; offset < len(s); { 303 l := len(s) - offset 304 buf.Reset() 305 if offset > 0 { 306 buf.WriteString(config.ContPrefix) 307 } 308 309 if buf.Len()+l <= config.MaxLine { 310 buf.WriteString(s[offset:]) 311 } else { 312 l = config.MaxLine - buf.Len() - len(config.ContSuffix) 313 buf.WriteString(s[offset : offset+l]) 314 buf.WriteString(config.ContSuffix) 315 } 316 317 i.Privmsg(config.Channel, buf.String()) 318 offset += l 319 } 320 } 321 } 322 323 func ircSendTopicList(config *IrcConfig, i *irc.Connection, name string, topics [][]string, buf *strings.Builder) { 324 if len(topics) >= 2 { 325 ircSend(config, i, name+" = [", buf) 326 for index, topic := range topics { 327 suffix := "," 328 if index == len(topics)-1 { 329 suffix = " ]" 330 } 331 ircSend(config, i, fmt.Sprintf(" %q%s", strings.Join(topic, "/"), suffix), buf) 332 } 333 } else if len(topics) == 1 { 334 ircSend(config, i, fmt.Sprintf("%s = [%q]", name, strings.Join(topics[0], "/")), buf) 335 } else { 336 ircSend(config, i, name+" = []", buf) 337 } 338 } 339 340 func ircSendFilters(config *IrcConfig, i *irc.Connection, f *mqttTopicFilter, buf *strings.Builder) { 341 ircSendTopicList(config, i, "important", f.important, buf) 342 ircSendTopicList(config, i, "ignored", f.ignored, buf) 343 } 344 345 func ircSendHelp(config *IrcConfig, i *irc.Connection, buf *strings.Builder, cmd, arg string) { 346 if arg == "" { 347 ircSend(config, i, fmt.Sprintf("- %q", config.CmdStart+cmd+config.CmdEnd), buf) 348 } else { 349 ircSend(config, i, fmt.Sprintf("- %q", config.CmdStart+cmd+config.CmdMid+arg+config.CmdEnd), buf) 350 } 351 } 352 353 func subscribeAll(m *mqtt.Client, ircQueue chan<- Msg, topics []string) { 354 for _, topic := range topics { 355 for { 356 err := m.Subscribe(nil, topic) 357 358 if err != nil { 359 ircQueue <- errMsg("Subscribe", err) 360 time.Sleep(1 * time.Minute) 361 } else { 362 break 363 } 364 } 365 } 366 } 367 368 /**************** MQTT Logger Into SQL ****************/ 369 370 type mqttLogger struct { 371 db *sql.DB 372 getTopic *sql.Stmt 373 insertReceived *sql.Stmt 374 insertSent *sql.Stmt 375 insertTopic *sql.Stmt 376 } 377 378 func logClose(l *mqttLogger) { 379 if l.insertTopic != nil { 380 l.insertTopic.Close() 381 l.insertTopic = nil 382 } 383 if l.insertSent != nil { 384 l.insertSent.Close() 385 l.insertSent = nil 386 } 387 if l.insertReceived != nil { 388 l.insertReceived.Close() 389 l.insertReceived = nil 390 } 391 if l.getTopic != nil { 392 l.getTopic.Close() 393 l.getTopic = nil 394 } 395 if l.db != nil { 396 l.db.Close() 397 l.db = nil 398 } 399 } 400 401 func logInit(db *sql.DB) (*mqttLogger, error) { 402 var err error 403 result := mqttLogger{db: db} 404 405 for _, cmd := range []string{ 406 "CREATE TABLE IF NOT EXISTS topics" + 407 "(id INTEGER PRIMARY KEY AUTOINCREMENT," + 408 " name TEXT NOT NULL);", 409 "CREATE UNIQUE INDEX IF NOT EXISTS i_topics ON topics(name);", 410 "CREATE TABLE IF NOT EXISTS received" + 411 "(timestamp REAL NOT NULL," + 412 " topic_id INTEGER NOT NULL," + 413 " message TEXT NOT NULL," + 414 " FOREIGN KEY (topic_id) REFERENCES topics (id));", 415 "CREATE TABLE IF NOT EXISTS sent" + 416 "(timestamp REAL NOT NULL," + 417 " topic_id INTEGER NOT NULL," + 418 " message TEXT NOT NULL," + 419 " FOREIGN KEY (topic_id) REFERENCES topics (id));", 420 "CREATE INDEX IF NOT EXISTS i_rtime ON received(timestamp);", 421 "CREATE INDEX IF NOT EXISTS i_rtopicid ON received(topic_id);", 422 "CREATE INDEX IF NOT EXISTS i_stime ON sent(timestamp);", 423 "CREATE INDEX IF NOT EXISTS i_stopicid ON sent(topic_id);", 424 } { 425 if _, err = result.db.Exec(cmd); err != nil { 426 logClose(&result) 427 return nil, err 428 } 429 } 430 431 result.getTopic, err = db.Prepare("SELECT id FROM topics WHERE name=?;") 432 if err != nil { 433 logClose(&result) 434 return nil, err 435 } 436 437 result.insertTopic, err = db.Prepare("INSERT OR IGNORE INTO topics(name) VALUES (?);") 438 if err != nil { 439 logClose(&result) 440 return nil, err 441 } 442 443 result.insertSent, err = db.Prepare("INSERT INTO sent (timestamp, topic_id, message) VALUES (?, ?, ?);") 444 if err != nil { 445 logClose(&result) 446 return nil, err 447 } 448 449 result.insertReceived, err = db.Prepare("INSERT INTO received (timestamp, topic_id, message) VALUES (?, ?, ?);") 450 if err != nil { 451 logClose(&result) 452 return nil, err 453 } 454 455 return &result, nil 456 } 457 458 func logMessage(l *mqttLogger, stmt *sql.Stmt, message, topic []byte) { 459 t := float64(time.Now().UnixNano())/8.64e13 + 2440587.5 460 461 id, err := logTopic(l, topic) 462 if err != nil { 463 log.Println("logTopic:", err) 464 logClose(l) 465 return 466 } 467 468 _, err = stmt.Exec(t, id, message) 469 if err != nil { 470 log.Println("logMessage:", err) 471 logClose(l) 472 return 473 } 474 } 475 476 func logReceived(l *mqttLogger, message, topic []byte) { 477 if l == nil || l.db == nil { 478 return 479 } 480 481 logMessage(l, l.insertReceived, message, topic) 482 } 483 484 func logSent(l *mqttLogger, message, topic []byte) { 485 if l == nil || l.db == nil { 486 return 487 } 488 489 logMessage(l, l.insertSent, message, topic) 490 } 491 492 func logTopic(l *mqttLogger, topic []byte) (int64, error) { 493 var id int64 494 err := l.getTopic.QueryRow(topic).Scan(&id) 495 496 if err == sql.ErrNoRows { 497 res, err := l.insertTopic.Exec(topic) 498 499 if err != nil { 500 return 0, err 501 } 502 503 return res.LastInsertId() 504 } else if err != nil { 505 return 0, err 506 } 507 508 return id, nil 509 } 510 511 /**************** MQTT Topic Filter ****************/ 512 513 type mqttTopicFilter struct { 514 ignored [][]string 515 important [][]string 516 } 517 518 func addPattern(topics [][]string, newPattern string) [][]string { 519 return append(topics, strings.Split(newPattern, "/")) 520 } 521 522 func delPattern(topics [][]string, toRemove string) [][]string { 523 var result [][]string 524 525 for _, pat := range topics { 526 if strings.Join(pat, "/") != toRemove { 527 result = append(result, pat) 528 } 529 } 530 531 return result 532 } 533 534 func createTopicFilter(config *IrcConfig) mqttTopicFilter { 535 result := mqttTopicFilter{ 536 ignored: make([][]string, len(config.Ignored)), 537 important: make([][]string, len(config.Important)), 538 } 539 540 for i, s := range config.Ignored { 541 result.ignored[i] = strings.Split(s, "/") 542 } 543 544 for i, s := range config.Important { 545 result.important[i] = strings.Split(s, "/") 546 } 547 548 return result 549 } 550 551 func filterAddIgnored(filter *mqttTopicFilter, pattern string) { 552 filter.ignored = addPattern(filter.ignored, pattern) 553 } 554 555 func filterAddImportant(filter *mqttTopicFilter, pattern string) { 556 filter.important = addPattern(filter.important, pattern) 557 } 558 559 func filterDelIgnored(filter *mqttTopicFilter, pattern string) { 560 filter.ignored = delPattern(filter.ignored, pattern) 561 } 562 563 func filterDelImportant(filter *mqttTopicFilter, pattern string) { 564 filter.important = delPattern(filter.important, pattern) 565 } 566 567 func isFiltered(filter *mqttTopicFilter, topic []byte) bool { 568 t := strings.Split(string(topic), "/") 569 570 for _, pattern := range filter.important { 571 if topicMatch(t, pattern) { 572 return false 573 } 574 } 575 576 for _, pattern := range filter.ignored { 577 if topicMatch(t, pattern) { 578 return true 579 } 580 } 581 582 return false 583 } 584 585 func topicMatch(actual, filter []string) bool { 586 if len(filter) == 0 { 587 return len(actual) == 0 588 } 589 590 if filter[0] == "#" { 591 if len(filter) == 1 { 592 return true 593 } 594 595 for i := range actual { 596 if topicMatch(actual[i:], filter[1:]) { 597 return true 598 } 599 } 600 601 return false 602 } 603 604 if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) { 605 return topicMatch(actual[1:], filter[1:]) 606 } 607 608 return false 609 }