mqttagent

MQTT Lua Agent
git clone https://git.instinctive.eu/mqttagent.git
Log | Files | Refs | README | LICENSE

README.md (6975B)


      1 # mqttagent
      2 
      3 [![Casual Maintenance Intended](https://casuallymaintained.tech/badge.svg)](https://casuallymaintained.tech/)
      4 
      5 This is a daemon MQTT client which runs Lua callbacks on received messages.
      6 
      7 It started with a backup graph that did not match the connectivity graph,
      8 so some data had to be pushed while other had to be pulled.
      9 So I needed machine-to-machine signalling to pull data from a source after it
     10 has been pushed completely.
     11 
     12 I thought of MQTT for the signalling, and a kind of
     13 cron-but-for-MQTT-messages-instead-of-time to run actions.
     14 But then I imagined writing a parser for a crontab-like configuration file,
     15 and then writing such a configuration file, so I reconsidered my life choices.
     16 
     17 It turns out that a Lua script is much easier for development (thanks to
     18 existing interpreters), for setup (thanks to a friendlier language), and
     19 for maintenance (thanks to basic logic not being scattered across a lot
     20 of small shell scripts).
     21 
     22 ## Manual
     23 
     24 ### Commands
     25 
     26 The provided commands provide varying levels of extra primitives bound
     27 to the Lua environment.
     28 
     29 For now, the generated commands run `mqttagent.lua` in the current
     30 directory and stay in the foreground.
     31 
     32 Once the script is initially run, the program runs until all connections
     33 and all timers are finished (or the script calls `os.exit`).
     34 
     35 ### MQTT client creation
     36 
     37 ```lua
     38 client = mqttclient.new(config)
     39 ```
     40 
     41 It creates a new MQTT client, using a configuration table which
     42 is deserialized into [a `go-mqtt/mqtt.Config`
     43 structure](https://pkg.go.dev/github.com/go-mqtt/mqtt#Config),
     44 with the extra field `connection` used to make the `Dialer`.
     45 
     46 ### MQTT reception callbacks
     47 
     48 The client has a table-like API to set or reset callbacks:
     49 
     50 ```lua
     51 -- Set a callback
     52 client[topic_filer_string] = callback_function
     53 -- Call or query an existing callback
     54 client[topic_filter_string](self, message, topic, t)
     55 local f = client[topic_filter_string]
     56 -- Reset a callback
     57 client[topic_filter_string] = nil
     58 ```
     59 
     60 The callbacks are called with four parameters:
     61 
     62  - the client object (`self`);
     63  - the received message, as a string;
     64  - the topic of the received message, as a string;
     65  - the reception time, as a number in seconds (compatible with timers).
     66 
     67 Setting a new callback automatically subscribes to the filter,
     68 and resetting the callback automaticall unsubscribes.
     69 
     70 Note that when subscribing to overlapping-but-different filters, messages
     71 may be duplicated (i.e. when the overlap makes the broker send the message
     72 twice, each callback will be called twice).
     73 Since QoS is not supported yet, callbacks should always be ready to handle
     74 multiple and missing messages anyway.
     75 
     76 ### MQTT message sending
     77 
     78 The client has a function-like API to send messages:
     79 
     80 ```lua
     81 -- Send a ping to the server
     82 client()
     83 -- Send a message to the given topic
     84 client(message, topic)
     85 ```
     86 
     87 ### Timers
     88 
     89 Timers are created using `timer.new` with a time and a callback:
     90 
     91 ```lua
     92 timer_obj = timer.new(t, callback)
     93 ```
     94 
     95 Timers **are NOT** automatically repeated, the callback is called only
     96 once, after `t`, and then the timer is destroyed unless explicitly
     97 rescheduled:
     98 
     99 ```lua
    100 timer_obj:schedule(next_t)
    101 ```
    102 
    103 To make repeating timers, explicitly call `self:schedule` within the
    104 callback with the next time.
    105 
    106 When the time given in `timer.new` or `:schedule` is in the past, the
    107 callback is called as soon as possible.
    108 
    109 The `:cancel` method is also available to de-schedule a timer without
    110 destroying it.
    111 
    112 ## Examples
    113 
    114 ### Simplest client prints one message and leaves
    115 
    116 ```lua
    117 -- Create an anonymous client
    118 local c = mqttclient.new({ connection = "127.0.0.1:1883" })
    119 
    120 -- Subscribe to all topics under `test`
    121 c["test/#"] = function(self, message, topic)
    122 	-- Print the received message
    123 	print(message)
    124 	-- Unsubscribe
    125 	c["test/#"] = nil
    126 end
    127 ```
    128 
    129 ### Client with LWT, keepalive and timer
    130 
    131 ```lua
    132 -- Keep alive in seconds
    133 local keepalive = 60
    134 -- Create the client
    135 local c = mqttclient.new{
    136 	connection = "127.0.0.1:1883",
    137 	user_name = "mqttagent",
    138 	password = "1234",
    139 	will = {
    140 		topic = "test/status/mqttagent",
    141 		message = "Offline",
    142 	},
    143 	keep_alive = keepalive,
    144 }
    145 
    146 -- Setup a ping timer for the keepalive
    147 if keepalive > 0 then
    148 	-- Create a new timer, dropping the variable (`self` is enough)
    149 	timer.new(os.time() + keepalive, function(self, t)
    150 		-- Send a ping
    151 		c()
    152 		-- Schedule next keepalive
    153 		self:schedule(t + keepalive)
    154 	end)
    155 end
    156 
    157 -- Print the next 10 messages send to `test` topic
    158 local count = 10
    159 c["test"] = function(self, message, topic)
    160 	print(message)
    161 	count = count - 1
    162 	if count <= 0 then
    163 		os.exit(0)
    164 	end
    165 end
    166 
    167 -- Announce start
    168 c("Online", "test/status/mqttagent")
    169 ```
    170 
    171 ### One-way MQTT Bridge
    172 
    173 ```lua
    174 -- Create the source client
    175 local source = mqttclient.new{
    176 	connection = "mqtt.example.com:1883",
    177 	user_name = "mqttagent",
    178 	password = "1234",
    179 }
    180 -- Create the destination client
    181 local dest = mqttclient.new{ "127.0.0.1:1883" }
    182 -- Make the one-way bridge
    183 source["#"] = function(self, message, topic)
    184 	dest(message, topic)
    185 end
    186 ```
    187 
    188 ### RRD Update Using JSON Payload Form Tasmota Energy Sensor
    189 
    190 ```lua
    191 local json = require("json")
    192 local c = mqttclient.new{ connection = "127.0.0.1:1883" }
    193 
    194 function millis(n)
    195 	if n then
    196 		return math.floor(n * 1000 + 0.5)
    197 	else
    198 		return "U"
    199 	end
    200 end
    201 
    202 c["tele/+/SENSOR"] = function(self, message, topic)
    203 	-- Deduce file name from middle part of the topic
    204 	local name = string.sub(topic, 6, -8)
    205 	-- Sanity check
    206 	if string.find(name, "[^_%w%-]") then return end
    207 	-- Decode JSON payload
    208 	local obj, err = json.decode(message)
    209 	if err then
    210 		print(err)
    211 		return
    212 	end
    213 	if not obj.ENERGY then return end
    214 
    215 	os.execute("rrdtool update \"" .. dbdir .. "/energy-sensor-" ..
    216 	            name .. ".rrd\" N:" ..
    217 	            millis(obj.ENERGY.Total) ..
    218 	            ":" .. (obj.ENERGY.Period or "U") ..
    219 	            ":" .. (obj.ENERGY.Power or "U") ..
    220 	            ":" .. millis(obj.ENERGY.Factor) ..
    221 	            ":" .. (obj.ENERGY.Voltage or "U") ..
    222 	            ":" .. millis(obj.ENERGY.Current))
    223 end
    224 ```
    225 
    226 ### Backup Monitoring
    227 
    228 ```lua
    229 local c = mqttclient.new{ connection = "127.0.0.1:1883" }
    230 -- List of topic suffix to check
    231 local backups = {
    232 	["dest1/src1"] = 0,
    233 	["dest1/src2"] = 0,
    234 	["dest2/src2"] = 0,
    235 	["dest3/src1"] = 0,
    236 }
    237 
    238 -- Check every day at 5:02:42 UTC that all backups have been done
    239 function next_check()
    240 	local tbl = os.date("!*t")
    241 	tbl.hour = 5
    242 	tbl.min = 2
    243 	tbl.sec = 42
    244 	return os.time(tbl) + 86400
    245 end
    246 
    247 timer.new(next_check(), function(self, t)
    248 	local msg = ""
    249 	-- Accumulate missing backup names in msg and reset the table
    250 	for k, v in pairs(backups) do
    251 		if v == 0 then
    252 			msg = msg .. (#msg > 0 and ", " or "") .. k
    253 		end
    254 		backups[k] = 0
    255 	end
    256 	-- Send an alert when a backup is missing
    257 	if #msg > 0 then
    258 		c("Missing: " .. msg, "alert/backup")
    259 	end
    260 	-- Check again tomorrow
    261 	self:schedule(t + 86400)
    262 end)
    263 
    264 -- Mark backups as seen when notified
    265 c["backup/#"] = function(self, _, topic, t)
    266 	local name = string.sub(topic, 8)
    267 	if backups[name] then
    268 		backups[name] = t
    269 	end
    270 end
    271 ```