natsim

NATS ↔ Instant Messaging Bridge
git clone https://git.instinctive.eu/natsim.git
Log | Files | Refs | README | LICENSE

main.go (24657B)


      1 /*
      2  * Copyright (c) 2025, Natacha Porté
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 package main
     18 
     19 import (
     20 	"database/sql"
     21 	"embed"
     22 	"errors"
     23 	"fmt"
     24 	"log"
     25 	"os"
     26 	"regexp"
     27 	"runtime/debug"
     28 	"strconv"
     29 	"strings"
     30 	"sync/atomic"
     31 	"time"
     32 
     33 	"github.com/dustin/go-humanize"
     34 	_ "github.com/glebarez/go-sqlite"
     35 	"github.com/nats-io/nats.go"
     36 	"github.com/pelletier/go-toml/v2"
     37 	"github.com/thoj/go-ircevent"
     38 )
     39 
// command is one bot command: ircReceive builds it from an IRC line that
// matches the Cmd mark and queues it on cmdQueue, where doCommands picks it up.
type command struct {
	name string // command word, compared against the cases of doCommands
	arg  string // text after the command separator; empty when there is none
}
     44 
     45 func main() {
     46 	setVersion()
     47 
     48 	config_file := "natsim.toml"
     49 	if len(os.Args) > 1 {
     50 		config_file = os.Args[1]
     51 	}
     52 
     53 	im, err := NewNatsIM(config_file)
     54 	if err != nil {
     55 		log.Fatal(err)
     56 	}
     57 
     58 	log.Println("natsim " + version + " started")
     59 
     60 	im.irc.Loop()
     61 }
     62 
     63 /**************** Construction ****************/
     64 
// LineMark holds the delimiters of a two-part line. packMark renders such a
// line as Start + name + Mid + arg + End and unpackMark parses it back.
type LineMark struct {
	Start string // literal text the line must begin with
	Mid   string // separator between the two parts; when empty, unpackMark returns everything between Start and End as the first part
	End   string // literal text the line must end with
}
     70 
// IrcConfig is the IRC section of the configuration decoded by NewNatsIM.
type IrcConfig struct {
	Channel    string          // channel joined by ircJoin and target of every line sent by ircSender
	Server     string          // address handed to the IRC connection's Connect in NewNatsIM
	Nick       string          // nickname handed to irc.IRC; NewNatsIM defaults it to "natsim"
	Cmd        LineMark        // mark recognising command lines in ircReceive; defaults to Start "!" and Mid " "
	Send       LineMark        // mark recognising "subject, data" lines that ircReceive publishes; defaults to Mid ": "
	Show       LineMark        // mark used by natsReceive to render subject and data; defaults to Mid ": "
	ShowReply  *LineMark       // Start/End wrapped around the reply subject; when nil, natsReceive omits the reply subject
	ShowHeader *LineMark       // mark used to render each header; when nil, natsReceive omits headers
	MaxLine    int             // line length in bytes from which ircSplit starts splitting; values <= 0 disable splitting
	ContSuffix string          // appended by ircSplit to every chunk that is followed by another one
	ContPrefix string          // prepended by ircSplit to every chunk after the first
	AntiFlood  antiflood       // pacing parameters read by ircSender
	Filter     []FilterElement // checked by natsReceive before a message is shown on IRC
}
     86 
// LogConfig is the Log section of the configuration decoded by NewNatsIM.
type LogConfig struct {
	SqlDriver     string          // driver name passed to sql.Open; when empty, NewNatsIM skips logInit
	SqlConnection string          // data source name passed to sql.Open
	Filter        []FilterElement // checked by natsReceive before a received message is logged
}
     92 
// NatsConfig is the Nats section of the configuration decoded by NewNatsIM.
type NatsConfig struct {
	Name     string          // passed to nats.Name when ircJoined connects
	Server   string          // URL passed to nats.Connect by ircJoined
	NkeySeed string          // passed to nats.NkeyOptionFromSeed by ircJoined
	Subjects []string        // subjects ircJoined subscribes to; NewNatsIM defaults it to ">"
	Filter   []FilterElement // first filter natsReceive applies; a dropped message is neither logged nor shown
}
    100 
