mqttagent

MQTT Lua Agent
git clone https://git.instinctive.eu/mqttagent.git
Log | Files | Refs | README | LICENSE

mqttagent.go (18744B)


      1 /*
      2  * Copyright (c) 2025, Natacha Porté
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 package mqttagent
     18 
     19 import (
     20 	"errors"
     21 	"fmt"
     22 	"log"
     23 	"os"
     24 	"strings"
     25 	"time"
     26 
     27 	"github.com/go-mqtt/mqtt"
     28 	"github.com/yuin/gluamapper"
     29 	"github.com/yuin/gopher-lua"
     30 )
     31 
     32 type MqttAgent interface {
     33 	Setup(L *lua.LState)
     34 	Log(L *lua.LState, msg *MqttMessage)
     35 	Teardown(L *lua.LState)
     36 }
     37 
     38 type MqttReloadingAgent interface {
     39 	Setup(L *lua.LState)
     40 	Log(L *lua.LState, msg *MqttMessage)
     41 	ReloadBegin(oldL, newL *lua.LState)
     42 	ReloadAbort(oldL, newL *lua.LState)
     43 	ReloadEnd(oldL, newL *lua.LState)
     44 	Teardown(L *lua.LState)
     45 }
     46 
     47 type MqttMessage struct {
     48 	Timestamp float64
     49 	ClientId  int
     50 	Topic     []byte
     51 	Message   []byte
     52 }
     53 
     54 func Run(agent MqttAgent, main_script string, capacity int) {
     55 	fromMqtt := make(chan MqttMessage, capacity)
     56 
     57 	L := lua.NewState()
     58 	defer L.Close()
     59 
     60 	agent.Setup(L)
     61 	defer agent.Teardown(L)
     62 
     63 	hostname, err := os.Hostname()
     64 	if err != nil {
     65 		hostname = "<unknown>"
     66 	}
     67 
     68 	idString := fmt.Sprintf("mqttagent-%s-%d", hostname, os.Getpid())
     69 	registerMqttClientType(L)
     70 	registerTimerType(L)
     71 	registerState(L, idString, fromMqtt)
     72 	defer cleanupClients(L)
     73 
     74 	if err := L.DoFile(main_script); err != nil {
     75 		panic(err)
     76 	}
     77 
     78 	timer := time.NewTimer(0)
     79 	defer timer.Stop()
     80 
     81 	log.Println(idString, "started")
     82 
     83 	for {
     84 		select {
     85 		case msg, ok := <-fromMqtt:
     86 
     87 			if !ok {
     88 				log.Println("fromMqtt is closed")
     89 				break
     90 			}
     91 
     92 			processMsg(L, agent, &msg)
     93 
     94 		case <-timer.C:
     95 		}
     96 
     97 		runTimers(L, timer)
     98 
     99 		if stateReloadRequested(L) {
    100 			L = reload(L, agent, main_script)
    101 			runTimers(L, timer)
    102 			stateRequestReload(L, lua.LNil)
    103 		}
    104 
    105 		if tableIsEmpty(stateCnxTable(L)) && tableIsEmpty(stateTimerTable(L)) {
    106 			break
    107 		}
    108 	}
    109 
    110 	log.Println(idString, "finished")
    111 }
    112 
    113 func cleanupClients(L *lua.LState) {
    114 	cnxTbl := stateCnxTable(L)
    115 	if cnxTbl == nil {
    116 		return
    117 	}
    118 
    119 	L.ForEach(cnxTbl, func(key, value lua.LValue) {
    120 		cnx := value.(*lua.LTable)
    121 		client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    122 		if err := client.Disconnect(nil); err != nil {
    123 			log.Printf("cleanup client %s: %v", lua.LVAsString(key), err)
    124 		}
    125 	})
    126 }
    127 
    128 func dispatchMsg(L *lua.LState, msg *MqttMessage, cnx, key, value lua.LValue) {
    129 	skey, ok := key.(lua.LString)
    130 	topic := string(msg.Topic)
    131 
    132 	if ok && match(topic, string(skey)) {
    133 		err := L.CallByParam(lua.P{Fn: value, NRet: 0, Protect: true},
    134 			cnx,
    135 			lua.LString(string(msg.Message)),
    136 			lua.LString(topic),
    137 			lua.LNumber(msg.Timestamp))
    138 		if err != nil {
    139 			panic(err)
    140 		}
    141 	}
    142 }
    143 
    144 func matchSliced(actual, filter []string) bool {
    145 	if len(filter) == 0 {
    146 		return len(actual) == 0
    147 	}
    148 
    149 	if filter[0] == "#" {
    150 		if len(filter) == 1 {
    151 			return true
    152 		}
    153 
    154 		for i := range actual {
    155 			if matchSliced(actual[i:], filter[1:]) {
    156 				return true
    157 			}
    158 		}
    159 
    160 		return false
    161 	}
    162 
    163 	if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) {
    164 		return matchSliced(actual[1:], filter[1:])
    165 	}
    166 
    167 	return false
    168 }
    169 
    170 func match(actual, filter string) bool {
    171 	return matchSliced(strings.Split(actual, "/"), strings.Split(filter, "/"))
    172 }
    173 
    174 func tableIsEmpty(t *lua.LTable) bool {
    175 	key, _ := t.Next(lua.LNil)
    176 	return key == lua.LNil
    177 }
    178 
    179 func processMsg(L *lua.LState, agent MqttAgent, msg *MqttMessage) {
    180 	agent.Log(L, msg)
    181 
    182 	cnx := L.RawGetInt(stateCnxTable(L), msg.ClientId).(*lua.LTable)
    183 	subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable)
    184 	L.ForEach(subTbl, func(key, value lua.LValue) { dispatchMsg(L, msg, cnx, key, value) })
    185 
    186 	if tableIsEmpty(subTbl) {
    187 		client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    188 		if err := client.Disconnect(nil); err != nil {
    189 			log.Println("disconnect empty client:", err)
    190 		}
    191 		L.RawSetInt(stateCnxTable(L), msg.ClientId, lua.LNil)
    192 	}
    193 }
    194 
    195 func mqttRead(client *mqtt.Client, toLua chan<- MqttMessage, id int) {
    196 	var big *mqtt.BigMessage
    197 
    198 	for {
    199 		message, topic, err := client.ReadSlices()
    200 		t := float64(time.Now().UnixMicro()) * 1.0e-6
    201 
    202 		switch {
    203 		case err == nil:
    204 			toLua <- MqttMessage{Timestamp: t, ClientId: id, Topic: dup(topic), Message: dup(message)}
    205 
    206 		case errors.As(err, &big):
    207 			data, err := big.ReadAll()
    208 			if err != nil {
    209 				log.Println("mqttRead big message:", err)
    210 			} else {
    211 				toLua <- MqttMessage{Timestamp: t, ClientId: id, Topic: dup(topic), Message: data}
    212 			}
    213 
    214 		case errors.Is(err, mqtt.ErrClosed):
    215 			log.Println("mqttRead finishing:", err)
    216 			return
    217 
    218 		case mqtt.IsConnectionRefused(err):
    219 			log.Println("mqttRead connection refused:", err)
    220 			time.Sleep(15 * time.Minute)
    221 
    222 		default:
    223 			log.Println("mqttRead:", err)
    224 			time.Sleep(2 * time.Second)
    225 		}
    226 	}
    227 }
    228 
    229 func reload(oldL *lua.LState, agent MqttAgent, main_script string) *lua.LState {
    230 	log.Println("Reloading", main_script)
    231 	reloader, isReloader := agent.(MqttReloadingAgent)
    232 
    233 	newL := lua.NewState()
    234 
    235 	if isReloader {
    236 		reloader.ReloadBegin(oldL, newL)
    237 	} else {
    238 		agent.Setup(newL)
    239 	}
    240 
    241 	registerMqttClientType(newL)
    242 	registerTimerType(newL)
    243 
    244 	stateReloadBegin(oldL, newL)
    245 
    246 	if err := newL.DoFile(main_script); err != nil {
    247 		log.Println("Reload failed:", err)
    248 		stateReloadAbort(oldL, newL)
    249 		if isReloader {
    250 			reloader.ReloadAbort(oldL, newL)
    251 		} else {
    252 			agent.Teardown(newL)
    253 		}
    254 		newL.Close()
    255 		return oldL
    256 	} else {
    257 		stateReloadEnd(oldL, newL)
    258 		if isReloader {
    259 			reloader.ReloadEnd(oldL, newL)
    260 		} else {
    261 			agent.Teardown(oldL)
    262 		}
    263 		oldL.Close()
    264 		log.Println("Reload successful")
    265 		return newL
    266 	}
    267 }
    268 
    269 func dup(src []byte) []byte {
    270 	res := make([]byte, len(src))
    271 	copy(res, src)
    272 	return res
    273 }
    274 
    275 func newUserData(L *lua.LState, v interface{}) *lua.LUserData {
    276 	res := L.NewUserData()
    277 	res.Value = v
    278 	return res
    279 }
    280 
    281 /********** State Object in the Lua Interpreter **********/
    282 
    283 const luaStateName = "_mqttagent"
    284 const keyChanToLua = 1
    285 const keyClientPrefix = 2
    286 const keyClientNextId = 3
    287 const keyCfgMap = 4
    288 const keyCnxTable = 5
    289 const keyTimerTable = 6
    290 const keyReloadRequest = 7
    291 const keyOldCfgMap = 8
    292 
    293 func registerState(L *lua.LState, clientPrefix string, toLua chan<- MqttMessage) {
    294 	st := L.NewTable()
    295 	L.RawSetInt(st, keyChanToLua, newUserData(L, toLua))
    296 	L.RawSetInt(st, keyClientPrefix, lua.LString(clientPrefix))
    297 	L.RawSetInt(st, keyClientNextId, lua.LNumber(1))
    298 	L.RawSetInt(st, keyCfgMap, newUserData(L, make(mqttConfigMap)))
    299 	L.RawSetInt(st, keyCnxTable, L.NewTable())
    300 	L.RawSetInt(st, keyTimerTable, L.NewTable())
    301 	L.SetGlobal(luaStateName, st)
    302 	L.SetGlobal("reload", L.NewFunction(requestReload))
    303 }
    304 
    305 func stateReloadBegin(oldL, newL *lua.LState) {
    306 	oldSt := oldL.GetGlobal(luaStateName).(*lua.LTable)
    307 	toLua := oldL.RawGetInt(oldSt, keyChanToLua).(*lua.LUserData).Value.(chan<- MqttMessage)
    308 	clientPrefix := oldL.RawGetInt(oldSt, keyClientPrefix)
    309 	nextId := oldL.RawGetInt(oldSt, keyClientNextId)
    310 	cfgMap := oldL.RawGetInt(oldSt, keyCfgMap).(*lua.LUserData).Value.(mqttConfigMap)
    311 
    312 	st := newL.NewTable()
    313 	newL.RawSetInt(st, keyChanToLua, newUserData(newL, toLua))
    314 	newL.RawSetInt(st, keyClientPrefix, clientPrefix)
    315 	newL.RawSetInt(st, keyClientNextId, nextId)
    316 	newL.RawSetInt(st, keyCfgMap, newUserData(newL, make(mqttConfigMap)))
    317 	newL.RawSetInt(st, keyCnxTable, newL.NewTable())
    318 	newL.RawSetInt(st, keyTimerTable, newL.NewTable())
    319 	newL.RawSetInt(st, keyOldCfgMap, newUserData(newL, cfgMap))
    320 	newL.SetGlobal(luaStateName, st)
    321 	newL.SetGlobal("reload", newL.NewFunction(requestReload))
    322 }
    323 
    324 func stateReloadAbort(oldL, newL *lua.LState) {
    325 	statePartialCleanup(newL, oldL)
    326 }
    327 
    328 func stateReloadEnd(oldL, newL *lua.LState) {
    329 	statePartialCleanup(oldL, newL)
    330 	newSt := newL.GetGlobal(luaStateName).(*lua.LTable)
    331 	newL.RawSetInt(newSt, keyOldCfgMap, lua.LNil)
    332 }
    333 
    334 func statePartialCleanup(staleL, keptL *lua.LState) {
    335 	staleCnxTable := stateCnxTable(staleL)
    336 	keptCnxTable := stateCnxTable(keptL)
    337 
    338 	staleL.ForEach(staleCnxTable, func(key, value lua.LValue) {
    339 		clientId := int(key.(lua.LNumber))
    340 		if keptL.RawGetInt(keptCnxTable, clientId) == lua.LNil {
    341 			client := staleL.RawGetInt(value.(*lua.LTable), keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    342 			client.Close()
    343 		}
    344 	})
    345 
    346 	keptL.ForEach(keptCnxTable, func(key, value lua.LValue) {
    347 		clientId := int(key.(lua.LNumber))
    348 		keptCnx := value.(*lua.LTable)
    349 		if lua.LVIsFalse(keptL.RawGetInt(keptCnx, keyReused)) {
    350 			return
    351 		}
    352 
    353 		client := keptL.RawGetInt(keptCnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    354 		staleCnx := staleL.RawGetInt(staleCnxTable, clientId).(*lua.LTable)
    355 
    356 		if client != staleL.RawGetInt(staleCnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) {
    357 			panic("This shouldn't happen")
    358 		}
    359 
    360 		keptSub := keptL.RawGetInt(keptCnx, keySubTable).(*lua.LTable)
    361 		staleSub := staleL.RawGetInt(staleCnx, keySubTable).(*lua.LTable)
    362 
    363 		staleL.ForEach(staleSub, func(key, _ lua.LValue) {
    364 			topic := string(key.(lua.LString))
    365 			if keptL.GetField(keptSub, topic) == lua.LNil {
    366 				log.Println("Unsubscribing from stale topic", topic)
    367 				if err := client.Unsubscribe(nil, topic); err != nil {
    368 					log.Println("Failed to unsubscribe:", err)
    369 				}
    370 			}
    371 		})
    372 
    373 		keptL.ForEach(keptSub, func(key, _ lua.LValue) {
    374 			topic := string(key.(lua.LString))
    375 			if staleL.GetField(staleSub, topic) == lua.LNil {
    376 				log.Println("Subscribing to new topic", topic)
    377 				if err := client.Subscribe(nil, topic); err != nil {
    378 					log.Println("Failed to subscribe:", err)
    379 				}
    380 			}
    381 		})
    382 
    383 		keptL.RawSetInt(keptCnx, keyReused, lua.LNil)
    384 	})
    385 }
    386 
    387 func stateValue(L *lua.LState, key int) lua.LValue {
    388 	st := L.GetGlobal(luaStateName)
    389 	return L.RawGetInt(st.(*lua.LTable), key)
    390 }
    391 
    392 func stateChanToLua(L *lua.LState) chan<- MqttMessage {
    393 	ud := stateValue(L, keyChanToLua)
    394 	return ud.(*lua.LUserData).Value.(chan<- MqttMessage)
    395 }
    396 
    397 func stateClientNextId(L *lua.LState) (int, string) {
    398 	st := L.GetGlobal(luaStateName).(*lua.LTable)
    399 	result := int(L.RawGetInt(st, keyClientNextId).(lua.LNumber))
    400 	L.RawSetInt(st, keyClientNextId, lua.LNumber(result+1))
    401 	prefix := lua.LVAsString(L.RawGetInt(st, keyClientPrefix))
    402 	return result, fmt.Sprintf("%s-%d", prefix, result)
    403 }
    404 
    405 func stateCfgMap(L *lua.LState) mqttConfigMap {
    406 	return stateValue(L, keyCfgMap).(*lua.LUserData).Value.(mqttConfigMap)
    407 }
    408 
    409 func stateCnxTable(L *lua.LState) *lua.LTable {
    410 	return stateValue(L, keyCnxTable).(*lua.LTable)
    411 }
    412 
    413 func stateTimerTable(L *lua.LState) *lua.LTable {
    414 	return stateValue(L, keyTimerTable).(*lua.LTable)
    415 }
    416 
    417 func stateReloadRequested(L *lua.LState) bool {
    418 	return lua.LVAsBool(stateValue(L, keyReloadRequest))
    419 }
    420 
    421 func stateRequestReload(L *lua.LState, v lua.LValue) {
    422 	st := L.GetGlobal(luaStateName).(*lua.LTable)
    423 	L.RawSetInt(st, keyReloadRequest, v)
    424 }
    425 
    426 func stateOldCfgMap(L *lua.LState) mqttConfigMap {
    427 	val := stateValue(L, keyOldCfgMap)
    428 	if val == lua.LNil {
    429 		return nil
    430 	} else {
    431 		return val.(*lua.LUserData).Value.(mqttConfigMap)
    432 	}
    433 }
    434 
    435 func requestReload(L *lua.LState) int {
    436 	stateRequestReload(L, lua.LTrue)
    437 	return 0
    438 }
    439 
    440 /********** Lua Object for MQTT client **********/
    441 
    442 const luaMqttClientTypeName = "mqttclient"
    443 const keyClient = 1
    444 const keySubTable = 2
    445 const keyReused = 3
    446 
    447 func registerMqttClientType(L *lua.LState) {
    448 	mt := L.NewTypeMetatable(luaMqttClientTypeName)
    449 	L.SetGlobal(luaMqttClientTypeName, mt)
    450 	L.SetField(mt, "new", L.NewFunction(newMqttClient))
    451 	L.SetField(mt, "__call", L.NewFunction(luaPublish))
    452 	L.SetField(mt, "__index", L.NewFunction(luaQuery))
    453 	L.SetField(mt, "__newindex", L.NewFunction(luaSubscribe))
    454 }
    455 
    456 type mqttConfig struct {
    457 	Connection     string
    458 	TLS            bool
    459 	PauseTimeout   string
    460 	AtLeastOnceMax int
    461 	ExactlyOnceMax int
    462 	UserName       string
    463 	Password       string
    464 	Will           struct {
    465 		Topic       string
    466 		Message     string
    467 		Retain      bool
    468 		AtLeastOnce bool
    469 		ExactlyOnce bool
    470 	}
    471 	KeepAlive    uint16
    472 	CleanSession bool
    473 }
    474 
    475 type mqttClientEntry struct {
    476 	client *mqtt.Client
    477 	id     int
    478 }
    479 
    480 type mqttConfigMap map[mqttConfig]mqttClientEntry
    481 
    482 func mqttConfigBytes(src string) []byte {
    483 	if src == "" {
    484 		return nil
    485 	} else {
    486 		return []byte(src)
    487 	}
    488 }
    489 
    490 func newClient(config *mqttConfig, id string) (*mqtt.Client, error) {
    491 	pto, err := time.ParseDuration(config.PauseTimeout)
    492 	if err != nil {
    493 		pto = time.Second
    494 	}
    495 
    496 	var dialer mqtt.Dialer
    497 
    498 	if config.TLS {
    499 		dialer = mqtt.NewTLSDialer("tcp", config.Connection, nil)
    500 	} else {
    501 		dialer = mqtt.NewDialer("tcp", config.Connection)
    502 	}
    503 
    504 	processed_cfg := mqtt.Config{
    505 		Dialer:         dialer,
    506 		PauseTimeout:   pto,
    507 		AtLeastOnceMax: config.AtLeastOnceMax,
    508 		ExactlyOnceMax: config.ExactlyOnceMax,
    509 		UserName:       config.UserName,
    510 		Password:       mqttConfigBytes(config.Password),
    511 		Will: struct {
    512 			Topic       string
    513 			Message     []byte
    514 			Retain      bool
    515 			AtLeastOnce bool
    516 			ExactlyOnce bool
    517 		}{
    518 			Topic:       config.Will.Topic,
    519 			Message:     mqttConfigBytes(config.Will.Message),
    520 			Retain:      config.Will.Retain,
    521 			AtLeastOnce: config.Will.AtLeastOnce,
    522 			ExactlyOnce: config.Will.ExactlyOnce,
    523 		},
    524 		KeepAlive:    config.KeepAlive,
    525 		CleanSession: config.CleanSession,
    526 	}
    527 
    528 	return mqtt.VolatileSession(id, &processed_cfg)
    529 }
    530 
    531 func registerClient(L *lua.LState, id int, client *mqtt.Client, reused bool) lua.LValue {
    532 	res := L.NewTable()
    533 	L.RawSetInt(res, keyClient, newUserData(L, client))
    534 	L.RawSetInt(res, keySubTable, L.NewTable())
    535 	if reused {
    536 		L.RawSetInt(res, keyReused, lua.LTrue)
    537 	}
    538 	L.SetMetatable(res, L.GetTypeMetatable(luaMqttClientTypeName))
    539 	L.RawSetInt(stateCnxTable(L), id, res)
    540 	return res
    541 }
    542 
    543 func newMqttClient(L *lua.LState) int {
    544 	var config mqttConfig
    545 	if err := gluamapper.Map(L.CheckTable(1), &config); err != nil {
    546 		log.Println("newMqttClient:", err)
    547 		L.Push(lua.LNil)
    548 		L.Push(lua.LString(err.Error()))
    549 		return 2
    550 	}
    551 
    552 	cfgMap := stateCfgMap(L)
    553 
    554 	if cfg, found := cfgMap[config]; found {
    555 		res := L.RawGetInt(stateCnxTable(L), cfg.id)
    556 		tbl := res.(*lua.LTable)
    557 		if L.RawGetInt(tbl, keyClient).(*lua.LUserData).Value.(*mqtt.Client) != cfg.client {
    558 			panic("Inconsistent configuration table")
    559 		}
    560 
    561 		L.Push(res)
    562 		return 1
    563 	}
    564 
    565 	if oldCfgMap := stateOldCfgMap(L); oldCfgMap != nil {
    566 		if cfg, found := oldCfgMap[config]; found {
    567 			cfgMap[config] = cfg
    568 			L.Push(registerClient(L, cfg.id, cfg.client, true))
    569 			return 1
    570 		}
    571 	}
    572 
    573 	id, idString := stateClientNextId(L)
    574 	client, err := newClient(&config, idString)
    575 	if err != nil {
    576 		log.Println("newMqttClient:", err)
    577 		L.Push(lua.LNil)
    578 		L.Push(lua.LString(err.Error()))
    579 		return 2
    580 	}
    581 	go mqttRead(client, stateChanToLua(L), id)
    582 
    583 	cfgMap[config] = mqttClientEntry{id: id, client: client}
    584 
    585 	L.Push(registerClient(L, id, client, false))
    586 	return 1
    587 }
    588 
    589 func luaPublish(L *lua.LState) int {
    590 	cnx := L.CheckTable(1)
    591 	client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    592 
    593 	if L.GetTop() == 1 {
    594 		if err := client.Ping(nil); err != nil {
    595 			log.Println("luaPing:", err)
    596 			L.Push(lua.LNil)
    597 			L.Push(lua.LString(err.Error()))
    598 			return 2
    599 		} else {
    600 			L.Push(lua.LTrue)
    601 			return 1
    602 		}
    603 	}
    604 
    605 	message := L.CheckString(2)
    606 	topic := L.CheckString(3)
    607 
    608 	if err := client.Publish(nil, []byte(message), topic); err != nil {
    609 		L.Push(lua.LNil)
    610 		L.Push(lua.LString(err.Error()))
    611 		return 2
    612 	} else {
    613 		L.Push(lua.LTrue)
    614 		return 1
    615 	}
    616 }
    617 
    618 func luaQuery(L *lua.LState) int {
    619 	cnx := L.CheckTable(1)
    620 	topic := L.CheckString(2)
    621 	subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable)
    622 	L.Push(L.GetField(subTbl, topic))
    623 	return 1
    624 }
    625 
    626 func luaSubscribe(L *lua.LState) int {
    627 	var err error
    628 	cnx := L.CheckTable(1)
    629 	topic := L.CheckString(2)
    630 	callback := L.OptFunction(3, nil)
    631 	client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    632 	tbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable)
    633 
    634 	_, is_new := L.GetField(tbl, topic).(*lua.LNilType)
    635 
    636 	if lua.LVIsFalse(L.RawGetInt(cnx, keyReused)) {
    637 		if callback == nil {
    638 			err = client.Unsubscribe(nil, topic)
    639 		} else if is_new {
    640 			err = client.Subscribe(nil, topic)
    641 		}
    642 	}
    643 
    644 	if err != nil {
    645 		log.Println("luaSubscribe:", err)
    646 		L.Push(lua.LNil)
    647 		L.Push(lua.LString(err.Error()))
    648 		return 2
    649 	} else {
    650 		if callback == nil {
    651 			if is_new {
    652 				log.Printf("Not subscribed to %q", topic)
    653 			} else {
    654 				log.Printf("Unsubscribed from %q", topic)
    655 			}
    656 			L.SetField(tbl, topic, lua.LNil)
    657 		} else {
    658 			if is_new {
    659 				log.Printf("Subscribed to %q", topic)
    660 			} else {
    661 				log.Printf("Updating subscription to %q", topic)
    662 			}
    663 			L.SetField(tbl, topic, callback)
    664 		}
    665 
    666 		L.Push(lua.LTrue)
    667 		return 1
    668 	}
    669 }
    670 
    671 /********** Lua Object for timers **********/
    672 
    673 const luaTimerTypeName = "timer"
    674 
    675 func registerTimerType(L *lua.LState) {
    676 	mt := L.NewTypeMetatable(luaTimerTypeName)
    677 	L.SetGlobal(luaTimerTypeName, mt)
    678 	L.SetField(mt, "new", L.NewFunction(newTimer))
    679 	L.SetField(mt, "schedule", L.NewFunction(timerSchedule))
    680 	L.SetField(mt, "__index", L.SetFuncs(L.NewTable(), timerMethods))
    681 }
    682 
    683 func newTimer(L *lua.LState) int {
    684 	atTime := L.Get(1)
    685 	cb := L.CheckFunction(2)
    686 	L.Pop(2)
    687 	L.SetMetatable(cb, L.GetTypeMetatable(luaTimerTypeName))
    688 	L.Push(cb)
    689 	L.Push(atTime)
    690 	return timerSchedule(L)
    691 }
    692 
    693 var timerMethods = map[string]lua.LGFunction{
    694 	"cancel":   timerCancel,
    695 	"schedule": timerSchedule,
    696 }
    697 
    698 func timerCancel(L *lua.LState) int {
    699 	timer := L.CheckFunction(1)
    700 	L.RawSet(stateTimerTable(L), timer, lua.LNil)
    701 	return 0
    702 }
    703 
    704 func timerSchedule(L *lua.LState) int {
    705 	timer := L.CheckFunction(1)
    706 	atTime := lua.LNil
    707 	if L.Get(2) != lua.LNil {
    708 		atTime = L.CheckNumber(2)
    709 	}
    710 
    711 	L.RawSet(stateTimerTable(L), timer, atTime)
    712 	return 0
    713 }
    714 
    715 func toTime(lsec lua.LNumber) time.Time {
    716 	fsec := float64(lsec)
    717 	sec := int64(fsec)
    718 	nsec := int64((fsec - float64(sec)) * 1.0e9)
    719 
    720 	return time.Unix(sec, nsec)
    721 }
    722 
    723 func runTimers(L *lua.LState, parentTimer *time.Timer) {
    724 	hasNext := false
    725 	var nextTime time.Time
    726 
    727 	now := time.Now()
    728 	timers := stateTimerTable(L)
    729 
    730 	timer, luaT := timers.Next(lua.LNil)
    731 	for timer != lua.LNil {
    732 		t := toTime(luaT.(lua.LNumber))
    733 		if t.Compare(now) <= 0 {
    734 			L.RawSet(timers, timer, lua.LNil)
    735 			err := L.CallByParam(lua.P{Fn: timer, NRet: 0, Protect: true}, timer, luaT)
    736 			if err != nil {
    737 				panic(err)
    738 			}
    739 			timer = lua.LNil
    740 			hasNext = false
    741 		} else if !hasNext || t.Compare(nextTime) < 0 {
    742 			hasNext = true
    743 			nextTime = t
    744 		}
    745 
    746 		timer, luaT = timers.Next(timer)
    747 	}
    748 
    749 	if hasNext {
    750 		parentTimer.Reset(time.Until(nextTime))
    751 	} else {
    752 		parentTimer.Stop()
    753 	}
    754 }