mqttagent

MQTT Lua Agent
git clone https://git.instinctive.eu/mqttagent.git
Log | Files | Refs | README | LICENSE

mqttagent.go (18589B)


      1 /*
      2  * Copyright (c) 2025, Natacha Porté
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 package mqttagent
     18 
     19 import (
     20 	"errors"
     21 	"fmt"
     22 	"log"
     23 	"os"
     24 	"strings"
     25 	"time"
     26 
     27 	"github.com/go-mqtt/mqtt"
     28 	"github.com/yuin/gluamapper"
     29 	"github.com/yuin/gopher-lua"
     30 )
     31 
// MqttAgent is the set of hooks invoked by Run around the Lua interpreter.
type MqttAgent interface {
	// Setup is called on a freshly created Lua state, before the main
	// script is executed in it.
	Setup(L *lua.LState)
	// Log is called with every received message, before the message is
	// dispatched to the Lua subscription callbacks.
	Log(L *lua.LState, msg *MqttMessage)
	// Teardown is called on a Lua state shortly before it is closed.
	Teardown(L *lua.LState)
}
     37 
// MqttReloadingAgent is an MqttAgent variant that is told about both Lua
// states involved in a reload. When the agent given to Run implements it,
// the Reload* hooks are used during a reload in place of Setup on the new
// state and Teardown on the discarded one.
type MqttReloadingAgent interface {
	Setup(L *lua.LState)
	Log(L *lua.LState, msg *MqttMessage)
	// ReloadBegin is called once the new state exists, before the main
	// script is run in it.
	ReloadBegin(oldL, newL *lua.LState)
	// ReloadAbort is called when the main script failed in the new state;
	// newL is closed afterwards and oldL stays in use.
	ReloadAbort(oldL, newL *lua.LState)
	// ReloadEnd is called when the main script succeeded in the new state;
	// oldL is closed afterwards and newL becomes the live state.
	ReloadEnd(oldL, newL *lua.LState)
	Teardown(L *lua.LState)
}
     46 
// MqttMessage is a received MQTT message, as handed from the reader
// goroutines to the main loop.
type MqttMessage struct {
	// Timestamp is the reception time in seconds since the Unix epoch,
	// computed from a microsecond clock reading.
	Timestamp float64
	// ClientId is the numeric identifier of the receiving connection, used
	// as key in the state's connection table.
	ClientId  int
	Topic     []byte
	Message   []byte
}
     53 
     54 func Run(agent MqttAgent, main_script string, capacity int) {
     55 	fromMqtt := make(chan MqttMessage, capacity)
     56 
     57 	L := lua.NewState()
     58 	defer L.Close()
     59 
     60 	agent.Setup(L)
     61 	defer agent.Teardown(L)
     62 
     63 	hostname, err := os.Hostname()
     64 	if err != nil {
     65 		hostname = "<unknown>"
     66 	}
     67 
     68 	idString := fmt.Sprintf("mqttagent-%s-%d", hostname, os.Getpid())
     69 	registerMqttClientType(L)
     70 	registerTimerType(L)
     71 	registerState(L, idString, fromMqtt)
     72 	defer cleanupClients(L)
     73 
     74 	if err := L.DoFile(main_script); err != nil {
     75 		panic(err)
     76 	}
     77 
     78 	timer := time.NewTimer(0)
     79 	defer timer.Stop()
     80 
     81 	log.Println(idString, "started")
     82 
     83 	for {
     84 		select {
     85 		case msg, ok := <-fromMqtt:
     86 
     87 			if !ok {
     88 				log.Println("fromMqtt is closed")
     89 				break
     90 			}
     91 
     92 			processMsg(L, agent, &msg)
     93 
     94 		case <-timer.C:
     95 		}
     96 
     97 		runTimers(L, timer)
     98 
     99 		if stateReloadRequested(L) {
    100 			L = reload(L, agent, main_script)
    101 			runTimers(L, timer)
    102 			stateRequestReload(L, lua.LNil)
    103 		}
    104 
    105 		if tableIsEmpty(stateCnxTable(L)) && tableIsEmpty(stateTimerTable(L)) {
    106 			break
    107 		}
    108 	}
    109 
    110 	log.Println(idString, "finished")
    111 }
    112 
    113 func cleanupClients(L *lua.LState) {
    114 	cnxTbl := stateCnxTable(L)
    115 	if cnxTbl == nil {
    116 		return
    117 	}
    118 
    119 	L.ForEach(cnxTbl, func(key, value lua.LValue) {
    120 		cnx := value.(*lua.LTable)
    121 		client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    122 		if err := client.Disconnect(nil); err != nil {
    123 			log.Printf("cleanup client %s: %v", lua.LVAsString(key), err)
    124 		}
    125 	})
    126 }
    127 
    128 func dispatchMsg(L *lua.LState, msg *MqttMessage, cnx, key, value lua.LValue) {
    129 	skey, ok := key.(lua.LString)
    130 	topic := string(msg.Topic)
    131 
    132 	if ok && match(topic, string(skey)) {
    133 		err := L.CallByParam(lua.P{Fn: value, NRet: 0, Protect: true},
    134 			cnx,
    135 			lua.LString(string(msg.Message)),
    136 			lua.LString(topic),
    137 			lua.LNumber(msg.Timestamp))
    138 		if err != nil {
    139 			panic(err)
    140 		}
    141 	}
    142 }
    143 
    144 func matchSliced(actual, filter []string) bool {
    145 	if len(filter) == 0 {
    146 		return len(actual) == 0
    147 	}
    148 
    149 	if filter[0] == "#" {
    150 		if len(filter) == 1 {
    151 			return true
    152 		}
    153 
    154 		for i := range actual {
    155 			if matchSliced(actual[i:], filter[1:]) {
    156 				return true
    157 			}
    158 		}
    159 
    160 		return false
    161 	}
    162 
    163 	if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) {
    164 		return matchSliced(actual[1:], filter[1:])
    165 	}
    166 
    167 	return false
    168 }
    169 
    170 func match(actual, filter string) bool {
    171 	return matchSliced(strings.Split(actual, "/"), strings.Split(filter, "/"))
    172 }
    173 
    174 func tableIsEmpty(t *lua.LTable) bool {
    175 	key, _ := t.Next(lua.LNil)
    176 	return key == lua.LNil
    177 }
    178 
    179 func processMsg(L *lua.LState, agent MqttAgent, msg *MqttMessage) {
    180 	agent.Log(L, msg)
    181 
    182 	cnx := L.RawGetInt(stateCnxTable(L), msg.ClientId).(*lua.LTable)
    183 	subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable)
    184 	L.ForEach(subTbl, func(key, value lua.LValue) { dispatchMsg(L, msg, cnx, key, value) })
    185 
    186 	if tableIsEmpty(subTbl) {
    187 		client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    188 		if err := client.Disconnect(nil); err != nil {
    189 			log.Println("disconnect empty client:", err)
    190 		}
    191 		L.RawSetInt(stateCnxTable(L), msg.ClientId, lua.LNil)
    192 	}
    193 }
    194 
    195 func mqttRead(client *mqtt.Client, toLua chan<- MqttMessage, id int) {
    196 	var big *mqtt.BigMessage
    197 
    198 	for {
    199 		message, topic, err := client.ReadSlices()
    200 		t := float64(time.Now().UnixMicro()) * 1.0e-6
    201 
    202 		switch {
    203 		case err == nil:
    204 			toLua <- MqttMessage{Timestamp: t, ClientId: id, Topic: dup(topic), Message: dup(message)}
    205 
    206 		case errors.As(err, &big):
    207 			data, err := big.ReadAll()
    208 			if err != nil {
    209 				log.Println("mqttRead big message:", err)
    210 			} else {
    211 				toLua <- MqttMessage{Timestamp: t, ClientId: id, Topic: dup(topic), Message: data}
    212 			}
    213 
    214 		case errors.Is(err, mqtt.ErrClosed):
    215 			log.Println("mqttRead finishing:", err)
    216 			return
    217 
    218 		case mqtt.IsConnectionRefused(err):
    219 			log.Println("mqttRead connection refused:", err)
    220 			time.Sleep(15 * time.Minute)
    221 
    222 		default:
    223 			log.Println("mqttRead:", err)
    224 			time.Sleep(2 * time.Second)
    225 		}
    226 	}
    227 }
    228 
    229 func reload(oldL *lua.LState, agent MqttAgent, main_script string) *lua.LState {
    230 	log.Println("Reloading", main_script)
    231 	reloader, isReloader := agent.(MqttReloadingAgent)
    232 
    233 	newL := lua.NewState()
    234 
    235 	if isReloader {
    236 		reloader.ReloadBegin(oldL, newL)
    237 	} else {
    238 		agent.Setup(newL)
    239 	}
    240 
    241 	registerMqttClientType(newL)
    242 	registerTimerType(newL)
    243 
    244 	stateReloadBegin(oldL, newL)
    245 
    246 	if err := newL.DoFile(main_script); err != nil {
    247 		log.Println("Reload failed:", err)
    248 		stateReloadAbort(oldL, newL)
    249 		if isReloader {
    250 			reloader.ReloadAbort(oldL, newL)
    251 		} else {
    252 			agent.Teardown(newL)
    253 		}
    254 		newL.Close()
    255 		return oldL
    256 	} else {
    257 		stateReloadEnd(oldL, newL)
    258 		if isReloader {
    259 			reloader.ReloadEnd(oldL, newL)
    260 		} else {
    261 			agent.Teardown(oldL)
    262 		}
    263 		oldL.Close()
    264 		log.Println("Reload successful")
    265 		return newL
    266 	}
    267 }
    268 
    269 func dup(src []byte) []byte {
    270 	res := make([]byte, len(src))
    271 	copy(res, src)
    272 	return res
    273 }
    274 
    275 func newUserData(L *lua.LState, v interface{}) *lua.LUserData {
    276 	res := L.NewUserData()
    277 	res.Value = v
    278 	return res
    279 }
    280 
    281 /********** State Object in the Lua Interpreter **********/
    282 
    283 const luaStateName = "_mqttagent"
    284 const keyChanToLua = 1
    285 const keyClientPrefix = 2
    286 const keyClientNextId = 3
    287 const keyCfgMap = 4
    288 const keyCnxTable = 5
    289 const keyTimerTable = 6
    290 const keyReloadRequest = 7
    291 const keyOldCfgMap = 8
    292 
    293 func registerState(L *lua.LState, clientPrefix string, toLua chan<- MqttMessage) {
    294 	st := L.NewTable()
    295 	L.RawSetInt(st, keyChanToLua, newUserData(L, toLua))
    296 	L.RawSetInt(st, keyClientPrefix, lua.LString(clientPrefix))
    297 	L.RawSetInt(st, keyClientNextId, lua.LNumber(1))
    298 	L.RawSetInt(st, keyCfgMap, newUserData(L, make(mqttConfigMap)))
    299 	L.RawSetInt(st, keyCnxTable, L.NewTable())
    300 	L.RawSetInt(st, keyTimerTable, L.NewTable())
    301 	L.SetGlobal(luaStateName, st)
    302 	L.SetGlobal("reload", L.NewFunction(requestReload))
    303 }
    304 
    305 func stateReloadBegin(oldL, newL *lua.LState) {
    306 	oldSt := oldL.GetGlobal(luaStateName).(*lua.LTable)
    307 	toLua := oldL.RawGetInt(oldSt, keyChanToLua).(*lua.LUserData).Value.(chan<- MqttMessage)
    308 	clientPrefix := oldL.RawGetInt(oldSt, keyClientPrefix)
    309 	nextId := oldL.RawGetInt(oldSt, keyClientNextId)
    310 	cfgMap := oldL.RawGetInt(oldSt, keyCfgMap).(*lua.LUserData).Value.(mqttConfigMap)
    311 
    312 	st := newL.NewTable()
    313 	newL.RawSetInt(st, keyChanToLua, newUserData(newL, toLua))
    314 	newL.RawSetInt(st, keyClientPrefix, clientPrefix)
    315 	newL.RawSetInt(st, keyClientNextId, nextId)
    316 	newL.RawSetInt(st, keyCfgMap, newUserData(newL, make(mqttConfigMap)))
    317 	newL.RawSetInt(st, keyCnxTable, newL.NewTable())
    318 	newL.RawSetInt(st, keyTimerTable, newL.NewTable())
    319 	newL.RawSetInt(st, keyOldCfgMap, newUserData(newL, cfgMap))
    320 	newL.SetGlobal(luaStateName, st)
    321 	newL.SetGlobal("reload", newL.NewFunction(requestReload))
    322 }
    323 
