diff options
Diffstat (limited to 'integrationtests/interactive_runtime_query_test.go')
| -rw-r--r-- | integrationtests/interactive_runtime_query_test.go | 312 |
1 files changed, 312 insertions, 0 deletions
diff --git a/integrationtests/interactive_runtime_query_test.go b/integrationtests/interactive_runtime_query_test.go new file mode 100644 index 0000000..213c877 --- /dev/null +++ b/integrationtests/interactive_runtime_query_test.go @@ -0,0 +1,312 @@ +package integrationtests + +import ( + "context" + "encoding/json" + "fmt" + "os" + "os/exec" + "strings" + "sync" + "testing" + "time" +) + +type interactiveStep struct { + Delay time.Duration + Input string +} + +type processOutputCollector struct { + mu sync.Mutex + lines []string +} + +func TestDTailInteractiveReloadReusesSessionAndDropsLateOldMatches(t *testing.T) { + skipIfNotIntegrationTest(t) + + testLogger := NewTestLogger("TestDTailInteractiveReloadReusesSessionAndDropsLateOldMatches") + defer testLogger.WriteLogFile() + cleanupTmpFiles(t) + + ctx, cancel := createTestContextWithTimeout(t) + ctx = WithTestLogger(ctx, testLogger) + defer cancel() + + followFile := "interactive_dtail_reload.tmp" + if err := os.WriteFile(followFile, nil, 0600); err != nil { + t.Fatalf("unable to create follow file: %v", err) + } + cleanupFiles(t, followFile, "interactive_dtail_reload.stdout.tmp") + + port := getUniquePortNumber() + serverStdout, serverStderr, _, err := startCommand(ctx, t, "", "../dserver", + "--cfg", "none", + "--logger", "stdout", + "--logLevel", "debug", + "--bindAddress", "localhost", + "--port", fmt.Sprintf("%d", port), + ) + if err != nil { + t.Fatalf("start dserver: %v", err) + } + serverLogs := startProcessOutputCollector(ctx, serverStdout, serverStderr) + + writerDone := make(chan error, 1) + go func() { + writerDone <- appendLinesOnSchedule(ctx, followFile, []interactiveStep{ + {Delay: 200 * time.Millisecond, Input: "ERROR initial"}, + {Delay: 1200 * time.Millisecond, Input: "ERROR late"}, + {Delay: 1400 * time.Millisecond, Input: "WARN live"}, + }) + }() + + clientOutput, err := runInteractivePTYCommand(ctx, []string{ + "../dtail", + "--cfg", "none", + "--logger", "stdout", + "--logLevel", "info", + "--servers", fmt.Sprintf("localhost:%d", port), + "--files", followFile, + "--grep", "ERROR", + "--plain", + "--trustAllHosts", + "--interactive-query", + }, []interactiveStep{ + {Delay: 700 * time.Millisecond, Input: ":reload --grep WARN"}, + {Delay: 1700 * time.Millisecond, Input: ":quit"}, + }) + if err != nil { + t.Fatalf("run interactive dtail: %v\noutput:\n%s", err, clientOutput) + } + + if err := <-writerDone; err != nil { + t.Fatalf("write follow file: %v", err) + } + + if !strings.Contains(clientOutput, "ERROR initial") { + t.Fatalf("expected initial ERROR line in output:\n%s", clientOutput) + } + if !strings.Contains(clientOutput, "WARN live") { + t.Fatalf("expected WARN line after reload in output:\n%s", clientOutput) + } + if strings.Contains(clientOutput, "ERROR late") { + t.Fatalf("unexpected stale ERROR line after reload:\n%s", clientOutput) + } + if !strings.Contains(clientOutput, "reload applied successfully") { + t.Fatalf("expected reload success message in output:\n%s", clientOutput) + } + if countSubstring(serverLogs.snapshot(), "Creating new server handler") != 1 { + t.Fatalf("expected one SSH session on the server, logs:\n%s", strings.Join(serverLogs.snapshot(), "\n")) + } +} + +func TestDGrepInteractiveReloadReusesSessionAfterCompletedRead(t *testing.T) { + skipIfNotIntegrationTest(t) + + testLogger := NewTestLogger("TestDGrepInteractiveReloadReusesSessionAfterCompletedRead") + defer testLogger.WriteLogFile() + cleanupTmpFiles(t) + + ctx, cancel := createTestContextWithTimeout(t) + ctx = WithTestLogger(ctx, testLogger) + defer cancel() + + inputFile := "interactive_dgrep_reload.tmp" + if err := os.WriteFile(inputFile, []byte("ERROR first\nWARN second\n"), 0600); err != nil { + t.Fatalf("unable to create input file: %v", err) + } + cleanupFiles(t, inputFile, "interactive_dgrep_reload.stdout.tmp") + + port := getUniquePortNumber() + serverStdout, serverStderr, _, err := startCommand(ctx, t, "", "../dserver", + "--cfg", "none", + "--logger", "stdout", + "--logLevel", "debug", + "--bindAddress", "localhost", + "--port", fmt.Sprintf("%d", port), + ) + if err != nil { + t.Fatalf("start dserver: %v", err) + } + serverLogs := startProcessOutputCollector(ctx, serverStdout, serverStderr) + + clientOutput, err := runInteractivePTYCommand(ctx, []string{ + "../dgrep", + "--cfg", "none", + "--logger", "stdout", + "--logLevel", "info", + "--servers", fmt.Sprintf("localhost:%d", port), + "--files", inputFile, + "--grep", "ERROR", + "--plain", + "--trustAllHosts", + "--interactive-query", + }, []interactiveStep{ + {Delay: 400 * time.Millisecond, Input: ":reload --grep WARN"}, + {Delay: 1000 * time.Millisecond, Input: ":quit"}, + }) + if err != nil { + t.Fatalf("run interactive dgrep: %v\noutput:\n%s", err, clientOutput) + } + + if !strings.Contains(clientOutput, "ERROR first") { + t.Fatalf("expected initial grep result in output:\n%s", clientOutput) + } + if !strings.Contains(clientOutput, "WARN second") { + t.Fatalf("expected reloaded grep result in output:\n%s", clientOutput) + } + if !strings.Contains(clientOutput, "reload applied successfully") { + t.Fatalf("expected reload success message in output:\n%s", clientOutput) + } + if countSubstring(serverLogs.snapshot(), "Creating new server handler") != 1 { + t.Fatalf("expected one SSH session on the server, logs:\n%s", strings.Join(serverLogs.snapshot(), "\n")) + } +} + +func startProcessOutputCollector(ctx context.Context, stdoutCh, stderrCh <-chan string) *processOutputCollector { + collector := &processOutputCollector{} + collect := func(ch <-chan string) { + for { + select { + case line, ok := <-ch: + if !ok { + return + } + collector.append(line) + case <-ctx.Done(): + return + } + } + } + go collect(stdoutCh) + go collect(stderrCh) + return collector +} + +func (c *processOutputCollector) append(line string) { + c.mu.Lock() + defer c.mu.Unlock() + c.lines = append(c.lines, line) +} + +func (c *processOutputCollector) snapshot() []string { + c.mu.Lock() + defer c.mu.Unlock() + + out := make([]string, len(c.lines)) + copy(out, c.lines) + return out +} + +func appendLinesOnSchedule(ctx context.Context, path string, steps []interactiveStep) error { + fd, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND, 0600) + if err != nil { + return err + } + defer fd.Close() + + start := time.Now() + for _, step := range steps { + wait := time.Until(start.Add(step.Delay)) + if wait > 0 { + timer := time.NewTimer(wait) + select { + case <-ctx.Done(): + timer.Stop() + return ctx.Err() + case <-timer.C: + } + } + + if _, err := fd.WriteString(step.Input + "\n"); err != nil { + return err + } + } + return nil +} + +func runInteractivePTYCommand(ctx context.Context, argv []string, steps []interactiveStep) (string, error) { + if _, err := exec.LookPath("python3"); err != nil { + return "", fmt.Errorf("python3 is required for PTY integration tests: %w", err) + } + + script := ` +import json +import os +import pty +import sys +import threading +import time + +argv = json.loads(os.environ["DTAIL_PTY_ARGV"]) +steps = json.loads(os.environ["DTAIL_PTY_STEPS"]) + +pid, fd = pty.fork() +if pid == 0: + os.execv(argv[0], argv) + +def writer(): + for step in steps: + time.sleep(step["delay_ms"] / 1000.0) + data = step["input"] + if not data.endswith("\n"): + data += "\n" + os.write(fd, data.encode("utf-8")) + +threading.Thread(target=writer, daemon=True).start() + +output = bytearray() +while True: + try: + chunk = os.read(fd, 4096) + except OSError: + break + if not chunk: + break + output.extend(chunk) + +_, status = os.waitpid(pid, 0) +sys.stdout.buffer.write(output) +if os.WIFEXITED(status): + sys.exit(os.WEXITSTATUS(status)) +if os.WIFSIGNALED(status): + sys.exit(128 + os.WTERMSIG(status)) +sys.exit(1) +` + + encodedSteps := make([]map[string]any, 0, len(steps)) + for _, step := range steps { + encodedSteps = append(encodedSteps, map[string]any{ + "delay_ms": step.Delay.Milliseconds(), + "input": step.Input, + }) + } + + argvPayload, err := json.Marshal(argv) + if err != nil { + return "", err + } + stepsPayload, err := json.Marshal(encodedSteps) + if err != nil { + return "", err + } + + cmd := exec.CommandContext(ctx, "python3", "-c", script) + cmd.Env = append(os.Environ(), + "DTAIL_PTY_ARGV="+string(argvPayload), + "DTAIL_PTY_STEPS="+string(stepsPayload), + ) + output, err := cmd.CombinedOutput() + return string(output), err +} + +func countSubstring(lines []string, needle string) int { + count := 0 + for _, line := range lines { + if strings.Contains(line, needle) { + count++ + } + } + return count +} |
