summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-06-17 14:57:41 +0300
committerPaul Buetow <paul@buetow.org>2025-06-17 14:57:41 +0300
commitc9c037275e4a5f2c8c87d736f3521c0817770798 (patch)
tree80911b418f51dfc903dd74e769f6b2120b3fdee3
parentb2cb4ca0563cc73af20460fe3b319263a96a6989 (diff)
Fix channelless mode for DTail operations
- Exclude TailClient operations from channelless processing to ensure proper real-time file monitoring - Add comprehensive MapReduce detection for both cat and tail commands with MAPREDUCE patterns and noop regex - Add IsNoop() method to Regex type for proper noop regex detection in CSV logformat operations - Update build instructions and testing guidance in CLAUDE.md All integration tests now pass with channelless mode enabled. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
-rw-r--r--CLAUDE.md4
-rw-r--r--internal/regex/regex.go5
-rw-r--r--internal/server/handlers/readcommand.go23
3 files changed, 29 insertions, 3 deletions
diff --git a/CLAUDE.md b/CLAUDE.md
index e2353af..3480485 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -7,7 +7,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
### Building
```bash
# Build all binaries
-make build
+make clean; make build
# Build individual binaries
make dserver # Server daemon
@@ -33,6 +33,8 @@ make clean build && DTAIL_INTEGRATION_TEST_RUN_MODE=yes go test -v ./integration
go clean -testcache
```
+Before deciding work is done, ensure that all tests pass (including integration tests) and that the code is well-documented. Before testing, always rebuild all binaries.
+
### Code Quality
```bash
# Run go vet on all packages
diff --git a/internal/regex/regex.go b/internal/regex/regex.go
index eb6e1b3..b2ea201 100644
--- a/internal/regex/regex.go
+++ b/internal/regex/regex.go
@@ -23,6 +23,11 @@ func (r Regex) String() string {
r.regexStr, r.flags, r.initialized, r.re == nil)
}
+// IsNoop returns true if this regex is a noop regex
+func (r Regex) IsNoop() bool {
+ return len(r.flags) > 0 && r.flags[0] == Noop
+}
+
// NewNoop is a noop regex (doing nothing).
func NewNoop() Regex {
return Regex{
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 17054df..89bb757 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -48,13 +48,19 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
}
// Check if channelless mode is enabled
- useChannelless := os.Getenv("DTAIL_USE_CHANNELLESS") == "yes"
+ // Note: MapReduce operations require the full channel-based aggregation infrastructure
+ // Note: Tail operations require continuous monitoring and real-time streaming
+ isMapReduceCmd := r.mode == omode.MapClient || r.isMapReduceCommand(re)
+ isTailCmd := r.mode == omode.TailClient
+ useChannelless := os.Getenv("DTAIL_USE_CHANNELLESS") == "yes" && !isMapReduceCmd && !isTailCmd
if useChannelless {
- dlog.Server.Debug("Using channelless processing mode")
+ dlog.Server.Debug("Using channelless processing mode for mode:", r.mode)
r.startChannelless(ctx, ltx, args, re, retries)
return
}
+
+ dlog.Server.Debug("Using channel-based processing mode for mode:", r.mode)
// In serverless mode, can also read data from pipe
// e.g.: grep foo bar.log | dmap 'from STATS select ...'
@@ -335,6 +341,19 @@ func (r *readCommand) readChannellessStdin(ctx context.Context, ltx lcontext.LCo
}
}
+// isMapReduceCommand checks if this is a command that's part of a MapReduce operation
+func (r *readCommand) isMapReduceCommand(re regex.Regex) bool {
+ // Only cat and tail commands can be part of MapReduce operations
+ if r.mode != omode.CatClient && r.mode != omode.TailClient {
+ return false
+ }
+
+ // Check if the regex contains MAPREDUCE pattern OR if it's a noop regex
+ // (noop regex is used for CSV logformat in MapReduce operations)
+ pattern := re.String()
+ return strings.Contains(pattern, "MAPREDUCE:") || re.IsNoop()
+}
+
// createChannellessProcessor creates the appropriate processor based on command mode
func (r *readCommand) createChannellessProcessor(re regex.Regex, ltx lcontext.LContext) fs.LineProcessor {
hostname := r.server.hostname // Use server hostname