diff options
| author | Paul Bütow <pbuetow@mimecast.com> | 2020-01-26 11:26:53 +0000 |
|---|---|---|
| committer | Paul Bütow <pbuetow@mimecast.com> | 2020-02-07 13:31:15 +0000 |
| commit | 57825a5b6dd8e37dd00bfff1dcd81f807f5630e5 (patch) | |
| tree | f06dab4d2bf21d25d176b23d5baeca588d27f5d7 /internal/server/handlers/readcommand.go | |
| parent | dfa5306339b1a5b166d0e66b5cc8051487f4fea2 (diff) | |
Introduce drun command, refactor code to use context package
Diffstat (limited to 'internal/server/handlers/readcommand.go')
| -rw-r--r-- | internal/server/handlers/readcommand.go | 158 |
1 files changed, 158 insertions, 0 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go new file mode 100644 index 0000000..e4079e8 --- /dev/null +++ b/internal/server/handlers/readcommand.go @@ -0,0 +1,158 @@ +package handlers + +import ( + "context" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/mimecast/dtail/internal/io/fs" + "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/omode" +) + +type readCommand struct { + server *ServerHandler + mode omode.Mode +} + +func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand { + return &readCommand{ + server: server, + mode: mode, + } +} + +func (r *readCommand) Start(ctx context.Context, argc int, args []string) { + regex := "." + if argc >= 4 { + regex = args[3] + } + if argc < 3 { + r.server.sendServerMessage(logger.Warn(r.server.user, commandParseWarning, args, argc)) + return + } + r.readGlob(ctx, args[1], regex) +} + +func (r *readCommand) readGlob(ctx context.Context, glob string, regex string) { + retryInterval := time.Second * 5 + glob = filepath.Clean(glob) + + maxRetries := 10 + for { + maxRetries-- + if maxRetries < 0 { + r.server.sendServerMessage(logger.Warn(r.server.user, "Giving up to read file(s)")) + return + } + + paths, err := filepath.Glob(glob) + if err != nil { + logger.Warn(r.server.user, glob, err) + time.Sleep(retryInterval) + continue + } + + if numPaths := len(paths); numPaths == 0 { + logger.Error(r.server.user, "No such file(s) to read", glob) + r.server.sendServerMessage(logger.Warn(r.server.user, "Unable to read file(s), check server logs")) + select { + case <-ctx.Done(): + return + default: + } + time.Sleep(retryInterval) + continue + } + + r.readFiles(ctx, paths, glob, regex, retryInterval) + break + } +} + +func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string, regex string, retryInterval time.Duration) { + var wg sync.WaitGroup + wg.Add(len(paths)) + + for _, path := range paths { + go r.readFileIfPermissions(ctx, &wg, path, glob, regex) + } + + wg.Wait() +} + +func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGroup, path, glob, regex string) { + defer wg.Done() + globID := r.makeGlobID(path, glob) + + if !r.server.user.HasFilePermission(path, "readfiles") { + logger.Error(r.server.user, "No permission to read file", path, globID) + r.server.sendServerMessage(logger.Warn(r.server.user, "Unable to read file(s), check server logs")) + return + } + + r.readFile(ctx, path, globID, regex) +} + +func (r *readCommand) readFile(ctx context.Context, path, globID, regex string) { + logger.Info(r.server.user, "Start reading file", path, globID) + + var reader fs.FileReader + switch r.mode { + case omode.TailClient: + reader = fs.NewTailFile(path, globID, r.server.serverMessages, r.server.tailLimiter) + case omode.GrepClient, omode.CatClient: + reader = fs.NewCatFile(path, globID, r.server.serverMessages, r.server.catLimiter) + default: + reader = fs.NewTailFile(path, globID, r.server.serverMessages, r.server.tailLimiter) + } + + lines := r.server.lines + + // Plug in mappreduce engine + if r.server.aggregate != nil { + lines = r.server.aggregate.Lines + } + + for { + if err := reader.Start(ctx, lines, regex); err != nil { + logger.Error(r.server.user, path, globID, err) + } + + select { + case <-ctx.Done(): + return + default: + if !reader.Retry() { + return + } + } + + time.Sleep(time.Second * 2) + logger.Info(path, globID, "Reading file again") + } +} + +func (r *readCommand) makeGlobID(path, glob string) string { + var idParts []string + pathParts := strings.Split(path, "/") + + for i, globPart := range strings.Split(glob, "/") { + if strings.Contains(globPart, "*") { + idParts = append(idParts, pathParts[i]) + } + } + + if len(idParts) > 0 { + return strings.Join(idParts, "/") + } + + if len(pathParts) > 0 { + return pathParts[len(pathParts)-1] + } + + r.server.sendServerMessage(logger.Error("Empty file path given?", path, glob)) + return "" +} |
