mqttagent.go (18589B)
1 /* 2 * Copyright (c) 2025, Natacha Porté 3 * 4 * Permission to use, copy, modify, and distribute this software for any 5 * purpose with or without fee is hereby granted, provided that the above 6 * copyright notice and this permission notice appear in all copies. 7 * 8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 15 */ 16 17 package mqttagent 18 19 import ( 20 "errors" 21 "fmt" 22 "log" 23 "os" 24 "strings" 25 "time" 26 27 "github.com/go-mqtt/mqtt" 28 "github.com/yuin/gluamapper" 29 "github.com/yuin/gopher-lua" 30 ) 31 32 type MqttAgent interface { 33 Setup(L *lua.LState) 34 Log(L *lua.LState, msg *MqttMessage) 35 Teardown(L *lua.LState) 36 } 37 38 type MqttReloadingAgent interface { 39 Setup(L *lua.LState) 40 Log(L *lua.LState, msg *MqttMessage) 41 ReloadBegin(oldL, newL *lua.LState) 42 ReloadAbort(oldL, newL *lua.LState) 43 ReloadEnd(oldL, newL *lua.LState) 44 Teardown(L *lua.LState) 45 } 46 47 type MqttMessage struct { 48 Timestamp float64 49 ClientId int 50 Topic []byte 51 Message []byte 52 } 53 54 func Run(agent MqttAgent, main_script string, capacity int) { 55 fromMqtt := make(chan MqttMessage, capacity) 56 57 L := lua.NewState() 58 defer L.Close() 59 60 agent.Setup(L) 61 defer agent.Teardown(L) 62 63 hostname, err := os.Hostname() 64 if err != nil { 65 hostname = "<unknown>" 66 } 67 68 idString := fmt.Sprintf("mqttagent-%s-%d", hostname, os.Getpid()) 69 registerMqttClientType(L) 70 registerTimerType(L) 71 registerState(L, idString, fromMqtt) 72 defer cleanupClients(L) 73 74 if err := L.DoFile(main_script); err != nil { 75 panic(err) 76 } 77 78 timer := time.NewTimer(0) 79 defer timer.Stop() 80 81 log.Println(idString, "started") 82 83 for { 84 select { 85 case msg, ok := <-fromMqtt: 86 87 if !ok { 88 log.Println("fromMqtt is closed") 89 break 90 } 91 92 processMsg(L, agent, &msg) 93 94 case <-timer.C: 95 } 96 97 runTimers(L, timer) 98 99 if stateReloadRequested(L) { 100 L = reload(L, agent, main_script) 101 runTimers(L, timer) 102 stateRequestReload(L, lua.LNil) 103 } 104 105 if tableIsEmpty(stateCnxTable(L)) && tableIsEmpty(stateTimerTable(L)) { 106 break 107 } 108 } 109 110 log.Println(idString, "finished") 111 } 112 113 func cleanupClients(L *lua.LState) { 114 cnxTbl := stateCnxTable(L) 115 if cnxTbl == nil { 116 return 117 } 118 119 L.ForEach(cnxTbl, func(key, value lua.LValue) { 120 cnx := value.(*lua.LTable) 121 client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 122 if err := client.Disconnect(nil); err != nil { 123 log.Printf("cleanup client %s: %v", lua.LVAsString(key), err) 124 } 125 }) 126 } 127 128 func dispatchMsg(L *lua.LState, msg *MqttMessage, cnx, key, value lua.LValue) { 129 skey, ok := key.(lua.LString) 130 topic := string(msg.Topic) 131 132 if ok && match(topic, string(skey)) { 133 err := L.CallByParam(lua.P{Fn: value, NRet: 0, Protect: true}, 134 cnx, 135 lua.LString(string(msg.Message)), 136 lua.LString(topic), 137 lua.LNumber(msg.Timestamp)) 138 if err != nil { 139 panic(err) 140 } 141 } 142 } 143 144 func matchSliced(actual, filter []string) bool { 145 if len(filter) == 0 { 146 return len(actual) == 0 147 } 148 149 if filter[0] == "#" { 150 if len(filter) == 1 { 151 return true 152 } 153 154 for i := range actual { 155 if matchSliced(actual[i:], filter[1:]) { 156 return true 157 } 158 } 159 160 return false 161 } 162 163 if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) { 164 return matchSliced(actual[1:], filter[1:]) 165 } 166 167 return false 168 } 169 170 func match(actual, filter string) bool { 171 return matchSliced(strings.Split(actual, "/"), strings.Split(filter, "/")) 172 } 173 174 func tableIsEmpty(t *lua.LTable) bool { 175 key, _ := t.Next(lua.LNil) 176 return key == lua.LNil 177 } 178 179 func processMsg(L *lua.LState, agent MqttAgent, msg *MqttMessage) { 180 agent.Log(L, msg) 181 182 cnx := L.RawGetInt(stateCnxTable(L), msg.ClientId).(*lua.LTable) 183 subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable) 184 L.ForEach(subTbl, func(key, value lua.LValue) { dispatchMsg(L, msg, cnx, key, value) }) 185 186 if tableIsEmpty(subTbl) { 187 client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 188 if err := client.Disconnect(nil); err != nil { 189 log.Println("disconnect empty client:", err) 190 } 191 L.RawSetInt(stateCnxTable(L), msg.ClientId, lua.LNil) 192 } 193 } 194 195 func mqttRead(client *mqtt.Client, toLua chan<- MqttMessage, id int) { 196 var big *mqtt.BigMessage 197 198 for { 199 message, topic, err := client.ReadSlices() 200 t := float64(time.Now().UnixMicro()) * 1.0e-6 201 202 switch { 203 case err == nil: 204 toLua <- MqttMessage{Timestamp: t, ClientId: id, Topic: dup(topic), Message: dup(message)} 205 206 case errors.As(err, &big): 207 data, err := big.ReadAll() 208 if err != nil { 209 log.Println("mqttRead big message:", err) 210 } else { 211 toLua <- MqttMessage{Timestamp: t, ClientId: id, Topic: dup(topic), Message: data} 212 } 213 214 case errors.Is(err, mqtt.ErrClosed): 215 log.Println("mqttRead finishing:", err) 216 return 217 218 case mqtt.IsConnectionRefused(err): 219 log.Println("mqttRead connection refused:", err) 220 time.Sleep(15 * time.Minute) 221 222 default: 223 log.Println("mqttRead:", err) 224 time.Sleep(2 * time.Second) 225 } 226 } 227 } 228 229 func reload(oldL *lua.LState, agent MqttAgent, main_script string) *lua.LState { 230 log.Println("Reloading", main_script) 231 reloader, isReloader := agent.(MqttReloadingAgent) 232 233 newL := lua.NewState() 234 235 if isReloader { 236 reloader.ReloadBegin(oldL, newL) 237 } else { 238 agent.Setup(newL) 239 } 240 241 registerMqttClientType(newL) 242 registerTimerType(newL) 243 244 stateReloadBegin(oldL, newL) 245 246 if err := newL.DoFile(main_script); err != nil { 247 log.Println("Reload failed:", err) 248 stateReloadAbort(oldL, newL) 249 if isReloader { 250 reloader.ReloadAbort(oldL, newL) 251 } else { 252 agent.Teardown(newL) 253 } 254 newL.Close() 255 return oldL 256 } else { 257 stateReloadEnd(oldL, newL) 258 if isReloader { 259 reloader.ReloadEnd(oldL, newL) 260 } else { 261 agent.Teardown(oldL) 262 } 263 oldL.Close() 264 log.Println("Reload successful") 265 return newL 266 } 267 } 268 269 func dup(src []byte) []byte { 270 res := make([]byte, len(src)) 271 copy(res, src) 272 return res 273 } 274 275 func newUserData(L *lua.LState, v interface{}) *lua.LUserData { 276 res := L.NewUserData() 277 res.Value = v 278 return res 279 } 280 281 /********** State Object in the Lua Interpreter **********/ 282 283 const luaStateName = "_mqttagent" 284 const keyChanToLua = 1 285 const keyClientPrefix = 2 286 const keyClientNextId = 3 287 const keyCfgMap = 4 288 const keyCnxTable = 5 289 const keyTimerTable = 6 290 const keyReloadRequest = 7 291 const keyOldCfgMap = 8 292 293 func registerState(L *lua.LState, clientPrefix string, toLua chan<- MqttMessage) { 294 st := L.NewTable() 295 L.RawSetInt(st, keyChanToLua, newUserData(L, toLua)) 296 L.RawSetInt(st, keyClientPrefix, lua.LString(clientPrefix)) 297 L.RawSetInt(st, keyClientNextId, lua.LNumber(1)) 298 L.RawSetInt(st, keyCfgMap, newUserData(L, make(mqttConfigMap))) 299 L.RawSetInt(st, keyCnxTable, L.NewTable()) 300 L.RawSetInt(st, keyTimerTable, L.NewTable()) 301 L.SetGlobal(luaStateName, st) 302 L.SetGlobal("reload", L.NewFunction(requestReload)) 303 } 304 305 func stateReloadBegin(oldL, newL *lua.LState) { 306 oldSt := oldL.GetGlobal(luaStateName).(*lua.LTable) 307 toLua := oldL.RawGetInt(oldSt, keyChanToLua).(*lua.LUserData).Value.(chan<- MqttMessage) 308 clientPrefix := oldL.RawGetInt(oldSt, keyClientPrefix) 309 nextId := oldL.RawGetInt(oldSt, keyClientNextId) 310 cfgMap := oldL.RawGetInt(oldSt, keyCfgMap).(*lua.LUserData).Value.(mqttConfigMap) 311 312 st := newL.NewTable() 313 newL.RawSetInt(st, keyChanToLua, newUserData(newL, toLua)) 314 newL.RawSetInt(st, keyClientPrefix, clientPrefix) 315 newL.RawSetInt(st, keyClientNextId, nextId) 316 newL.RawSetInt(st, keyCfgMap, newUserData(newL, make(mqttConfigMap))) 317 newL.RawSetInt(st, keyCnxTable, newL.NewTable()) 318 newL.RawSetInt(st, keyTimerTable, newL.NewTable()) 319 newL.RawSetInt(st, keyOldCfgMap, newUserData(newL, cfgMap)) 320 newL.SetGlobal(luaStateName, st) 321 newL.SetGlobal("reload", newL.NewFunction(requestReload)) 322 } 323 324 func stateReloadAbort(oldL, newL *lua.LState) { 325 statePartialCleanup(newL, oldL) 326 } 327 328 func stateReloadEnd(oldL, newL *lua.LState) { 329 statePartialCleanup(oldL, newL) 330 newSt := newL.GetGlobal(luaStateName).(*lua.LTable) 331 newL.RawSetInt(newSt, keyOldCfgMap, lua.LNil) 332 } 333 334 func statePartialCleanup(staleL, keptL *lua.LState) { 335 staleCnxTable := stateCnxTable(staleL) 336 keptCnxTable := stateCnxTable(keptL) 337 338 staleL.ForEach(staleCnxTable, func(key, value lua.LValue) { 339 clientId := int(key.(lua.LNumber)) 340 if keptL.RawGetInt(keptCnxTable, clientId) == lua.LNil { 341 client := staleL.RawGetInt(value.(*lua.LTable), keyClient).(*lua.LUserData).Value.(*mqtt.Client) 342 client.Close() 343 } 344 }) 345 346 keptL.ForEach(keptCnxTable, func(key, value lua.LValue) { 347 clientId := int(key.(lua.LNumber)) 348 keptCnx := value.(*lua.LTable) 349 if lua.LVIsFalse(keptL.RawGetInt(keptCnx, keyReused)) { 350 return 351 } 352 353 client := keptL.RawGetInt(keptCnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 354 staleCnx := staleL.RawGetInt(staleCnxTable, clientId).(*lua.LTable) 355 356 if client != staleL.RawGetInt(staleCnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) { 357 panic("This shouldn't happen") 358 } 359 360 keptSub := keptL.RawGetInt(keptCnx, keySubTable).(*lua.LTable) 361 staleSub := staleL.RawGetInt(staleCnx, keySubTable).(*lua.LTable) 362 363 staleL.ForEach(staleSub, func(key, _ lua.LValue) { 364 topic := string(key.(lua.LString)) 365 if keptL.GetField(keptSub, topic) == lua.LNil { 366 log.Println("Unsubscribing from stale topic", topic) 367 if err := client.Unsubscribe(nil, topic); err != nil { 368 log.Println("Failed to unsubscribe:", err) 369 } 370 } 371 }) 372 373 keptL.ForEach(keptSub, func(key, _ lua.LValue) { 374 topic := string(key.(lua.LString)) 375 if staleL.GetField(staleSub, topic) == lua.LNil { 376 log.Println("Subscribing to new topic", topic) 377 if err := client.Subscribe(nil, topic); err != nil { 378 log.Println("Failed to subscribe:", err) 379 } 380 } 381 }) 382 383 keptL.RawSetInt(keptCnx, keyReused, lua.LNil) 384 }) 385 } 386 387 func stateValue(L *lua.LState, key int) lua.LValue { 388 st := L.GetGlobal(luaStateName) 389 return L.RawGetInt(st.(*lua.LTable), key) 390 } 391 392 func stateChanToLua(L *lua.LState) chan<- MqttMessage { 393 ud := stateValue(L, keyChanToLua) 394 return ud.(*lua.LUserData).Value.(chan<- MqttMessage) 395 } 396 397 func stateClientNextId(L *lua.LState) (int, string) { 398 st := L.GetGlobal(luaStateName).(*lua.LTable) 399 result := int(L.RawGetInt(st, keyClientNextId).(lua.LNumber)) 400 L.RawSetInt(st, keyClientNextId, lua.LNumber(result+1)) 401 prefix := lua.LVAsString(L.RawGetInt(st, keyClientPrefix)) 402 return result, fmt.Sprintf("%s-%d", prefix, result) 403 } 404 405 func stateCfgMap(L *lua.LState) mqttConfigMap { 406 return stateValue(L, keyCfgMap).(*lua.LUserData).Value.(mqttConfigMap) 407 } 408 409 func stateCnxTable(L *lua.LState) *lua.LTable { 410 return stateValue(L, keyCnxTable).(*lua.LTable) 411 } 412 413 func stateTimerTable(L *lua.LState) *lua.LTable { 414 return stateValue(L, keyTimerTable).(*lua.LTable) 415 } 416 417 func stateReloadRequested(L *lua.LState) bool { 418 return lua.LVAsBool(stateValue(L, keyReloadRequest)) 419 } 420 421 func stateRequestReload(L *lua.LState, v lua.LValue) { 422 st := L.GetGlobal(luaStateName).(*lua.LTable) 423 L.RawSetInt(st, keyReloadRequest, v) 424 } 425 426 func stateOldCfgMap(L *lua.LState) mqttConfigMap { 427 val := stateValue(L, keyOldCfgMap) 428 if val == lua.LNil { 429 return nil 430 } else { 431 return val.(*lua.LUserData).Value.(mqttConfigMap) 432 } 433 } 434 435 func requestReload(L *lua.LState) int { 436 stateRequestReload(L, lua.LTrue) 437 return 0 438 } 439 440 /********** Lua Object for MQTT client **********/ 441 442 const luaMqttClientTypeName = "mqttclient" 443 const keyClient = 1 444 const keySubTable = 2 445 const keyReused = 3 446 447 func registerMqttClientType(L *lua.LState) { 448 mt := L.NewTypeMetatable(luaMqttClientTypeName) 449 L.SetGlobal(luaMqttClientTypeName, mt) 450 L.SetField(mt, "new", L.NewFunction(newMqttClient)) 451 L.SetField(mt, "__call", L.NewFunction(luaPublish)) 452 L.SetField(mt, "__index", L.NewFunction(luaQuery)) 453 L.SetField(mt, "__newindex", L.NewFunction(luaSubscribe)) 454 } 455 456 type mqttConfig struct { 457 Connection string 458 PauseTimeout string 459 AtLeastOnceMax int 460 ExactlyOnceMax int 461 UserName string 462 Password string 463 Will struct { 464 Topic string 465 Message string 466 Retain bool 467 AtLeastOnce bool 468 ExactlyOnce bool 469 } 470 KeepAlive uint16 471 CleanSession bool 472 } 473 474 type mqttClientEntry struct { 475 client *mqtt.Client 476 id int 477 } 478 479 type mqttConfigMap map[mqttConfig]mqttClientEntry 480 481 func mqttConfigBytes(src string) []byte { 482 if src == "" { 483 return nil 484 } else { 485 return []byte(src) 486 } 487 } 488 489 func newClient(config *mqttConfig, id string) (*mqtt.Client, error) { 490 pto, err := time.ParseDuration(config.PauseTimeout) 491 if err != nil { 492 pto = time.Second 493 } 494 495 processed_cfg := mqtt.Config{ 496 Dialer: mqtt.NewDialer("tcp", config.Connection), 497 PauseTimeout: pto, 498 AtLeastOnceMax: config.AtLeastOnceMax, 499 ExactlyOnceMax: config.ExactlyOnceMax, 500 UserName: config.UserName, 501 Password: mqttConfigBytes(config.Password), 502 Will: struct { 503 Topic string 504 Message []byte 505 Retain bool 506 AtLeastOnce bool 507 ExactlyOnce bool 508 }{ 509 Topic: config.Will.Topic, 510 Message: mqttConfigBytes(config.Will.Message), 511 Retain: config.Will.Retain, 512 AtLeastOnce: config.Will.AtLeastOnce, 513 ExactlyOnce: config.Will.ExactlyOnce, 514 }, 515 KeepAlive: config.KeepAlive, 516 CleanSession: config.CleanSession, 517 } 518 519 return mqtt.VolatileSession(id, &processed_cfg) 520 } 521 522 func registerClient(L *lua.LState, id int, client *mqtt.Client, reused bool) lua.LValue { 523 res := L.NewTable() 524 L.RawSetInt(res, keyClient, newUserData(L, client)) 525 L.RawSetInt(res, keySubTable, L.NewTable()) 526 if reused { 527 L.RawSetInt(res, keyReused, lua.LTrue) 528 } 529 L.SetMetatable(res, L.GetTypeMetatable(luaMqttClientTypeName)) 530 L.RawSetInt(stateCnxTable(L), id, res) 531 return res 532 } 533 534 func newMqttClient(L *lua.LState) int { 535 var config mqttConfig 536 if err := gluamapper.Map(L.CheckTable(1), &config); err != nil { 537 log.Println("newMqttClient:", err) 538 L.Push(lua.LNil) 539 L.Push(lua.LString(err.Error())) 540 return 2 541 } 542 543 cfgMap := stateCfgMap(L) 544 545 if cfg, found := cfgMap[config]; found { 546 res := L.RawGetInt(stateCnxTable(L), cfg.id) 547 tbl := res.(*lua.LTable) 548 if L.RawGetInt(tbl, keyClient).(*lua.LUserData).Value.(*mqtt.Client) != cfg.client { 549 panic("Inconsistent configuration table") 550 } 551 552 L.Push(res) 553 return 1 554 } 555 556 if oldCfgMap := stateOldCfgMap(L); oldCfgMap != nil { 557 if cfg, found := oldCfgMap[config]; found { 558 cfgMap[config] = cfg 559 L.Push(registerClient(L, cfg.id, cfg.client, true)) 560 return 1 561 } 562 } 563 564 id, idString := stateClientNextId(L) 565 client, err := newClient(&config, idString) 566 if err != nil { 567 log.Println("newMqttClient:", err) 568 L.Push(lua.LNil) 569 L.Push(lua.LString(err.Error())) 570 return 2 571 } 572 go mqttRead(client, stateChanToLua(L), id) 573 574 cfgMap[config] = mqttClientEntry{id: id, client: client} 575 576 L.Push(registerClient(L, id, client, false)) 577 return 1 578 } 579 580 func luaPublish(L *lua.LState) int { 581 cnx := L.CheckTable(1) 582 client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 583 584 if L.GetTop() == 1 { 585 if err := client.Ping(nil); err != nil { 586 log.Println("luaPing:", err) 587 L.Push(lua.LNil) 588 L.Push(lua.LString(err.Error())) 589 return 2 590 } else { 591 L.Push(lua.LTrue) 592 return 1 593 } 594 } 595 596 message := L.CheckString(2) 597 topic := L.CheckString(3) 598 599 if err := client.Publish(nil, []byte(message), topic); err != nil { 600 L.Push(lua.LNil) 601 L.Push(lua.LString(err.Error())) 602 return 2 603 } else { 604 L.Push(lua.LTrue) 605 return 1 606 } 607 } 608 609 func luaQuery(L *lua.LState) int { 610 cnx := L.CheckTable(1) 611 topic := L.CheckString(2) 612 subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable) 613 L.Push(L.GetField(subTbl, topic)) 614 return 1 615 } 616 617 func luaSubscribe(L *lua.LState) int { 618 var err error 619 cnx := L.CheckTable(1) 620 topic := L.CheckString(2) 621 callback := L.OptFunction(3, nil) 622 client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 623 tbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable) 624 625 _, is_new := L.GetField(tbl, topic).(*lua.LNilType) 626 627 if lua.LVIsFalse(L.RawGetInt(cnx, keyReused)) { 628 if callback == nil { 629 err = client.Unsubscribe(nil, topic) 630 } else if is_new { 631 err = client.Subscribe(nil, topic) 632 } 633 } 634 635 if err != nil { 636 log.Println("luaSubscribe:", err) 637 L.Push(lua.LNil) 638 L.Push(lua.LString(err.Error())) 639 return 2 640 } else { 641 if callback == nil { 642 if is_new { 643 log.Printf("Not subscribed to %q", topic) 644 } else { 645 log.Printf("Unsubscribed from %q", topic) 646 } 647 L.SetField(tbl, topic, lua.LNil) 648 } else { 649 if is_new { 650 log.Printf("Subscribed to %q", topic) 651 } else { 652 log.Printf("Updating subscription to %q", topic) 653 } 654 L.SetField(tbl, topic, callback) 655 } 656 657 L.Push(lua.LTrue) 658 return 1 659 } 660 } 661 662 /********** Lua Object for timers **********/ 663 664 const luaTimerTypeName = "timer" 665 666 func registerTimerType(L *lua.LState) { 667 mt := L.NewTypeMetatable(luaTimerTypeName) 668 L.SetGlobal(luaTimerTypeName, mt) 669 L.SetField(mt, "new", L.NewFunction(newTimer)) 670 L.SetField(mt, "schedule", L.NewFunction(timerSchedule)) 671 L.SetField(mt, "__index", L.SetFuncs(L.NewTable(), timerMethods)) 672 } 673 674 func newTimer(L *lua.LState) int { 675 atTime := L.Get(1) 676 cb := L.CheckFunction(2) 677 L.Pop(2) 678 L.SetMetatable(cb, L.GetTypeMetatable(luaTimerTypeName)) 679 L.Push(cb) 680 L.Push(atTime) 681 return timerSchedule(L) 682 } 683 684 var timerMethods = map[string]lua.LGFunction{ 685 "cancel": timerCancel, 686 "schedule": timerSchedule, 687 } 688 689 func timerCancel(L *lua.LState) int { 690 timer := L.CheckFunction(1) 691 L.RawSet(stateTimerTable(L), timer, lua.LNil) 692 return 0 693 } 694 695 func timerSchedule(L *lua.LState) int { 696 timer := L.CheckFunction(1) 697 atTime := lua.LNil 698 if L.Get(2) != lua.LNil { 699 atTime = L.CheckNumber(2) 700 } 701 702 L.RawSet(stateTimerTable(L), timer, atTime) 703 return 0 704 } 705 706 func toTime(lsec lua.LNumber) time.Time { 707 fsec := float64(lsec) 708 sec := int64(fsec) 709 nsec := int64((fsec - float64(sec)) * 1.0e9) 710 711 return time.Unix(sec, nsec) 712 } 713 714 func runTimers(L *lua.LState, parentTimer *time.Timer) { 715 hasNext := false 716 var nextTime time.Time 717 718 now := time.Now() 719 timers := stateTimerTable(L) 720 721 timer, luaT := timers.Next(lua.LNil) 722 for timer != lua.LNil { 723 t := toTime(luaT.(lua.LNumber)) 724 if t.Compare(now) <= 0 { 725 L.RawSet(timers, timer, lua.LNil) 726 err := L.CallByParam(lua.P{Fn: timer, NRet: 0, Protect: true}, timer, luaT) 727 if err != nil { 728 panic(err) 729 } 730 timer = lua.LNil 731 hasNext = false 732 } else if !hasNext || t.Compare(nextTime) < 0 { 733 hasNext = true 734 nextTime = t 735 } 736 737 timer, luaT = timers.Next(timer) 738 } 739 740 if hasNext { 741 parentTimer.Reset(time.Until(nextTime)) 742 } else { 743 parentTimer.Stop() 744 } 745 }