# mqttagent

[](https://casuallymaintained.tech/)

This is a daemon MQTT client which runs Lua callbacks on received messages.

It started with a backup graph that did not match the connectivity graph,
so some data had to be pushed while other data had to be pulled.
So I needed machine-to-machine signalling to pull data from a source after it
has been pushed completely.

I thought of MQTT for the signalling, and a kind of
cron-but-for-MQTT-messages-instead-of-time to run actions.
But then I imagined writing a parser for a crontab-like configuration file,
and then writing such a configuration file, so I reconsidered my life choices.

It turns out that a Lua script is much easier for development (thanks to
existing interpreters), for setup (thanks to a friendlier language), and
for maintenance (thanks to basic logic not being scattered across a lot
of small shell scripts).

## Manual

### Commands

The provided commands offer varying levels of extra primitives bound
to the Lua environment.

For now, the generated commands run `mqttagent.lua` in the current
directory and stay in the foreground.

Once the script is initially run, the program runs until all connections
and all timers are finished (or the script calls `os.exit`).

### MQTT client creation

```lua
client = mqttclient.new(config)
```

It creates a new MQTT client, using a configuration table which
is deserialized into [a `go-mqtt/mqtt.Config`
structure](https://pkg.go.dev/github.com/go-mqtt/mqtt#Config),
with the extra field `connection` used to make the `Dialer`.

### MQTT reception callbacks

The client has a table-like API to set or reset callbacks:

```lua
-- Set a callback
client[topic_filter_string] = callback_function
-- Call or query an existing callback
client[topic_filter_string](self, message, topic, t)
local f = client[topic_filter_string]
-- Reset a callback
client[topic_filter_string] = nil
```

The callbacks are called with four parameters:

- the client object (`self`);
- the received message, as a string;
- the topic of the received message, as a string;
- the reception time, as a number in seconds (compatible with timers).

Setting a new callback automatically subscribes to the filter,
and resetting the callback automatically unsubscribes.

Note that when subscribing to overlapping-but-different filters, messages
may be duplicated (i.e. when the overlap makes the broker send the message
twice, each callback will be called twice).
Since QoS is not supported yet, callbacks should always be ready to handle
duplicated and missing messages anyway.

### Internal event callbacks

When the connection of a client comes up or down, a Lua callback is triggered
as if an empty message had been sent on topic `$SYS/self/online` or
`$SYS/self/offline`.

Note that due to being offline, the client should not be used in the offline
callback.

Also note that mqttagent assumes without checking that subscriptions are
preserved by the broker when coming back online.
The online callback is a good place to resubscribe if needed.
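
As a minimal sketch, the two internal events above could be used to track
connectivity and avoid sending while offline. The `online` flag, the
`send_if_online` helper and the plain-table fallback are illustrative
assumptions, not part of mqttagent:

```lua
-- Assumption: inside mqttagent, `mqttclient` is a global; a plain table
-- stands in for the client when this sketch is run elsewhere.
local c = mqttclient and mqttclient.new({ connection = "127.0.0.1:1883" }) or {}

-- Connectivity state maintained by the two internal-event callbacks
local online = false

c["$SYS/self/online"] = function(self, message, topic, t)
    online = true
    print("connection up", t)
end

c["$SYS/self/offline"] = function(self, message, topic, t)
    -- Only update local state here: the client must not be used while offline
    online = false
    print("connection down", t)
end

-- Hypothetical helper: send only when the connection is believed to be up,
-- and report whether the message was handed to the client
local function send_if_online(message, topic)
    if online then
        c(message, topic)
        return true
    end
    return false
end
```

Messages refused by `send_if_online` are simply dropped in this sketch;
queueing them until the next online event would be a possible refinement.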

### MQTT message sending

The client has a function-like API to send messages:

```lua
-- Send a ping to the server
client()
-- Send a message to the given topic
client(message, topic)
```

### Timers

Timers are created using `timer.new` with a time and a callback:

```lua
timer_obj = timer.new(t, callback)
```

Timers **are NOT** automatically repeated; the callback is called only
once, after `t`, and then the timer is destroyed unless explicitly
rescheduled:

```lua
timer_obj:schedule(next_t)
```

To make repeating timers, explicitly call `self:schedule` within the
callback with the next time.

When the time given in `timer.new` or `:schedule` is in the past, the
callback is called as soon as possible.

The `:cancel` method is also available to de-schedule a timer without
destroying it.

## Examples

### Simplest client prints one message and leaves

```lua
-- Create an anonymous client
local c = mqttclient.new({ connection = "127.0.0.1:1883" })

-- Subscribe to all topics under `test`
c["test/#"] = function(self, message, topic)
    -- Print the received message
    print(message)
    -- Unsubscribe
    c["test/#"] = nil
end
```

### Client with LWT, keepalive and timer

```lua
-- Keep alive in seconds
local keepalive = 60
-- Create the client
local c = mqttclient.new{
    connection = "127.0.0.1:1883",
    user_name = "mqttagent",
    password = "1234",
    will = {
        topic = "test/status/mqttagent",
        message = "Offline",
    },
    keep_alive = keepalive,
}

-- Set up a ping timer for the keepalive
if keepalive > 0 then
    -- Create a new timer, dropping the variable (`self` is enough)
    timer.new(os.time() + keepalive, function(self, t)
        -- Send a ping
        c()
        -- Schedule next keepalive
        self:schedule(t + keepalive)
    end)
end

-- Print the next 10 messages sent to the `test` topic
local count = 10
c["test"] = function(self, message, topic)
    print(message)
    count = count - 1
    if count <= 0 then
        os.exit(0)
    end
end

-- Announce start
c("Online", "test/status/mqttagent")
```

### One-way MQTT Bridge

```lua
-- Create the source client
local source = mqttclient.new{
    connection = "mqtt.example.com:1883",
    user_name = "mqttagent",
    password = "1234",
}
-- Create the destination client
local dest = mqttclient.new{ connection = "127.0.0.1:1883" }
-- Make the one-way bridge
source["#"] = function(self, message, topic)
    dest(message, topic)
end
```

### RRD Update Using JSON Payload From Tasmota Energy Sensor

```lua
local json = require("json")
local c = mqttclient.new{ connection = "127.0.0.1:1883" }
-- Directory holding the RRD files (placeholder value, adjust as needed)
local dbdir = "."

-- Convert to integer thousandths, or "U" (unknown) when the value is missing
function millis(n)
    if n then
        return math.floor(n * 1000 + 0.5)
    else
        return "U"
    end
end

c["tele/+/SENSOR"] = function(self, message, topic)
    -- Deduce file name from middle part of the topic
    local name = string.sub(topic, 6, -8)
    -- Sanity check
    if string.find(name, "[^_%w%-]") then return end
    -- Decode JSON payload
    local obj, err = json.decode(message)
    if err then
        print(err)
        return
    end
    if not obj.ENERGY then return end

    os.execute("rrdtool update \"" .. dbdir .. "/energy-sensor-" ..
        name .. ".rrd\" N:" ..
        millis(obj.ENERGY.Total) ..
        ":" .. (obj.ENERGY.Period or "U") ..
        ":" .. (obj.ENERGY.Power or "U") ..
        ":" .. millis(obj.ENERGY.Factor) ..
        ":" .. (obj.ENERGY.Voltage or "U") ..
        ":" ..
        millis(obj.ENERGY.Current))
end
```

### Backup Monitoring

```lua
local c = mqttclient.new{ connection = "127.0.0.1:1883" }
-- List of topic suffixes to check
local backups = {
    ["dest1/src1"] = 0,
    ["dest1/src2"] = 0,
    ["dest2/src2"] = 0,
    ["dest3/src1"] = 0,
}

-- Check every day at 5:02:42 UTC that all backups have been done
function next_check()
    local tbl = os.date("!*t")
    tbl.hour = 5
    tbl.min = 2
    tbl.sec = 42
    return os.time(tbl) + 86400
end

timer.new(next_check(), function(self, t)
    local msg = ""
    -- Accumulate missing backup names in msg and reset the table
    for k, v in pairs(backups) do
        if v == 0 then
            msg = msg .. (#msg > 0 and ", " or "") .. k
        end
        backups[k] = 0
    end
    -- Send an alert when a backup is missing
    if #msg > 0 then
        c("Missing: " .. msg, "alert/backup")
    end
    -- Check again tomorrow
    self:schedule(t + 86400)
end)

-- Mark backups as seen when notified
c["backup/#"] = function(self, _, topic, t)
    local name = string.sub(topic, 8)
    if backups[name] then
        backups[name] = t
    end
end
```
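
### Ignoring duplicated messages

The manual notes that overlapping filters may deliver the same message
more than once. One possible way to cope is to wrap callbacks so that
repeats are dropped. This is only a sketch: `dedup` and its `window`
parameter are hypothetical names, not something mqttagent provides, and
the table of seen messages is never pruned, so it grows with the number
of distinct messages.

```lua
-- Hypothetical helper: wrap a reception callback so that a (topic, message)
-- pair arriving less than `window` seconds after the previous identical pair
-- is ignored.
local function dedup(callback, window)
    -- Reception time of the latest occurrence of each (topic, message) pair
    local last_seen = {}
    return function(self, message, topic, t)
        local key = topic .. "\0" .. message
        local previous = last_seen[key]
        last_seen[key] = t
        if previous and t - previous < window then
            -- Same payload on the same topic, seen too recently: skip it
            return
        end
        return callback(self, message, topic, t)
    end
end

-- Usage inside mqttagent, with `c` created by mqttclient.new:
-- c["test/#"] = dedup(function(self, message, topic)
--     print(message)
-- end, 1)
```

The wrapper relies on the reception time passed as fourth parameter, so it
does not need its own clock. Legitimate identical messages sent within the
window are dropped too, so `window` should stay small.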