// NatsIM is the bridge itself. NewNatsIM decodes the TOML configuration file
// into this struct (the exported fields) and keeps the runtime state in the
// unexported ones.
type NatsIM struct {
	Irc  IrcConfig
	Log  LogConfig
	Nats NatsConfig

	irc            *irc.Connection      // IRC connection created by NewNatsIM
	nc             *nats.Conn           // NATS connection; assigned by ircJoined, nil before that
	subs           []*nats.Subscription // subscriptions made by ircJoined and the "subscribe" command
	curMsg         nats.Msg             // outgoing message being prepared; published and reset by ircReceive
	db             *sql.DB              // log database opened by logInit; nil when logging is disabled
	ensureSubject  *sql.Stmt            // prepared by logInit: inserts a subject name if missing
	insertReceived *sql.Stmt            // prepared by logInit: inserts a row into received
	insertRHeader  *sql.Stmt            // prepared by logInit: inserts a header of a received message
	insertSent     *sql.Stmt            // prepared by logInit: inserts a row into sent
	insertSHeader  *sql.Stmt            // prepared by logInit: inserts a header of a sent message
	cmdQueue       chan command         // commands from ircReceive to doCommands
	ircQueue       chan string          // texts from ircSend to ircSender
	dropped        atomic.Uint32        // texts ircSend could not queue since ircSender last collected the count
}
    120 
    121 func NewNatsIM(configPath string) (*NatsIM, error) {
    122 	natsim := &NatsIM{
    123 		Irc: IrcConfig{
    124 			Nick: "natsim",
    125 			Cmd: LineMark{
    126 				Start: "!",
    127 				Mid:   " ",
    128 			},
    129 			Send: LineMark{Mid: ": "},
    130 			Show: LineMark{Mid: ": "},
    131 		},
    132 		Nats: NatsConfig{
    133 			Name:     "nastim",
    134 			Subjects: []string{">"},
    135 		},
    136 	}
    137 
    138 	f, err := os.Open(configPath)
    139 	if err != nil {
    140 		return nil, err
    141 	}
    142 	defer func() {
    143 		if err := f.Close(); err != nil {
    144 			log.Println("Closing configuration", configPath, err)
    145 		}
    146 	}()
    147 
    148 	d := toml.NewDecoder(f).DisallowUnknownFields()
    149 	err = d.Decode(natsim)
    150 	if err != nil {
    151 		var details *toml.StrictMissingError
    152 		if errors.As(err, &details) {
    153 			for _, missing := range details.Errors {
    154 				log.Println(missing)
    155 			}
    156 		}
    157 		return nil, err
    158 	}
    159 
    160 	if natsim.Irc.MaxLine > 0 && len(natsim.Irc.ContPrefix)+len(natsim.Irc.ContSuffix) >= natsim.Irc.MaxLine {
    161 		natsim.Irc.ContPrefix = ""
    162 		natsim.Irc.ContSuffix = ""
    163 	}
    164 
    165 	if natsim.Log.SqlDriver != "" {
    166 		if err := natsim.logInit(); err != nil {
    167 			natsim.Close()
    168 			return nil, err
    169 		}
    170 	}
    171 
    172 	natsim.cmdQueue = make(chan command, 10)
    173 	natsim.ircQueue = make(chan string, 10)
    174 
    175 	natsim.irc = irc.IRC(natsim.Irc.Nick, "natsim")
    176 	natsim.irc.AddCallback("001", natsim.ircJoin)
    177 	natsim.irc.AddCallback("366", natsim.ircJoined)
    178 	natsim.irc.AddCallback("PRIVMSG", natsim.ircReceive)
    179 
    180 	err = natsim.irc.Connect(natsim.Irc.Server)
    181 	if err != nil {
    182 		natsim.Close()
    183 		return nil, err
    184 	}
    185 
    186 	go natsim.doCommands()
    187 	go natsim.ircSender()
    188 
    189 	return natsim, nil
    190 }
    191 
    192 func (natsim *NatsIM) Close() {
    193 	if natsim.irc != nil {
    194 		natsim.irc.Quit()
    195 		natsim.irc = nil
    196 	}
    197 
    198 	if natsim.nc != nil {
    199 		natsim.nc.Close()
    200 		natsim.nc = nil
    201 	}
    202 
    203 	if natsim.ensureSubject != nil {
    204 		if err := natsim.ensureSubject.Close(); err != nil {
    205 			log.Println("Close ensureSubject:", err)
    206 		}
    207 		natsim.ensureSubject = nil
    208 	}
    209 
    210 	if natsim.insertReceived != nil {
    211 		if err := natsim.insertReceived.Close(); err != nil {
    212 			log.Println("Close insertReceived:", err)
    213 		}
    214 		natsim.insertReceived = nil
    215 	}
    216 
    217 	if natsim.insertRHeader != nil {
    218 		if err := natsim.insertRHeader.Close(); err != nil {
    219 			log.Println("Close insertRHeader:", err)
    220 		}
    221 		natsim.insertRHeader = nil
    222 	}
    223 
    224 	if natsim.insertSent != nil {
    225 		if err := natsim.insertSent.Close(); err != nil {
    226 			log.Println("Close insertSent:", err)
    227 		}
    228 		natsim.insertSent = nil
    229 	}
    230 
    231 	if natsim.insertSHeader != nil {
    232 		if err := natsim.insertSHeader.Close(); err != nil {
    233 			log.Println("Close insertSHeader:", err)
    234 		}
    235 		natsim.insertSHeader = nil
    236 	}
    237 
    238 	if natsim.db != nil {
    239 		if err := natsim.db.Close(); err != nil {
    240 			log.Println("Close log DB:", err)
    241 		}
    242 		natsim.db = nil
    243 	}
    244 
    245 	close(natsim.cmdQueue)
    246 	close(natsim.ircQueue)
    247 }
    248 
    249 /**************** Command Goroutine ****************/
    250 
    251 func (natsim *NatsIM) doCommands() {
    252 	for {
    253 		cmd, ok := <-natsim.cmdQueue
    254 		if !ok {
    255 			return
    256 		}
    257 
    258 		switch cmd.name {
    259 		case "curmsg":
    260 			var sb strings.Builder
    261 			sb.WriteString("[WIP]")
    262 
    263 			if natsim.curMsg.Reply != "" {
    264 				show := LineMark{Start: "Reply-To:"}
    265 				if natsim.Irc.ShowReply != nil {
    266 					show = *natsim.Irc.ShowReply
    267 				}
    268 				sb.WriteString(show.Start)
    269 				sb.WriteString(natsim.curMsg.Reply)
    270 				sb.WriteString(show.End)
    271 			}
    272 
    273 			show := LineMark{Mid: ": "}
    274 			if natsim.Irc.ShowHeader != nil {
    275 				show = *natsim.Irc.ShowHeader
    276 			}
    277 			for key, values := range natsim.curMsg.Header {
    278 				for _, value := range values {
    279 					sb.WriteString(packMark(show, key, value))
    280 				}
    281 			}
    282 
    283 			natsim.ircSend(sb.String())
    284 
    285 		case "delheader":
    286 			index := -1
    287 			key := cmd.arg
    288 
    289 			if strings.HasPrefix(cmd.arg, "* ") {
    290 				key = key[2:]
    291 			} else if before, after, found := strings.Cut(cmd.arg, " "); found {
    292 				if n, err := strconv.Atoi(before); err == nil {
    293 					index = n
    294 					key = after
    295 				}
    296 			}
    297 
    298 			if natsim.curMsg.Header == nil || natsim.curMsg.Header[key] == nil {
    299 				natsim.ircSendf("No recorded header %q", key)
    300 			} else if index < 0 {
    301 				delete(natsim.curMsg.Header, key)
    302 			} else if index < len(natsim.curMsg.Header[key]) {
    303 				natsim.curMsg.Header[key] = append(natsim.curMsg.Header[key][:index], natsim.curMsg.Header[key][index+1:]...)
    304 			} else {
    305 				natsim.ircSendf("No index %d in header %q", index, key)
    306 			}
    307 
    308 		case "filter":
    309 			var plist *[]FilterElement
    310 			var name string
    311 			index := -1
    312 			place, eltstr, _ := strings.Cut(cmd.arg, " ")
    313 			uplace := strings.ToUpper(place)
    314 
    315 			if uplace == "NATS" {
    316 				plist = &natsim.Nats.Filter
    317 				name = "N"
    318 			} else if uplace == "LOG" {
    319 				plist = &natsim.Log.Filter
    320 				name = "L"
    321 			} else if uplace == "IRC" {
    322 				plist = &natsim.Irc.Filter
    323 				name = "I"
    324 			} else if n, err := strconv.Atoi(uplace[1:]); err == nil && (uplace[0:1] == "N" || uplace[0:1] == "L" || uplace[0:1] == "I") {
    325 				index = n - 1
    326 				name = uplace[0:1]
    327 				switch name {
    328 				case "N":
    329 					plist = &natsim.Nats.Filter
    330 				case "L":
    331 					plist = &natsim.Log.Filter
    332 				case "I":
    333 					plist = &natsim.Irc.Filter
    334 				}
    335 			} else {
    336 				natsim.ircSendf("Unable to parse place %q", uplace)
    337 			}
    338 
    339 			var elt FilterElement
    340 			if plist != nil {
    341 				err := elt.UnmarshalText([]byte(eltstr))
    342 				if err != nil {
    343 					natsim.ircSendError("Parse FilterElement", err)
    344 					plist = nil
    345 				}
    346 			}
    347 
    348 			if plist != nil {
    349 				if index < 0 {
    350 					index += len(*plist) + 1
    351 				}
    352 
    353 				if index >= 0 && index < len(*plist) {
    354 					*plist = append((*plist)[:index+1], (*plist)[index:]...)
    355 					(*plist)[index] = elt
    356 				} else {
    357 					index = len(*plist)
    358 					*plist = append(*plist, elt)
    359 				}
    360 				natsim.ircSendf("Inserted filter element %s%d/%d", name, index+1, len(*plist))
    361 			}
    362 
    363 		case "filters":
    364 			var buf strings.Builder
    365 			buf.WriteString(fmt.Sprintf("Active filters: %d NATS, %d log, %d IRC", len(natsim.Nats.Filter), len(natsim.Log.Filter), len(natsim.Irc.Filter)))
    366 			WriteFilter(&buf, "\n N", natsim.Nats.Filter)
    367 			WriteFilter(&buf, "\n L", natsim.Log.Filter)
    368 			WriteFilter(&buf, "\n I", natsim.Irc.Filter)
    369 			natsim.ircSend(buf.String())
    370 
    371 		case "header":
    372 			sep := ": "
    373 			if natsim.Irc.ShowHeader != nil {
    374 				sep = natsim.Irc.ShowHeader.Mid
    375 			}
    376 			if key, value, found := strings.Cut(cmd.arg, sep); !found {
    377 				natsim.ircSendf("No header separator %q", sep)
    378 			} else {
    379 				if natsim.curMsg.Header == nil {
    380 					natsim.curMsg.Header = make(nats.Header)
    381 				}
    382 				natsim.curMsg.Header[key] = append(natsim.curMsg.Header[key], value)
    383 			}
    384 
    385 		case "reply-to":
    386 			fallthrough
    387 		case "replyto":
    388 			natsim.curMsg.Reply = cmd.arg
    389 
    390 		case "status":
    391 			var buf strings.Builder
    392 
    393 			if err := natsim.nc.LastError(); err != nil {
    394 				buf.WriteString("Last error: ")
    395 				buf.WriteString(err.Error())
    396 				buf.WriteString("\n")
    397 			}
    398 
    399 			buf.WriteString(natsim.nc.Status().String())
    400 
    401 			if url := natsim.nc.ConnectedUrlRedacted(); url != "" {
    402 				buf.WriteString(" to ")
    403 				buf.WriteString(url)
    404 			}
    405 
    406 			if rtt, err := natsim.nc.RTT(); err == nil {
    407 				buf.WriteString(", RTT ")
    408 				buf.WriteString(rtt.String())
    409 			}
    410 
    411 			buf.WriteString(fmt.Sprintf(", %d subscriptions\n%s", natsim.nc.NumSubscriptions(), natsim.natsStats()))
    412 			natsim.ircSend(buf.String())
    413 
    414 		case "subscribe":
    415 			if s, err := natsim.nc.Subscribe(cmd.arg, natsim.natsReceive); err != nil {
    416 				natsim.ircSendError("Subscribe", err)
    417 			} else {
    418 				natsim.subs = append(natsim.subs, s)
    419 				natsim.ircSendf("Subscribed to %q", s.Subject)
    420 			}
    421 
    422 		case "subscriptions":
    423 			var buf strings.Builder
    424 			buf.WriteString(fmt.Sprintf("Current subscriptions (%d):", len(natsim.subs)))
    425 			for i, s := range natsim.subs {
    426 				buf.WriteString(fmt.Sprintf("\n%d. %s", i+1, s.Subject))
    427 			}
    428 			natsim.ircSend(buf.String())
    429 
    430 		case "quit":
    431 			log.Println("Quit command", cmd.arg)
    432 			natsim.irc.QuitMessage = cmd.arg
    433 			natsim.Close()
    434 
    435 		case "unfilter":
    436 			uplace := strings.ToUpper(cmd.arg)
    437 
    438 			if uplace == "NATS" {
    439 				n := len(natsim.Nats.Filter)
    440 				natsim.Nats.Filter = []FilterElement{}
    441 				natsim.ircSendf("Removed %d NATS filter elements", n)
    442 			} else if uplace == "LOG" {
    443 				n := len(natsim.Log.Filter)
    444 				natsim.Log.Filter = []FilterElement{}
    445 				natsim.ircSendf("Removed %d log filter elements", n)
    446 			} else if uplace == "IRC" {
    447 				n := len(natsim.Irc.Filter)
    448 				natsim.Irc.Filter = []FilterElement{}
    449 				natsim.ircSendf("Removed %d IRC filter elements", n)
    450 			} else if n, err := strconv.Atoi(uplace[1:]); err == nil && (uplace[0:1] == "N" || uplace[0:1] == "L" || uplace[0:1] == "I") {
    451 				var plist *[]FilterElement
    452 				index := n - 1
    453 				name := uplace[0:1]
    454 				switch name {
    455 				case "N":
    456 					plist = &natsim.Nats.Filter
    457 				case "L":
    458 					plist = &natsim.Log.Filter
    459 				case "I":
    460 					plist = &natsim.Irc.Filter
    461 				}
    462 				if n < 0 {
    463 					index = len(*plist) + n
    464 				}
    465 				if index < 0 || index >= len(*plist) {
    466 					natsim.ircSendf("Bad filter index %d for %s%d", index, name, len(*plist))
    467 				} else {
    468 					*plist = append((*plist)[:index], (*plist)[index+1:]...)
    469 					natsim.ircSendf("Removed filter %s%d/%d", name, index+1, len(*plist)+1)
    470 				}
    471 			} else {
    472 				natsim.ircSendf("Unable to parse place %q", uplace)
    473 			}
    474 
    475 		case "unsubscribe":
    476 			if n, err := strconv.Atoi(cmd.arg); err == nil && n > 0 && n <= len(natsim.subs) {
    477 				if err = natsim.subs[n-1].Unsubscribe(); err != nil {
    478 					natsim.ircSendError("Unsubscribe", err)
    479 				} else {
    480 					natsim.ircSendf("Unsubscribed from %q", natsim.subs[n-1].Subject)
    481 					natsim.subs = append(natsim.subs[:n-1], natsim.subs[n:]...)
    482 				}
    483 			} else {
    484 				n := 0
    485 				for i, s := range natsim.subs {
    486 					if s.Subject != cmd.arg {
    487 						natsim.subs[n] = natsim.subs[i]
    488 						n++
    489 					} else if err = s.Unsubscribe(); err != nil {
    490 						natsim.ircSendError("Unsubscribe", err)
    491 					}
    492 				}
    493 				natsim.ircSendf("Unsubscribed from %d subjects", len(natsim.subs)-n)
    494 				natsim.subs = natsim.subs[:n]
    495 			}
    496 
    497 		case "version":
    498 			natsim.ircSendf("natsim %s", version)
    499 
    500 		default:
    501 			natsim.ircSendf("Unknown command %q", cmd.name)
    502 		}
    503 	}
    504 }
    505 
    506 /**************** IRC Callbacks ****************/
    507 
