summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-10-09 21:10:29 +0300
committerPaul Buetow <paul@buetow.org>2021-10-10 13:36:41 +0300
commitea1de3044e129d419f4e807f2624a009343a128f (patch)
tree9ff1335ca26afc90e55fd6de416457e252d75a35 /internal/server
parent7563abe9d5beaa18fa1eab0f65668f5dfcf79052 (diff)
vetting and linting and some code restyling
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/continuous.go9
-rw-r--r--internal/server/handlers/basehandler.go30
-rw-r--r--internal/server/handlers/healthhandler.go11
-rw-r--r--internal/server/handlers/mapcommand.go7
-rw-r--r--internal/server/handlers/readcommand.go41
-rw-r--r--internal/server/handlers/serverhandler.go20
-rw-r--r--internal/server/scheduler.go9
-rw-r--r--internal/server/server.go47
-rw-r--r--internal/server/stats.go11
9 files changed, 79 insertions, 106 deletions
diff --git a/internal/server/continuous.go b/internal/server/continuous.go
index 5f84afc..93b3fcb 100644
--- a/internal/server/continuous.go
+++ b/internal/server/continuous.go
@@ -13,8 +13,7 @@ import (
gossh "golang.org/x/crypto/ssh"
)
-type continuous struct {
-}
+type continuous struct{}
func newContinuous() *continuous {
return &continuous{}
@@ -23,7 +22,6 @@ func newContinuous() *continuous {
func (c *continuous) start(ctx context.Context) {
dlog.Server.Info("Starting continuous job runner after 10s")
time.Sleep(time.Second * 10)
-
c.runJobs(ctx)
}
@@ -33,7 +31,6 @@ func (c *continuous) runJobs(ctx context.Context) {
dlog.Server.Debug(job.Name, "Not running job as not enabled")
continue
}
-
go func(job config.Continuous) {
c.runJob(ctx, job)
for {
@@ -54,7 +51,6 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) {
files := fillDates(job.Files)
outfile := fillDates(job.Outfile)
-
servers := strings.Join(job.Servers, ",")
if servers == "" {
servers = config.Server.SSHBindAddress
@@ -70,7 +66,6 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) {
}
args.SSHAuthMethods = append(args.SSHAuthMethods, gossh.Password(job.Name))
-
args.QueryStr = fmt.Sprintf("%s outfile %s", job.Query, outfile)
client, err := clients.NewMaprClient(args, clients.NonCumulativeMode)
if err != nil {
@@ -80,7 +75,6 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) {
jobCtx, cancel := context.WithCancel(ctx)
defer cancel()
-
if job.RestartOnDayChange {
go func() {
if c.waitForDayChange(ctx) {
@@ -93,7 +87,6 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) {
dlog.Server.Info(fmt.Sprintf("Starting job %s", job.Name))
status := client.Start(jobCtx, make(chan string))
logMessage := fmt.Sprintf("Job exited with status %d", status)
-
if status != 0 {
dlog.Server.Warn(logMessage)
return
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index f73f82e..847e8f9 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -37,7 +37,7 @@ type baseHandler struct {
activeCommands int32
quiet bool
spartan bool
- serverless bool
+ serverless int32
readBuf bytes.Buffer
writeBuf bytes.Buffer
}
@@ -59,16 +59,14 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
select {
case message := <-h.serverMessages:
if message[0] == '.' {
- // Handle hidden message (don't display to the user, interpreted by dtail client)
+ // Handle hidden message (don't display to the user)
h.readBuf.WriteString(message)
h.readBuf.WriteByte(protocol.MessageDelimiter)
n = copy(p, h.readBuf.Bytes())
return
}
- if h.serverless {
- // In serverless mode we have logged the server message already via the
- // dlog logger, no need to send the message again to the client part.
+ if h.serverless > 0 {
return
}
@@ -132,7 +130,6 @@ func (h *baseHandler) Write(p []byte) (n int, err error) {
h.writeBuf.WriteByte(b)
}
}
-
n = len(p)
return
}
@@ -145,13 +142,11 @@ func (h *baseHandler) handleCommand(commandStr string) {
h.send(h.serverMessages, dlog.Server.Error(h.user, err)+add)
return
}
-
args, argc, err = h.handleBase64(args, argc)
if err != nil {
h.send(h.serverMessages, dlog.Server.Error(h.user, err))
return
}
-
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-h.done.Done()
@@ -160,7 +155,6 @@ func (h *baseHandler) handleCommand(commandStr string) {
splitted := strings.Split(args[0], ":")
commandName := splitted[0]
-
options, err := config.DeserializeOptions(splitted[1:])
if err != nil {
h.send(h.serverMessages, dlog.Server.Error(h.user, err))
@@ -191,8 +185,8 @@ func (h *baseHandler) handleProtocolVersion(args []string) ([]string, int, strin
if clientCompat > serverCompat {
toUpdate = "server"
}
-
- err := fmt.Errorf("DTail server protocol version '%s' does not match client protocol version '%s', please update DTail %s!",
+ err := fmt.Errorf("the DTail server protocol version '%s' does not match "+
+ "client protocol version '%s', please update DTail %s",
protocol.ProtocolCompat, args[1], toUpdate)
return args, argc, add, err
}
@@ -201,8 +195,8 @@ func (h *baseHandler) handleProtocolVersion(args []string) ([]string, int, strin
}
func (h *baseHandler) handleBase64(args []string, argc int) ([]string, int, error) {
- err := errors.New("Unable to decode client message, DTail server and client versions may not be compatible")
-
+ err := errors.New("unable to decode client message, DTail server and client " +
+ "versions may not be compatible")
if argc != 2 || args[0] != "base64" {
return args, argc, err
}
@@ -215,7 +209,8 @@ func (h *baseHandler) handleBase64(args []string, argc int) ([]string, int, erro
args = strings.Split(decodedStr, " ")
argc = len(decodedStr)
- dlog.Server.Trace(h.user, "Base64 decoded received command", decodedStr, argc, args)
+ dlog.Server.Trace(h.user, "Base64 decoded received command",
+ decodedStr, argc, args)
return args, argc, nil
}
@@ -223,7 +218,8 @@ func (h *baseHandler) handleBase64(args []string, argc int) ([]string, int, erro
func (h *baseHandler) handleAckCommand(argc int, args []string) {
if argc < 3 {
if !h.quiet {
- h.send(h.serverMessages, dlog.Server.Warn(h.user, "Unable to parse command", args, argc))
+ h.send(h.serverMessages, dlog.Server.Warn(h.user,
+ "Unable to parse command", args, argc))
}
return
}
@@ -245,11 +241,9 @@ func (h *baseHandler) send(ch chan<- string, message string) {
func (h *baseHandler) flush() {
dlog.Server.Trace(h.user, "flush()")
-
numUnsentMessages := func() int {
return len(h.lines) + len(h.serverMessages) + len(h.maprMessages)
}
-
for i := 0; i < 10; i++ {
if numUnsentMessages() == 0 {
dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h))
@@ -258,7 +252,6 @@ func (h *baseHandler) flush() {
dlog.Server.Debug(h.user, "Still lines to be sent")
time.Sleep(time.Millisecond * 10)
}
-
dlog.Server.Warn(h.user, "Some lines remain unsent", numUnsentMessages())
}
@@ -279,7 +272,6 @@ func (h *baseHandler) shutdown() {
dlog.Server.Debug(h.user, "Shutdown timeout reached, enforcing shutdown")
case <-h.done.Done():
}
-
h.done.Shutdown()
}
diff --git a/internal/server/handlers/healthhandler.go b/internal/server/handlers/healthhandler.go
index 347ff66..8d6c400 100644
--- a/internal/server/handlers/healthhandler.go
+++ b/internal/server/handlers/healthhandler.go
@@ -35,24 +35,23 @@ func NewHealthHandler(user *user.User) *HealthHandler {
if err != nil {
dlog.Server.FatalPanic(err)
}
-
s := strings.Split(fqdn, ".")
h.hostname = s[0]
-
return &h
}
-func (h *HealthHandler) handleHealthCommand(ctx context.Context, argc int, args []string,
- commandName string, options map[string]string) {
- dlog.Server.Debug(h.user, "Handling health command", argc, args)
+func (h *HealthHandler) handleHealthCommand(ctx context.Context, argc int,
+ args []string, commandName string, options map[string]string) {
+ dlog.Server.Debug(h.user, "Handling health command", argc, args)
switch commandName {
case "health":
h.send(h.serverMessages, "OK")
case ".ack":
h.handleAckCommand(argc, args)
default:
- h.send(h.serverMessages, dlog.Server.Error(h.user, "Received unknown health command", commandName, argc, args))
+ h.send(h.serverMessages, dlog.Server.Error(h.user,
+ "Received unknown health command", commandName, argc, args))
}
h.shutdown()
}
diff --git a/internal/server/handlers/mapcommand.go b/internal/server/handlers/mapcommand.go
index c3e600e..65e0ed8 100644
--- a/internal/server/handlers/mapcommand.go
+++ b/internal/server/handlers/mapcommand.go
@@ -14,18 +14,17 @@ type mapCommand struct {
}
// NewMapCommand returns a new server side mapreduce command.
-func newMapCommand(serverHandler *ServerHandler, argc int, args []string) (mapCommand, *server.Aggregate, error) {
- m := mapCommand{server: serverHandler}
+func newMapCommand(serverHandler *ServerHandler, argc int,
+ args []string) (mapCommand, *server.Aggregate, error) {
+ m := mapCommand{server: serverHandler}
queryStr := strings.Join(args[1:], " ")
aggregate, err := server.NewAggregate(queryStr)
if err != nil {
return m, nil, err
}
-
m.aggregate = aggregate
return m, aggregate, nil
-
}
func (m mapCommand) Start(ctx context.Context, aggregatedMessages chan<- string) {
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index abc44c7..384e966 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -26,25 +26,30 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand {
}
}
-func (r *readCommand) Start(ctx context.Context, argc int, args []string, retries int) {
- re := regex.NewNoop()
+func (r *readCommand) Start(ctx context.Context, argc int, args []string,
+ retries int) {
+ re := regex.NewNoop()
if argc >= 4 {
deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " "))
if err != nil {
- r.server.send(r.server.serverMessages, dlog.Server.Error(r.server.user, "Unable to parse command", err))
+ r.server.send(r.server.serverMessages, dlog.Server.Error(r.server.user,
+ "Unable to parse command", err))
return
}
re = deserializedRegex
}
if argc < 3 {
- r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, "Unable to parse command", args, argc))
+ r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user,
+ "Unable to parse command", args, argc))
return
}
r.readGlob(ctx, args[1], re, retries)
}
-func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, retries int) {
+func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex,
+ retries int) {
+
retryInterval := time.Second * 5
glob = filepath.Clean(glob)
@@ -58,7 +63,8 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex,
if numPaths := len(paths); numPaths == 0 {
dlog.Server.Error(r.server.user, "No such file(s) to read", glob)
- r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, "Unable to read file(s), check server logs"))
+ r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user,
+ "Unable to read file(s), check server logs"))
select {
case <-ctx.Done():
return
@@ -72,31 +78,33 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex,
return
}
- r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, "Giving up to read file(s)"))
+ r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user,
+ "Giving up to read file(s)"))
return
}
-func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string, re regex.Regex, retryInterval time.Duration) {
+func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string,
+ re regex.Regex, retryInterval time.Duration) {
+
var wg sync.WaitGroup
wg.Add(len(paths))
-
for _, path := range paths {
go r.readFileIfPermissions(ctx, &wg, path, glob, re)
}
-
wg.Wait()
}
-func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGroup, path, glob string, re regex.Regex) {
+func (r *readCommand) readFileIfPermissions(ctx context.Context,
+ wg *sync.WaitGroup, path, glob string, re regex.Regex) {
+
defer wg.Done()
globID := r.makeGlobID(path, glob)
-
if !r.server.user.HasFilePermission(path, "readfiles") {
dlog.Server.Error(r.server.user, "No permission to read file", path, globID)
- r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, "Unable to read file(s), check server logs"))
+ r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user,
+ "Unable to read file(s), check server logs"))
return
}
-
r.readFile(ctx, path, globID, re)
}
@@ -137,7 +145,6 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege
return
}
}
-
time.Sleep(time.Second * 2)
dlog.Server.Info(path, globID, "Reading file again")
}
@@ -156,11 +163,11 @@ func (r *readCommand) makeGlobID(path, glob string) string {
if len(idParts) > 0 {
return strings.Join(idParts, "/")
}
-
if len(pathParts) > 0 {
return pathParts[len(pathParts)-1]
}
- r.server.send(r.server.serverMessages, dlog.Server.Warn("Empty file path given?", path, glob))
+ r.server.send(r.server.serverMessages,
+ dlog.Server.Warn("Empty file path given?", path, glob))
return ""
}
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index aed8956..f12d590 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -4,6 +4,7 @@ import (
"context"
"os"
"strings"
+ "sync/atomic"
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/io/dlog"
@@ -23,7 +24,9 @@ type ServerHandler struct {
}
// NewServerHandler returns the server handler.
-func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *ServerHandler {
+func NewServerHandler(user *user.User, catLimiter,
+ tailLimiter chan struct{}) *ServerHandler {
+
dlog.Server.Debug(user, "Creating new server handler")
h := ServerHandler{
baseHandler: baseHandler{
@@ -51,11 +54,10 @@ func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *S
return &h
}
-func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string,
- commandName string, options map[string]string) {
+func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int,
+ args []string, commandName string, options map[string]string) {
dlog.Server.Debug(h.user, "Handling user command", argc, args)
-
h.incrementActiveCommands()
commandFinished := func() {
if h.decrementActiveCommands() == 0 {
@@ -73,7 +75,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
}
if serverless, _ := options["serverless"]; serverless == "true" {
dlog.Server.Debug(h.user, "Enabling serverless mode")
- h.serverless = true
+ atomic.AddInt32(&h.serverless, 1)
}
switch commandName {
@@ -83,14 +85,12 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
command.Start(ctx, argc, args, 1)
commandFinished()
}()
-
case "tail":
command := newReadCommand(h, omode.TailClient)
go func() {
command.Start(ctx, argc, args, 10)
commandFinished()
}()
-
case "map":
command, aggregate, err := newMapCommand(h, argc, args)
if err != nil {
@@ -99,19 +99,17 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
commandFinished()
return
}
-
h.aggregate = aggregate
go func() {
command.Start(ctx, h.maprMessages)
commandFinished()
}()
-
case ".ack":
h.handleAckCommand(argc, args)
commandFinished()
-
default:
- h.send(h.serverMessages, dlog.Server.Error(h.user, "Received unknown user command", commandName, argc, args, options))
+ h.send(h.serverMessages, dlog.Server.Error(h.user,
+ "Received unknown user command", commandName, argc, args, options))
commandFinished()
}
}
diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go
index ccb2225..0ba65f7 100644
--- a/internal/server/scheduler.go
+++ b/internal/server/scheduler.go
@@ -16,8 +16,7 @@ import (
gossh "golang.org/x/crypto/ssh"
)
-type scheduler struct {
-}
+type scheduler struct{}
func newScheduler() *scheduler {
return &scheduler{}
@@ -28,7 +27,6 @@ func (s *scheduler) start(ctx context.Context) {
// First run after just 10s!
time.Sleep(time.Second * 10)
s.runJobs(ctx)
-
for {
select {
case <-time.After(time.Minute):
@@ -45,13 +43,11 @@ func (s *scheduler) runJobs(ctx context.Context) {
dlog.Server.Debug(job.Name, "Not running job as not enabled")
continue
}
-
hour, err := strconv.Atoi(time.Now().Format("15"))
if err != nil {
dlog.Server.Error(job.Name, "Unable to create job", err)
continue
}
-
if hour < job.TimeRange[0] || hour >= job.TimeRange[1] {
dlog.Server.Debug(job.Name, "Not running job out of time range")
continue
@@ -59,7 +55,6 @@ func (s *scheduler) runJobs(ctx context.Context) {
files := fillDates(job.Files)
outfile := fillDates(job.Outfile)
-
_, err = os.Stat(outfile)
if !os.IsNotExist(err) {
dlog.Server.Debug(job.Name, "Not running job as outfile already exists", outfile)
@@ -70,7 +65,6 @@ func (s *scheduler) runJobs(ctx context.Context) {
if servers == "" {
servers = config.Server.SSHBindAddress
}
-
args := config.Args{
ConnectionsPerCPU: config.DefaultConnectionsPerCPU,
Discovery: job.Discovery,
@@ -81,7 +75,6 @@ func (s *scheduler) runJobs(ctx context.Context) {
}
args.SSHAuthMethods = append(args.SSHAuthMethods, gossh.Password(job.Name))
-
args.QueryStr = fmt.Sprintf("%s outfile %s", job.Query, outfile)
client, err := clients.NewMaprClient(args, clients.CumulativeMode)
if err != nil {
diff --git a/internal/server/server.go b/internal/server/server.go
index b3d4bff..0cb5e27 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -24,9 +24,9 @@ type Server struct {
stats stats
// SSH server configuration.
sshServerConfig *gossh.ServerConfig
- // To control the max amount of concurrent cats (which can cause a lot of I/O on the server)
+ // To control the max amount of concurrent cats.
catLimiter chan struct{}
- // To control the max amount of concurrent tails
+ // To control the max amount of concurrent tails.
tailLimiter chan struct{}
// To run scheduled tasks (if configured)
sched *scheduler
@@ -61,7 +61,6 @@ func New() *Server {
// Start the server.
func (s *Server) Start(ctx context.Context) int {
dlog.Server.Info("Starting server")
-
bindAt := fmt.Sprintf("%s:%d", config.Server.SSHBindAddress, config.Common.SSHPort)
dlog.Server.Info("Binding server", bindAt)
@@ -76,14 +75,12 @@ func (s *Server) Start(ctx context.Context) int {
go s.listenerLoop(ctx, listener)
<-ctx.Done()
-
// For future use.
return 0
}
func (s *Server) listenerLoop(ctx context.Context, listener net.Listener) {
dlog.Server.Debug("Starting listener loop")
-
for {
conn, err := listener.Accept() // Blocking
if err != nil {
@@ -101,7 +98,6 @@ func (s *Server) listenerLoop(ctx context.Context, listener net.Listener) {
conn.Close()
continue
}
-
go s.handleConnection(ctx, conn)
}
}
@@ -116,22 +112,23 @@ func (s *Server) handleConnection(ctx context.Context, conn net.Conn) {
}
s.stats.incrementConnections()
-
go gossh.DiscardRequests(reqs)
for newChannel := range chans {
go s.handleChannel(ctx, sshConn, newChannel)
}
}
-func (s *Server) handleChannel(ctx context.Context, sshConn gossh.Conn, newChannel gossh.NewChannel) {
+func (s *Server) handleChannel(ctx context.Context, sshConn gossh.Conn,
+ newChannel gossh.NewChannel) {
+
user, err := user.New(sshConn.User(), sshConn.RemoteAddr().String())
if err != nil {
dlog.Server.Error(user, err)
newChannel.Reject(gossh.Prohibited, err.Error())
return
}
- dlog.Server.Info(user, "Invoking channel handler")
+ dlog.Server.Info(user, "Invoking channel handler")
if newChannel.ChannelType() != "session" {
err := errors.New("Don'w allow other channel types than session")
dlog.Server.Error(user, err)
@@ -151,9 +148,10 @@ func (s *Server) handleChannel(ctx context.Context, sshConn gossh.Conn, newChann
}
}
-func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-chan *gossh.Request, channel gossh.Channel, user *user.User) error {
- dlog.Server.Info(user, "Invoking request handler")
+func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn,
+ in <-chan *gossh.Request, channel gossh.Channel, user *user.User) error {
+ dlog.Server.Info(user, "Invoking request handler")
for req := range in {
var payload = struct{ Value string }{}
gossh.Unmarshal(req.Payload, &payload)
@@ -167,7 +165,6 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch
default:
handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter)
}
-
terminate := func() {
handler.Shutdown()
sshConn.Close()
@@ -178,13 +175,11 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch
io.Copy(channel, handler)
terminate()
}()
-
go func() {
// Broken pipe, cancel
io.Copy(handler, channel)
terminate()
}()
-
go func() {
select {
case <-ctx.Done():
@@ -192,7 +187,6 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch
}
terminate()
}()
-
go func() {
if err := sshConn.Wait(); err != nil && err != io.EOF {
dlog.Server.Error(user, err)
@@ -204,20 +198,19 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch
// Only serving shell type
req.Reply(true, nil)
-
default:
req.Reply(false, nil)
-
return fmt.Errorf("Closing SSH connection as unknown request recieved|%s|%v",
req.Type, payload.Value)
}
}
-
return nil
}
// Callback for SSH authentication.
-func (s *Server) Callback(c gossh.ConnMetadata, authPayload []byte) (*gossh.Permissions, error) {
+func (s *Server) Callback(c gossh.ConnMetadata,
+ authPayload []byte) (*gossh.Permissions, error) {
+
user, err := user.New(c.User(), c.RemoteAddr().String())
if err != nil {
return nil, err
@@ -229,7 +222,6 @@ func (s *Server) Callback(c gossh.ConnMetadata, authPayload []byte) (*gossh.Perm
}
authInfo := string(authPayload)
-
splitted := strings.Split(c.RemoteAddr().String(), ":")
remoteIP := splitted[0]
@@ -259,23 +251,26 @@ func (s *Server) Callback(c gossh.ConnMetadata, authPayload []byte) (*gossh.Perm
return nil, fmt.Errorf("user %s not authorized", user)
}
-func (s *Server) backgroundCanSSH(user *user.User, jobName, remoteIP, allowedJobName string, allowFrom []string) bool {
- dlog.Server.Debug("backgroundCanSSH", user, jobName, remoteIP, allowedJobName, allowFrom)
+func (s *Server) backgroundCanSSH(user *user.User, jobName, remoteIP,
+ allowedJobName string, allowFrom []string) bool {
+ dlog.Server.Debug("backgroundCanSSH", user, jobName, remoteIP, allowedJobName, allowFrom)
if jobName != allowedJobName {
- dlog.Server.Debug(user, jobName, "backgroundCanSSH", "Job name does not match, skipping to next one...", allowedJobName)
+ dlog.Server.Debug(user, jobName, "backgroundCanSSH",
+ "Job name does not match, skipping to next one...", allowedJobName)
return false
}
for _, myAddr := range allowFrom {
ips, err := net.LookupIP(myAddr)
if err != nil {
- dlog.Server.Debug(user, jobName, "backgroundCanSSH", "Unable to lookup IP address for allowed hosts lookup, skipping to next one...", myAddr, err)
+ dlog.Server.Debug(user, jobName, "backgroundCanSSH", "Unable to lookup IP "+
+ "address for allowed hosts lookup, skipping to next one...", myAddr, err)
continue
}
-
for _, ip := range ips {
- dlog.Server.Debug(user, jobName, "backgroundCanSSH", "Comparing IP addresses", remoteIP, ip.String())
+ dlog.Server.Debug(user, jobName, "backgroundCanSSH", "Comparing IP addresses",
+ remoteIP, ip.String())
if remoteIP == ip.String() {
return true
}
diff --git a/internal/server/stats.go b/internal/server/stats.go
index c07634d..99a644a 100644
--- a/internal/server/stats.go
+++ b/internal/server/stats.go
@@ -19,7 +19,6 @@ type stats struct {
func (s *stats) incrementConnections() {
defer s.logServerStats()
-
s.mutex.Lock()
s.currentConnections++
s.lifetimeConnections++
@@ -28,7 +27,6 @@ func (s *stats) incrementConnections() {
func (s *stats) decrementConnections() {
defer s.logServerStats()
-
s.mutex.Lock()
s.currentConnections--
s.mutex.Unlock()
@@ -40,8 +38,8 @@ func (s *stats) hasConnections() bool {
s.mutex.Unlock()
has := currentConnections > 0
- dlog.Server.Info("stats", "Server with open connections?", has, currentConnections)
-
+ dlog.Server.Info("stats", "Server with open connections?",
+ has, currentConnections)
return has
}
@@ -52,7 +50,6 @@ func (s *stats) logServerStats() {
data := make(map[string]interface{})
data["currentConnections"] = s.currentConnections
data["lifetimeConnections"] = s.lifetimeConnections
-
dlog.Server.Mapreduce("STATS", data)
}
@@ -61,9 +58,9 @@ func (s *stats) serverLimitExceeded() error {
defer s.mutex.Unlock()
if s.currentConnections >= config.Server.MaxConnections {
- return fmt.Errorf("Exceeded max allowed concurrent connections of %d", config.Server.MaxConnections)
+ return fmt.Errorf("Exceeded max allowed concurrent connections of %d",
+ config.Server.MaxConnections)
}
-
return nil
}