README.md (6975B)
1 # mqttagent 2 3 [data:image/s3,"s3://crabby-images/f5a71/f5a71a25008e22601f711c60594ef07221e4e11a" alt="Casual Maintenance Intended"](https://casuallymaintained.tech/) 4 5 This is a daemon MQTT client which runs Lua callbacks on received messages. 6 7 It started with a backup graph that did not match the connectivity graph, 8 so some data had to be pushed while other had to be pulled. 9 So I needed machine-to-machine signalling to pull data from a source after it 10 has been pushed completely. 11 12 I thought of MQTT for the signalling, and a kind of 13 cron-but-for-MQTT-messages-instead-of-time to run actions. 14 But then I imagined writing a parser for a crontab-like configuration file, 15 and then writing such a configuration file, so I reconsidered my life choices. 16 17 It turns out that a Lua script is much easier for development (thanks to 18 existing interpreters), for setup (thanks to a friendlier language), and 19 for maintenance (thanks to basic logic not being scattered across a lot 20 of small shell scripts). 21 22 ## Manual 23 24 ### Commands 25 26 The provided commands provide varying levels of extra primitives bound 27 to the Lua environment. 28 29 For now, the generated commands run `mqttagent.lua` in the current 30 directory and stay in the foreground. 31 32 Once the script is initially run, the program runs until all connections 33 and all timers are finished (or the script calls `os.exit`). 34 35 ### MQTT client creation 36 37 ```lua 38 client = mqttclient.new(config) 39 ``` 40 41 It creates a new MQTT client, using a configuration table which 42 is deserialized into [a `go-mqtt/mqtt.Config` 43 structure](https://pkg.go.dev/github.com/go-mqtt/mqtt#Config), 44 with the extra field `connection` used to make the `Dialer`. 45 46 ### MQTT reception callbacks 47 48 The client has a table-like API to set or reset callbacks: 49 50 ```lua 51 -- Set a callback 52 client[topic_filer_string] = callback_function 53 -- Call or query an existing callback 54 client[topic_filter_string](self, message, topic, t) 55 local f = client[topic_filter_string] 56 -- Reset a callback 57 client[topic_filter_string] = nil 58 ``` 59 60 The callbacks are called with four parameters: 61 62 - the client object (`self`); 63 - the received message, as a string; 64 - the topic of the received message, as a string; 65 - the reception time, as a number in seconds (compatible with timers). 66 67 Setting a new callback automatically subscribes to the filter, 68 and resetting the callback automaticall unsubscribes. 69 70 Note that when subscribing to overlapping-but-different filters, messages 71 may be duplicated (i.e. when the overlap makes the broker send the message 72 twice, each callback will be called twice). 73 Since QoS is not supported yet, callbacks should always be ready to handle 74 multiple and missing messages anyway. 75 76 ### MQTT message sending 77 78 The client has a function-like API to send messages: 79 80 ```lua 81 -- Send a ping to the server 82 client() 83 -- Send a message to the given topic 84 client(message, topic) 85 ``` 86 87 ### Timers 88 89 Timers are created using `timer.new` with a time and a callback: 90 91 ```lua 92 timer_obj = timer.new(t, callback) 93 ``` 94 95 Timers **are NOT** automatically repeated, the callback is called only 96 once, after `t`, and then the timer is destroyed unless explicitly 97 rescheduled: 98 99 ```lua 100 timer_obj:schedule(next_t) 101 ``` 102 103 To make repeating timers, explicitly call `self:schedule` within the 104 callback with the next time. 105 106 When the time given in `timer.new` or `:schedule` is in the past, the 107 callback is called as soon as possible. 108 109 The `:cancel` method is also available to de-schedule a timer without 110 destroying it. 111 112 ## Examples 113 114 ### Simplest client prints one message and leaves 115 116 ```lua 117 -- Create an anonymous client 118 local c = mqttclient.new({ connection = "127.0.0.1:1883" }) 119 120 -- Subscribe to all topics under `test` 121 c["test/#"] = function(self, message, topic) 122 -- Print the received message 123 print(message) 124 -- Unsubscribe 125 c["test/#"] = nil 126 end 127 ``` 128 129 ### Client with LWT, keepalive and timer 130 131 ```lua 132 -- Keep alive in seconds 133 local keepalive = 60 134 -- Create the client 135 local c = mqttclient.new{ 136 connection = "127.0.0.1:1883", 137 user_name = "mqttagent", 138 password = "1234", 139 will = { 140 topic = "test/status/mqttagent", 141 message = "Offline", 142 }, 143 keep_alive = keepalive, 144 } 145 146 -- Setup a ping timer for the keepalive 147 if keepalive > 0 then 148 -- Create a new timer, dropping the variable (`self` is enough) 149 timer.new(os.time() + keepalive, function(self, t) 150 -- Send a ping 151 c() 152 -- Schedule next keepalive 153 self:schedule(t + keepalive) 154 end) 155 end 156 157 -- Print the next 10 messages send to `test` topic 158 local count = 10 159 c["test"] = function(self, message, topic) 160 print(message) 161 count = count - 1 162 if count <= 0 then 163 os.exit(0) 164 end 165 end 166 167 -- Announce start 168 c("Online", "test/status/mqttagent") 169 ``` 170 171 ### One-way MQTT Bridge 172 173 ```lua 174 -- Create the source client 175 local source = mqttclient.new{ 176 connection = "mqtt.example.com:1883", 177 user_name = "mqttagent", 178 password = "1234", 179 } 180 -- Create the destination client 181 local dest = mqttclient.new{ "127.0.0.1:1883" } 182 -- Make the one-way bridge 183 source["#"] = function(self, message, topic) 184 dest(message, topic) 185 end 186 ``` 187 188 ### RRD Update Using JSON Payload Form Tasmota Energy Sensor 189 190 ```lua 191 local json = require("json") 192 local c = mqttclient.new{ connection = "127.0.0.1:1883" } 193 194 function millis(n) 195 if n then 196 return math.floor(n * 1000 + 0.5) 197 else 198 return "U" 199 end 200 end 201 202 c["tele/+/SENSOR"] = function(self, message, topic) 203 -- Deduce file name from middle part of the topic 204 local name = string.sub(topic, 6, -8) 205 -- Sanity check 206 if string.find(name, "[^_%w%-]") then return end 207 -- Decode JSON payload 208 local obj, err = json.decode(message) 209 if err then 210 print(err) 211 return 212 end 213 if not obj.ENERGY then return end 214 215 os.execute("rrdtool update \"" .. dbdir .. "/energy-sensor-" .. 216 name .. ".rrd\" N:" .. 217 millis(obj.ENERGY.Total) .. 218 ":" .. (obj.ENERGY.Period or "U") .. 219 ":" .. (obj.ENERGY.Power or "U") .. 220 ":" .. millis(obj.ENERGY.Factor) .. 221 ":" .. (obj.ENERGY.Voltage or "U") .. 222 ":" .. millis(obj.ENERGY.Current)) 223 end 224 ``` 225 226 ### Backup Monitoring 227 228 ```lua 229 local c = mqttclient.new{ connection = "127.0.0.1:1883" } 230 -- List of topic suffix to check 231 local backups = { 232 ["dest1/src1"] = 0, 233 ["dest1/src2"] = 0, 234 ["dest2/src2"] = 0, 235 ["dest3/src1"] = 0, 236 } 237 238 -- Check every day at 5:02:42 UTC that all backups have been done 239 function next_check() 240 local tbl = os.date("!*t") 241 tbl.hour = 5 242 tbl.min = 2 243 tbl.sec = 42 244 return os.time(tbl) + 86400 245 end 246 247 timer.new(next_check(), function(self, t) 248 local msg = "" 249 -- Accumulate missing backup names in msg and reset the table 250 for k, v in pairs(backups) do 251 if v == 0 then 252 msg = msg .. (#msg > 0 and ", " or "") .. k 253 end 254 backups[k] = 0 255 end 256 -- Send an alert when a backup is missing 257 if #msg > 0 then 258 c("Missing: " .. msg, "alert/backup") 259 end 260 -- Check again tomorrow 261 self:schedule(t + 86400) 262 end) 263 264 -- Mark backups as seen when notified 265 c["backup/#"] = function(self, _, topic, t) 266 local name = string.sub(topic, 8) 267 if backups[name] then 268 backups[name] = t 269 end 270 end 271 ```