// ircJoin is the callback NewNatsIM registers for IRC numeric 001. It joins
// the configured channel; the event itself is not used.
func (natsim *NatsIM) ircJoin(e *irc.Event) {
	natsim.irc.Join(natsim.Irc.Channel)
}
    511 
    512 func (natsim *NatsIM) ircJoined(e *irc.Event) {
    513 	optSeed, err := nats.NkeyOptionFromSeed(natsim.Nats.NkeySeed)
    514 	if err != nil {
    515 		natsim.ircSendError("NkeyOptionFromSeed", err)
    516 		return
    517 	}
    518 
    519 	natsim.nc, err = nats.Connect(natsim.Nats.Server,
    520 		optSeed,
    521 		nats.Name(natsim.Nats.Name),
    522 		nats.ConnectHandler(natsim.natsConnected),
    523 		nats.DisconnectErrHandler(natsim.natsDisconnected),
    524 		nats.ReconnectHandler(natsim.natsReconnected),
    525 		nats.ReconnectErrHandler(natsim.natsReconnectErr))
    526 	if err != nil {
    527 		natsim.ircSendError("Connect", err)
    528 		return
    529 	}
    530 
    531 	for _, subject := range natsim.Nats.Subjects {
    532 		if s, err := natsim.nc.Subscribe(subject, natsim.natsReceive); err != nil {
    533 			natsim.ircSendError("Subscribe", err)
    534 		} else {
    535 			natsim.subs = append(natsim.subs, s)
    536 		}
    537 	}
    538 }
    539 
    540 func (natsim *NatsIM) ircReceive(e *irc.Event) {
    541 	msg := e.Message()
    542 	if name, arg, found := unpackMark(natsim.Irc.Cmd, msg, true); found {
    543 		natsim.cmdQueue <- command{name: name, arg: arg}
    544 	} else if subject, data, found := unpackMark(natsim.Irc.Send, msg, false); found {
    545 		natsim.curMsg.Subject = subject
    546 		natsim.curMsg.Data = []byte(data)
    547 		if err := natsim.nc.PublishMsg(&natsim.curMsg); err != nil {
    548 			natsim.ircSendError("Publish", err)
    549 		} else {
    550 			natsim.logSent(&natsim.curMsg)
    551 		}
    552 		natsim.curMsg = nats.Msg{}
    553 	}
    554 }
    555 
    556 func (natsim *NatsIM) ircSendError(context string, err error) {
    557 	prefix := "[E] "
    558 	if context != "" {
    559 		prefix += context + ": "
    560 	}
    561 	natsim.ircSend(prefix + err.Error())
    562 }
    563 
