mqttagent

MQTT Lua Agent
git clone https://git.instinctive.eu/mqttagent.git
Log | Files | Refs | README | LICENSE

mqttagent.go (21029B)


      1 /*
      2  * Copyright (c) 2025, Natacha Porté
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 package mqttagent
     18 
     19 import (
     20 	"errors"
     21 	"fmt"
     22 	"log"
     23 	"os"
     24 	"strings"
     25 	"time"
     26 
     27 	"github.com/go-mqtt/mqtt"
     28 	"github.com/yuin/gluamapper"
     29 	"github.com/yuin/gopher-lua"
     30 )
     31 
     32 type MqttAgent interface {
     33 	Setup(L *lua.LState)
     34 	Teardown(L *lua.LState)
     35 }
     36 
     37 type MqttReloadingAgent interface {
     38 	Setup(L *lua.LState)
     39 	ReloadBegin(oldL, newL *lua.LState)
     40 	ReloadAbort(oldL, newL *lua.LState)
     41 	ReloadEnd(oldL, newL *lua.LState)
     42 	Teardown(L *lua.LState)
     43 }
     44 
     45 type mqttMessage struct {
     46 	Timestamp float64
     47 	ClientId  int
     48 	Topic     []byte
     49 	Message   []byte
     50 }
     51 
     52 const internalOffline = "$SYS/self/offline"
     53 const internalOnline = "$SYS/self/online"
     54 
     55 func isInternal(topic string) bool {
     56 	return strings.HasPrefix(topic, "$SYS/self")
     57 }
     58 
     59 func Run(agent MqttAgent, main_script string, capacity int) {
     60 	fromMqtt := make(chan mqttMessage, capacity)
     61 
     62 	L := lua.NewState()
     63 	defer L.Close()
     64 
     65 	agent.Setup(L)
     66 	defer agent.Teardown(L)
     67 
     68 	hostname, err := os.Hostname()
     69 	if err != nil {
     70 		hostname = "<unknown>"
     71 	}
     72 
     73 	idString := fmt.Sprintf("mqttagent-%s-%d", hostname, os.Getpid())
     74 	registerMqttClientType(L)
     75 	registerTimerType(L)
     76 	registerState(L, agent, idString, fromMqtt)
     77 	defer cleanupClients(L)
     78 
     79 	if err := L.DoFile(main_script); err != nil {
     80 		panic(err)
     81 	}
     82 
     83 	timer := time.NewTimer(0)
     84 	defer timer.Stop()
     85 
     86 	log.Println(idString, "started")
     87 
     88 	for {
     89 		select {
     90 		case msg, ok := <-fromMqtt:
     91 
     92 			if !ok {
     93 				log.Println("fromMqtt is closed")
     94 				break
     95 			}
     96 
     97 			processMsg(L, &msg)
     98 
     99 		case <-timer.C:
    100 		}
    101 
    102 		runTimers(L, timer)
    103 
    104 		if stateReloadRequested(L) {
    105 			L = reload(L, main_script)
    106 			runTimers(L, timer)
    107 			stateRequestReload(L, lua.LNil)
    108 		}
    109 
    110 		if tableIsEmpty(stateCnxTable(L)) && tableIsEmpty(stateTimerTable(L)) {
    111 			break
    112 		}
    113 	}
    114 
    115 	log.Println(idString, "finished")
    116 }
    117 
    118 func cleanupClients(L *lua.LState) {
    119 	cnxTbl := stateCnxTable(L)
    120 	if cnxTbl == nil {
    121 		return
    122 	}
    123 
    124 	L.ForEach(cnxTbl, func(key, value lua.LValue) {
    125 		cnx := value.(*lua.LTable)
    126 		client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    127 		close(L.RawGetInt(cnx, keyCloseSig).(*lua.LUserData).Value.(chan struct{}))
    128 		if err := client.Disconnect(nil); err != nil {
    129 			log.Printf("cleanup client %s: %v", lua.LVAsString(key), err)
    130 		}
    131 	})
    132 }
    133 
    134 func dispatchMsg(L *lua.LState, msg *mqttMessage, cnx, key, value lua.LValue) {
    135 	skey, ok := key.(lua.LString)
    136 	topic := string(msg.Topic)
    137 
    138 	if ok && match(topic, string(skey)) {
    139 		err := L.CallByParam(lua.P{Fn: value, NRet: 0, Protect: true},
    140 			cnx,
    141 			lua.LString(string(msg.Message)),
    142 			lua.LString(topic),
    143 			lua.LNumber(msg.Timestamp))
    144 		if err != nil {
    145 			panic(err)
    146 		}
    147 	}
    148 }
    149 
    150 func matchSliced(actual, filter []string) bool {
    151 	if len(filter) == 0 {
    152 		return len(actual) == 0
    153 	}
    154 
    155 	if filter[0] == "#" {
    156 		if len(filter) == 1 {
    157 			return true
    158 		}
    159 
    160 		for i := range actual {
    161 			if matchSliced(actual[i:], filter[1:]) {
    162 				return true
    163 			}
    164 		}
    165 
    166 		return false
    167 	}
    168 
    169 	if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) {
    170 		return matchSliced(actual[1:], filter[1:])
    171 	}
    172 
    173 	return false
    174 }
    175 
    176 func match(actual, filter string) bool {
    177 	return matchSliced(strings.Split(actual, "/"), strings.Split(filter, "/"))
    178 }
    179 
    180 func tableIsEmpty(t *lua.LTable) bool {
    181 	key, _ := t.Next(lua.LNil)
    182 	return key == lua.LNil
    183 }
    184 
    185 func processMsg(L *lua.LState, msg *mqttMessage) {
    186 	cnx := L.RawGetInt(stateCnxTable(L), msg.ClientId).(*lua.LTable)
    187 	subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable)
    188 	L.ForEach(subTbl, func(key, value lua.LValue) { dispatchMsg(L, msg, cnx, key, value) })
    189 
    190 	if tableIsEmpty(subTbl) {
    191 		client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    192 		close(L.RawGetInt(cnx, keyCloseSig).(*lua.LUserData).Value.(chan struct{}))
    193 		if err := client.Disconnect(nil); err != nil {
    194 			log.Println("disconnect empty client:", err)
    195 		}
    196 		L.RawSetInt(stateCnxTable(L), msg.ClientId, lua.LNil)
    197 	}
    198 }
    199 
    200 func mqttMonitor(client *mqtt.Client, toLua chan<- mqttMessage, id int, closeSig <-chan struct{}, keepAliveS uint16) {
    201 	var tickerCh <-chan time.Time
    202 	var ticker *time.Ticker = nil
    203 	defer func() {
    204 		if ticker != nil {
    205 			ticker.Stop()
    206 		}
    207 	}()
    208 
    209 	keepAlive := time.Duration(keepAliveS) * time.Second
    210 
    211 	for {
    212 		select {
    213 		case <-closeSig:
    214 			return
    215 		case <-client.Online():
    216 		}
    217 
    218 		log.Println("Online client", id)
    219 		toLua <- mqttMessage{
    220 			Timestamp: float64(time.Now().UnixMicro()) * 1.0e-6,
    221 			ClientId:  id,
    222 			Topic:     []byte(internalOnline),
    223 			Message:   []byte{},
    224 		}
    225 
    226 		if keepAliveS > 0 {
    227 			ticker = time.NewTicker(keepAlive)
    228 			tickerCh = ticker.C
    229 		}
    230 
    231 		online := true
    232 
    233 		for online {
    234 			select {
    235 			case <-closeSig:
    236 				return
    237 			case <-client.Offline():
    238 				online = false
    239 			case <-tickerCh:
    240 				if err := client.Ping(nil); err != nil {
    241 					log.Println("Ping:", err)
    242 				}
    243 			}
    244 		}
    245 
    246 		if ticker != nil {
    247 			ticker.Stop()
    248 			ticker = nil
    249 			tickerCh = nil
    250 		}
    251 
    252 		log.Println("Offline client", id)
    253 		toLua <- mqttMessage{
    254 			Timestamp: float64(time.Now().UnixMicro()) * 1.0e-6,
    255 			ClientId:  id,
    256 			Topic:     []byte(internalOffline),
    257 			Message:   []byte{},
    258 		}
    259 	}
    260 }
    261 
    262 func mqttRead(client *mqtt.Client, toLua chan<- mqttMessage, id int) {
    263 	var big *mqtt.BigMessage
    264 
    265 	for {
    266 		message, topic, err := client.ReadSlices()
    267 		t := float64(time.Now().UnixMicro()) * 1.0e-6
    268 
    269 		switch {
    270 		case err == nil:
    271 			toLua <- mqttMessage{Timestamp: t, ClientId: id, Topic: dup(topic), Message: dup(message)}
    272 
    273 		case errors.As(err, &big):
    274 			data, err := big.ReadAll()
    275 			if err != nil {
    276 				log.Println("mqttRead big message:", err)
    277 			} else {
    278 				toLua <- mqttMessage{Timestamp: t, ClientId: id, Topic: dup(topic), Message: data}
    279 			}
    280 
    281 		case errors.Is(err, mqtt.ErrClosed):
    282 			log.Println("mqttRead finishing:", err)
    283 			return
    284 
    285 		case mqtt.IsConnectionRefused(err):
    286 			log.Println("mqttRead connection refused:", err)
    287 			time.Sleep(15 * time.Minute)
    288 
    289 		default:
    290 			log.Println("mqttRead:", err)
    291 			time.Sleep(2 * time.Second)
    292 		}
    293 	}
    294 }
    295 
    296 func reload(oldL *lua.LState, main_script string) *lua.LState {
    297 	log.Println("Reloading", main_script)
    298 	agent := stateAgent(oldL)
    299 	reloader, isReloader := agent.(MqttReloadingAgent)
    300 
    301 	newL := lua.NewState()
    302 
    303 	if isReloader {
    304 		reloader.ReloadBegin(oldL, newL)
    305 	} else {
    306 		agent.Setup(newL)
    307 	}
    308 
    309 	registerMqttClientType(newL)
    310 	registerTimerType(newL)
    311 
    312 	stateReloadBegin(oldL, newL)
    313 
    314 	if err := newL.DoFile(main_script); err != nil {
    315 		log.Println("Reload failed:", err)
    316 		stateReloadAbort(oldL, newL)
    317 		if isReloader {
    318 			reloader.ReloadAbort(oldL, newL)
    319 		} else {
    320 			agent.Teardown(newL)
    321 		}
    322 		newL.Close()
    323 		return oldL
    324 	} else {
    325 		stateReloadEnd(oldL, newL)
    326 		if isReloader {
    327 			reloader.ReloadEnd(oldL, newL)
    328 		} else {
    329 			agent.Teardown(oldL)
    330 		}
    331 		oldL.Close()
    332 		log.Println("Reload successful")
    333 		return newL
    334 	}
    335 }
    336 
    337 func dup(src []byte) []byte {
    338 	res := make([]byte, len(src))
    339 	copy(res, src)
    340 	return res
    341 }
    342 
    343 func newUserData(L *lua.LState, v interface{}) *lua.LUserData {
    344 	res := L.NewUserData()
    345 	res.Value = v
    346 	return res
    347 }
    348 
    349 /********** State Object in the Lua Interpreter **********/
    350 
    351 const luaStateName = "_mqttagent"
    352 const keyChanToLua = 1
    353 const keyAgent = 2
    354 const keyClientPrefix = 3
    355 const keyClientNextId = 4
    356 const keyCfgMap = 5
    357 const keyCnxTable = 6
    358 const keyTimerTable = 7
    359 const keyReloadRequest = 8
    360 const keyOldCfgMap = 9
    361 
    362 func registerState(L *lua.LState, agent MqttAgent, clientPrefix string, toLua chan<- mqttMessage) {
    363 	st := L.NewTable()
    364 	L.RawSetInt(st, keyChanToLua, newUserData(L, toLua))
    365 	L.RawSetInt(st, keyAgent, newUserData(L, agent))
    366 	L.RawSetInt(st, keyClientPrefix, lua.LString(clientPrefix))
    367 	L.RawSetInt(st, keyClientNextId, lua.LNumber(1))
    368 	L.RawSetInt(st, keyCfgMap, newUserData(L, make(mqttConfigMap)))
    369 	L.RawSetInt(st, keyCnxTable, L.NewTable())
    370 	L.RawSetInt(st, keyTimerTable, L.NewTable())
    371 	L.SetGlobal(luaStateName, st)
    372 	L.SetGlobal("reload", L.NewFunction(requestReload))
    373 }
    374 
    375 func stateReloadBegin(oldL, newL *lua.LState) {
    376 	oldSt := oldL.GetGlobal(luaStateName).(*lua.LTable)
    377 	toLua := oldL.RawGetInt(oldSt, keyChanToLua).(*lua.LUserData).Value.(chan<- mqttMessage)
    378 	agent := oldL.RawGetInt(oldSt, keyAgent).(*lua.LUserData).Value.(MqttAgent)
    379 	clientPrefix := oldL.RawGetInt(oldSt, keyClientPrefix)
    380 	nextId := oldL.RawGetInt(oldSt, keyClientNextId)
    381 	cfgMap := oldL.RawGetInt(oldSt, keyCfgMap).(*lua.LUserData).Value.(mqttConfigMap)
    382 
    383 	st := newL.NewTable()
    384 	newL.RawSetInt(st, keyChanToLua, newUserData(newL, toLua))
    385 	newL.RawSetInt(st, keyAgent, newUserData(newL, agent))
    386 	newL.RawSetInt(st, keyClientPrefix, clientPrefix)
    387 	newL.RawSetInt(st, keyClientNextId, nextId)
    388 	newL.RawSetInt(st, keyCfgMap, newUserData(newL, make(mqttConfigMap)))
    389 	newL.RawSetInt(st, keyCnxTable, newL.NewTable())
    390 	newL.RawSetInt(st, keyTimerTable, newL.NewTable())
    391 	newL.RawSetInt(st, keyOldCfgMap, newUserData(newL, cfgMap))
    392 	newL.SetGlobal(luaStateName, st)
    393 	newL.SetGlobal("reload", newL.NewFunction(requestReload))
    394 }
    395 
    396 func stateReloadAbort(oldL, newL *lua.LState) {
    397 	statePartialCleanup(newL, oldL)
    398 }
    399 
    400 func stateReloadEnd(oldL, newL *lua.LState) {
    401 	statePartialCleanup(oldL, newL)
    402 	newSt := newL.GetGlobal(luaStateName).(*lua.LTable)
    403 	newL.RawSetInt(newSt, keyOldCfgMap, lua.LNil)
    404 }
    405 
    406 func statePartialCleanup(staleL, keptL *lua.LState) {
    407 	staleCnxTable := stateCnxTable(staleL)
    408 	keptCnxTable := stateCnxTable(keptL)
    409 
    410 	staleL.ForEach(staleCnxTable, func(key, value lua.LValue) {
    411 		clientId := int(key.(lua.LNumber))
    412 		if keptL.RawGetInt(keptCnxTable, clientId) == lua.LNil {
    413 			tbl := value.(*lua.LTable)
    414 			client := staleL.RawGetInt(tbl, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    415 			close(staleL.RawGetInt(tbl, keyCloseSig).(*lua.LUserData).Value.(chan struct{}))
    416 			if err := client.Disconnect(nil); err != nil {
    417 				log.Printf("cleanup stale client %s: %v", lua.LVAsString(key), err)
    418 			}
    419 		}
    420 	})
    421 
    422 	keptL.ForEach(keptCnxTable, func(key, value lua.LValue) {
    423 		clientId := int(key.(lua.LNumber))
    424 		keptCnx := value.(*lua.LTable)
    425 		if lua.LVIsFalse(keptL.RawGetInt(keptCnx, keyReused)) {
    426 			return
    427 		}
    428 
    429 		client := keptL.RawGetInt(keptCnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    430 		staleCnx := staleL.RawGetInt(staleCnxTable, clientId).(*lua.LTable)
    431 
    432 		if client != staleL.RawGetInt(staleCnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) {
    433 			panic("This shouldn't happen")
    434 		}
    435 
    436 		keptSub := keptL.RawGetInt(keptCnx, keySubTable).(*lua.LTable)
    437 		staleSub := staleL.RawGetInt(staleCnx, keySubTable).(*lua.LTable)
    438 
    439 		staleL.ForEach(staleSub, func(key, _ lua.LValue) {
    440 			topic := string(key.(lua.LString))
    441 			if !isInternal(topic) && keptL.GetField(keptSub, topic) == lua.LNil {
    442 				log.Println("Unsubscribing from stale topic", topic)
    443 				if err := client.Unsubscribe(nil, topic); err != nil {
    444 					log.Println("Failed to unsubscribe:", err)
    445 				}
    446 			}
    447 		})
    448 
    449 		keptL.ForEach(keptSub, func(key, _ lua.LValue) {
    450 			topic := string(key.(lua.LString))
    451 			if !isInternal(topic) && staleL.GetField(staleSub, topic) == lua.LNil {
    452 				log.Println("Subscribing to new topic", topic)
    453 				if err := client.Subscribe(nil, topic); err != nil {
    454 					log.Println("Failed to subscribe:", err)
    455 				}
    456 			}
    457 		})
    458 
    459 		keptL.RawSetInt(keptCnx, keyReused, lua.LNil)
    460 	})
    461 }
    462 
    463 func stateValue(L *lua.LState, key int) lua.LValue {
    464 	st := L.GetGlobal(luaStateName)
    465 	return L.RawGetInt(st.(*lua.LTable), key)
    466 }
    467 
    468 func stateChanToLua(L *lua.LState) chan<- mqttMessage {
    469 	ud := stateValue(L, keyChanToLua)
    470 	return ud.(*lua.LUserData).Value.(chan<- mqttMessage)
    471 }
    472 
    473 func stateAgent(L *lua.LState) MqttAgent {
    474 	ud := stateValue(L, keyAgent)
    475 	return ud.(*lua.LUserData).Value.(MqttAgent)
    476 }
    477 
    478 func stateClientNextId(L *lua.LState) (int, string) {
    479 	st := L.GetGlobal(luaStateName).(*lua.LTable)
    480 	result := int(L.RawGetInt(st, keyClientNextId).(lua.LNumber))
    481 	L.RawSetInt(st, keyClientNextId, lua.LNumber(result+1))
    482 	prefix := lua.LVAsString(L.RawGetInt(st, keyClientPrefix))
    483 	return result, fmt.Sprintf("%s-%d", prefix, result)
    484 }
    485 
    486 func stateCfgMap(L *lua.LState) mqttConfigMap {
    487 	return stateValue(L, keyCfgMap).(*lua.LUserData).Value.(mqttConfigMap)
    488 }
    489 
    490 func stateCnxTable(L *lua.LState) *lua.LTable {
    491 	return stateValue(L, keyCnxTable).(*lua.LTable)
    492 }
    493 
    494 func stateTimerTable(L *lua.LState) *lua.LTable {
    495 	return stateValue(L, keyTimerTable).(*lua.LTable)
    496 }
    497 
    498 func stateReloadRequested(L *lua.LState) bool {
    499 	return lua.LVAsBool(stateValue(L, keyReloadRequest))
    500 }
    501 
    502 func stateRequestReload(L *lua.LState, v lua.LValue) {
    503 	st := L.GetGlobal(luaStateName).(*lua.LTable)
    504 	L.RawSetInt(st, keyReloadRequest, v)
    505 }
    506 
    507 func stateOldCfgMap(L *lua.LState) mqttConfigMap {
    508 	val := stateValue(L, keyOldCfgMap)
    509 	if val == lua.LNil {
    510 		return nil
    511 	} else {
    512 		return val.(*lua.LUserData).Value.(mqttConfigMap)
    513 	}
    514 }
    515 
    516 func requestReload(L *lua.LState) int {
    517 	stateRequestReload(L, lua.LTrue)
    518 	return 0
    519 }
    520 
    521 /********** Lua Object for MQTT client **********/
    522 
    523 const luaMqttClientTypeName = "mqttclient"
    524 const keyClient = 1
    525 const keyCloseSig = 2
    526 const keySubTable = 3
    527 const keyReused = 4
    528 
    529 func registerMqttClientType(L *lua.LState) {
    530 	mt := L.NewTypeMetatable(luaMqttClientTypeName)
    531 	L.SetGlobal(luaMqttClientTypeName, mt)
    532 	L.SetField(mt, "new", L.NewFunction(newMqttClient))
    533 	L.SetField(mt, "__call", L.NewFunction(luaPublish))
    534 	L.SetField(mt, "__index", L.NewFunction(luaQuery))
    535 	L.SetField(mt, "__newindex", L.NewFunction(luaSubscribe))
    536 }
    537 
    538 type mqttConfig struct {
    539 	Connection     string
    540 	TLS            bool
    541 	PauseTimeout   string
    542 	AtLeastOnceMax int
    543 	ExactlyOnceMax int
    544 	UserName       string
    545 	Password       string
    546 	Will           struct {
    547 		Topic       string
    548 		Message     string
    549 		Retain      bool
    550 		AtLeastOnce bool
    551 		ExactlyOnce bool
    552 	}
    553 	KeepAlive    uint16
    554 	CleanSession bool
    555 }
    556 
    557 type mqttClientEntry struct {
    558 	client   *mqtt.Client
    559 	closeSig chan struct{}
    560 	id       int
    561 }
    562 
    563 type mqttConfigMap map[mqttConfig]mqttClientEntry
    564 
    565 func mqttConfigBytes(src string) []byte {
    566 	if src == "" {
    567 		return nil
    568 	} else {
    569 		return []byte(src)
    570 	}
    571 }
    572 
    573 func newClient(config *mqttConfig, id string) (*mqtt.Client, error) {
    574 	pto, err := time.ParseDuration(config.PauseTimeout)
    575 	if err != nil {
    576 		pto = time.Second
    577 	}
    578 
    579 	var dialer mqtt.Dialer
    580 
    581 	if config.TLS {
    582 		dialer = mqtt.NewTLSDialer("tcp", config.Connection, nil)
    583 	} else {
    584 		dialer = mqtt.NewDialer("tcp", config.Connection)
    585 	}
    586 
    587 	processed_cfg := mqtt.Config{
    588 		Dialer:         dialer,
    589 		PauseTimeout:   pto,
    590 		AtLeastOnceMax: config.AtLeastOnceMax,
    591 		ExactlyOnceMax: config.ExactlyOnceMax,
    592 		UserName:       config.UserName,
    593 		Password:       mqttConfigBytes(config.Password),
    594 		Will: struct {
    595 			Topic       string
    596 			Message     []byte
    597 			Retain      bool
    598 			AtLeastOnce bool
    599 			ExactlyOnce bool
    600 		}{
    601 			Topic:       config.Will.Topic,
    602 			Message:     mqttConfigBytes(config.Will.Message),
    603 			Retain:      config.Will.Retain,
    604 			AtLeastOnce: config.Will.AtLeastOnce,
    605 			ExactlyOnce: config.Will.ExactlyOnce,
    606 		},
    607 		KeepAlive:    config.KeepAlive,
    608 		CleanSession: config.CleanSession,
    609 	}
    610 
    611 	return mqtt.VolatileSession(id, &processed_cfg)
    612 }
    613 
    614 func registerClient(L *lua.LState, id int, client *mqtt.Client, closeSig chan struct{}, reused bool) lua.LValue {
    615 	res := L.NewTable()
    616 	L.RawSetInt(res, keyClient, newUserData(L, client))
    617 	L.RawSetInt(res, keyCloseSig, newUserData(L, closeSig))
    618 	L.RawSetInt(res, keySubTable, L.NewTable())
    619 	if reused {
    620 		L.RawSetInt(res, keyReused, lua.LTrue)
    621 	}
    622 	L.SetMetatable(res, L.GetTypeMetatable(luaMqttClientTypeName))
    623 	L.RawSetInt(stateCnxTable(L), id, res)
    624 	return res
    625 }
    626 
    627 func newMqttClient(L *lua.LState) int {
    628 	var config mqttConfig
    629 	if err := gluamapper.Map(L.CheckTable(1), &config); err != nil {
    630 		log.Println("newMqttClient:", err)
    631 		L.Push(lua.LNil)
    632 		L.Push(lua.LString(err.Error()))
    633 		return 2
    634 	}
    635 
    636 	cfgMap := stateCfgMap(L)
    637 
    638 	if cfg, found := cfgMap[config]; found {
    639 		res := L.RawGetInt(stateCnxTable(L), cfg.id)
    640 		tbl := res.(*lua.LTable)
    641 		if L.RawGetInt(tbl, keyClient).(*lua.LUserData).Value.(*mqtt.Client) != cfg.client {
    642 			panic("Inconsistent configuration table")
    643 		}
    644 
    645 		L.Push(res)
    646 		return 1
    647 	}
    648 
    649 	if oldCfgMap := stateOldCfgMap(L); oldCfgMap != nil {
    650 		if cfg, found := oldCfgMap[config]; found {
    651 			cfgMap[config] = cfg
    652 			L.Push(registerClient(L, cfg.id, cfg.client, cfg.closeSig, true))
    653 			return 1
    654 		}
    655 	}
    656 
    657 	id, idString := stateClientNextId(L)
    658 	client, err := newClient(&config, idString)
    659 	if err != nil {
    660 		log.Println("newMqttClient:", err)
    661 		L.Push(lua.LNil)
    662 		L.Push(lua.LString(err.Error()))
    663 		return 2
    664 	}
    665 	closeSig := make(chan struct{}, 1)
    666 	go mqttMonitor(client, stateChanToLua(L), id, closeSig, config.KeepAlive)
    667 	go mqttRead(client, stateChanToLua(L), id)
    668 
    669 	cfgMap[config] = mqttClientEntry{id: id, client: client}
    670 
    671 	L.Push(registerClient(L, id, client, closeSig, false))
    672 	return 1
    673 }
    674 
    675 func luaPublish(L *lua.LState) int {
    676 	cnx := L.CheckTable(1)
    677 	client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    678 
    679 	if L.GetTop() == 1 {
    680 		if err := client.Ping(nil); err != nil {
    681 			log.Println("luaPing:", err)
    682 			L.Push(lua.LNil)
    683 			L.Push(lua.LString(err.Error()))
    684 			return 2
    685 		} else {
    686 			L.Push(lua.LTrue)
    687 			return 1
    688 		}
    689 	}
    690 
    691 	message := L.CheckString(2)
    692 	topic := L.CheckString(3)
    693 
    694 	if err := client.Publish(nil, []byte(message), topic); err != nil {
    695 		L.Push(lua.LNil)
    696 		L.Push(lua.LString(err.Error()))
    697 		return 2
    698 	} else {
    699 		L.Push(lua.LTrue)
    700 		return 1
    701 	}
    702 }
    703 
    704 func luaQuery(L *lua.LState) int {
    705 	cnx := L.CheckTable(1)
    706 	topic := L.CheckString(2)
    707 	subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable)
    708 	L.Push(L.GetField(subTbl, topic))
    709 	return 1
    710 }
    711 
    712 func luaSubscribe(L *lua.LState) int {
    713 	var err error
    714 	cnx := L.CheckTable(1)
    715 	topic := L.CheckString(2)
    716 	callback := L.OptFunction(3, nil)
    717 	client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client)
    718 	tbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable)
    719 
    720 	_, is_new := L.GetField(tbl, topic).(*lua.LNilType)
    721 
    722 	if !isInternal(topic) && lua.LVIsFalse(L.RawGetInt(cnx, keyReused)) {
    723 		if callback == nil {
    724 			err = client.Unsubscribe(nil, topic)
    725 		} else if is_new {
    726 			err = client.Subscribe(nil, topic)
    727 		}
    728 	}
    729 
    730 	if err != nil {
    731 		log.Println("luaSubscribe:", err)
    732 		L.Push(lua.LNil)
    733 		L.Push(lua.LString(err.Error()))
    734 		return 2
    735 	} else {
    736 		if callback == nil {
    737 			if is_new {
    738 				log.Printf("Not subscribed to %q", topic)
    739 			} else {
    740 				log.Printf("Unsubscribed from %q", topic)
    741 			}
    742 			L.SetField(tbl, topic, lua.LNil)
    743 		} else {
    744 			if is_new {
    745 				log.Printf("Subscribed to %q", topic)
    746 			} else {
    747 				log.Printf("Updating subscription to %q", topic)
    748 			}
    749 			L.SetField(tbl, topic, callback)
    750 		}
    751 
    752 		L.Push(lua.LTrue)
    753 		return 1
    754 	}
    755 }
    756 
    757 /********** Lua Object for timers **********/
    758 
    759 const luaTimerTypeName = "timer"
    760 
    761 func registerTimerType(L *lua.LState) {
    762 	mt := L.NewTypeMetatable(luaTimerTypeName)
    763 	L.SetGlobal(luaTimerTypeName, mt)
    764 	L.SetField(mt, "new", L.NewFunction(newTimer))
    765 	L.SetField(mt, "schedule", L.NewFunction(timerSchedule))
    766 	L.SetField(mt, "__index", L.SetFuncs(L.NewTable(), timerMethods))
    767 }
    768 
    769 func newTimer(L *lua.LState) int {
    770 	atTime := L.Get(1)
    771 	cb := L.CheckFunction(2)
    772 	L.Pop(2)
    773 	L.SetMetatable(cb, L.GetTypeMetatable(luaTimerTypeName))
    774 	L.Push(cb)
    775 	L.Push(atTime)
    776 	return timerSchedule(L)
    777 }
    778 
    779 var timerMethods = map[string]lua.LGFunction{
    780 	"cancel":   timerCancel,
    781 	"schedule": timerSchedule,
    782 }
    783 
    784 func timerCancel(L *lua.LState) int {
    785 	timer := L.CheckFunction(1)
    786 	L.RawSet(stateTimerTable(L), timer, lua.LNil)
    787 	return 0
    788 }
    789 
    790 func timerSchedule(L *lua.LState) int {
    791 	timer := L.CheckFunction(1)
    792 	atTime := lua.LNil
    793 	if L.Get(2) != lua.LNil {
    794 		atTime = L.CheckNumber(2)
    795 	}
    796 
    797 	L.RawSet(stateTimerTable(L), timer, atTime)
    798 	return 0
    799 }
    800 
    801 func toTime(lsec lua.LNumber) time.Time {
    802 	fsec := float64(lsec)
    803 	sec := int64(fsec)
    804 	nsec := int64((fsec - float64(sec)) * 1.0e9)
    805 
    806 	return time.Unix(sec, nsec)
    807 }
    808 
    809 func runTimers(L *lua.LState, parentTimer *time.Timer) {
    810 	hasNext := false
    811 	var nextTime time.Time
    812 
    813 	now := time.Now()
    814 	timers := stateTimerTable(L)
    815 
    816 	timer, luaT := timers.Next(lua.LNil)
    817 	for timer != lua.LNil {
    818 		t := toTime(luaT.(lua.LNumber))
    819 		if t.Compare(now) <= 0 {
    820 			L.RawSet(timers, timer, lua.LNil)
    821 			err := L.CallByParam(lua.P{Fn: timer, NRet: 0, Protect: true}, timer, luaT)
    822 			if err != nil {
    823 				panic(err)
    824 			}
    825 			timer = lua.LNil
    826 			hasNext = false
    827 		} else if !hasNext || t.Compare(nextTime) < 0 {
    828 			hasNext = true
    829 			nextTime = t
    830 		}
    831 
    832 		timer, luaT = timers.Next(timer)
    833 	}
    834 
    835 	if hasNext {
    836 		parentTimer.Reset(time.Until(nextTime))
    837 	} else {
    838 		parentTimer.Stop()
    839 	}
    840 }