summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-10 19:37:21 +0200
committerPaul Buetow <paul@buetow.org>2026-03-10 19:37:21 +0200
commitf6e23930da2900c43a5389a2e7d1e38d8221a76f (patch)
tree3352cc0d8c0819d5cc58fdf987ed39f87a30a34b /internal
parent1fc24f9affed5128702e4de80572cac8c82d399e (diff)
Refactor server-side config singleton reads
Diffstat (limited to 'internal')
-rw-r--r--internal/clients/connectors/serverless.go6
-rw-r--r--internal/config/server.go20
-rw-r--r--internal/io/fs/catfile.go5
-rw-r--r--internal/io/fs/readfile.go19
-rw-r--r--internal/io/fs/readfile_processor.go3
-rw-r--r--internal/io/fs/readfile_processor_optimized.go7
-rw-r--r--internal/io/fs/readfile_processor_test.go39
-rw-r--r--internal/io/fs/tailfile.go5
-rw-r--r--internal/mapr/server/aggregate.go13
-rw-r--r--internal/mapr/server/parsername.go18
-rw-r--r--internal/mapr/server/turbo_aggregate.go13
-rw-r--r--internal/mapr/server/turbo_aggregate_test.go108
-rw-r--r--internal/server/handlers/mapcommand.go8
-rw-r--r--internal/server/handlers/readcommand.go4
-rw-r--r--internal/server/handlers/readcommand_server.go6
-rw-r--r--internal/server/server.go6
-rw-r--r--internal/ssh/server/hostkey.go16
-rw-r--r--internal/ssh/server/publickeycallback.go15
-rw-r--r--internal/user/server/user.go17
19 files changed, 188 insertions, 140 deletions
diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go
index f4c4e9e..cedf37f 100644
--- a/internal/clients/connectors/serverless.go
+++ b/internal/clients/connectors/serverless.go
@@ -60,7 +60,11 @@ func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc,
func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) error {
dlog.Client.Debug("Creating server handler for a serverless session")
- user, err := user.New(s.userName, s.Server())
+ var permissionLookup user.PermissionLookup
+ if config.Server != nil {
+ permissionLookup = config.Server.UserPermissions
+ }
+ user, err := user.New(s.userName, s.Server(), permissionLookup)
if err != nil {
return err
}
diff --git a/internal/config/server.go b/internal/config/server.go
index 13ebde8..97c9a9d 100644
--- a/internal/config/server.go
+++ b/internal/config/server.go
@@ -145,10 +145,14 @@ func newDefaultServerConfig() *ServerConfig {
}
}
-// ServerUserPermissions retrieves the permission set of a given user.
-func ServerUserPermissions(userName string) (permissions []string, err error) {
- permissions = Server.Permissions.Default
- if p, ok := Server.Permissions.Users[userName]; ok {
+// UserPermissions retrieves the permission set of a given user.
+func (c *ServerConfig) UserPermissions(userName string) (permissions []string, err error) {
+ if c == nil {
+ return nil, errors.New("missing server config")
+ }
+
+ permissions = c.Permissions.Default
+ if p, ok := c.Permissions.Users[userName]; ok {
permissions = p
}
if len(permissions) == 0 {
@@ -156,3 +160,11 @@ func ServerUserPermissions(userName string) (permissions []string, err error) {
}
return
}
+
+// ServerUserPermissions retrieves the permission set of a given user.
+func ServerUserPermissions(userName string) (permissions []string, err error) {
+ if Server == nil {
+ return nil, errors.New("missing server config")
+ }
+ return Server.UserPermissions(userName)
+}
diff --git a/internal/io/fs/catfile.go b/internal/io/fs/catfile.go
index e4676f3..1f35a95 100644
--- a/internal/io/fs/catfile.go
+++ b/internal/io/fs/catfile.go
@@ -6,7 +6,9 @@ type CatFile struct {
}
// NewCatFile returns a new file catter.
-func NewCatFile(filePath string, globID string, serverMessages chan<- string) CatFile {
+func NewCatFile(filePath string, globID string, serverMessages chan<- string,
+ maxLineLength int) CatFile {
+
return CatFile{
readFile: readFile{
filePath: filePath,
@@ -15,6 +17,7 @@ func NewCatFile(filePath string, globID string, serverMessages chan<- string) Ca
retry: false,
canSkipLines: false,
seekEOF: false,
+ maxLineLength: maxLineLength,
},
}
}
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index ee486bc..47a999d 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -13,7 +13,6 @@ import (
"sync"
"time"
- "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/pool"
@@ -26,9 +25,10 @@ import (
type readStatus int
const (
- nothing readStatus = iota
- abortReading readStatus = iota
- continueReading readStatus = iota
+ nothing readStatus = iota
+ abortReading readStatus = iota
+ continueReading readStatus = iota
+ defaultMaxLineLength = 1024 * 1024
)
// Used to tail and filter a local log file.
@@ -49,6 +49,8 @@ type readFile struct {
seekEOF bool
// Warned already about a long line.
warnedAboutLongLine bool
+ // Maximum line length before a line is split.
+ maxLineLength int
}
// String returns the string representation of the readFile
@@ -72,6 +74,13 @@ func (f readFile) Retry() bool {
return f.retry
}
+func (f *readFile) lineLimit() int {
+ if f.maxLineLength <= 0 {
+ return defaultMaxLineLength
+ }
+ return f.maxLineLength
+}
+
// Start tailing a log file.
func (f readFile) Start(ctx context.Context, ltx lcontext.LContext,
lines chan<- *line.Line, re regex.Regex) error {
@@ -328,7 +337,7 @@ func (f *readFile) handleReadByte(ctx context.Context, b byte,
return abortReading, message
}
default:
- if message.Len() >= config.Server.MaxLineLength {
+ if message.Len() >= f.lineLimit() {
if !f.warnedAboutLongLine {
f.serverMessages <- dlog.Common.Warn(f.filePath,
"Long log line, splitting into multiple lines") + "\n"
diff --git a/internal/io/fs/readfile_processor.go b/internal/io/fs/readfile_processor.go
index c68048d..8f56bdd 100644
--- a/internal/io/fs/readfile_processor.go
+++ b/internal/io/fs/readfile_processor.go
@@ -8,7 +8,6 @@ import (
"os"
"time"
- "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/pool"
@@ -112,7 +111,7 @@ func (f *readFile) handleReadByteProcessor(ctx context.Context, b byte,
return continueReading
default:
- if message.Len() >= config.Server.MaxLineLength {
+ if message.Len() >= f.lineLimit() {
if !f.warnedAboutLongLine {
f.serverMessages <- dlog.Common.Warn(f.filePath,
"Long log line, splitting into multiple lines") + "\n"
diff --git a/internal/io/fs/readfile_processor_optimized.go b/internal/io/fs/readfile_processor_optimized.go
index 6447f89..2e880e7 100644
--- a/internal/io/fs/readfile_processor_optimized.go
+++ b/internal/io/fs/readfile_processor_optimized.go
@@ -8,7 +8,6 @@ import (
"os"
"time"
- "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/pool"
@@ -106,7 +105,7 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in
return 0, nil, nil
}
- maxLineLen := config.Server.MaxLineLength
+ maxLineLen := f.lineLimit()
// Look for a newline
if i := bytes.IndexByte(data, '\n'); i >= 0 {
@@ -154,7 +153,7 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int,
return 0, nil, nil
}
- maxLineLen := config.Server.MaxLineLength
+ maxLineLen := f.lineLimit()
// Look for a newline
if i := bytes.IndexByte(data, '\n'); i >= 0 {
@@ -312,7 +311,7 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File,
partialLine.Write(data)
// Check if line is too long
- if partialLine.Len() >= config.Server.MaxLineLength {
+ if partialLine.Len() >= f.lineLimit() {
if !f.warnedAboutLongLine {
f.serverMessages <- dlog.Common.Warn(f.filePath,
"Long log line, splitting into multiple lines") + "\n"
diff --git a/internal/io/fs/readfile_processor_test.go b/internal/io/fs/readfile_processor_test.go
index 2e3cb80..ae34d77 100644
--- a/internal/io/fs/readfile_processor_test.go
+++ b/internal/io/fs/readfile_processor_test.go
@@ -9,7 +9,6 @@ import (
"reflect"
"testing"
- "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/regex"
@@ -41,12 +40,10 @@ func (p *captureProcessor) Close() error {
}
func TestStartWithProcessorOptimizedReadsAllLines(t *testing.T) {
- setServerConfigForProcessorTests(t)
-
filePath := writeProcessorTestFile(t, "alpha\nbeta\n")
re := regex.NewNoop()
- cat := NewCatFile(filePath, "glob-id", make(chan string, 1))
+ cat := NewCatFile(filePath, "glob-id", make(chan string, 1), defaultMaxLineLength)
processor := &captureProcessor{}
if err := cat.readFile.StartWithProcessorOptimized(
@@ -65,8 +62,6 @@ func TestStartWithProcessorOptimizedReadsAllLines(t *testing.T) {
}
func TestProcessorVariantsReturnOpenError(t *testing.T) {
- setServerConfigForProcessorTests(t)
-
re := regex.NewNoop()
missingFile := filepath.Join(t.TempDir(), "missing.log")
@@ -90,7 +85,7 @@ func TestProcessorVariantsReturnOpenError(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- cat := NewCatFile(missingFile, "glob-id", make(chan string, 1))
+ cat := NewCatFile(missingFile, "glob-id", make(chan string, 1), defaultMaxLineLength)
err := tt.start(&cat.readFile, context.Background(), lcontext.LContext{}, &captureProcessor{}, re)
if err == nil {
t.Fatalf("expected error for missing file")
@@ -100,13 +95,11 @@ func TestProcessorVariantsReturnOpenError(t *testing.T) {
}
func TestStartWithProcessorOptimizedPropagatesProcessError(t *testing.T) {
- setServerConfigForProcessorTests(t)
-
filePath := writeProcessorTestFile(t, "alpha\nbeta\n")
re := regex.NewNoop()
expectedErr := errors.New("processor failure")
- cat := NewCatFile(filePath, "glob-id", make(chan string, 1))
+ cat := NewCatFile(filePath, "glob-id", make(chan string, 1), defaultMaxLineLength)
processor := &captureProcessor{
errAtLine: 1,
processErr: expectedErr,
@@ -123,16 +116,26 @@ func TestStartWithProcessorOptimizedPropagatesProcessError(t *testing.T) {
}
}
-func setServerConfigForProcessorTests(t *testing.T) {
- t.Helper()
+func TestStartWithProcessorOptimizedUsesInjectedMaxLineLength(t *testing.T) {
+ filePath := writeProcessorTestFile(t, "abcdef\n")
+ re := regex.NewNoop()
+
+ cat := NewCatFile(filePath, "glob-id", make(chan string, 1), 3)
+ processor := &captureProcessor{}
+
+ if err := cat.readFile.StartWithProcessorOptimized(
+ context.Background(),
+ lcontext.LContext{},
+ processor,
+ re,
+ ); err != nil {
+ t.Fatalf("optimized reader start failed: %v", err)
+ }
- previousServer := config.Server
- config.Server = &config.ServerConfig{
- MaxLineLength: 1024 * 1024,
+ want := []string{"abc", "def\n"}
+ if !reflect.DeepEqual(processor.lines, want) {
+ t.Fatalf("unexpected processed lines: got=%v want=%v", processor.lines, want)
}
- t.Cleanup(func() {
- config.Server = previousServer
- })
}
func writeProcessorTestFile(t *testing.T, content string) string {
diff --git a/internal/io/fs/tailfile.go b/internal/io/fs/tailfile.go
index 7a40ac4..b2e9910 100644
--- a/internal/io/fs/tailfile.go
+++ b/internal/io/fs/tailfile.go
@@ -6,7 +6,9 @@ type TailFile struct {
}
// NewTailFile returns a new file tailer.
-func NewTailFile(filePath string, globID string, serverMessages chan<- string) TailFile {
+func NewTailFile(filePath string, globID string, serverMessages chan<- string,
+ maxLineLength int) TailFile {
+
return TailFile{
readFile: readFile{
filePath: filePath,
@@ -15,6 +17,7 @@ func NewTailFile(filePath string, globID string, serverMessages chan<- string) T
retry: true,
canSkipLines: true,
seekEOF: true,
+ maxLineLength: maxLineLength,
},
}
}
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 98fe817..9a736a5 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -34,7 +34,7 @@ type Aggregate struct {
}
// NewAggregate return a new server side aggregator.
-func NewAggregate(queryStr string) (*Aggregate, error) {
+func NewAggregate(queryStr string, defaultLogFormat string) (*Aggregate, error) {
query, err := mapr.NewQuery(queryStr)
if err != nil {
return nil, err
@@ -46,16 +46,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
}
s := strings.Split(fqdn, ".")
- var parserName string
- switch query.LogFormat {
- case "":
- parserName = config.Server.MapreduceLogFormat
- if query.Table == "" {
- parserName = "generic"
- }
- default:
- parserName = query.LogFormat
- }
+ parserName := resolveParserName(query, defaultLogFormat)
dlog.Server.Info("Creating log format parser", parserName)
logParser, err := logformat.NewParser(parserName, query)
diff --git a/internal/mapr/server/parsername.go b/internal/mapr/server/parsername.go
new file mode 100644
index 0000000..c5fb866
--- /dev/null
+++ b/internal/mapr/server/parsername.go
@@ -0,0 +1,18 @@
+package server
+
+import "github.com/mimecast/dtail/internal/mapr"
+
+const defaultLogFormat = "default"
+
+func resolveParserName(query *mapr.Query, configuredLogFormat string) string {
+ if query.LogFormat != "" {
+ return query.LogFormat
+ }
+ if query.Table == "" {
+ return "generic"
+ }
+ if configuredLogFormat == "" {
+ return defaultLogFormat
+ }
+ return configuredLogFormat
+}
diff --git a/internal/mapr/server/turbo_aggregate.go b/internal/mapr/server/turbo_aggregate.go
index a317578..9b5afe7 100644
--- a/internal/mapr/server/turbo_aggregate.go
+++ b/internal/mapr/server/turbo_aggregate.go
@@ -57,7 +57,7 @@ type rawLine struct {
}
// NewTurboAggregate returns a new turbo mode aggregator.
-func NewTurboAggregate(queryStr string) (*TurboAggregate, error) {
+func NewTurboAggregate(queryStr string, defaultLogFormat string) (*TurboAggregate, error) {
query, err := mapr.NewQuery(queryStr)
if err != nil {
return nil, err
@@ -69,16 +69,7 @@ func NewTurboAggregate(queryStr string) (*TurboAggregate, error) {
}
s := strings.Split(fqdn, ".")
- var parserName string
- switch query.LogFormat {
- case "":
- parserName = config.Server.MapreduceLogFormat
- if query.Table == "" {
- parserName = "generic"
- }
- default:
- parserName = query.LogFormat
- }
+ parserName := resolveParserName(query, defaultLogFormat)
dlog.Server.Info("Creating turbo log format parser",
"parserName", parserName,
diff --git a/internal/mapr/server/turbo_aggregate_test.go b/internal/mapr/server/turbo_aggregate_test.go
index ec1d6a3..f556f50 100644
--- a/internal/mapr/server/turbo_aggregate_test.go
+++ b/internal/mapr/server/turbo_aggregate_test.go
@@ -25,7 +25,7 @@ func TestTurboAggregateVsRegular(t *testing.T) {
if config.Server == nil {
config.Server = &config.ServerConfig{
MapreduceLogFormat: "default",
- TurboBoostDisable: false,
+ TurboBoostDisable: false,
}
}
if dlog.Server == nil {
@@ -35,10 +35,10 @@ func TestTurboAggregateVsRegular(t *testing.T) {
wg.Add(1)
dlog.Start(ctx, &wg, source.Server)
}
-
+
// Test query
queryStr := `from STATS select count($time),$time,avg($goroutines) from - group by $time order by $time`
-
+
// Test data - DTail MapReduce format
testLines := []string{
"INFO|1002-071143|1|stats.go:56|8|15|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=1",
@@ -47,23 +47,23 @@ func TestTurboAggregateVsRegular(t *testing.T) {
"INFO|1002-071147|1|stats.go:56|8|10|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=1",
"INFO|1002-071147|1|stats.go:56|8|11|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=1",
}
-
+
t.Run("TurboAggregate", func(t *testing.T) {
// Create turbo aggregate
- turboAgg, err := NewTurboAggregate(queryStr)
+ turboAgg, err := NewTurboAggregate(queryStr, config.Server.MapreduceLogFormat)
if err != nil {
t.Fatalf("Failed to create turbo aggregate: %v", err)
}
-
+
// Channel to collect messages
messages := make(chan string, 100)
// Use a cancellable context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
-
+
// Start the turbo aggregate
turboAgg.Start(ctx, messages)
-
+
// Process lines
processor := NewTurboAggregateProcessor(turboAgg, "test")
for i, line := range testLines {
@@ -73,25 +73,25 @@ func TestTurboAggregateVsRegular(t *testing.T) {
t.Errorf("Failed to process line %d: %v", i+1, err)
}
}
-
+
// Flush to ensure all data is processed
err = processor.Flush()
if err != nil {
t.Errorf("Failed to flush: %v", err)
}
-
+
// Close the processor to decrement activeProcessors
err = processor.Close()
if err != nil {
t.Errorf("Failed to close processor: %v", err)
}
-
+
// Shutdown and get results
turboAgg.Shutdown()
-
+
// Cancel context to stop background goroutines
cancel()
-
+
// Collect results with timeout
done := make(chan struct{})
var results []string
@@ -101,11 +101,11 @@ func TestTurboAggregateVsRegular(t *testing.T) {
}
close(done)
}()
-
+
// Wait a bit for serialization
time.Sleep(200 * time.Millisecond)
close(messages)
-
+
// Wait for collection to complete with timeout
select {
case <-done:
@@ -113,36 +113,36 @@ func TestTurboAggregateVsRegular(t *testing.T) {
case <-time.After(2 * time.Second):
t.Error("Timeout collecting messages")
}
-
+
t.Logf("Turbo mode processed %d lines", turboAgg.linesProcessed.Load())
t.Logf("Turbo mode results: %d messages", len(results))
for _, r := range results {
t.Logf("Result: %s", r)
}
-
+
// Verify we got results
if len(results) == 0 {
t.Error("Turbo mode produced no results")
}
-
+
// Check line count
if turboAgg.linesProcessed.Load() != uint64(len(testLines)) {
t.Errorf("Expected %d lines processed, got %d", len(testLines), turboAgg.linesProcessed.Load())
}
})
-
+
t.Run("RegularAggregate", func(t *testing.T) {
// Create regular aggregate
- regularAgg, err := NewAggregate(queryStr)
+ regularAgg, err := NewAggregate(queryStr, config.Server.MapreduceLogFormat)
if err != nil {
t.Fatalf("Failed to create regular aggregate: %v", err)
}
-
+
// Channel to collect messages
messages := make(chan string, 100)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
-
+
// Start the regular aggregate in a goroutine
var wg sync.WaitGroup
wg.Add(1)
@@ -150,14 +150,14 @@ func TestTurboAggregateVsRegular(t *testing.T) {
defer wg.Done()
regularAgg.Start(ctx, messages)
}()
-
+
// Give it time to start
time.Sleep(50 * time.Millisecond)
-
+
// Create line channel
lines := make(chan *line.Line, 100)
regularAgg.NextLinesCh <- lines
-
+
// Process lines
for _, lineStr := range testLines {
l := &line.Line{
@@ -167,30 +167,30 @@ func TestTurboAggregateVsRegular(t *testing.T) {
lines <- l
}
close(lines)
-
+
// Wait for processing
time.Sleep(100 * time.Millisecond)
-
+
// Shutdown
regularAgg.Shutdown()
cancel()
-
+
// Wait for the Start goroutine to finish
wg.Wait()
-
+
// Collect results
close(messages)
-
+
var results []string
for msg := range messages {
results = append(results, msg)
}
-
+
t.Logf("Regular mode results: %d messages", len(results))
for _, r := range results {
t.Logf("Result: %s", r)
}
-
+
// Verify we got results
if len(results) == 0 {
t.Error("Regular mode produced no results")
@@ -210,7 +210,7 @@ func TestTurboAggregateConcurrency(t *testing.T) {
if config.Server == nil {
config.Server = &config.ServerConfig{
MapreduceLogFormat: "default",
- TurboBoostDisable: false,
+ TurboBoostDisable: false,
}
}
if dlog.Server == nil {
@@ -220,81 +220,81 @@ func TestTurboAggregateConcurrency(t *testing.T) {
wg.Add(1)
dlog.Start(ctx, &wg, source.Server)
}
-
+
queryStr := `from STATS select count($time),$time from - group by $time`
-
+
// Create turbo aggregate
- turboAgg, err := NewTurboAggregate(queryStr)
+ turboAgg, err := NewTurboAggregate(queryStr, config.Server.MapreduceLogFormat)
if err != nil {
t.Fatalf("Failed to create turbo aggregate: %v", err)
}
-
+
// Channel to collect messages
messages := make(chan string, 1000)
ctx := context.Background()
-
+
// Start the turbo aggregate
turboAgg.Start(ctx, messages)
-
+
// Process multiple "files" concurrently
var wg sync.WaitGroup
numFiles := 10
linesPerFile := 100
-
+
for f := 0; f < numFiles; f++ {
wg.Add(1)
go func(fileNum int) {
defer wg.Done()
-
+
processor := NewTurboAggregateProcessor(turboAgg, "file"+string(rune(fileNum)))
-
+
// Process lines
for i := 0; i < linesPerFile; i++ {
line := "INFO|1002-071143|1|stats.go:56|8|15|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=1"
buf := bytes.NewBufferString(line)
_ = processor.ProcessLine(buf, uint64(i+1), "file"+string(rune(fileNum)))
}
-
+
// Flush when file completes
_ = processor.Flush()
-
+
// Close the processor to decrement activeProcessors
_ = processor.Close()
}(f)
}
-
+
// Wait for all files to complete
wg.Wait()
-
+
// Shutdown and get results
turboAgg.Shutdown()
-
+
// Collect results
time.Sleep(200 * time.Millisecond)
close(messages)
-
+
var results []string
for msg := range messages {
if strings.Contains(msg, "1002-071143") {
results = append(results, msg)
}
}
-
+
t.Logf("Processed %d lines total", turboAgg.linesProcessed.Load())
t.Logf("Processed %d files", turboAgg.filesProcessed.Load())
t.Logf("Got %d result messages", len(results))
-
+
// Verify line count
expectedLines := uint64(numFiles * linesPerFile)
if turboAgg.linesProcessed.Load() != expectedLines {
t.Errorf("Expected %d lines processed, got %d", expectedLines, turboAgg.linesProcessed.Load())
}
-
+
// Verify file count (may be higher if test was run multiple times)
if turboAgg.filesProcessed.Load() < uint64(numFiles) {
t.Errorf("Expected at least %d files processed, got %d", numFiles, turboAgg.filesProcessed.Load())
}
-
+
// Parse result to check count
foundExpectedCount := false
for _, result := range results {
@@ -306,8 +306,8 @@ func TestTurboAggregateConcurrency(t *testing.T) {
break
}
}
-
+
if !foundExpectedCount {
t.Error("Did not find expected count of 1000 in results")
}
-} \ No newline at end of file
+}
diff --git a/internal/server/handlers/mapcommand.go b/internal/server/handlers/mapcommand.go
index 36d9ef3..920c8dd 100644
--- a/internal/server/handlers/mapcommand.go
+++ b/internal/server/handlers/mapcommand.go
@@ -21,13 +21,17 @@ func newMapCommand(serverHandler *ServerHandler, argc int,
m := mapCommand{server: serverHandler}
queryStr := strings.Join(args[1:], " ")
+ defaultLogFormat := ""
+ if serverHandler.serverCfg != nil {
+ defaultLogFormat = serverHandler.serverCfg.MapreduceLogFormat
+ }
// If turbo boost is not disabled AND we're in server mode (not serverless), create a TurboAggregate
// Turbo boost is enabled by default and is a server-side optimization
dlog.Server.Debug("MapReduce mode check", "turboBoostDisable", serverHandler.serverCfg.TurboBoostDisable, "serverless", serverHandler.serverless)
if !serverHandler.serverCfg.TurboBoostDisable && !serverHandler.serverless {
dlog.Server.Info("Creating turbo aggregate for MapReduce", "query", queryStr)
- turboAggregate, err := server.NewTurboAggregate(queryStr)
+ turboAggregate, err := server.NewTurboAggregate(queryStr, defaultLogFormat)
if err != nil {
return m, nil, nil, err
}
@@ -36,7 +40,7 @@ func newMapCommand(serverHandler *ServerHandler, argc int,
}
// Otherwise, create a regular Aggregate
- aggregate, err := server.NewAggregate(queryStr)
+ aggregate, err := server.NewAggregate(queryStr, defaultLogFormat)
if err != nil {
return m, nil, nil, err
}
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index c03900f..493f4b7 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -204,13 +204,13 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
switch r.mode {
case omode.GrepClient, omode.CatClient:
- catFile := fs.NewCatFile(path, globID, r.server.ServerMessagesChannel())
+ catFile := fs.NewCatFile(path, globID, r.server.ServerMessagesChannel(), r.server.MaxLineLength())
reader = &catFile
limiter = r.server.CatLimiter()
case omode.TailClient:
fallthrough
default:
- tailFile := fs.NewTailFile(path, globID, r.server.ServerMessagesChannel())
+ tailFile := fs.NewTailFile(path, globID, r.server.ServerMessagesChannel(), r.server.MaxLineLength())
reader = &tailFile
limiter = r.server.TailLimiter()
}
diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go
index 6d7a095..d073682 100644
--- a/internal/server/handlers/readcommand_server.go
+++ b/internal/server/handlers/readcommand_server.go
@@ -55,6 +55,7 @@ type readCommandTurbo interface {
type readCommandTiming interface {
ReadGlobRetryInterval() time.Duration
ReadRetryInterval() time.Duration
+ MaxLineLength() int
AggregateLinesChannelBufferSize() int
TurboDataTransmissionDelay() time.Duration
TurboEOFWaitDuration(fileCount int) time.Duration
@@ -205,6 +206,11 @@ func (h *ServerHandler) ReadRetryInterval() time.Duration {
return durationFromMilliseconds(h.serverCfg.ReadRetryIntervalMs, 2*time.Second)
}
+// MaxLineLength returns the configured max line length for file readers.
+func (h *ServerHandler) MaxLineLength() int {
+ return positiveIntOrDefault(h.serverCfg.MaxLineLength, 1024*1024)
+}
+
// AggregateLinesChannelBufferSize returns the aggregate lines channel buffer size.
func (h *ServerHandler) AggregateLinesChannelBufferSize() int {
return positiveIntOrDefault(h.serverCfg.ReadAggregateLineBufferSize, 10000)
diff --git a/internal/server/server.go b/internal/server/server.go
index 72094ef..a788ba0 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -79,7 +79,7 @@ func New(cfg config.RuntimeConfig) *Server {
s.authKeyStore,
)
- private, err := gossh.ParsePrivateKey(server.PrivateHostKey())
+ private, err := gossh.ParsePrivateKey(server.PrivateHostKey(cfg.Server.HostKeyFile, cfg.Server.HostKeyBits))
if err != nil {
dlog.Server.FatalPanic(err)
}
@@ -165,7 +165,7 @@ func (s *Server) handleConnection(ctx context.Context, conn net.Conn) {
func (s *Server) handleChannel(ctx context.Context, sshConn gossh.Conn,
newChannel gossh.NewChannel) {
- user, err := user.New(sshConn.User(), sshConn.RemoteAddr().String())
+ user, err := user.New(sshConn.User(), sshConn.RemoteAddr().String(), s.cfg.Server.UserPermissions)
if err != nil {
dlog.Server.Error(user, err)
if err := newChannel.Reject(gossh.Prohibited, err.Error()); err != nil {
@@ -290,7 +290,7 @@ func (s *Server) handleShellRequest(ctx context.Context, sshConn gossh.Conn,
func (s *Server) Callback(c gossh.ConnMetadata,
authPayload []byte) (*gossh.Permissions, error) {
- user, err := user.New(c.User(), c.RemoteAddr().String())
+ user, err := user.New(c.User(), c.RemoteAddr().String(), s.cfg.Server.UserPermissions)
if err != nil {
return nil, err
}
diff --git a/internal/ssh/server/hostkey.go b/internal/ssh/server/hostkey.go
index b2d4569..809a870 100644
--- a/internal/ssh/server/hostkey.go
+++ b/internal/ssh/server/hostkey.go
@@ -8,9 +8,19 @@ import (
"github.com/mimecast/dtail/internal/ssh"
)
+const (
+ defaultHostKeyBits = 4096
+ defaultHostKeyFile = "./cache/ssh_host_key"
+)
+
// PrivateHostKey retrieves the private server RSA host key.
-func PrivateHostKey() []byte {
- hostKeyFile := config.Server.HostKeyFile
+func PrivateHostKey(hostKeyFile string, hostKeyBits int) []byte {
+ if hostKeyFile == "" {
+ hostKeyFile = defaultHostKeyFile
+ }
+ if hostKeyBits <= 0 {
+ hostKeyBits = defaultHostKeyBits
+ }
if config.Env("DTAIL_INTEGRATION_TEST_RUN_MODE") {
hostKeyFile = "./ssh_host_key"
}
@@ -18,7 +28,7 @@ func PrivateHostKey() []byte {
if os.IsNotExist(err) {
dlog.Server.Info("Generating private server RSA host key")
- privateKey, err := ssh.GeneratePrivateRSAKey(config.Server.HostKeyBits)
+ privateKey, err := ssh.GeneratePrivateRSAKey(hostKeyBits)
if err != nil {
dlog.Server.FatalPanic("Failed to generate private server RSA host key", err)
diff --git a/internal/ssh/server/publickeycallback.go b/internal/ssh/server/publickeycallback.go
index ccf9111..d4e328b 100644
--- a/internal/ssh/server/publickeycallback.go
+++ b/internal/ssh/server/publickeycallback.go
@@ -12,19 +12,6 @@ import (
gossh "golang.org/x/crypto/ssh"
)
-// PublicKeyCallback is for the server to check whether a public SSH key is
-// authorized ot not.
-func PublicKeyCallback(c gossh.ConnMetadata,
- offeredPubKey gossh.PublicKey) (*gossh.Permissions, error) {
-
- authKeyEnabled := config.Server != nil && config.Server.AuthKeyEnabled
- cacheDir := ""
- if config.Common != nil {
- cacheDir = config.Common.CacheDir
- }
- return publicKeyCallback(c, offeredPubKey, authKeyEnabled, cacheDir, authKeyStore)
-}
-
// NewPublicKeyCallback creates an instance-scoped SSH public key callback.
// It avoids relying on package-level mutable configuration/state.
func NewPublicKeyCallback(authKeyEnabled bool, cacheDir string,
@@ -41,7 +28,7 @@ func NewPublicKeyCallback(authKeyEnabled bool, cacheDir string,
func publicKeyCallback(c gossh.ConnMetadata, offeredPubKey gossh.PublicKey,
authKeyEnabled bool, cacheDir string, keyStore *AuthKeyStore) (*gossh.Permissions, error) {
- user, err := user.New(c.User(), c.RemoteAddr().String())
+ user, err := user.New(c.User(), c.RemoteAddr().String(), nil)
if err != nil {
return nil, err
}
diff --git a/internal/user/server/user.go b/internal/user/server/user.go
index d391672..332ea96 100644
--- a/internal/user/server/user.go
+++ b/internal/user/server/user.go
@@ -24,11 +24,20 @@ type User struct {
permissions []string
}
+// PermissionLookup resolves permissions for a given SSH user.
+type PermissionLookup func(string) ([]string, error)
+
// New returns a new user.
-func New(name, remoteAddress string) (*User, error) {
- permissions, err := config.ServerUserPermissions(name)
- if err != nil {
- return nil, err
+func New(name, remoteAddress string, permissionLookup PermissionLookup) (*User, error) {
+ var (
+ permissions []string
+ err error
+ )
+ if permissionLookup != nil {
+ permissions, err = permissionLookup(name)
+ if err != nil {
+ return nil, err
+ }
}
return &User{
Name: name,