// ircSend queues s for ircSender without ever blocking: when ircQueue has no
// room the text is discarded and the dropped counter is incremented instead.
func (natsim *NatsIM) ircSend(s string) {
	select {
	case natsim.ircQueue <- s:
	default:
		// Queue full: count the loss so ircSender can report it later.
		natsim.dropped.Add(1)
	}
}
    571 
    572 func (natsim *NatsIM) ircSendf(format string, a ...interface{}) {
    573 	natsim.ircSend(fmt.Sprintf(format, a...))
    574 }
    575 
    576 func (natsim *NatsIM) ircSender() {
    577 	var dropped uint32
    578 	var nindex = 0
    579 	floodend := make([]time.Time, natsim.Irc.AntiFlood.count)
    580 	delay := natsim.Irc.AntiFlood.delay / time.Duration(natsim.Irc.AntiFlood.count)
    581 	prev := time.Now()
    582 
    583 	for {
    584 		var lines []string
    585 
    586 		if len(natsim.ircQueue) == 0 {
    587 			dropped += natsim.dropped.Swap(0)
    588 		}
    589 
    590 		if dropped > 0 {
    591 			select {
    592 			case s, ok := <-natsim.ircQueue:
    593 				if ok {
    594 					lines = natsim.ircSplit(s)
    595 				} else {
    596 					return
    597 				}
    598 			case <-time.After(delay):
    599 				dropped += natsim.dropped.Swap(0)
    600 				lines = []string{fmt.Sprintf("Dropped %d messages", dropped)}
    601 				dropped = 0
    602 			}
    603 		} else {
    604 			s, ok := <-natsim.ircQueue
    605 			if ok {
    606 				lines = natsim.ircSplit(s)
    607 			} else {
    608 				return
    609 			}
    610 		}
    611 
    612 		for _, line := range lines {
    613 			if time.Until(floodend[nindex]) > 0 {
    614 				time.Sleep(time.Until(prev.Add(delay)))
    615 			}
    616 
    617 			natsim.irc.Privmsg(natsim.Irc.Channel, line)
    618 
    619 			prev = time.Now()
    620 			floodend[nindex] = prev.Add(natsim.Irc.AntiFlood.delay)
    621 			nindex = (nindex + 1) % natsim.Irc.AntiFlood.count
    622 		}
    623 	}
    624 }
    625 
    626 func (natsim *NatsIM) ircSplit(s string) []string {
    627 	var result []string
    628 
    629 	for _, line := range strings.Split(s, "\n") {
    630 		if natsim.Irc.MaxLine <= 0 || len(line) < natsim.Irc.MaxLine {
    631 			result = append(result, line)
    632 		} else {
    633 			for offset := 0; offset < len(line); {
    634 				var buf strings.Builder
    635 				l := len(line) - offset
    636 				if offset > 0 {
    637 					buf.WriteString(natsim.Irc.ContPrefix)
    638 				}
    639 
    640 				if buf.Len()+l <= natsim.Irc.MaxLine {
    641 					buf.WriteString(line[offset:])
    642 				} else {
    643 					l = natsim.Irc.MaxLine - buf.Len() - len(natsim.Irc.ContSuffix)
    644 					buf.WriteString(line[offset : offset+l])
    645 					buf.WriteString(natsim.Irc.ContSuffix)
    646 				}
    647 
    648 				result = append(result, buf.String())
    649 				offset += l
    650 			}
    651 		}
    652 	}
    653 
    654 	return result
    655 }
    656 
    657 /**************** Nats Callbacks ****************/
    658 
