diff options
| author | Paul Buetow <paul@buetow.org> | 2025-06-17 14:57:41 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-06-17 14:57:41 +0300 |
| commit | c9c037275e4a5f2c8c87d736f3521c0817770798 (patch) | |
| tree | 80911b418f51dfc903dd74e769f6b2120b3fdee3 | |
| parent | b2cb4ca0563cc73af20460fe3b319263a96a6989 (diff) | |
Fix channelless mode for DTail operations
- Exclude TailClient operations from channelless processing to ensure proper real-time file monitoring
- Add comprehensive MapReduce detection for both cat and tail commands with MAPREDUCE patterns and noop regex
- Add IsNoop() method to Regex type for proper noop regex detection in CSV logformat operations
- Update build instructions and testing guidance in CLAUDE.md
All integration tests now pass with channelless mode enabled.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
| -rw-r--r-- | CLAUDE.md | 4 | ||||
| -rw-r--r-- | internal/regex/regex.go | 5 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 23 |
3 files changed, 29 insertions, 3 deletions
@@ -7,7 +7,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ### Building ```bash # Build all binaries -make build +make clean; make build # Build individual binaries make dserver # Server daemon @@ -33,6 +33,8 @@ make clean build && DTAIL_INTEGRATION_TEST_RUN_MODE=yes go test -v ./integration go clean -testcache ``` +Before deciding work is done, ensure that all tests pass (including integration tests) and that the code is well-documented. Before testing, always rebuild all binaries. + ### Code Quality ```bash # Run go vet on all packages diff --git a/internal/regex/regex.go b/internal/regex/regex.go index eb6e1b3..b2ea201 100644 --- a/internal/regex/regex.go +++ b/internal/regex/regex.go @@ -23,6 +23,11 @@ func (r Regex) String() string { r.regexStr, r.flags, r.initialized, r.re == nil) } +// IsNoop returns true if this regex is a noop regex +func (r Regex) IsNoop() bool { + return len(r.flags) > 0 && r.flags[0] == Noop +} + // NewNoop is a noop regex (doing nothing). func NewNoop() Regex { return Regex{ diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 17054df..89bb757 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -48,13 +48,19 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, } // Check if channelless mode is enabled - useChannelless := os.Getenv("DTAIL_USE_CHANNELLESS") == "yes" + // Note: MapReduce operations require the full channel-based aggregation infrastructure + // Note: Tail operations require continuous monitoring and real-time streaming + isMapReduceCmd := r.mode == omode.MapClient || r.isMapReduceCommand(re) + isTailCmd := r.mode == omode.TailClient + useChannelless := os.Getenv("DTAIL_USE_CHANNELLESS") == "yes" && !isMapReduceCmd && !isTailCmd if useChannelless { - dlog.Server.Debug("Using channelless processing mode") + dlog.Server.Debug("Using channelless processing mode for mode:", r.mode) r.startChannelless(ctx, ltx, args, re, retries) return } + + dlog.Server.Debug("Using channel-based processing mode for mode:", r.mode) // In serverless mode, can also read data from pipe // e.g.: grep foo bar.log | dmap 'from STATS select ...' @@ -335,6 +341,19 @@ func (r *readCommand) readChannellessStdin(ctx context.Context, ltx lcontext.LCo } } +// isMapReduceCommand checks if this is a command that's part of a MapReduce operation +func (r *readCommand) isMapReduceCommand(re regex.Regex) bool { + // Only cat and tail commands can be part of MapReduce operations + if r.mode != omode.CatClient && r.mode != omode.TailClient { + return false + } + + // Check if the regex contains MAPREDUCE pattern OR if it's a noop regex + // (noop regex is used for CSV logformat in MapReduce operations) + pattern := re.String() + return strings.Contains(pattern, "MAPREDUCE:") || re.IsNoop() +} + // createChannellessProcessor creates the appropriate processor based on command mode func (r *readCommand) createChannellessProcessor(re regex.Regex, ltx lcontext.LContext) fs.LineProcessor { hostname := r.server.hostname // Use server hostname |
