summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2020-02-20 08:11:27 +0000
committerPaul Buetow <paul@buetow.org>2020-02-20 08:11:27 +0000
commit5122df790b77fb915c910e57de260e6ed0563af7 (patch)
tree0ce1f65d663040d906fef6cd29a5f1245c36e64a /internal/server
parentc8dc769190404b3901a8d58ab1107c0328cd5b59 (diff)
initial background commands
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/background/commands.go28
-rw-r--r--internal/server/handlers/serverhandler.go34
2 files changed, 60 insertions, 2 deletions
diff --git a/internal/server/background/commands.go b/internal/server/background/commands.go
new file mode 100644
index 0000000..f789150
--- /dev/null
+++ b/internal/server/background/commands.go
@@ -0,0 +1,28 @@
+package background
+
+import (
+ "context"
+ "sync"
+)
+
+type command struct {
+ cancel context.CancelFunc
+ done chan struct{}
+}
+
+type Commands struct {
+ mutex sync.Mutex
+ commands map[string]command
+}
+
+func NewCommands() *Commands {
+ return &Commands{
+ commands: make(map[string]command),
+ }
+}
+
+func (b Commands) Add(argc int, args []string, cancel context.CancelFunc, done <-chan struct{}) {
+}
+
+func (h Commands) Stop(argc int, args []string) {
+}
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 946ae83..cc15c63 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -17,6 +17,7 @@ import (
"github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/mapr/server"
"github.com/mimecast/dtail/internal/omode"
+ "github.com/mimecast/dtail/internal/server/background"
user "github.com/mimecast/dtail/internal/user/server"
"github.com/mimecast/dtail/internal/version"
)
@@ -46,6 +47,7 @@ type ServerHandler struct {
ctx context.Context
done chan struct{}
activeReaders int
+ background *background.Commands
}
// NewServerHandler returns the server handler.
@@ -63,6 +65,7 @@ func NewServerHandler(ctx context.Context, user *user.User, catLimiter, tailLimi
globalServerWaitFor: globalServerWaitFor,
regex: ".",
user: user,
+ background: background.NewCommands(),
}
fqdn, err := os.Hostname()
@@ -235,7 +238,11 @@ func (h *ServerHandler) handleControlCommand(argc int, args []string) {
func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string) {
logger.Debug(h.user, "handleUserCommand", argc, args)
- switch args[0] {
+ splitted := strings.Split(args[0], ":")
+ command := splitted[0]
+ commandFlags := splitted[1:]
+
+ switch command {
case "grep", "cat":
command := newReadCommand(h, omode.CatClient)
h.incrementActiveReaders()
@@ -272,10 +279,24 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
case "run":
command := newRunCommand(h)
+
+ if contains(commandFlags, "stop_background") {
+ h.background.Stop(argc, args)
+ return
+ }
+
+ done := make(chan struct{})
+ if contains(commandFlags, "start_background") {
+ commandCtx, cancel := context.WithCancel(ctx)
+ h.background.Add(argc, args, cancel, done)
+ ctx = commandCtx
+ }
+
h.incrementActiveReaders()
go func() { h.globalServerWaitFor <- struct{}{} }()
go func() {
command.Start(ctx, argc, args)
+ close(done)
<-h.globalServerWaitFor
if h.decrementActiveReaders() == 0 {
h.shutdown()
@@ -363,7 +384,7 @@ func (h *ServerHandler) shutdown() {
}
func (h *ServerHandler) incrementActiveReaders() {
- // TODO: Use atomic counter variable instead, so we can get rid of the mutex
+ // REFACTOR: Use atomic counter variable instead, so we can get rid of the mutex
h.mutex.Lock()
defer h.mutex.Unlock()
@@ -376,3 +397,12 @@ func (h *ServerHandler) decrementActiveReaders() int {
h.activeReaders--
return h.activeReaders
}
+
+func contains(haystack []string, needle string) bool {
+ for _, str := range haystack {
+ if str == needle {
+ return true
+ }
+ }
+ return false
+}