summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-13 09:50:48 +0200
committerPaul Buetow <paul@buetow.org>2026-03-13 09:50:48 +0200
commit7ce4aee16c0223cc15c9dd8d9024120069500c65 (patch)
treec702ae1dc708e81911c741b7e2312289ab389a4f
parentd8f88d455990636bb797643dee7d39a95bbbd62c (diff)
task 57753d28: add interactive query control
-rw-r--r--internal/clients/baseclient.go7
-rw-r--r--internal/clients/interactive_control.go368
-rw-r--r--internal/clients/interactive_control_test.go223
-rw-r--r--internal/clients/maprclient.go14
-rw-r--r--internal/clients/query_regex.go30
5 files changed, 629 insertions, 13 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index de76bf1..0e67ba4 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -90,6 +90,13 @@ func (c *baseClient) makeConnections(maker maker) {
}
func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status int) {
+ if c.Args.InteractiveQuery {
+ return c.startInteractiveControl(ctx, statsCh)
+ }
+ return c.runConnections(ctx, statsCh)
+}
+
+func (c *baseClient) runConnections(ctx context.Context, statsCh <-chan string) (status int) {
dlog.Client.Trace("Starting base client")
// Can be nil when serverless.
if c.hostKeyCallback != nil {
diff --git a/internal/clients/interactive_control.go b/internal/clients/interactive_control.go
new file mode 100644
index 0000000..56fd85f
--- /dev/null
+++ b/internal/clients/interactive_control.go
@@ -0,0 +1,368 @@
+package clients
+
+import (
+ "bufio"
+ "context"
+ "errors"
+ "flag"
+ "fmt"
+ "io"
+ "os"
+ "strings"
+ "time"
+
+ "github.com/mimecast/dtail/internal/clients/connectors"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/omode"
+)
+
+const interactiveControlTimeout = 2 * time.Second
+
+type interactiveCommand struct {
+ spec SessionSpec
+ kind string
+ next config.Args
+}
+
+func (c *baseClient) startInteractiveControl(ctx context.Context, statsCh <-chan string) int {
+ controlTTY, err := os.OpenFile(c.Args.ControlTTYPath, os.O_RDWR, 0)
+ if err != nil {
+ dlog.Client.Error("Unable to open interactive query control TTY", c.Args.ControlTTYPath, err)
+ return 1
+ }
+ defer controlTTY.Close()
+
+ runCtx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ statusCh := make(chan int, 1)
+ go func() {
+ statusCh <- c.runConnections(runCtx, statsCh)
+ }()
+
+ controlErrCh := make(chan error, 1)
+ go func() {
+ controlErrCh <- c.runInteractiveControl(runCtx, cancel, controlTTY)
+ }()
+
+ select {
+ case status := <-statusCh:
+ cancel()
+ <-controlErrCh
+ return status
+ case err := <-controlErrCh:
+ if err != nil {
+ dlog.Client.Warn("Interactive query control stopped", err)
+ }
+ cancel()
+ return <-statusCh
+ }
+}
+
+func (c *baseClient) runInteractiveControl(ctx context.Context, cancel context.CancelFunc, tty *os.File) error {
+ if _, err := fmt.Fprintf(tty,
+ "Interactive query control enabled. Commands: :reload <flags>, :show, :help, :quit\n"); err != nil {
+ return err
+ }
+
+ reader := bufio.NewScanner(tty)
+ reader.Buffer(make([]byte, 0, 1024), 1024*1024)
+
+ go func() {
+ <-ctx.Done()
+ _ = tty.Close()
+ }()
+
+ for reader.Scan() {
+ line := strings.TrimSpace(reader.Text())
+ if line == "" {
+ continue
+ }
+
+ command, err := parseInteractiveCommand(c.Args, line)
+ if err != nil {
+ if writeErr := writeControlLine(tty, "interactive query error: "+err.Error()); writeErr != nil {
+ return writeErr
+ }
+ continue
+ }
+
+ switch command.kind {
+ case "help":
+ if err := c.writeInteractiveHelp(tty); err != nil {
+ return err
+ }
+ case "show":
+ if err := c.writeInteractiveState(tty); err != nil {
+ return err
+ }
+ case "quit":
+ if err := writeControlLine(tty, "quitting interactive session"); err != nil {
+ return err
+ }
+ cancel()
+ return nil
+ case "reload":
+ if err := c.applyInteractiveReload(command.next, command.spec); err != nil {
+ if writeErr := writeControlLine(tty, "reload failed: "+err.Error()); writeErr != nil {
+ return writeErr
+ }
+ continue
+ }
+ if err := writeControlLine(tty, "reload applied successfully"); err != nil {
+ return err
+ }
+ default:
+ if err := writeControlLine(tty, "unsupported command"); err != nil {
+ return err
+ }
+ }
+ }
+
+ if err := reader.Err(); err != nil && ctx.Err() == nil && !errors.Is(err, os.ErrClosed) {
+ return err
+ }
+ return nil
+}
+
+func (c *baseClient) applyInteractiveReload(nextArgs config.Args, nextSpec SessionSpec) error {
+ if len(c.connections) == 0 {
+ return errors.New("no active connections")
+ }
+
+ var unsupported []string
+ for _, conn := range c.connections {
+ if !conn.SupportsQueryUpdates(interactiveControlTimeout) {
+ unsupported = append(unsupported, conn.Server())
+ }
+ }
+ if len(unsupported) > 0 {
+ return fmt.Errorf("%w: %s", connectors.ErrSessionUnsupported, strings.Join(unsupported, ", "))
+ }
+
+ var generation uint64
+ for _, conn := range c.connections {
+ if err := conn.ApplySessionSpec(nextSpec, interactiveControlTimeout); err != nil {
+ return fmt.Errorf("%s: %w", conn.Server(), err)
+ }
+
+ _, committedGeneration, ok := conn.CommittedSession()
+ if !ok || committedGeneration == 0 {
+ return fmt.Errorf("%s: missing committed session generation", conn.Server())
+ }
+ if generation == 0 {
+ generation = committedGeneration
+ continue
+ }
+ if generation != committedGeneration {
+ return fmt.Errorf("mismatched committed generations: got %d and %d", generation, committedGeneration)
+ }
+ }
+
+ if committer, ok := c.maker.(sessionCommitter); ok {
+ if err := committer.commitSessionSpec(nextSpec, generation); err != nil {
+ return fmt.Errorf("commit session state: %w", err)
+ }
+ }
+
+ c.Args = nextArgs
+ c.sessionSpec = nextSpec
+
+ return nil
+}
+
+func (c *baseClient) writeInteractiveHelp(writer io.Writer) error {
+ return writeControlLine(writer,
+ "Commands: :reload <flags>, :show, :help, :quit. Use quotes around multi-word values such as --query \"select count(status) from stats\".")
+}
+
+func (c *baseClient) writeInteractiveState(writer io.Writer) error {
+ spec := c.sessionSpec
+ ready := 0
+ for _, conn := range c.connections {
+ if conn.SupportsQueryUpdates(0) {
+ ready++
+ }
+ }
+
+ line := fmt.Sprintf(
+ "mode=%s files=%s query=%q regex=%q options=%q timeout=%d capable=%d/%d",
+ c.Args.Mode,
+ strings.Join(spec.Files, ","),
+ spec.Query,
+ spec.Regex,
+ spec.Options,
+ spec.Timeout,
+ ready,
+ len(c.connections),
+ )
+ return writeControlLine(writer, line)
+}
+
+func parseInteractiveCommand(current config.Args, line string) (interactiveCommand, error) {
+ line = strings.TrimSpace(line)
+
+ switch {
+ case line == ":help":
+ return interactiveCommand{kind: "help"}, nil
+ case line == ":show":
+ return interactiveCommand{kind: "show"}, nil
+ case line == ":quit":
+ return interactiveCommand{kind: "quit"}, nil
+ case strings.HasPrefix(line, ":reload"):
+ remainder := strings.TrimSpace(strings.TrimPrefix(line, ":reload"))
+ if remainder == "" {
+ return interactiveCommand{}, errors.New("reload requires flags to change")
+ }
+ tokens, err := splitInteractiveArgs(remainder)
+ if err != nil {
+ return interactiveCommand{}, err
+ }
+ nextArgs, err := parseInteractiveReloadArgs(current, tokens)
+ if err != nil {
+ return interactiveCommand{}, err
+ }
+ nextSpec, err := buildInteractiveSessionSpec(nextArgs)
+ if err != nil {
+ return interactiveCommand{}, err
+ }
+ return interactiveCommand{
+ kind: "reload",
+ next: nextArgs,
+ spec: nextSpec,
+ }, nil
+ default:
+ return interactiveCommand{}, fmt.Errorf("unknown command %q", line)
+ }
+}
+
+func parseInteractiveReloadArgs(current config.Args, tokens []string) (config.Args, error) {
+ next := current
+ fs := flag.NewFlagSet("reload", flag.ContinueOnError)
+ fs.SetOutput(io.Discard)
+
+ fs.StringVar(&next.What, "files", current.What, "File(s) to read")
+ fs.BoolVar(&next.Plain, "plain", current.Plain, "Plain output mode")
+ fs.BoolVar(&next.Quiet, "quiet", current.Quiet, "Quiet output mode")
+ fs.IntVar(&next.Timeout, "timeout", current.Timeout, "Max time dtail server will collect data until disconnection")
+
+ switch {
+ case isInteractiveQueryMode(current):
+ fs.StringVar(&next.QueryStr, "query", current.QueryStr, "Map reduce query")
+ case current.Mode == omode.GrepClient || current.Mode == omode.TailClient:
+ var grep string
+ fs.StringVar(&next.RegexStr, "regex", current.RegexStr, "Regular expression")
+ fs.StringVar(&grep, "grep", "", "Alias for -regex")
+ fs.BoolVar(&next.RegexInvert, "invert", current.RegexInvert, "Invert regex")
+ fs.IntVar(&next.LContext.BeforeContext, "before", current.LContext.BeforeContext, "Leading context lines")
+ fs.IntVar(&next.LContext.AfterContext, "after", current.LContext.AfterContext, "Trailing context lines")
+ fs.IntVar(&next.LContext.MaxCount, "max", current.LContext.MaxCount, "Maximum number of matches")
+ if err := fs.Parse(tokens); err != nil {
+ return current, err
+ }
+ if grep != "" {
+ next.RegexStr = grep
+ }
+ if len(fs.Args()) > 0 {
+ return current, fmt.Errorf("unexpected arguments: %s", strings.Join(fs.Args(), " "))
+ }
+ return next, nil
+ case current.Mode == omode.CatClient:
+ default:
+ return current, fmt.Errorf("interactive reload is unsupported for mode %s", current.Mode)
+ }
+
+ if err := fs.Parse(tokens); err != nil {
+ return current, err
+ }
+ if len(fs.Args()) > 0 {
+ return current, fmt.Errorf("unexpected arguments: %s", strings.Join(fs.Args(), " "))
+ }
+ return next, nil
+}
+
+func buildInteractiveSessionSpec(args config.Args) (SessionSpec, error) {
+ normalizedArgs, err := normalizeInteractiveArgs(args)
+ if err != nil {
+ return SessionSpec{}, err
+ }
+
+ spec := NewSessionSpec(normalizedArgs)
+ if _, err := spec.Commands(); err != nil {
+ return SessionSpec{}, err
+ }
+ return spec, nil
+}
+
+func normalizeInteractiveArgs(args config.Args) (config.Args, error) {
+ if !isInteractiveQueryMode(args) {
+ return args, nil
+ }
+
+ _, regexValue, err := maprRegexFromQueryString(args.QueryStr)
+ if err != nil {
+ return args, err
+ }
+ args.RegexStr = regexValue
+ return args, nil
+}
+
+func isInteractiveQueryMode(args config.Args) bool {
+ return strings.TrimSpace(args.QueryStr) != "" &&
+ (args.Mode == omode.MapClient || args.Mode == omode.TailClient)
+}
+
+func splitInteractiveArgs(raw string) ([]string, error) {
+ var (
+ tokens []string
+ current strings.Builder
+ inQuote rune
+ escaped bool
+ )
+
+ flush := func() {
+ if current.Len() == 0 {
+ return
+ }
+ tokens = append(tokens, current.String())
+ current.Reset()
+ }
+
+ for _, r := range raw {
+ switch {
+ case escaped:
+ current.WriteRune(r)
+ escaped = false
+ case r == '\\':
+ escaped = true
+ case inQuote != 0:
+ if r == inQuote {
+ inQuote = 0
+ continue
+ }
+ current.WriteRune(r)
+ case r == '\'' || r == '"':
+ inQuote = r
+ case r == ' ' || r == '\t':
+ flush()
+ default:
+ current.WriteRune(r)
+ }
+ }
+
+ if escaped {
+ return nil, errors.New("unterminated escape sequence")
+ }
+ if inQuote != 0 {
+ return nil, errors.New("unterminated quoted string")
+ }
+
+ flush()
+ return tokens, nil
+}
+
+func writeControlLine(writer io.Writer, message string) error {
+ _, err := fmt.Fprintf(writer, "%s\n", message)
+ return err
+}
diff --git a/internal/clients/interactive_control_test.go b/internal/clients/interactive_control_test.go
new file mode 100644
index 0000000..1cc31ec
--- /dev/null
+++ b/internal/clients/interactive_control_test.go
@@ -0,0 +1,223 @@
+package clients
+
+import (
+ "context"
+ "errors"
+ "testing"
+ "time"
+
+ "github.com/mimecast/dtail/internal/clients/connectors"
+ "github.com/mimecast/dtail/internal/clients/handlers"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/omode"
+ sessionspec "github.com/mimecast/dtail/internal/session"
+)
+
+func TestParseInteractiveCommandForGrepReload(t *testing.T) {
+ current := config.Args{
+ Mode: omode.GrepClient,
+ What: "/var/log/app.log",
+ RegexStr: "ERROR",
+ }
+
+ command, err := parseInteractiveCommand(current,
+ `:reload --grep WARN --before 2 --after 3 --max 4 --invert --files "/tmp/other.log" --plain --quiet --timeout 7`)
+ if err != nil {
+ t.Fatalf("parseInteractiveCommand() error = %v", err)
+ }
+
+ if command.kind != "reload" {
+ t.Fatalf("command kind = %q, want reload", command.kind)
+ }
+ if command.next.What != "/tmp/other.log" {
+ t.Fatalf("files = %q, want /tmp/other.log", command.next.What)
+ }
+ if command.next.RegexStr != "WARN" {
+ t.Fatalf("regex = %q, want WARN", command.next.RegexStr)
+ }
+ if !command.next.RegexInvert || !command.next.Plain || !command.next.Quiet {
+ t.Fatalf("expected invert/plain/quiet flags to be set: %#v", command.next)
+ }
+ if command.next.LContext.BeforeContext != 2 || command.next.LContext.AfterContext != 3 || command.next.LContext.MaxCount != 4 {
+ t.Fatalf("unexpected context values: %#v", command.next.LContext)
+ }
+ if command.spec.Regex != "WARN" {
+ t.Fatalf("session spec regex = %q, want WARN", command.spec.Regex)
+ }
+}
+
+func TestParseInteractiveCommandForMapReloadDerivesRegex(t *testing.T) {
+ current := config.Args{
+ Mode: omode.MapClient,
+ What: "/var/log/app.log",
+ QueryStr: "select count(status) from stats group by status",
+ }
+
+ command, err := parseInteractiveCommand(current,
+ `:reload --query "select count(status) from warnings group by status" --files /tmp/new.log --plain --timeout 9`)
+ if err != nil {
+ t.Fatalf("parseInteractiveCommand() error = %v", err)
+ }
+
+ if command.kind != "reload" {
+ t.Fatalf("command kind = %q, want reload", command.kind)
+ }
+ if command.next.QueryStr != "select count(status) from warnings group by status" {
+ t.Fatalf("query = %q", command.next.QueryStr)
+ }
+ if command.spec.Regex != "\\|MAPREDUCE:WARNINGS\\|" {
+ t.Fatalf("session spec regex = %q, want WARNINGS table regex", command.spec.Regex)
+ }
+}
+
+func TestParseInteractiveCommandRejectsUnterminatedQuotes(t *testing.T) {
+ current := config.Args{
+ Mode: omode.MapClient,
+ QueryStr: "select count(status) from stats group by status",
+ }
+
+ if _, err := parseInteractiveCommand(current, `:reload --query "select count(status) from stats`); err == nil {
+ t.Fatalf("expected parseInteractiveCommand() to reject unterminated quoted input")
+ }
+}
+
+func TestApplyInteractiveReloadRejectsUnsupportedConnections(t *testing.T) {
+ client := &baseClient{
+ Args: config.Args{
+ Mode: omode.GrepClient,
+ What: "/var/log/app.log",
+ RegexStr: "ERROR",
+ },
+ sessionSpec: SessionSpec{
+ Mode: omode.GrepClient,
+ Files: []string{"/var/log/app.log"},
+ Regex: "ERROR",
+ },
+ connections: []connectors.Connector{
+ &interactiveReloadConnector{server: "srv1", supported: false},
+ },
+ }
+
+ err := client.applyInteractiveReload(config.Args{
+ Mode: omode.GrepClient,
+ What: "/tmp/next.log",
+ RegexStr: "WARN",
+ }, SessionSpec{
+ Mode: omode.GrepClient,
+ Files: []string{"/tmp/next.log"},
+ Regex: "WARN",
+ })
+ if !errors.Is(err, connectors.ErrSessionUnsupported) {
+ t.Fatalf("expected ErrSessionUnsupported, got %v", err)
+ }
+ if client.Args.What != "/var/log/app.log" || client.sessionSpec.Regex != "ERROR" {
+ t.Fatalf("client state changed on unsupported reload: args=%#v spec=%#v", client.Args, client.sessionSpec)
+ }
+}
+
+func TestApplyInteractiveReloadCommitsSharedState(t *testing.T) {
+ connA := &interactiveReloadConnector{server: "srv1", supported: true, generation: 4}
+ connB := &interactiveReloadConnector{server: "srv2", supported: true, generation: 4}
+ maker := &interactiveReloadMaker{}
+
+ client := &baseClient{
+ Args: config.Args{
+ Mode: omode.MapClient,
+ What: "/var/log/app.log",
+ QueryStr: "select count(status) from stats group by status",
+ },
+ sessionSpec: SessionSpec{
+ Mode: omode.MapClient,
+ Files: []string{"/var/log/app.log"},
+ Query: "select count(status) from stats group by status",
+ Regex: "\\|MAPREDUCE:STATS\\|",
+ },
+ connections: []connectors.Connector{connA, connB},
+ maker: maker,
+ }
+
+ nextArgs := config.Args{
+ Mode: omode.MapClient,
+ What: "/tmp/new.log",
+ QueryStr: "select count(status) from warnings group by status",
+ Plain: true,
+ Timeout: 5,
+ }
+ nextSpec := SessionSpec{
+ Mode: omode.MapClient,
+ Files: []string{"/tmp/new.log"},
+ Query: nextArgs.QueryStr,
+ Regex: "\\|MAPREDUCE:WARNINGS\\|",
+ Timeout: 5,
+ }
+
+ if err := client.applyInteractiveReload(nextArgs, nextSpec); err != nil {
+ t.Fatalf("applyInteractiveReload() error = %v", err)
+ }
+
+ if client.Args.What != "/tmp/new.log" || client.sessionSpec.Query != nextArgs.QueryStr {
+ t.Fatalf("client state not committed: args=%#v spec=%#v", client.Args, client.sessionSpec)
+ }
+ if len(maker.commits) != 1 {
+ t.Fatalf("expected one sessionCommitter call, got %d", len(maker.commits))
+ }
+ if maker.commits[0].generation != 4 || maker.commits[0].spec.Query != nextArgs.QueryStr {
+ t.Fatalf("unexpected commit payload: %#v", maker.commits[0])
+ }
+ if connA.appliedSpec.Query != nextArgs.QueryStr || connB.appliedSpec.Query != nextArgs.QueryStr {
+ t.Fatalf("connectors did not receive new session spec: %#v %#v", connA.appliedSpec, connB.appliedSpec)
+ }
+}
+
+type interactiveReloadConnector struct {
+ appliedSpec sessionspec.Spec
+ applyErr error
+ generation uint64
+ server string
+ supported bool
+}
+
+func (*interactiveReloadConnector) Start(context.Context, context.CancelFunc, chan struct{}, chan struct{}) {
+}
+
+func (c *interactiveReloadConnector) Server() string { return c.server }
+
+func (*interactiveReloadConnector) Handler() handlers.Handler { return nil }
+
+func (c *interactiveReloadConnector) SupportsQueryUpdates(time.Duration) bool { return c.supported }
+
+func (c *interactiveReloadConnector) ApplySessionSpec(spec sessionspec.Spec, _ time.Duration) error {
+ if c.applyErr != nil {
+ return c.applyErr
+ }
+ c.appliedSpec = spec
+ return nil
+}
+
+func (c *interactiveReloadConnector) CommittedSession() (sessionspec.Spec, uint64, bool) {
+ if c.generation == 0 {
+ return sessionspec.Spec{}, 0, false
+ }
+ return c.appliedSpec, c.generation, true
+}
+
+type interactiveReloadMaker struct {
+ commits []interactiveReloadCommit
+}
+
+type interactiveReloadCommit struct {
+ generation uint64
+ spec SessionSpec
+}
+
+func (*interactiveReloadMaker) makeHandler(string) handlers.Handler { return nil }
+
+func (*interactiveReloadMaker) makeCommands() []string { return nil }
+
+func (m *interactiveReloadMaker) commitSessionSpec(spec SessionSpec, generation uint64) error {
+ m.commits = append(m.commits, interactiveReloadCommit{
+ generation: generation,
+ spec: spec,
+ })
+ return nil
+}
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 68770ec..3e46ecf 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -258,19 +258,7 @@ func (c *MaprClient) isCumulative(query *mapr.Query) bool {
}
func (c *MaprClient) setRegexForQuery(query *mapr.Query) {
- if query == nil {
- c.RegexStr = "."
- return
- }
-
- switch query.Table {
- case "", ".":
- c.RegexStr = "."
- case "*":
- c.RegexStr = "\\|MAPREDUCE:\\|"
- default:
- c.RegexStr = fmt.Sprintf("\\|MAPREDUCE:%s\\|", query.Table)
- }
+ c.RegexStr = maprRegexForQuery(query)
}
func (c *MaprClient) reportDelay(query *mapr.Query, rampUp bool) time.Duration {
diff --git a/internal/clients/query_regex.go b/internal/clients/query_regex.go
new file mode 100644
index 0000000..9ccd233
--- /dev/null
+++ b/internal/clients/query_regex.go
@@ -0,0 +1,30 @@
+package clients
+
+import (
+ "fmt"
+
+ "github.com/mimecast/dtail/internal/mapr"
+)
+
+func maprRegexForQuery(query *mapr.Query) string {
+ if query == nil {
+ return "."
+ }
+
+ switch query.Table {
+ case "", ".":
+ return "."
+ case "*":
+ return "\\|MAPREDUCE:\\|"
+ default:
+ return fmt.Sprintf("\\|MAPREDUCE:%s\\|", query.Table)
+ }
+}
+
+func maprRegexFromQueryString(queryStr string) (*mapr.Query, string, error) {
+ query, err := mapr.NewQuery(queryStr)
+ if err != nil {
+ return nil, "", err
+ }
+ return query, maprRegexForQuery(query), nil
+}