summaryrefslogtreecommitdiff
path: root/internal/server/handlers/readcommand_server.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-05 08:50:33 +0200
committerPaul Buetow <paul@buetow.org>2026-03-05 08:50:33 +0200
commit5d1b9f1062d38c301c0995ec6da980bdf5e48332 (patch)
tree81e1a8963ea66cf06164e89beb6cd2da0ee325f7 /internal/server/handlers/readcommand_server.go
parentbb46cfbccea301721fb93485ea7169f5841feda3 (diff)
Improve lint/vet reliability and refactor client runtime/bootstrap
Diffstat (limited to 'internal/server/handlers/readcommand_server.go')
-rw-r--r--internal/server/handlers/readcommand_server.go65
1 files changed, 60 insertions, 5 deletions
diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go
index 650dcf2..6d7a095 100644
--- a/internal/server/handlers/readcommand_server.go
+++ b/internal/server/handlers/readcommand_server.go
@@ -8,25 +8,40 @@ import (
"github.com/mimecast/dtail/internal/mapr/server"
)
-type readCommandServer interface {
+type readCommandContext interface {
LogContext() interface{}
- SendServerMessage(message string)
+}
+
+type readCommandFiles interface {
CanReadFile(path string) bool
- ServerMessagesChannel() chan string
CatLimiter() chan struct{}
TailLimiter() chan struct{}
+}
+
+type readCommandMessages interface {
+ SendServerMessage(message string)
+ ServerMessagesChannel() chan string
Hostname() string
PlainOutput() bool
Serverless() bool
- TurboBoostDisabled() bool
- HasRegularAggregate() bool
RegisterAggregateLines(lines chan *line.Line)
SharedLinesChannel() chan *line.Line
+}
+
+type readCommandAggregates interface {
+ HasRegularAggregate() bool
TurboAggregate() *server.TurboAggregate
+}
+
+type readCommandLifecycle interface {
AddPendingFiles(delta int32) int32
CompletePendingFile() (remaining int32, activeCommands int32)
PendingAndActive() (pending int32, activeCommands int32)
TriggerShutdown()
+}
+
+type readCommandTurbo interface {
+ TurboBoostDisabled() bool
IsTurboMode() bool
EnableTurboMode()
HasTurboEOF() bool
@@ -35,6 +50,9 @@ type readCommandServer interface {
GetTurboChannel() chan []byte
TurboChannelLen() int
WaitForTurboEOFAck(timeout time.Duration) bool
+}
+
+type readCommandTiming interface {
ReadGlobRetryInterval() time.Duration
ReadRetryInterval() time.Duration
AggregateLinesChannelBufferSize() int
@@ -45,90 +63,120 @@ type readCommandServer interface {
TurboEOFAckTimeout() time.Duration
}
+type readCommandServer interface {
+ readCommandContext
+ readCommandFiles
+ readCommandMessages
+ readCommandAggregates
+ readCommandLifecycle
+ readCommandTurbo
+ readCommandTiming
+}
+
var _ readCommandServer = (*ServerHandler)(nil)
+// LogContext returns the logger context associated with the current user/session.
func (h *ServerHandler) LogContext() interface{} {
return h.user
}
+// SendServerMessage sends a formatted server message to the client.
func (h *ServerHandler) SendServerMessage(message string) {
h.sendln(h.serverMessages, message)
}
+// CanReadFile reports whether the current user can read the given path.
func (h *ServerHandler) CanReadFile(path string) bool {
return h.user.HasFilePermission(path, "readfiles")
}
+// ServerMessagesChannel returns the server message channel.
func (h *ServerHandler) ServerMessagesChannel() chan string {
return h.serverMessages
}
+// CatLimiter returns the concurrency limiter for cat/grep style reads.
func (h *ServerHandler) CatLimiter() chan struct{} {
return h.catLimiter
}
+// TailLimiter returns the concurrency limiter for tail reads.
func (h *ServerHandler) TailLimiter() chan struct{} {
return h.tailLimiter
}
+// Hostname returns the short hostname used for response formatting.
func (h *ServerHandler) Hostname() string {
return h.hostname
}
+// PlainOutput reports whether plain output mode is enabled.
func (h *ServerHandler) PlainOutput() bool {
return h.plain
}
+// Serverless reports whether the current session is running in serverless mode.
func (h *ServerHandler) Serverless() bool {
return h.serverless
}
+// TurboBoostDisabled reports whether turbo mode is disabled by configuration.
func (h *ServerHandler) TurboBoostDisabled() bool {
return h.serverCfg.TurboBoostDisable
}
+// HasRegularAggregate reports whether the regular map-reduce aggregate is active.
func (h *ServerHandler) HasRegularAggregate() bool {
return h.aggregate != nil
}
+// RegisterAggregateLines attaches a file line channel to the active aggregate.
func (h *ServerHandler) RegisterAggregateLines(lines chan *line.Line) {
if h.aggregate != nil {
h.aggregate.NextLinesCh <- lines
}
}
+// SharedLinesChannel returns the shared outbound line channel.
func (h *ServerHandler) SharedLinesChannel() chan *line.Line {
return h.lines
}
+// TurboAggregate returns the turbo aggregate if enabled for the session.
func (h *ServerHandler) TurboAggregate() *server.TurboAggregate {
return h.turboAggregate
}
+// AddPendingFiles increments or decrements the pending file counter.
func (h *ServerHandler) AddPendingFiles(delta int32) int32 {
return atomic.AddInt32(&h.pendingFiles, delta)
}
+// CompletePendingFile marks one file as completed and returns pending/active counters.
func (h *ServerHandler) CompletePendingFile() (remaining int32, activeCommands int32) {
remaining = atomic.AddInt32(&h.pendingFiles, -1)
activeCommands = atomic.LoadInt32(&h.activeCommands)
return remaining, activeCommands
}
+// PendingAndActive returns the current pending file and active command counts.
func (h *ServerHandler) PendingAndActive() (pending int32, activeCommands int32) {
pending = atomic.LoadInt32(&h.pendingFiles)
activeCommands = atomic.LoadInt32(&h.activeCommands)
return pending, activeCommands
}
+// TriggerShutdown starts the handler shutdown sequence.
func (h *ServerHandler) TriggerShutdown() {
h.shutdown()
}
+// FlushTurboData drains pending turbo data to the underlying writer.
func (h *ServerHandler) FlushTurboData() {
h.flushTurboData()
}
+// TurboEOFAckTimeout returns the timeout used while waiting for turbo EOF ACK.
func (h *ServerHandler) TurboEOFAckTimeout() time.Duration {
return durationFromMilliseconds(h.serverCfg.TurboEOFAckTimeoutMs, 2*time.Second)
}
@@ -147,22 +195,27 @@ func positiveIntOrDefault(value int, fallback int) int {
return value
}
+// ReadGlobRetryInterval returns the retry interval for glob expansion failures.
func (h *ServerHandler) ReadGlobRetryInterval() time.Duration {
return durationFromMilliseconds(h.serverCfg.ReadGlobRetryIntervalMs, 5*time.Second)
}
+// ReadRetryInterval returns the retry interval for repeated file reads.
func (h *ServerHandler) ReadRetryInterval() time.Duration {
return durationFromMilliseconds(h.serverCfg.ReadRetryIntervalMs, 2*time.Second)
}
+// AggregateLinesChannelBufferSize returns the aggregate lines channel buffer size.
func (h *ServerHandler) AggregateLinesChannelBufferSize() int {
return positiveIntOrDefault(h.serverCfg.ReadAggregateLineBufferSize, 10000)
}
+// TurboDataTransmissionDelay returns the delay used after turbo flushes.
func (h *ServerHandler) TurboDataTransmissionDelay() time.Duration {
return durationFromMilliseconds(h.serverCfg.TurboTransmissionDelayMs, 50*time.Millisecond)
}
+// TurboEOFWaitDuration returns the wait duration used before signaling turbo EOF.
func (h *ServerHandler) TurboEOFWaitDuration(fileCount int) time.Duration {
baseWait := durationFromMilliseconds(h.serverCfg.TurboEOFWaitBaseMs, 500*time.Millisecond)
if fileCount <= 10 {
@@ -178,10 +231,12 @@ func (h *ServerHandler) TurboEOFWaitDuration(fileCount int) time.Duration {
return wait
}
+// ShutdownTurboSerializeWait returns the wait before final turbo shutdown checks.
func (h *ServerHandler) ShutdownTurboSerializeWait() time.Duration {
return durationFromMilliseconds(h.serverCfg.ShutdownTurboSerializeWaitMs, 500*time.Millisecond)
}
+// ShutdownIdleRecheckWait returns the wait used for the final idle recheck.
func (h *ServerHandler) ShutdownIdleRecheckWait() time.Duration {
return durationFromMilliseconds(h.serverCfg.ShutdownIdleRecheckWaitMs, 10*time.Millisecond)
}