diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-13 09:50:48 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-13 09:50:48 +0200 |
| commit | 7ce4aee16c0223cc15c9dd8d9024120069500c65 (patch) | |
| tree | c702ae1dc708e81911c741b7e2312289ab389a4f | |
| parent | d8f88d455990636bb797643dee7d39a95bbbd62c (diff) | |
task 57753d28: add interactive query control
| -rw-r--r-- | internal/clients/baseclient.go | 7 | ||||
| -rw-r--r-- | internal/clients/interactive_control.go | 368 | ||||
| -rw-r--r-- | internal/clients/interactive_control_test.go | 223 | ||||
| -rw-r--r-- | internal/clients/maprclient.go | 14 | ||||
| -rw-r--r-- | internal/clients/query_regex.go | 30 |
5 files changed, 629 insertions, 13 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index de76bf1..0e67ba4 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -90,6 +90,13 @@ func (c *baseClient) makeConnections(maker maker) { } func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status int) { + if c.Args.InteractiveQuery { + return c.startInteractiveControl(ctx, statsCh) + } + return c.runConnections(ctx, statsCh) +} + +func (c *baseClient) runConnections(ctx context.Context, statsCh <-chan string) (status int) { dlog.Client.Trace("Starting base client") // Can be nil when serverless. if c.hostKeyCallback != nil { diff --git a/internal/clients/interactive_control.go b/internal/clients/interactive_control.go new file mode 100644 index 0000000..56fd85f --- /dev/null +++ b/internal/clients/interactive_control.go @@ -0,0 +1,368 @@ +package clients + +import ( + "bufio" + "context" + "errors" + "flag" + "fmt" + "io" + "os" + "strings" + "time" + + "github.com/mimecast/dtail/internal/clients/connectors" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/omode" +) + +const interactiveControlTimeout = 2 * time.Second + +type interactiveCommand struct { + spec SessionSpec + kind string + next config.Args +} + +func (c *baseClient) startInteractiveControl(ctx context.Context, statsCh <-chan string) int { + controlTTY, err := os.OpenFile(c.Args.ControlTTYPath, os.O_RDWR, 0) + if err != nil { + dlog.Client.Error("Unable to open interactive query control TTY", c.Args.ControlTTYPath, err) + return 1 + } + defer controlTTY.Close() + + runCtx, cancel := context.WithCancel(ctx) + defer cancel() + + statusCh := make(chan int, 1) + go func() { + statusCh <- c.runConnections(runCtx, statsCh) + }() + + controlErrCh := make(chan error, 1) + go func() { + controlErrCh <- c.runInteractiveControl(runCtx, cancel, controlTTY) + }() + + select { + case status := <-statusCh: + cancel() + <-controlErrCh + return status + case err := <-controlErrCh: + if err != nil { + dlog.Client.Warn("Interactive query control stopped", err) + } + cancel() + return <-statusCh + } +} + +func (c *baseClient) runInteractiveControl(ctx context.Context, cancel context.CancelFunc, tty *os.File) error { + if _, err := fmt.Fprintf(tty, + "Interactive query control enabled. Commands: :reload <flags>, :show, :help, :quit\n"); err != nil { + return err + } + + reader := bufio.NewScanner(tty) + reader.Buffer(make([]byte, 0, 1024), 1024*1024) + + go func() { + <-ctx.Done() + _ = tty.Close() + }() + + for reader.Scan() { + line := strings.TrimSpace(reader.Text()) + if line == "" { + continue + } + + command, err := parseInteractiveCommand(c.Args, line) + if err != nil { + if writeErr := writeControlLine(tty, "interactive query error: "+err.Error()); writeErr != nil { + return writeErr + } + continue + } + + switch command.kind { + case "help": + if err := c.writeInteractiveHelp(tty); err != nil { + return err + } + case "show": + if err := c.writeInteractiveState(tty); err != nil { + return err + } + case "quit": + if err := writeControlLine(tty, "quitting interactive session"); err != nil { + return err + } + cancel() + return nil + case "reload": + if err := c.applyInteractiveReload(command.next, command.spec); err != nil { + if writeErr := writeControlLine(tty, "reload failed: "+err.Error()); writeErr != nil { + return writeErr + } + continue + } + if err := writeControlLine(tty, "reload applied successfully"); err != nil { + return err + } + default: + if err := writeControlLine(tty, "unsupported command"); err != nil { + return err + } + } + } + + if err := reader.Err(); err != nil && ctx.Err() == nil && !errors.Is(err, os.ErrClosed) { + return err + } + return nil +} + +func (c *baseClient) applyInteractiveReload(nextArgs config.Args, nextSpec SessionSpec) error { + if len(c.connections) == 0 { + return errors.New("no active connections") + } + + var unsupported []string + for _, conn := range c.connections { + if !conn.SupportsQueryUpdates(interactiveControlTimeout) { + unsupported = append(unsupported, conn.Server()) + } + } + if len(unsupported) > 0 { + return fmt.Errorf("%w: %s", connectors.ErrSessionUnsupported, strings.Join(unsupported, ", ")) + } + + var generation uint64 + for _, conn := range c.connections { + if err := conn.ApplySessionSpec(nextSpec, interactiveControlTimeout); err != nil { + return fmt.Errorf("%s: %w", conn.Server(), err) + } + + _, committedGeneration, ok := conn.CommittedSession() + if !ok || committedGeneration == 0 { + return fmt.Errorf("%s: missing committed session generation", conn.Server()) + } + if generation == 0 { + generation = committedGeneration + continue + } + if generation != committedGeneration { + return fmt.Errorf("mismatched committed generations: got %d and %d", generation, committedGeneration) + } + } + + if committer, ok := c.maker.(sessionCommitter); ok { + if err := committer.commitSessionSpec(nextSpec, generation); err != nil { + return fmt.Errorf("commit session state: %w", err) + } + } + + c.Args = nextArgs + c.sessionSpec = nextSpec + + return nil +} + +func (c *baseClient) writeInteractiveHelp(writer io.Writer) error { + return writeControlLine(writer, + "Commands: :reload <flags>, :show, :help, :quit. Use quotes around multi-word values such as --query \"select count(status) from stats\".") +} + +func (c *baseClient) writeInteractiveState(writer io.Writer) error { + spec := c.sessionSpec + ready := 0 + for _, conn := range c.connections { + if conn.SupportsQueryUpdates(0) { + ready++ + } + } + + line := fmt.Sprintf( + "mode=%s files=%s query=%q regex=%q options=%q timeout=%d capable=%d/%d", + c.Args.Mode, + strings.Join(spec.Files, ","), + spec.Query, + spec.Regex, + spec.Options, + spec.Timeout, + ready, + len(c.connections), + ) + return writeControlLine(writer, line) +} + +func parseInteractiveCommand(current config.Args, line string) (interactiveCommand, error) { + line = strings.TrimSpace(line) + + switch { + case line == ":help": + return interactiveCommand{kind: "help"}, nil + case line == ":show": + return interactiveCommand{kind: "show"}, nil + case line == ":quit": + return interactiveCommand{kind: "quit"}, nil + case strings.HasPrefix(line, ":reload"): + remainder := strings.TrimSpace(strings.TrimPrefix(line, ":reload")) + if remainder == "" { + return interactiveCommand{}, errors.New("reload requires flags to change") + } + tokens, err := splitInteractiveArgs(remainder) + if err != nil { + return interactiveCommand{}, err + } + nextArgs, err := parseInteractiveReloadArgs(current, tokens) + if err != nil { + return interactiveCommand{}, err + } + nextSpec, err := buildInteractiveSessionSpec(nextArgs) + if err != nil { + return interactiveCommand{}, err + } + return interactiveCommand{ + kind: "reload", + next: nextArgs, + spec: nextSpec, + }, nil + default: + return interactiveCommand{}, fmt.Errorf("unknown command %q", line) + } +} + +func parseInteractiveReloadArgs(current config.Args, tokens []string) (config.Args, error) { + next := current + fs := flag.NewFlagSet("reload", flag.ContinueOnError) + fs.SetOutput(io.Discard) + + fs.StringVar(&next.What, "files", current.What, "File(s) to read") + fs.BoolVar(&next.Plain, "plain", current.Plain, "Plain output mode") + fs.BoolVar(&next.Quiet, "quiet", current.Quiet, "Quiet output mode") + fs.IntVar(&next.Timeout, "timeout", current.Timeout, "Max time dtail server will collect data until disconnection") + + switch { + case isInteractiveQueryMode(current): + fs.StringVar(&next.QueryStr, "query", current.QueryStr, "Map reduce query") + case current.Mode == omode.GrepClient || current.Mode == omode.TailClient: + var grep string + fs.StringVar(&next.RegexStr, "regex", current.RegexStr, "Regular expression") + fs.StringVar(&grep, "grep", "", "Alias for -regex") + fs.BoolVar(&next.RegexInvert, "invert", current.RegexInvert, "Invert regex") + fs.IntVar(&next.LContext.BeforeContext, "before", current.LContext.BeforeContext, "Leading context lines") + fs.IntVar(&next.LContext.AfterContext, "after", current.LContext.AfterContext, "Trailing context lines") + fs.IntVar(&next.LContext.MaxCount, "max", current.LContext.MaxCount, "Maximum number of matches") + if err := fs.Parse(tokens); err != nil { + return current, err + } + if grep != "" { + next.RegexStr = grep + } + if len(fs.Args()) > 0 { + return current, fmt.Errorf("unexpected arguments: %s", strings.Join(fs.Args(), " ")) + } + return next, nil + case current.Mode == omode.CatClient: + default: + return current, fmt.Errorf("interactive reload is unsupported for mode %s", current.Mode) + } + + if err := fs.Parse(tokens); err != nil { + return current, err + } + if len(fs.Args()) > 0 { + return current, fmt.Errorf("unexpected arguments: %s", strings.Join(fs.Args(), " ")) + } + return next, nil +} + +func buildInteractiveSessionSpec(args config.Args) (SessionSpec, error) { + normalizedArgs, err := normalizeInteractiveArgs(args) + if err != nil { + return SessionSpec{}, err + } + + spec := NewSessionSpec(normalizedArgs) + if _, err := spec.Commands(); err != nil { + return SessionSpec{}, err + } + return spec, nil +} + +func normalizeInteractiveArgs(args config.Args) (config.Args, error) { + if !isInteractiveQueryMode(args) { + return args, nil + } + + _, regexValue, err := maprRegexFromQueryString(args.QueryStr) + if err != nil { + return args, err + } + args.RegexStr = regexValue + return args, nil +} + +func isInteractiveQueryMode(args config.Args) bool { + return strings.TrimSpace(args.QueryStr) != "" && + (args.Mode == omode.MapClient || args.Mode == omode.TailClient) +} + +func splitInteractiveArgs(raw string) ([]string, error) { + var ( + tokens []string + current strings.Builder + inQuote rune + escaped bool + ) + + flush := func() { + if current.Len() == 0 { + return + } + tokens = append(tokens, current.String()) + current.Reset() + } + + for _, r := range raw { + switch { + case escaped: + current.WriteRune(r) + escaped = false + case r == '\\': + escaped = true + case inQuote != 0: + if r == inQuote { + inQuote = 0 + continue + } + current.WriteRune(r) + case r == '\'' || r == '"': + inQuote = r + case r == ' ' || r == '\t': + flush() + default: + current.WriteRune(r) + } + } + + if escaped { + return nil, errors.New("unterminated escape sequence") + } + if inQuote != 0 { + return nil, errors.New("unterminated quoted string") + } + + flush() + return tokens, nil +} + +func writeControlLine(writer io.Writer, message string) error { + _, err := fmt.Fprintf(writer, "%s\n", message) + return err +} diff --git a/internal/clients/interactive_control_test.go b/internal/clients/interactive_control_test.go new file mode 100644 index 0000000..1cc31ec --- /dev/null +++ b/internal/clients/interactive_control_test.go @@ -0,0 +1,223 @@ +package clients + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/mimecast/dtail/internal/clients/connectors" + "github.com/mimecast/dtail/internal/clients/handlers" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/omode" + sessionspec "github.com/mimecast/dtail/internal/session" +) + +func TestParseInteractiveCommandForGrepReload(t *testing.T) { + current := config.Args{ + Mode: omode.GrepClient, + What: "/var/log/app.log", + RegexStr: "ERROR", + } + + command, err := parseInteractiveCommand(current, + `:reload --grep WARN --before 2 --after 3 --max 4 --invert --files "/tmp/other.log" --plain --quiet --timeout 7`) + if err != nil { + t.Fatalf("parseInteractiveCommand() error = %v", err) + } + + if command.kind != "reload" { + t.Fatalf("command kind = %q, want reload", command.kind) + } + if command.next.What != "/tmp/other.log" { + t.Fatalf("files = %q, want /tmp/other.log", command.next.What) + } + if command.next.RegexStr != "WARN" { + t.Fatalf("regex = %q, want WARN", command.next.RegexStr) + } + if !command.next.RegexInvert || !command.next.Plain || !command.next.Quiet { + t.Fatalf("expected invert/plain/quiet flags to be set: %#v", command.next) + } + if command.next.LContext.BeforeContext != 2 || command.next.LContext.AfterContext != 3 || command.next.LContext.MaxCount != 4 { + t.Fatalf("unexpected context values: %#v", command.next.LContext) + } + if command.spec.Regex != "WARN" { + t.Fatalf("session spec regex = %q, want WARN", command.spec.Regex) + } +} + +func TestParseInteractiveCommandForMapReloadDerivesRegex(t *testing.T) { + current := config.Args{ + Mode: omode.MapClient, + What: "/var/log/app.log", + QueryStr: "select count(status) from stats group by status", + } + + command, err := parseInteractiveCommand(current, + `:reload --query "select count(status) from warnings group by status" --files /tmp/new.log --plain --timeout 9`) + if err != nil { + t.Fatalf("parseInteractiveCommand() error = %v", err) + } + + if command.kind != "reload" { + t.Fatalf("command kind = %q, want reload", command.kind) + } + if command.next.QueryStr != "select count(status) from warnings group by status" { + t.Fatalf("query = %q", command.next.QueryStr) + } + if command.spec.Regex != "\\|MAPREDUCE:WARNINGS\\|" { + t.Fatalf("session spec regex = %q, want WARNINGS table regex", command.spec.Regex) + } +} + +func TestParseInteractiveCommandRejectsUnterminatedQuotes(t *testing.T) { + current := config.Args{ + Mode: omode.MapClient, + QueryStr: "select count(status) from stats group by status", + } + + if _, err := parseInteractiveCommand(current, `:reload --query "select count(status) from stats`); err == nil { + t.Fatalf("expected parseInteractiveCommand() to reject unterminated quoted input") + } +} + +func TestApplyInteractiveReloadRejectsUnsupportedConnections(t *testing.T) { + client := &baseClient{ + Args: config.Args{ + Mode: omode.GrepClient, + What: "/var/log/app.log", + RegexStr: "ERROR", + }, + sessionSpec: SessionSpec{ + Mode: omode.GrepClient, + Files: []string{"/var/log/app.log"}, + Regex: "ERROR", + }, + connections: []connectors.Connector{ + &interactiveReloadConnector{server: "srv1", supported: false}, + }, + } + + err := client.applyInteractiveReload(config.Args{ + Mode: omode.GrepClient, + What: "/tmp/next.log", + RegexStr: "WARN", + }, SessionSpec{ + Mode: omode.GrepClient, + Files: []string{"/tmp/next.log"}, + Regex: "WARN", + }) + if !errors.Is(err, connectors.ErrSessionUnsupported) { + t.Fatalf("expected ErrSessionUnsupported, got %v", err) + } + if client.Args.What != "/var/log/app.log" || client.sessionSpec.Regex != "ERROR" { + t.Fatalf("client state changed on unsupported reload: args=%#v spec=%#v", client.Args, client.sessionSpec) + } +} + +func TestApplyInteractiveReloadCommitsSharedState(t *testing.T) { + connA := &interactiveReloadConnector{server: "srv1", supported: true, generation: 4} + connB := &interactiveReloadConnector{server: "srv2", supported: true, generation: 4} + maker := &interactiveReloadMaker{} + + client := &baseClient{ + Args: config.Args{ + Mode: omode.MapClient, + What: "/var/log/app.log", + QueryStr: "select count(status) from stats group by status", + }, + sessionSpec: SessionSpec{ + Mode: omode.MapClient, + Files: []string{"/var/log/app.log"}, + Query: "select count(status) from stats group by status", + Regex: "\\|MAPREDUCE:STATS\\|", + }, + connections: []connectors.Connector{connA, connB}, + maker: maker, + } + + nextArgs := config.Args{ + Mode: omode.MapClient, + What: "/tmp/new.log", + QueryStr: "select count(status) from warnings group by status", + Plain: true, + Timeout: 5, + } + nextSpec := SessionSpec{ + Mode: omode.MapClient, + Files: []string{"/tmp/new.log"}, + Query: nextArgs.QueryStr, + Regex: "\\|MAPREDUCE:WARNINGS\\|", + Timeout: 5, + } + + if err := client.applyInteractiveReload(nextArgs, nextSpec); err != nil { + t.Fatalf("applyInteractiveReload() error = %v", err) + } + + if client.Args.What != "/tmp/new.log" || client.sessionSpec.Query != nextArgs.QueryStr { + t.Fatalf("client state not committed: args=%#v spec=%#v", client.Args, client.sessionSpec) + } + if len(maker.commits) != 1 { + t.Fatalf("expected one sessionCommitter call, got %d", len(maker.commits)) + } + if maker.commits[0].generation != 4 || maker.commits[0].spec.Query != nextArgs.QueryStr { + t.Fatalf("unexpected commit payload: %#v", maker.commits[0]) + } + if connA.appliedSpec.Query != nextArgs.QueryStr || connB.appliedSpec.Query != nextArgs.QueryStr { + t.Fatalf("connectors did not receive new session spec: %#v %#v", connA.appliedSpec, connB.appliedSpec) + } +} + +type interactiveReloadConnector struct { + appliedSpec sessionspec.Spec + applyErr error + generation uint64 + server string + supported bool +} + +func (*interactiveReloadConnector) Start(context.Context, context.CancelFunc, chan struct{}, chan struct{}) { +} + +func (c *interactiveReloadConnector) Server() string { return c.server } + +func (*interactiveReloadConnector) Handler() handlers.Handler { return nil } + +func (c *interactiveReloadConnector) SupportsQueryUpdates(time.Duration) bool { return c.supported } + +func (c *interactiveReloadConnector) ApplySessionSpec(spec sessionspec.Spec, _ time.Duration) error { + if c.applyErr != nil { + return c.applyErr + } + c.appliedSpec = spec + return nil +} + +func (c *interactiveReloadConnector) CommittedSession() (sessionspec.Spec, uint64, bool) { + if c.generation == 0 { + return sessionspec.Spec{}, 0, false + } + return c.appliedSpec, c.generation, true +} + +type interactiveReloadMaker struct { + commits []interactiveReloadCommit +} + +type interactiveReloadCommit struct { + generation uint64 + spec SessionSpec +} + +func (*interactiveReloadMaker) makeHandler(string) handlers.Handler { return nil } + +func (*interactiveReloadMaker) makeCommands() []string { return nil } + +func (m *interactiveReloadMaker) commitSessionSpec(spec SessionSpec, generation uint64) error { + m.commits = append(m.commits, interactiveReloadCommit{ + generation: generation, + spec: spec, + }) + return nil +} diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 68770ec..3e46ecf 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -258,19 +258,7 @@ func (c *MaprClient) isCumulative(query *mapr.Query) bool { } func (c *MaprClient) setRegexForQuery(query *mapr.Query) { - if query == nil { - c.RegexStr = "." - return - } - - switch query.Table { - case "", ".": - c.RegexStr = "." - case "*": - c.RegexStr = "\\|MAPREDUCE:\\|" - default: - c.RegexStr = fmt.Sprintf("\\|MAPREDUCE:%s\\|", query.Table) - } + c.RegexStr = maprRegexForQuery(query) } func (c *MaprClient) reportDelay(query *mapr.Query, rampUp bool) time.Duration { diff --git a/internal/clients/query_regex.go b/internal/clients/query_regex.go new file mode 100644 index 0000000..9ccd233 --- /dev/null +++ b/internal/clients/query_regex.go @@ -0,0 +1,30 @@ +package clients + +import ( + "fmt" + + "github.com/mimecast/dtail/internal/mapr" +) + +func maprRegexForQuery(query *mapr.Query) string { + if query == nil { + return "." + } + + switch query.Table { + case "", ".": + return "." + case "*": + return "\\|MAPREDUCE:\\|" + default: + return fmt.Sprintf("\\|MAPREDUCE:%s\\|", query.Table) + } +} + +func maprRegexFromQueryString(queryStr string) (*mapr.Query, string, error) { + query, err := mapr.NewQuery(queryStr) + if err != nil { + return nil, "", err + } + return query, maprRegexForQuery(query), nil +} |
