mqttagent

MQTT Lua Agent
git clone https://git.instinctive.eu/mqttagent.git
Log | Files | Refs | README | LICENSE

README.md (7444B)


      1 # mqttagent
      2 
      3 [![Casual Maintenance Intended](https://casuallymaintained.tech/badge.svg)](https://casuallymaintained.tech/)
      4 
      5 This is a daemon MQTT client which runs Lua callbacks on received messages.
      6 
      7 It started with a backup graph that did not match the connectivity graph,
      8 so some data had to be pushed while other had to be pulled.
      9 So I needed machine-to-machine signalling to pull data from a source after it
     10 has been pushed completely.
     11 
     12 I thought of MQTT for the signalling, and a kind of
     13 cron-but-for-MQTT-messages-instead-of-time to run actions.
     14 But then I imagined writing a parser for a crontab-like configuration file,
     15 and then writing such a configuration file, so I reconsidered my life choices.
     16 
     17 It turns out that a Lua script is much easier for development (thanks to
     18 existing interpreters), for setup (thanks to a friendlier language), and
     19 for maintenance (thanks to basic logic not being scattered across a lot
     20 of small shell scripts).
     21 
     22 ## Manual
     23 
     24 ### Commands
     25 
     26 The provided commands provide varying levels of extra primitives bound
     27 to the Lua environment.
     28 
     29 For now, the generated commands run `mqttagent.lua` in the current
     30 directory and stay in the foreground.
     31 
     32 Once the script is initially run, the program runs until all connections
     33 and all timers are finished (or the script calls `os.exit`).
     34 
     35 ### MQTT client creation
     36 
     37 ```lua
     38 client = mqttclient.new(config)
     39 ```
     40 
     41 It creates a new MQTT client, using a configuration table which
     42 is deserialized into [a `go-mqtt/mqtt.Config`
     43 structure](https://pkg.go.dev/github.com/go-mqtt/mqtt#Config),
     44 with the extra field `connection` used to make the `Dialer`.
     45 
     46 ### MQTT reception callbacks
     47 
     48 The client has a table-like API to set or reset callbacks:
     49 
     50 ```lua
     51 -- Set a callback
     52 client[topic_filer_string] = callback_function
     53 -- Call or query an existing callback
     54 client[topic_filter_string](self, message, topic, t)
     55 local f = client[topic_filter_string]
     56 -- Reset a callback
     57 client[topic_filter_string] = nil
     58 ```
     59 
     60 The callbacks are called with four parameters:
     61 
     62  - the client object (`self`);
     63  - the received message, as a string;
     64  - the topic of the received message, as a string;
     65  - the reception time, as a number in seconds (compatible with timers).
     66 
     67 Setting a new callback automatically subscribes to the filter,
     68 and resetting the callback automaticall unsubscribes.
     69 
     70 Note that when subscribing to overlapping-but-different filters, messages
     71 may be duplicated (i.e. when the overlap makes the broker send the message
     72 twice, each callback will be called twice).
     73 Since QoS is not supported yet, callbacks should always be ready to handle
     74 multiple and missing messages anyway.
     75 
     76 ### Internal event callbacks
     77 
     78 When the connection of a client comes up or down, Lua callback is triggered
     79 as if an empty message has been sent on topic `$SYS/self/online` or
     80 `$SYS/self/offline`.
     81 
     82 Note that due to being offline, the client should not be used in the offline
     83 callback.
     84 
     85 Also note that mqttagent assumes without checking that subscriptions are
     86 preserved by the broker when coming back online.
     87 The online callback is a good place to resubscribe if needed.
     88 
     89 ### MQTT message sending
     90 
     91 The client has a function-like API to send messages:
     92 
     93 ```lua
     94 -- Send a ping to the server
     95 client()
     96 -- Send a message to the given topic
     97 client(message, topic)
     98 ```
     99 
    100 ### Timers
    101 
    102 Timers are created using `timer.new` with a time and a callback:
    103 
    104 ```lua
    105 timer_obj = timer.new(t, callback)
    106 ```
    107 
    108 Timers **are NOT** automatically repeated, the callback is called only
    109 once, after `t`, and then the timer is destroyed unless explicitly
    110 rescheduled:
    111 
    112 ```lua
    113 timer_obj:schedule(next_t)
    114 ```
    115 
    116 To make repeating timers, explicitly call `self:schedule` within the
    117 callback with the next time.
    118 
    119 When the time given in `timer.new` or `:schedule` is in the past, the
    120 callback is called as soon as possible.
    121 
    122 The `:cancel` method is also available to de-schedule a timer without
    123 destroying it.
    124 
    125 ## Examples
    126 
    127 ### Simplest client prints one message and leaves
    128 
    129 ```lua
    130 -- Create an anonymous client
    131 local c = mqttclient.new({ connection = "127.0.0.1:1883" })
    132 
    133 -- Subscribe to all topics under `test`
    134 c["test/#"] = function(self, message, topic)
    135 	-- Print the received message
    136 	print(message)
    137 	-- Unsubscribe
    138 	c["test/#"] = nil
    139 end
    140 ```
    141 
    142 ### Client with LWT, keepalive and timer
    143 
    144 ```lua
    145 -- Keep alive in seconds
    146 local keepalive = 60
    147 -- Create the client
    148 local c = mqttclient.new{
    149 	connection = "127.0.0.1:1883",
    150 	user_name = "mqttagent",
    151 	password = "1234",
    152 	will = {
    153 		topic = "test/status/mqttagent",
    154 		message = "Offline",
    155 	},
    156 	keep_alive = keepalive,
    157 }
    158 
    159 -- Setup a ping timer for the keepalive
    160 if keepalive > 0 then
    161 	-- Create a new timer, dropping the variable (`self` is enough)
    162 	timer.new(os.time() + keepalive, function(self, t)
    163 		-- Send a ping
    164 		c()
    165 		-- Schedule next keepalive
    166 		self:schedule(t + keepalive)
    167 	end)
    168 end
    169 
    170 -- Print the next 10 messages send to `test` topic
    171 local count = 10
    172 c["test"] = function(self, message, topic)
    173 	print(message)
    174 	count = count - 1
    175 	if count <= 0 then
    176 		os.exit(0)
    177 	end
    178 end
    179 
    180 -- Announce start
    181 c("Online", "test/status/mqttagent")
    182 ```
    183 
    184 ### One-way MQTT Bridge
    185 
    186 ```lua
    187 -- Create the source client
    188 local source = mqttclient.new{
    189 	connection = "mqtt.example.com:1883",
    190 	user_name = "mqttagent",
    191 	password = "1234",
    192 }
    193 -- Create the destination client
    194 local dest = mqttclient.new{ "127.0.0.1:1883" }
    195 -- Make the one-way bridge
    196 source["#"] = function(self, message, topic)
    197 	dest(message, topic)
    198 end
    199 ```
    200 
    201 ### RRD Update Using JSON Payload Form Tasmota Energy Sensor
    202 
    203 ```lua
    204 local json = require("json")
    205 local c = mqttclient.new{ connection = "127.0.0.1:1883" }
    206 
    207 function millis(n)
    208 	if n then
    209 		return math.floor(n * 1000 + 0.5)
    210 	else
    211 		return "U"
    212 	end
    213 end
    214 
    215 c["tele/+/SENSOR"] = function(self, message, topic)
    216 	-- Deduce file name from middle part of the topic
    217 	local name = string.sub(topic, 6, -8)
    218 	-- Sanity check
    219 	if string.find(name, "[^_%w%-]") then return end
    220 	-- Decode JSON payload
    221 	local obj, err = json.decode(message)
    222 	if err then
    223 		print(err)
    224 		return
    225 	end
    226 	if not obj.ENERGY then return end
    227 
    228 	os.execute("rrdtool update \"" .. dbdir .. "/energy-sensor-" ..
    229 	            name .. ".rrd\" N:" ..
    230 	            millis(obj.ENERGY.Total) ..
    231 	            ":" .. (obj.ENERGY.Period or "U") ..
    232 	            ":" .. (obj.ENERGY.Power or "U") ..
    233 	            ":" .. millis(obj.ENERGY.Factor) ..
    234 	            ":" .. (obj.ENERGY.Voltage or "U") ..
    235 	            ":" .. millis(obj.ENERGY.Current))
    236 end
    237 ```
    238 
    239 ### Backup Monitoring
    240 
    241 ```lua
    242 local c = mqttclient.new{ connection = "127.0.0.1:1883" }
    243 -- List of topic suffix to check
    244 local backups = {
    245 	["dest1/src1"] = 0,
    246 	["dest1/src2"] = 0,
    247 	["dest2/src2"] = 0,
    248 	["dest3/src1"] = 0,
    249 }
    250 
    251 -- Check every day at 5:02:42 UTC that all backups have been done
    252 function next_check()
    253 	local tbl = os.date("!*t")
    254 	tbl.hour = 5
    255 	tbl.min = 2
    256 	tbl.sec = 42
    257 	return os.time(tbl) + 86400
    258 end
    259 
    260 timer.new(next_check(), function(self, t)
    261 	local msg = ""
    262 	-- Accumulate missing backup names in msg and reset the table
    263 	for k, v in pairs(backups) do
    264 		if v == 0 then
    265 			msg = msg .. (#msg > 0 and ", " or "") .. k
    266 		end
    267 		backups[k] = 0
    268 	end
    269 	-- Send an alert when a backup is missing
    270 	if #msg > 0 then
    271 		c("Missing: " .. msg, "alert/backup")
    272 	end
    273 	-- Check again tomorrow
    274 	self:schedule(t + 86400)
    275 end)
    276 
    277 -- Mark backups as seen when notified
    278 c["backup/#"] = function(self, _, topic, t)
    279 	local name = string.sub(topic, 8)
    280 	if backups[name] then
    281 		backups[name] = t
    282 	end
    283 end
    284 ```