// natsConnected is passed to nats.ConnectHandler by ircJoined; it announces
// the redacted server URL on IRC.
func (natsim *NatsIM) natsConnected(c *nats.Conn) {
	natsim.ircSend("Connected to " + c.ConnectedUrlRedacted())
}
    662 
// natsDisconnected is passed to nats.DisconnectErrHandler by ircJoined; it
// reports the error on IRC and stays silent when err is nil.
func (natsim *NatsIM) natsDisconnected(c *nats.Conn, err error) {
	if err != nil {
		natsim.ircSendError("Disconnected", err)
	}
}
    668 
    669 func (natsim *NatsIM) natsReceive(m *nats.Msg) {
    670 	if !IsKept(m.Subject, m.Data, natsim.Nats.Filter, true) {
    671 		return
    672 	}
    673 
    674 	if IsKept(m.Subject, m.Data, natsim.Log.Filter, true) {
    675 		natsim.logReceived(m)
    676 	}
    677 
    678 	if !IsKept(m.Subject, m.Data, natsim.Irc.Filter, true) {
    679 		return
    680 	}
    681 
    682 	var sb strings.Builder
    683 	sb.WriteString(packMark(natsim.Irc.Show, m.Subject, string(m.Data)))
    684 
    685 	if m.Reply != "" && natsim.Irc.ShowReply != nil {
    686 		sb.WriteString(natsim.Irc.ShowReply.Start)
    687 		sb.WriteString(m.Reply)
    688 		sb.WriteString(natsim.Irc.ShowReply.End)
    689 	}
    690 
    691 	if natsim.Irc.ShowHeader != nil {
    692 		for key, values := range m.Header {
    693 			for _, value := range values {
    694 				sb.WriteString(packMark(*natsim.Irc.ShowHeader, key, value))
    695 			}
    696 		}
    697 	}
    698 
    699 	natsim.ircSend(sb.String())
    700 }
    701 
