summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-06-20 13:51:06 +0300
committerPaul Buetow <paul@buetow.org>2025-06-20 13:51:06 +0300
commit4a8f1690c3e2e6aec34f22b516f0598c6a0da070 (patch)
tree82ecb4568df3e91c7958c2e9f491546e6f7e9701
parentb0a1e7928d5147d2a9fe0df710bf7c75a777e0d0 (diff)
Fix dcat/dgrep serverless mode to show REMOTE protocol formatrefactor-trail-1
- Add serverless flag to CatProcessor and GrepProcessor - Format output with REMOTE|hostname|transmittedPerc|count|sourceID|content in serverless mode - Use actual system hostname instead of "serverless" placeholder - Preserve plain mode behavior (no formatting when --plain is used) - Fix grep processor to properly separate multiple matched lines - Add shared getHostname utility function - Update tests to include serverless parameter This fixes the regression where dcat and dgrep in serverless mode were not showing the dtail protocol format with transmission info and status details. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
-rw-r--r--internal/io/fs/catprocessor.go63
-rw-r--r--internal/io/fs/directprocessor.go17
-rw-r--r--internal/io/fs/grepprocessor.go63
-rw-r--r--internal/io/fs/grepprocessor_test.go10
-rw-r--r--internal/io/fs/utils.go12
-rw-r--r--internal/server/handlers/colorwriter.go42
-rw-r--r--internal/server/handlers/readcommand.go26
7 files changed, 190 insertions, 43 deletions
diff --git a/internal/io/fs/catprocessor.go b/internal/io/fs/catprocessor.go
index 0c88114..533328b 100644
--- a/internal/io/fs/catprocessor.go
+++ b/internal/io/fs/catprocessor.go
@@ -2,19 +2,26 @@ package fs
import (
"context"
+ "fmt"
+ "github.com/mimecast/dtail/internal/protocol"
)
// CatProcessor handles cat-style output
type CatProcessor struct {
- plain bool
- hostname string
+ plain bool
+ hostname string
+ serverless bool
}
// NewCatProcessor creates a new cat processor
-func NewCatProcessor(plain, noColor bool, hostname string) *CatProcessor {
+func NewCatProcessor(plain, noColor bool, hostname string, serverless bool) *CatProcessor {
+ // Debug: log the parameters
+ // fmt.Fprintf(os.Stderr, "DEBUG CatProcessor: hostname='%s', serverless=%v, plain=%v\n", hostname, serverless, plain)
+
return &CatProcessor{
- plain: plain,
- hostname: hostname,
+ plain: plain,
+ hostname: hostname,
+ serverless: serverless,
}
}
@@ -28,7 +35,8 @@ func (cp *CatProcessor) Cleanup() error {
// ProcessLine processes a single line for cat output.
// In plain mode, it preserves the original line exactly including line endings.
-// In non-plain mode, it returns just the content - the baseHandler will format the protocol.
+// In non-plain mode in server context, it returns just the content - the baseHandler will format the protocol.
+// In non-plain mode in serverless context, it formats the output with REMOTE protocol.
// Returns the line content and true (cat always outputs all lines).
func (cp *CatProcessor) ProcessLine(line []byte, lineNum int, filePath string, stats *stats, sourceID string) ([]byte, bool) {
// Update stats for matched line (cat always matches all lines)
@@ -36,14 +44,51 @@ func (cp *CatProcessor) ProcessLine(line []byte, lineNum int, filePath string, s
stats.updateLineMatched()
}
- // In both plain and non-plain modes, just return the line content
- // The baseHandler will handle protocol formatting for non-plain mode
+ // In plain mode, just return the line content
+ if cp.plain {
+ result := make([]byte, len(line))
+ copy(result, line)
+ return result, true
+ }
+
+ // In non-plain serverless mode, we need to format with REMOTE protocol
+ // since there's no server baseHandler to do it for us
+ if cp.serverless {
+ // Format exactly like original basehandler.go for non-plain mode
+ // REMOTE|{hostname}|{TransmittedPerc}|{Count}|{SourceID}|{Content}
+ var transmittedPerc int
+ var count uint64
+ if stats != nil {
+ // For cat, we always transmit all matched lines, so transmittedPerc should be 100
+ transmittedPerc = 100
+ count = stats.totalLineCount()
+ }
+
+ // Use actual hostname from system, not "serverless"
+ actualHostname := getHostname()
+
+ // Build the protocol line without the message delimiter
+ protocolLine := fmt.Sprintf("REMOTE%s%s%s%3d%s%v%s%s%s%s",
+ protocol.FieldDelimiter, actualHostname, protocol.FieldDelimiter,
+ transmittedPerc, protocol.FieldDelimiter, count, protocol.FieldDelimiter,
+ sourceID, protocol.FieldDelimiter, string(line))
+
+ // Return formatted line without color reset prefix
+ // The ColorWriter will handle proper coloring
+ result := []byte(protocolLine)
+ return result, true
+ }
+
+ // In server mode, just return the line content
+ // The baseHandler will handle protocol formatting
result := make([]byte, len(line))
copy(result, line)
return result, true
}
func (cp *CatProcessor) Flush() []byte {
- // Server should not send color codes - client handles colorization
+ // No need to add color reset codes here
+ // The ColorWriter handles all coloring
return nil
}
+
diff --git a/internal/io/fs/directprocessor.go b/internal/io/fs/directprocessor.go
index e02d4c2..0069397 100644
--- a/internal/io/fs/directprocessor.go
+++ b/internal/io/fs/directprocessor.go
@@ -115,15 +115,18 @@ func (dp *DirectProcessor) ProcessReader(ctx context.Context, reader io.Reader,
}
} else {
// Regular write path (e.g., stdout in serverless mode)
- if _, err := dp.output.Write(result); err != nil {
- return err
- }
-
- // Only add newline if the processor doesn't already handle it
- // CatProcessor doesn't add newlines, but GrepProcessor does
+ // Check if we need to add a newline
if _, isCat := dp.processor.(*CatProcessor); isCat {
// Scanner strips newlines, so we need to add them back for cat
- if _, err := dp.output.Write([]byte{'\n'}); err != nil {
+ // Combine the result with newline in a single write to avoid
+ // double color processing
+ resultWithNewline := append(result, '\n')
+ if _, err := dp.output.Write(resultWithNewline); err != nil {
+ return err
+ }
+ } else {
+ // For other processors, just write the result as-is
+ if _, err := dp.output.Write(result); err != nil {
return err
}
}
diff --git a/internal/io/fs/grepprocessor.go b/internal/io/fs/grepprocessor.go
index 56967ab..c4b3b1e 100644
--- a/internal/io/fs/grepprocessor.go
+++ b/internal/io/fs/grepprocessor.go
@@ -2,15 +2,18 @@ package fs
import (
"context"
+ "fmt"
+ "github.com/mimecast/dtail/internal/protocol"
"github.com/mimecast/dtail/internal/regex"
)
// GrepProcessor handles grep-style filtering
type GrepProcessor struct {
- regex regex.Regex
- plain bool
- hostname string
+ regex regex.Regex
+ plain bool
+ hostname string
+ serverless bool
// Context handling
beforeContext int
@@ -25,11 +28,12 @@ type GrepProcessor struct {
}
// NewGrepProcessor creates a new grep processor
-func NewGrepProcessor(re regex.Regex, plain, noColor bool, hostname string, beforeContext, afterContext, maxCount int) *GrepProcessor {
+func NewGrepProcessor(re regex.Regex, plain, noColor bool, hostname string, serverless bool, beforeContext, afterContext, maxCount int) *GrepProcessor {
gp := &GrepProcessor{
regex: re,
plain: plain,
hostname: hostname,
+ serverless: serverless,
beforeContext: beforeContext,
afterContext: afterContext,
maxCount: maxCount,
@@ -127,9 +131,54 @@ func (gp *GrepProcessor) Flush() []byte {
// formatLine formats a line for output (shared by matching lines and context lines)
func (gp *GrepProcessor) formatLine(line []byte, lineNum int, filePath string, stats *stats, sourceID string) []byte {
- // In both plain and non-plain modes, just return the line content
- // The baseHandler will handle protocol formatting for non-plain mode
-
+ // In plain mode, just return the line content
+ if gp.plain {
+ // If line already ends with a line ending, preserve it as-is
+ // Otherwise, add LF for consistency with bufio.Scanner behavior
+ if len(line) > 0 && (line[len(line)-1] == '\n' || (len(line) > 1 && line[len(line)-2] == '\r' && line[len(line)-1] == '\n')) {
+ // Line already has line ending, preserve it exactly
+ result := make([]byte, len(line))
+ copy(result, line)
+ return result
+ } else {
+ // Line doesn't have line ending, add LF
+ result := make([]byte, len(line)+1)
+ copy(result, line)
+ result[len(line)] = '\n'
+ return result
+ }
+ }
+
+ // In non-plain serverless mode, we need to format with REMOTE protocol
+ // since there's no server baseHandler to do it for us
+ if gp.serverless {
+ // Format exactly like original basehandler.go for non-plain mode
+ // REMOTE|{hostname}|{TransmittedPerc}|{Count}|{SourceID}|{Content}
+ var transmittedPerc int
+ var count uint64
+ if stats != nil {
+ transmittedPerc = stats.transmittedPerc()
+ count = stats.totalLineCount()
+ }
+
+ // Use actual hostname from system, not "serverless"
+ actualHostname := getHostname()
+
+ // Build the protocol line without the message delimiter
+ protocolLine := fmt.Sprintf("REMOTE%s%s%s%3d%s%v%s%s%s%s",
+ protocol.FieldDelimiter, actualHostname, protocol.FieldDelimiter,
+ transmittedPerc, protocol.FieldDelimiter, count, protocol.FieldDelimiter,
+ sourceID, protocol.FieldDelimiter, string(line))
+
+ // Return formatted line with newline since we might concatenate multiple lines
+ result := make([]byte, len(protocolLine)+1)
+ copy(result, protocolLine)
+ result[len(protocolLine)] = '\n'
+ return result
+ }
+
+ // In server mode, just return the line content
+ // The baseHandler will handle protocol formatting
// If line already ends with a line ending, preserve it as-is
// Otherwise, add LF for consistency with bufio.Scanner behavior
if len(line) > 0 && (line[len(line)-1] == '\n' || (len(line) > 1 && line[len(line)-2] == '\r' && line[len(line)-1] == '\n')) {
diff --git a/internal/io/fs/grepprocessor_test.go b/internal/io/fs/grepprocessor_test.go
index b558eab..376277c 100644
--- a/internal/io/fs/grepprocessor_test.go
+++ b/internal/io/fs/grepprocessor_test.go
@@ -12,7 +12,7 @@ func TestGrepProcessorBasic(t *testing.T) {
testutil.AssertNoError(t, err)
// Use plain mode to avoid color formatting issues in tests
- gp := NewGrepProcessor(re, true, false, "testhost", 0, 0, 0)
+ gp := NewGrepProcessor(re, true, false, "testhost", false, 0, 0, 0)
tests := []struct {
name string
@@ -46,7 +46,7 @@ func TestGrepProcessorWithContext(t *testing.T) {
testutil.AssertNoError(t, err)
// Test with before context = 2, after context = 1
- gp := NewGrepProcessor(re, true, false, "testhost", 2, 1, 0)
+ gp := NewGrepProcessor(re, true, false, "testhost", false, 2, 1, 0)
lines := []string{
"line 1",
@@ -83,7 +83,7 @@ func TestGrepProcessorMaxCount(t *testing.T) {
testutil.AssertNoError(t, err)
// Limit to 2 matches
- gp := NewGrepProcessor(re, true, false, "testhost", 0, 0, 2)
+ gp := NewGrepProcessor(re, true, false, "testhost", false, 0, 0, 2)
matchCount := 0
for i := 0; i < 5; i++ {
@@ -106,7 +106,7 @@ func TestGrepProcessorPlainMode(t *testing.T) {
re, err := regex.New("test", regex.Default)
testutil.AssertNoError(t, err)
- gp := NewGrepProcessor(re, true, false, "testhost", 0, 0, 0)
+ gp := NewGrepProcessor(re, true, false, "testhost", false, 0, 0, 0)
// Test that plain mode preserves line endings
tests := []struct {
@@ -137,7 +137,7 @@ func TestGrepProcessorFormatLine(t *testing.T) {
testutil.AssertNoError(t, err)
// Test plain mode formatting to avoid color issues
- gp := NewGrepProcessor(re, true, false, "testhost", 0, 0, 0)
+ gp := NewGrepProcessor(re, true, false, "testhost", false, 0, 0, 0)
stats := &stats{}
stats.updatePosition()
diff --git a/internal/io/fs/utils.go b/internal/io/fs/utils.go
new file mode 100644
index 0000000..106b58a
--- /dev/null
+++ b/internal/io/fs/utils.go
@@ -0,0 +1,12 @@
+package fs
+
+import "os"
+
+// getHostname returns the system hostname
+func getHostname() string {
+ hostname, err := os.Hostname()
+ if err != nil {
+ return "unknown"
+ }
+ return hostname
+} \ No newline at end of file
diff --git a/internal/server/handlers/colorwriter.go b/internal/server/handlers/colorwriter.go
index 6e0f1af..22d36c3 100644
--- a/internal/server/handlers/colorwriter.go
+++ b/internal/server/handlers/colorwriter.go
@@ -8,15 +8,17 @@ import (
// ColorWriter wraps an io.Writer and applies colors to output
type ColorWriter struct {
- writer io.Writer
- noColor bool
+ writer io.Writer
+ noColor bool
+ isFirstLine bool
}
// NewColorWriter creates a new ColorWriter
func NewColorWriter(writer io.Writer, noColor bool) *ColorWriter {
return &ColorWriter{
- writer: writer,
- noColor: noColor,
+ writer: writer,
+ noColor: noColor,
+ isFirstLine: true,
}
}
@@ -27,12 +29,26 @@ func (cw *ColorWriter) Write(p []byte) (n int, err error) {
return cw.writer.Write(p)
}
- // Apply colors
- coloredStr := brush.Colorfy(string(p))
- coloredBytes := []byte(coloredStr)
+ // Apply colors to the content
+ // The brush.Colorfy function will handle the coloring
+ inputStr := string(p)
+ coloredStr := brush.Colorfy(inputStr)
+
+ // Add color reset prefix for all lines except the first
+ var outputBytes []byte
+ if cw.isFirstLine {
+ cw.isFirstLine = false
+ outputBytes = []byte(coloredStr)
+ } else {
+ // Add color reset prefix: [39m[49m[49m[39m
+ colorResetPrefix := "\x1b[39m\x1b[49m\x1b[49m\x1b[39m"
+ outputBytes = make([]byte, len(colorResetPrefix)+len(coloredStr))
+ copy(outputBytes, colorResetPrefix)
+ copy(outputBytes[len(colorResetPrefix):], coloredStr)
+ }
// Write the colored output
- _, err = cw.writer.Write(coloredBytes)
+ _, err = cw.writer.Write(outputBytes)
if err != nil {
return 0, err
}
@@ -40,4 +56,14 @@ func (cw *ColorWriter) Write(p []byte) (n int, err error) {
// Return the original byte count to maintain compatibility
// (the caller expects n to match len(p))
return len(p), nil
+}
+
+// Close closes the ColorWriter and writes final color reset if needed
+func (cw *ColorWriter) Close() error {
+ if !cw.noColor && config.Client.TermColorsEnable && !cw.isFirstLine {
+ // Write final color reset line
+ colorResetPrefix := "\x1b[39m\x1b[49m\x1b[49m\x1b[39m"
+ cw.writer.Write([]byte(colorResetPrefix))
+ }
+ return nil
} \ No newline at end of file
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 0a99aaa..901c929 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -147,6 +147,7 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
var output io.Writer
if r.server.serverless {
// In serverless mode, write to stdout with color support
+ // Note: plain mode means no colors, so noColor = plain
output = NewColorWriter(os.Stdout, r.server.plain)
} else {
// In client-server mode, write to server handler lines channel
@@ -209,6 +210,11 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
}
}
}
+
+ // Close the output writer if it's a ColorWriter
+ if colorWriter, ok := output.(*ColorWriter); ok {
+ colorWriter.Close()
+ }
}
// readStdin processes stdin using direct processing
@@ -217,6 +223,7 @@ func (r *readCommand) readStdin(ctx context.Context, ltx lcontext.LContext, re r
var output io.Writer
if r.server.serverless {
// In serverless mode, write to stdout with color support
+ // Note: plain mode means no colors, so noColor = plain
output = NewColorWriter(os.Stdout, r.server.plain)
} else {
// In client-server mode, write to server handler lines channel
@@ -236,6 +243,11 @@ func (r *readCommand) readStdin(ctx context.Context, ltx lcontext.LContext, re r
r.server.sendln(r.server.serverMessages, dlog.Server.Error(r.server.user,
"Error processing stdin", err))
}
+
+ // Close the output writer if it's a ColorWriter
+ if colorWriter, ok := output.(*ColorWriter); ok {
+ colorWriter.Close()
+ }
}
// isMapReduceCommand checks if this is a command that's part of a MapReduce operation
@@ -293,22 +305,22 @@ func (r *readCommand) createProcessor(re regex.Regex, ltx lcontext.LContext, out
mapProcessor, err := fs.NewMapProcessor(plain, hostname, queryStr, output)
if err != nil {
dlog.Server.Error(r.server.user, "Failed to create MapReduce processor", err)
- return fs.NewGrepProcessor(re, plain, noColor, hostname, ltx.BeforeContext, ltx.AfterContext, ltx.MaxCount), false
+ return fs.NewGrepProcessor(re, plain, noColor, hostname, r.server.serverless, ltx.BeforeContext, ltx.AfterContext, ltx.MaxCount), false
}
return mapProcessor, false
}
- return fs.NewGrepProcessor(re, plain, noColor, hostname, ltx.BeforeContext, ltx.AfterContext, ltx.MaxCount), false
+ return fs.NewGrepProcessor(re, plain, noColor, hostname, r.server.serverless, ltx.BeforeContext, ltx.AfterContext, ltx.MaxCount), false
case omode.CatClient:
if isMapReduce && queryStr != "" {
// This is a standalone MapReduce cat operation
mapProcessor, err := fs.NewMapProcessor(plain, hostname, queryStr, output)
if err != nil {
dlog.Server.Error(r.server.user, "Failed to create MapReduce processor", err)
- return fs.NewCatProcessor(plain, noColor, hostname), false
+ return fs.NewCatProcessor(plain, noColor, hostname, r.server.serverless), false
}
return mapProcessor, false
}
- return fs.NewCatProcessor(plain, noColor, hostname), false
+ return fs.NewCatProcessor(plain, noColor, hostname, r.server.serverless), false
case omode.TailClient:
if isMapReduce && queryStr != "" {
// This is a standalone MapReduce tail operation
@@ -327,14 +339,14 @@ func (r *readCommand) createProcessor(re regex.Regex, ltx lcontext.LContext, out
mapProcessor, err := fs.NewMapProcessor(plain, hostname, queryStr, output)
if err != nil {
dlog.Server.Error(r.server.user, "Failed to create MapReduce processor", err)
- return fs.NewGrepProcessor(re, plain, noColor, hostname, ltx.BeforeContext, ltx.AfterContext, ltx.MaxCount), false
+ return fs.NewGrepProcessor(re, plain, noColor, hostname, r.server.serverless, ltx.BeforeContext, ltx.AfterContext, ltx.MaxCount), false
}
return mapProcessor, false
}
// Fallback
- return fs.NewGrepProcessor(re, plain, noColor, hostname, ltx.BeforeContext, ltx.AfterContext, ltx.MaxCount), false
+ return fs.NewGrepProcessor(re, plain, noColor, hostname, r.server.serverless, ltx.BeforeContext, ltx.AfterContext, ltx.MaxCount), false
default:
// Default to grep behavior
- return fs.NewGrepProcessor(re, plain, noColor, hostname, ltx.BeforeContext, ltx.AfterContext, ltx.MaxCount), false
+ return fs.NewGrepProcessor(re, plain, noColor, hostname, r.server.serverless, ltx.BeforeContext, ltx.AfterContext, ltx.MaxCount), false
}
}