main.go (12395B)
1 package main 2 3 import ( 4 "database/sql" 5 "errors" 6 "fmt" 7 "log" 8 "os" 9 "strings" 10 "time" 11 12 _ "github.com/glebarez/go-sqlite" 13 "github.com/go-mqtt/mqtt" 14 "github.com/pelletier/go-toml/v2" 15 "github.com/thoj/go-ircevent" 16 ) 17 18 type IrcConfig struct { 19 Channel string 20 Server string 21 Nick string 22 CmdStart string 23 CmdMid string 24 CmdEnd string 25 SendStart string 26 SendMid string 27 SendEnd string 28 ShowStart string 29 ShowMid string 30 ShowEnd string 31 MaxLine int 32 ContSuffix string 33 ContPrefix string 34 Ignored []string 35 Important []string 36 Verbose bool 37 } 38 39 type LogConfig struct { 40 SqlDriver string 41 SqlConnection string 42 } 43 44 type MqttConfig struct { 45 Server string 46 Session string 47 UserName string 48 Password string 49 } 50 51 type Config struct { 52 Irc IrcConfig 53 Log LogConfig 54 Mqtt MqttConfig 55 } 56 57 type Msg struct { 58 Topic []byte 59 Message []byte 60 } 61 62 func errMsg(context string, err error) Msg { 63 return Msg{ 64 Topic: []byte("$mqttim/" + context), 65 Message: []byte(err.Error()), 66 } 67 } 68 69 type command struct { 70 name string 71 arg string 72 } 73 74 func readConfig(path string) Config { 75 config := Config{ 76 Irc: IrcConfig{ 77 Nick: "mqttim", 78 CmdStart: "!", 79 CmdMid: " ", 80 SendMid: ": ", 81 ShowMid: ": ", 82 }, 83 } 84 85 f, err := os.Open("mqttim.toml") 86 if err != nil { 87 log.Fatal(err) 88 } 89 defer f.Close() 90 91 d := toml.NewDecoder(f) 92 err = d.Decode(&config) 93 if err != nil { 94 log.Fatal(err) 95 } 96 97 if config.Irc.MaxLine > 0 && len(config.Irc.ContPrefix)+len(config.Irc.ContSuffix) >= config.Irc.MaxLine { 98 config.Irc.ContPrefix = "" 99 config.Irc.ContSuffix = "" 100 } 101 102 return config 103 } 104 105 func main() { 106 var err error 107 var m *mqtt.Client 108 var l *mqttLogger 109 110 cmdQueue := make(chan command, 10) 111 ircQueue := make(chan Msg, 10) 112 113 config := readConfig("mqttim.toml") 114 115 if len(config.Log.SqlDriver) > 0 { 116 db, err := sql.Open(config.Log.SqlDriver, config.Log.SqlConnection) 117 if err != nil { 118 log.Fatal("sql.Open:", err) 119 } 120 121 l, err = logInit(db) 122 if err != nil { 123 log.Fatal("logInit:", err) 124 } 125 log.Println("Logging into", config.Log.SqlConnection) 126 } 127 128 m, err = mqtt.VolatileSession(config.Mqtt.Session, &mqtt.Config{ 129 Dialer: mqtt.NewDialer("tcp", config.Mqtt.Server), 130 PauseTimeout: 4 * time.Second, 131 UserName: config.Mqtt.UserName, 132 Password: []byte(config.Mqtt.Password), 133 }) 134 if err != nil { 135 log.Fatal("mqtt.VolatileSession:", err) 136 } 137 138 i := irc.IRC(config.Irc.Nick, "mqttim") 139 if config.Irc.Verbose { 140 i.VerboseCallbackHandler = true 141 i.Debug = true 142 } 143 i.AddCallback("001", func(e *irc.Event) { i.Join(config.Irc.Channel) }) 144 i.AddCallback("366", func(e *irc.Event) {}) 145 i.AddCallback("PRIVMSG", func(e *irc.Event) { 146 msg := e.Message() 147 if strings.HasPrefix(msg, config.Irc.CmdStart) && strings.HasSuffix(msg, config.Irc.CmdEnd) { 148 msg = msg[len(config.Irc.CmdStart) : len(msg)-len(config.Irc.CmdEnd)] 149 name, arg, found := strings.Cut(msg, config.Irc.CmdMid) 150 if !found { 151 name = msg 152 arg = "" 153 } 154 if name == "send" { 155 msg = arg 156 } else { 157 cmdQueue <- command{name: name, arg: arg} 158 return 159 } 160 } 161 if !strings.HasPrefix(msg, config.Irc.SendStart) || !strings.HasSuffix(msg, config.Irc.SendEnd) { 162 return 163 } 164 msg = msg[len(config.Irc.SendStart) : len(msg)-len(config.Irc.SendEnd)] 165 topic, payload, found := strings.Cut(msg, config.Irc.SendMid) 166 if found { 167 logSent(l, []byte(payload), []byte(topic)) 168 if err := m.Publish(nil, []byte(payload), topic); err != nil { 169 ircQueue <- errMsg("Publish", err) 170 } 171 } 172 }) 173 err = i.Connect(config.Irc.Server) 174 if err != nil { 175 log.Fatal("irc.Connect:", err) 176 } 177 go subscribeAll(m, ircQueue) 178 go mqttReader(m, l, ircQueue, &config) 179 go ircSender(&config.Irc, i, ircQueue, cmdQueue) 180 i.Loop() 181 } 182 183 func dup(src []byte) []byte { 184 res := make([]byte, len(src)) 185 copy(res, src) 186 return res 187 } 188 189 func mqttReader(m *mqtt.Client, l *mqttLogger, c chan<- Msg, config *Config) { 190 var big *mqtt.BigMessage 191 192 for { 193 message, topic, err := m.ReadSlices() 194 switch { 195 case err == nil: 196 msg := Msg{Topic: dup(topic), Message: dup(message)} 197 logReceived(l, msg.Message, msg.Topic) 198 c <- msg 199 200 case errors.As(err, &big): 201 msg := Msg{Topic: dup(topic), Message: []byte("<Big Message>")} 202 logReceived(l, msg.Message, msg.Topic) 203 c <- msg 204 205 case errors.Is(err, mqtt.ErrClosed): 206 log.Println("mqttReader finishing:", err) 207 return 208 209 case mqtt.IsConnectionRefused(err): 210 c <- errMsg("mqttReader2", err) 211 time.Sleep(5 * time.Minute) 212 213 default: 214 c <- errMsg("mqttReader", err) 215 time.Sleep(2 * time.Second) 216 } 217 } 218 } 219 220 func ircSender(config *IrcConfig, i *irc.Connection, cm <-chan Msg, cc <-chan command) { 221 var buf strings.Builder 222 f := createTopicFilter(config) 223 224 for { 225 select { 226 case m := <-cm: 227 if !isFiltered(&f, m.Topic) { 228 str := config.ShowStart + 229 string(m.Topic) + 230 config.ShowMid + 231 string(m.Message) + 232 config.ShowEnd 233 ircSend(config, i, str, &buf) 234 } 235 case cmd := <-cc: 236 switch cmd.name { 237 case "filters": 238 ircSendFilters(config, i, &f, &buf) 239 case "ignore": 240 filterAddIgnored(&f, cmd.arg) 241 case "important": 242 filterAddImportant(&f, cmd.arg) 243 case "quit": 244 log.Println("Quit command", cmd.arg) 245 i.QuitMessage = cmd.arg 246 i.Quit() 247 case "unignore": 248 filterDelIgnored(&f, cmd.arg) 249 case "unimportant": 250 filterDelImportant(&f, cmd.arg) 251 default: 252 ircSend(config, i, "Unknown command: "+cmd.name, &buf) 253 } 254 } 255 } 256 } 257 258 func ircSend(config *IrcConfig, i *irc.Connection, s string, buf *strings.Builder) { 259 if config.MaxLine <= 0 || len(s) < config.MaxLine { 260 i.Privmsg(config.Channel, s) 261 } else { 262 for offset := 0; offset < len(s); { 263 l := len(s) - offset 264 buf.Reset() 265 if offset > 0 { 266 buf.WriteString(config.ContPrefix) 267 } 268 269 if buf.Len()+l <= config.MaxLine { 270 buf.WriteString(s[offset:]) 271 } else { 272 l = config.MaxLine - buf.Len() - len(config.ContSuffix) 273 buf.WriteString(s[offset : offset+l]) 274 buf.WriteString(config.ContSuffix) 275 } 276 277 i.Privmsg(config.Channel, buf.String()) 278 offset += l 279 } 280 } 281 } 282 283 func ircSendTopicList(config *IrcConfig, i *irc.Connection, name string, topics [][]string, buf *strings.Builder) { 284 if len(topics) >= 2 { 285 ircSend(config, i, name+" = [", buf) 286 for index, topic := range topics { 287 suffix := "," 288 if index == len(topics)-1 { 289 suffix = " ]" 290 } 291 ircSend(config, i, fmt.Sprintf(" %q%s", strings.Join(topic, "/"), suffix), buf) 292 } 293 } else if len(topics) == 1 { 294 ircSend(config, i, fmt.Sprintf("%s = [%q]", name, strings.Join(topics[0], "/")), buf) 295 } else { 296 ircSend(config, i, name+" = []", buf) 297 } 298 } 299 300 func ircSendFilters(config *IrcConfig, i *irc.Connection, f *mqttTopicFilter, buf *strings.Builder) { 301 ircSendTopicList(config, i, "important", f.important, buf) 302 ircSendTopicList(config, i, "ignored", f.ignored, buf) 303 } 304 305 func subscribeAll(m *mqtt.Client, ircQueue chan<- Msg) { 306 for { 307 err := m.Subscribe(nil, "#") 308 309 if err != nil { 310 ircQueue <- errMsg("Subscribe", err) 311 time.Sleep(1 * time.Minute) 312 } else { 313 return 314 } 315 } 316 } 317 318 /**************** MQTT Logger Into SQL ****************/ 319 320 type mqttLogger struct { 321 db *sql.DB 322 getTopic *sql.Stmt 323 insertReceived *sql.Stmt 324 insertSent *sql.Stmt 325 insertTopic *sql.Stmt 326 } 327 328 func logClose(l *mqttLogger) { 329 if l.insertTopic != nil { 330 l.insertTopic.Close() 331 l.insertTopic = nil 332 } 333 if l.insertSent != nil { 334 l.insertSent.Close() 335 l.insertSent = nil 336 } 337 if l.insertReceived != nil { 338 l.insertReceived.Close() 339 l.insertReceived = nil 340 } 341 if l.getTopic != nil { 342 l.getTopic.Close() 343 l.getTopic = nil 344 } 345 if l.db != nil { 346 l.db.Close() 347 l.db = nil 348 } 349 } 350 351 func logInit(db *sql.DB) (*mqttLogger, error) { 352 var err error 353 result := mqttLogger{db: db} 354 355 for _, cmd := range []string{ 356 "CREATE TABLE IF NOT EXISTS topics" + 357 "(id INTEGER PRIMARY KEY AUTOINCREMENT," + 358 " name TEXT NOT NULL);", 359 "CREATE UNIQUE INDEX IF NOT EXISTS i_topics ON topics(name);", 360 "CREATE TABLE IF NOT EXISTS received" + 361 "(timestamp REAL NOT NULL," + 362 " topic_id INTEGER NOT NULL," + 363 " message TEXT NOT NULL," + 364 " FOREIGN KEY (topic_id) REFERENCES topics (id));", 365 "CREATE TABLE IF NOT EXISTS sent" + 366 "(timestamp REAL NOT NULL," + 367 " topic_id INTEGER NOT NULL," + 368 " message TEXT NOT NULL," + 369 " FOREIGN KEY (topic_id) REFERENCES topics (id));", 370 "CREATE INDEX IF NOT EXISTS i_rtime ON received(timestamp);", 371 "CREATE INDEX IF NOT EXISTS i_rtopicid ON received(topic_id);", 372 "CREATE INDEX IF NOT EXISTS i_stime ON sent(timestamp);", 373 "CREATE INDEX IF NOT EXISTS i_stopicid ON sent(topic_id);", 374 } { 375 if _, err = result.db.Exec(cmd); err != nil { 376 logClose(&result) 377 return nil, err 378 } 379 } 380 381 result.getTopic, err = db.Prepare("SELECT id FROM topics WHERE name=?;") 382 if err != nil { 383 logClose(&result) 384 return nil, err 385 } 386 387 result.insertTopic, err = db.Prepare("INSERT OR IGNORE INTO topics(name) VALUES (?);") 388 if err != nil { 389 logClose(&result) 390 return nil, err 391 } 392 393 result.insertSent, err = db.Prepare("INSERT INTO sent (timestamp, topic_id, message) VALUES (?, ?, ?);") 394 if err != nil { 395 logClose(&result) 396 return nil, err 397 } 398 399 result.insertReceived, err = db.Prepare("INSERT INTO received (timestamp, topic_id, message) VALUES (?, ?, ?);") 400 if err != nil { 401 logClose(&result) 402 return nil, err 403 } 404 405 return &result, nil 406 } 407 408 func logMessage(l *mqttLogger, stmt *sql.Stmt, message, topic []byte) { 409 t := float64(time.Now().UnixNano())/8.64e13 + 2440587.5 410 411 id, err := logTopic(l, topic) 412 if err != nil { 413 log.Println("logTopic:", err) 414 logClose(l) 415 return 416 } 417 418 _, err = stmt.Exec(t, id, message) 419 if err != nil { 420 log.Println("logMessage:", err) 421 logClose(l) 422 return 423 } 424 } 425 426 func logReceived(l *mqttLogger, message, topic []byte) { 427 if l == nil || l.db == nil { 428 return 429 } 430 431 logMessage(l, l.insertReceived, message, topic) 432 } 433 434 func logSent(l *mqttLogger, message, topic []byte) { 435 if l == nil || l.db == nil { 436 return 437 } 438 439 logMessage(l, l.insertSent, message, topic) 440 } 441 442 func logTopic(l *mqttLogger, topic []byte) (int64, error) { 443 var id int64 444 err := l.getTopic.QueryRow(topic).Scan(&id) 445 446 if err == sql.ErrNoRows { 447 res, err := l.insertTopic.Exec(topic) 448 449 if err != nil { 450 return 0, err 451 } 452 453 return res.LastInsertId() 454 } else if err != nil { 455 return 0, err 456 } 457 458 return id, nil 459 } 460 461 /**************** MQTT Topic Filter ****************/ 462 463 type mqttTopicFilter struct { 464 ignored [][]string 465 important [][]string 466 } 467 468 func addPattern(topics [][]string, newPattern string) [][]string { 469 return append(topics, strings.Split(newPattern, "/")) 470 } 471 472 func delPattern(topics [][]string, toRemove string) [][]string { 473 var result [][]string 474 475 for _, pat := range topics { 476 if strings.Join(pat, "/") != toRemove { 477 result = append(result, pat) 478 } 479 } 480 481 return result 482 } 483 484 func createTopicFilter(config *IrcConfig) mqttTopicFilter { 485 result := mqttTopicFilter{ 486 ignored: make([][]string, len(config.Ignored)), 487 important: make([][]string, len(config.Important)), 488 } 489 490 for i, s := range config.Ignored { 491 result.ignored[i] = strings.Split(s, "/") 492 } 493 494 for i, s := range config.Important { 495 result.important[i] = strings.Split(s, "/") 496 } 497 498 return result 499 } 500 501 func filterAddIgnored(filter *mqttTopicFilter, pattern string) { 502 filter.ignored = addPattern(filter.ignored, pattern) 503 } 504 505 func filterAddImportant(filter *mqttTopicFilter, pattern string) { 506 filter.important = addPattern(filter.important, pattern) 507 } 508 509 func filterDelIgnored(filter *mqttTopicFilter, pattern string) { 510 filter.ignored = delPattern(filter.ignored, pattern) 511 } 512 513 func filterDelImportant(filter *mqttTopicFilter, pattern string) { 514 filter.important = delPattern(filter.important, pattern) 515 } 516 517 func isFiltered(filter *mqttTopicFilter, topic []byte) bool { 518 t := strings.Split(string(topic), "/") 519 520 for _, pattern := range filter.important { 521 if topicMatch(t, pattern) { 522 return false 523 } 524 } 525 526 for _, pattern := range filter.ignored { 527 if topicMatch(t, pattern) { 528 return true 529 } 530 } 531 532 return false 533 } 534 535 func topicMatch(actual, filter []string) bool { 536 if len(filter) == 0 { 537 return len(actual) == 0 538 } 539 540 if filter[0] == "#" { 541 if len(filter) == 1 { 542 return true 543 } 544 545 for i := range actual { 546 if topicMatch(actual[i:], filter[1:]) { 547 return true 548 } 549 } 550 551 return false 552 } 553 554 if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) { 555 return topicMatch(actual[1:], filter[1:]) 556 } 557 558 return false 559 }