// natsReconnected is passed to nats.ReconnectHandler by ircJoined; it
// announces the redacted server URL on IRC.
func (natsim *NatsIM) natsReconnected(c *nats.Conn) {
	natsim.ircSend("Reconnected to " + c.ConnectedUrlRedacted())
}
    705 
// natsReconnectErr is passed to nats.ReconnectErrHandler by ircJoined; it
// reports the error on IRC.
func (natsim *NatsIM) natsReconnectErr(c *nats.Conn, err error) {
	natsim.ircSendError("Reconnect", err)
}
    709 
    710 func (natsim *NatsIM) natsStats() string {
    711 	stats := natsim.nc.Stats()
    712 	return fmt.Sprintf("%d reconnections, %s / %s msg in, %s / %s msg out",
    713 		stats.Reconnects,
    714 		humanize.IBytes(stats.InBytes),
    715 		humanizeNum(stats.InMsgs),
    716 		humanize.IBytes(stats.OutBytes),
    717 		humanizeNum(stats.OutMsgs))
    718 }
    719 
    720 /**************** Log to Database ****************/
    721 
// embeddedSQL carries init.sql, embedded at build time. logInit reads the
// file from it and executes it on a database whose user_version is 0.
//
//go:embed init.sql
var embeddedSQL embed.FS
    724 
    725 func (natsim *NatsIM) logInit() error {
    726 	if natsim.Log.SqlDriver == "" {
    727 		return nil
    728 	}
    729 
    730 	var err error
    731 
    732 	natsim.db, err = sql.Open(natsim.Log.SqlDriver, natsim.Log.SqlConnection)
    733 	if err != nil {
    734 		log.Println("sql.Open:", err)
    735 		return err
    736 	}
    737 
    738 	var version int
    739 	if err = natsim.db.QueryRow("PRAGMA user_version").Scan(&version); err != nil {
    740 		log.Println("query user_verison", err)
    741 		return err
    742 	}
    743 
    744 	switch version {
    745 	case 0:
    746 		initSQL, err := embeddedSQL.ReadFile("init.sql")
    747 		if err != nil {
    748 			log.Println("embedded.ReadFile:", err)
    749 			return err
    750 		}
    751 
    752 		if _, err = natsim.db.Exec(string(initSQL)); err != nil {
    753 			log.Println("Init log DB:", err)
    754 			return err
    755 		}
    756 
    757 	case 1:
    758 
    759 	default:
    760 		log.Println("Unsupported database version:", version)
    761 		return errors.New("unsupported database version")
    762 	}
    763 
    764 	natsim.ensureSubject, err = natsim.db.Prepare("INSERT INTO subjects(name) SELECT ? WHERE NOT EXISTS (SELECT 1 FROM subjects WHERE name = ?);")
    765 	if err != nil {
    766 		log.Println("Prepare ensureSubject:", err)
    767 		return err
    768 	}
    769 
    770 	natsim.insertReceived, err = natsim.db.Prepare("INSERT INTO received(timestamp,subject_id,reply_subject_id,data) VALUES (?, (SELECT id FROM subjects WHERE name = ?), (SELECT id FROM subjects WHERE name = ?), ?);")
    771 	if err != nil {
    772 		log.Println("Prepare insertReceived:", err)
    773 		return err
    774 	}
    775 
    776 	natsim.insertRHeader, err = natsim.db.Prepare("INSERT INTO received_headers_view(msg_id,key,value) VALUES (?,?,?);")
    777 	if err != nil {
    778 		log.Println("Prepare insertRHeader:", err)
    779 		return err
    780 	}
    781 
    782 	natsim.insertSent, err = natsim.db.Prepare("INSERT INTO sent(timestamp,subject_id,reply_subject_id,data) VALUES (?, (SELECT id FROM subjects WHERE name = ?), (SELECT id FROM subjects WHERE name = ?), ?);")
    783 	if err != nil {
    784 		log.Println("Prepare insertSent:", err)
    785 		return err
    786 	}
    787 
    788 	natsim.insertSHeader, err = natsim.db.Prepare("INSERT INTO sent_headers_view(msg_id,key,value) VALUES (?,?,?);")
    789 	if err != nil {
    790 		log.Println("Prepare insertSHeader:", err)
    791 		return err
    792 	}
    793 
    794 	return nil
    795 }
    796 
    797 func (natsim *NatsIM) logMsg(msg *nats.Msg, insertMsg, insertHeader *sql.Stmt) {
    798 	if natsim.db == nil || natsim.insertReceived == nil {
    799 		return
    800 	}
    801 
    802 	if _, err := natsim.ensureSubject.Exec(msg.Subject, msg.Subject); err != nil {
    803 		natsim.ircSendError("ensureSubject.Exec", err)
    804 		return
    805 	}
    806 
    807 	var reply sql.NullString
    808 	if msg.Reply != "" {
    809 		if _, err := natsim.ensureSubject.Exec(msg.Reply, msg.Reply); err != nil {
    810 			natsim.ircSendError("ensureReply.Exec", err)
    811 			return
    812 		}
    813 		reply = sql.NullString{String: msg.Reply, Valid: true}
    814 	}
    815 
    816 	t := float64(time.Now().UnixNano())/8.64e13 + 2440587.5
    817 	if r, err := insertMsg.Exec(t, msg.Subject, reply, msg.Data); err != nil {
    818 		natsim.ircSendError("insertMsg.Exec", err)
    819 	} else if id, err := r.LastInsertId(); err != nil {
    820 		natsim.ircSendError("LastInsertId", err)
    821 	} else if id <= 0 {
    822 		natsim.ircSendf("LastInsertId returned invalid id %d", id)
    823 	} else {
    824 		for key, values := range msg.Header {
    825 			for _, value := range values {
    826 				if _, err := insertHeader.Exec(id, key, value); err != nil {
    827 					natsim.ircSendf("insertHeader(%q, %q): %s", key, value, err)
    828 				}
    829 			}
    830 		}
    831 	}
    832 }
    833 
