diff options
| author | Paul Buetow <git@mx.buetow.org> | 2020-11-16 22:11:44 +0000 |
|---|---|---|
| committer | Paul Buetow <git@mx.buetow.org> | 2020-11-16 22:11:44 +0000 |
| commit | 7df612f527bd5dc2e785bf766d7d61124c260b94 (patch) | |
| tree | 9d1674b4fe3d7e492afeefc839009e5b11d5fe27 /internal/server | |
| parent | 3c889d2eed4e12af505ea84d46d8e52d21057a1f (diff) | |
remove drun command for simplicity. only focus on interactive commands dealing with log streams
Diffstat (limited to 'internal/server')
| -rw-r--r-- | internal/server/background/background.go | 126 | ||||
| -rw-r--r-- | internal/server/handlers/runcommand.go | 111 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 85 | ||||
| -rw-r--r-- | internal/server/server.go | 6 |
4 files changed, 2 insertions, 326 deletions
diff --git a/internal/server/background/background.go b/internal/server/background/background.go deleted file mode 100644 index 3907448..0000000 --- a/internal/server/background/background.go +++ /dev/null @@ -1,126 +0,0 @@ -package background - -import ( - "context" - "errors" - "fmt" - "strings" - "sync" - - "github.com/mimecast/dtail/internal/io/logger" -) - -type job struct { - cancel context.CancelFunc - wg *sync.WaitGroup -} - -// Background specifies a job or command run in background on server side. -// This does not require an active DTail client SSH connection/session. -type Background struct { - mutex *sync.Mutex - jobs map[string]job -} - -// New returns a new background manager. -func New() Background { - return Background{ - jobs: make(map[string]job), - mutex: &sync.Mutex{}, - } -} - -// Add a background job. -func (b Background) Add(userName, jobName string, cancel context.CancelFunc, wg *sync.WaitGroup) error { - key := b.key(userName, jobName) - logger.Debug("background", "Add", key) - - b.mutex.Lock() - defer b.mutex.Unlock() - - if _, ok := b.jobs[key]; ok { - return errors.New("job already exists") - } - - b.jobs[key] = job{cancel, wg} - - // Clean up background job database. - go func() { - wg.Wait() - b.cancel(key) - }() - - return nil -} - -// Cancel a background job. -func (b Background) Cancel(userName, jobName string) error { - key := b.key(userName, jobName) - logger.Debug("background", "Cancel", key) - - return b.cancel(key) -} - -func (b Background) cancel(key string) error { - job, ok := b.get(key) - logger.Debug("background", "cancel", key, job, ok) - - if !ok { - return errors.New("no job to cancel") - } - - logger.Debug("background", "cancel", "run job.cancel()") - job.cancel() - logger.Debug("background", "cancel", "run job.wg.Wait()") - job.wg.Wait() - logger.Debug("background", "cancel", "run b.delete(key)") - b.delete(key) - - return nil -} - -// ListJobsC returns a channel listing all jobs of the given user. -func (b Background) ListJobsC(userName string) <-chan string { - logger.Debug("background", "ListJobC", userName) - - ch := make(chan string) - - go func() { - defer close(ch) - - b.mutex.Lock() - defer b.mutex.Unlock() - - for k := range b.jobs { - logger.Debug("ListJobsC", k, userName) - if strings.HasPrefix(k, fmt.Sprintf("%s.", userName)) { - ch <- k - } - } - }() - - return ch -} - -func (b Background) get(key string) (job, bool) { - logger.Debug("background", "get", key) - - b.mutex.Lock() - defer b.mutex.Unlock() - - job, ok := b.jobs[key] - return job, ok -} - -func (b Background) delete(key string) { - logger.Debug("background", "delete", key) - - b.mutex.Lock() - defer b.mutex.Unlock() - - delete(b.jobs, key) -} - -func (Background) key(userName, jobName string) string { - return fmt.Sprintf("%s.%s", userName, jobName) -} diff --git a/internal/server/handlers/runcommand.go b/internal/server/handlers/runcommand.go deleted file mode 100644 index 8e5895b..0000000 --- a/internal/server/handlers/runcommand.go +++ /dev/null @@ -1,111 +0,0 @@ -package handlers - -import ( - "context" - "errors" - "fmt" - "io/ioutil" - "os" - "os/exec" - "strings" - "sync" - "time" - - "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/io/run" -) - -type runCommand struct { - server *ServerHandler - run run.Run -} - -func newRunCommand(server *ServerHandler) runCommand { - return runCommand{ - server: server, - } -} - -func (r runCommand) StartBackground(ctx context.Context, wg *sync.WaitGroup, argc int, args, outerArgs []string) error { - if argc < 2 { - return fmt.Errorf("%s: args:%v argc:%d", commandParseWarning, args, argc) - } - - ec := make(chan int, 1) - var pid int - var err error - - command := strings.Join(args[1:], " ") - if strings.Contains(command, ";") || strings.Contains(command, "\n") { - if pid, err = r.startScript(ctx, wg, ec, command, outerArgs); err != nil { - r.server.sendServerMessage(".run exitstatus 255") - return err - } - return nil - } - - if pid, err = r.start(ctx, wg, ec, strings.TrimSpace(command), outerArgs); err != nil { - r.server.sendServerMessage(".run exitstatus 255") - return err - } - - exitCode := <-ec - r.server.sendServerMessage(fmt.Sprintf(".run exitstatus %d", exitCode)) - r.server.sendServerMessage(logger.Info(fmt.Sprintf("Process %d exited with status %d", pid, exitCode))) - - return nil -} - -func (r runCommand) startScript(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, script string, outerArgs []string) (int, error) { - if _, err := os.Stat(config.Common.TmpDir); os.IsNotExist(err) { - return -1, err - } - - timestamp := time.Now().UnixNano() - scriptPath := fmt.Sprintf("%s/%s_%v.sh", config.Common.TmpDir, r.server.user.Name, timestamp) - - // TODO: On dserver startup delete all previously written scripts (there might be left overs due to a crash or so) - logger.Debug(r.server.user, "Writing temp script", scriptPath) - - script = fmt.Sprintf("#!/bin/sh\n%s", script) - if err := ioutil.WriteFile(scriptPath, []byte(script), 0700); err != nil { - return -1, err - } - - pid, err := r.start(ctx, wg, ec, scriptPath, outerArgs) - go func() { - wg.Wait() - logger.Debug("Deleting script", scriptPath) - os.Remove(scriptPath) - }() - - return pid, err -} - -func (r runCommand) start(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, command string, outerArgs []string) (int, error) { - if len(command) == 0 { - return -1, errors.New("Empty command provided") - } - - splitted := strings.Split(command, " ") - path := splitted[0] - args := splitted[1:] - args = append(args, outerArgs...) - - qualifiedPath, err := exec.LookPath(path) - if err != nil { - return -1, err - } - - if !r.server.user.HasFilePermission(qualifiedPath, "runcommands") { - return -1, fmt.Errorf("No permission to execute path: %s", qualifiedPath) - } - - r.run = run.New(qualifiedPath, args) - pid, err := r.run.StartBackground(ctx, wg, ec, r.server.lines) - if err != nil { - return pid, err - } - return pid, nil -} diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 164a280..7ad1224 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -9,7 +9,6 @@ import ( "os" "strconv" "strings" - "sync" "sync/atomic" "time" @@ -19,7 +18,6 @@ import ( "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" - "github.com/mimecast/dtail/internal/server/background" user "github.com/mimecast/dtail/internal/user/server" "github.com/mimecast/dtail/internal/version" ) @@ -47,11 +45,10 @@ type ServerHandler struct { ackCloseReceived chan struct{} activeCommands int32 activeReaders int32 - background background.Background } // NewServerHandler returns the server handler. -func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}, background background.Background) *ServerHandler { +func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}) *ServerHandler { h := ServerHandler{ done: internal.NewDone(), lines: make(chan line.Line, 100), @@ -63,7 +60,6 @@ func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWait globalServerWaitFor: globalServerWaitFor, regex: ".", user: user, - background: background, } fqdn, err := os.Hostname() @@ -314,85 +310,6 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] commandFinished() }() - case "run": - // TODO: Refactor this "run" case, move code to runcommand.go - command := newRunCommand(h) - - jobName, _ := options["jobName"] - logger.Debug(h.user, "run", options) - - if val, ok := options["background"]; ok && (val == "cancel" || val == "stop") { - if err := h.background.Cancel(h.user.Name, jobName); err != nil { - h.sendServerMessage(logger.Error(h.user, err, jobName, args)) - } else { - h.sendServerMessage(logger.Info(h.user, "job cancelled", jobName)) - } - commandFinished() - return - } - - if val, ok := options["background"]; ok && val == "list" { - h.sendServerMessage("Listing jobs") - count := 0 - for jobName := range h.background.ListJobsC(h.user.Name) { - h.sendServerMessage(jobName) - count++ - } - h.sendServerMessage(fmt.Sprintf("Found %d jobs", count)) - commandFinished() - return - } - - str, _ := options["outerArgs"] - outerArgs := strings.Split(str, " ") - - var background bool - if val, ok := options["background"]; ok && val == "start" { - background = true - } - - var wg sync.WaitGroup - wg.Add(1) - - if background { - if timeout == 0 { - // Set default background timeout. - timeout = time.Hour * 1 - } - - commandCtx, cancel := context.WithTimeout(ctx, timeout) - - if err := h.background.Add(h.user.Name, jobName, cancel, &wg); err != nil { - h.sendServerMessage(logger.Error(h.user, err, jobName, args)) - commandFinished() - return - } - ctx = commandCtx - } - - if err := command.StartBackground(ctx, &wg, argc, args, outerArgs); err != nil { - h.sendServerMessage(logger.Error(h.user, "Unable to execute command", argc, args, err)) - commandFinished() - return - } - - // Make sure that server waits for all sub-processes to finish on shutdown - go func() { h.globalServerWaitFor <- struct{}{} }() - go func() { - wg.Wait() - <-h.globalServerWaitFor - }() - - if background { - h.sendServerMessage(logger.Info(h.user, jobName, "job started in background")) - commandFinished() - return - } - - // Command run in foreground, wait for it to complete before finishing the connection. - wg.Wait() - commandFinished() - case "ack", ".ack": h.handleAckCommand(argc, args) commandFinished() diff --git a/internal/server/server.go b/internal/server/server.go index 5e2a521..d4255a3 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -11,7 +11,6 @@ import ( "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/server/background" "github.com/mimecast/dtail/internal/server/handlers" "github.com/mimecast/dtail/internal/ssh/server" user "github.com/mimecast/dtail/internal/user/server" @@ -36,8 +35,6 @@ type Server struct { cont *continuous // Wait counter, e.g. there might be still subprocesses (forked by drun) to be killed. shutdownWaitFor chan struct{} - // Background jobs - background background.Background } // New returns a new server. @@ -51,7 +48,6 @@ func New() *Server { shutdownWaitFor: make(chan struct{}, 1000), sched: newScheduler(), cont: newContinuous(), - background: background.New(), } s.sshServerConfig.PasswordCallback = s.Callback @@ -183,7 +179,7 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch case config.ControlUser: handler = handlers.NewControlHandler(user) default: - handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor, s.background) + handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor) } terminate := func() { |
