diff options
| author | Paul Buetow <paul@buetow.org> | 2025-06-18 09:34:06 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-06-18 09:34:06 +0300 |
| commit | 06de5147268508bb26702d2d08355b2b9600703e (patch) | |
| tree | 91caf91428bf972fba5dc249885c57234118725f | |
| parent | a399c65856296f637ae6e9447928d3520b4a133d (diff) | |
Rename channelless functions to use cleaner naming
Now that channel-based code is completely removed, renamed all functions
and references from "channelless" to more descriptive names:
- startChannelless() → start()
- readGlobChannelless() → readGlob()
- readFilesChannelless() → readFiles()
- readChannellessStdin() → readStdin()
- createChannellessProcessor() → createProcessor()
Updated comments and debug messages to use "direct processing" terminology.
Renamed test file and functions to use "Direct" naming convention.
Changed source IDs from "channelless" to "direct".
All functionality preserved with improved code clarity and maintainability.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
| -rw-r--r-- | .claude/commands/commit.md | 1 | ||||
| -rw-r--r-- | channelless_output.txt | 7 | ||||
| -rw-r--r-- | integrationtests/direct_processing_test.go (renamed from integrationtests/channelless_test.go) | 18 | ||||
| -rw-r--r-- | internal/io/fs/directprocessor.go | 4 | ||||
| -rw-r--r-- | internal/server/handlers/networkwriter.go | 6 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 44 |
6 files changed, 37 insertions, 43 deletions
diff --git a/.claude/commands/commit.md b/.claude/commands/commit.md new file mode 100644 index 0000000..b6df22d --- /dev/null +++ b/.claude/commands/commit.md @@ -0,0 +1 @@ +Commit changes to git and push them to git. diff --git a/channelless_output.txt b/channelless_output.txt deleted file mode 100644 index f2bfbed..0000000 --- a/channelless_output.txt +++ /dev/null @@ -1,7 +0,0 @@ -INFO|1002-071939|1|stats.go:56|8|11|7|0.80|471h8m17s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=5 -INFO|1002-071946|1|stats.go:56|8|11|7|0.67|471h8m24s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=5 -INFO|1002-071946|1|stats.go:56|8|11|7|0.67|471h8m24s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=5 -INFO|1002-071947|1|stats.go:56|8|11|7|0.67|471h8m24s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=5 -INFO|1002-071947|1|stats.go:56|8|11|7|0.67|471h8m25s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=5 -INFO|1002-071947|1|stats.go:56|8|11|7|0.67|471h8m25s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=5 -INFO|1002-071948|1|stats.go:56|8|11|7|0.67|471h8m25s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=5 diff --git a/integrationtests/channelless_test.go b/integrationtests/direct_processing_test.go index 41845df..51a068a 100644 --- a/integrationtests/channelless_test.go +++ b/integrationtests/direct_processing_test.go @@ -8,15 +8,15 @@ import ( "github.com/mimecast/dtail/internal/config" ) -func TestDGrepChannelless(t *testing.T) { +func TestDGrepDirect(t *testing.T) { if !config.Env("DTAIL_INTEGRATION_TEST_RUN_MODE") { t.Log("Skipping") return } - // Test dgrep with channelless mode (now the default) + // Test dgrep with direct processing (now the default) inFile := "mapr_testdata.log" - outFile := "dgrepchannelless.stdout.tmp" + outFile := "dgrepdirect.stdout.tmp" expectedOutFile := "dgrepcontext1.txt.expected" _, err := runCommand(context.TODO(), t, outFile, @@ -41,15 +41,15 @@ func TestDGrepChannelless(t *testing.T) { os.Remove(outFile) } -func TestDCatChannelless(t *testing.T) { +func TestDCatDirect(t *testing.T) { if !config.Env("DTAIL_INTEGRATION_TEST_RUN_MODE") { t.Log("Skipping") return } - // Test dcat with channelless mode (now the default) + // Test dcat with direct processing (now the default) inFile := "dcat1a.txt" - outFile := "dcatchannelless.stdout.tmp" + outFile := "dcatdirect.stdout.tmp" _, err := runCommand(context.TODO(), t, outFile, "../dcat", @@ -70,17 +70,17 @@ func TestDCatChannelless(t *testing.T) { os.Remove(outFile) } -func TestChannellessMode(t *testing.T) { +func TestDirectProcessingMode(t *testing.T) { if !config.Env("DTAIL_INTEGRATION_TEST_RUN_MODE") { t.Log("Skipping") return } - // Test that channelless mode (now default) works correctly + // Test that direct processing (now default) works correctly // Test grep inFile := "mapr_testdata.log" - outFile := "grep_channelless.tmp" + outFile := "grep_direct.tmp" expectedOutFile := "dgrep1.txt.expected" _, err := runCommand(context.TODO(), t, outFile, diff --git a/internal/io/fs/directprocessor.go b/internal/io/fs/directprocessor.go index 00ecedb..2b4e4df 100644 --- a/internal/io/fs/directprocessor.go +++ b/internal/io/fs/directprocessor.go @@ -21,7 +21,7 @@ import ( "github.com/mimecast/dtail/internal/regex" ) -// LineProcessor interface for channelless line-by-line processing +// LineProcessor interface for direct line-by-line processing type LineProcessor interface { ProcessLine(line []byte, lineNum int, filePath string, stats *stats, sourceID string) (result []byte, shouldSend bool) Flush() []byte // For any buffered output (e.g., MapReduce) @@ -153,7 +153,7 @@ func (dp *DirectProcessor) processReaderPreservingLineEndings(ctx context.Contex // Check if the accumulated data exceeds max line length if len(data) >= maxLineLength { if !warnedAboutLongLine { - // Note: we don't have server messages channel in channelless mode + // Note: we don't have server messages channel in direct processing mode // so we'll just split without warning warnedAboutLongLine = true } diff --git a/internal/server/handlers/networkwriter.go b/internal/server/handlers/networkwriter.go index bb5ad1d..fb77b47 100644 --- a/internal/server/handlers/networkwriter.go +++ b/internal/server/handlers/networkwriter.go @@ -11,7 +11,7 @@ import ( "github.com/mimecast/dtail/internal/user/server" ) -// NetworkOutputWriter provides direct network streaming for channelless processing +// NetworkOutputWriter provides direct network streaming type NetworkOutputWriter struct { conn net.Conn serverMessages chan<- string // Keep existing channel for server messages (low frequency) @@ -201,7 +201,7 @@ func (cow *ChannelOutputWriter) Write(data []byte) (int, error) { // Create a line object using the proper constructor contentBuffer := bytes.NewBuffer(data) - lineObj := line.New(contentBuffer, 0, 100, "channelless") + lineObj := line.New(contentBuffer, 0, 100, "direct") select { case cow.linesCh <- lineObj: @@ -288,7 +288,7 @@ func (shw *ServerHandlerWriter) Write(data []byte) (int, error) { // Create a line object and send it through the server's lines channel contentBuffer := bytes.NewBuffer(data) - lineObj := line.New(contentBuffer, 0, 100, "channelless") + lineObj := line.New(contentBuffer, 0, 100, "direct") select { case shw.server.lines <- lineObj: diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 967cae4..14441b8 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -52,7 +52,7 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, } dlog.Server.Debug("Processing mode:", r.mode) - r.startChannelless(ctx, ltx, args, re, retries, queryStr) + r.start(ctx, ltx, args, re, retries, queryStr) } @@ -87,23 +87,23 @@ func (r *readCommand) isInputFromPipe() bool { return fileInfo.Mode()&os.ModeCharDevice == 0 } -// startChannelless implements channelless processing for better performance -func (r *readCommand) startChannelless(ctx context.Context, ltx lcontext.LContext, +// start implements direct processing for better performance +func (r *readCommand) start(ctx context.Context, ltx lcontext.LContext, args []string, re regex.Regex, retries int, queryStr string) { // Handle stdin input in serverless mode if (args[1] == "" || args[1] == "-") && r.isInputFromPipe() { - dlog.Server.Debug("Reading data from stdin pipe (channelless)") - r.readChannellessStdin(ctx, ltx, re, queryStr) + dlog.Server.Debug("Reading data from stdin pipe") + r.readStdin(ctx, ltx, re, queryStr) return } - dlog.Server.Debug("Reading data from file(s) (channelless)") - r.readGlobChannelless(ctx, ltx, args[1], re, retries, queryStr) + dlog.Server.Debug("Reading data from file(s)") + r.readGlob(ctx, ltx, args[1], re, retries, queryStr) } -// readGlobChannelless processes files using channelless approach -func (r *readCommand) readGlobChannelless(ctx context.Context, ltx lcontext.LContext, +// readGlob processes files using direct processing +func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext, glob string, re regex.Regex, retries int, queryStr string) { retryInterval := time.Second * 5 @@ -130,7 +130,7 @@ func (r *readCommand) readGlobChannelless(ctx context.Context, ltx lcontext.LCon continue } - r.readFilesChannelless(ctx, ltx, paths, glob, re, queryStr) + r.readFiles(ctx, ltx, paths, glob, re, queryStr) return } @@ -138,8 +138,8 @@ func (r *readCommand) readGlobChannelless(ctx context.Context, ltx lcontext.LCon "Giving up to read file(s)")) } -// readFilesChannelless processes multiple files using channelless approach -func (r *readCommand) readFilesChannelless(ctx context.Context, ltx lcontext.LContext, +// readFiles processes multiple files using direct processing +func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, paths []string, glob string, re regex.Regex, queryStr string) { // Choose output writer based on server mode @@ -153,7 +153,7 @@ func (r *readCommand) readFilesChannelless(ctx context.Context, ltx lcontext.LCo } // Create appropriate processor based on mode - processor, needsFollowing := r.createChannellessProcessor(re, ltx, output, queryStr) + processor, needsFollowing := r.createProcessor(re, ltx, output, queryStr) // Process each file for _, path := range paths { @@ -167,7 +167,7 @@ func (r *readCommand) readFilesChannelless(ctx context.Context, ltx lcontext.LCo continue } - dlog.Server.Info(r.server.user, "Start reading (channelless)", path) + dlog.Server.Info(r.server.user, "Start reading", path) // Handle file following for tail operations if needsFollowing { @@ -208,8 +208,8 @@ func (r *readCommand) readFilesChannelless(ctx context.Context, ltx lcontext.LCo } } -// readChannellessStdin processes stdin using channelless approach -func (r *readCommand) readChannellessStdin(ctx context.Context, ltx lcontext.LContext, re regex.Regex, queryStr string) { +// readStdin processes stdin using direct processing +func (r *readCommand) readStdin(ctx context.Context, ltx lcontext.LContext, re regex.Regex, queryStr string) { // Choose output writer based on server mode var output io.Writer if r.server.serverless { @@ -221,12 +221,12 @@ func (r *readCommand) readChannellessStdin(ctx context.Context, ltx lcontext.LCo } // Create appropriate processor based on mode - processor, _ := r.createChannellessProcessor(re, ltx, output, queryStr) + processor, _ := r.createProcessor(re, ltx, output, queryStr) // Create direct processor with "-" as globID for stdin directProcessor := fs.NewDirectProcessor(processor, output, "-", ltx) - dlog.Server.Info(r.server.user, "Start reading from stdin (channelless)") + dlog.Server.Info(r.server.user, "Start reading from stdin") if err := directProcessor.ProcessReader(ctx, os.Stdin, "-"); err != nil { dlog.Server.Error(r.server.user, "stdin", err) @@ -248,16 +248,16 @@ func (r *readCommand) isMapReduceCommand(re regex.Regex) bool { return strings.Contains(pattern, "MAPREDUCE:") || re.IsNoop() } -// createChannellessProcessor creates the appropriate processor based on command mode -func (r *readCommand) createChannellessProcessor(re regex.Regex, ltx lcontext.LContext, output io.Writer, queryStr string) (fs.LineProcessor, bool) { +// createProcessor creates the appropriate processor based on command mode +func (r *readCommand) createProcessor(re regex.Regex, ltx lcontext.LContext, output io.Writer, queryStr string) (fs.LineProcessor, bool) { hostname := r.server.hostname // Use server hostname plain := r.server.plain // Use actual plain mode from server - noColor := false // Enable colors by default in channelless mode + noColor := false // Enable colors by default // If there's an existing aggregate (from a 'map' command), we need to feed data to it // Create a lines channel and connect it to the aggregate if r.server.aggregate != nil { - dlog.Server.Debug("Using existing aggregate, creating bridge processor for channelless mode") + dlog.Server.Debug("Using existing aggregate, creating bridge processor") // Create a lines channel for the aggregate with larger buffer linesCh := make(chan *line.Line, 1000) // Connect the lines channel to the aggregate |
