1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
|
package handlers
import (
"sync/atomic"
"time"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/mapr/server"
)
type readCommandContext interface {
LogContext() interface{}
}
type readCommandFiles interface {
CanReadFile(path string) bool
CatLimiter() chan struct{}
TailLimiter() chan struct{}
}
type readCommandMessages interface {
SendServerMessage(message string)
ServerMessagesChannel() chan string
Hostname() string
PlainOutput() bool
Serverless() bool
RegisterAggregateLines(lines chan *line.Line)
SharedLinesChannel() chan *line.Line
}
type readCommandAggregates interface {
HasRegularAggregate() bool
TurboAggregate() *server.TurboAggregate
}
type readCommandLifecycle interface {
AddPendingFiles(delta int32) int32
CompletePendingFile() (remaining int32, activeCommands int32)
PendingAndActive() (pending int32, activeCommands int32)
ActiveSessionGeneration() uint64
TriggerShutdown()
}
type readCommandTurbo interface {
TurboBoostDisabled() bool
IsTurboMode() bool
EnableTurboMode()
HasTurboEOF() bool
FlushTurboData()
SignalTurboEOF()
GetTurboChannel() chan []byte
TurboChannelLen() int
WaitForTurboEOFAck(timeout time.Duration) bool
}
type readCommandTiming interface {
ReadGlobRetryInterval() time.Duration
ReadRetryInterval() time.Duration
MaxLineLength() int
AggregateLinesChannelBufferSize() int
TurboDataTransmissionDelay() time.Duration
TurboEOFWaitDuration(fileCount int) time.Duration
ShutdownTurboSerializeWait() time.Duration
ShutdownIdleRecheckWait() time.Duration
TurboEOFAckTimeout() time.Duration
}
type readCommandServer interface {
readCommandContext
readCommandFiles
readCommandMessages
readCommandAggregates
readCommandLifecycle
readCommandTurbo
readCommandTiming
}
var _ readCommandServer = (*ServerHandler)(nil)
// LogContext returns the logger context associated with the current user/session.
func (h *ServerHandler) LogContext() interface{} {
return h.user
}
// SendServerMessage sends a formatted server message to the client.
func (h *ServerHandler) SendServerMessage(message string) {
h.sendln(h.serverMessages, message)
}
// CanReadFile reports whether the current user can read the given path.
func (h *ServerHandler) CanReadFile(path string) bool {
return h.user.HasFilePermission(path, "readfiles")
}
// ServerMessagesChannel returns the server message channel.
func (h *ServerHandler) ServerMessagesChannel() chan string {
return h.serverMessages
}
// CatLimiter returns the concurrency limiter for cat/grep style reads.
func (h *ServerHandler) CatLimiter() chan struct{} {
return h.catLimiter
}
// TailLimiter returns the concurrency limiter for tail reads.
func (h *ServerHandler) TailLimiter() chan struct{} {
return h.tailLimiter
}
// Hostname returns the short hostname used for response formatting.
func (h *ServerHandler) Hostname() string {
return h.hostname
}
// PlainOutput reports whether plain output mode is enabled.
func (h *ServerHandler) PlainOutput() bool {
return h.plain
}
// Serverless reports whether the current session is running in serverless mode.
func (h *ServerHandler) Serverless() bool {
return h.serverless
}
// TurboBoostDisabled reports whether turbo mode is disabled by configuration.
func (h *ServerHandler) TurboBoostDisabled() bool {
return h.serverCfg.TurboBoostDisable
}
// HasRegularAggregate reports whether the regular map-reduce aggregate is active.
func (h *ServerHandler) HasRegularAggregate() bool {
return h.aggregate != nil
}
// RegisterAggregateLines attaches a file line channel to the active aggregate.
func (h *ServerHandler) RegisterAggregateLines(lines chan *line.Line) {
if h.aggregate != nil {
h.aggregate.NextLinesCh <- lines
}
}
// SharedLinesChannel returns the shared outbound line channel.
func (h *ServerHandler) SharedLinesChannel() chan *line.Line {
return h.lines
}
// TurboAggregate returns the turbo aggregate if enabled for the session.
func (h *ServerHandler) TurboAggregate() *server.TurboAggregate {
return h.turboAggregate
}
// AddPendingFiles increments or decrements the pending file counter.
func (h *ServerHandler) AddPendingFiles(delta int32) int32 {
return atomic.AddInt32(&h.pendingFiles, delta)
}
// CompletePendingFile marks one file as completed and returns pending/active counters.
func (h *ServerHandler) CompletePendingFile() (remaining int32, activeCommands int32) {
remaining = atomic.AddInt32(&h.pendingFiles, -1)
activeCommands = atomic.LoadInt32(&h.activeCommands)
return remaining, activeCommands
}
// PendingAndActive returns the current pending file and active command counts.
func (h *ServerHandler) PendingAndActive() (pending int32, activeCommands int32) {
pending = atomic.LoadInt32(&h.pendingFiles)
activeCommands = atomic.LoadInt32(&h.activeCommands)
return pending, activeCommands
}
// ActiveSessionGeneration returns the currently active interactive session generation.
func (h *ServerHandler) ActiveSessionGeneration() uint64 {
return h.sessionState.currentGeneration()
}
// TriggerShutdown starts the handler shutdown sequence.
func (h *ServerHandler) TriggerShutdown() {
h.shutdown()
}
// FlushTurboData drains pending turbo data to the underlying writer.
func (h *ServerHandler) FlushTurboData() {
h.flushTurboData()
}
// TurboEOFAckTimeout returns the timeout used while waiting for turbo EOF ACK.
func (h *ServerHandler) TurboEOFAckTimeout() time.Duration {
return durationFromMilliseconds(h.serverCfg.TurboEOFAckTimeoutMs, 2*time.Second)
}
func durationFromMilliseconds(value int, fallback time.Duration) time.Duration {
if value <= 0 {
return fallback
}
return time.Duration(value) * time.Millisecond
}
func positiveIntOrDefault(value int, fallback int) int {
if value <= 0 {
return fallback
}
return value
}
// ReadGlobRetryInterval returns the retry interval for glob expansion failures.
func (h *ServerHandler) ReadGlobRetryInterval() time.Duration {
return durationFromMilliseconds(h.serverCfg.ReadGlobRetryIntervalMs, 5*time.Second)
}
// ReadRetryInterval returns the retry interval for repeated file reads.
func (h *ServerHandler) ReadRetryInterval() time.Duration {
return durationFromMilliseconds(h.serverCfg.ReadRetryIntervalMs, 2*time.Second)
}
// MaxLineLength returns the configured max line length for file readers.
func (h *ServerHandler) MaxLineLength() int {
return positiveIntOrDefault(h.serverCfg.MaxLineLength, 1024*1024)
}
// AggregateLinesChannelBufferSize returns the aggregate lines channel buffer size.
func (h *ServerHandler) AggregateLinesChannelBufferSize() int {
return positiveIntOrDefault(h.serverCfg.ReadAggregateLineBufferSize, 10000)
}
// TurboDataTransmissionDelay returns the delay used after turbo flushes.
func (h *ServerHandler) TurboDataTransmissionDelay() time.Duration {
return durationFromMilliseconds(h.serverCfg.TurboTransmissionDelayMs, 50*time.Millisecond)
}
// TurboEOFWaitDuration returns the wait duration used before signaling turbo EOF.
func (h *ServerHandler) TurboEOFWaitDuration(fileCount int) time.Duration {
baseWait := durationFromMilliseconds(h.serverCfg.TurboEOFWaitBaseMs, 500*time.Millisecond)
if fileCount <= 10 {
return baseWait
}
perFileWait := durationFromMilliseconds(h.serverCfg.TurboEOFWaitPerFileMs, 10*time.Millisecond)
maxWait := durationFromMilliseconds(h.serverCfg.TurboEOFWaitMaxMs, 2*time.Second)
wait := time.Duration(fileCount) * perFileWait
if wait > maxWait {
return maxWait
}
return wait
}
// ShutdownTurboSerializeWait returns the wait before final turbo shutdown checks.
func (h *ServerHandler) ShutdownTurboSerializeWait() time.Duration {
return durationFromMilliseconds(h.serverCfg.ShutdownTurboSerializeWaitMs, 500*time.Millisecond)
}
// ShutdownIdleRecheckWait returns the wait used for the final idle recheck.
func (h *ServerHandler) ShutdownIdleRecheckWait() time.Duration {
return durationFromMilliseconds(h.serverCfg.ShutdownIdleRecheckWaitMs, 10*time.Millisecond)
}
func (h *ServerHandler) turboManagerConfig() turboManagerConfig {
return turboManagerConfig{
channelBufferSize: positiveIntOrDefault(h.serverCfg.TurboChannelBufferSize, defaultTurboChannelBufferSize),
flushTimeout: durationFromMilliseconds(h.serverCfg.TurboFlushTimeoutMs, defaultTurboFlushTimeout),
flushPollInterval: durationFromMilliseconds(h.serverCfg.TurboFlushPollIntervalMs, defaultTurboFlushPollInterval),
readRetryInterval: durationFromMilliseconds(h.serverCfg.TurboReadRetryIntervalMs, defaultTurboReadRetryInterval),
eofAckQuietPeriod: durationFromMilliseconds(h.serverCfg.TurboTransmissionDelayMs, defaultTurboEOFAckQuietPeriod),
}
}
|