diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-10 19:37:21 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-10 19:37:21 +0200 |
| commit | f6e23930da2900c43a5389a2e7d1e38d8221a76f (patch) | |
| tree | 3352cc0d8c0819d5cc58fdf987ed39f87a30a34b /internal | |
| parent | 1fc24f9affed5128702e4de80572cac8c82d399e (diff) | |
Refactor server-side config singleton reads
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/clients/connectors/serverless.go | 6 | ||||
| -rw-r--r-- | internal/config/server.go | 20 | ||||
| -rw-r--r-- | internal/io/fs/catfile.go | 5 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 19 | ||||
| -rw-r--r-- | internal/io/fs/readfile_processor.go | 3 | ||||
| -rw-r--r-- | internal/io/fs/readfile_processor_optimized.go | 7 | ||||
| -rw-r--r-- | internal/io/fs/readfile_processor_test.go | 39 | ||||
| -rw-r--r-- | internal/io/fs/tailfile.go | 5 | ||||
| -rw-r--r-- | internal/mapr/server/aggregate.go | 13 | ||||
| -rw-r--r-- | internal/mapr/server/parsername.go | 18 | ||||
| -rw-r--r-- | internal/mapr/server/turbo_aggregate.go | 13 | ||||
| -rw-r--r-- | internal/mapr/server/turbo_aggregate_test.go | 108 | ||||
| -rw-r--r-- | internal/server/handlers/mapcommand.go | 8 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 4 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand_server.go | 6 | ||||
| -rw-r--r-- | internal/server/server.go | 6 | ||||
| -rw-r--r-- | internal/ssh/server/hostkey.go | 16 | ||||
| -rw-r--r-- | internal/ssh/server/publickeycallback.go | 15 | ||||
| -rw-r--r-- | internal/user/server/user.go | 17 |
19 files changed, 188 insertions, 140 deletions
diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go index f4c4e9e..cedf37f 100644 --- a/internal/clients/connectors/serverless.go +++ b/internal/clients/connectors/serverless.go @@ -60,7 +60,11 @@ func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) error { dlog.Client.Debug("Creating server handler for a serverless session") - user, err := user.New(s.userName, s.Server()) + var permissionLookup user.PermissionLookup + if config.Server != nil { + permissionLookup = config.Server.UserPermissions + } + user, err := user.New(s.userName, s.Server(), permissionLookup) if err != nil { return err } diff --git a/internal/config/server.go b/internal/config/server.go index 13ebde8..97c9a9d 100644 --- a/internal/config/server.go +++ b/internal/config/server.go @@ -145,10 +145,14 @@ func newDefaultServerConfig() *ServerConfig { } } -// ServerUserPermissions retrieves the permission set of a given user. -func ServerUserPermissions(userName string) (permissions []string, err error) { - permissions = Server.Permissions.Default - if p, ok := Server.Permissions.Users[userName]; ok { +// UserPermissions retrieves the permission set of a given user. +func (c *ServerConfig) UserPermissions(userName string) (permissions []string, err error) { + if c == nil { + return nil, errors.New("missing server config") + } + + permissions = c.Permissions.Default + if p, ok := c.Permissions.Users[userName]; ok { permissions = p } if len(permissions) == 0 { @@ -156,3 +160,11 @@ func ServerUserPermissions(userName string) (permissions []string, err error) { } return } + +// ServerUserPermissions retrieves the permission set of a given user. +func ServerUserPermissions(userName string) (permissions []string, err error) { + if Server == nil { + return nil, errors.New("missing server config") + } + return Server.UserPermissions(userName) +} diff --git a/internal/io/fs/catfile.go b/internal/io/fs/catfile.go index e4676f3..1f35a95 100644 --- a/internal/io/fs/catfile.go +++ b/internal/io/fs/catfile.go @@ -6,7 +6,9 @@ type CatFile struct { } // NewCatFile returns a new file catter. -func NewCatFile(filePath string, globID string, serverMessages chan<- string) CatFile { +func NewCatFile(filePath string, globID string, serverMessages chan<- string, + maxLineLength int) CatFile { + return CatFile{ readFile: readFile{ filePath: filePath, @@ -15,6 +17,7 @@ func NewCatFile(filePath string, globID string, serverMessages chan<- string) Ca retry: false, canSkipLines: false, seekEOF: false, + maxLineLength: maxLineLength, }, } } diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index ee486bc..47a999d 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -13,7 +13,6 @@ import ( "sync" "time" - "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/pool" @@ -26,9 +25,10 @@ import ( type readStatus int const ( - nothing readStatus = iota - abortReading readStatus = iota - continueReading readStatus = iota + nothing readStatus = iota + abortReading readStatus = iota + continueReading readStatus = iota + defaultMaxLineLength = 1024 * 1024 ) // Used to tail and filter a local log file. @@ -49,6 +49,8 @@ type readFile struct { seekEOF bool // Warned already about a long line. warnedAboutLongLine bool + // Maximum line length before a line is split. + maxLineLength int } // String returns the string representation of the readFile @@ -72,6 +74,13 @@ func (f readFile) Retry() bool { return f.retry } +func (f *readFile) lineLimit() int { + if f.maxLineLength <= 0 { + return defaultMaxLineLength + } + return f.maxLineLength +} + // Start tailing a log file. func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, lines chan<- *line.Line, re regex.Regex) error { @@ -328,7 +337,7 @@ func (f *readFile) handleReadByte(ctx context.Context, b byte, return abortReading, message } default: - if message.Len() >= config.Server.MaxLineLength { + if message.Len() >= f.lineLimit() { if !f.warnedAboutLongLine { f.serverMessages <- dlog.Common.Warn(f.filePath, "Long log line, splitting into multiple lines") + "\n" diff --git a/internal/io/fs/readfile_processor.go b/internal/io/fs/readfile_processor.go index c68048d..8f56bdd 100644 --- a/internal/io/fs/readfile_processor.go +++ b/internal/io/fs/readfile_processor.go @@ -8,7 +8,6 @@ import ( "os" "time" - "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/pool" @@ -112,7 +111,7 @@ func (f *readFile) handleReadByteProcessor(ctx context.Context, b byte, return continueReading default: - if message.Len() >= config.Server.MaxLineLength { + if message.Len() >= f.lineLimit() { if !f.warnedAboutLongLine { f.serverMessages <- dlog.Common.Warn(f.filePath, "Long log line, splitting into multiple lines") + "\n" diff --git a/internal/io/fs/readfile_processor_optimized.go b/internal/io/fs/readfile_processor_optimized.go index 6447f89..2e880e7 100644 --- a/internal/io/fs/readfile_processor_optimized.go +++ b/internal/io/fs/readfile_processor_optimized.go @@ -8,7 +8,6 @@ import ( "os" "time" - "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/pool" @@ -106,7 +105,7 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in return 0, nil, nil } - maxLineLen := config.Server.MaxLineLength + maxLineLen := f.lineLimit() // Look for a newline if i := bytes.IndexByte(data, '\n'); i >= 0 { @@ -154,7 +153,7 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int, return 0, nil, nil } - maxLineLen := config.Server.MaxLineLength + maxLineLen := f.lineLimit() // Look for a newline if i := bytes.IndexByte(data, '\n'); i >= 0 { @@ -312,7 +311,7 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File, partialLine.Write(data) // Check if line is too long - if partialLine.Len() >= config.Server.MaxLineLength { + if partialLine.Len() >= f.lineLimit() { if !f.warnedAboutLongLine { f.serverMessages <- dlog.Common.Warn(f.filePath, "Long log line, splitting into multiple lines") + "\n" diff --git a/internal/io/fs/readfile_processor_test.go b/internal/io/fs/readfile_processor_test.go index 2e3cb80..ae34d77 100644 --- a/internal/io/fs/readfile_processor_test.go +++ b/internal/io/fs/readfile_processor_test.go @@ -9,7 +9,6 @@ import ( "reflect" "testing" - "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" @@ -41,12 +40,10 @@ func (p *captureProcessor) Close() error { } func TestStartWithProcessorOptimizedReadsAllLines(t *testing.T) { - setServerConfigForProcessorTests(t) - filePath := writeProcessorTestFile(t, "alpha\nbeta\n") re := regex.NewNoop() - cat := NewCatFile(filePath, "glob-id", make(chan string, 1)) + cat := NewCatFile(filePath, "glob-id", make(chan string, 1), defaultMaxLineLength) processor := &captureProcessor{} if err := cat.readFile.StartWithProcessorOptimized( @@ -65,8 +62,6 @@ func TestStartWithProcessorOptimizedReadsAllLines(t *testing.T) { } func TestProcessorVariantsReturnOpenError(t *testing.T) { - setServerConfigForProcessorTests(t) - re := regex.NewNoop() missingFile := filepath.Join(t.TempDir(), "missing.log") @@ -90,7 +85,7 @@ func TestProcessorVariantsReturnOpenError(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cat := NewCatFile(missingFile, "glob-id", make(chan string, 1)) + cat := NewCatFile(missingFile, "glob-id", make(chan string, 1), defaultMaxLineLength) err := tt.start(&cat.readFile, context.Background(), lcontext.LContext{}, &captureProcessor{}, re) if err == nil { t.Fatalf("expected error for missing file") @@ -100,13 +95,11 @@ func TestProcessorVariantsReturnOpenError(t *testing.T) { } func TestStartWithProcessorOptimizedPropagatesProcessError(t *testing.T) { - setServerConfigForProcessorTests(t) - filePath := writeProcessorTestFile(t, "alpha\nbeta\n") re := regex.NewNoop() expectedErr := errors.New("processor failure") - cat := NewCatFile(filePath, "glob-id", make(chan string, 1)) + cat := NewCatFile(filePath, "glob-id", make(chan string, 1), defaultMaxLineLength) processor := &captureProcessor{ errAtLine: 1, processErr: expectedErr, @@ -123,16 +116,26 @@ func TestStartWithProcessorOptimizedPropagatesProcessError(t *testing.T) { } } -func setServerConfigForProcessorTests(t *testing.T) { - t.Helper() +func TestStartWithProcessorOptimizedUsesInjectedMaxLineLength(t *testing.T) { + filePath := writeProcessorTestFile(t, "abcdef\n") + re := regex.NewNoop() + + cat := NewCatFile(filePath, "glob-id", make(chan string, 1), 3) + processor := &captureProcessor{} + + if err := cat.readFile.StartWithProcessorOptimized( + context.Background(), + lcontext.LContext{}, + processor, + re, + ); err != nil { + t.Fatalf("optimized reader start failed: %v", err) + } - previousServer := config.Server - config.Server = &config.ServerConfig{ - MaxLineLength: 1024 * 1024, + want := []string{"abc", "def\n"} + if !reflect.DeepEqual(processor.lines, want) { + t.Fatalf("unexpected processed lines: got=%v want=%v", processor.lines, want) } - t.Cleanup(func() { - config.Server = previousServer - }) } func writeProcessorTestFile(t *testing.T, content string) string { diff --git a/internal/io/fs/tailfile.go b/internal/io/fs/tailfile.go index 7a40ac4..b2e9910 100644 --- a/internal/io/fs/tailfile.go +++ b/internal/io/fs/tailfile.go @@ -6,7 +6,9 @@ type TailFile struct { } // NewTailFile returns a new file tailer. -func NewTailFile(filePath string, globID string, serverMessages chan<- string) TailFile { +func NewTailFile(filePath string, globID string, serverMessages chan<- string, + maxLineLength int) TailFile { + return TailFile{ readFile: readFile{ filePath: filePath, @@ -15,6 +17,7 @@ func NewTailFile(filePath string, globID string, serverMessages chan<- string) T retry: true, canSkipLines: true, seekEOF: true, + maxLineLength: maxLineLength, }, } } diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 98fe817..9a736a5 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -34,7 +34,7 @@ type Aggregate struct { } // NewAggregate return a new server side aggregator. -func NewAggregate(queryStr string) (*Aggregate, error) { +func NewAggregate(queryStr string, defaultLogFormat string) (*Aggregate, error) { query, err := mapr.NewQuery(queryStr) if err != nil { return nil, err @@ -46,16 +46,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) { } s := strings.Split(fqdn, ".") - var parserName string - switch query.LogFormat { - case "": - parserName = config.Server.MapreduceLogFormat - if query.Table == "" { - parserName = "generic" - } - default: - parserName = query.LogFormat - } + parserName := resolveParserName(query, defaultLogFormat) dlog.Server.Info("Creating log format parser", parserName) logParser, err := logformat.NewParser(parserName, query) diff --git a/internal/mapr/server/parsername.go b/internal/mapr/server/parsername.go new file mode 100644 index 0000000..c5fb866 --- /dev/null +++ b/internal/mapr/server/parsername.go @@ -0,0 +1,18 @@ +package server + +import "github.com/mimecast/dtail/internal/mapr" + +const defaultLogFormat = "default" + +func resolveParserName(query *mapr.Query, configuredLogFormat string) string { + if query.LogFormat != "" { + return query.LogFormat + } + if query.Table == "" { + return "generic" + } + if configuredLogFormat == "" { + return defaultLogFormat + } + return configuredLogFormat +} diff --git a/internal/mapr/server/turbo_aggregate.go b/internal/mapr/server/turbo_aggregate.go index a317578..9b5afe7 100644 --- a/internal/mapr/server/turbo_aggregate.go +++ b/internal/mapr/server/turbo_aggregate.go @@ -57,7 +57,7 @@ type rawLine struct { } // NewTurboAggregate returns a new turbo mode aggregator. -func NewTurboAggregate(queryStr string) (*TurboAggregate, error) { +func NewTurboAggregate(queryStr string, defaultLogFormat string) (*TurboAggregate, error) { query, err := mapr.NewQuery(queryStr) if err != nil { return nil, err @@ -69,16 +69,7 @@ func NewTurboAggregate(queryStr string) (*TurboAggregate, error) { } s := strings.Split(fqdn, ".") - var parserName string - switch query.LogFormat { - case "": - parserName = config.Server.MapreduceLogFormat - if query.Table == "" { - parserName = "generic" - } - default: - parserName = query.LogFormat - } + parserName := resolveParserName(query, defaultLogFormat) dlog.Server.Info("Creating turbo log format parser", "parserName", parserName, diff --git a/internal/mapr/server/turbo_aggregate_test.go b/internal/mapr/server/turbo_aggregate_test.go index ec1d6a3..f556f50 100644 --- a/internal/mapr/server/turbo_aggregate_test.go +++ b/internal/mapr/server/turbo_aggregate_test.go @@ -25,7 +25,7 @@ func TestTurboAggregateVsRegular(t *testing.T) { if config.Server == nil { config.Server = &config.ServerConfig{ MapreduceLogFormat: "default", - TurboBoostDisable: false, + TurboBoostDisable: false, } } if dlog.Server == nil { @@ -35,10 +35,10 @@ func TestTurboAggregateVsRegular(t *testing.T) { wg.Add(1) dlog.Start(ctx, &wg, source.Server) } - + // Test query queryStr := `from STATS select count($time),$time,avg($goroutines) from - group by $time order by $time` - + // Test data - DTail MapReduce format testLines := []string{ "INFO|1002-071143|1|stats.go:56|8|15|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=1", @@ -47,23 +47,23 @@ func TestTurboAggregateVsRegular(t *testing.T) { "INFO|1002-071147|1|stats.go:56|8|10|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=1", "INFO|1002-071147|1|stats.go:56|8|11|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=1", } - + t.Run("TurboAggregate", func(t *testing.T) { // Create turbo aggregate - turboAgg, err := NewTurboAggregate(queryStr) + turboAgg, err := NewTurboAggregate(queryStr, config.Server.MapreduceLogFormat) if err != nil { t.Fatalf("Failed to create turbo aggregate: %v", err) } - + // Channel to collect messages messages := make(chan string, 100) // Use a cancellable context ctx, cancel := context.WithCancel(context.Background()) defer cancel() - + // Start the turbo aggregate turboAgg.Start(ctx, messages) - + // Process lines processor := NewTurboAggregateProcessor(turboAgg, "test") for i, line := range testLines { @@ -73,25 +73,25 @@ func TestTurboAggregateVsRegular(t *testing.T) { t.Errorf("Failed to process line %d: %v", i+1, err) } } - + // Flush to ensure all data is processed err = processor.Flush() if err != nil { t.Errorf("Failed to flush: %v", err) } - + // Close the processor to decrement activeProcessors err = processor.Close() if err != nil { t.Errorf("Failed to close processor: %v", err) } - + // Shutdown and get results turboAgg.Shutdown() - + // Cancel context to stop background goroutines cancel() - + // Collect results with timeout done := make(chan struct{}) var results []string @@ -101,11 +101,11 @@ func TestTurboAggregateVsRegular(t *testing.T) { } close(done) }() - + // Wait a bit for serialization time.Sleep(200 * time.Millisecond) close(messages) - + // Wait for collection to complete with timeout select { case <-done: @@ -113,36 +113,36 @@ func TestTurboAggregateVsRegular(t *testing.T) { case <-time.After(2 * time.Second): t.Error("Timeout collecting messages") } - + t.Logf("Turbo mode processed %d lines", turboAgg.linesProcessed.Load()) t.Logf("Turbo mode results: %d messages", len(results)) for _, r := range results { t.Logf("Result: %s", r) } - + // Verify we got results if len(results) == 0 { t.Error("Turbo mode produced no results") } - + // Check line count if turboAgg.linesProcessed.Load() != uint64(len(testLines)) { t.Errorf("Expected %d lines processed, got %d", len(testLines), turboAgg.linesProcessed.Load()) } }) - + t.Run("RegularAggregate", func(t *testing.T) { // Create regular aggregate - regularAgg, err := NewAggregate(queryStr) + regularAgg, err := NewAggregate(queryStr, config.Server.MapreduceLogFormat) if err != nil { t.Fatalf("Failed to create regular aggregate: %v", err) } - + // Channel to collect messages messages := make(chan string, 100) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - + // Start the regular aggregate in a goroutine var wg sync.WaitGroup wg.Add(1) @@ -150,14 +150,14 @@ func TestTurboAggregateVsRegular(t *testing.T) { defer wg.Done() regularAgg.Start(ctx, messages) }() - + // Give it time to start time.Sleep(50 * time.Millisecond) - + // Create line channel lines := make(chan *line.Line, 100) regularAgg.NextLinesCh <- lines - + // Process lines for _, lineStr := range testLines { l := &line.Line{ @@ -167,30 +167,30 @@ func TestTurboAggregateVsRegular(t *testing.T) { lines <- l } close(lines) - + // Wait for processing time.Sleep(100 * time.Millisecond) - + // Shutdown regularAgg.Shutdown() cancel() - + // Wait for the Start goroutine to finish wg.Wait() - + // Collect results close(messages) - + var results []string for msg := range messages { results = append(results, msg) } - + t.Logf("Regular mode results: %d messages", len(results)) for _, r := range results { t.Logf("Result: %s", r) } - + // Verify we got results if len(results) == 0 { t.Error("Regular mode produced no results") @@ -210,7 +210,7 @@ func TestTurboAggregateConcurrency(t *testing.T) { if config.Server == nil { config.Server = &config.ServerConfig{ MapreduceLogFormat: "default", - TurboBoostDisable: false, + TurboBoostDisable: false, } } if dlog.Server == nil { @@ -220,81 +220,81 @@ func TestTurboAggregateConcurrency(t *testing.T) { wg.Add(1) dlog.Start(ctx, &wg, source.Server) } - + queryStr := `from STATS select count($time),$time from - group by $time` - + // Create turbo aggregate - turboAgg, err := NewTurboAggregate(queryStr) + turboAgg, err := NewTurboAggregate(queryStr, config.Server.MapreduceLogFormat) if err != nil { t.Fatalf("Failed to create turbo aggregate: %v", err) } - + // Channel to collect messages messages := make(chan string, 1000) ctx := context.Background() - + // Start the turbo aggregate turboAgg.Start(ctx, messages) - + // Process multiple "files" concurrently var wg sync.WaitGroup numFiles := 10 linesPerFile := 100 - + for f := 0; f < numFiles; f++ { wg.Add(1) go func(fileNum int) { defer wg.Done() - + processor := NewTurboAggregateProcessor(turboAgg, "file"+string(rune(fileNum))) - + // Process lines for i := 0; i < linesPerFile; i++ { line := "INFO|1002-071143|1|stats.go:56|8|15|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=1" buf := bytes.NewBufferString(line) _ = processor.ProcessLine(buf, uint64(i+1), "file"+string(rune(fileNum))) } - + // Flush when file completes _ = processor.Flush() - + // Close the processor to decrement activeProcessors _ = processor.Close() }(f) } - + // Wait for all files to complete wg.Wait() - + // Shutdown and get results turboAgg.Shutdown() - + // Collect results time.Sleep(200 * time.Millisecond) close(messages) - + var results []string for msg := range messages { if strings.Contains(msg, "1002-071143") { results = append(results, msg) } } - + t.Logf("Processed %d lines total", turboAgg.linesProcessed.Load()) t.Logf("Processed %d files", turboAgg.filesProcessed.Load()) t.Logf("Got %d result messages", len(results)) - + // Verify line count expectedLines := uint64(numFiles * linesPerFile) if turboAgg.linesProcessed.Load() != expectedLines { t.Errorf("Expected %d lines processed, got %d", expectedLines, turboAgg.linesProcessed.Load()) } - + // Verify file count (may be higher if test was run multiple times) if turboAgg.filesProcessed.Load() < uint64(numFiles) { t.Errorf("Expected at least %d files processed, got %d", numFiles, turboAgg.filesProcessed.Load()) } - + // Parse result to check count foundExpectedCount := false for _, result := range results { @@ -306,8 +306,8 @@ func TestTurboAggregateConcurrency(t *testing.T) { break } } - + if !foundExpectedCount { t.Error("Did not find expected count of 1000 in results") } -}
\ No newline at end of file +} diff --git a/internal/server/handlers/mapcommand.go b/internal/server/handlers/mapcommand.go index 36d9ef3..920c8dd 100644 --- a/internal/server/handlers/mapcommand.go +++ b/internal/server/handlers/mapcommand.go @@ -21,13 +21,17 @@ func newMapCommand(serverHandler *ServerHandler, argc int, m := mapCommand{server: serverHandler} queryStr := strings.Join(args[1:], " ") + defaultLogFormat := "" + if serverHandler.serverCfg != nil { + defaultLogFormat = serverHandler.serverCfg.MapreduceLogFormat + } // If turbo boost is not disabled AND we're in server mode (not serverless), create a TurboAggregate // Turbo boost is enabled by default and is a server-side optimization dlog.Server.Debug("MapReduce mode check", "turboBoostDisable", serverHandler.serverCfg.TurboBoostDisable, "serverless", serverHandler.serverless) if !serverHandler.serverCfg.TurboBoostDisable && !serverHandler.serverless { dlog.Server.Info("Creating turbo aggregate for MapReduce", "query", queryStr) - turboAggregate, err := server.NewTurboAggregate(queryStr) + turboAggregate, err := server.NewTurboAggregate(queryStr, defaultLogFormat) if err != nil { return m, nil, nil, err } @@ -36,7 +40,7 @@ func newMapCommand(serverHandler *ServerHandler, argc int, } // Otherwise, create a regular Aggregate - aggregate, err := server.NewAggregate(queryStr) + aggregate, err := server.NewAggregate(queryStr, defaultLogFormat) if err != nil { return m, nil, nil, err } diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index c03900f..493f4b7 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -204,13 +204,13 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, switch r.mode { case omode.GrepClient, omode.CatClient: - catFile := fs.NewCatFile(path, globID, r.server.ServerMessagesChannel()) + catFile := fs.NewCatFile(path, globID, r.server.ServerMessagesChannel(), r.server.MaxLineLength()) reader = &catFile limiter = r.server.CatLimiter() case omode.TailClient: fallthrough default: - tailFile := fs.NewTailFile(path, globID, r.server.ServerMessagesChannel()) + tailFile := fs.NewTailFile(path, globID, r.server.ServerMessagesChannel(), r.server.MaxLineLength()) reader = &tailFile limiter = r.server.TailLimiter() } diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go index 6d7a095..d073682 100644 --- a/internal/server/handlers/readcommand_server.go +++ b/internal/server/handlers/readcommand_server.go @@ -55,6 +55,7 @@ type readCommandTurbo interface { type readCommandTiming interface { ReadGlobRetryInterval() time.Duration ReadRetryInterval() time.Duration + MaxLineLength() int AggregateLinesChannelBufferSize() int TurboDataTransmissionDelay() time.Duration TurboEOFWaitDuration(fileCount int) time.Duration @@ -205,6 +206,11 @@ func (h *ServerHandler) ReadRetryInterval() time.Duration { return durationFromMilliseconds(h.serverCfg.ReadRetryIntervalMs, 2*time.Second) } +// MaxLineLength returns the configured max line length for file readers. +func (h *ServerHandler) MaxLineLength() int { + return positiveIntOrDefault(h.serverCfg.MaxLineLength, 1024*1024) +} + // AggregateLinesChannelBufferSize returns the aggregate lines channel buffer size. func (h *ServerHandler) AggregateLinesChannelBufferSize() int { return positiveIntOrDefault(h.serverCfg.ReadAggregateLineBufferSize, 10000) diff --git a/internal/server/server.go b/internal/server/server.go index 72094ef..a788ba0 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -79,7 +79,7 @@ func New(cfg config.RuntimeConfig) *Server { s.authKeyStore, ) - private, err := gossh.ParsePrivateKey(server.PrivateHostKey()) + private, err := gossh.ParsePrivateKey(server.PrivateHostKey(cfg.Server.HostKeyFile, cfg.Server.HostKeyBits)) if err != nil { dlog.Server.FatalPanic(err) } @@ -165,7 +165,7 @@ func (s *Server) handleConnection(ctx context.Context, conn net.Conn) { func (s *Server) handleChannel(ctx context.Context, sshConn gossh.Conn, newChannel gossh.NewChannel) { - user, err := user.New(sshConn.User(), sshConn.RemoteAddr().String()) + user, err := user.New(sshConn.User(), sshConn.RemoteAddr().String(), s.cfg.Server.UserPermissions) if err != nil { dlog.Server.Error(user, err) if err := newChannel.Reject(gossh.Prohibited, err.Error()); err != nil { @@ -290,7 +290,7 @@ func (s *Server) handleShellRequest(ctx context.Context, sshConn gossh.Conn, func (s *Server) Callback(c gossh.ConnMetadata, authPayload []byte) (*gossh.Permissions, error) { - user, err := user.New(c.User(), c.RemoteAddr().String()) + user, err := user.New(c.User(), c.RemoteAddr().String(), s.cfg.Server.UserPermissions) if err != nil { return nil, err } diff --git a/internal/ssh/server/hostkey.go b/internal/ssh/server/hostkey.go index b2d4569..809a870 100644 --- a/internal/ssh/server/hostkey.go +++ b/internal/ssh/server/hostkey.go @@ -8,9 +8,19 @@ import ( "github.com/mimecast/dtail/internal/ssh" ) +const ( + defaultHostKeyBits = 4096 + defaultHostKeyFile = "./cache/ssh_host_key" +) + // PrivateHostKey retrieves the private server RSA host key. -func PrivateHostKey() []byte { - hostKeyFile := config.Server.HostKeyFile +func PrivateHostKey(hostKeyFile string, hostKeyBits int) []byte { + if hostKeyFile == "" { + hostKeyFile = defaultHostKeyFile + } + if hostKeyBits <= 0 { + hostKeyBits = defaultHostKeyBits + } if config.Env("DTAIL_INTEGRATION_TEST_RUN_MODE") { hostKeyFile = "./ssh_host_key" } @@ -18,7 +28,7 @@ func PrivateHostKey() []byte { if os.IsNotExist(err) { dlog.Server.Info("Generating private server RSA host key") - privateKey, err := ssh.GeneratePrivateRSAKey(config.Server.HostKeyBits) + privateKey, err := ssh.GeneratePrivateRSAKey(hostKeyBits) if err != nil { dlog.Server.FatalPanic("Failed to generate private server RSA host key", err) diff --git a/internal/ssh/server/publickeycallback.go b/internal/ssh/server/publickeycallback.go index ccf9111..d4e328b 100644 --- a/internal/ssh/server/publickeycallback.go +++ b/internal/ssh/server/publickeycallback.go @@ -12,19 +12,6 @@ import ( gossh "golang.org/x/crypto/ssh" ) -// PublicKeyCallback is for the server to check whether a public SSH key is -// authorized ot not. -func PublicKeyCallback(c gossh.ConnMetadata, - offeredPubKey gossh.PublicKey) (*gossh.Permissions, error) { - - authKeyEnabled := config.Server != nil && config.Server.AuthKeyEnabled - cacheDir := "" - if config.Common != nil { - cacheDir = config.Common.CacheDir - } - return publicKeyCallback(c, offeredPubKey, authKeyEnabled, cacheDir, authKeyStore) -} - // NewPublicKeyCallback creates an instance-scoped SSH public key callback. // It avoids relying on package-level mutable configuration/state. func NewPublicKeyCallback(authKeyEnabled bool, cacheDir string, @@ -41,7 +28,7 @@ func NewPublicKeyCallback(authKeyEnabled bool, cacheDir string, func publicKeyCallback(c gossh.ConnMetadata, offeredPubKey gossh.PublicKey, authKeyEnabled bool, cacheDir string, keyStore *AuthKeyStore) (*gossh.Permissions, error) { - user, err := user.New(c.User(), c.RemoteAddr().String()) + user, err := user.New(c.User(), c.RemoteAddr().String(), nil) if err != nil { return nil, err } diff --git a/internal/user/server/user.go b/internal/user/server/user.go index d391672..332ea96 100644 --- a/internal/user/server/user.go +++ b/internal/user/server/user.go @@ -24,11 +24,20 @@ type User struct { permissions []string } +// PermissionLookup resolves permissions for a given SSH user. +type PermissionLookup func(string) ([]string, error) + // New returns a new user. -func New(name, remoteAddress string) (*User, error) { - permissions, err := config.ServerUserPermissions(name) - if err != nil { - return nil, err +func New(name, remoteAddress string, permissionLookup PermissionLookup) (*User, error) { + var ( + permissions []string + err error + ) + if permissionLookup != nil { + permissions, err = permissionLookup(name) + if err != nil { + return nil, err + } } return &User{ Name: name, |
