mqttagent

MQTT Lua Agent
git clone https://git.instinctive.eu/mqttagent.git
Log | Files | Refs | README | LICENSE

mqttagent.go (18976B)


      1 /*
      2  * Copyright (c) 2025, Natacha Porté
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 package mqttagent
     18 
     19 import (
     20 	"errors"
     21 	"fmt"
     22 	"log"
     23 	"os"
     24 	"strings"
     25 	"time"
     26 
     27 	"github.com/go-mqtt/mqtt"
     28 	"github.com/yuin/gluamapper"
     29 	"github.com/yuin/gopher-lua"
     30 )
     31 
     32 type MqttAgent interface {
     33 	Setup(L *lua.LState)
     34 	Teardown(L *lua.LState)
     35 }
     36 
     37 type MqttReloadingAgent interface {
     38 	Setup(L *lua.LState)
     39 	ReloadBegin(oldL, newL *lua.LState)
     40 	ReloadAbort(oldL, newL *lua.LState)
     41 	ReloadEnd(oldL, newL *lua.LState)
     42 	Teardown(L *lua.LState)
     43 }
     44 
     45 type mqttMessage struct {
     46 	Timestamp float64
     47 	ClientId  int
     48 	Topic     []byte
     49 	Message   []byte
     50 }
     51 
     52 func Run(agent MqttAgent, main_script string, capacity int) {
     53 	fromMqtt := make(chan mqttMessage, capacity)
     54 
     55 	L := lua.NewState()
     56 	defer L.Close()
     57 
     58 	agent.Setup(L)
     59 	defer agent.Teardown(L)
     60 
     61 	hostname, err := os.Hostname()
     62 	if err != nil {
     63 		hostname = "<unknown>"
     64 	}
     65 
     66 	idString := fmt.Sprintf("mqttagent-%s-%d", hostname, os.Getpid())
     67 	registerMqttClientType(L)
     68 	registerTimerType(L)
     69 	registerState(L, agent, idString, fromMqtt)
     70 	defer cleanupClients(L)
     71 
     72 	if err := L.DoFile(main_script); err != nil {
     73 		panic(err)
     74 	}
     75 
     76 	timer := time.NewTimer(0)
     77 	defer timer.Stop()
     78 
     79 	log.Println(idString, "started")
     80 
     81 	for {
     82 		select {
     83 		case msg, ok := <-fromMqtt:
     84 
     85 			if !ok {
     86 				log.Println("fromMqtt is closed")
     87 				break
     88 			}
     89 
     90 			processMsg(L, &msg)
     91 
     92 		case <-timer.C:
     93 		}
     94 
     95 		runTimers(L, timer)
     96 
     97 		if stateReloadRequested(L) {
     98 			L = reload(L, main_script)
     99 			runTimers(L, timer)
    100 			stateRequestReload(L, lua.LNil)
    101 		}
    102 
    103 		if tableIsEmpty(stateCnxTable(L)) && tableIsEmpty(stateTimerTable(L)) {
    104 			break
    105 		}
    106 	}
    107 
    108 	log.Println(idString, "finished")
    109 }
    110 
    111 func cleanupClients(L *lua.LState) {
    112 	cnxTbl := stateCnxTable(L)
    113 	if cnxTbl == nil {
    114 		return
    115 	}
    116 
    117 	L.ForEach(cnxTbl, func(key, value lua.LValue) {
    118 		cnx := value.(*lua.LTable)
    119 		client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    120 		if err := client.Disconnect(nil); err != nil {
    121 			log.Printf("cleanup client %s: %v", lua.LVAsString(key), err)
    122 		}
    123 	})
    124 }
    125 
    126 func dispatchMsg(L *lua.LState, msg *mqttMessage, cnx, key, value lua.LValue) {
    127 	skey, ok := key.(lua.LString)
    128 	topic := string(msg.Topic)
    129 
    130 	if ok && match(topic, string(skey)) {
    131 		err := L.CallByParam(lua.P{Fn: value, NRet: 0, Protect: true},
    132 			cnx,
    133 			lua.LString(string(msg.Message)),
    134 			lua.LString(topic),
    135 			lua.LNumber(msg.Timestamp))
    136 		if err != nil {
    137 			panic(err)
    138 		}
    139 	}
    140 }
    141 
    142 func matchSliced(actual, filter []string) bool {
    143 	if len(filter) == 0 {
    144 		return len(actual) == 0
    145 	}
    146 
    147 	if filter[0] == "#" {
    148 		if len(filter) == 1 {
    149 			return true
    150 		}
    151 
    152 		for i := range actual {
    153 			if matchSliced(actual[i:], filter[1:]) {
    154 				return true
    155 			}
    156 		}
    157 
    158 		return false
    159 	}
    160 
    161 	if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) {
    162 		return matchSliced(actual[1:], filter[1:])
    163 	}
    164 
    165 	return false
    166 }
    167 
    168 func match(actual, filter string) bool {
    169 	return matchSliced(strings.Split(actual, "/"), strings.Split(filter, "/"))
    170 }
    171 
    172 func tableIsEmpty(t *lua.LTable) bool {
    173 	key, _ := t.Next(lua.LNil)
    174 	return key == lua.LNil
    175 }
    176 
    177 func processMsg(L *lua.LState, msg *mqttMessage) {
    178 	cnx := L.RawGetInt(stateCnxTable(L), msg.ClientId).(*lua.LTable)
    179 	subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable)
    180 	L.ForEach(subTbl, func(key, value lua.LValue) { dispatchMsg(L, msg, cnx, key, value) })
    181 
    182 	if tableIsEmpty(subTbl) {
    183 		client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    184 		if err := client.Disconnect(nil); err != nil {
    185 			log.Println("disconnect empty client:", err)
    186 		}
    187 		L.RawSetInt(stateCnxTable(L), msg.ClientId, lua.LNil)
    188 	}
    189 }
    190 
    191 func mqttRead(client *mqtt.Client, toLua chan<- mqttMessage, id int) {
    192 	var big *mqtt.BigMessage
    193 
    194 	for {
    195 		message, topic, err := client.ReadSlices()
    196 		t := float64(time.Now().UnixMicro()) * 1.0e-6
    197 
    198 		switch {
    199 		case err == nil:
    200 			toLua <- mqttMessage{Timestamp: t, ClientId: id, Topic: dup(topic), Message: dup(message)}
    201 
    202 		case errors.As(err, &big):
    203 			data, err := big.ReadAll()
    204 			if err != nil {
    205 				log.Println("mqttRead big message:", err)
    206 			} else {
    207 				toLua <- mqttMessage{Timestamp: t, ClientId: id, Topic: dup(topic), Message: data}
    208 			}
    209 
    210 		case errors.Is(err, mqtt.ErrClosed):
    211 			log.Println("mqttRead finishing:", err)
    212 			return
    213 
    214 		case mqtt.IsConnectionRefused(err):
    215 			log.Println("mqttRead connection refused:", err)
    216 			time.Sleep(15 * time.Minute)
    217 
    218 		default:
    219 			log.Println("mqttRead:", err)
    220 			time.Sleep(2 * time.Second)
    221 		}
    222 	}
    223 }
    224 
    225 func reload(oldL *lua.LState, main_script string) *lua.LState {
    226 	log.Println("Reloading", main_script)
    227 	agent := stateAgent(oldL)
    228 	reloader, isReloader := agent.(MqttReloadingAgent)
    229 
    230 	newL := lua.NewState()
    231 
    232 	if isReloader {
    233 		reloader.ReloadBegin(oldL, newL)
    234 	} else {
    235 		agent.Setup(newL)
    236 	}
    237 
    238 	registerMqttClientType(newL)
    239 	registerTimerType(newL)
    240 
    241 	stateReloadBegin(oldL, newL)
    242 
    243 	if err := newL.DoFile(main_script); err != nil {
    244 		log.Println("Reload failed:", err)
    245 		stateReloadAbort(oldL, newL)
    246 		if isReloader {
    247 			reloader.ReloadAbort(oldL, newL)
    248 		} else {
    249 			agent.Teardown(newL)
    250 		}
    251 		newL.Close()
    252 		return oldL
    253 	} else {
    254 		stateReloadEnd(oldL, newL)
    255 		if isReloader {
    256 			reloader.ReloadEnd(oldL, newL)
    257 		} else {
    258 			agent.Teardown(oldL)
    259 		}
    260 		oldL.Close()
    261 		log.Println("Reload successful")
    262 		return newL
    263 	}
    264 }
    265 
    266 func dup(src []byte) []byte {
    267 	res := make([]byte, len(src))
    268 	copy(res, src)
    269 	return res
    270 }
    271 
    272 func newUserData(L *lua.LState, v interface{}) *lua.LUserData {
    273 	res := L.NewUserData()
    274 	res.Value = v
    275 	return res
    276 }
    277 
    278 /********** State Object in the Lua Interpreter **********/
    279 
    280 const luaStateName = "_mqttagent"
    281 const keyChanToLua = 1
    282 const keyAgent = 2
    283 const keyClientPrefix = 3
    284 const keyClientNextId = 4
    285 const keyCfgMap = 5
    286 const keyCnxTable = 6
    287 const keyTimerTable = 7
    288 const keyReloadRequest = 8
    289 const keyOldCfgMap = 9
    290 
    291 func registerState(L *lua.LState, agent MqttAgent, clientPrefix string, toLua chan<- mqttMessage) {
    292 	st := L.NewTable()
    293 	L.RawSetInt(st, keyChanToLua, newUserData(L, toLua))
    294 	L.RawSetInt(st, keyAgent, newUserData(L, agent))
    295 	L.RawSetInt(st, keyClientPrefix, lua.LString(clientPrefix))
    296 	L.RawSetInt(st, keyClientNextId, lua.LNumber(1))
    297 	L.RawSetInt(st, keyCfgMap, newUserData(L, make(mqttConfigMap)))
    298 	L.RawSetInt(st, keyCnxTable, L.NewTable())
    299 	L.RawSetInt(st, keyTimerTable, L.NewTable())
    300 	L.SetGlobal(luaStateName, st)
    301 	L.SetGlobal("reload", L.NewFunction(requestReload))
    302 }
    303 
    304 func stateReloadBegin(oldL, newL *lua.LState) {
    305 	oldSt := oldL.GetGlobal(luaStateName).(*lua.LTable)
    306 	toLua := oldL.RawGetInt(oldSt, keyChanToLua).(*lua.LUserData).Value.(chan<- mqttMessage)
    307 	agent := oldL.RawGetInt(oldSt, keyAgent).(*lua.LUserData).Value.(MqttAgent)
    308 	clientPrefix := oldL.RawGetInt(oldSt, keyClientPrefix)
    309 	nextId := oldL.RawGetInt(oldSt, keyClientNextId)
    310 	cfgMap := oldL.RawGetInt(oldSt, keyCfgMap).(*lua.LUserData).Value.(mqttConfigMap)
    311 
    312 	st := newL.NewTable()
    313 	newL.RawSetInt(st, keyChanToLua, newUserData(newL, toLua))
    314 	newL.RawSetInt(st, keyAgent, newUserData(newL, agent))
    315 	newL.RawSetInt(st, keyClientPrefix, clientPrefix)
    316 	newL.RawSetInt(st, keyClientNextId, nextId)
    317 	newL.RawSetInt(st, keyCfgMap, newUserData(newL, make(mqttConfigMap)))
    318 	newL.RawSetInt(st, keyCnxTable, newL.NewTable())
    319 	newL.RawSetInt(st, keyTimerTable, newL.NewTable())
    320 	newL.RawSetInt(st, keyOldCfgMap, newUserData(newL, cfgMap))
    321 	newL.SetGlobal(luaStateName, st)
    322 	newL.SetGlobal("reload", newL.NewFunction(requestReload))
    323 }
    324 
    325 func stateReloadAbort(oldL, newL *lua.LState) {
    326 	statePartialCleanup(newL, oldL)
    327 }
    328 
    329 func stateReloadEnd(oldL, newL *lua.LState) {
    330 	statePartialCleanup(oldL, newL)
    331 	newSt := newL.GetGlobal(luaStateName).(*lua.LTable)
    332 	newL.RawSetInt(newSt, keyOldCfgMap, lua.LNil)
    333 }
    334 
    335 func statePartialCleanup(staleL, keptL *lua.LState) {
    336 	staleCnxTable := stateCnxTable(staleL)
    337 	keptCnxTable := stateCnxTable(keptL)
    338 
    339 	staleL.ForEach(staleCnxTable, func(key, value lua.LValue) {
    340 		clientId := int(key.(lua.LNumber))
    341 		if keptL.RawGetInt(keptCnxTable, clientId) == lua.LNil {
    342 			client := staleL.RawGetInt(value.(*lua.LTable), keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    343 			client.Close()
    344 		}
    345 	})
    346 
    347 	keptL.ForEach(keptCnxTable, func(key, value lua.LValue) {
    348 		clientId := int(key.(lua.LNumber))
    349 		keptCnx := value.(*lua.LTable)
    350 		if lua.LVIsFalse(keptL.RawGetInt(keptCnx, keyReused)) {
    351 			return
    352 		}
    353 
    354 		client := keptL.RawGetInt(keptCnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    355 		staleCnx := staleL.RawGetInt(staleCnxTable, clientId).(*lua.LTable)
    356 
    357 		if client != staleL.RawGetInt(staleCnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) {
    358 			panic("This shouldn't happen")
    359 		}
    360 
    361 		keptSub := keptL.RawGetInt(keptCnx, keySubTable).(*lua.LTable)
    362 		staleSub := staleL.RawGetInt(staleCnx, keySubTable).(*lua.LTable)
    363 
    364 		staleL.ForEach(staleSub, func(key, _ lua.LValue) {
    365 			topic := string(key.(lua.LString))
    366 			if keptL.GetField(keptSub, topic) == lua.LNil {
    367 				log.Println("Unsubscribing from stale topic", topic)
    368 				if err := client.Unsubscribe(nil, topic); err != nil {
    369 					log.Println("Failed to unsubscribe:", err)
    370 				}
    371 			}
    372 		})
    373 
    374 		keptL.ForEach(keptSub, func(key, _ lua.LValue) {
    375 			topic := string(key.(lua.LString))
    376 			if staleL.GetField(staleSub, topic) == lua.LNil {
    377 				log.Println("Subscribing to new topic", topic)
    378 				if err := client.Subscribe(nil, topic); err != nil {
    379 					log.Println("Failed to subscribe:", err)
    380 				}
    381 			}
    382 		})
    383 
    384 		keptL.RawSetInt(keptCnx, keyReused, lua.LNil)
    385 	})
    386 }
    387 
    388 func stateValue(L *lua.LState, key int) lua.LValue {
    389 	st := L.GetGlobal(luaStateName)
    390 	return L.RawGetInt(st.(*lua.LTable), key)
    391 }
    392 
    393 func stateChanToLua(L *lua.LState) chan<- mqttMessage {
    394 	ud := stateValue(L, keyChanToLua)
    395 	return ud.(*lua.LUserData).Value.(chan<- mqttMessage)
    396 }
    397 
    398 func stateAgent(L *lua.LState) MqttAgent {
    399 	ud := stateValue(L, keyAgent)
    400 	return ud.(*lua.LUserData).Value.(MqttAgent)
    401 }
    402 
    403 func stateClientNextId(L *lua.LState) (int, string) {
    404 	st := L.GetGlobal(luaStateName).(*lua.LTable)
    405 	result := int(L.RawGetInt(st, keyClientNextId).(lua.LNumber))
    406 	L.RawSetInt(st, keyClientNextId, lua.LNumber(result+1))
    407 	prefix := lua.LVAsString(L.RawGetInt(st, keyClientPrefix))
    408 	return result, fmt.Sprintf("%s-%d", prefix, result)
    409 }
    410 
    411 func stateCfgMap(L *lua.LState) mqttConfigMap {
    412 	return stateValue(L, keyCfgMap).(*lua.LUserData).Value.(mqttConfigMap)
    413 }
    414 
    415 func stateCnxTable(L *lua.LState) *lua.LTable {
    416 	return stateValue(L, keyCnxTable).(*lua.LTable)
    417 }
    418 
    419 func stateTimerTable(L *lua.LState) *lua.LTable {
    420 	return stateValue(L, keyTimerTable).(*lua.LTable)
    421 }
    422 
    423 func stateReloadRequested(L *lua.LState) bool {
    424 	return lua.LVAsBool(stateValue(L, keyReloadRequest))
    425 }
    426 
    427 func stateRequestReload(L *lua.LState, v lua.LValue) {
    428 	st := L.GetGlobal(luaStateName).(*lua.LTable)
    429 	L.RawSetInt(st, keyReloadRequest, v)
    430 }
    431 
    432 func stateOldCfgMap(L *lua.LState) mqttConfigMap {
    433 	val := stateValue(L, keyOldCfgMap)
    434 	if val == lua.LNil {
    435 		return nil
    436 	} else {
    437 		return val.(*lua.LUserData).Value.(mqttConfigMap)
    438 	}
    439 }
    440 
    441 func requestReload(L *lua.LState) int {
    442 	stateRequestReload(L, lua.LTrue)
    443 	return 0
    444 }
    445 
    446 /********** Lua Object for MQTT client **********/
    447 
    448 const luaMqttClientTypeName = "mqttclient"
    449 const keyClient = 1
    450 const keySubTable = 2
    451 const keyReused = 3
    452 
    453 func registerMqttClientType(L *lua.LState) {
    454 	mt := L.NewTypeMetatable(luaMqttClientTypeName)
    455 	L.SetGlobal(luaMqttClientTypeName, mt)
    456 	L.SetField(mt, "new", L.NewFunction(newMqttClient))
    457 	L.SetField(mt, "__call", L.NewFunction(luaPublish))
    458 	L.SetField(mt, "__index", L.NewFunction(luaQuery))
    459 	L.SetField(mt, "__newindex", L.NewFunction(luaSubscribe))
    460 }
    461 
    462 type mqttConfig struct {
    463 	Connection     string
    464 	TLS            bool
    465 	PauseTimeout   string
    466 	AtLeastOnceMax int
    467 	ExactlyOnceMax int
    468 	UserName       string
    469 	Password       string
    470 	Will           struct {
    471 		Topic       string
    472 		Message     string
    473 		Retain      bool
    474 		AtLeastOnce bool
    475 		ExactlyOnce bool
    476 	}
    477 	KeepAlive    uint16
    478 	CleanSession bool
    479 }
    480 
    481 type mqttClientEntry struct {
    482 	client *mqtt.Client
    483 	id     int
    484 }
    485 
    486 type mqttConfigMap map[mqttConfig]mqttClientEntry
    487 
    488 func mqttConfigBytes(src string) []byte {
    489 	if src == "" {
    490 		return nil
    491 	} else {
    492 		return []byte(src)
    493 	}
    494 }
    495 
    496 func newClient(config *mqttConfig, id string) (*mqtt.Client, error) {
    497 	pto, err := time.ParseDuration(config.PauseTimeout)
    498 	if err != nil {
    499 		pto = time.Second
    500 	}
    501 
    502 	var dialer mqtt.Dialer
    503 
    504 	if config.TLS {
    505 		dialer = mqtt.NewTLSDialer("tcp", config.Connection, nil)
    506 	} else {
    507 		dialer = mqtt.NewDialer("tcp", config.Connection)
    508 	}
    509 
    510 	processed_cfg := mqtt.Config{
    511 		Dialer:         dialer,
    512 		PauseTimeout:   pto,
    513 		AtLeastOnceMax: config.AtLeastOnceMax,
    514 		ExactlyOnceMax: config.ExactlyOnceMax,
    515 		UserName:       config.UserName,
    516 		Password:       mqttConfigBytes(config.Password),
    517 		Will: struct {
    518 			Topic       string
    519 			Message     []byte
    520 			Retain      bool
    521 			AtLeastOnce bool
    522 			ExactlyOnce bool
    523 		}{
    524 			Topic:       config.Will.Topic,
    525 			Message:     mqttConfigBytes(config.Will.Message),
    526 			Retain:      config.Will.Retain,
    527 			AtLeastOnce: config.Will.AtLeastOnce,
    528 			ExactlyOnce: config.Will.ExactlyOnce,
    529 		},
    530 		KeepAlive:    config.KeepAlive,
    531 		CleanSession: config.CleanSession,
    532 	}
    533 
    534 	return mqtt.VolatileSession(id, &processed_cfg)
    535 }
    536 
    537 func registerClient(L *lua.LState, id int, client *mqtt.Client, reused bool) lua.LValue {
    538 	res := L.NewTable()
    539 	L.RawSetInt(res, keyClient, newUserData(L, client))
    540 	L.RawSetInt(res, keySubTable, L.NewTable())
    541 	if reused {
    542 		L.RawSetInt(res, keyReused, lua.LTrue)
    543 	}
    544 	L.SetMetatable(res, L.GetTypeMetatable(luaMqttClientTypeName))
    545 	L.RawSetInt(stateCnxTable(L), id, res)
    546 	return res
    547 }
    548 
    549 func newMqttClient(L *lua.LState) int {
    550 	var config mqttConfig
    551 	if err := gluamapper.Map(L.CheckTable(1), &config); err != nil {
    552 		log.Println("newMqttClient:", err)
    553 		L.Push(lua.LNil)
    554 		L.Push(lua.LString(err.Error()))
    555 		return 2
    556 	}
    557 
    558 	cfgMap := stateCfgMap(L)
    559 
    560 	if cfg, found := cfgMap[config]; found {
    561 		res := L.RawGetInt(stateCnxTable(L), cfg.id)
    562 		tbl := res.(*lua.LTable)
    563 		if L.RawGetInt(tbl, keyClient).(*lua.LUserData).Value.(*mqtt.Client) != cfg.client {
    564 			panic("Inconsistent configuration table")
    565 		}
    566 
    567 		L.Push(res)
    568 		return 1
    569 	}
    570 
    571 	if oldCfgMap := stateOldCfgMap(L); oldCfgMap != nil {
    572 		if cfg, found := oldCfgMap[config]; found {
    573 			cfgMap[config] = cfg
    574 			L.Push(registerClient(L, cfg.id, cfg.client, true))
    575 			return 1
    576 		}
    577 	}
    578 
    579 	id, idString := stateClientNextId(L)
    580 	client, err := newClient(&config, idString)
    581 	if err != nil {
    582 		log.Println("newMqttClient:", err)
    583 		L.Push(lua.LNil)
    584 		L.Push(lua.LString(err.Error()))
    585 		return 2
    586 	}
    587 	go mqttRead(client, stateChanToLua(L), id)
    588 
    589 	cfgMap[config] = mqttClientEntry{id: id, client: client}
    590 
    591 	L.Push(registerClient(L, id, client, false))
    592 	return 1
    593 }
    594 
    595 func luaPublish(L *lua.LState) int {
    596 	cnx := L.CheckTable(1)
    597 	client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    598 
    599 	if L.GetTop() == 1 {
    600 		if err := client.Ping(nil); err != nil {
    601 			log.Println("luaPing:", err)
    602 			L.Push(lua.LNil)
    603 			L.Push(lua.LString(err.Error()))
    604 			return 2
    605 		} else {
    606 			L.Push(lua.LTrue)
    607 			return 1
    608 		}
    609 	}
    610 
    611 	message := L.CheckString(2)
    612 	topic := L.CheckString(3)
    613 
    614 	if err := client.Publish(nil, []byte(message), topic); err != nil {
    615 		L.Push(lua.LNil)
    616 		L.Push(lua.LString(err.Error()))
    617 		return 2
    618 	} else {
    619 		L.Push(lua.LTrue)
    620 		return 1
    621 	}
    622 }
    623 
    624 func luaQuery(L *lua.LState) int {
    625 	cnx := L.CheckTable(1)
    626 	topic := L.CheckString(2)
    627 	subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable)
    628 	L.Push(L.GetField(subTbl, topic))
    629 	return 1
    630 }
    631 
    632 func luaSubscribe(L *lua.LState) int {
    633 	var err error
    634 	cnx := L.CheckTable(1)
    635 	topic := L.CheckString(2)
    636 	callback := L.OptFunction(3, nil)
    637 	client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    638 	tbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable)
    639 
    640 	_, is_new := L.GetField(tbl, topic).(*lua.LNilType)
    641 
    642 	if lua.LVIsFalse(L.RawGetInt(cnx, keyReused)) {
    643 		if callback == nil {
    644 			err = client.Unsubscribe(nil, topic)
    645 		} else if is_new {
    646 			err = client.Subscribe(nil, topic)
    647 		}
    648 	}
    649 
    650 	if err != nil {
    651 		log.Println("luaSubscribe:", err)
    652 		L.Push(lua.LNil)
    653 		L.Push(lua.LString(err.Error()))
    654 		return 2
    655 	} else {
    656 		if callback == nil {
    657 			if is_new {
    658 				log.Printf("Not subscribed to %q", topic)
    659 			} else {
    660 				log.Printf("Unsubscribed from %q", topic)
    661 			}
    662 			L.SetField(tbl, topic, lua.LNil)
    663 		} else {
    664 			if is_new {
    665 				log.Printf("Subscribed to %q", topic)
    666 			} else {
    667 				log.Printf("Updating subscription to %q", topic)
    668 			}
    669 			L.SetField(tbl, topic, callback)
    670 		}
    671 
    672 		L.Push(lua.LTrue)
    673 		return 1
    674 	}
    675 }
    676 
    677 /********** Lua Object for timers **********/
    678 
    679 const luaTimerTypeName = "timer"
    680 
    681 func registerTimerType(L *lua.LState) {
    682 	mt := L.NewTypeMetatable(luaTimerTypeName)
    683 	L.SetGlobal(luaTimerTypeName, mt)
    684 	L.SetField(mt, "new", L.NewFunction(newTimer))
    685 	L.SetField(mt, "schedule", L.NewFunction(timerSchedule))
    686 	L.SetField(mt, "__index", L.SetFuncs(L.NewTable(), timerMethods))
    687 }
    688 
    689 func newTimer(L *lua.LState) int {
    690 	atTime := L.Get(1)
    691 	cb := L.CheckFunction(2)
    692 	L.Pop(2)
    693 	L.SetMetatable(cb, L.GetTypeMetatable(luaTimerTypeName))
    694 	L.Push(cb)
    695 	L.Push(atTime)
    696 	return timerSchedule(L)
    697 }
    698 
    699 var timerMethods = map[string]lua.LGFunction{
    700 	"cancel":   timerCancel,
    701 	"schedule": timerSchedule,
    702 }
    703 
    704 func timerCancel(L *lua.LState) int {
    705 	timer := L.CheckFunction(1)
    706 	L.RawSet(stateTimerTable(L), timer, lua.LNil)
    707 	return 0
    708 }
    709 
    710 func timerSchedule(L *lua.LState) int {
    711 	timer := L.CheckFunction(1)
    712 	atTime := lua.LNil
    713 	if L.Get(2) != lua.LNil {
    714 		atTime = L.CheckNumber(2)
    715 	}
    716 
    717 	L.RawSet(stateTimerTable(L), timer, atTime)
    718 	return 0
    719 }
    720 
    721 func toTime(lsec lua.LNumber) time.Time {
    722 	fsec := float64(lsec)
    723 	sec := int64(fsec)
    724 	nsec := int64((fsec - float64(sec)) * 1.0e9)
    725 
    726 	return time.Unix(sec, nsec)
    727 }
    728 
    729 func runTimers(L *lua.LState, parentTimer *time.Timer) {
    730 	hasNext := false
    731 	var nextTime time.Time
    732 
    733 	now := time.Now()
    734 	timers := stateTimerTable(L)
    735 
    736 	timer, luaT := timers.Next(lua.LNil)
    737 	for timer != lua.LNil {
    738 		t := toTime(luaT.(lua.LNumber))
    739 		if t.Compare(now) <= 0 {
    740 			L.RawSet(timers, timer, lua.LNil)
    741 			err := L.CallByParam(lua.P{Fn: timer, NRet: 0, Protect: true}, timer, luaT)
    742 			if err != nil {
    743 				panic(err)
    744 			}
    745 			timer = lua.LNil
    746 			hasNext = false
    747 		} else if !hasNext || t.Compare(nextTime) < 0 {
    748 			hasNext = true
    749 			nextTime = t
    750 		}
    751 
    752 		timer, luaT = timers.Next(timer)
    753 	}
    754 
    755 	if hasNext {
    756 		parentTimer.Reset(time.Until(nextTime))
    757 	} else {
    758 		parentTimer.Stop()
    759 	}
    760 }