mqttim

MQTT ↔ Instant Messaging Bridge
git clone https://git.instinctive.eu/mqttim.git
Log | Files | Refs | README | LICENSE

commit 63f3f773c9662d8e5dace8678a355284bdef20c7
parent ecba883a621a9ec084c99d2f4721c0df86c8cc21
Author: Natasha Kerensikova <natgh@instinctive.eu>
Date:   Fri, 24 Jan 2025 18:49:14 +0000

Error handling is reworked
Diffstat:
Mmain.go | 67+++++++++++++++++++++++++++++++++++++++----------------------------
1 file changed, 39 insertions(+), 28 deletions(-)

diff --git a/main.go b/main.go @@ -4,7 +4,6 @@ import ( "bytes" "database/sql" "errors" - "fmt" "log" "os" "strings" @@ -54,12 +53,19 @@ type Msg struct { Message []byte } -func readConfig(path string, config *Config) (err error) { - var f *os.File - f, err = os.Open("mqttim.toml") +func errMsg(context string, err error) Msg { + return Msg{ + Topic: []byte("$mqttim/" + context), + Message: []byte(err.Error()), + } +} + +func readConfig(path string) Config { + var config Config + + f, err := os.Open("mqttim.toml") if err != nil { log.Fatal(err) - return err } defer f.Close() @@ -69,35 +75,29 @@ func readConfig(path string, config *Config) (err error) { log.Fatal(err) } - return err + return config } func main() { var err error - var config Config var m *mqtt.Client var l *mqttLogger ircQueue := make(chan Msg) - err = readConfig("mqttim.toml", &config) - if err != nil { - return - } + config := readConfig("mqttim.toml") if len(config.Log.SqlDriver) > 0 { db, err := sql.Open(config.Log.SqlDriver, config.Log.SqlConnection) if err != nil { - log.Fatal(err) - return + log.Fatal("sql.Open:", err) } l, err = logInit(db) if err != nil { - log.Fatal(err) - return + log.Fatal("logInit:", err) } - log.Println("Logger ready") + log.Println("Logging into", config.Log.SqlConnection) } m, err = mqtt.VolatileSession(config.Mqtt.Session, &mqtt.Config{ @@ -107,10 +107,8 @@ func main() { Password: []byte(config.Mqtt.Password), }) if err != nil { - log.Fatal(err) - return + log.Fatal("mqtt.VolatileSession:", err) } - go m.Subscribe(nil, "#") i := irc.IRC(config.Irc.Nick, "mqttim") if config.Irc.Verbose { @@ -127,14 +125,16 @@ func main() { topic, payload, found := strings.Cut(msg, config.Irc.CmdMid) if found { logSent(l, []byte(payload), []byte(topic)) - go m.Publish(nil, []byte(payload), topic) + if err := m.Publish(nil, []byte(payload), topic); err != nil { + ircQueue <- errMsg("Publish", err) + } } }) err = i.Connect(config.Irc.Server) if err != nil { - fmt.Printf("Err %s", err) - return + log.Fatal("irc.Connect:", err) } + go subscribeAll(m, ircQueue) go mqtt2irc(m, l, createTopicFilter(&config), ircQueue, &config) go ircSender(&config.Irc, i, ircQueue) i.Loop() @@ -171,13 +171,11 @@ func mqtt2irc(m *mqtt.Client, l *mqttLogger, f mqttTopicFilter, c chan<- Msg, co return case mqtt.IsConnectionRefused(err): - c <- Msg{Topic: []byte("$mqttim/err2"), - Message: []byte(err.Error())} + c <- errMsg("mqtt2irc2", err) time.Sleep(5 * time.Minute) default: - c <- Msg{Topic: []byte("$mqttim/err"), - Message: []byte(err.Error())} + c <- errMsg("mqtt2irc", err) time.Sleep(2 * time.Second) } } @@ -214,6 +212,19 @@ func ircSender(config *IrcConfig, i *irc.Connection, c <-chan Msg) { } } +func subscribeAll(m *mqtt.Client, ircQueue chan<- Msg) { + for { + err := m.Subscribe(nil, "#") + + if err != nil { + ircQueue <- errMsg("Subscribe", err) + time.Sleep(1 * time.Minute) + } else { + return + } + } +} + /**************** MQTT Logger Into SQL ****************/ type mqttLogger struct { @@ -309,14 +320,14 @@ func logMessage(l *mqttLogger, stmt *sql.Stmt, message, topic []byte) { id, err := logTopic(l, topic) if err != nil { - log.Println(err) + log.Println("logTopic:", err) logClose(l) return } _, err = stmt.Exec(t, id, message) if err != nil { - log.Println(err) + log.Println("logMessage:", err) logClose(l) return }