// logReceived records a message received from NATS, using the statements
// prepared for the received tables.
func (natsim *NatsIM) logReceived(msg *nats.Msg) {
	natsim.logMsg(msg, natsim.insertReceived, natsim.insertRHeader)
}
    837 
// logSent records a message published to NATS, using the statements
// prepared for the sent tables.
func (natsim *NatsIM) logSent(msg *nats.Msg) {
	natsim.logMsg(msg, natsim.insertSent, natsim.insertSHeader)
}
    841 
    842 /**************** Filters ****************/
    843 
    844 type FilterElement struct {
    845 	Result bool
    846 	Part   FilterPart
    847 	Test   *regexp.Regexp
    848 }
    849 
    850 type FilterPart int
    851 
    852 const (
    853 	FilterSubject FilterPart = iota
    854 	FilterData
    855 )
    856 
    857 func (element *FilterElement) Match(subject string, data []byte) bool {
    858 	var b []byte
    859 	switch element.Part {
    860 	case FilterSubject:
    861 		b = []byte(subject)
    862 	case FilterData:
    863 		b = data
    864 	default:
    865 		panic("Unexpected part")
    866 	}
    867 
    868 	return element.Test.Match(b)
    869 }
    870 
    871 func (element *FilterElement) String() string {
    872 	r := "drop "
    873 	if element.Result {
    874 		r = "pass "
    875 	}
    876 
    877 	p := ""
    878 	switch element.Part {
    879 	case FilterSubject:
    880 		p = "subject "
    881 	case FilterData:
    882 		p = "data "
    883 	default:
    884 		panic("Unexpected part")
    885 	}
    886 
    887 	return r + p + element.Test.String()
    888 }
    889 
    890 func (element *FilterElement) UnmarshalText(text []byte) error {
    891 	s := string(text)
    892 
    893 	switch {
    894 	case strings.HasPrefix(s, "pass "):
    895 		element.Result = true
    896 		s = s[5:]
    897 	case strings.HasPrefix(s, "drop "):
    898 		element.Result = false
    899 		s = s[5:]
    900 	default:
    901 		return fmt.Errorf("malformed filter %q", s)
    902 	}
    903 
    904 	switch {
    905 	case strings.HasPrefix(s, "subject "):
    906 		element.Part = FilterSubject
    907 		s = s[8:]
    908 	case strings.HasPrefix(s, "data "):
    909 		element.Part = FilterData
    910 		s = s[5:]
    911 	default:
    912 		return fmt.Errorf("bad filter part %q", s)
    913 	}
    914 
    915 	re, err := regexp.Compile(s)
    916 	element.Test = re
    917 	return err
    918 }
    919 
    920 func WriteFilter(buf *strings.Builder, prefix string, filter []FilterElement) {
    921 	for i, element := range filter {
    922 		line := fmt.Sprintf("%s%d. %s", prefix, i+1, element.String())
    923 		buf.WriteString(line)
    924 	}
    925 }
    926 
    927 func IsKept(subject string, data []byte, elements []FilterElement, base bool) bool {
    928 	for _, element := range elements {
    929 		if element.Match(subject, data) {
    930 			return element.Result
    931 		}
    932 	}
    933 
    934 	return base
    935 }
    936 
    937 /**************** Tools ****************/
    938 
    939 type antiflood struct {
    940 	count int
    941 	delay time.Duration
    942 }
    943 
    944 func (af *antiflood) UnmarshalText(text []byte) error {
    945 	if before, after, found := strings.Cut(string(text), "/"); found {
    946 		if n, err := strconv.Atoi(before); err != nil {
    947 			return err
    948 		} else {
    949 			af.count = n
    950 		}
    951 
    952 		if d, err := time.ParseDuration(after); err != nil {
    953 			return err
    954 		} else {
    955 			af.delay = d
    956 		}
    957 
    958 	} else if d, err := time.ParseDuration(string(text)); err != nil {
    959 		return err
    960 	} else {
    961 		af.count = 1
    962 		af.delay = d
    963 	}
    964 
    965 	return nil
    966 }
    967 
    968 func humanizeNum(n uint64) string {
    969 	num, unit, found := strings.Cut(humanize.Bytes(n), " ")
    970 	if !found || unit == "" || unit[len(unit)-1:] != "B" {
    971 		panic("Unexpected huamized result")
    972 	}
    973 	return num + unit[:len(unit)-1]
    974 }
    975 
    976 func packMark(mark LineMark, name, arg string) string {
    977 	return mark.Start + name + mark.Mid + arg + mark.End
    978 }
    979 
    980 func unpackMark(mark LineMark, line string, optional bool) (string, string, bool) {
    981 	if strings.HasPrefix(line, mark.Start) && strings.HasSuffix(line, mark.End) {
    982 		inside := line[len(mark.Start) : len(line)-len(mark.End)]
    983 		if mark.Mid == "" {
    984 			return inside, "", true
    985 		} else if name, arg, found := strings.Cut(inside, mark.Mid); found {
    986 			return name, arg, true
    987 		} else {
    988 			return inside, "", optional
    989 		}
    990 	} else {
    991 		return "", "", false
    992 	}
    993 }
    994 
    995 var version = "(unknown)"
    996 
    997 func setVersion() {
    998 	info, ok := debug.ReadBuildInfo()
    999 	if !ok {
   1000 		return
   1001 	}
   1002 
   1003 	version = info.Main.Version
   1004 
   1005 	if version == "(devel)" {
   1006 		vcs := ""
   1007 		rev := ""
   1008 		dirty := ""
   1009 		for _, setting := range info.Settings {
   1010 			switch setting.Key {
   1011 			case "vcs":
   1012 				vcs = setting.Value + "-"
   1013 			case "vcs.revision":
   1014 				rev = setting.Value[0:8]
   1015 			case "vcs.modified":
   1016 				if setting.Value == "true" {
   1017 					dirty = "*"
   1018 				}
   1019 			}
   1020 		}
   1021 
   1022 		if rev != "" {
   1023 			version = vcs + rev + dirty
   1024 		}
   1025 	}
   1026 }