From 5122df790b77fb915c910e57de260e6ed0563af7 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Thu, 20 Feb 2020 08:11:27 +0000 Subject: initial background commands --- internal/server/background/commands.go | 28 +++++++++++++++++++++++++ internal/server/handlers/serverhandler.go | 34 +++++++++++++++++++++++++++++-- 2 files changed, 60 insertions(+), 2 deletions(-) create mode 100644 internal/server/background/commands.go (limited to 'internal/server') diff --git a/internal/server/background/commands.go b/internal/server/background/commands.go new file mode 100644 index 0000000..f789150 --- /dev/null +++ b/internal/server/background/commands.go @@ -0,0 +1,28 @@ +package background + +import ( + "context" + "sync" +) + +type command struct { + cancel context.CancelFunc + done chan struct{} +} + +type Commands struct { + mutex sync.Mutex + commands map[string]command +} + +func NewCommands() *Commands { + return &Commands{ + commands: make(map[string]command), + } +} + +func (b Commands) Add(argc int, args []string, cancel context.CancelFunc, done <-chan struct{}) { +} + +func (h Commands) Stop(argc int, args []string) { +} diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 946ae83..cc15c63 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -17,6 +17,7 @@ import ( "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" + "github.com/mimecast/dtail/internal/server/background" user "github.com/mimecast/dtail/internal/user/server" "github.com/mimecast/dtail/internal/version" ) @@ -46,6 +47,7 @@ type ServerHandler struct { ctx context.Context done chan struct{} activeReaders int + background *background.Commands } // NewServerHandler returns the server handler. @@ -63,6 +65,7 @@ func NewServerHandler(ctx context.Context, user *user.User, catLimiter, tailLimi globalServerWaitFor: globalServerWaitFor, regex: ".", user: user, + background: background.NewCommands(), } fqdn, err := os.Hostname() @@ -235,7 +238,11 @@ func (h *ServerHandler) handleControlCommand(argc int, args []string) { func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string) { logger.Debug(h.user, "handleUserCommand", argc, args) - switch args[0] { + splitted := strings.Split(args[0], ":") + command := splitted[0] + commandFlags := splitted[1:] + + switch command { case "grep", "cat": command := newReadCommand(h, omode.CatClient) h.incrementActiveReaders() @@ -272,10 +279,24 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] case "run": command := newRunCommand(h) + + if contains(commandFlags, "stop_background") { + h.background.Stop(argc, args) + return + } + + done := make(chan struct{}) + if contains(commandFlags, "start_background") { + commandCtx, cancel := context.WithCancel(ctx) + h.background.Add(argc, args, cancel, done) + ctx = commandCtx + } + h.incrementActiveReaders() go func() { h.globalServerWaitFor <- struct{}{} }() go func() { command.Start(ctx, argc, args) + close(done) <-h.globalServerWaitFor if h.decrementActiveReaders() == 0 { h.shutdown() @@ -363,7 +384,7 @@ func (h *ServerHandler) shutdown() { } func (h *ServerHandler) incrementActiveReaders() { - // TODO: Use atomic counter variable instead, so we can get rid of the mutex + // REFACTOR: Use atomic counter variable instead, so we can get rid of the mutex h.mutex.Lock() defer h.mutex.Unlock() @@ -376,3 +397,12 @@ func (h *ServerHandler) decrementActiveReaders() int { h.activeReaders-- return h.activeReaders } + +func contains(haystack []string, needle string) bool { + for _, str := range haystack { + if str == needle { + return true + } + } + return false +} -- cgit v1.2.3