1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
|
package handlers
import (
"bytes"
"strings"
"testing"
"github.com/mimecast/dtail/internal/protocol"
)
// TestDirectTurboWriter_ServerlessPlain covers plain serverless output: a line
// written without a trailing newline must come out as the content plus one
// newline, and Stats must report the write.
func TestDirectTurboWriter_ServerlessPlain(t *testing.T) {
	var sink bytes.Buffer
	writer := NewDirectTurboWriter(&sink, "testhost", true, true)

	// The payload deliberately carries no trailing newline.
	if err := writer.WriteLineData([]byte("test line"), 1, "source.log"); err != nil {
		t.Fatalf("WriteLineData failed: %v", err)
	}

	// Flush before inspecting the sink.
	if err := writer.Flush(); err != nil {
		t.Fatalf("Flush failed: %v", err)
	}

	// Plain serverless mode: just the content, terminated by a newline.
	const expected = "test line\n"
	if got := sink.String(); got != expected {
		t.Errorf("Expected %q, got %q", expected, got)
	}

	// The counters must reflect the single line that was written.
	lineCount, byteCount := writer.Stats()
	if lineCount != 1 {
		t.Errorf("Expected 1 line written, got %d", lineCount)
	}
	if byteCount == 0 {
		t.Error("Expected non-zero bytes written")
	}
}
// TestDirectTurboWriter_ServerlessPlainWithNewline checks that a line which
// already ends in a newline is emitted unchanged, without a second newline.
func TestDirectTurboWriter_ServerlessPlainWithNewline(t *testing.T) {
	var sink bytes.Buffer
	writer := NewDirectTurboWriter(&sink, "testhost", true, true)

	// This payload already carries its trailing newline.
	if err := writer.WriteLineData([]byte("test line\n"), 1, "source.log"); err != nil {
		t.Fatalf("WriteLineData failed: %v", err)
	}
	if err := writer.Flush(); err != nil {
		t.Fatalf("Flush failed: %v", err)
	}

	// No extra newline may be appended.
	const expected = "test line\n"
	if got := sink.String(); got != expected {
		t.Errorf("Expected %q, got %q", expected, got)
	}
}
// TestDirectTurboWriter_ServerlessColored is a placeholder for colored
// serverless mode output.
// Note: it is skipped because it requires color config initialization, which
// is complex to set up in tests.
func TestDirectTurboWriter_ServerlessColored(t *testing.T) {
	const reason = "Requires color config initialization - tested via integration tests"
	t.Skip(reason)
}
// TestDirectTurboWriter_NetworkPlain covers plain network mode: the flushed
// output must be the line content followed by a newline.
func TestDirectTurboWriter_NetworkPlain(t *testing.T) {
	var sink bytes.Buffer
	writer := NewDirectTurboWriter(&sink, "testhost", true, false)

	if err := writer.WriteLineData([]byte("test line"), 1, "source.log"); err != nil {
		t.Fatalf("WriteLineData failed: %v", err)
	}
	if err := writer.Flush(); err != nil {
		t.Fatalf("Flush failed: %v", err)
	}

	// Plain network mode: just the content, terminated by a newline.
	const expected = "test line\n"
	if got := sink.String(); got != expected {
		t.Errorf("Expected %q, got %q", expected, got)
	}
}
// TestDirectTurboWriter_NetworkFormatted tests formatted network mode output.
// It writes one line (line number 99, source ID "myfile.log"), flushes, and
// checks that the result starts with "REMOTE", contains the hostname, the line
// number and the source ID, and ends with protocol.MessageDelimiter.
func TestDirectTurboWriter_NetworkFormatted(t *testing.T) {
	var buf bytes.Buffer
	w := NewDirectTurboWriter(&buf, "testhost", false, false)

	if err := w.WriteLineData([]byte("test line"), 99, "myfile.log"); err != nil {
		t.Fatalf("WriteLineData failed: %v", err)
	}
	if err := w.Flush(); err != nil {
		t.Fatalf("Flush failed: %v", err)
	}

	output := buf.String()

	// In formatted network mode, output should have protocol structure.
	if !strings.HasPrefix(output, "REMOTE") {
		t.Errorf("Expected output to start with REMOTE, got %q", output)
	}
	if !strings.Contains(output, "testhost") {
		t.Errorf("Expected output to contain hostname, got %q", output)
	}
	if !strings.Contains(output, "99") {
		t.Errorf("Expected output to contain line number 99, got %q", output)
	}
	if !strings.Contains(output, "myfile.log") {
		t.Errorf("Expected output to contain source ID, got %q", output)
	}

	// Should end with message delimiter. The checks above are non-fatal, so
	// empty output reaches this point; guard the index so the test reports a
	// failure instead of panicking with an out-of-range index.
	if len(output) == 0 || output[len(output)-1] != protocol.MessageDelimiter {
		t.Errorf("Expected output to end with message delimiter, got %q", output)
	}
}
// TestDirectTurboWriter_WriteServerMessage verifies that a server message is
// emitted with a "SERVER" prefix and includes both the hostname and the text.
func TestDirectTurboWriter_WriteServerMessage(t *testing.T) {
	var sink bytes.Buffer
	writer := NewDirectTurboWriter(&sink, "testhost", false, false)

	if err := writer.WriteServerMessage("Hello from server"); err != nil {
		t.Fatalf("WriteServerMessage failed: %v", err)
	}

	got := sink.String()
	if !strings.HasPrefix(got, "SERVER") {
		t.Errorf("Expected output to start with SERVER, got %q", got)
	}
	if !strings.Contains(got, "testhost") {
		t.Errorf("Expected output to contain hostname, got %q", got)
	}
	if !strings.Contains(got, "Hello from server") {
		t.Errorf("Expected output to contain message, got %q", got)
	}
}
// TestDirectTurboWriter_WriteServerMessage_Serverless verifies that server
// messages produce no output at all when the writer is in serverless mode.
func TestDirectTurboWriter_WriteServerMessage_Serverless(t *testing.T) {
	var sink bytes.Buffer
	writer := NewDirectTurboWriter(&sink, "testhost", false, true)

	if err := writer.WriteServerMessage("Hello from server"); err != nil {
		t.Fatalf("WriteServerMessage failed: %v", err)
	}

	// Serverless mode skips server messages, so the sink must stay empty.
	if sink.Len() != 0 {
		t.Errorf("Expected no output in serverless mode, got %q", sink.String())
	}
}
// TestDirectTurboWriter_WriteServerMessage_HiddenMessage verifies that a
// message beginning with "." is passed through as-is rather than being wrapped
// in the "SERVER" framing.
func TestDirectTurboWriter_WriteServerMessage_HiddenMessage(t *testing.T) {
	var sink bytes.Buffer
	writer := NewDirectTurboWriter(&sink, "testhost", false, false)

	if err := writer.WriteServerMessage(".hidden"); err != nil {
		t.Fatalf("WriteServerMessage failed: %v", err)
	}

	got := sink.String()

	// Hidden messages (leading dot) are expected to be written directly.
	if !strings.HasPrefix(got, ".hidden") {
		t.Errorf("Expected output to start with .hidden, got %q", got)
	}

	// The SERVER prefix must be absent.
	if strings.Contains(got, "SERVER") {
		t.Errorf("Hidden message should not have SERVER prefix, got %q", got)
	}
}
// TestDirectTurboWriter_MultipleLines writes five lines in plain serverless
// mode and checks both the Stats line counter and the number of newlines that
// actually reached the output.
func TestDirectTurboWriter_MultipleLines(t *testing.T) {
	var sink bytes.Buffer
	writer := NewDirectTurboWriter(&sink, "testhost", true, true)

	payload := []byte("line content")
	for lineNum := uint64(1); lineNum <= 5; lineNum++ {
		if err := writer.WriteLineData(payload, lineNum, "source.log"); err != nil {
			t.Fatalf("WriteLineData failed on line %d: %v", lineNum, err)
		}
	}

	if err := writer.Flush(); err != nil {
		t.Fatalf("Flush failed: %v", err)
	}

	if written, _ := writer.Stats(); written != 5 {
		t.Errorf("Expected 5 lines written, got %d", written)
	}

	// Every written line should contribute exactly one newline to the output.
	if newlines := strings.Count(sink.String(), "\n"); newlines != 5 {
		t.Errorf("Expected 5 lines in output, got %d", newlines)
	}
}
// TestTurboChannelWriter_WriteLineData verifies that a line written through the
// channel writer arrives on the channel carrying the "REMOTE" marker and the
// original line content.
func TestTurboChannelWriter_WriteLineData(t *testing.T) {
	out := make(chan []byte, 10)
	writer := NewTurboChannelWriter(out, "testhost", false, false)

	if err := writer.WriteLineData([]byte("test line"), 1, "source.log"); err != nil {
		t.Fatalf("WriteLineData failed: %v", err)
	}

	// Non-blocking receive: the message must already be queued.
	var payload []byte
	select {
	case payload = <-out:
	default:
		t.Error("Expected data in channel, got none")
		return
	}

	got := string(payload)
	if !strings.Contains(got, "REMOTE") {
		t.Errorf("Expected output to contain REMOTE, got %q", got)
	}
	if !strings.Contains(got, "test line") {
		t.Errorf("Expected output to contain line content, got %q", got)
	}
}
// TestTurboChannelWriter_ChannelFull tests behavior when the channel is full.
// The channel has capacity 1 and is never drained, so the first write is
// expected to succeed and the second to return an error whose text contains
// "channel full".
func TestTurboChannelWriter_ChannelFull(t *testing.T) {
	ch := make(chan []byte, 1)
	w := NewTurboChannelWriter(ch, "testhost", true, false)

	// Fill the channel.
	if err := w.WriteLineData([]byte("first"), 1, "source.log"); err != nil {
		t.Fatalf("First WriteLineData failed: %v", err)
	}

	// Next write should fail (channel full).
	err := w.WriteLineData([]byte("second"), 2, "source.log")
	if err == nil {
		// Fatal, not Error: continuing would call err.Error() on a nil error
		// and crash the test binary instead of reporting a failure.
		t.Fatal("Expected error when channel is full")
	}
	if !strings.Contains(err.Error(), "channel full") {
		t.Errorf("Expected 'channel full' error, got %v", err)
	}
}
// TestTurboChannelWriter_PlainServerless verifies that in plain serverless mode
// the channel writer delivers the line content without any "REMOTE" framing.
func TestTurboChannelWriter_PlainServerless(t *testing.T) {
	out := make(chan []byte, 10)
	writer := NewTurboChannelWriter(out, "testhost", true, true)

	if err := writer.WriteLineData([]byte("test line"), 1, "source.log"); err != nil {
		t.Fatalf("WriteLineData failed: %v", err)
	}

	// Non-blocking receive: the message must already be queued.
	var payload []byte
	select {
	case payload = <-out:
	default:
		t.Error("Expected data in channel, got none")
		return
	}

	got := string(payload)

	// Plain serverless output must not carry the REMOTE prefix.
	if strings.Contains(got, "REMOTE") {
		t.Errorf("Plain serverless should not have REMOTE prefix, got %q", got)
	}
	if !strings.Contains(got, "test line") {
		t.Errorf("Expected output to contain line content, got %q", got)
	}
}
// TestTurboChannelWriter_WriteServerMessage verifies that a server message is
// delivered on the channel and contains both the "SERVER" marker and the text.
func TestTurboChannelWriter_WriteServerMessage(t *testing.T) {
	out := make(chan []byte, 10)
	writer := NewTurboChannelWriter(out, "testhost", false, false)

	if err := writer.WriteServerMessage("Server says hello"); err != nil {
		t.Fatalf("WriteServerMessage failed: %v", err)
	}

	// Non-blocking receive: the message must already be queued.
	var payload []byte
	select {
	case payload = <-out:
	default:
		t.Error("Expected data in channel, got none")
		return
	}

	got := string(payload)
	if !strings.Contains(got, "SERVER") {
		t.Errorf("Expected output to contain SERVER, got %q", got)
	}
	if !strings.Contains(got, "Server says hello") {
		t.Errorf("Expected output to contain message, got %q", got)
	}
}
// TestTurboChannelWriter_WriteServerMessage_Serverless verifies that server
// messages are dropped in serverless mode: nothing may be queued on the channel.
func TestTurboChannelWriter_WriteServerMessage_Serverless(t *testing.T) {
	out := make(chan []byte, 10)
	writer := NewTurboChannelWriter(out, "testhost", false, true)

	if err := writer.WriteServerMessage("Server says hello"); err != nil {
		t.Fatalf("WriteServerMessage failed: %v", err)
	}

	// A non-blocking receive must find the channel empty.
	select {
	case <-out:
		t.Error("Expected no data in channel for serverless mode")
	default:
		// Nothing queued, which is the expected outcome.
	}
}
// TestTurboChannelWriter_Stats writes three lines through the channel writer
// and checks that Stats reports three lines and a non-zero byte count.
func TestTurboChannelWriter_Stats(t *testing.T) {
	out := make(chan []byte, 10)
	writer := NewTurboChannelWriter(out, "testhost", true, true)

	payload := []byte("line")
	for lineNum := uint64(1); lineNum <= 3; lineNum++ {
		if err := writer.WriteLineData(payload, lineNum, "source.log"); err != nil {
			t.Fatalf("WriteLineData failed: %v", err)
		}
	}

	lineCount, byteCount := writer.Stats()
	if lineCount != 3 {
		t.Errorf("Expected 3 lines, got %d", lineCount)
	}
	if byteCount == 0 {
		t.Error("Expected non-zero bytes written")
	}
}
// TestDirectLineProcessor is a placeholder for the line processor wrapper.
// Note: it is skipped because DirectLineProcessor uses dlog.Server, which
// requires initialization.
func TestDirectLineProcessor(t *testing.T) {
	const reason = "Requires dlog initialization - tested via integration tests"
	t.Skip(reason)
}
// TestDirectLineProcessor_Close is a placeholder for the close method.
// Note: it is skipped because DirectLineProcessor uses dlog.Server, which
// requires initialization.
func TestDirectLineProcessor_Close(t *testing.T) {
	const reason = "Requires dlog initialization - tested via integration tests"
	t.Skip(reason)
}
|