summaryrefslogtreecommitdiff
path: root/integrationtests/interactive_runtime_query_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'integrationtests/interactive_runtime_query_test.go')
-rw-r--r--integrationtests/interactive_runtime_query_test.go312
1 files changed, 312 insertions, 0 deletions
diff --git a/integrationtests/interactive_runtime_query_test.go b/integrationtests/interactive_runtime_query_test.go
new file mode 100644
index 0000000..213c877
--- /dev/null
+++ b/integrationtests/interactive_runtime_query_test.go
@@ -0,0 +1,312 @@
+package integrationtests
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "os"
+ "os/exec"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+)
+
+type interactiveStep struct {
+ Delay time.Duration
+ Input string
+}
+
+type processOutputCollector struct {
+ mu sync.Mutex
+ lines []string
+}
+
+func TestDTailInteractiveReloadReusesSessionAndDropsLateOldMatches(t *testing.T) {
+ skipIfNotIntegrationTest(t)
+
+ testLogger := NewTestLogger("TestDTailInteractiveReloadReusesSessionAndDropsLateOldMatches")
+ defer testLogger.WriteLogFile()
+ cleanupTmpFiles(t)
+
+ ctx, cancel := createTestContextWithTimeout(t)
+ ctx = WithTestLogger(ctx, testLogger)
+ defer cancel()
+
+ followFile := "interactive_dtail_reload.tmp"
+ if err := os.WriteFile(followFile, nil, 0600); err != nil {
+ t.Fatalf("unable to create follow file: %v", err)
+ }
+ cleanupFiles(t, followFile, "interactive_dtail_reload.stdout.tmp")
+
+ port := getUniquePortNumber()
+ serverStdout, serverStderr, _, err := startCommand(ctx, t, "", "../dserver",
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "debug",
+ "--bindAddress", "localhost",
+ "--port", fmt.Sprintf("%d", port),
+ )
+ if err != nil {
+ t.Fatalf("start dserver: %v", err)
+ }
+ serverLogs := startProcessOutputCollector(ctx, serverStdout, serverStderr)
+
+ writerDone := make(chan error, 1)
+ go func() {
+ writerDone <- appendLinesOnSchedule(ctx, followFile, []interactiveStep{
+ {Delay: 200 * time.Millisecond, Input: "ERROR initial"},
+ {Delay: 1200 * time.Millisecond, Input: "ERROR late"},
+ {Delay: 1400 * time.Millisecond, Input: "WARN live"},
+ })
+ }()
+
+ clientOutput, err := runInteractivePTYCommand(ctx, []string{
+ "../dtail",
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "info",
+ "--servers", fmt.Sprintf("localhost:%d", port),
+ "--files", followFile,
+ "--grep", "ERROR",
+ "--plain",
+ "--trustAllHosts",
+ "--interactive-query",
+ }, []interactiveStep{
+ {Delay: 700 * time.Millisecond, Input: ":reload --grep WARN"},
+ {Delay: 1700 * time.Millisecond, Input: ":quit"},
+ })
+ if err != nil {
+ t.Fatalf("run interactive dtail: %v\noutput:\n%s", err, clientOutput)
+ }
+
+ if err := <-writerDone; err != nil {
+ t.Fatalf("write follow file: %v", err)
+ }
+
+ if !strings.Contains(clientOutput, "ERROR initial") {
+ t.Fatalf("expected initial ERROR line in output:\n%s", clientOutput)
+ }
+ if !strings.Contains(clientOutput, "WARN live") {
+ t.Fatalf("expected WARN line after reload in output:\n%s", clientOutput)
+ }
+ if strings.Contains(clientOutput, "ERROR late") {
+ t.Fatalf("unexpected stale ERROR line after reload:\n%s", clientOutput)
+ }
+ if !strings.Contains(clientOutput, "reload applied successfully") {
+ t.Fatalf("expected reload success message in output:\n%s", clientOutput)
+ }
+ if countSubstring(serverLogs.snapshot(), "Creating new server handler") != 1 {
+ t.Fatalf("expected one SSH session on the server, logs:\n%s", strings.Join(serverLogs.snapshot(), "\n"))
+ }
+}
+
+func TestDGrepInteractiveReloadReusesSessionAfterCompletedRead(t *testing.T) {
+ skipIfNotIntegrationTest(t)
+
+ testLogger := NewTestLogger("TestDGrepInteractiveReloadReusesSessionAfterCompletedRead")
+ defer testLogger.WriteLogFile()
+ cleanupTmpFiles(t)
+
+ ctx, cancel := createTestContextWithTimeout(t)
+ ctx = WithTestLogger(ctx, testLogger)
+ defer cancel()
+
+ inputFile := "interactive_dgrep_reload.tmp"
+ if err := os.WriteFile(inputFile, []byte("ERROR first\nWARN second\n"), 0600); err != nil {
+ t.Fatalf("unable to create input file: %v", err)
+ }
+ cleanupFiles(t, inputFile, "interactive_dgrep_reload.stdout.tmp")
+
+ port := getUniquePortNumber()
+ serverStdout, serverStderr, _, err := startCommand(ctx, t, "", "../dserver",
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "debug",
+ "--bindAddress", "localhost",
+ "--port", fmt.Sprintf("%d", port),
+ )
+ if err != nil {
+ t.Fatalf("start dserver: %v", err)
+ }
+ serverLogs := startProcessOutputCollector(ctx, serverStdout, serverStderr)
+
+ clientOutput, err := runInteractivePTYCommand(ctx, []string{
+ "../dgrep",
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "info",
+ "--servers", fmt.Sprintf("localhost:%d", port),
+ "--files", inputFile,
+ "--grep", "ERROR",
+ "--plain",
+ "--trustAllHosts",
+ "--interactive-query",
+ }, []interactiveStep{
+ {Delay: 400 * time.Millisecond, Input: ":reload --grep WARN"},
+ {Delay: 1000 * time.Millisecond, Input: ":quit"},
+ })
+ if err != nil {
+ t.Fatalf("run interactive dgrep: %v\noutput:\n%s", err, clientOutput)
+ }
+
+ if !strings.Contains(clientOutput, "ERROR first") {
+ t.Fatalf("expected initial grep result in output:\n%s", clientOutput)
+ }
+ if !strings.Contains(clientOutput, "WARN second") {
+ t.Fatalf("expected reloaded grep result in output:\n%s", clientOutput)
+ }
+ if !strings.Contains(clientOutput, "reload applied successfully") {
+ t.Fatalf("expected reload success message in output:\n%s", clientOutput)
+ }
+ if countSubstring(serverLogs.snapshot(), "Creating new server handler") != 1 {
+ t.Fatalf("expected one SSH session on the server, logs:\n%s", strings.Join(serverLogs.snapshot(), "\n"))
+ }
+}
+
+func startProcessOutputCollector(ctx context.Context, stdoutCh, stderrCh <-chan string) *processOutputCollector {
+ collector := &processOutputCollector{}
+ collect := func(ch <-chan string) {
+ for {
+ select {
+ case line, ok := <-ch:
+ if !ok {
+ return
+ }
+ collector.append(line)
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+ go collect(stdoutCh)
+ go collect(stderrCh)
+ return collector
+}
+
+func (c *processOutputCollector) append(line string) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.lines = append(c.lines, line)
+}
+
+func (c *processOutputCollector) snapshot() []string {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ out := make([]string, len(c.lines))
+ copy(out, c.lines)
+ return out
+}
+
+func appendLinesOnSchedule(ctx context.Context, path string, steps []interactiveStep) error {
+ fd, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND, 0600)
+ if err != nil {
+ return err
+ }
+ defer fd.Close()
+
+ start := time.Now()
+ for _, step := range steps {
+ wait := time.Until(start.Add(step.Delay))
+ if wait > 0 {
+ timer := time.NewTimer(wait)
+ select {
+ case <-ctx.Done():
+ timer.Stop()
+ return ctx.Err()
+ case <-timer.C:
+ }
+ }
+
+ if _, err := fd.WriteString(step.Input + "\n"); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func runInteractivePTYCommand(ctx context.Context, argv []string, steps []interactiveStep) (string, error) {
+ if _, err := exec.LookPath("python3"); err != nil {
+ return "", fmt.Errorf("python3 is required for PTY integration tests: %w", err)
+ }
+
+ script := `
+import json
+import os
+import pty
+import sys
+import threading
+import time
+
+argv = json.loads(os.environ["DTAIL_PTY_ARGV"])
+steps = json.loads(os.environ["DTAIL_PTY_STEPS"])
+
+pid, fd = pty.fork()
+if pid == 0:
+ os.execv(argv[0], argv)
+
+def writer():
+ for step in steps:
+ time.sleep(step["delay_ms"] / 1000.0)
+ data = step["input"]
+ if not data.endswith("\n"):
+ data += "\n"
+ os.write(fd, data.encode("utf-8"))
+
+threading.Thread(target=writer, daemon=True).start()
+
+output = bytearray()
+while True:
+ try:
+ chunk = os.read(fd, 4096)
+ except OSError:
+ break
+ if not chunk:
+ break
+ output.extend(chunk)
+
+_, status = os.waitpid(pid, 0)
+sys.stdout.buffer.write(output)
+if os.WIFEXITED(status):
+ sys.exit(os.WEXITSTATUS(status))
+if os.WIFSIGNALED(status):
+ sys.exit(128 + os.WTERMSIG(status))
+sys.exit(1)
+`
+
+ encodedSteps := make([]map[string]any, 0, len(steps))
+ for _, step := range steps {
+ encodedSteps = append(encodedSteps, map[string]any{
+ "delay_ms": step.Delay.Milliseconds(),
+ "input": step.Input,
+ })
+ }
+
+ argvPayload, err := json.Marshal(argv)
+ if err != nil {
+ return "", err
+ }
+ stepsPayload, err := json.Marshal(encodedSteps)
+ if err != nil {
+ return "", err
+ }
+
+ cmd := exec.CommandContext(ctx, "python3", "-c", script)
+ cmd.Env = append(os.Environ(),
+ "DTAIL_PTY_ARGV="+string(argvPayload),
+ "DTAIL_PTY_STEPS="+string(stepsPayload),
+ )
+ output, err := cmd.CombinedOutput()
+ return string(output), err
+}
+
+func countSubstring(lines []string, needle string) int {
+ count := 0
+ for _, line := range lines {
+ if strings.Contains(line, needle) {
+ count++
+ }
+ }
+ return count
+}