summaryrefslogtreecommitdiff
path: root/internal/io/fs
diff options
context:
space:
mode:
Diffstat (limited to 'internal/io/fs')
-rw-r--r--internal/io/fs/catprocessor.go63
-rw-r--r--internal/io/fs/directprocessor.go17
-rw-r--r--internal/io/fs/grepprocessor.go63
-rw-r--r--internal/io/fs/grepprocessor_test.go10
-rw-r--r--internal/io/fs/utils.go12
5 files changed, 137 insertions, 28 deletions
diff --git a/internal/io/fs/catprocessor.go b/internal/io/fs/catprocessor.go
index 0c88114..533328b 100644
--- a/internal/io/fs/catprocessor.go
+++ b/internal/io/fs/catprocessor.go
@@ -2,19 +2,26 @@ package fs
import (
"context"
+ "fmt"
+ "github.com/mimecast/dtail/internal/protocol"
)
// CatProcessor handles cat-style output
type CatProcessor struct {
- plain bool
- hostname string
+ plain bool
+ hostname string
+ serverless bool
}
// NewCatProcessor creates a new cat processor
-func NewCatProcessor(plain, noColor bool, hostname string) *CatProcessor {
+func NewCatProcessor(plain, noColor bool, hostname string, serverless bool) *CatProcessor {
+ // Debug: log the parameters
+ // fmt.Fprintf(os.Stderr, "DEBUG CatProcessor: hostname='%s', serverless=%v, plain=%v\n", hostname, serverless, plain)
+
return &CatProcessor{
- plain: plain,
- hostname: hostname,
+ plain: plain,
+ hostname: hostname,
+ serverless: serverless,
}
}
@@ -28,7 +35,8 @@ func (cp *CatProcessor) Cleanup() error {
// ProcessLine processes a single line for cat output.
// In plain mode, it preserves the original line exactly including line endings.
-// In non-plain mode, it returns just the content - the baseHandler will format the protocol.
+// In non-plain mode in server context, it returns just the content - the baseHandler will format the protocol.
+// In non-plain mode in serverless context, it formats the output with REMOTE protocol.
// Returns the line content and true (cat always outputs all lines).
func (cp *CatProcessor) ProcessLine(line []byte, lineNum int, filePath string, stats *stats, sourceID string) ([]byte, bool) {
// Update stats for matched line (cat always matches all lines)
@@ -36,14 +44,51 @@ func (cp *CatProcessor) ProcessLine(line []byte, lineNum int, filePath string, s
stats.updateLineMatched()
}
- // In both plain and non-plain modes, just return the line content
- // The baseHandler will handle protocol formatting for non-plain mode
+ // In plain mode, just return the line content
+ if cp.plain {
+ result := make([]byte, len(line))
+ copy(result, line)
+ return result, true
+ }
+
+ // In non-plain serverless mode, we need to format with REMOTE protocol
+ // since there's no server baseHandler to do it for us
+ if cp.serverless {
+ // Format exactly like original basehandler.go for non-plain mode
+ // REMOTE|{hostname}|{TransmittedPerc}|{Count}|{SourceID}|{Content}
+ var transmittedPerc int
+ var count uint64
+ if stats != nil {
+ // For cat, we always transmit all matched lines, so transmittedPerc should be 100
+ transmittedPerc = 100
+ count = stats.totalLineCount()
+ }
+
+ // Use actual hostname from system, not "serverless"
+ actualHostname := getHostname()
+
+ // Build the protocol line without the message delimiter
+ protocolLine := fmt.Sprintf("REMOTE%s%s%s%3d%s%v%s%s%s%s",
+ protocol.FieldDelimiter, actualHostname, protocol.FieldDelimiter,
+ transmittedPerc, protocol.FieldDelimiter, count, protocol.FieldDelimiter,
+ sourceID, protocol.FieldDelimiter, string(line))
+
+ // Return formatted line without color reset prefix
+ // The ColorWriter will handle proper coloring
+ result := []byte(protocolLine)
+ return result, true
+ }
+
+ // In server mode, just return the line content
+ // The baseHandler will handle protocol formatting
result := make([]byte, len(line))
copy(result, line)
return result, true
}
func (cp *CatProcessor) Flush() []byte {
- // Server should not send color codes - client handles colorization
+ // No need to add color reset codes here
+ // The ColorWriter handles all coloring
return nil
}
+
diff --git a/internal/io/fs/directprocessor.go b/internal/io/fs/directprocessor.go
index e02d4c2..0069397 100644
--- a/internal/io/fs/directprocessor.go
+++ b/internal/io/fs/directprocessor.go
@@ -115,15 +115,18 @@ func (dp *DirectProcessor) ProcessReader(ctx context.Context, reader io.Reader,
}
} else {
// Regular write path (e.g., stdout in serverless mode)
- if _, err := dp.output.Write(result); err != nil {
- return err
- }
-
- // Only add newline if the processor doesn't already handle it
- // CatProcessor doesn't add newlines, but GrepProcessor does
+ // Check if we need to add a newline
if _, isCat := dp.processor.(*CatProcessor); isCat {
// Scanner strips newlines, so we need to add them back for cat
- if _, err := dp.output.Write([]byte{'\n'}); err != nil {
+ // Combine the result with newline in a single write to avoid
+ // double color processing
+ resultWithNewline := append(result, '\n')
+ if _, err := dp.output.Write(resultWithNewline); err != nil {
+ return err
+ }
+ } else {
+ // For other processors, just write the result as-is
+ if _, err := dp.output.Write(result); err != nil {
return err
}
}
diff --git a/internal/io/fs/grepprocessor.go b/internal/io/fs/grepprocessor.go
index 56967ab..c4b3b1e 100644
--- a/internal/io/fs/grepprocessor.go
+++ b/internal/io/fs/grepprocessor.go
@@ -2,15 +2,18 @@ package fs
import (
"context"
+ "fmt"
+ "github.com/mimecast/dtail/internal/protocol"
"github.com/mimecast/dtail/internal/regex"
)
// GrepProcessor handles grep-style filtering
type GrepProcessor struct {
- regex regex.Regex
- plain bool
- hostname string
+ regex regex.Regex
+ plain bool
+ hostname string
+ serverless bool
// Context handling
beforeContext int
@@ -25,11 +28,12 @@ type GrepProcessor struct {
}
// NewGrepProcessor creates a new grep processor
-func NewGrepProcessor(re regex.Regex, plain, noColor bool, hostname string, beforeContext, afterContext, maxCount int) *GrepProcessor {
+func NewGrepProcessor(re regex.Regex, plain, noColor bool, hostname string, serverless bool, beforeContext, afterContext, maxCount int) *GrepProcessor {
gp := &GrepProcessor{
regex: re,
plain: plain,
hostname: hostname,
+ serverless: serverless,
beforeContext: beforeContext,
afterContext: afterContext,
maxCount: maxCount,
@@ -127,9 +131,54 @@ func (gp *GrepProcessor) Flush() []byte {
// formatLine formats a line for output (shared by matching lines and context lines)
func (gp *GrepProcessor) formatLine(line []byte, lineNum int, filePath string, stats *stats, sourceID string) []byte {
- // In both plain and non-plain modes, just return the line content
- // The baseHandler will handle protocol formatting for non-plain mode
-
+ // In plain mode, just return the line content
+ if gp.plain {
+ // If line already ends with a line ending, preserve it as-is
+ // Otherwise, add LF for consistency with bufio.Scanner behavior
+ if len(line) > 0 && (line[len(line)-1] == '\n' || (len(line) > 1 && line[len(line)-2] == '\r' && line[len(line)-1] == '\n')) {
+ // Line already has line ending, preserve it exactly
+ result := make([]byte, len(line))
+ copy(result, line)
+ return result
+ } else {
+ // Line doesn't have line ending, add LF
+ result := make([]byte, len(line)+1)
+ copy(result, line)
+ result[len(line)] = '\n'
+ return result
+ }
+ }
+
+ // In non-plain serverless mode, we need to format with REMOTE protocol
+ // since there's no server baseHandler to do it for us
+ if gp.serverless {
+ // Format exactly like original basehandler.go for non-plain mode
+ // REMOTE|{hostname}|{TransmittedPerc}|{Count}|{SourceID}|{Content}
+ var transmittedPerc int
+ var count uint64
+ if stats != nil {
+ transmittedPerc = stats.transmittedPerc()
+ count = stats.totalLineCount()
+ }
+
+ // Use actual hostname from system, not "serverless"
+ actualHostname := getHostname()
+
+ // Build the protocol line without the message delimiter
+ protocolLine := fmt.Sprintf("REMOTE%s%s%s%3d%s%v%s%s%s%s",
+ protocol.FieldDelimiter, actualHostname, protocol.FieldDelimiter,
+ transmittedPerc, protocol.FieldDelimiter, count, protocol.FieldDelimiter,
+ sourceID, protocol.FieldDelimiter, string(line))
+
+ // Return formatted line with newline since we might concatenate multiple lines
+ result := make([]byte, len(protocolLine)+1)
+ copy(result, protocolLine)
+ result[len(protocolLine)] = '\n'
+ return result
+ }
+
+ // In server mode, just return the line content
+ // The baseHandler will handle protocol formatting
// If line already ends with a line ending, preserve it as-is
// Otherwise, add LF for consistency with bufio.Scanner behavior
if len(line) > 0 && (line[len(line)-1] == '\n' || (len(line) > 1 && line[len(line)-2] == '\r' && line[len(line)-1] == '\n')) {
diff --git a/internal/io/fs/grepprocessor_test.go b/internal/io/fs/grepprocessor_test.go
index b558eab..376277c 100644
--- a/internal/io/fs/grepprocessor_test.go
+++ b/internal/io/fs/grepprocessor_test.go
@@ -12,7 +12,7 @@ func TestGrepProcessorBasic(t *testing.T) {
testutil.AssertNoError(t, err)
// Use plain mode to avoid color formatting issues in tests
- gp := NewGrepProcessor(re, true, false, "testhost", 0, 0, 0)
+ gp := NewGrepProcessor(re, true, false, "testhost", false, 0, 0, 0)
tests := []struct {
name string
@@ -46,7 +46,7 @@ func TestGrepProcessorWithContext(t *testing.T) {
testutil.AssertNoError(t, err)
// Test with before context = 2, after context = 1
- gp := NewGrepProcessor(re, true, false, "testhost", 2, 1, 0)
+ gp := NewGrepProcessor(re, true, false, "testhost", false, 2, 1, 0)
lines := []string{
"line 1",
@@ -83,7 +83,7 @@ func TestGrepProcessorMaxCount(t *testing.T) {
testutil.AssertNoError(t, err)
// Limit to 2 matches
- gp := NewGrepProcessor(re, true, false, "testhost", 0, 0, 2)
+ gp := NewGrepProcessor(re, true, false, "testhost", false, 0, 0, 2)
matchCount := 0
for i := 0; i < 5; i++ {
@@ -106,7 +106,7 @@ func TestGrepProcessorPlainMode(t *testing.T) {
re, err := regex.New("test", regex.Default)
testutil.AssertNoError(t, err)
- gp := NewGrepProcessor(re, true, false, "testhost", 0, 0, 0)
+ gp := NewGrepProcessor(re, true, false, "testhost", false, 0, 0, 0)
// Test that plain mode preserves line endings
tests := []struct {
@@ -137,7 +137,7 @@ func TestGrepProcessorFormatLine(t *testing.T) {
testutil.AssertNoError(t, err)
// Test plain mode formatting to avoid color issues
- gp := NewGrepProcessor(re, true, false, "testhost", 0, 0, 0)
+ gp := NewGrepProcessor(re, true, false, "testhost", false, 0, 0, 0)
stats := &stats{}
stats.updatePosition()
diff --git a/internal/io/fs/utils.go b/internal/io/fs/utils.go
new file mode 100644
index 0000000..106b58a
--- /dev/null
+++ b/internal/io/fs/utils.go
@@ -0,0 +1,12 @@
+package fs
+
+import "os"
+
+// getHostname returns the system hostname
+func getHostname() string {
+ hostname, err := os.Hostname()
+ if err != nil {
+ return "unknown"
+ }
+ return hostname
+} \ No newline at end of file