// stateReloadAbort cleans up after a failed reload: newL is the state being
// dropped and oldL the one that is kept.
func stateReloadAbort(oldL, newL *lua.LState) {
	statePartialCleanup(newL, oldL)
}
    327 
    328 func stateReloadEnd(oldL, newL *lua.LState) {
    329 	statePartialCleanup(oldL, newL)
    330 	newSt := newL.GetGlobal(luaStateName).(*lua.LTable)
    331 	newL.RawSetInt(newSt, keyOldCfgMap, lua.LNil)
    332 }
    333 
    334 func statePartialCleanup(staleL, keptL *lua.LState) {
    335 	staleCnxTable := stateCnxTable(staleL)
    336 	keptCnxTable := stateCnxTable(keptL)
    337 
    338 	staleL.ForEach(staleCnxTable, func(key, value lua.LValue) {
    339 		clientId := int(key.(lua.LNumber))
    340 		if keptL.RawGetInt(keptCnxTable, clientId) == lua.LNil {
    341 			client := staleL.RawGetInt(value.(*lua.LTable), keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    342 			client.Close()
    343 		}
    344 	})
    345 
    346 	keptL.ForEach(keptCnxTable, func(key, value lua.LValue) {
    347 		clientId := int(key.(lua.LNumber))
    348 		keptCnx := value.(*lua.LTable)
    349 		if lua.LVIsFalse(keptL.RawGetInt(keptCnx, keyReused)) {
    350 			return
    351 		}
    352 
    353 		client := keptL.RawGetInt(keptCnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    354 		staleCnx := staleL.RawGetInt(staleCnxTable, clientId).(*lua.LTable)
    355 
    356 		if client != staleL.RawGetInt(staleCnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) {
    357 			panic("This shouldn't happen")
    358 		}
    359 
    360 		keptSub := keptL.RawGetInt(keptCnx, keySubTable).(*lua.LTable)
    361 		staleSub := staleL.RawGetInt(staleCnx, keySubTable).(*lua.LTable)
    362 
    363 		staleL.ForEach(staleSub, func(key, _ lua.LValue) {
    364 			topic := string(key.(lua.LString))
    365 			if keptL.GetField(keptSub, topic) == lua.LNil {
    366 				log.Println("Unsubscribing from stale topic", topic)
    367 				if err := client.Unsubscribe(nil, topic); err != nil {
    368 					log.Println("Failed to unsubscribe:", err)
    369 				}
    370 			}
    371 		})
    372 
    373 		keptL.ForEach(keptSub, func(key, _ lua.LValue) {
    374 			topic := string(key.(lua.LString))
    375 			if staleL.GetField(staleSub, topic) == lua.LNil {
    376 				log.Println("Subscribing to new topic", topic)
    377 				if err := client.Subscribe(nil, topic); err != nil {
    378 					log.Println("Failed to subscribe:", err)
    379 				}
    380 			}
    381 		})
    382 
    383 		keptL.RawSetInt(keptCnx, keyReused, lua.LNil)
    384 	})
    385 }
    386 
    387 func stateValue(L *lua.LState, key int) lua.LValue {
    388 	st := L.GetGlobal(luaStateName)
    389 	return L.RawGetInt(st.(*lua.LTable), key)
    390 }
    391 
    392 func stateChanToLua(L *lua.LState) chan<- MqttMessage {
    393 	ud := stateValue(L, keyChanToLua)
    394 	return ud.(*lua.LUserData).Value.(chan<- MqttMessage)
    395 }
    396 
    397 func stateClientNextId(L *lua.LState) (int, string) {
    398 	st := L.GetGlobal(luaStateName).(*lua.LTable)
    399 	result := int(L.RawGetInt(st, keyClientNextId).(lua.LNumber))
    400 	L.RawSetInt(st, keyClientNextId, lua.LNumber(result+1))
    401 	prefix := lua.LVAsString(L.RawGetInt(st, keyClientPrefix))
    402 	return result, fmt.Sprintf("%s-%d", prefix, result)
    403 }
    404 
    405 func stateCfgMap(L *lua.LState) mqttConfigMap {
    406 	return stateValue(L, keyCfgMap).(*lua.LUserData).Value.(mqttConfigMap)
    407 }
    408 
    409 func stateCnxTable(L *lua.LState) *lua.LTable {
    410 	return stateValue(L, keyCnxTable).(*lua.LTable)
    411 }
    412 
    413 func stateTimerTable(L *lua.LState) *lua.LTable {
    414 	return stateValue(L, keyTimerTable).(*lua.LTable)
    415 }
    416 
    417 func stateReloadRequested(L *lua.LState) bool {
    418 	return lua.LVAsBool(stateValue(L, keyReloadRequest))
    419 }
    420 
    421 func stateRequestReload(L *lua.LState, v lua.LValue) {
    422 	st := L.GetGlobal(luaStateName).(*lua.LTable)
    423 	L.RawSetInt(st, keyReloadRequest, v)
    424 }
    425 
    426 func stateOldCfgMap(L *lua.LState) mqttConfigMap {
    427 	val := stateValue(L, keyOldCfgMap)
    428 	if val == lua.LNil {
    429 		return nil
    430 	} else {
    431 		return val.(*lua.LUserData).Value.(mqttConfigMap)
    432 	}
    433 }
    434 
// requestReload is the Go implementation of the Lua global function
// "reload": it marks a reload as requested in the agent state and returns
// nothing to Lua.
func requestReload(L *lua.LState) int {
	stateRequestReload(L, lua.LTrue)
	return 0
}
    439 
    440 /********** Lua Object for MQTT client **********/
    441 
    442 const luaMqttClientTypeName = "mqttclient"
    443 const keyClient = 1
    444 const keySubTable = 2
    445 const keyReused = 3
    446 
    447 func registerMqttClientType(L *lua.LState) {
    448 	mt := L.NewTypeMetatable(luaMqttClientTypeName)
    449 	L.SetGlobal(luaMqttClientTypeName, mt)
    450 	L.SetField(mt, "new", L.NewFunction(newMqttClient))
    451 	L.SetField(mt, "__call", L.NewFunction(luaPublish))
    452 	L.SetField(mt, "__index", L.NewFunction(luaQuery))
    453 	L.SetField(mt, "__newindex", L.NewFunction(luaSubscribe))
    454 }
    455 
// mqttConfig is the client configuration as filled from a Lua table by
// newMqttClient. It only has comparable fields because it is used as the key
// type of mqttConfigMap.
type mqttConfig struct {
	// Connection is the address handed to the TCP dialer.
	Connection     string
	// PauseTimeout is parsed with time.ParseDuration; one second is used
	// when it cannot be parsed.
	PauseTimeout   string
	AtLeastOnceMax int
	ExactlyOnceMax int
	UserName       string
	// Password is passed on as bytes, or as nil when empty.
	Password       string
	Will           struct {
		Topic       string
		// Message is passed on as bytes, or as nil when empty.
		Message     string
		Retain      bool
		AtLeastOnce bool
		ExactlyOnce bool
	}
	KeepAlive    uint16
	CleanSession bool
}
    473 
// mqttClientEntry associates an MQTT client with its numeric identifier,
// which is its key in the connection table.
type mqttClientEntry struct {
	client *mqtt.Client
	id     int
}
    478 
// mqttConfigMap finds the client created for a given configuration, so that
// requesting the same configuration again yields the same client.
type mqttConfigMap map[mqttConfig]mqttClientEntry
    480 
    481 func mqttConfigBytes(src string) []byte {
    482 	if src == "" {
    483 		return nil
    484 	} else {
    485 		return []byte(src)
    486 	}
    487 }
    488 
    489 func newClient(config *mqttConfig, id string) (*mqtt.Client, error) {
    490 	pto, err := time.ParseDuration(config.PauseTimeout)
    491 	if err != nil {
    492 		pto = time.Second
    493 	}
    494 
    495 	processed_cfg := mqtt.Config{
    496 		Dialer:         mqtt.NewDialer("tcp", config.Connection),
    497 		PauseTimeout:   pto,
    498 		AtLeastOnceMax: config.AtLeastOnceMax,
    499 		ExactlyOnceMax: config.ExactlyOnceMax,
    500 		UserName:       config.UserName,
    501 		Password:       mqttConfigBytes(config.Password),
    502 		Will: struct {
    503 			Topic       string
    504 			Message     []byte
    505 			Retain      bool
    506 			AtLeastOnce bool
    507 			ExactlyOnce bool
    508 		}{
    509 			Topic:       config.Will.Topic,
    510 			Message:     mqttConfigBytes(config.Will.Message),
    511 			Retain:      config.Will.Retain,
    512 			AtLeastOnce: config.Will.AtLeastOnce,
    513 			ExactlyOnce: config.Will.ExactlyOnce,
    514 		},
    515 		KeepAlive:    config.KeepAlive,
    516 		CleanSession: config.CleanSession,
    517 	}
    518 
    519 	return mqtt.VolatileSession(id, &processed_cfg)
    520 }
    521 
    522 func registerClient(L *lua.LState, id int, client *mqtt.Client, reused bool) lua.LValue {
    523 	res := L.NewTable()
    524 	L.RawSetInt(res, keyClient, newUserData(L, client))
    525 	L.RawSetInt(res, keySubTable, L.NewTable())
    526 	if reused {
    527 		L.RawSetInt(res, keyReused, lua.LTrue)
    528 	}
    529 	L.SetMetatable(res, L.GetTypeMetatable(luaMqttClientTypeName))
    530 	L.RawSetInt(stateCnxTable(L), id, res)
    531 	return res
    532 }
    533 
    534 func newMqttClient(L *lua.LState) int {
    535 	var config mqttConfig
    536 	if err := gluamapper.Map(L.CheckTable(1), &config); err != nil {
    537 		log.Println("newMqttClient:", err)
    538 		L.Push(lua.LNil)
    539 		L.Push(lua.LString(err.Error()))
    540 		return 2
    541 	}
    542 
    543 	cfgMap := stateCfgMap(L)
    544 
    545 	if cfg, found := cfgMap[config]; found {
    546 		res := L.RawGetInt(stateCnxTable(L), cfg.id)
    547 		tbl := res.(*lua.LTable)
    548 		if L.RawGetInt(tbl, keyClient).(*lua.LUserData).Value.(*mqtt.Client) != cfg.client {
    549 			panic("Inconsistent configuration table")
    550 		}
    551 
    552 		L.Push(res)
    553 		return 1
    554 	}
    555 
    556 	if oldCfgMap := stateOldCfgMap(L); oldCfgMap != nil {
    557 		if cfg, found := oldCfgMap[config]; found {
    558 			cfgMap[config] = cfg
    559 			L.Push(registerClient(L, cfg.id, cfg.client, true))
    560 			return 1
    561 		}
    562 	}
    563 
    564 	id, idString := stateClientNextId(L)
    565 	client, err := newClient(&config, idString)
    566 	if err != nil {
    567 		log.Println("newMqttClient:", err)
    568 		L.Push(lua.LNil)
    569 		L.Push(lua.LString(err.Error()))
    570 		return 2
    571 	}
    572 	go mqttRead(client, stateChanToLua(L), id)
    573 
    574 	cfgMap[config] = mqttClientEntry{id: id, client: client}
    575 
    576 	L.Push(registerClient(L, id, client, false))
    577 	return 1
    578 }
    579 
    580 func luaPublish(L *lua.LState) int {
    581 	cnx := L.CheckTable(1)
    582 	client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    583 
    584 	if L.GetTop() == 1 {
    585 		if err := client.Ping(nil); err != nil {
    586 			log.Println("luaPing:", err)
    587 			L.Push(lua.LNil)
    588 			L.Push(lua.LString(err.Error()))
    589 			return 2
    590 		} else {
    591 			L.Push(lua.LTrue)
    592 			return 1
    593 		}
    594 	}
    595 
    596 	message := L.CheckString(2)
    597 	topic := L.CheckString(3)
    598 
    599 	if err := client.Publish(nil, []byte(message), topic); err != nil {
    600 		L.Push(lua.LNil)
    601 		L.Push(lua.LString(err.Error()))
    602 		return 2
    603 	} else {
    604 		L.Push(lua.LTrue)
    605 		return 1
    606 	}
    607 }
    608 
    609 func luaQuery(L *lua.LState) int {
    610 	cnx := L.CheckTable(1)
    611 	topic := L.CheckString(2)
    612 	subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable)
    613 	L.Push(L.GetField(subTbl, topic))
    614 	return 1
    615 }
    616 
    617 func luaSubscribe(L *lua.LState) int {
    618 	var err error
    619 	cnx := L.CheckTable(1)
    620 	topic := L.CheckString(2)
    621 	callback := L.OptFunction(3, nil)
    622 	client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    623 	tbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable)
    624 
    625 	_, is_new := L.GetField(tbl, topic).(*lua.LNilType)
    626 
    627 	if lua.LVIsFalse(L.RawGetInt(cnx, keyReused)) {
    628 		if callback == nil {
    629 			err = client.Unsubscribe(nil, topic)
    630 		} else if is_new {
    631 			err = client.Subscribe(nil, topic)
    632 		}
    633 	}
    634 
    635 	if err != nil {
    636 		log.Println("luaSubscribe:", err)
    637 		L.Push(lua.LNil)
    638 		L.Push(lua.LString(err.Error()))
    639 		return 2
    640 	} else {
    641 		if callback == nil {
    642 			if is_new {
    643 				log.Printf("Not subscribed to %q", topic)
    644 			} else {
    645 				log.Printf("Unsubscribed from %q", topic)
    646 			}
    647 			L.SetField(tbl, topic, lua.LNil)
    648 		} else {
    649 			if is_new {
    650 				log.Printf("Subscribed to %q", topic)
    651 			} else {
    652 				log.Printf("Updating subscription to %q", topic)
    653 			}
    654 			L.SetField(tbl, topic, callback)
    655 		}
    656 
    657 		L.Push(lua.LTrue)
    658 		return 1
    659 	}
    660 }
    661 
    662 /********** Lua Object for timers **********/
    663 
