summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-06-18 09:34:06 +0300
committerPaul Buetow <paul@buetow.org>2025-06-18 09:34:06 +0300
commit06de5147268508bb26702d2d08355b2b9600703e (patch)
tree91caf91428bf972fba5dc249885c57234118725f
parenta399c65856296f637ae6e9447928d3520b4a133d (diff)
Rename channelless functions to use cleaner naming
Now that channel-based code is completely removed, renamed all functions and references from "channelless" to more descriptive names: - startChannelless() → start() - readGlobChannelless() → readGlob() - readFilesChannelless() → readFiles() - readChannellessStdin() → readStdin() - createChannellessProcessor() → createProcessor() Updated comments and debug messages to use "direct processing" terminology. Renamed test file and functions to use "Direct" naming convention. Changed source IDs from "channelless" to "direct". All functionality preserved with improved code clarity and maintainability. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
-rw-r--r--.claude/commands/commit.md1
-rw-r--r--channelless_output.txt7
-rw-r--r--integrationtests/direct_processing_test.go (renamed from integrationtests/channelless_test.go)18
-rw-r--r--internal/io/fs/directprocessor.go4
-rw-r--r--internal/server/handlers/networkwriter.go6
-rw-r--r--internal/server/handlers/readcommand.go44
6 files changed, 37 insertions, 43 deletions
diff --git a/.claude/commands/commit.md b/.claude/commands/commit.md
new file mode 100644
index 0000000..b6df22d
--- /dev/null
+++ b/.claude/commands/commit.md
@@ -0,0 +1 @@
+Commit changes to git and push them to git.
diff --git a/channelless_output.txt b/channelless_output.txt
deleted file mode 100644
index f2bfbed..0000000
--- a/channelless_output.txt
+++ /dev/null
@@ -1,7 +0,0 @@
-INFO|1002-071939|1|stats.go:56|8|11|7|0.80|471h8m17s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=5
-INFO|1002-071946|1|stats.go:56|8|11|7|0.67|471h8m24s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=5
-INFO|1002-071946|1|stats.go:56|8|11|7|0.67|471h8m24s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=5
-INFO|1002-071947|1|stats.go:56|8|11|7|0.67|471h8m24s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=5
-INFO|1002-071947|1|stats.go:56|8|11|7|0.67|471h8m25s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=5
-INFO|1002-071947|1|stats.go:56|8|11|7|0.67|471h8m25s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=5
-INFO|1002-071948|1|stats.go:56|8|11|7|0.67|471h8m25s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=5
diff --git a/integrationtests/channelless_test.go b/integrationtests/direct_processing_test.go
index 41845df..51a068a 100644
--- a/integrationtests/channelless_test.go
+++ b/integrationtests/direct_processing_test.go
@@ -8,15 +8,15 @@ import (
"github.com/mimecast/dtail/internal/config"
)
-func TestDGrepChannelless(t *testing.T) {
+func TestDGrepDirect(t *testing.T) {
if !config.Env("DTAIL_INTEGRATION_TEST_RUN_MODE") {
t.Log("Skipping")
return
}
- // Test dgrep with channelless mode (now the default)
+ // Test dgrep with direct processing (now the default)
inFile := "mapr_testdata.log"
- outFile := "dgrepchannelless.stdout.tmp"
+ outFile := "dgrepdirect.stdout.tmp"
expectedOutFile := "dgrepcontext1.txt.expected"
_, err := runCommand(context.TODO(), t, outFile,
@@ -41,15 +41,15 @@ func TestDGrepChannelless(t *testing.T) {
os.Remove(outFile)
}
-func TestDCatChannelless(t *testing.T) {
+func TestDCatDirect(t *testing.T) {
if !config.Env("DTAIL_INTEGRATION_TEST_RUN_MODE") {
t.Log("Skipping")
return
}
- // Test dcat with channelless mode (now the default)
+ // Test dcat with direct processing (now the default)
inFile := "dcat1a.txt"
- outFile := "dcatchannelless.stdout.tmp"
+ outFile := "dcatdirect.stdout.tmp"
_, err := runCommand(context.TODO(), t, outFile,
"../dcat",
@@ -70,17 +70,17 @@ func TestDCatChannelless(t *testing.T) {
os.Remove(outFile)
}
-func TestChannellessMode(t *testing.T) {
+func TestDirectProcessingMode(t *testing.T) {
if !config.Env("DTAIL_INTEGRATION_TEST_RUN_MODE") {
t.Log("Skipping")
return
}
- // Test that channelless mode (now default) works correctly
+ // Test that direct processing (now default) works correctly
// Test grep
inFile := "mapr_testdata.log"
- outFile := "grep_channelless.tmp"
+ outFile := "grep_direct.tmp"
expectedOutFile := "dgrep1.txt.expected"
_, err := runCommand(context.TODO(), t, outFile,
diff --git a/internal/io/fs/directprocessor.go b/internal/io/fs/directprocessor.go
index 00ecedb..2b4e4df 100644
--- a/internal/io/fs/directprocessor.go
+++ b/internal/io/fs/directprocessor.go
@@ -21,7 +21,7 @@ import (
"github.com/mimecast/dtail/internal/regex"
)
-// LineProcessor interface for channelless line-by-line processing
+// LineProcessor interface for direct line-by-line processing
type LineProcessor interface {
ProcessLine(line []byte, lineNum int, filePath string, stats *stats, sourceID string) (result []byte, shouldSend bool)
Flush() []byte // For any buffered output (e.g., MapReduce)
@@ -153,7 +153,7 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex
// Check if the accumulated data exceeds max line length
if len(data) >= maxLineLength {
if !warnedAboutLongLine {
- // Note: we don't have server messages channel in channelless mode
+ // Note: we don't have server messages channel in direct processing mode
// so we'll just split without warning
warnedAboutLongLine = true
}
diff --git a/internal/server/handlers/networkwriter.go b/internal/server/handlers/networkwriter.go
index bb5ad1d..fb77b47 100644
--- a/internal/server/handlers/networkwriter.go
+++ b/internal/server/handlers/networkwriter.go
@@ -11,7 +11,7 @@ import (
"github.com/mimecast/dtail/internal/user/server"
)
-// NetworkOutputWriter provides direct network streaming for channelless processing
+// NetworkOutputWriter provides direct network streaming
type NetworkOutputWriter struct {
conn net.Conn
serverMessages chan<- string // Keep existing channel for server messages (low frequency)
@@ -201,7 +201,7 @@ func (cow *ChannelOutputWriter) Write(data []byte) (int, error) {
// Create a line object using the proper constructor
contentBuffer := bytes.NewBuffer(data)
- lineObj := line.New(contentBuffer, 0, 100, "channelless")
+ lineObj := line.New(contentBuffer, 0, 100, "direct")
select {
case cow.linesCh <- lineObj:
@@ -288,7 +288,7 @@ func (shw *ServerHandlerWriter) Write(data []byte) (int, error) {
// Create a line object and send it through the server's lines channel
contentBuffer := bytes.NewBuffer(data)
- lineObj := line.New(contentBuffer, 0, 100, "channelless")
+ lineObj := line.New(contentBuffer, 0, 100, "direct")
select {
case shw.server.lines <- lineObj:
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 967cae4..14441b8 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -52,7 +52,7 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
}
dlog.Server.Debug("Processing mode:", r.mode)
- r.startChannelless(ctx, ltx, args, re, retries, queryStr)
+ r.start(ctx, ltx, args, re, retries, queryStr)
}
@@ -87,23 +87,23 @@ func (r *readCommand) isInputFromPipe() bool {
return fileInfo.Mode()&os.ModeCharDevice == 0
}
-// startChannelless implements channelless processing for better performance
-func (r *readCommand) startChannelless(ctx context.Context, ltx lcontext.LContext,
+// start implements direct processing for better performance
+func (r *readCommand) start(ctx context.Context, ltx lcontext.LContext,
args []string, re regex.Regex, retries int, queryStr string) {
// Handle stdin input in serverless mode
if (args[1] == "" || args[1] == "-") && r.isInputFromPipe() {
- dlog.Server.Debug("Reading data from stdin pipe (channelless)")
- r.readChannellessStdin(ctx, ltx, re, queryStr)
+ dlog.Server.Debug("Reading data from stdin pipe")
+ r.readStdin(ctx, ltx, re, queryStr)
return
}
- dlog.Server.Debug("Reading data from file(s) (channelless)")
- r.readGlobChannelless(ctx, ltx, args[1], re, retries, queryStr)
+ dlog.Server.Debug("Reading data from file(s)")
+ r.readGlob(ctx, ltx, args[1], re, retries, queryStr)
}
-// readGlobChannelless processes files using channelless approach
-func (r *readCommand) readGlobChannelless(ctx context.Context, ltx lcontext.LContext,
+// readGlob processes files using direct processing
+func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
glob string, re regex.Regex, retries int, queryStr string) {
retryInterval := time.Second * 5
@@ -130,7 +130,7 @@ func (r *readCommand) readGlobChannelless(ctx context.Context, ltx lcontext.LCon
continue
}
- r.readFilesChannelless(ctx, ltx, paths, glob, re, queryStr)
+ r.readFiles(ctx, ltx, paths, glob, re, queryStr)
return
}
@@ -138,8 +138,8 @@ func (r *readCommand) readGlobChannelless(ctx context.Context, ltx lcontext.LCon
"Giving up to read file(s)"))
}
-// readFilesChannelless processes multiple files using channelless approach
-func (r *readCommand) readFilesChannelless(ctx context.Context, ltx lcontext.LContext,
+// readFiles processes multiple files using direct processing
+func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
paths []string, glob string, re regex.Regex, queryStr string) {
// Choose output writer based on server mode
@@ -153,7 +153,7 @@ func (r *readCommand) readFilesChannelless(ctx context.Context, ltx lcontext.LCo
}
// Create appropriate processor based on mode
- processor, needsFollowing := r.createChannellessProcessor(re, ltx, output, queryStr)
+ processor, needsFollowing := r.createProcessor(re, ltx, output, queryStr)
// Process each file
for _, path := range paths {
@@ -167,7 +167,7 @@ func (r *readCommand) readFilesChannelless(ctx context.Context, ltx lcontext.LCo
continue
}
- dlog.Server.Info(r.server.user, "Start reading (channelless)", path)
+ dlog.Server.Info(r.server.user, "Start reading", path)
// Handle file following for tail operations
if needsFollowing {
@@ -208,8 +208,8 @@ func (r *readCommand) readFilesChannelless(ctx context.Context, ltx lcontext.LCo
}
}
-// readChannellessStdin processes stdin using channelless approach
-func (r *readCommand) readChannellessStdin(ctx context.Context, ltx lcontext.LContext, re regex.Regex, queryStr string) {
+// readStdin processes stdin using direct processing
+func (r *readCommand) readStdin(ctx context.Context, ltx lcontext.LContext, re regex.Regex, queryStr string) {
// Choose output writer based on server mode
var output io.Writer
if r.server.serverless {
@@ -221,12 +221,12 @@ func (r *readCommand) readChannellessStdin(ctx context.Context, ltx lcontext.LCo
}
// Create appropriate processor based on mode
- processor, _ := r.createChannellessProcessor(re, ltx, output, queryStr)
+ processor, _ := r.createProcessor(re, ltx, output, queryStr)
// Create direct processor with "-" as globID for stdin
directProcessor := fs.NewDirectProcessor(processor, output, "-", ltx)
- dlog.Server.Info(r.server.user, "Start reading from stdin (channelless)")
+ dlog.Server.Info(r.server.user, "Start reading from stdin")
if err := directProcessor.ProcessReader(ctx, os.Stdin, "-"); err != nil {
dlog.Server.Error(r.server.user, "stdin", err)
@@ -248,16 +248,16 @@ func (r *readCommand) isMapReduceCommand(re regex.Regex) bool {
return strings.Contains(pattern, "MAPREDUCE:") || re.IsNoop()
}
-// createChannellessProcessor creates the appropriate processor based on command mode
-func (r *readCommand) createChannellessProcessor(re regex.Regex, ltx lcontext.LContext, output io.Writer, queryStr string) (fs.LineProcessor, bool) {
+// createProcessor creates the appropriate processor based on command mode
+func (r *readCommand) createProcessor(re regex.Regex, ltx lcontext.LContext, output io.Writer, queryStr string) (fs.LineProcessor, bool) {
hostname := r.server.hostname // Use server hostname
plain := r.server.plain // Use actual plain mode from server
- noColor := false // Enable colors by default in channelless mode
+ noColor := false // Enable colors by default
// If there's an existing aggregate (from a 'map' command), we need to feed data to it
// Create a lines channel and connect it to the aggregate
if r.server.aggregate != nil {
- dlog.Server.Debug("Using existing aggregate, creating bridge processor for channelless mode")
+ dlog.Server.Debug("Using existing aggregate, creating bridge processor")
// Create a lines channel for the aggregate with larger buffer
linesCh := make(chan *line.Line, 1000)
// Connect the lines channel to the aggregate