mqttagent.go (18744B)
1 /* 2 * Copyright (c) 2025, Natacha Porté 3 * 4 * Permission to use, copy, modify, and distribute this software for any 5 * purpose with or without fee is hereby granted, provided that the above 6 * copyright notice and this permission notice appear in all copies. 7 * 8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 15 */ 16 17 package mqttagent 18 19 import ( 20 "errors" 21 "fmt" 22 "log" 23 "os" 24 "strings" 25 "time" 26 27 "github.com/go-mqtt/mqtt" 28 "github.com/yuin/gluamapper" 29 "github.com/yuin/gopher-lua" 30 ) 31 32 type MqttAgent interface { 33 Setup(L *lua.LState) 34 Log(L *lua.LState, msg *MqttMessage) 35 Teardown(L *lua.LState) 36 } 37 38 type MqttReloadingAgent interface { 39 Setup(L *lua.LState) 40 Log(L *lua.LState, msg *MqttMessage) 41 ReloadBegin(oldL, newL *lua.LState) 42 ReloadAbort(oldL, newL *lua.LState) 43 ReloadEnd(oldL, newL *lua.LState) 44 Teardown(L *lua.LState) 45 } 46 47 type MqttMessage struct { 48 Timestamp float64 49 ClientId int 50 Topic []byte 51 Message []byte 52 } 53 54 func Run(agent MqttAgent, main_script string, capacity int) { 55 fromMqtt := make(chan MqttMessage, capacity) 56 57 L := lua.NewState() 58 defer L.Close() 59 60 agent.Setup(L) 61 defer agent.Teardown(L) 62 63 hostname, err := os.Hostname() 64 if err != nil { 65 hostname = "<unknown>" 66 } 67 68 idString := fmt.Sprintf("mqttagent-%s-%d", hostname, os.Getpid()) 69 registerMqttClientType(L) 70 registerTimerType(L) 71 registerState(L, idString, fromMqtt) 72 defer cleanupClients(L) 73 74 if err := L.DoFile(main_script); err != nil { 75 panic(err) 76 } 77 78 timer := time.NewTimer(0) 79 defer timer.Stop() 80 81 log.Println(idString, "started") 82 83 for { 84 select { 85 case msg, ok := <-fromMqtt: 86 87 if !ok { 88 log.Println("fromMqtt is closed") 89 break 90 } 91 92 processMsg(L, agent, &msg) 93 94 case <-timer.C: 95 } 96 97 runTimers(L, timer) 98 99 if stateReloadRequested(L) { 100 L = reload(L, agent, main_script) 101 runTimers(L, timer) 102 stateRequestReload(L, lua.LNil) 103 } 104 105 if tableIsEmpty(stateCnxTable(L)) && tableIsEmpty(stateTimerTable(L)) { 106 break 107 } 108 } 109 110 log.Println(idString, "finished") 111 } 112 113 func cleanupClients(L *lua.LState) { 114 cnxTbl := stateCnxTable(L) 115 if cnxTbl == nil { 116 return 117 } 118 119 L.ForEach(cnxTbl, func(key, value lua.LValue) { 120 cnx := value.(*lua.LTable) 121 client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 122 if err := client.Disconnect(nil); err != nil { 123 log.Printf("cleanup client %s: %v", lua.LVAsString(key), err) 124 } 125 }) 126 } 127 128 func dispatchMsg(L *lua.LState, msg *MqttMessage, cnx, key, value lua.LValue) { 129 skey, ok := key.(lua.LString) 130 topic := string(msg.Topic) 131 132 if ok && match(topic, string(skey)) { 133 err := L.CallByParam(lua.P{Fn: value, NRet: 0, Protect: true}, 134 cnx, 135 lua.LString(string(msg.Message)), 136 lua.LString(topic), 137 lua.LNumber(msg.Timestamp)) 138 if err != nil { 139 panic(err) 140 } 141 } 142 } 143 144 func matchSliced(actual, filter []string) bool { 145 if len(filter) == 0 { 146 return len(actual) == 0 147 } 148 149 if filter[0] == "#" { 150 if len(filter) == 1 { 151 return true 152 } 153 154 for i := range actual { 155 if matchSliced(actual[i:], filter[1:]) { 156 return true 157 } 158 } 159 160 return false 161 } 162 163 if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) { 164 return matchSliced(actual[1:], filter[1:]) 165 } 166 167 return false 168 } 169 170 func match(actual, filter string) bool { 171 return matchSliced(strings.Split(actual, "/"), strings.Split(filter, "/")) 172 } 173 174 func tableIsEmpty(t *lua.LTable) bool { 175 key, _ := t.Next(lua.LNil) 176 return key == lua.LNil 177 } 178 179 func processMsg(L *lua.LState, agent MqttAgent, msg *MqttMessage) { 180 agent.Log(L, msg) 181 182 cnx := L.RawGetInt(stateCnxTable(L), msg.ClientId).(*lua.LTable) 183 subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable) 184 L.ForEach(subTbl, func(key, value lua.LValue) { dispatchMsg(L, msg, cnx, key, value) }) 185 186 if tableIsEmpty(subTbl) { 187 client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 188 if err := client.Disconnect(nil); err != nil { 189 log.Println("disconnect empty client:", err) 190 } 191 L.RawSetInt(stateCnxTable(L), msg.ClientId, lua.LNil) 192 } 193 } 194 195 func mqttRead(client *mqtt.Client, toLua chan<- MqttMessage, id int) { 196 var big *mqtt.BigMessage 197 198 for { 199 message, topic, err := client.ReadSlices() 200 t := float64(time.Now().UnixMicro()) * 1.0e-6 201 202 switch { 203 case err == nil: 204 toLua <- MqttMessage{Timestamp: t, ClientId: id, Topic: dup(topic), Message: dup(message)} 205 206 case errors.As(err, &big): 207 data, err := big.ReadAll() 208 if err != nil { 209 log.Println("mqttRead big message:", err) 210 } else { 211 toLua <- MqttMessage{Timestamp: t, ClientId: id, Topic: dup(topic), Message: data} 212 } 213 214 case errors.Is(err, mqtt.ErrClosed): 215 log.Println("mqttRead finishing:", err) 216 return 217 218 case mqtt.IsConnectionRefused(err): 219 log.Println("mqttRead connection refused:", err) 220 time.Sleep(15 * time.Minute) 221 222 default: 223 log.Println("mqttRead:", err) 224 time.Sleep(2 * time.Second) 225 } 226 } 227 } 228 229 func reload(oldL *lua.LState, agent MqttAgent, main_script string) *lua.LState { 230 log.Println("Reloading", main_script) 231 reloader, isReloader := agent.(MqttReloadingAgent) 232 233 newL := lua.NewState() 234 235 if isReloader { 236 reloader.ReloadBegin(oldL, newL) 237 } else { 238 agent.Setup(newL) 239 } 240 241 registerMqttClientType(newL) 242 registerTimerType(newL) 243 244 stateReloadBegin(oldL, newL) 245 246 if err := newL.DoFile(main_script); err != nil { 247 log.Println("Reload failed:", err) 248 stateReloadAbort(oldL, newL) 249 if isReloader { 250 reloader.ReloadAbort(oldL, newL) 251 } else { 252 agent.Teardown(newL) 253 } 254 newL.Close() 255 return oldL 256 } else { 257 stateReloadEnd(oldL, newL) 258 if isReloader { 259 reloader.ReloadEnd(oldL, newL) 260 } else { 261 agent.Teardown(oldL) 262 } 263 oldL.Close() 264 log.Println("Reload successful") 265 return newL 266 } 267 } 268 269 func dup(src []byte) []byte { 270 res := make([]byte, len(src)) 271 copy(res, src) 272 return res 273 } 274 275 func newUserData(L *lua.LState, v interface{}) *lua.LUserData { 276 res := L.NewUserData() 277 res.Value = v 278 return res 279 } 280 281 /********** State Object in the Lua Interpreter **********/ 282 283 const luaStateName = "_mqttagent" 284 const keyChanToLua = 1 285 const keyClientPrefix = 2 286 const keyClientNextId = 3 287 const keyCfgMap = 4 288 const keyCnxTable = 5 289 const keyTimerTable = 6 290 const keyReloadRequest = 7 291 const keyOldCfgMap = 8 292 293 func registerState(L *lua.LState, clientPrefix string, toLua chan<- MqttMessage) { 294 st := L.NewTable() 295 L.RawSetInt(st, keyChanToLua, newUserData(L, toLua)) 296 L.RawSetInt(st, keyClientPrefix, lua.LString(clientPrefix)) 297 L.RawSetInt(st, keyClientNextId, lua.LNumber(1)) 298 L.RawSetInt(st, keyCfgMap, newUserData(L, make(mqttConfigMap))) 299 L.RawSetInt(st, keyCnxTable, L.NewTable()) 300 L.RawSetInt(st, keyTimerTable, L.NewTable()) 301 L.SetGlobal(luaStateName, st) 302 L.SetGlobal("reload", L.NewFunction(requestReload)) 303 } 304 305 func stateReloadBegin(oldL, newL *lua.LState) { 306 oldSt := oldL.GetGlobal(luaStateName).(*lua.LTable) 307 toLua := oldL.RawGetInt(oldSt, keyChanToLua).(*lua.LUserData).Value.(chan<- MqttMessage) 308 clientPrefix := oldL.RawGetInt(oldSt, keyClientPrefix) 309 nextId := oldL.RawGetInt(oldSt, keyClientNextId) 310 cfgMap := oldL.RawGetInt(oldSt, keyCfgMap).(*lua.LUserData).Value.(mqttConfigMap) 311 312 st := newL.NewTable() 313 newL.RawSetInt(st, keyChanToLua, newUserData(newL, toLua)) 314 newL.RawSetInt(st, keyClientPrefix, clientPrefix) 315 newL.RawSetInt(st, keyClientNextId, nextId) 316 newL.RawSetInt(st, keyCfgMap, newUserData(newL, make(mqttConfigMap))) 317 newL.RawSetInt(st, keyCnxTable, newL.NewTable()) 318 newL.RawSetInt(st, keyTimerTable, newL.NewTable()) 319 newL.RawSetInt(st, keyOldCfgMap, newUserData(newL, cfgMap)) 320 newL.SetGlobal(luaStateName, st) 321 newL.SetGlobal("reload", newL.NewFunction(requestReload)) 322 } 323 324 func stateReloadAbort(oldL, newL *lua.LState) { 325 statePartialCleanup(newL, oldL) 326 } 327 328 func stateReloadEnd(oldL, newL *lua.LState) { 329 statePartialCleanup(oldL, newL) 330 newSt := newL.GetGlobal(luaStateName).(*lua.LTable) 331 newL.RawSetInt(newSt, keyOldCfgMap, lua.LNil) 332 } 333 334 func statePartialCleanup(staleL, keptL *lua.LState) { 335 staleCnxTable := stateCnxTable(staleL) 336 keptCnxTable := stateCnxTable(keptL) 337 338 staleL.ForEach(staleCnxTable, func(key, value lua.LValue) { 339 clientId := int(key.(lua.LNumber)) 340 if keptL.RawGetInt(keptCnxTable, clientId) == lua.LNil { 341 client := staleL.RawGetInt(value.(*lua.LTable), keyClient).(*lua.LUserData).Value.(*mqtt.Client) 342 client.Close() 343 } 344 }) 345 346 keptL.ForEach(keptCnxTable, func(key, value lua.LValue) { 347 clientId := int(key.(lua.LNumber)) 348 keptCnx := value.(*lua.LTable) 349 if lua.LVIsFalse(keptL.RawGetInt(keptCnx, keyReused)) { 350 return 351 } 352 353 client := keptL.RawGetInt(keptCnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 354 staleCnx := staleL.RawGetInt(staleCnxTable, clientId).(*lua.LTable) 355 356 if client != staleL.RawGetInt(staleCnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) { 357 panic("This shouldn't happen") 358 } 359 360 keptSub := keptL.RawGetInt(keptCnx, keySubTable).(*lua.LTable) 361 staleSub := staleL.RawGetInt(staleCnx, keySubTable).(*lua.LTable) 362 363 staleL.ForEach(staleSub, func(key, _ lua.LValue) { 364 topic := string(key.(lua.LString)) 365 if keptL.GetField(keptSub, topic) == lua.LNil { 366 log.Println("Unsubscribing from stale topic", topic) 367 if err := client.Unsubscribe(nil, topic); err != nil { 368 log.Println("Failed to unsubscribe:", err) 369 } 370 } 371 }) 372 373 keptL.ForEach(keptSub, func(key, _ lua.LValue) { 374 topic := string(key.(lua.LString)) 375 if staleL.GetField(staleSub, topic) == lua.LNil { 376 log.Println("Subscribing to new topic", topic) 377 if err := client.Subscribe(nil, topic); err != nil { 378 log.Println("Failed to subscribe:", err) 379 } 380 } 381 }) 382 383 keptL.RawSetInt(keptCnx, keyReused, lua.LNil) 384 }) 385 } 386 387 func stateValue(L *lua.LState, key int) lua.LValue { 388 st := L.GetGlobal(luaStateName) 389 return L.RawGetInt(st.(*lua.LTable), key) 390 } 391 392 func stateChanToLua(L *lua.LState) chan<- MqttMessage { 393 ud := stateValue(L, keyChanToLua) 394 return ud.(*lua.LUserData).Value.(chan<- MqttMessage) 395 } 396 397 func stateClientNextId(L *lua.LState) (int, string) { 398 st := L.GetGlobal(luaStateName).(*lua.LTable) 399 result := int(L.RawGetInt(st, keyClientNextId).(lua.LNumber)) 400 L.RawSetInt(st, keyClientNextId, lua.LNumber(result+1)) 401 prefix := lua.LVAsString(L.RawGetInt(st, keyClientPrefix)) 402 return result, fmt.Sprintf("%s-%d", prefix, result) 403 } 404 405 func stateCfgMap(L *lua.LState) mqttConfigMap { 406 return stateValue(L, keyCfgMap).(*lua.LUserData).Value.(mqttConfigMap) 407 } 408 409 func stateCnxTable(L *lua.LState) *lua.LTable { 410 return stateValue(L, keyCnxTable).(*lua.LTable) 411 } 412 413 func stateTimerTable(L *lua.LState) *lua.LTable { 414 return stateValue(L, keyTimerTable).(*lua.LTable) 415 } 416 417 func stateReloadRequested(L *lua.LState) bool { 418 return lua.LVAsBool(stateValue(L, keyReloadRequest)) 419 } 420 421 func stateRequestReload(L *lua.LState, v lua.LValue) { 422 st := L.GetGlobal(luaStateName).(*lua.LTable) 423 L.RawSetInt(st, keyReloadRequest, v) 424 } 425 426 func stateOldCfgMap(L *lua.LState) mqttConfigMap { 427 val := stateValue(L, keyOldCfgMap) 428 if val == lua.LNil { 429 return nil 430 } else { 431 return val.(*lua.LUserData).Value.(mqttConfigMap) 432 } 433 } 434 435 func requestReload(L *lua.LState) int { 436 stateRequestReload(L, lua.LTrue) 437 return 0 438 } 439 440 /********** Lua Object for MQTT client **********/ 441 442 const luaMqttClientTypeName = "mqttclient" 443 const keyClient = 1 444 const keySubTable = 2 445 const keyReused = 3 446 447 func registerMqttClientType(L *lua.LState) { 448 mt := L.NewTypeMetatable(luaMqttClientTypeName) 449 L.SetGlobal(luaMqttClientTypeName, mt) 450 L.SetField(mt, "new", L.NewFunction(newMqttClient)) 451 L.SetField(mt, "__call", L.NewFunction(luaPublish)) 452 L.SetField(mt, "__index", L.NewFunction(luaQuery)) 453 L.SetField(mt, "__newindex", L.NewFunction(luaSubscribe)) 454 } 455 456 type mqttConfig struct { 457 Connection string 458 TLS bool 459 PauseTimeout string 460 AtLeastOnceMax int 461 ExactlyOnceMax int 462 UserName string 463 Password string 464 Will struct { 465 Topic string 466 Message string 467 Retain bool 468 AtLeastOnce bool 469 ExactlyOnce bool 470 } 471 KeepAlive uint16 472 CleanSession bool 473 } 474 475 type mqttClientEntry struct { 476 client *mqtt.Client 477 id int 478 } 479 480 type mqttConfigMap map[mqttConfig]mqttClientEntry 481 482 func mqttConfigBytes(src string) []byte { 483 if src == "" { 484 return nil 485 } else { 486 return []byte(src) 487 } 488 } 489 490 func newClient(config *mqttConfig, id string) (*mqtt.Client, error) { 491 pto, err := time.ParseDuration(config.PauseTimeout) 492 if err != nil { 493 pto = time.Second 494 } 495 496 var dialer mqtt.Dialer 497 498 if config.TLS { 499 dialer = mqtt.NewTLSDialer("tcp", config.Connection, nil) 500 } else { 501 dialer = mqtt.NewDialer("tcp", config.Connection) 502 } 503 504 processed_cfg := mqtt.Config{ 505 Dialer: dialer, 506 PauseTimeout: pto, 507 AtLeastOnceMax: config.AtLeastOnceMax, 508 ExactlyOnceMax: config.ExactlyOnceMax, 509 UserName: config.UserName, 510 Password: mqttConfigBytes(config.Password), 511 Will: struct { 512 Topic string 513 Message []byte 514 Retain bool 515 AtLeastOnce bool 516 ExactlyOnce bool 517 }{ 518 Topic: config.Will.Topic, 519 Message: mqttConfigBytes(config.Will.Message), 520 Retain: config.Will.Retain, 521 AtLeastOnce: config.Will.AtLeastOnce, 522 ExactlyOnce: config.Will.ExactlyOnce, 523 }, 524 KeepAlive: config.KeepAlive, 525 CleanSession: config.CleanSession, 526 } 527 528 return mqtt.VolatileSession(id, &processed_cfg) 529 } 530 531 func registerClient(L *lua.LState, id int, client *mqtt.Client, reused bool) lua.LValue { 532 res := L.NewTable() 533 L.RawSetInt(res, keyClient, newUserData(L, client)) 534 L.RawSetInt(res, keySubTable, L.NewTable()) 535 if reused { 536 L.RawSetInt(res, keyReused, lua.LTrue) 537 } 538 L.SetMetatable(res, L.GetTypeMetatable(luaMqttClientTypeName)) 539 L.RawSetInt(stateCnxTable(L), id, res) 540 return res 541 } 542 543 func newMqttClient(L *lua.LState) int { 544 var config mqttConfig 545 if err := gluamapper.Map(L.CheckTable(1), &config); err != nil { 546 log.Println("newMqttClient:", err) 547 L.Push(lua.LNil) 548 L.Push(lua.LString(err.Error())) 549 return 2 550 } 551 552 cfgMap := stateCfgMap(L) 553 554 if cfg, found := cfgMap[config]; found { 555 res := L.RawGetInt(stateCnxTable(L), cfg.id) 556 tbl := res.(*lua.LTable) 557 if L.RawGetInt(tbl, keyClient).(*lua.LUserData).Value.(*mqtt.Client) != cfg.client { 558 panic("Inconsistent configuration table") 559 } 560 561 L.Push(res) 562 return 1 563 } 564 565 if oldCfgMap := stateOldCfgMap(L); oldCfgMap != nil { 566 if cfg, found := oldCfgMap[config]; found { 567 cfgMap[config] = cfg 568 L.Push(registerClient(L, cfg.id, cfg.client, true)) 569 return 1 570 } 571 } 572 573 id, idString := stateClientNextId(L) 574 client, err := newClient(&config, idString) 575 if err != nil { 576 log.Println("newMqttClient:", err) 577 L.Push(lua.LNil) 578 L.Push(lua.LString(err.Error())) 579 return 2 580 } 581 go mqttRead(client, stateChanToLua(L), id) 582 583 cfgMap[config] = mqttClientEntry{id: id, client: client} 584 585 L.Push(registerClient(L, id, client, false)) 586 return 1 587 } 588 589 func luaPublish(L *lua.LState) int { 590 cnx := L.CheckTable(1) 591 client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 592 593 if L.GetTop() == 1 { 594 if err := client.Ping(nil); err != nil { 595 log.Println("luaPing:", err) 596 L.Push(lua.LNil) 597 L.Push(lua.LString(err.Error())) 598 return 2 599 } else { 600 L.Push(lua.LTrue) 601 return 1 602 } 603 } 604 605 message := L.CheckString(2) 606 topic := L.CheckString(3) 607 608 if err := client.Publish(nil, []byte(message), topic); err != nil { 609 L.Push(lua.LNil) 610 L.Push(lua.LString(err.Error())) 611 return 2 612 } else { 613 L.Push(lua.LTrue) 614 return 1 615 } 616 } 617 618 func luaQuery(L *lua.LState) int { 619 cnx := L.CheckTable(1) 620 topic := L.CheckString(2) 621 subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable) 622 L.Push(L.GetField(subTbl, topic)) 623 return 1 624 } 625 626 func luaSubscribe(L *lua.LState) int { 627 var err error 628 cnx := L.CheckTable(1) 629 topic := L.CheckString(2) 630 callback := L.OptFunction(3, nil) 631 client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 632 tbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable) 633 634 _, is_new := L.GetField(tbl, topic).(*lua.LNilType) 635 636 if lua.LVIsFalse(L.RawGetInt(cnx, keyReused)) { 637 if callback == nil { 638 err = client.Unsubscribe(nil, topic) 639 } else if is_new { 640 err = client.Subscribe(nil, topic) 641 } 642 } 643 644 if err != nil { 645 log.Println("luaSubscribe:", err) 646 L.Push(lua.LNil) 647 L.Push(lua.LString(err.Error())) 648 return 2 649 } else { 650 if callback == nil { 651 if is_new { 652 log.Printf("Not subscribed to %q", topic) 653 } else { 654 log.Printf("Unsubscribed from %q", topic) 655 } 656 L.SetField(tbl, topic, lua.LNil) 657 } else { 658 if is_new { 659 log.Printf("Subscribed to %q", topic) 660 } else { 661 log.Printf("Updating subscription to %q", topic) 662 } 663 L.SetField(tbl, topic, callback) 664 } 665 666 L.Push(lua.LTrue) 667 return 1 668 } 669 } 670 671 /********** Lua Object for timers **********/ 672 673 const luaTimerTypeName = "timer" 674 675 func registerTimerType(L *lua.LState) { 676 mt := L.NewTypeMetatable(luaTimerTypeName) 677 L.SetGlobal(luaTimerTypeName, mt) 678 L.SetField(mt, "new", L.NewFunction(newTimer)) 679 L.SetField(mt, "schedule", L.NewFunction(timerSchedule)) 680 L.SetField(mt, "__index", L.SetFuncs(L.NewTable(), timerMethods)) 681 } 682 683 func newTimer(L *lua.LState) int { 684 atTime := L.Get(1) 685 cb := L.CheckFunction(2) 686 L.Pop(2) 687 L.SetMetatable(cb, L.GetTypeMetatable(luaTimerTypeName)) 688 L.Push(cb) 689 L.Push(atTime) 690 return timerSchedule(L) 691 } 692 693 var timerMethods = map[string]lua.LGFunction{ 694 "cancel": timerCancel, 695 "schedule": timerSchedule, 696 } 697 698 func timerCancel(L *lua.LState) int { 699 timer := L.CheckFunction(1) 700 L.RawSet(stateTimerTable(L), timer, lua.LNil) 701 return 0 702 } 703 704 func timerSchedule(L *lua.LState) int { 705 timer := L.CheckFunction(1) 706 atTime := lua.LNil 707 if L.Get(2) != lua.LNil { 708 atTime = L.CheckNumber(2) 709 } 710 711 L.RawSet(stateTimerTable(L), timer, atTime) 712 return 0 713 } 714 715 func toTime(lsec lua.LNumber) time.Time { 716 fsec := float64(lsec) 717 sec := int64(fsec) 718 nsec := int64((fsec - float64(sec)) * 1.0e9) 719 720 return time.Unix(sec, nsec) 721 } 722 723 func runTimers(L *lua.LState, parentTimer *time.Timer) { 724 hasNext := false 725 var nextTime time.Time 726 727 now := time.Now() 728 timers := stateTimerTable(L) 729 730 timer, luaT := timers.Next(lua.LNil) 731 for timer != lua.LNil { 732 t := toTime(luaT.(lua.LNumber)) 733 if t.Compare(now) <= 0 { 734 L.RawSet(timers, timer, lua.LNil) 735 err := L.CallByParam(lua.P{Fn: timer, NRet: 0, Protect: true}, timer, luaT) 736 if err != nil { 737 panic(err) 738 } 739 timer = lua.LNil 740 hasNext = false 741 } else if !hasNext || t.Compare(nextTime) < 0 { 742 hasNext = true 743 nextTime = t 744 } 745 746 timer, luaT = timers.Next(timer) 747 } 748 749 if hasNext { 750 parentTimer.Reset(time.Until(nextTime)) 751 } else { 752 parentTimer.Stop() 753 } 754 }