main.go (24657B)
1 /* 2 * Copyright (c) 2025, Natacha Porté 3 * 4 * Permission to use, copy, modify, and distribute this software for any 5 * purpose with or without fee is hereby granted, provided that the above 6 * copyright notice and this permission notice appear in all copies. 7 * 8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 15 */ 16 17 package main 18 19 import ( 20 "database/sql" 21 "embed" 22 "errors" 23 "fmt" 24 "log" 25 "os" 26 "regexp" 27 "runtime/debug" 28 "strconv" 29 "strings" 30 "sync/atomic" 31 "time" 32 33 "github.com/dustin/go-humanize" 34 _ "github.com/glebarez/go-sqlite" 35 "github.com/nats-io/nats.go" 36 "github.com/pelletier/go-toml/v2" 37 "github.com/thoj/go-ircevent" 38 ) 39 40 type command struct { 41 name string 42 arg string 43 } 44 45 func main() { 46 setVersion() 47 48 config_file := "natsim.toml" 49 if len(os.Args) > 1 { 50 config_file = os.Args[1] 51 } 52 53 im, err := NewNatsIM(config_file) 54 if err != nil { 55 log.Fatal(err) 56 } 57 58 log.Println("natsim " + version + " started") 59 60 im.irc.Loop() 61 } 62 63 /**************** Construction ****************/ 64 65 type LineMark struct { 66 Start string 67 Mid string 68 End string 69 } 70 71 type IrcConfig struct { 72 Channel string 73 Server string 74 Nick string 75 Cmd LineMark 76 Send LineMark 77 Show LineMark 78 ShowReply *LineMark 79 ShowHeader *LineMark 80 MaxLine int 81 ContSuffix string 82 ContPrefix string 83 AntiFlood antiflood 84 Filter []FilterElement 85 } 86 87 type LogConfig struct { 88 SqlDriver string 89 SqlConnection string 90 Filter []FilterElement 91 } 92 93 type NatsConfig struct { 94 Name string 95 Server string 96 NkeySeed string 97 Subjects []string 98 Filter []FilterElement 99 } 100 101 type NatsIM struct { 102 Irc IrcConfig 103 Log LogConfig 104 Nats NatsConfig 105 106 irc *irc.Connection 107 nc *nats.Conn 108 subs []*nats.Subscription 109 curMsg nats.Msg 110 db *sql.DB 111 ensureSubject *sql.Stmt 112 insertReceived *sql.Stmt 113 insertRHeader *sql.Stmt 114 insertSent *sql.Stmt 115 insertSHeader *sql.Stmt 116 cmdQueue chan command 117 ircQueue chan string 118 dropped atomic.Uint32 119 } 120 121 func NewNatsIM(configPath string) (*NatsIM, error) { 122 natsim := &NatsIM{ 123 Irc: IrcConfig{ 124 Nick: "natsim", 125 Cmd: LineMark{ 126 Start: "!", 127 Mid: " ", 128 }, 129 Send: LineMark{Mid: ": "}, 130 Show: LineMark{Mid: ": "}, 131 }, 132 Nats: NatsConfig{ 133 Name: "nastim", 134 Subjects: []string{">"}, 135 }, 136 } 137 138 f, err := os.Open(configPath) 139 if err != nil { 140 return nil, err 141 } 142 defer func() { 143 if err := f.Close(); err != nil { 144 log.Println("Closing configuration", configPath, err) 145 } 146 }() 147 148 d := toml.NewDecoder(f).DisallowUnknownFields() 149 err = d.Decode(natsim) 150 if err != nil { 151 var details *toml.StrictMissingError 152 if errors.As(err, &details) { 153 for _, missing := range details.Errors { 154 log.Println(missing) 155 } 156 } 157 return nil, err 158 } 159 160 if natsim.Irc.MaxLine > 0 && len(natsim.Irc.ContPrefix)+len(natsim.Irc.ContSuffix) >= natsim.Irc.MaxLine { 161 natsim.Irc.ContPrefix = "" 162 natsim.Irc.ContSuffix = "" 163 } 164 165 if natsim.Log.SqlDriver != "" { 166 if err := natsim.logInit(); err != nil { 167 natsim.Close() 168 return nil, err 169 } 170 } 171 172 natsim.cmdQueue = make(chan command, 10) 173 natsim.ircQueue = make(chan string, 10) 174 175 natsim.irc = irc.IRC(natsim.Irc.Nick, "natsim") 176 natsim.irc.AddCallback("001", natsim.ircJoin) 177 natsim.irc.AddCallback("366", natsim.ircJoined) 178 natsim.irc.AddCallback("PRIVMSG", natsim.ircReceive) 179 180 err = natsim.irc.Connect(natsim.Irc.Server) 181 if err != nil { 182 natsim.Close() 183 return nil, err 184 } 185 186 go natsim.doCommands() 187 go natsim.ircSender() 188 189 return natsim, nil 190 } 191 192 func (natsim *NatsIM) Close() { 193 if natsim.irc != nil { 194 natsim.irc.Quit() 195 natsim.irc = nil 196 } 197 198 if natsim.nc != nil { 199 natsim.nc.Close() 200 natsim.nc = nil 201 } 202 203 if natsim.ensureSubject != nil { 204 if err := natsim.ensureSubject.Close(); err != nil { 205 log.Println("Close ensureSubject:", err) 206 } 207 natsim.ensureSubject = nil 208 } 209 210 if natsim.insertReceived != nil { 211 if err := natsim.insertReceived.Close(); err != nil { 212 log.Println("Close insertReceived:", err) 213 } 214 natsim.insertReceived = nil 215 } 216 217 if natsim.insertRHeader != nil { 218 if err := natsim.insertRHeader.Close(); err != nil { 219 log.Println("Close insertRHeader:", err) 220 } 221 natsim.insertRHeader = nil 222 } 223 224 if natsim.insertSent != nil { 225 if err := natsim.insertSent.Close(); err != nil { 226 log.Println("Close insertSent:", err) 227 } 228 natsim.insertSent = nil 229 } 230 231 if natsim.insertSHeader != nil { 232 if err := natsim.insertSHeader.Close(); err != nil { 233 log.Println("Close insertSHeader:", err) 234 } 235 natsim.insertSHeader = nil 236 } 237 238 if natsim.db != nil { 239 if err := natsim.db.Close(); err != nil { 240 log.Println("Close log DB:", err) 241 } 242 natsim.db = nil 243 } 244 245 close(natsim.cmdQueue) 246 close(natsim.ircQueue) 247 } 248 249 /**************** Command Goroutine ****************/ 250 251 func (natsim *NatsIM) doCommands() { 252 for { 253 cmd, ok := <-natsim.cmdQueue 254 if !ok { 255 return 256 } 257 258 switch cmd.name { 259 case "curmsg": 260 var sb strings.Builder 261 sb.WriteString("[WIP]") 262 263 if natsim.curMsg.Reply != "" { 264 show := LineMark{Start: "Reply-To:"} 265 if natsim.Irc.ShowReply != nil { 266 show = *natsim.Irc.ShowReply 267 } 268 sb.WriteString(show.Start) 269 sb.WriteString(natsim.curMsg.Reply) 270 sb.WriteString(show.End) 271 } 272 273 show := LineMark{Mid: ": "} 274 if natsim.Irc.ShowHeader != nil { 275 show = *natsim.Irc.ShowHeader 276 } 277 for key, values := range natsim.curMsg.Header { 278 for _, value := range values { 279 sb.WriteString(packMark(show, key, value)) 280 } 281 } 282 283 natsim.ircSend(sb.String()) 284 285 case "delheader": 286 index := -1 287 key := cmd.arg 288 289 if strings.HasPrefix(cmd.arg, "* ") { 290 key = key[2:] 291 } else if before, after, found := strings.Cut(cmd.arg, " "); found { 292 if n, err := strconv.Atoi(before); err == nil { 293 index = n 294 key = after 295 } 296 } 297 298 if natsim.curMsg.Header == nil || natsim.curMsg.Header[key] == nil { 299 natsim.ircSendf("No recorded header %q", key) 300 } else if index < 0 { 301 delete(natsim.curMsg.Header, key) 302 } else if index < len(natsim.curMsg.Header[key]) { 303 natsim.curMsg.Header[key] = append(natsim.curMsg.Header[key][:index], natsim.curMsg.Header[key][index+1:]...) 304 } else { 305 natsim.ircSendf("No index %d in header %q", index, key) 306 } 307 308 case "filter": 309 var plist *[]FilterElement 310 var name string 311 index := -1 312 place, eltstr, _ := strings.Cut(cmd.arg, " ") 313 uplace := strings.ToUpper(place) 314 315 if uplace == "NATS" { 316 plist = &natsim.Nats.Filter 317 name = "N" 318 } else if uplace == "LOG" { 319 plist = &natsim.Log.Filter 320 name = "L" 321 } else if uplace == "IRC" { 322 plist = &natsim.Irc.Filter 323 name = "I" 324 } else if n, err := strconv.Atoi(uplace[1:]); err == nil && (uplace[0:1] == "N" || uplace[0:1] == "L" || uplace[0:1] == "I") { 325 index = n - 1 326 name = uplace[0:1] 327 switch name { 328 case "N": 329 plist = &natsim.Nats.Filter 330 case "L": 331 plist = &natsim.Log.Filter 332 case "I": 333 plist = &natsim.Irc.Filter 334 } 335 } else { 336 natsim.ircSendf("Unable to parse place %q", uplace) 337 } 338 339 var elt FilterElement 340 if plist != nil { 341 err := elt.UnmarshalText([]byte(eltstr)) 342 if err != nil { 343 natsim.ircSendError("Parse FilterElement", err) 344 plist = nil 345 } 346 } 347 348 if plist != nil { 349 if index < 0 { 350 index += len(*plist) + 1 351 } 352 353 if index >= 0 && index < len(*plist) { 354 *plist = append((*plist)[:index+1], (*plist)[index:]...) 355 (*plist)[index] = elt 356 } else { 357 index = len(*plist) 358 *plist = append(*plist, elt) 359 } 360 natsim.ircSendf("Inserted filter element %s%d/%d", name, index+1, len(*plist)) 361 } 362 363 case "filters": 364 var buf strings.Builder 365 buf.WriteString(fmt.Sprintf("Active filters: %d NATS, %d log, %d IRC", len(natsim.Nats.Filter), len(natsim.Log.Filter), len(natsim.Irc.Filter))) 366 WriteFilter(&buf, "\n N", natsim.Nats.Filter) 367 WriteFilter(&buf, "\n L", natsim.Log.Filter) 368 WriteFilter(&buf, "\n I", natsim.Irc.Filter) 369 natsim.ircSend(buf.String()) 370 371 case "header": 372 sep := ": " 373 if natsim.Irc.ShowHeader != nil { 374 sep = natsim.Irc.ShowHeader.Mid 375 } 376 if key, value, found := strings.Cut(cmd.arg, sep); !found { 377 natsim.ircSendf("No header separator %q", sep) 378 } else { 379 if natsim.curMsg.Header == nil { 380 natsim.curMsg.Header = make(nats.Header) 381 } 382 natsim.curMsg.Header[key] = append(natsim.curMsg.Header[key], value) 383 } 384 385 case "reply-to": 386 fallthrough 387 case "replyto": 388 natsim.curMsg.Reply = cmd.arg 389 390 case "status": 391 var buf strings.Builder 392 393 if err := natsim.nc.LastError(); err != nil { 394 buf.WriteString("Last error: ") 395 buf.WriteString(err.Error()) 396 buf.WriteString("\n") 397 } 398 399 buf.WriteString(natsim.nc.Status().String()) 400 401 if url := natsim.nc.ConnectedUrlRedacted(); url != "" { 402 buf.WriteString(" to ") 403 buf.WriteString(url) 404 } 405 406 if rtt, err := natsim.nc.RTT(); err == nil { 407 buf.WriteString(", RTT ") 408 buf.WriteString(rtt.String()) 409 } 410 411 buf.WriteString(fmt.Sprintf(", %d subscriptions\n%s", natsim.nc.NumSubscriptions(), natsim.natsStats())) 412 natsim.ircSend(buf.String()) 413 414 case "subscribe": 415 if s, err := natsim.nc.Subscribe(cmd.arg, natsim.natsReceive); err != nil { 416 natsim.ircSendError("Subscribe", err) 417 } else { 418 natsim.subs = append(natsim.subs, s) 419 natsim.ircSendf("Subscribed to %q", s.Subject) 420 } 421 422 case "subscriptions": 423 var buf strings.Builder 424 buf.WriteString(fmt.Sprintf("Current subscriptions (%d):", len(natsim.subs))) 425 for i, s := range natsim.subs { 426 buf.WriteString(fmt.Sprintf("\n%d. %s", i+1, s.Subject)) 427 } 428 natsim.ircSend(buf.String()) 429 430 case "quit": 431 log.Println("Quit command", cmd.arg) 432 natsim.irc.QuitMessage = cmd.arg 433 natsim.Close() 434 435 case "unfilter": 436 uplace := strings.ToUpper(cmd.arg) 437 438 if uplace == "NATS" { 439 n := len(natsim.Nats.Filter) 440 natsim.Nats.Filter = []FilterElement{} 441 natsim.ircSendf("Removed %d NATS filter elements", n) 442 } else if uplace == "LOG" { 443 n := len(natsim.Log.Filter) 444 natsim.Log.Filter = []FilterElement{} 445 natsim.ircSendf("Removed %d log filter elements", n) 446 } else if uplace == "IRC" { 447 n := len(natsim.Irc.Filter) 448 natsim.Irc.Filter = []FilterElement{} 449 natsim.ircSendf("Removed %d IRC filter elements", n) 450 } else if n, err := strconv.Atoi(uplace[1:]); err == nil && (uplace[0:1] == "N" || uplace[0:1] == "L" || uplace[0:1] == "I") { 451 var plist *[]FilterElement 452 index := n - 1 453 name := uplace[0:1] 454 switch name { 455 case "N": 456 plist = &natsim.Nats.Filter 457 case "L": 458 plist = &natsim.Log.Filter 459 case "I": 460 plist = &natsim.Irc.Filter 461 } 462 if n < 0 { 463 index = len(*plist) + n 464 } 465 if index < 0 || index >= len(*plist) { 466 natsim.ircSendf("Bad filter index %d for %s%d", index, name, len(*plist)) 467 } else { 468 *plist = append((*plist)[:index], (*plist)[index+1:]...) 469 natsim.ircSendf("Removed filter %s%d/%d", name, index+1, len(*plist)+1) 470 } 471 } else { 472 natsim.ircSendf("Unable to parse place %q", uplace) 473 } 474 475 case "unsubscribe": 476 if n, err := strconv.Atoi(cmd.arg); err == nil && n > 0 && n <= len(natsim.subs) { 477 if err = natsim.subs[n-1].Unsubscribe(); err != nil { 478 natsim.ircSendError("Unsubscribe", err) 479 } else { 480 natsim.ircSendf("Unsubscribed from %q", natsim.subs[n-1].Subject) 481 natsim.subs = append(natsim.subs[:n-1], natsim.subs[n:]...) 482 } 483 } else { 484 n := 0 485 for i, s := range natsim.subs { 486 if s.Subject != cmd.arg { 487 natsim.subs[n] = natsim.subs[i] 488 n++ 489 } else if err = s.Unsubscribe(); err != nil { 490 natsim.ircSendError("Unsubscribe", err) 491 } 492 } 493 natsim.ircSendf("Unsubscribed from %d subjects", len(natsim.subs)-n) 494 natsim.subs = natsim.subs[:n] 495 } 496 497 case "version": 498 natsim.ircSendf("natsim %s", version) 499 500 default: 501 natsim.ircSendf("Unknown command %q", cmd.name) 502 } 503 } 504 } 505 506 /**************** IRC Callbacks ****************/ 507 508 func (natsim *NatsIM) ircJoin(e *irc.Event) { 509 natsim.irc.Join(natsim.Irc.Channel) 510 } 511 512 func (natsim *NatsIM) ircJoined(e *irc.Event) { 513 optSeed, err := nats.NkeyOptionFromSeed(natsim.Nats.NkeySeed) 514 if err != nil { 515 natsim.ircSendError("NkeyOptionFromSeed", err) 516 return 517 } 518 519 natsim.nc, err = nats.Connect(natsim.Nats.Server, 520 optSeed, 521 nats.Name(natsim.Nats.Name), 522 nats.ConnectHandler(natsim.natsConnected), 523 nats.DisconnectErrHandler(natsim.natsDisconnected), 524 nats.ReconnectHandler(natsim.natsReconnected), 525 nats.ReconnectErrHandler(natsim.natsReconnectErr)) 526 if err != nil { 527 natsim.ircSendError("Connect", err) 528 return 529 } 530 531 for _, subject := range natsim.Nats.Subjects { 532 if s, err := natsim.nc.Subscribe(subject, natsim.natsReceive); err != nil { 533 natsim.ircSendError("Subscribe", err) 534 } else { 535 natsim.subs = append(natsim.subs, s) 536 } 537 } 538 } 539 540 func (natsim *NatsIM) ircReceive(e *irc.Event) { 541 msg := e.Message() 542 if name, arg, found := unpackMark(natsim.Irc.Cmd, msg, true); found { 543 natsim.cmdQueue <- command{name: name, arg: arg} 544 } else if subject, data, found := unpackMark(natsim.Irc.Send, msg, false); found { 545 natsim.curMsg.Subject = subject 546 natsim.curMsg.Data = []byte(data) 547 if err := natsim.nc.PublishMsg(&natsim.curMsg); err != nil { 548 natsim.ircSendError("Publish", err) 549 } else { 550 natsim.logSent(&natsim.curMsg) 551 } 552 natsim.curMsg = nats.Msg{} 553 } 554 } 555 556 func (natsim *NatsIM) ircSendError(context string, err error) { 557 prefix := "[E] " 558 if context != "" { 559 prefix += context + ": " 560 } 561 natsim.ircSend(prefix + err.Error()) 562 } 563 564 func (natsim *NatsIM) ircSend(s string) { 565 select { 566 case natsim.ircQueue <- s: 567 default: 568 natsim.dropped.Add(1) 569 } 570 } 571 572 func (natsim *NatsIM) ircSendf(format string, a ...interface{}) { 573 natsim.ircSend(fmt.Sprintf(format, a...)) 574 } 575 576 func (natsim *NatsIM) ircSender() { 577 var dropped uint32 578 var nindex = 0 579 floodend := make([]time.Time, natsim.Irc.AntiFlood.count) 580 delay := natsim.Irc.AntiFlood.delay / time.Duration(natsim.Irc.AntiFlood.count) 581 prev := time.Now() 582 583 for { 584 var lines []string 585 586 if len(natsim.ircQueue) == 0 { 587 dropped += natsim.dropped.Swap(0) 588 } 589 590 if dropped > 0 { 591 select { 592 case s, ok := <-natsim.ircQueue: 593 if ok { 594 lines = natsim.ircSplit(s) 595 } else { 596 return 597 } 598 case <-time.After(delay): 599 dropped += natsim.dropped.Swap(0) 600 lines = []string{fmt.Sprintf("Dropped %d messages", dropped)} 601 dropped = 0 602 } 603 } else { 604 s, ok := <-natsim.ircQueue 605 if ok { 606 lines = natsim.ircSplit(s) 607 } else { 608 return 609 } 610 } 611 612 for _, line := range lines { 613 if time.Until(floodend[nindex]) > 0 { 614 time.Sleep(time.Until(prev.Add(delay))) 615 } 616 617 natsim.irc.Privmsg(natsim.Irc.Channel, line) 618 619 prev = time.Now() 620 floodend[nindex] = prev.Add(natsim.Irc.AntiFlood.delay) 621 nindex = (nindex + 1) % natsim.Irc.AntiFlood.count 622 } 623 } 624 } 625 626 func (natsim *NatsIM) ircSplit(s string) []string { 627 var result []string 628 629 for _, line := range strings.Split(s, "\n") { 630 if natsim.Irc.MaxLine <= 0 || len(line) < natsim.Irc.MaxLine { 631 result = append(result, line) 632 } else { 633 for offset := 0; offset < len(line); { 634 var buf strings.Builder 635 l := len(line) - offset 636 if offset > 0 { 637 buf.WriteString(natsim.Irc.ContPrefix) 638 } 639 640 if buf.Len()+l <= natsim.Irc.MaxLine { 641 buf.WriteString(line[offset:]) 642 } else { 643 l = natsim.Irc.MaxLine - buf.Len() - len(natsim.Irc.ContSuffix) 644 buf.WriteString(line[offset : offset+l]) 645 buf.WriteString(natsim.Irc.ContSuffix) 646 } 647 648 result = append(result, buf.String()) 649 offset += l 650 } 651 } 652 } 653 654 return result 655 } 656 657 /**************** Nats Callbacks ****************/ 658 659 func (natsim *NatsIM) natsConnected(c *nats.Conn) { 660 natsim.ircSend("Connected to " + c.ConnectedUrlRedacted()) 661 } 662 663 func (natsim *NatsIM) natsDisconnected(c *nats.Conn, err error) { 664 if err != nil { 665 natsim.ircSendError("Disconnected", err) 666 } 667 } 668 669 func (natsim *NatsIM) natsReceive(m *nats.Msg) { 670 if !IsKept(m.Subject, m.Data, natsim.Nats.Filter, true) { 671 return 672 } 673 674 if IsKept(m.Subject, m.Data, natsim.Log.Filter, true) { 675 natsim.logReceived(m) 676 } 677 678 if !IsKept(m.Subject, m.Data, natsim.Irc.Filter, true) { 679 return 680 } 681 682 var sb strings.Builder 683 sb.WriteString(packMark(natsim.Irc.Show, m.Subject, string(m.Data))) 684 685 if m.Reply != "" && natsim.Irc.ShowReply != nil { 686 sb.WriteString(natsim.Irc.ShowReply.Start) 687 sb.WriteString(m.Reply) 688 sb.WriteString(natsim.Irc.ShowReply.End) 689 } 690 691 if natsim.Irc.ShowHeader != nil { 692 for key, values := range m.Header { 693 for _, value := range values { 694 sb.WriteString(packMark(*natsim.Irc.ShowHeader, key, value)) 695 } 696 } 697 } 698 699 natsim.ircSend(sb.String()) 700 } 701 702 func (natsim *NatsIM) natsReconnected(c *nats.Conn) { 703 natsim.ircSend("Reconnected to " + c.ConnectedUrlRedacted()) 704 } 705 706 func (natsim *NatsIM) natsReconnectErr(c *nats.Conn, err error) { 707 natsim.ircSendError("Reconnect", err) 708 } 709 710 func (natsim *NatsIM) natsStats() string { 711 stats := natsim.nc.Stats() 712 return fmt.Sprintf("%d reconnections, %s / %s msg in, %s / %s msg out", 713 stats.Reconnects, 714 humanize.IBytes(stats.InBytes), 715 humanizeNum(stats.InMsgs), 716 humanize.IBytes(stats.OutBytes), 717 humanizeNum(stats.OutMsgs)) 718 } 719 720 /**************** Log to Database ****************/ 721 722 //go:embed init.sql 723 var embeddedSQL embed.FS 724 725 func (natsim *NatsIM) logInit() error { 726 if natsim.Log.SqlDriver == "" { 727 return nil 728 } 729 730 var err error 731 732 natsim.db, err = sql.Open(natsim.Log.SqlDriver, natsim.Log.SqlConnection) 733 if err != nil { 734 log.Println("sql.Open:", err) 735 return err 736 } 737 738 var version int 739 if err = natsim.db.QueryRow("PRAGMA user_version").Scan(&version); err != nil { 740 log.Println("query user_verison", err) 741 return err 742 } 743 744 switch version { 745 case 0: 746 initSQL, err := embeddedSQL.ReadFile("init.sql") 747 if err != nil { 748 log.Println("embedded.ReadFile:", err) 749 return err 750 } 751 752 if _, err = natsim.db.Exec(string(initSQL)); err != nil { 753 log.Println("Init log DB:", err) 754 return err 755 } 756 757 case 1: 758 759 default: 760 log.Println("Unsupported database version:", version) 761 return errors.New("unsupported database version") 762 } 763 764 natsim.ensureSubject, err = natsim.db.Prepare("INSERT INTO subjects(name) SELECT ? WHERE NOT EXISTS (SELECT 1 FROM subjects WHERE name = ?);") 765 if err != nil { 766 log.Println("Prepare ensureSubject:", err) 767 return err 768 } 769 770 natsim.insertReceived, err = natsim.db.Prepare("INSERT INTO received(timestamp,subject_id,reply_subject_id,data) VALUES (?, (SELECT id FROM subjects WHERE name = ?), (SELECT id FROM subjects WHERE name = ?), ?);") 771 if err != nil { 772 log.Println("Prepare insertReceived:", err) 773 return err 774 } 775 776 natsim.insertRHeader, err = natsim.db.Prepare("INSERT INTO received_headers_view(msg_id,key,value) VALUES (?,?,?);") 777 if err != nil { 778 log.Println("Prepare insertRHeader:", err) 779 return err 780 } 781 782 natsim.insertSent, err = natsim.db.Prepare("INSERT INTO sent(timestamp,subject_id,reply_subject_id,data) VALUES (?, (SELECT id FROM subjects WHERE name = ?), (SELECT id FROM subjects WHERE name = ?), ?);") 783 if err != nil { 784 log.Println("Prepare insertSent:", err) 785 return err 786 } 787 788 natsim.insertSHeader, err = natsim.db.Prepare("INSERT INTO sent_headers_view(msg_id,key,value) VALUES (?,?,?);") 789 if err != nil { 790 log.Println("Prepare insertSHeader:", err) 791 return err 792 } 793 794 return nil 795 } 796 797 func (natsim *NatsIM) logMsg(msg *nats.Msg, insertMsg, insertHeader *sql.Stmt) { 798 if natsim.db == nil || natsim.insertReceived == nil { 799 return 800 } 801 802 if _, err := natsim.ensureSubject.Exec(msg.Subject, msg.Subject); err != nil { 803 natsim.ircSendError("ensureSubject.Exec", err) 804 return 805 } 806 807 var reply sql.NullString 808 if msg.Reply != "" { 809 if _, err := natsim.ensureSubject.Exec(msg.Reply, msg.Reply); err != nil { 810 natsim.ircSendError("ensureReply.Exec", err) 811 return 812 } 813 reply = sql.NullString{String: msg.Reply, Valid: true} 814 } 815 816 t := float64(time.Now().UnixNano())/8.64e13 + 2440587.5 817 if r, err := insertMsg.Exec(t, msg.Subject, reply, msg.Data); err != nil { 818 natsim.ircSendError("insertMsg.Exec", err) 819 } else if id, err := r.LastInsertId(); err != nil { 820 natsim.ircSendError("LastInsertId", err) 821 } else if id <= 0 { 822 natsim.ircSendf("LastInsertId returned invalid id %d", id) 823 } else { 824 for key, values := range msg.Header { 825 for _, value := range values { 826 if _, err := insertHeader.Exec(id, key, value); err != nil { 827 natsim.ircSendf("insertHeader(%q, %q): %s", key, value, err) 828 } 829 } 830 } 831 } 832 } 833 834 func (natsim *NatsIM) logReceived(msg *nats.Msg) { 835 natsim.logMsg(msg, natsim.insertReceived, natsim.insertRHeader) 836 } 837 838 func (natsim *NatsIM) logSent(msg *nats.Msg) { 839 natsim.logMsg(msg, natsim.insertSent, natsim.insertSHeader) 840 } 841 842 /**************** Filters ****************/ 843 844 type FilterElement struct { 845 Result bool 846 Part FilterPart 847 Test *regexp.Regexp 848 } 849 850 type FilterPart int 851 852 const ( 853 FilterSubject FilterPart = iota 854 FilterData 855 ) 856 857 func (element *FilterElement) Match(subject string, data []byte) bool { 858 var b []byte 859 switch element.Part { 860 case FilterSubject: 861 b = []byte(subject) 862 case FilterData: 863 b = data 864 default: 865 panic("Unexpected part") 866 } 867 868 return element.Test.Match(b) 869 } 870 871 func (element *FilterElement) String() string { 872 r := "drop " 873 if element.Result { 874 r = "pass " 875 } 876 877 p := "" 878 switch element.Part { 879 case FilterSubject: 880 p = "subject " 881 case FilterData: 882 p = "data " 883 default: 884 panic("Unexpected part") 885 } 886 887 return r + p + element.Test.String() 888 } 889 890 func (element *FilterElement) UnmarshalText(text []byte) error { 891 s := string(text) 892 893 switch { 894 case strings.HasPrefix(s, "pass "): 895 element.Result = true 896 s = s[5:] 897 case strings.HasPrefix(s, "drop "): 898 element.Result = false 899 s = s[5:] 900 default: 901 return fmt.Errorf("malformed filter %q", s) 902 } 903 904 switch { 905 case strings.HasPrefix(s, "subject "): 906 element.Part = FilterSubject 907 s = s[8:] 908 case strings.HasPrefix(s, "data "): 909 element.Part = FilterData 910 s = s[5:] 911 default: 912 return fmt.Errorf("bad filter part %q", s) 913 } 914 915 re, err := regexp.Compile(s) 916 element.Test = re 917 return err 918 } 919 920 func WriteFilter(buf *strings.Builder, prefix string, filter []FilterElement) { 921 for i, element := range filter { 922 line := fmt.Sprintf("%s%d. %s", prefix, i+1, element.String()) 923 buf.WriteString(line) 924 } 925 } 926 927 func IsKept(subject string, data []byte, elements []FilterElement, base bool) bool { 928 for _, element := range elements { 929 if element.Match(subject, data) { 930 return element.Result 931 } 932 } 933 934 return base 935 } 936 937 /**************** Tools ****************/ 938 939 type antiflood struct { 940 count int 941 delay time.Duration 942 } 943 944 func (af *antiflood) UnmarshalText(text []byte) error { 945 if before, after, found := strings.Cut(string(text), "/"); found { 946 if n, err := strconv.Atoi(before); err != nil { 947 return err 948 } else { 949 af.count = n 950 } 951 952 if d, err := time.ParseDuration(after); err != nil { 953 return err 954 } else { 955 af.delay = d 956 } 957 958 } else if d, err := time.ParseDuration(string(text)); err != nil { 959 return err 960 } else { 961 af.count = 1 962 af.delay = d 963 } 964 965 return nil 966 } 967 968 func humanizeNum(n uint64) string { 969 num, unit, found := strings.Cut(humanize.Bytes(n), " ") 970 if !found || unit == "" || unit[len(unit)-1:] != "B" { 971 panic("Unexpected huamized result") 972 } 973 return num + unit[:len(unit)-1] 974 } 975 976 func packMark(mark LineMark, name, arg string) string { 977 return mark.Start + name + mark.Mid + arg + mark.End 978 } 979 980 func unpackMark(mark LineMark, line string, optional bool) (string, string, bool) { 981 if strings.HasPrefix(line, mark.Start) && strings.HasSuffix(line, mark.End) { 982 inside := line[len(mark.Start) : len(line)-len(mark.End)] 983 if mark.Mid == "" { 984 return inside, "", true 985 } else if name, arg, found := strings.Cut(inside, mark.Mid); found { 986 return name, arg, true 987 } else { 988 return inside, "", optional 989 } 990 } else { 991 return "", "", false 992 } 993 } 994 995 var version = "(unknown)" 996 997 func setVersion() { 998 info, ok := debug.ReadBuildInfo() 999 if !ok { 1000 return 1001 } 1002 1003 version = info.Main.Version 1004 1005 if version == "(devel)" { 1006 vcs := "" 1007 rev := "" 1008 dirty := "" 1009 for _, setting := range info.Settings { 1010 switch setting.Key { 1011 case "vcs": 1012 vcs = setting.Value + "-" 1013 case "vcs.revision": 1014 rev = setting.Value[0:8] 1015 case "vcs.modified": 1016 if setting.Value == "true" { 1017 dirty = "*" 1018 } 1019 } 1020 } 1021 1022 if rev != "" { 1023 version = vcs + rev + dirty 1024 } 1025 } 1026 }