// luaTimerTypeName is the name of the Lua type metatable, and of the Lua
// global, of timer objects.
const luaTimerTypeName = "timer"
    665 
    666 func registerTimerType(L *lua.LState) {
    667 	mt := L.NewTypeMetatable(luaTimerTypeName)
    668 	L.SetGlobal(luaTimerTypeName, mt)
    669 	L.SetField(mt, "new", L.NewFunction(newTimer))
    670 	L.SetField(mt, "schedule", L.NewFunction(timerSchedule))
    671 	L.SetField(mt, "__index", L.SetFuncs(L.NewTable(), timerMethods))
    672 }
    673 
    674 func newTimer(L *lua.LState) int {
    675 	atTime := L.Get(1)
    676 	cb := L.CheckFunction(2)
    677 	L.Pop(2)
    678 	L.SetMetatable(cb, L.GetTypeMetatable(luaTimerTypeName))
    679 	L.Push(cb)
    680 	L.Push(atTime)
    681 	return timerSchedule(L)
    682 }
    683 
// timerMethods holds the functions installed in the __index table of the
// timer metatable by registerTimerType.
var timerMethods = map[string]lua.LGFunction{
	"cancel":   timerCancel,
	"schedule": timerSchedule,
}
    688 
    689 func timerCancel(L *lua.LState) int {
    690 	timer := L.CheckFunction(1)
    691 	L.RawSet(stateTimerTable(L), timer, lua.LNil)
    692 	return 0
    693 }
    694 
    695 func timerSchedule(L *lua.LState) int {
    696 	timer := L.CheckFunction(1)
    697 	atTime := lua.LNil
    698 	if L.Get(2) != lua.LNil {
    699 		atTime = L.CheckNumber(2)
    700 	}
    701 
    702 	L.RawSet(stateTimerTable(L), timer, atTime)
    703 	return 0
    704 }
    705 
    706 func toTime(lsec lua.LNumber) time.Time {
    707 	fsec := float64(lsec)
    708 	sec := int64(fsec)
    709 	nsec := int64((fsec - float64(sec)) * 1.0e9)
    710 
    711 	return time.Unix(sec, nsec)
    712 }
    713 
    714 func runTimers(L *lua.LState, parentTimer *time.Timer) {
    715 	hasNext := false
    716 	var nextTime time.Time
    717 
    718 	now := time.Now()
    719 	timers := stateTimerTable(L)
    720 
    721 	timer, luaT := timers.Next(lua.LNil)
    722 	for timer != lua.LNil {
    723 		t := toTime(luaT.(lua.LNumber))
    724 		if t.Compare(now) <= 0 {
    725 			L.RawSet(timers, timer, lua.LNil)
    726 			err := L.CallByParam(lua.P{Fn: timer, NRet: 0, Protect: true}, timer, luaT)
    727 			if err != nil {
    728 				panic(err)
    729 			}
    730 			timer = lua.LNil
    731 			hasNext = false
    732 		} else if !hasNext || t.Compare(nextTime) < 0 {
    733 			hasNext = true
    734 			nextTime = t
    735 		}
    736 
    737 		timer, luaT = timers.Next(timer)
    738 	}
    739 
    740 	if hasNext {
    741 		parentTimer.Reset(time.Until(nextTime))
    742 	} else {
    743 		parentTimer.Stop()
    744 	}
    745 }