diff options
Diffstat (limited to 'internal/io/fs')
| -rw-r--r-- | internal/io/fs/catprocessor.go | 63 | ||||
| -rw-r--r-- | internal/io/fs/directprocessor.go | 17 | ||||
| -rw-r--r-- | internal/io/fs/grepprocessor.go | 63 | ||||
| -rw-r--r-- | internal/io/fs/grepprocessor_test.go | 10 | ||||
| -rw-r--r-- | internal/io/fs/utils.go | 12 |
5 files changed, 137 insertions, 28 deletions
diff --git a/internal/io/fs/catprocessor.go b/internal/io/fs/catprocessor.go index 0c88114..533328b 100644 --- a/internal/io/fs/catprocessor.go +++ b/internal/io/fs/catprocessor.go @@ -2,19 +2,26 @@ package fs import ( "context" + "fmt" + "github.com/mimecast/dtail/internal/protocol" ) // CatProcessor handles cat-style output type CatProcessor struct { - plain bool - hostname string + plain bool + hostname string + serverless bool } // NewCatProcessor creates a new cat processor -func NewCatProcessor(plain, noColor bool, hostname string) *CatProcessor { +func NewCatProcessor(plain, noColor bool, hostname string, serverless bool) *CatProcessor { + // Debug: log the parameters + // fmt.Fprintf(os.Stderr, "DEBUG CatProcessor: hostname='%s', serverless=%v, plain=%v\n", hostname, serverless, plain) + return &CatProcessor{ - plain: plain, - hostname: hostname, + plain: plain, + hostname: hostname, + serverless: serverless, } } @@ -28,7 +35,8 @@ func (cp *CatProcessor) Cleanup() error { // ProcessLine processes a single line for cat output. // In plain mode, it preserves the original line exactly including line endings. -// In non-plain mode, it returns just the content - the baseHandler will format the protocol. +// In non-plain mode in server context, it returns just the content - the baseHandler will format the protocol. +// In non-plain mode in serverless context, it formats the output with REMOTE protocol. // Returns the line content and true (cat always outputs all lines). func (cp *CatProcessor) ProcessLine(line []byte, lineNum int, filePath string, stats *stats, sourceID string) ([]byte, bool) { // Update stats for matched line (cat always matches all lines) @@ -36,14 +44,51 @@ func (cp *CatProcessor) ProcessLine(line []byte, lineNum int, filePath string, s stats.updateLineMatched() } - // In both plain and non-plain modes, just return the line content - // The baseHandler will handle protocol formatting for non-plain mode + // In plain mode, just return the line content + if cp.plain { + result := make([]byte, len(line)) + copy(result, line) + return result, true + } + + // In non-plain serverless mode, we need to format with REMOTE protocol + // since there's no server baseHandler to do it for us + if cp.serverless { + // Format exactly like original basehandler.go for non-plain mode + // REMOTE|{hostname}|{TransmittedPerc}|{Count}|{SourceID}|{Content} + var transmittedPerc int + var count uint64 + if stats != nil { + // For cat, we always transmit all matched lines, so transmittedPerc should be 100 + transmittedPerc = 100 + count = stats.totalLineCount() + } + + // Use actual hostname from system, not "serverless" + actualHostname := getHostname() + + // Build the protocol line without the message delimiter + protocolLine := fmt.Sprintf("REMOTE%s%s%s%3d%s%v%s%s%s%s", + protocol.FieldDelimiter, actualHostname, protocol.FieldDelimiter, + transmittedPerc, protocol.FieldDelimiter, count, protocol.FieldDelimiter, + sourceID, protocol.FieldDelimiter, string(line)) + + // Return formatted line without color reset prefix + // The ColorWriter will handle proper coloring + result := []byte(protocolLine) + return result, true + } + + // In server mode, just return the line content + // The baseHandler will handle protocol formatting result := make([]byte, len(line)) copy(result, line) return result, true } func (cp *CatProcessor) Flush() []byte { - // Server should not send color codes - client handles colorization + // No need to add color reset codes here + // The ColorWriter handles all coloring return nil } + diff --git a/internal/io/fs/directprocessor.go b/internal/io/fs/directprocessor.go index e02d4c2..0069397 100644 --- a/internal/io/fs/directprocessor.go +++ b/internal/io/fs/directprocessor.go @@ -115,15 +115,18 @@ func (dp *DirectProcessor) ProcessReader(ctx context.Context, reader io.Reader, } } else { // Regular write path (e.g., stdout in serverless mode) - if _, err := dp.output.Write(result); err != nil { - return err - } - - // Only add newline if the processor doesn't already handle it - // CatProcessor doesn't add newlines, but GrepProcessor does + // Check if we need to add a newline if _, isCat := dp.processor.(*CatProcessor); isCat { // Scanner strips newlines, so we need to add them back for cat - if _, err := dp.output.Write([]byte{'\n'}); err != nil { + // Combine the result with newline in a single write to avoid + // double color processing + resultWithNewline := append(result, '\n') + if _, err := dp.output.Write(resultWithNewline); err != nil { + return err + } + } else { + // For other processors, just write the result as-is + if _, err := dp.output.Write(result); err != nil { return err } } diff --git a/internal/io/fs/grepprocessor.go b/internal/io/fs/grepprocessor.go index 56967ab..c4b3b1e 100644 --- a/internal/io/fs/grepprocessor.go +++ b/internal/io/fs/grepprocessor.go @@ -2,15 +2,18 @@ package fs import ( "context" + "fmt" + "github.com/mimecast/dtail/internal/protocol" "github.com/mimecast/dtail/internal/regex" ) // GrepProcessor handles grep-style filtering type GrepProcessor struct { - regex regex.Regex - plain bool - hostname string + regex regex.Regex + plain bool + hostname string + serverless bool // Context handling beforeContext int @@ -25,11 +28,12 @@ type GrepProcessor struct { } // NewGrepProcessor creates a new grep processor -func NewGrepProcessor(re regex.Regex, plain, noColor bool, hostname string, beforeContext, afterContext, maxCount int) *GrepProcessor { +func NewGrepProcessor(re regex.Regex, plain, noColor bool, hostname string, serverless bool, beforeContext, afterContext, maxCount int) *GrepProcessor { gp := &GrepProcessor{ regex: re, plain: plain, hostname: hostname, + serverless: serverless, beforeContext: beforeContext, afterContext: afterContext, maxCount: maxCount, @@ -127,9 +131,54 @@ func (gp *GrepProcessor) Flush() []byte { // formatLine formats a line for output (shared by matching lines and context lines) func (gp *GrepProcessor) formatLine(line []byte, lineNum int, filePath string, stats *stats, sourceID string) []byte { - // In both plain and non-plain modes, just return the line content - // The baseHandler will handle protocol formatting for non-plain mode - + // In plain mode, just return the line content + if gp.plain { + // If line already ends with a line ending, preserve it as-is + // Otherwise, add LF for consistency with bufio.Scanner behavior + if len(line) > 0 && (line[len(line)-1] == '\n' || (len(line) > 1 && line[len(line)-2] == '\r' && line[len(line)-1] == '\n')) { + // Line already has line ending, preserve it exactly + result := make([]byte, len(line)) + copy(result, line) + return result + } else { + // Line doesn't have line ending, add LF + result := make([]byte, len(line)+1) + copy(result, line) + result[len(line)] = '\n' + return result + } + } + + // In non-plain serverless mode, we need to format with REMOTE protocol + // since there's no server baseHandler to do it for us + if gp.serverless { + // Format exactly like original basehandler.go for non-plain mode + // REMOTE|{hostname}|{TransmittedPerc}|{Count}|{SourceID}|{Content} + var transmittedPerc int + var count uint64 + if stats != nil { + transmittedPerc = stats.transmittedPerc() + count = stats.totalLineCount() + } + + // Use actual hostname from system, not "serverless" + actualHostname := getHostname() + + // Build the protocol line without the message delimiter + protocolLine := fmt.Sprintf("REMOTE%s%s%s%3d%s%v%s%s%s%s", + protocol.FieldDelimiter, actualHostname, protocol.FieldDelimiter, + transmittedPerc, protocol.FieldDelimiter, count, protocol.FieldDelimiter, + sourceID, protocol.FieldDelimiter, string(line)) + + // Return formatted line with newline since we might concatenate multiple lines + result := make([]byte, len(protocolLine)+1) + copy(result, protocolLine) + result[len(protocolLine)] = '\n' + return result + } + + // In server mode, just return the line content + // The baseHandler will handle protocol formatting // If line already ends with a line ending, preserve it as-is // Otherwise, add LF for consistency with bufio.Scanner behavior if len(line) > 0 && (line[len(line)-1] == '\n' || (len(line) > 1 && line[len(line)-2] == '\r' && line[len(line)-1] == '\n')) { diff --git a/internal/io/fs/grepprocessor_test.go b/internal/io/fs/grepprocessor_test.go index b558eab..376277c 100644 --- a/internal/io/fs/grepprocessor_test.go +++ b/internal/io/fs/grepprocessor_test.go @@ -12,7 +12,7 @@ func TestGrepProcessorBasic(t *testing.T) { testutil.AssertNoError(t, err) // Use plain mode to avoid color formatting issues in tests - gp := NewGrepProcessor(re, true, false, "testhost", 0, 0, 0) + gp := NewGrepProcessor(re, true, false, "testhost", false, 0, 0, 0) tests := []struct { name string @@ -46,7 +46,7 @@ func TestGrepProcessorWithContext(t *testing.T) { testutil.AssertNoError(t, err) // Test with before context = 2, after context = 1 - gp := NewGrepProcessor(re, true, false, "testhost", 2, 1, 0) + gp := NewGrepProcessor(re, true, false, "testhost", false, 2, 1, 0) lines := []string{ "line 1", @@ -83,7 +83,7 @@ func TestGrepProcessorMaxCount(t *testing.T) { testutil.AssertNoError(t, err) // Limit to 2 matches - gp := NewGrepProcessor(re, true, false, "testhost", 0, 0, 2) + gp := NewGrepProcessor(re, true, false, "testhost", false, 0, 0, 2) matchCount := 0 for i := 0; i < 5; i++ { @@ -106,7 +106,7 @@ func TestGrepProcessorPlainMode(t *testing.T) { re, err := regex.New("test", regex.Default) testutil.AssertNoError(t, err) - gp := NewGrepProcessor(re, true, false, "testhost", 0, 0, 0) + gp := NewGrepProcessor(re, true, false, "testhost", false, 0, 0, 0) // Test that plain mode preserves line endings tests := []struct { @@ -137,7 +137,7 @@ func TestGrepProcessorFormatLine(t *testing.T) { testutil.AssertNoError(t, err) // Test plain mode formatting to avoid color issues - gp := NewGrepProcessor(re, true, false, "testhost", 0, 0, 0) + gp := NewGrepProcessor(re, true, false, "testhost", false, 0, 0, 0) stats := &stats{} stats.updatePosition() diff --git a/internal/io/fs/utils.go b/internal/io/fs/utils.go new file mode 100644 index 0000000..106b58a --- /dev/null +++ b/internal/io/fs/utils.go @@ -0,0 +1,12 @@ +package fs + +import "os" + +// getHostname returns the system hostname +func getHostname() string { + hostname, err := os.Hostname() + if err != nil { + return "unknown" + } + return hostname +}
\ No newline at end of file |
