summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
authorPaul Buetow <git@mx.buetow.org>2020-11-16 22:11:44 +0000
committerPaul Buetow <git@mx.buetow.org>2020-11-16 22:11:44 +0000
commit7df612f527bd5dc2e785bf766d7d61124c260b94 (patch)
tree9d1674b4fe3d7e492afeefc839009e5b11d5fe27 /internal/server
parent3c889d2eed4e12af505ea84d46d8e52d21057a1f (diff)
remove drun command for simplicity. only focus on interactive commands dealing with log streams
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/background/background.go126
-rw-r--r--internal/server/handlers/runcommand.go111
-rw-r--r--internal/server/handlers/serverhandler.go85
-rw-r--r--internal/server/server.go6
4 files changed, 2 insertions, 326 deletions
diff --git a/internal/server/background/background.go b/internal/server/background/background.go
deleted file mode 100644
index 3907448..0000000
--- a/internal/server/background/background.go
+++ /dev/null
@@ -1,126 +0,0 @@
-package background
-
-import (
- "context"
- "errors"
- "fmt"
- "strings"
- "sync"
-
- "github.com/mimecast/dtail/internal/io/logger"
-)
-
-type job struct {
- cancel context.CancelFunc
- wg *sync.WaitGroup
-}
-
-// Background specifies a job or command run in background on server side.
-// This does not require an active DTail client SSH connection/session.
-type Background struct {
- mutex *sync.Mutex
- jobs map[string]job
-}
-
-// New returns a new background manager.
-func New() Background {
- return Background{
- jobs: make(map[string]job),
- mutex: &sync.Mutex{},
- }
-}
-
-// Add a background job.
-func (b Background) Add(userName, jobName string, cancel context.CancelFunc, wg *sync.WaitGroup) error {
- key := b.key(userName, jobName)
- logger.Debug("background", "Add", key)
-
- b.mutex.Lock()
- defer b.mutex.Unlock()
-
- if _, ok := b.jobs[key]; ok {
- return errors.New("job already exists")
- }
-
- b.jobs[key] = job{cancel, wg}
-
- // Clean up background job database.
- go func() {
- wg.Wait()
- b.cancel(key)
- }()
-
- return nil
-}
-
-// Cancel a background job.
-func (b Background) Cancel(userName, jobName string) error {
- key := b.key(userName, jobName)
- logger.Debug("background", "Cancel", key)
-
- return b.cancel(key)
-}
-
-func (b Background) cancel(key string) error {
- job, ok := b.get(key)
- logger.Debug("background", "cancel", key, job, ok)
-
- if !ok {
- return errors.New("no job to cancel")
- }
-
- logger.Debug("background", "cancel", "run job.cancel()")
- job.cancel()
- logger.Debug("background", "cancel", "run job.wg.Wait()")
- job.wg.Wait()
- logger.Debug("background", "cancel", "run b.delete(key)")
- b.delete(key)
-
- return nil
-}
-
-// ListJobsC returns a channel listing all jobs of the given user.
-func (b Background) ListJobsC(userName string) <-chan string {
- logger.Debug("background", "ListJobC", userName)
-
- ch := make(chan string)
-
- go func() {
- defer close(ch)
-
- b.mutex.Lock()
- defer b.mutex.Unlock()
-
- for k := range b.jobs {
- logger.Debug("ListJobsC", k, userName)
- if strings.HasPrefix(k, fmt.Sprintf("%s.", userName)) {
- ch <- k
- }
- }
- }()
-
- return ch
-}
-
-func (b Background) get(key string) (job, bool) {
- logger.Debug("background", "get", key)
-
- b.mutex.Lock()
- defer b.mutex.Unlock()
-
- job, ok := b.jobs[key]
- return job, ok
-}
-
-func (b Background) delete(key string) {
- logger.Debug("background", "delete", key)
-
- b.mutex.Lock()
- defer b.mutex.Unlock()
-
- delete(b.jobs, key)
-}
-
-func (Background) key(userName, jobName string) string {
- return fmt.Sprintf("%s.%s", userName, jobName)
-}
diff --git a/internal/server/handlers/runcommand.go b/internal/server/handlers/runcommand.go
deleted file mode 100644
index 8e5895b..0000000
--- a/internal/server/handlers/runcommand.go
+++ /dev/null
@@ -1,111 +0,0 @@
-package handlers
-
-import (
- "context"
- "errors"
- "fmt"
- "io/ioutil"
- "os"
- "os/exec"
- "strings"
- "sync"
- "time"
-
- "github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
- "github.com/mimecast/dtail/internal/io/run"
-)
-
-type runCommand struct {
- server *ServerHandler
- run run.Run
-}
-
-func newRunCommand(server *ServerHandler) runCommand {
- return runCommand{
- server: server,
- }
-}
-
-func (r runCommand) StartBackground(ctx context.Context, wg *sync.WaitGroup, argc int, args, outerArgs []string) error {
- if argc < 2 {
- return fmt.Errorf("%s: args:%v argc:%d", commandParseWarning, args, argc)
- }
-
- ec := make(chan int, 1)
- var pid int
- var err error
-
- command := strings.Join(args[1:], " ")
- if strings.Contains(command, ";") || strings.Contains(command, "\n") {
- if pid, err = r.startScript(ctx, wg, ec, command, outerArgs); err != nil {
- r.server.sendServerMessage(".run exitstatus 255")
- return err
- }
- return nil
- }
-
- if pid, err = r.start(ctx, wg, ec, strings.TrimSpace(command), outerArgs); err != nil {
- r.server.sendServerMessage(".run exitstatus 255")
- return err
- }
-
- exitCode := <-ec
- r.server.sendServerMessage(fmt.Sprintf(".run exitstatus %d", exitCode))
- r.server.sendServerMessage(logger.Info(fmt.Sprintf("Process %d exited with status %d", pid, exitCode)))
-
- return nil
-}
-
-func (r runCommand) startScript(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, script string, outerArgs []string) (int, error) {
- if _, err := os.Stat(config.Common.TmpDir); os.IsNotExist(err) {
- return -1, err
- }
-
- timestamp := time.Now().UnixNano()
- scriptPath := fmt.Sprintf("%s/%s_%v.sh", config.Common.TmpDir, r.server.user.Name, timestamp)
-
- // TODO: On dserver startup delete all previously written scripts (there might be left overs due to a crash or so)
- logger.Debug(r.server.user, "Writing temp script", scriptPath)
-
- script = fmt.Sprintf("#!/bin/sh\n%s", script)
- if err := ioutil.WriteFile(scriptPath, []byte(script), 0700); err != nil {
- return -1, err
- }
-
- pid, err := r.start(ctx, wg, ec, scriptPath, outerArgs)
- go func() {
- wg.Wait()
- logger.Debug("Deleting script", scriptPath)
- os.Remove(scriptPath)
- }()
-
- return pid, err
-}
-
-func (r runCommand) start(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, command string, outerArgs []string) (int, error) {
- if len(command) == 0 {
- return -1, errors.New("Empty command provided")
- }
-
- splitted := strings.Split(command, " ")
- path := splitted[0]
- args := splitted[1:]
- args = append(args, outerArgs...)
-
- qualifiedPath, err := exec.LookPath(path)
- if err != nil {
- return -1, err
- }
-
- if !r.server.user.HasFilePermission(qualifiedPath, "runcommands") {
- return -1, fmt.Errorf("No permission to execute path: %s", qualifiedPath)
- }
-
- r.run = run.New(qualifiedPath, args)
- pid, err := r.run.StartBackground(ctx, wg, ec, r.server.lines)
- if err != nil {
- return pid, err
- }
- return pid, nil
-}
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 164a280..7ad1224 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -9,7 +9,6 @@ import (
"os"
"strconv"
"strings"
- "sync"
"sync/atomic"
"time"
@@ -19,7 +18,6 @@ import (
"github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/mapr/server"
"github.com/mimecast/dtail/internal/omode"
- "github.com/mimecast/dtail/internal/server/background"
user "github.com/mimecast/dtail/internal/user/server"
"github.com/mimecast/dtail/internal/version"
)
@@ -47,11 +45,10 @@ type ServerHandler struct {
ackCloseReceived chan struct{}
activeCommands int32
activeReaders int32
- background background.Background
}
// NewServerHandler returns the server handler.
-func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}, background background.Background) *ServerHandler {
+func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}) *ServerHandler {
h := ServerHandler{
done: internal.NewDone(),
lines: make(chan line.Line, 100),
@@ -63,7 +60,6 @@ func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWait
globalServerWaitFor: globalServerWaitFor,
regex: ".",
user: user,
- background: background,
}
fqdn, err := os.Hostname()
@@ -314,85 +310,6 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
commandFinished()
}()
- case "run":
- // TODO: Refactor this "run" case, move code to runcommand.go
- command := newRunCommand(h)
-
- jobName, _ := options["jobName"]
- logger.Debug(h.user, "run", options)
-
- if val, ok := options["background"]; ok && (val == "cancel" || val == "stop") {
- if err := h.background.Cancel(h.user.Name, jobName); err != nil {
- h.sendServerMessage(logger.Error(h.user, err, jobName, args))
- } else {
- h.sendServerMessage(logger.Info(h.user, "job cancelled", jobName))
- }
- commandFinished()
- return
- }
-
- if val, ok := options["background"]; ok && val == "list" {
- h.sendServerMessage("Listing jobs")
- count := 0
- for jobName := range h.background.ListJobsC(h.user.Name) {
- h.sendServerMessage(jobName)
- count++
- }
- h.sendServerMessage(fmt.Sprintf("Found %d jobs", count))
- commandFinished()
- return
- }
-
- str, _ := options["outerArgs"]
- outerArgs := strings.Split(str, " ")
-
- var background bool
- if val, ok := options["background"]; ok && val == "start" {
- background = true
- }
-
- var wg sync.WaitGroup
- wg.Add(1)
-
- if background {
- if timeout == 0 {
- // Set default background timeout.
- timeout = time.Hour * 1
- }
-
- commandCtx, cancel := context.WithTimeout(ctx, timeout)
-
- if err := h.background.Add(h.user.Name, jobName, cancel, &wg); err != nil {
- h.sendServerMessage(logger.Error(h.user, err, jobName, args))
- commandFinished()
- return
- }
- ctx = commandCtx
- }
-
- if err := command.StartBackground(ctx, &wg, argc, args, outerArgs); err != nil {
- h.sendServerMessage(logger.Error(h.user, "Unable to execute command", argc, args, err))
- commandFinished()
- return
- }
-
- // Make sure that server waits for all sub-processes to finish on shutdown
- go func() { h.globalServerWaitFor <- struct{}{} }()
- go func() {
- wg.Wait()
- <-h.globalServerWaitFor
- }()
-
- if background {
- h.sendServerMessage(logger.Info(h.user, jobName, "job started in background"))
- commandFinished()
- return
- }
-
- // Command run in foreground, wait for it to complete before finishing the connection.
- wg.Wait()
- commandFinished()
-
case "ack", ".ack":
h.handleAckCommand(argc, args)
commandFinished()
diff --git a/internal/server/server.go b/internal/server/server.go
index 5e2a521..d4255a3 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -11,7 +11,6 @@ import (
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/logger"
- "github.com/mimecast/dtail/internal/server/background"
"github.com/mimecast/dtail/internal/server/handlers"
"github.com/mimecast/dtail/internal/ssh/server"
user "github.com/mimecast/dtail/internal/user/server"
@@ -36,8 +35,6 @@ type Server struct {
cont *continuous
// Wait counter, e.g. there might be still subprocesses (forked by drun) to be killed.
shutdownWaitFor chan struct{}
- // Background jobs
- background background.Background
}
// New returns a new server.
@@ -51,7 +48,6 @@ func New() *Server {
shutdownWaitFor: make(chan struct{}, 1000),
sched: newScheduler(),
cont: newContinuous(),
- background: background.New(),
}
s.sshServerConfig.PasswordCallback = s.Callback
@@ -183,7 +179,7 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch
case config.ControlUser:
handler = handlers.NewControlHandler(user)
default:
- handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor, s.background)
+ handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor)
}
terminate := func() {