# mqttagent

[](https://casuallymaintained.tech/)

This is a daemon MQTT client which runs Lua callbacks on received messages.

It started with a backup graph that did not match the connectivity graph,
so some data had to be pushed while other data had to be pulled.
So I needed machine-to-machine signalling to pull data from a source after it
has been pushed completely.

I thought of MQTT for the signalling, and a kind of
cron-but-for-MQTT-messages-instead-of-time to run actions.
But then I imagined writing a parser for a crontab-like configuration file,
and then writing such a configuration file, so I reconsidered my life choices.

It turns out that a Lua script is much easier for development (thanks to
existing interpreters), for setup (thanks to a friendlier language), and
for maintenance (thanks to basic logic not being scattered across a lot
of small shell scripts).

## Manual

### Commands

The available commands offer varying levels of extra primitives bound
to the Lua environment.

For now, the generated commands run `mqttagent.lua` in the current
directory and stay in the foreground.

After the initial run of the script, the program keeps running until all
connections and all timers are finished (or the script calls `os.exit`).

### MQTT client creation

```lua
client = mqttclient.new(config)
```

It creates a new MQTT client, using a configuration table which
is deserialized into [a `go-mqtt/mqtt.Config`
structure](https://pkg.go.dev/github.com/go-mqtt/mqtt#Config),
with the extra field `connection` used to make the `Dialer`.

### MQTT reception callbacks

The client has a table-like API to set or reset callbacks:

```lua
-- Set a callback
client[topic_filter_string] = callback_function
-- Call or query an existing callback
client[topic_filter_string](self, message, topic, t)
local f = client[topic_filter_string]
-- Reset a callback
client[topic_filter_string] = nil
```

The callbacks are called with four parameters:

- the client object (`self`);
- the received message, as a string;
- the topic of the received message, as a string;
- the reception time, as a number in seconds (compatible with timers).

Setting a new callback automatically subscribes to the filter,
and resetting the callback automatically unsubscribes.

Note that when subscribing to overlapping-but-different filters, messages
may be duplicated (i.e. when the overlap makes the broker send the message
twice, each callback will be called twice).
Since QoS is not supported yet, callbacks should always be ready to handle
duplicated and missing messages anyway.

### Internal event callbacks

When the connection of a client comes up or down, a Lua callback is triggered
as if an empty message had been sent on topic `$SYS/self/online` or
`$SYS/self/offline`.

Note that due to being offline, the client should not be used in the offline
callback.

Also note that mqttagent assumes without checking that subscriptions are
preserved by the broker when coming back online.
The online callback is a good place to resubscribe if needed.
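
The online and offline events can also be used to keep track of the
connection state, for example to avoid sending from a timer while the
client is offline. The following is only a sketch, not something provided
by mqttagent: `track_connection` and the `state` table are hypothetical
names, and it assumes that the callbacks are set before the connection
first comes up.

```lua
-- Hypothetical helper, not part of mqttagent:
-- records whether `client` is currently online.
local function track_connection(client)
  local state = { online = false }
  client["$SYS/self/online"] = function(self)
    state.online = true
  end
  client["$SYS/self/offline"] = function(self)
    -- Only update the flag: the client must not be used here
    state.online = false
  end
  return state
end

-- Possible usage inside an mqttagent script:
--   local c = mqttclient.new{ connection = "127.0.0.1:1883" }
--   local state = track_connection(c)
--   timer.new(os.time() + 60, function(self, t)
--     if state.online then c("still alive", "test/status/mqttagent") end
--     self:schedule(t + 60)
--   end)
```

The helper only relies on the table-like API described above, so any
other work needed on reconnection (such as resubscribing) can be added to
the same online callback.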

### MQTT message sending

The client has a function-like API to send messages:

```lua
-- Send a ping to the server
client()
-- Send a message to the given topic
client(message, topic)
```

### Timers

Timers are created using `timer.new` with a time and a callback:

```lua
timer_obj = timer.new(t, callback)
```

Timers **are NOT** automatically repeated. The callback is called only
once, after `t`, and then the timer is destroyed unless explicitly
rescheduled:

```lua
timer_obj:schedule(next_t)
```

To make repeating timers, explicitly call `self:schedule` within the
callback with the next time.

When the time given in `timer.new` or `:schedule` is in the past, the
callback is called as soon as possible.

The `:cancel` method is also available to de-schedule a timer without
destroying it.

## Examples

### Simplest client prints one message and leaves

```lua
-- Create an anonymous client
local c = mqttclient.new({ connection = "127.0.0.1:1883" })

-- Subscribe to all topics under `test`
c["test/#"] = function(self, message, topic)
  -- Print the received message
  print(message)
  -- Unsubscribe
  c["test/#"] = nil
end
```

### Client with LWT, keepalive, timer, and internal events

```lua
-- Keep alive in seconds
local keepalive = 60
-- Create the client
local c = mqttclient.new{
  connection = "127.0.0.1:1883",
  user_name = "mqttagent",
  password = "1234",
  will = {
    topic = "test/status/mqttagent",
    message = "Offline",
  },
  keep_alive = keepalive,
}

-- Set up a ping timer (not needed anymore for the keepalive)
if keepalive > 0 then
  -- Create a new timer, dropping the variable (`self` is enough)
  timer.new(os.time() + keepalive, function(self, t)
    -- Send a ping
    c()
    -- Schedule next keepalive
    self:schedule(t + keepalive)
  end)
end

-- Print the next 10 messages sent to the `test` topic
local count = 10
c["test"] = function(self, message, topic)
  print(message)
  count = count - 1
  if count <= 0 then
    os.exit(0)
  end
end

-- Announce start on (re)connection
c["$SYS/self/online"] = function(self)
  self("Online", "test/status/mqttagent")
end
-- Pushover alert on disconnection
c["$SYS/self/offline"] = function(self)
  http.post("https://api.pushover.net/1/messages.json", {
    body = urlencode({
      token = "My Pushover Token String",
      user = "My Pushover User String",
      message = "Mqttagent Disconnected"
    })
  })
end
```

### One-way MQTT Bridge

```lua
-- Create the source client
local source = mqttclient.new{
  connection = "mqtt.example.com:1883",
  user_name = "mqttagent",
  password = "1234",
}
-- Create the destination client
local dest = mqttclient.new{ connection = "127.0.0.1:1883" }
-- Make the one-way bridge
source["#"] = function(self, message, topic)
  dest(message, topic)
end
```

### RRD Update Using JSON Payload From Tasmota Energy Sensor

```lua
local json = require("json")
local c = mqttclient.new{ connection = "127.0.0.1:1883" }

-- Convert to an integer number of thousandths, or "U" (unknown) when missing
function millis(n)
  if n then
    return math.floor(n * 1000 + 0.5)
  else
    return "U"
  end
end

c["tele/+/SENSOR"] = function(self, message, topic)
  -- Deduce file name from middle part of the topic
  local name = string.sub(topic, 6, -8)
  -- Sanity check
  if string.find(name, "[^_%w%-]") then return end
  -- Decode JSON payload
  local obj, err = json.decode(message)
  if err then
    print(err)
    return
  end
  if not obj.ENERGY then return end

  -- Update the RRD file (`dbdir`, the RRD directory, must be defined beforehand)
  os.execute("rrdtool update \"" .. dbdir .. "/energy-sensor-" ..
    name .. ".rrd\" N:" ..
    millis(obj.ENERGY.Total) ..
    ":" .. (obj.ENERGY.Period or "U") ..
    ":" .. (obj.ENERGY.Power or "U") ..
    ":" .. millis(obj.ENERGY.Factor) ..
    ":" .. (obj.ENERGY.Voltage or "U") ..
    ":" .. millis(obj.ENERGY.Current))
end
```

### Backup Monitoring

```lua
local c = mqttclient.new{ connection = "127.0.0.1:1883" }
-- List of topic suffixes to check
local backups = {
  ["dest1/src1"] = 0,
  ["dest1/src2"] = 0,
  ["dest2/src2"] = 0,
  ["dest3/src1"] = 0,
}

-- Check every day at 5:02:42 UTC that all backups have been done
function next_check()
  local tbl = os.date("!*t")
  tbl.hour = 5
  tbl.min = 2
  tbl.sec = 42
  return os.time(tbl) + 86400
end

timer.new(next_check(), function(self, t)
  local msg = ""
  -- Accumulate missing backup names in msg and reset the table
  for k, v in pairs(backups) do
    if v == 0 then
      msg = msg .. (#msg > 0 and ", " or "") .. k
    end
    backups[k] = 0
  end
  -- Send an alert when a backup is missing
  if #msg > 0 then
    c("Missing: " .. msg, "alert/backup")
  end
  -- Check again tomorrow
  self:schedule(t + 86400)
end)

-- Mark backups as seen when notified
c["backup/#"] = function(self, _, topic, t)
  local name = string.sub(topic, 8)
  if backups[name] then
    backups[name] = t
  end
end
```
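
One caveat about `next_check`: in the reference Lua implementation,
`os.time` interprets its table argument as local time, so combining it
with `os.date("!*t")` lands on 5:02:42 UTC only when the local timezone
is UTC. A possible alternative, sketched below, uses plain arithmetic on
timestamps instead. It assumes that `os.time()` returns seconds since the
Unix epoch (true on POSIX systems, but not guaranteed by Lua itself), and
`next_utc` is a hypothetical name, not something provided by mqttagent.

```lua
-- Hypothetical helper: first timestamp strictly after `now` that falls
-- on hour:min:sec UTC, assuming Unix-epoch timestamps in seconds
local function next_utc(now, hour, min, sec)
  local day = 86400
  -- Epoch days start at 00:00:00 UTC, so this is today's UTC midnight
  local midnight = now - now % day
  local t = midnight + hour * 3600 + min * 60 + sec
  if t <= now then
    -- Already past today's target, use tomorrow's
    t = t + day
  end
  return t
end

-- Possible first check time for the timer in the script above:
--   local first = next_utc(os.time(), 5, 2, 42)
```

Unlike `next_check`, this returns the next occurrence even when it is
later today, so adding 86400 to the result may be preferable when the
first check should wait for a full day of backups.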