mqttagent.go (21029B)
1 /* 2 * Copyright (c) 2025, Natacha Porté 3 * 4 * Permission to use, copy, modify, and distribute this software for any 5 * purpose with or without fee is hereby granted, provided that the above 6 * copyright notice and this permission notice appear in all copies. 7 * 8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 15 */ 16 17 package mqttagent 18 19 import ( 20 "errors" 21 "fmt" 22 "log" 23 "os" 24 "strings" 25 "time" 26 27 "github.com/go-mqtt/mqtt" 28 "github.com/yuin/gluamapper" 29 "github.com/yuin/gopher-lua" 30 ) 31 32 type MqttAgent interface { 33 Setup(L *lua.LState) 34 Teardown(L *lua.LState) 35 } 36 37 type MqttReloadingAgent interface { 38 Setup(L *lua.LState) 39 ReloadBegin(oldL, newL *lua.LState) 40 ReloadAbort(oldL, newL *lua.LState) 41 ReloadEnd(oldL, newL *lua.LState) 42 Teardown(L *lua.LState) 43 } 44 45 type mqttMessage struct { 46 Timestamp float64 47 ClientId int 48 Topic []byte 49 Message []byte 50 } 51 52 const internalOffline = "$SYS/self/offline" 53 const internalOnline = "$SYS/self/online" 54 55 func isInternal(topic string) bool { 56 return strings.HasPrefix(topic, "$SYS/self") 57 } 58 59 func Run(agent MqttAgent, main_script string, capacity int) { 60 fromMqtt := make(chan mqttMessage, capacity) 61 62 L := lua.NewState() 63 defer L.Close() 64 65 agent.Setup(L) 66 defer agent.Teardown(L) 67 68 hostname, err := os.Hostname() 69 if err != nil { 70 hostname = "<unknown>" 71 } 72 73 idString := fmt.Sprintf("mqttagent-%s-%d", hostname, os.Getpid()) 74 registerMqttClientType(L) 75 registerTimerType(L) 76 registerState(L, agent, idString, fromMqtt) 77 defer cleanupClients(L) 78 79 if err := L.DoFile(main_script); err != nil { 80 panic(err) 81 } 82 83 timer := time.NewTimer(0) 84 defer timer.Stop() 85 86 log.Println(idString, "started") 87 88 for { 89 select { 90 case msg, ok := <-fromMqtt: 91 92 if !ok { 93 log.Println("fromMqtt is closed") 94 break 95 } 96 97 processMsg(L, &msg) 98 99 case <-timer.C: 100 } 101 102 runTimers(L, timer) 103 104 if stateReloadRequested(L) { 105 L = reload(L, main_script) 106 runTimers(L, timer) 107 stateRequestReload(L, lua.LNil) 108 } 109 110 if tableIsEmpty(stateCnxTable(L)) && tableIsEmpty(stateTimerTable(L)) { 111 break 112 } 113 } 114 115 log.Println(idString, "finished") 116 } 117 118 func cleanupClients(L *lua.LState) { 119 cnxTbl := stateCnxTable(L) 120 if cnxTbl == nil { 121 return 122 } 123 124 L.ForEach(cnxTbl, func(key, value lua.LValue) { 125 cnx := value.(*lua.LTable) 126 client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 127 close(L.RawGetInt(cnx, keyCloseSig).(*lua.LUserData).Value.(chan struct{})) 128 if err := client.Disconnect(nil); err != nil { 129 log.Printf("cleanup client %s: %v", lua.LVAsString(key), err) 130 } 131 }) 132 } 133 134 func dispatchMsg(L *lua.LState, msg *mqttMessage, cnx, key, value lua.LValue) { 135 skey, ok := key.(lua.LString) 136 topic := string(msg.Topic) 137 138 if ok && match(topic, string(skey)) { 139 err := L.CallByParam(lua.P{Fn: value, NRet: 0, Protect: true}, 140 cnx, 141 lua.LString(string(msg.Message)), 142 lua.LString(topic), 143 lua.LNumber(msg.Timestamp)) 144 if err != nil { 145 panic(err) 146 } 147 } 148 } 149 150 func matchSliced(actual, filter []string) bool { 151 if len(filter) == 0 { 152 return len(actual) == 0 153 } 154 155 if filter[0] == "#" { 156 if len(filter) == 1 { 157 return true 158 } 159 160 for i := range actual { 161 if matchSliced(actual[i:], filter[1:]) { 162 return true 163 } 164 } 165 166 return false 167 } 168 169 if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) { 170 return matchSliced(actual[1:], filter[1:]) 171 } 172 173 return false 174 } 175 176 func match(actual, filter string) bool { 177 return matchSliced(strings.Split(actual, "/"), strings.Split(filter, "/")) 178 } 179 180 func tableIsEmpty(t *lua.LTable) bool { 181 key, _ := t.Next(lua.LNil) 182 return key == lua.LNil 183 } 184 185 func processMsg(L *lua.LState, msg *mqttMessage) { 186 cnx := L.RawGetInt(stateCnxTable(L), msg.ClientId).(*lua.LTable) 187 subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable) 188 L.ForEach(subTbl, func(key, value lua.LValue) { dispatchMsg(L, msg, cnx, key, value) }) 189 190 if tableIsEmpty(subTbl) { 191 client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 192 close(L.RawGetInt(cnx, keyCloseSig).(*lua.LUserData).Value.(chan struct{})) 193 if err := client.Disconnect(nil); err != nil { 194 log.Println("disconnect empty client:", err) 195 } 196 L.RawSetInt(stateCnxTable(L), msg.ClientId, lua.LNil) 197 } 198 } 199 200 func mqttMonitor(client *mqtt.Client, toLua chan<- mqttMessage, id int, closeSig <-chan struct{}, keepAliveS uint16) { 201 var tickerCh <-chan time.Time 202 var ticker *time.Ticker = nil 203 defer func() { 204 if ticker != nil { 205 ticker.Stop() 206 } 207 }() 208 209 keepAlive := time.Duration(keepAliveS) * time.Second 210 211 for { 212 select { 213 case <-closeSig: 214 return 215 case <-client.Online(): 216 } 217 218 log.Println("Online client", id) 219 toLua <- mqttMessage{ 220 Timestamp: float64(time.Now().UnixMicro()) * 1.0e-6, 221 ClientId: id, 222 Topic: []byte(internalOnline), 223 Message: []byte{}, 224 } 225 226 if keepAliveS > 0 { 227 ticker = time.NewTicker(keepAlive) 228 tickerCh = ticker.C 229 } 230 231 online := true 232 233 for online { 234 select { 235 case <-closeSig: 236 return 237 case <-client.Offline(): 238 online = false 239 case <-tickerCh: 240 if err := client.Ping(nil); err != nil { 241 log.Println("Ping:", err) 242 } 243 } 244 } 245 246 if ticker != nil { 247 ticker.Stop() 248 ticker = nil 249 tickerCh = nil 250 } 251 252 log.Println("Offline client", id) 253 toLua <- mqttMessage{ 254 Timestamp: float64(time.Now().UnixMicro()) * 1.0e-6, 255 ClientId: id, 256 Topic: []byte(internalOffline), 257 Message: []byte{}, 258 } 259 } 260 } 261 262 func mqttRead(client *mqtt.Client, toLua chan<- mqttMessage, id int) { 263 var big *mqtt.BigMessage 264 265 for { 266 message, topic, err := client.ReadSlices() 267 t := float64(time.Now().UnixMicro()) * 1.0e-6 268 269 switch { 270 case err == nil: 271 toLua <- mqttMessage{Timestamp: t, ClientId: id, Topic: dup(topic), Message: dup(message)} 272 273 case errors.As(err, &big): 274 data, err := big.ReadAll() 275 if err != nil { 276 log.Println("mqttRead big message:", err) 277 } else { 278 toLua <- mqttMessage{Timestamp: t, ClientId: id, Topic: dup(topic), Message: data} 279 } 280 281 case errors.Is(err, mqtt.ErrClosed): 282 log.Println("mqttRead finishing:", err) 283 return 284 285 case mqtt.IsConnectionRefused(err): 286 log.Println("mqttRead connection refused:", err) 287 time.Sleep(15 * time.Minute) 288 289 default: 290 log.Println("mqttRead:", err) 291 time.Sleep(2 * time.Second) 292 } 293 } 294 } 295 296 func reload(oldL *lua.LState, main_script string) *lua.LState { 297 log.Println("Reloading", main_script) 298 agent := stateAgent(oldL) 299 reloader, isReloader := agent.(MqttReloadingAgent) 300 301 newL := lua.NewState() 302 303 if isReloader { 304 reloader.ReloadBegin(oldL, newL) 305 } else { 306 agent.Setup(newL) 307 } 308 309 registerMqttClientType(newL) 310 registerTimerType(newL) 311 312 stateReloadBegin(oldL, newL) 313 314 if err := newL.DoFile(main_script); err != nil { 315 log.Println("Reload failed:", err) 316 stateReloadAbort(oldL, newL) 317 if isReloader { 318 reloader.ReloadAbort(oldL, newL) 319 } else { 320 agent.Teardown(newL) 321 } 322 newL.Close() 323 return oldL 324 } else { 325 stateReloadEnd(oldL, newL) 326 if isReloader { 327 reloader.ReloadEnd(oldL, newL) 328 } else { 329 agent.Teardown(oldL) 330 } 331 oldL.Close() 332 log.Println("Reload successful") 333 return newL 334 } 335 } 336 337 func dup(src []byte) []byte { 338 res := make([]byte, len(src)) 339 copy(res, src) 340 return res 341 } 342 343 func newUserData(L *lua.LState, v interface{}) *lua.LUserData { 344 res := L.NewUserData() 345 res.Value = v 346 return res 347 } 348 349 /********** State Object in the Lua Interpreter **********/ 350 351 const luaStateName = "_mqttagent" 352 const keyChanToLua = 1 353 const keyAgent = 2 354 const keyClientPrefix = 3 355 const keyClientNextId = 4 356 const keyCfgMap = 5 357 const keyCnxTable = 6 358 const keyTimerTable = 7 359 const keyReloadRequest = 8 360 const keyOldCfgMap = 9 361 362 func registerState(L *lua.LState, agent MqttAgent, clientPrefix string, toLua chan<- mqttMessage) { 363 st := L.NewTable() 364 L.RawSetInt(st, keyChanToLua, newUserData(L, toLua)) 365 L.RawSetInt(st, keyAgent, newUserData(L, agent)) 366 L.RawSetInt(st, keyClientPrefix, lua.LString(clientPrefix)) 367 L.RawSetInt(st, keyClientNextId, lua.LNumber(1)) 368 L.RawSetInt(st, keyCfgMap, newUserData(L, make(mqttConfigMap))) 369 L.RawSetInt(st, keyCnxTable, L.NewTable()) 370 L.RawSetInt(st, keyTimerTable, L.NewTable()) 371 L.SetGlobal(luaStateName, st) 372 L.SetGlobal("reload", L.NewFunction(requestReload)) 373 } 374 375 func stateReloadBegin(oldL, newL *lua.LState) { 376 oldSt := oldL.GetGlobal(luaStateName).(*lua.LTable) 377 toLua := oldL.RawGetInt(oldSt, keyChanToLua).(*lua.LUserData).Value.(chan<- mqttMessage) 378 agent := oldL.RawGetInt(oldSt, keyAgent).(*lua.LUserData).Value.(MqttAgent) 379 clientPrefix := oldL.RawGetInt(oldSt, keyClientPrefix) 380 nextId := oldL.RawGetInt(oldSt, keyClientNextId) 381 cfgMap := oldL.RawGetInt(oldSt, keyCfgMap).(*lua.LUserData).Value.(mqttConfigMap) 382 383 st := newL.NewTable() 384 newL.RawSetInt(st, keyChanToLua, newUserData(newL, toLua)) 385 newL.RawSetInt(st, keyAgent, newUserData(newL, agent)) 386 newL.RawSetInt(st, keyClientPrefix, clientPrefix) 387 newL.RawSetInt(st, keyClientNextId, nextId) 388 newL.RawSetInt(st, keyCfgMap, newUserData(newL, make(mqttConfigMap))) 389 newL.RawSetInt(st, keyCnxTable, newL.NewTable()) 390 newL.RawSetInt(st, keyTimerTable, newL.NewTable()) 391 newL.RawSetInt(st, keyOldCfgMap, newUserData(newL, cfgMap)) 392 newL.SetGlobal(luaStateName, st) 393 newL.SetGlobal("reload", newL.NewFunction(requestReload)) 394 } 395 396 func stateReloadAbort(oldL, newL *lua.LState) { 397 statePartialCleanup(newL, oldL) 398 } 399 400 func stateReloadEnd(oldL, newL *lua.LState) { 401 statePartialCleanup(oldL, newL) 402 newSt := newL.GetGlobal(luaStateName).(*lua.LTable) 403 newL.RawSetInt(newSt, keyOldCfgMap, lua.LNil) 404 } 405 406 func statePartialCleanup(staleL, keptL *lua.LState) { 407 staleCnxTable := stateCnxTable(staleL) 408 keptCnxTable := stateCnxTable(keptL) 409 410 staleL.ForEach(staleCnxTable, func(key, value lua.LValue) { 411 clientId := int(key.(lua.LNumber)) 412 if keptL.RawGetInt(keptCnxTable, clientId) == lua.LNil { 413 tbl := value.(*lua.LTable) 414 client := staleL.RawGetInt(tbl, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 415 close(staleL.RawGetInt(tbl, keyCloseSig).(*lua.LUserData).Value.(chan struct{})) 416 if err := client.Disconnect(nil); err != nil { 417 log.Printf("cleanup stale client %s: %v", lua.LVAsString(key), err) 418 } 419 } 420 }) 421 422 keptL.ForEach(keptCnxTable, func(key, value lua.LValue) { 423 clientId := int(key.(lua.LNumber)) 424 keptCnx := value.(*lua.LTable) 425 if lua.LVIsFalse(keptL.RawGetInt(keptCnx, keyReused)) { 426 return 427 } 428 429 client := keptL.RawGetInt(keptCnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 430 staleCnx := staleL.RawGetInt(staleCnxTable, clientId).(*lua.LTable) 431 432 if client != staleL.RawGetInt(staleCnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) { 433 panic("This shouldn't happen") 434 } 435 436 keptSub := keptL.RawGetInt(keptCnx, keySubTable).(*lua.LTable) 437 staleSub := staleL.RawGetInt(staleCnx, keySubTable).(*lua.LTable) 438 439 staleL.ForEach(staleSub, func(key, _ lua.LValue) { 440 topic := string(key.(lua.LString)) 441 if !isInternal(topic) && keptL.GetField(keptSub, topic) == lua.LNil { 442 log.Println("Unsubscribing from stale topic", topic) 443 if err := client.Unsubscribe(nil, topic); err != nil { 444 log.Println("Failed to unsubscribe:", err) 445 } 446 } 447 }) 448 449 keptL.ForEach(keptSub, func(key, _ lua.LValue) { 450 topic := string(key.(lua.LString)) 451 if !isInternal(topic) && staleL.GetField(staleSub, topic) == lua.LNil { 452 log.Println("Subscribing to new topic", topic) 453 if err := client.Subscribe(nil, topic); err != nil { 454 log.Println("Failed to subscribe:", err) 455 } 456 } 457 }) 458 459 keptL.RawSetInt(keptCnx, keyReused, lua.LNil) 460 }) 461 } 462 463 func stateValue(L *lua.LState, key int) lua.LValue { 464 st := L.GetGlobal(luaStateName) 465 return L.RawGetInt(st.(*lua.LTable), key) 466 } 467 468 func stateChanToLua(L *lua.LState) chan<- mqttMessage { 469 ud := stateValue(L, keyChanToLua) 470 return ud.(*lua.LUserData).Value.(chan<- mqttMessage) 471 } 472 473 func stateAgent(L *lua.LState) MqttAgent { 474 ud := stateValue(L, keyAgent) 475 return ud.(*lua.LUserData).Value.(MqttAgent) 476 } 477 478 func stateClientNextId(L *lua.LState) (int, string) { 479 st := L.GetGlobal(luaStateName).(*lua.LTable) 480 result := int(L.RawGetInt(st, keyClientNextId).(lua.LNumber)) 481 L.RawSetInt(st, keyClientNextId, lua.LNumber(result+1)) 482 prefix := lua.LVAsString(L.RawGetInt(st, keyClientPrefix)) 483 return result, fmt.Sprintf("%s-%d", prefix, result) 484 } 485 486 func stateCfgMap(L *lua.LState) mqttConfigMap { 487 return stateValue(L, keyCfgMap).(*lua.LUserData).Value.(mqttConfigMap) 488 } 489 490 func stateCnxTable(L *lua.LState) *lua.LTable { 491 return stateValue(L, keyCnxTable).(*lua.LTable) 492 } 493 494 func stateTimerTable(L *lua.LState) *lua.LTable { 495 return stateValue(L, keyTimerTable).(*lua.LTable) 496 } 497 498 func stateReloadRequested(L *lua.LState) bool { 499 return lua.LVAsBool(stateValue(L, keyReloadRequest)) 500 } 501 502 func stateRequestReload(L *lua.LState, v lua.LValue) { 503 st := L.GetGlobal(luaStateName).(*lua.LTable) 504 L.RawSetInt(st, keyReloadRequest, v) 505 } 506 507 func stateOldCfgMap(L *lua.LState) mqttConfigMap { 508 val := stateValue(L, keyOldCfgMap) 509 if val == lua.LNil { 510 return nil 511 } else { 512 return val.(*lua.LUserData).Value.(mqttConfigMap) 513 } 514 } 515 516 func requestReload(L *lua.LState) int { 517 stateRequestReload(L, lua.LTrue) 518 return 0 519 } 520 521 /********** Lua Object for MQTT client **********/ 522 523 const luaMqttClientTypeName = "mqttclient" 524 const keyClient = 1 525 const keyCloseSig = 2 526 const keySubTable = 3 527 const keyReused = 4 528 529 func registerMqttClientType(L *lua.LState) { 530 mt := L.NewTypeMetatable(luaMqttClientTypeName) 531 L.SetGlobal(luaMqttClientTypeName, mt) 532 L.SetField(mt, "new", L.NewFunction(newMqttClient)) 533 L.SetField(mt, "__call", L.NewFunction(luaPublish)) 534 L.SetField(mt, "__index", L.NewFunction(luaQuery)) 535 L.SetField(mt, "__newindex", L.NewFunction(luaSubscribe)) 536 } 537 538 type mqttConfig struct { 539 Connection string 540 TLS bool 541 PauseTimeout string 542 AtLeastOnceMax int 543 ExactlyOnceMax int 544 UserName string 545 Password string 546 Will struct { 547 Topic string 548 Message string 549 Retain bool 550 AtLeastOnce bool 551 ExactlyOnce bool 552 } 553 KeepAlive uint16 554 CleanSession bool 555 } 556 557 type mqttClientEntry struct { 558 client *mqtt.Client 559 closeSig chan struct{} 560 id int 561 } 562 563 type mqttConfigMap map[mqttConfig]mqttClientEntry 564 565 func mqttConfigBytes(src string) []byte { 566 if src == "" { 567 return nil 568 } else { 569 return []byte(src) 570 } 571 } 572 573 func newClient(config *mqttConfig, id string) (*mqtt.Client, error) { 574 pto, err := time.ParseDuration(config.PauseTimeout) 575 if err != nil { 576 pto = time.Second 577 } 578 579 var dialer mqtt.Dialer 580 581 if config.TLS { 582 dialer = mqtt.NewTLSDialer("tcp", config.Connection, nil) 583 } else { 584 dialer = mqtt.NewDialer("tcp", config.Connection) 585 } 586 587 processed_cfg := mqtt.Config{ 588 Dialer: dialer, 589 PauseTimeout: pto, 590 AtLeastOnceMax: config.AtLeastOnceMax, 591 ExactlyOnceMax: config.ExactlyOnceMax, 592 UserName: config.UserName, 593 Password: mqttConfigBytes(config.Password), 594 Will: struct { 595 Topic string 596 Message []byte 597 Retain bool 598 AtLeastOnce bool 599 ExactlyOnce bool 600 }{ 601 Topic: config.Will.Topic, 602 Message: mqttConfigBytes(config.Will.Message), 603 Retain: config.Will.Retain, 604 AtLeastOnce: config.Will.AtLeastOnce, 605 ExactlyOnce: config.Will.ExactlyOnce, 606 }, 607 KeepAlive: config.KeepAlive, 608 CleanSession: config.CleanSession, 609 } 610 611 return mqtt.VolatileSession(id, &processed_cfg) 612 } 613 614 func registerClient(L *lua.LState, id int, client *mqtt.Client, closeSig chan struct{}, reused bool) lua.LValue { 615 res := L.NewTable() 616 L.RawSetInt(res, keyClient, newUserData(L, client)) 617 L.RawSetInt(res, keyCloseSig, newUserData(L, closeSig)) 618 L.RawSetInt(res, keySubTable, L.NewTable()) 619 if reused { 620 L.RawSetInt(res, keyReused, lua.LTrue) 621 } 622 L.SetMetatable(res, L.GetTypeMetatable(luaMqttClientTypeName)) 623 L.RawSetInt(stateCnxTable(L), id, res) 624 return res 625 } 626 627 func newMqttClient(L *lua.LState) int { 628 var config mqttConfig 629 if err := gluamapper.Map(L.CheckTable(1), &config); err != nil { 630 log.Println("newMqttClient:", err) 631 L.Push(lua.LNil) 632 L.Push(lua.LString(err.Error())) 633 return 2 634 } 635 636 cfgMap := stateCfgMap(L) 637 638 if cfg, found := cfgMap[config]; found { 639 res := L.RawGetInt(stateCnxTable(L), cfg.id) 640 tbl := res.(*lua.LTable) 641 if L.RawGetInt(tbl, keyClient).(*lua.LUserData).Value.(*mqtt.Client) != cfg.client { 642 panic("Inconsistent configuration table") 643 } 644 645 L.Push(res) 646 return 1 647 } 648 649 if oldCfgMap := stateOldCfgMap(L); oldCfgMap != nil { 650 if cfg, found := oldCfgMap[config]; found { 651 cfgMap[config] = cfg 652 L.Push(registerClient(L, cfg.id, cfg.client, cfg.closeSig, true)) 653 return 1 654 } 655 } 656 657 id, idString := stateClientNextId(L) 658 client, err := newClient(&config, idString) 659 if err != nil { 660 log.Println("newMqttClient:", err) 661 L.Push(lua.LNil) 662 L.Push(lua.LString(err.Error())) 663 return 2 664 } 665 closeSig := make(chan struct{}, 1) 666 go mqttMonitor(client, stateChanToLua(L), id, closeSig, config.KeepAlive) 667 go mqttRead(client, stateChanToLua(L), id) 668 669 cfgMap[config] = mqttClientEntry{id: id, client: client} 670 671 L.Push(registerClient(L, id, client, closeSig, false)) 672 return 1 673 } 674 675 func luaPublish(L *lua.LState) int { 676 cnx := L.CheckTable(1) 677 client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 678 679 if L.GetTop() == 1 { 680 if err := client.Ping(nil); err != nil { 681 log.Println("luaPing:", err) 682 L.Push(lua.LNil) 683 L.Push(lua.LString(err.Error())) 684 return 2 685 } else { 686 L.Push(lua.LTrue) 687 return 1 688 } 689 } 690 691 message := L.CheckString(2) 692 topic := L.CheckString(3) 693 694 if err := client.Publish(nil, []byte(message), topic); err != nil { 695 L.Push(lua.LNil) 696 L.Push(lua.LString(err.Error())) 697 return 2 698 } else { 699 L.Push(lua.LTrue) 700 return 1 701 } 702 } 703 704 func luaQuery(L *lua.LState) int { 705 cnx := L.CheckTable(1) 706 topic := L.CheckString(2) 707 subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable) 708 L.Push(L.GetField(subTbl, topic)) 709 return 1 710 } 711 712 func luaSubscribe(L *lua.LState) int { 713 var err error 714 cnx := L.CheckTable(1) 715 topic := L.CheckString(2) 716 callback := L.OptFunction(3, nil) 717 client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 718 tbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable) 719 720 _, is_new := L.GetField(tbl, topic).(*lua.LNilType) 721 722 if !isInternal(topic) && lua.LVIsFalse(L.RawGetInt(cnx, keyReused)) { 723 if callback == nil { 724 err = client.Unsubscribe(nil, topic) 725 } else if is_new { 726 err = client.Subscribe(nil, topic) 727 } 728 } 729 730 if err != nil { 731 log.Println("luaSubscribe:", err) 732 L.Push(lua.LNil) 733 L.Push(lua.LString(err.Error())) 734 return 2 735 } else { 736 if callback == nil { 737 if is_new { 738 log.Printf("Not subscribed to %q", topic) 739 } else { 740 log.Printf("Unsubscribed from %q", topic) 741 } 742 L.SetField(tbl, topic, lua.LNil) 743 } else { 744 if is_new { 745 log.Printf("Subscribed to %q", topic) 746 } else { 747 log.Printf("Updating subscription to %q", topic) 748 } 749 L.SetField(tbl, topic, callback) 750 } 751 752 L.Push(lua.LTrue) 753 return 1 754 } 755 } 756 757 /********** Lua Object for timers **********/ 758 759 const luaTimerTypeName = "timer" 760 761 func registerTimerType(L *lua.LState) { 762 mt := L.NewTypeMetatable(luaTimerTypeName) 763 L.SetGlobal(luaTimerTypeName, mt) 764 L.SetField(mt, "new", L.NewFunction(newTimer)) 765 L.SetField(mt, "schedule", L.NewFunction(timerSchedule)) 766 L.SetField(mt, "__index", L.SetFuncs(L.NewTable(), timerMethods)) 767 } 768 769 func newTimer(L *lua.LState) int { 770 atTime := L.Get(1) 771 cb := L.CheckFunction(2) 772 L.Pop(2) 773 L.SetMetatable(cb, L.GetTypeMetatable(luaTimerTypeName)) 774 L.Push(cb) 775 L.Push(atTime) 776 return timerSchedule(L) 777 } 778 779 var timerMethods = map[string]lua.LGFunction{ 780 "cancel": timerCancel, 781 "schedule": timerSchedule, 782 } 783 784 func timerCancel(L *lua.LState) int { 785 timer := L.CheckFunction(1) 786 L.RawSet(stateTimerTable(L), timer, lua.LNil) 787 return 0 788 } 789 790 func timerSchedule(L *lua.LState) int { 791 timer := L.CheckFunction(1) 792 atTime := lua.LNil 793 if L.Get(2) != lua.LNil { 794 atTime = L.CheckNumber(2) 795 } 796 797 L.RawSet(stateTimerTable(L), timer, atTime) 798 return 0 799 } 800 801 func toTime(lsec lua.LNumber) time.Time { 802 fsec := float64(lsec) 803 sec := int64(fsec) 804 nsec := int64((fsec - float64(sec)) * 1.0e9) 805 806 return time.Unix(sec, nsec) 807 } 808 809 func runTimers(L *lua.LState, parentTimer *time.Timer) { 810 hasNext := false 811 var nextTime time.Time 812 813 now := time.Now() 814 timers := stateTimerTable(L) 815 816 timer, luaT := timers.Next(lua.LNil) 817 for timer != lua.LNil { 818 t := toTime(luaT.(lua.LNumber)) 819 if t.Compare(now) <= 0 { 820 L.RawSet(timers, timer, lua.LNil) 821 err := L.CallByParam(lua.P{Fn: timer, NRet: 0, Protect: true}, timer, luaT) 822 if err != nil { 823 panic(err) 824 } 825 timer = lua.LNil 826 hasNext = false 827 } else if !hasNext || t.Compare(nextTime) < 0 { 828 hasNext = true 829 nextTime = t 830 } 831 832 timer, luaT = timers.Next(timer) 833 } 834 835 if hasNext { 836 parentTimer.Reset(time.Until(nextTime)) 837 } else { 838 parentTimer.Stop() 839 } 840 }