mqttagent

MQTT Lua Agent
git clone https://git.instinctive.eu/mqttagent.git

# mqttagent

[![Casual Maintenance Intended](https://casuallymaintained.tech/badge.svg)](https://casuallymaintained.tech/)

This is a daemon MQTT client which runs Lua callbacks on received messages.

It started with a backup graph that did not match the connectivity graph,
so some data had to be pushed while other data had to be pulled.
So I needed machine-to-machine signalling to pull data from a source after it
has been pushed completely.

I thought of MQTT for the signalling, and a kind of
cron-but-for-MQTT-messages-instead-of-time to run actions.
But then I imagined writing a parser for a crontab-like configuration file,
and then writing such a configuration file, so I reconsidered my life choices.

It turns out that a Lua script is much easier for development (thanks to
existing interpreters), for setup (thanks to a friendlier language), and
for maintenance (thanks to basic logic not being scattered across a lot
of small shell scripts).

## Manual

### Commands

The provided commands offer varying levels of extra primitives bound
to the Lua environment.

For now, the generated commands run `mqttagent.lua` in the current
directory and stay in the foreground.

Once the script has run initially, the program keeps running until all
connections and all timers are finished (or the script calls `os.exit`).

### MQTT client creation

```lua
client = mqttclient.new(config)
```

It creates a new MQTT client, using a configuration table which
is deserialized into [a `go-mqtt/mqtt.Config`
structure](https://pkg.go.dev/github.com/go-mqtt/mqtt#Config),
with the extra field `connection` used to make the `Dialer`.

### MQTT reception callbacks

The client has a table-like API to set or reset callbacks:

```lua
-- Set a callback
client[topic_filter_string] = callback_function
-- Call or query an existing callback
client[topic_filter_string](self, message, topic, t)
local f = client[topic_filter_string]
-- Reset a callback
client[topic_filter_string] = nil
```

The callbacks are called with four parameters:

 - the client object (`self`);
 - the received message, as a string;
 - the topic of the received message, as a string;
 - the reception time, as a number in seconds (compatible with timers).

Setting a new callback automatically subscribes to the filter,
and resetting the callback automatically unsubscribes.

Note that when subscribing to overlapping-but-different filters, messages
may be duplicated (i.e. when the overlap makes the broker send the message
twice, each callback will be called twice).
Since QoS is not supported yet, callbacks should always be ready to handle
multiple and missing messages anyway.

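To see why overlapping filters can duplicate deliveries, it may help to check
which filters match a given topic.
The following is a minimal sketch of MQTT filter matching in plain Lua;
`split` and `topic_matches` are hypothetical helpers, not part of mqttagent,
and the special rules for topics starting with `$` are ignored.

```lua
-- Hypothetical helper: split a topic or a filter into its levels
local function split(s)
	local levels = {}
	-- Append a trailing "/" so that empty levels are captured too
	for level in string.gmatch(s .. "/", "([^/]*)/") do
		levels[#levels + 1] = level
	end
	return levels
end

-- Hypothetical helper, not provided by mqttagent: returns true when
-- `topic` matches the MQTT topic filter `filter` (`+` and `#` wildcards).
-- Simplification: `$`-prefixed topics get no special treatment.
local function topic_matches(filter, topic)
	local f, t = split(filter), split(topic)
	for i, level in ipairs(f) do
		if level == "#" then
			-- Multi-level wildcard: matches the parent and everything below
			return true
		elseif t[i] == nil then
			-- The topic has fewer levels than the filter
			return false
		elseif level ~= "+" and level ~= t[i] then
			return false
		end
	end
	return #f == #t
end

-- Both filters match the same topic, so with both callbacks set the
-- broker may deliver a message on `test/a` once per subscription
print(topic_matches("test/#", "test/a"), topic_matches("test/+", "test/a"))
```
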
### Internal event callbacks

When the connection of a client comes up or goes down, a Lua callback is
triggered as if an empty message had been sent on topic `$SYS/self/online` or
`$SYS/self/offline`.

Note that since the client is offline at that point, it should not be used
in the offline callback.

Also note that mqttagent assumes without checking that subscriptions are
preserved by the broker when coming back online.
The online callback is a good place to resubscribe if needed.

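If the broker does not keep subscriptions across reconnections, one possible
approach is to keep the handlers in a plain table and assign them again from
the online callback.
This is only a sketch: `handlers` and `subscribe_all` are hypothetical names,
and it assumes that assigning a callback again makes mqttagent send a new
subscription, which is not confirmed here.

```lua
-- Hypothetical table of topic filters and their callbacks
local handlers = {
	["test/#"] = function(self, message, topic)
		print(topic, message)
	end,
}

-- Hypothetical helper: (re)assign every handler on the given client.
-- Assumption: assigning a callback triggers a (re)subscription.
local function subscribe_all(client, tbl)
	for filter, callback in pairs(tbl) do
		client[filter] = callback
	end
end

-- Intended use inside mqttagent, where `c` is a client returned
-- by `mqttclient.new`:
--   subscribe_all(c, handlers)
--   c["$SYS/self/online"] = function(self)
--       subscribe_all(self, handlers)
--   end
```
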
### MQTT message sending

The client has a function-like API to send messages:

```lua
-- Send a ping to the server
client()
-- Send a message to the given topic
client(message, topic)
```

### Timers

Timers are created using `timer.new` with a time and a callback:

```lua
timer_obj = timer.new(t, callback)
```

Timers **are NOT** automatically repeated: the callback is called only
once, after `t`, and then the timer is destroyed unless explicitly
rescheduled:

```lua
timer_obj:schedule(next_t)
```

To make repeating timers, explicitly call `self:schedule` within the
callback with the next time.

When the time given in `timer.new` or `:schedule` is in the past, the
callback is called as soon as possible.

The `:cancel` method is also available to de-schedule a timer without
destroying it.

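Because a time in the past fires as soon as possible, a repeating timer
that always reschedules with `t + interval` could fire several times in a
row after a long stall (a suspended machine, for example).
Below is a minimal sketch of one way to skip the missed ticks; `next_tick`
is a hypothetical helper, and it assumes that the `t` given to the callback
is on the same scale as `os.time()`.

```lua
-- Hypothetical helper: first time strictly after `now` on the grid
-- t + interval, t + 2 * interval, ...
local function next_tick(t, interval, now)
	local next_t = t + interval
	if next_t <= now then
		-- Skip the ticks that were missed
		local missed = math.floor((now - next_t) / interval) + 1
		next_t = next_t + missed * interval
	end
	return next_t
end

-- Intended use inside mqttagent:
--   timer.new(os.time() + 60, function(self, t)
--       -- periodic work goes here
--       self:schedule(next_tick(t, 60, os.time()))
--   end)
```
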
## Examples

### Simplest client prints one message and leaves

```lua
-- Create an anonymous client
local c = mqttclient.new({ connection = "127.0.0.1:1883" })

-- Subscribe to all topics under `test`
c["test/#"] = function(self, message, topic)
	-- Print the received message
	print(message)
	-- Unsubscribe
	c["test/#"] = nil
end
```

### Client with LWT, keepalive, timer, and internal events

```lua
-- Keep alive in seconds
local keepalive = 60
-- Create the client
local c = mqttclient.new{
	connection = "127.0.0.1:1883",
	user_name = "mqttagent",
	password = "1234",
	will = {
		topic = "test/status/mqttagent",
		message = "Offline",
	},
	keep_alive = keepalive,
}

-- Set up a ping timer (not needed anymore for the keepalive)
if keepalive > 0 then
	-- Create a new timer, dropping the variable (`self` is enough)
	timer.new(os.time() + keepalive, function(self, t)
		-- Send a ping
		c()
		-- Schedule next keepalive
		self:schedule(t + keepalive)
	end)
end

-- Print the next 10 messages sent to the `test` topic
local count = 10
c["test"] = function(self, message, topic)
	print(message)
	count = count - 1
	if count <= 0 then
		os.exit(0)
	end
end

-- Announce start on (re)connection
c["$SYS/self/online"] = function(self)
	self("Online", "test/status/mqttagent")
end
-- Pushover alert on disconnection
c["$SYS/self/offline"] = function(self)
	http.post("https://api.pushover.net/1/messages.json", {
		body = urlencode({
			token = "My Pushover Token String",
			user = "My Pushover User String",
			message = "Mqttagent Disconnected"
		})
	})
end
```

### One-way MQTT Bridge

```lua
-- Create the source client
local source = mqttclient.new{
	connection = "mqtt.example.com:1883",
	user_name = "mqttagent",
	password = "1234",
}
-- Create the destination client
local dest = mqttclient.new{ connection = "127.0.0.1:1883" }
-- Make the one-way bridge
source["#"] = function(self, message, topic)
	dest(message, topic)
end
```

### RRD Update Using JSON Payload From Tasmota Energy Sensor

```lua
local json = require("json")
local c = mqttclient.new{ connection = "127.0.0.1:1883" }
-- Directory holding the RRD files (placeholder path, adjust to your setup)
local dbdir = "/var/lib/rrd"

-- Convert to an integer number of thousandths, or "U" (unknown) when missing
function millis(n)
	if n then
		return math.floor(n * 1000 + 0.5)
	else
		return "U"
	end
end

c["tele/+/SENSOR"] = function(self, message, topic)
	-- Deduce file name from middle part of the topic
	local name = string.sub(topic, 6, -8)
	-- Sanity check
	if string.find(name, "[^_%w%-]") then return end
	-- Decode JSON payload
	local obj, err = json.decode(message)
	if err then
		print(err)
		return
	end
	if not obj.ENERGY then return end

	os.execute("rrdtool update \"" .. dbdir .. "/energy-sensor-" ..
	            name .. ".rrd\" N:" ..
	            millis(obj.ENERGY.Total) ..
	            ":" .. (obj.ENERGY.Period or "U") ..
	            ":" .. (obj.ENERGY.Power or "U") ..
	            ":" .. millis(obj.ENERGY.Factor) ..
	            ":" .. (obj.ENERGY.Voltage or "U") ..
	            ":" .. millis(obj.ENERGY.Current))
end
```

### Backup Monitoring

```lua
local c = mqttclient.new{ connection = "127.0.0.1:1883" }
-- List of topic suffix to check
local backups = {
	["dest1/src1"] = 0,
	["dest1/src2"] = 0,
	["dest2/src2"] = 0,
	["dest3/src1"] = 0,
}

-- Check every day at 5:02:42 UTC that all backups have been done
function next_check()
	local tbl = os.date("!*t")
	tbl.hour = 5
	tbl.min = 2
	tbl.sec = 42
	return os.time(tbl) + 86400
end

timer.new(next_check(), function(self, t)
	local msg = ""
	-- Accumulate missing backup names in msg and reset the table
	for k, v in pairs(backups) do
		if v == 0 then
			msg = msg .. (#msg > 0 and ", " or "") .. k
		end
		backups[k] = 0
	end
	-- Send an alert when a backup is missing
	if #msg > 0 then
		c("Missing: " .. msg, "alert/backup")
	end
	-- Check again tomorrow
	self:schedule(t + 86400)
end)

-- Mark backups as seen when notified
c["backup/#"] = function(self, _, topic, t)
	local name = string.sub(topic, 8)
	if backups[name] then
		backups[name] = t
	end
end
```
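
The `next_check` function above builds a UTC table with `os.date("!*t")`
and converts it back with `os.time`, which interprets the table as local
time, so the check happens at 5:02:42 UTC only when the daemon runs in a
UTC timezone.
A possible timezone-independent sketch follows, assuming that `os.time()`
returns seconds since the Unix epoch (as on POSIX systems); `next_utc_time`
is a hypothetical helper.
Unlike the original, it returns the next occurrence, which may be later
today rather than always tomorrow.

```lua
-- Hypothetical helper: next time strictly after `now` at which the UTC
-- clock reads hour:min:sec.
-- Assumption: `now` is in seconds since the Unix epoch.
local function next_utc_time(now, hour, min, sec)
	-- Start of the current UTC day
	local midnight = now - now % 86400
	local target = midnight + hour * 3600 + min * 60 + sec
	if target <= now then
		-- Already passed today, use tomorrow
		target = target + 86400
	end
	return target
end

-- Intended use inside mqttagent, replacing `next_check()`:
--   timer.new(next_utc_time(os.time(), 5, 2, 42), function(self, t)
--       -- same body as above
--   end)
```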