diff options
| author | Paul Buetow <paul@buetow.org> | 2020-02-20 08:11:27 +0000 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2020-02-20 08:11:27 +0000 |
| commit | 5122df790b77fb915c910e57de260e6ed0563af7 (patch) | |
| tree | 0ce1f65d663040d906fef6cd29a5f1245c36e64a /internal/server | |
| parent | c8dc769190404b3901a8d58ab1107c0328cd5b59 (diff) | |
initial background commands
Diffstat (limited to 'internal/server')
| -rw-r--r-- | internal/server/background/commands.go | 28 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 34 |
2 files changed, 60 insertions, 2 deletions
diff --git a/internal/server/background/commands.go b/internal/server/background/commands.go new file mode 100644 index 0000000..f789150 --- /dev/null +++ b/internal/server/background/commands.go @@ -0,0 +1,28 @@ +package background + +import ( + "context" + "sync" +) + +type command struct { + cancel context.CancelFunc + done chan struct{} +} + +type Commands struct { + mutex sync.Mutex + commands map[string]command +} + +func NewCommands() *Commands { + return &Commands{ + commands: make(map[string]command), + } +} + +func (b Commands) Add(argc int, args []string, cancel context.CancelFunc, done <-chan struct{}) { +} + +func (h Commands) Stop(argc int, args []string) { +} diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 946ae83..cc15c63 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -17,6 +17,7 @@ import ( "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" + "github.com/mimecast/dtail/internal/server/background" user "github.com/mimecast/dtail/internal/user/server" "github.com/mimecast/dtail/internal/version" ) @@ -46,6 +47,7 @@ type ServerHandler struct { ctx context.Context done chan struct{} activeReaders int + background *background.Commands } // NewServerHandler returns the server handler. @@ -63,6 +65,7 @@ func NewServerHandler(ctx context.Context, user *user.User, catLimiter, tailLimi globalServerWaitFor: globalServerWaitFor, regex: ".", user: user, + background: background.NewCommands(), } fqdn, err := os.Hostname() @@ -235,7 +238,11 @@ func (h *ServerHandler) handleControlCommand(argc int, args []string) { func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string) { logger.Debug(h.user, "handleUserCommand", argc, args) - switch args[0] { + splitted := strings.Split(args[0], ":") + command := splitted[0] + commandFlags := splitted[1:] + + switch command { case "grep", "cat": command := newReadCommand(h, omode.CatClient) h.incrementActiveReaders() @@ -272,10 +279,24 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] case "run": command := newRunCommand(h) + + if contains(commandFlags, "stop_background") { + h.background.Stop(argc, args) + return + } + + done := make(chan struct{}) + if contains(commandFlags, "start_background") { + commandCtx, cancel := context.WithCancel(ctx) + h.background.Add(argc, args, cancel, done) + ctx = commandCtx + } + h.incrementActiveReaders() go func() { h.globalServerWaitFor <- struct{}{} }() go func() { command.Start(ctx, argc, args) + close(done) <-h.globalServerWaitFor if h.decrementActiveReaders() == 0 { h.shutdown() @@ -363,7 +384,7 @@ func (h *ServerHandler) shutdown() { } func (h *ServerHandler) incrementActiveReaders() { - // TODO: Use atomic counter variable instead, so we can get rid of the mutex + // REFACTOR: Use atomic counter variable instead, so we can get rid of the mutex h.mutex.Lock() defer h.mutex.Unlock() @@ -376,3 +397,12 @@ func (h *ServerHandler) decrementActiveReaders() int { h.activeReaders-- return h.activeReaders } + +func contains(haystack []string, needle string) bool { + for _, str := range haystack { + if str == needle { + return true + } + } + return false +} |
