mqttagent.go (18976B)
1 /* 2 * Copyright (c) 2025, Natacha Porté 3 * 4 * Permission to use, copy, modify, and distribute this software for any 5 * purpose with or without fee is hereby granted, provided that the above 6 * copyright notice and this permission notice appear in all copies. 7 * 8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 15 */ 16 17 package mqttagent 18 19 import ( 20 "errors" 21 "fmt" 22 "log" 23 "os" 24 "strings" 25 "time" 26 27 "github.com/go-mqtt/mqtt" 28 "github.com/yuin/gluamapper" 29 "github.com/yuin/gopher-lua" 30 ) 31 32 type MqttAgent interface { 33 Setup(L *lua.LState) 34 Teardown(L *lua.LState) 35 } 36 37 type MqttReloadingAgent interface { 38 Setup(L *lua.LState) 39 ReloadBegin(oldL, newL *lua.LState) 40 ReloadAbort(oldL, newL *lua.LState) 41 ReloadEnd(oldL, newL *lua.LState) 42 Teardown(L *lua.LState) 43 } 44 45 type mqttMessage struct { 46 Timestamp float64 47 ClientId int 48 Topic []byte 49 Message []byte 50 } 51 52 func Run(agent MqttAgent, main_script string, capacity int) { 53 fromMqtt := make(chan mqttMessage, capacity) 54 55 L := lua.NewState() 56 defer L.Close() 57 58 agent.Setup(L) 59 defer agent.Teardown(L) 60 61 hostname, err := os.Hostname() 62 if err != nil { 63 hostname = "<unknown>" 64 } 65 66 idString := fmt.Sprintf("mqttagent-%s-%d", hostname, os.Getpid()) 67 registerMqttClientType(L) 68 registerTimerType(L) 69 registerState(L, agent, idString, fromMqtt) 70 defer cleanupClients(L) 71 72 if err := L.DoFile(main_script); err != nil { 73 panic(err) 74 } 75 76 timer := time.NewTimer(0) 77 defer timer.Stop() 78 79 log.Println(idString, "started") 80 81 for { 82 select { 83 case msg, ok := <-fromMqtt: 84 85 if !ok { 86 log.Println("fromMqtt is closed") 87 break 88 } 89 90 processMsg(L, &msg) 91 92 case <-timer.C: 93 } 94 95 runTimers(L, timer) 96 97 if stateReloadRequested(L) { 98 L = reload(L, main_script) 99 runTimers(L, timer) 100 stateRequestReload(L, lua.LNil) 101 } 102 103 if tableIsEmpty(stateCnxTable(L)) && tableIsEmpty(stateTimerTable(L)) { 104 break 105 } 106 } 107 108 log.Println(idString, "finished") 109 } 110 111 func cleanupClients(L *lua.LState) { 112 cnxTbl := stateCnxTable(L) 113 if cnxTbl == nil { 114 return 115 } 116 117 L.ForEach(cnxTbl, func(key, value lua.LValue) { 118 cnx := value.(*lua.LTable) 119 client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 120 if err := client.Disconnect(nil); err != nil { 121 log.Printf("cleanup client %s: %v", lua.LVAsString(key), err) 122 } 123 }) 124 } 125 126 func dispatchMsg(L *lua.LState, msg *mqttMessage, cnx, key, value lua.LValue) { 127 skey, ok := key.(lua.LString) 128 topic := string(msg.Topic) 129 130 if ok && match(topic, string(skey)) { 131 err := L.CallByParam(lua.P{Fn: value, NRet: 0, Protect: true}, 132 cnx, 133 lua.LString(string(msg.Message)), 134 lua.LString(topic), 135 lua.LNumber(msg.Timestamp)) 136 if err != nil { 137 panic(err) 138 } 139 } 140 } 141 142 func matchSliced(actual, filter []string) bool { 143 if len(filter) == 0 { 144 return len(actual) == 0 145 } 146 147 if filter[0] == "#" { 148 if len(filter) == 1 { 149 return true 150 } 151 152 for i := range actual { 153 if matchSliced(actual[i:], filter[1:]) { 154 return true 155 } 156 } 157 158 return false 159 } 160 161 if len(actual) > 0 && (filter[0] == "+" || filter[0] == actual[0]) { 162 return matchSliced(actual[1:], filter[1:]) 163 } 164 165 return false 166 } 167 168 func match(actual, filter string) bool { 169 return matchSliced(strings.Split(actual, "/"), strings.Split(filter, "/")) 170 } 171 172 func tableIsEmpty(t *lua.LTable) bool { 173 key, _ := t.Next(lua.LNil) 174 return key == lua.LNil 175 } 176 177 func processMsg(L *lua.LState, msg *mqttMessage) { 178 cnx := L.RawGetInt(stateCnxTable(L), msg.ClientId).(*lua.LTable) 179 subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable) 180 L.ForEach(subTbl, func(key, value lua.LValue) { dispatchMsg(L, msg, cnx, key, value) }) 181 182 if tableIsEmpty(subTbl) { 183 client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 184 if err := client.Disconnect(nil); err != nil { 185 log.Println("disconnect empty client:", err) 186 } 187 L.RawSetInt(stateCnxTable(L), msg.ClientId, lua.LNil) 188 } 189 } 190 191 func mqttRead(client *mqtt.Client, toLua chan<- mqttMessage, id int) { 192 var big *mqtt.BigMessage 193 194 for { 195 message, topic, err := client.ReadSlices() 196 t := float64(time.Now().UnixMicro()) * 1.0e-6 197 198 switch { 199 case err == nil: 200 toLua <- mqttMessage{Timestamp: t, ClientId: id, Topic: dup(topic), Message: dup(message)} 201 202 case errors.As(err, &big): 203 data, err := big.ReadAll() 204 if err != nil { 205 log.Println("mqttRead big message:", err) 206 } else { 207 toLua <- mqttMessage{Timestamp: t, ClientId: id, Topic: dup(topic), Message: data} 208 } 209 210 case errors.Is(err, mqtt.ErrClosed): 211 log.Println("mqttRead finishing:", err) 212 return 213 214 case mqtt.IsConnectionRefused(err): 215 log.Println("mqttRead connection refused:", err) 216 time.Sleep(15 * time.Minute) 217 218 default: 219 log.Println("mqttRead:", err) 220 time.Sleep(2 * time.Second) 221 } 222 } 223 } 224 225 func reload(oldL *lua.LState, main_script string) *lua.LState { 226 log.Println("Reloading", main_script) 227 agent := stateAgent(oldL) 228 reloader, isReloader := agent.(MqttReloadingAgent) 229 230 newL := lua.NewState() 231 232 if isReloader { 233 reloader.ReloadBegin(oldL, newL) 234 } else { 235 agent.Setup(newL) 236 } 237 238 registerMqttClientType(newL) 239 registerTimerType(newL) 240 241 stateReloadBegin(oldL, newL) 242 243 if err := newL.DoFile(main_script); err != nil { 244 log.Println("Reload failed:", err) 245 stateReloadAbort(oldL, newL) 246 if isReloader { 247 reloader.ReloadAbort(oldL, newL) 248 } else { 249 agent.Teardown(newL) 250 } 251 newL.Close() 252 return oldL 253 } else { 254 stateReloadEnd(oldL, newL) 255 if isReloader { 256 reloader.ReloadEnd(oldL, newL) 257 } else { 258 agent.Teardown(oldL) 259 } 260 oldL.Close() 261 log.Println("Reload successful") 262 return newL 263 } 264 } 265 266 func dup(src []byte) []byte { 267 res := make([]byte, len(src)) 268 copy(res, src) 269 return res 270 } 271 272 func newUserData(L *lua.LState, v interface{}) *lua.LUserData { 273 res := L.NewUserData() 274 res.Value = v 275 return res 276 } 277 278 /********** State Object in the Lua Interpreter **********/ 279 280 const luaStateName = "_mqttagent" 281 const keyChanToLua = 1 282 const keyAgent = 2 283 const keyClientPrefix = 3 284 const keyClientNextId = 4 285 const keyCfgMap = 5 286 const keyCnxTable = 6 287 const keyTimerTable = 7 288 const keyReloadRequest = 8 289 const keyOldCfgMap = 9 290 291 func registerState(L *lua.LState, agent MqttAgent, clientPrefix string, toLua chan<- mqttMessage) { 292 st := L.NewTable() 293 L.RawSetInt(st, keyChanToLua, newUserData(L, toLua)) 294 L.RawSetInt(st, keyAgent, newUserData(L, agent)) 295 L.RawSetInt(st, keyClientPrefix, lua.LString(clientPrefix)) 296 L.RawSetInt(st, keyClientNextId, lua.LNumber(1)) 297 L.RawSetInt(st, keyCfgMap, newUserData(L, make(mqttConfigMap))) 298 L.RawSetInt(st, keyCnxTable, L.NewTable()) 299 L.RawSetInt(st, keyTimerTable, L.NewTable()) 300 L.SetGlobal(luaStateName, st) 301 L.SetGlobal("reload", L.NewFunction(requestReload)) 302 } 303 304 func stateReloadBegin(oldL, newL *lua.LState) { 305 oldSt := oldL.GetGlobal(luaStateName).(*lua.LTable) 306 toLua := oldL.RawGetInt(oldSt, keyChanToLua).(*lua.LUserData).Value.(chan<- mqttMessage) 307 agent := oldL.RawGetInt(oldSt, keyAgent).(*lua.LUserData).Value.(MqttAgent) 308 clientPrefix := oldL.RawGetInt(oldSt, keyClientPrefix) 309 nextId := oldL.RawGetInt(oldSt, keyClientNextId) 310 cfgMap := oldL.RawGetInt(oldSt, keyCfgMap).(*lua.LUserData).Value.(mqttConfigMap) 311 312 st := newL.NewTable() 313 newL.RawSetInt(st, keyChanToLua, newUserData(newL, toLua)) 314 newL.RawSetInt(st, keyAgent, newUserData(newL, agent)) 315 newL.RawSetInt(st, keyClientPrefix, clientPrefix) 316 newL.RawSetInt(st, keyClientNextId, nextId) 317 newL.RawSetInt(st, keyCfgMap, newUserData(newL, make(mqttConfigMap))) 318 newL.RawSetInt(st, keyCnxTable, newL.NewTable()) 319 newL.RawSetInt(st, keyTimerTable, newL.NewTable()) 320 newL.RawSetInt(st, keyOldCfgMap, newUserData(newL, cfgMap)) 321 newL.SetGlobal(luaStateName, st) 322 newL.SetGlobal("reload", newL.NewFunction(requestReload)) 323 } 324 325 func stateReloadAbort(oldL, newL *lua.LState) { 326 statePartialCleanup(newL, oldL) 327 } 328 329 func stateReloadEnd(oldL, newL *lua.LState) { 330 statePartialCleanup(oldL, newL) 331 newSt := newL.GetGlobal(luaStateName).(*lua.LTable) 332 newL.RawSetInt(newSt, keyOldCfgMap, lua.LNil) 333 } 334 335 func statePartialCleanup(staleL, keptL *lua.LState) { 336 staleCnxTable := stateCnxTable(staleL) 337 keptCnxTable := stateCnxTable(keptL) 338 339 staleL.ForEach(staleCnxTable, func(key, value lua.LValue) { 340 clientId := int(key.(lua.LNumber)) 341 if keptL.RawGetInt(keptCnxTable, clientId) == lua.LNil { 342 client := staleL.RawGetInt(value.(*lua.LTable), keyClient).(*lua.LUserData).Value.(*mqtt.Client) 343 client.Close() 344 } 345 }) 346 347 keptL.ForEach(keptCnxTable, func(key, value lua.LValue) { 348 clientId := int(key.(lua.LNumber)) 349 keptCnx := value.(*lua.LTable) 350 if lua.LVIsFalse(keptL.RawGetInt(keptCnx, keyReused)) { 351 return 352 } 353 354 client := keptL.RawGetInt(keptCnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 355 staleCnx := staleL.RawGetInt(staleCnxTable, clientId).(*lua.LTable) 356 357 if client != staleL.RawGetInt(staleCnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) { 358 panic("This shouldn't happen") 359 } 360 361 keptSub := keptL.RawGetInt(keptCnx, keySubTable).(*lua.LTable) 362 staleSub := staleL.RawGetInt(staleCnx, keySubTable).(*lua.LTable) 363 364 staleL.ForEach(staleSub, func(key, _ lua.LValue) { 365 topic := string(key.(lua.LString)) 366 if keptL.GetField(keptSub, topic) == lua.LNil { 367 log.Println("Unsubscribing from stale topic", topic) 368 if err := client.Unsubscribe(nil, topic); err != nil { 369 log.Println("Failed to unsubscribe:", err) 370 } 371 } 372 }) 373 374 keptL.ForEach(keptSub, func(key, _ lua.LValue) { 375 topic := string(key.(lua.LString)) 376 if staleL.GetField(staleSub, topic) == lua.LNil { 377 log.Println("Subscribing to new topic", topic) 378 if err := client.Subscribe(nil, topic); err != nil { 379 log.Println("Failed to subscribe:", err) 380 } 381 } 382 }) 383 384 keptL.RawSetInt(keptCnx, keyReused, lua.LNil) 385 }) 386 } 387 388 func stateValue(L *lua.LState, key int) lua.LValue { 389 st := L.GetGlobal(luaStateName) 390 return L.RawGetInt(st.(*lua.LTable), key) 391 } 392 393 func stateChanToLua(L *lua.LState) chan<- mqttMessage { 394 ud := stateValue(L, keyChanToLua) 395 return ud.(*lua.LUserData).Value.(chan<- mqttMessage) 396 } 397 398 func stateAgent(L *lua.LState) MqttAgent { 399 ud := stateValue(L, keyAgent) 400 return ud.(*lua.LUserData).Value.(MqttAgent) 401 } 402 403 func stateClientNextId(L *lua.LState) (int, string) { 404 st := L.GetGlobal(luaStateName).(*lua.LTable) 405 result := int(L.RawGetInt(st, keyClientNextId).(lua.LNumber)) 406 L.RawSetInt(st, keyClientNextId, lua.LNumber(result+1)) 407 prefix := lua.LVAsString(L.RawGetInt(st, keyClientPrefix)) 408 return result, fmt.Sprintf("%s-%d", prefix, result) 409 } 410 411 func stateCfgMap(L *lua.LState) mqttConfigMap { 412 return stateValue(L, keyCfgMap).(*lua.LUserData).Value.(mqttConfigMap) 413 } 414 415 func stateCnxTable(L *lua.LState) *lua.LTable { 416 return stateValue(L, keyCnxTable).(*lua.LTable) 417 } 418 419 func stateTimerTable(L *lua.LState) *lua.LTable { 420 return stateValue(L, keyTimerTable).(*lua.LTable) 421 } 422 423 func stateReloadRequested(L *lua.LState) bool { 424 return lua.LVAsBool(stateValue(L, keyReloadRequest)) 425 } 426 427 func stateRequestReload(L *lua.LState, v lua.LValue) { 428 st := L.GetGlobal(luaStateName).(*lua.LTable) 429 L.RawSetInt(st, keyReloadRequest, v) 430 } 431 432 func stateOldCfgMap(L *lua.LState) mqttConfigMap { 433 val := stateValue(L, keyOldCfgMap) 434 if val == lua.LNil { 435 return nil 436 } else { 437 return val.(*lua.LUserData).Value.(mqttConfigMap) 438 } 439 } 440 441 func requestReload(L *lua.LState) int { 442 stateRequestReload(L, lua.LTrue) 443 return 0 444 } 445 446 /********** Lua Object for MQTT client **********/ 447 448 const luaMqttClientTypeName = "mqttclient" 449 const keyClient = 1 450 const keySubTable = 2 451 const keyReused = 3 452 453 func registerMqttClientType(L *lua.LState) { 454 mt := L.NewTypeMetatable(luaMqttClientTypeName) 455 L.SetGlobal(luaMqttClientTypeName, mt) 456 L.SetField(mt, "new", L.NewFunction(newMqttClient)) 457 L.SetField(mt, "__call", L.NewFunction(luaPublish)) 458 L.SetField(mt, "__index", L.NewFunction(luaQuery)) 459 L.SetField(mt, "__newindex", L.NewFunction(luaSubscribe)) 460 } 461 462 type mqttConfig struct { 463 Connection string 464 TLS bool 465 PauseTimeout string 466 AtLeastOnceMax int 467 ExactlyOnceMax int 468 UserName string 469 Password string 470 Will struct { 471 Topic string 472 Message string 473 Retain bool 474 AtLeastOnce bool 475 ExactlyOnce bool 476 } 477 KeepAlive uint16 478 CleanSession bool 479 } 480 481 type mqttClientEntry struct { 482 client *mqtt.Client 483 id int 484 } 485 486 type mqttConfigMap map[mqttConfig]mqttClientEntry 487 488 func mqttConfigBytes(src string) []byte { 489 if src == "" { 490 return nil 491 } else { 492 return []byte(src) 493 } 494 } 495 496 func newClient(config *mqttConfig, id string) (*mqtt.Client, error) { 497 pto, err := time.ParseDuration(config.PauseTimeout) 498 if err != nil { 499 pto = time.Second 500 } 501 502 var dialer mqtt.Dialer 503 504 if config.TLS { 505 dialer = mqtt.NewTLSDialer("tcp", config.Connection, nil) 506 } else { 507 dialer = mqtt.NewDialer("tcp", config.Connection) 508 } 509 510 processed_cfg := mqtt.Config{ 511 Dialer: dialer, 512 PauseTimeout: pto, 513 AtLeastOnceMax: config.AtLeastOnceMax, 514 ExactlyOnceMax: config.ExactlyOnceMax, 515 UserName: config.UserName, 516 Password: mqttConfigBytes(config.Password), 517 Will: struct { 518 Topic string 519 Message []byte 520 Retain bool 521 AtLeastOnce bool 522 ExactlyOnce bool 523 }{ 524 Topic: config.Will.Topic, 525 Message: mqttConfigBytes(config.Will.Message), 526 Retain: config.Will.Retain, 527 AtLeastOnce: config.Will.AtLeastOnce, 528 ExactlyOnce: config.Will.ExactlyOnce, 529 }, 530 KeepAlive: config.KeepAlive, 531 CleanSession: config.CleanSession, 532 } 533 534 return mqtt.VolatileSession(id, &processed_cfg) 535 } 536 537 func registerClient(L *lua.LState, id int, client *mqtt.Client, reused bool) lua.LValue { 538 res := L.NewTable() 539 L.RawSetInt(res, keyClient, newUserData(L, client)) 540 L.RawSetInt(res, keySubTable, L.NewTable()) 541 if reused { 542 L.RawSetInt(res, keyReused, lua.LTrue) 543 } 544 L.SetMetatable(res, L.GetTypeMetatable(luaMqttClientTypeName)) 545 L.RawSetInt(stateCnxTable(L), id, res) 546 return res 547 } 548 549 func newMqttClient(L *lua.LState) int { 550 var config mqttConfig 551 if err := gluamapper.Map(L.CheckTable(1), &config); err != nil { 552 log.Println("newMqttClient:", err) 553 L.Push(lua.LNil) 554 L.Push(lua.LString(err.Error())) 555 return 2 556 } 557 558 cfgMap := stateCfgMap(L) 559 560 if cfg, found := cfgMap[config]; found { 561 res := L.RawGetInt(stateCnxTable(L), cfg.id) 562 tbl := res.(*lua.LTable) 563 if L.RawGetInt(tbl, keyClient).(*lua.LUserData).Value.(*mqtt.Client) != cfg.client { 564 panic("Inconsistent configuration table") 565 } 566 567 L.Push(res) 568 return 1 569 } 570 571 if oldCfgMap := stateOldCfgMap(L); oldCfgMap != nil { 572 if cfg, found := oldCfgMap[config]; found { 573 cfgMap[config] = cfg 574 L.Push(registerClient(L, cfg.id, cfg.client, true)) 575 return 1 576 } 577 } 578 579 id, idString := stateClientNextId(L) 580 client, err := newClient(&config, idString) 581 if err != nil { 582 log.Println("newMqttClient:", err) 583 L.Push(lua.LNil) 584 L.Push(lua.LString(err.Error())) 585 return 2 586 } 587 go mqttRead(client, stateChanToLua(L), id) 588 589 cfgMap[config] = mqttClientEntry{id: id, client: client} 590 591 L.Push(registerClient(L, id, client, false)) 592 return 1 593 } 594 595 func luaPublish(L *lua.LState) int { 596 cnx := L.CheckTable(1) 597 client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 598 599 if L.GetTop() == 1 { 600 if err := client.Ping(nil); err != nil { 601 log.Println("luaPing:", err) 602 L.Push(lua.LNil) 603 L.Push(lua.LString(err.Error())) 604 return 2 605 } else { 606 L.Push(lua.LTrue) 607 return 1 608 } 609 } 610 611 message := L.CheckString(2) 612 topic := L.CheckString(3) 613 614 if err := client.Publish(nil, []byte(message), topic); err != nil { 615 L.Push(lua.LNil) 616 L.Push(lua.LString(err.Error())) 617 return 2 618 } else { 619 L.Push(lua.LTrue) 620 return 1 621 } 622 } 623 624 func luaQuery(L *lua.LState) int { 625 cnx := L.CheckTable(1) 626 topic := L.CheckString(2) 627 subTbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable) 628 L.Push(L.GetField(subTbl, topic)) 629 return 1 630 } 631 632 func luaSubscribe(L *lua.LState) int { 633 var err error 634 cnx := L.CheckTable(1) 635 topic := L.CheckString(2) 636 callback := L.OptFunction(3, nil) 637 client := L.RawGetInt(cnx, keyClient).(*lua.LUserData).Value.(*mqtt.Client) 638 tbl := L.RawGetInt(cnx, keySubTable).(*lua.LTable) 639 640 _, is_new := L.GetField(tbl, topic).(*lua.LNilType) 641 642 if lua.LVIsFalse(L.RawGetInt(cnx, keyReused)) { 643 if callback == nil { 644 err = client.Unsubscribe(nil, topic) 645 } else if is_new { 646 err = client.Subscribe(nil, topic) 647 } 648 } 649 650 if err != nil { 651 log.Println("luaSubscribe:", err) 652 L.Push(lua.LNil) 653 L.Push(lua.LString(err.Error())) 654 return 2 655 } else { 656 if callback == nil { 657 if is_new { 658 log.Printf("Not subscribed to %q", topic) 659 } else { 660 log.Printf("Unsubscribed from %q", topic) 661 } 662 L.SetField(tbl, topic, lua.LNil) 663 } else { 664 if is_new { 665 log.Printf("Subscribed to %q", topic) 666 } else { 667 log.Printf("Updating subscription to %q", topic) 668 } 669 L.SetField(tbl, topic, callback) 670 } 671 672 L.Push(lua.LTrue) 673 return 1 674 } 675 } 676 677 /********** Lua Object for timers **********/ 678 679 const luaTimerTypeName = "timer" 680 681 func registerTimerType(L *lua.LState) { 682 mt := L.NewTypeMetatable(luaTimerTypeName) 683 L.SetGlobal(luaTimerTypeName, mt) 684 L.SetField(mt, "new", L.NewFunction(newTimer)) 685 L.SetField(mt, "schedule", L.NewFunction(timerSchedule)) 686 L.SetField(mt, "__index", L.SetFuncs(L.NewTable(), timerMethods)) 687 } 688 689 func newTimer(L *lua.LState) int { 690 atTime := L.Get(1) 691 cb := L.CheckFunction(2) 692 L.Pop(2) 693 L.SetMetatable(cb, L.GetTypeMetatable(luaTimerTypeName)) 694 L.Push(cb) 695 L.Push(atTime) 696 return timerSchedule(L) 697 } 698 699 var timerMethods = map[string]lua.LGFunction{ 700 "cancel": timerCancel, 701 "schedule": timerSchedule, 702 } 703 704 func timerCancel(L *lua.LState) int { 705 timer := L.CheckFunction(1) 706 L.RawSet(stateTimerTable(L), timer, lua.LNil) 707 return 0 708 } 709 710 func timerSchedule(L *lua.LState) int { 711 timer := L.CheckFunction(1) 712 atTime := lua.LNil 713 if L.Get(2) != lua.LNil { 714 atTime = L.CheckNumber(2) 715 } 716 717 L.RawSet(stateTimerTable(L), timer, atTime) 718 return 0 719 } 720 721 func toTime(lsec lua.LNumber) time.Time { 722 fsec := float64(lsec) 723 sec := int64(fsec) 724 nsec := int64((fsec - float64(sec)) * 1.0e9) 725 726 return time.Unix(sec, nsec) 727 } 728 729 func runTimers(L *lua.LState, parentTimer *time.Timer) { 730 hasNext := false 731 var nextTime time.Time 732 733 now := time.Now() 734 timers := stateTimerTable(L) 735 736 timer, luaT := timers.Next(lua.LNil) 737 for timer != lua.LNil { 738 t := toTime(luaT.(lua.LNumber)) 739 if t.Compare(now) <= 0 { 740 L.RawSet(timers, timer, lua.LNil) 741 err := L.CallByParam(lua.P{Fn: timer, NRet: 0, Protect: true}, timer, luaT) 742 if err != nil { 743 panic(err) 744 } 745 timer = lua.LNil 746 hasNext = false 747 } else if !hasNext || t.Compare(nextTime) < 0 { 748 hasNext = true 749 nextTime = t 750 } 751 752 timer, luaT = timers.Next(timer) 753 } 754 755 if hasNext { 756 parentTimer.Reset(time.Until(nextTime)) 757 } else { 758 parentTimer.Stop() 759 } 760 }