diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2020-06-17 12:59:15 +0100 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2020-06-17 12:59:15 +0100 |
| commit | b7b528277014879e436ae7fe1f3851024938fbd3 (patch) | |
| tree | 8be1cc0ecf3a25c390c9a7ad97afb5aa52a6c512 /internal | |
| parent | 4da9ed0f4ded049e28607cc7ea78c8b091ca721b (diff) | |
initial log monitoring support
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/clients/handlers/maprhandler.go | 2 | ||||
| -rw-r--r-- | internal/config/config.go | 4 | ||||
| -rw-r--r-- | internal/config/server.go | 15 | ||||
| -rw-r--r-- | internal/mapr/aggregateset.go | 12 | ||||
| -rw-r--r-- | internal/mapr/server/aggregate.go | 6 | ||||
| -rw-r--r-- | internal/server/scheduler.go | 22 | ||||
| -rw-r--r-- | internal/server/server.go | 14 | ||||
| -rw-r--r-- | internal/user/server/user.go | 4 | ||||
| -rw-r--r-- | internal/version/version.go | 6 |
9 files changed, 51 insertions, 34 deletions
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index 874bb7d..df44ec9 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -63,7 +63,7 @@ func (h *MaprHandler) handleAggregateMessage(message string) { // Index 0 contains 'AGGREGATE', 1 contains server host. // Aggregation data begins from index 2. - logger.Debug("Received aggregate data", h.server, h.count) + logger.Debug("Received aggregate data", h.server, h.count, parts) h.aggregate.Aggregate(parts[2:]) logger.Debug("Aggregated aggregate data", h.server, h.count) } diff --git a/internal/config/config.go b/internal/config/config.go index 7241276..39149bc 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -9,8 +9,8 @@ import ( // ControlUser is used for various DTail specific operations. const ControlUser string = "DTAIL-CONTROL-USER" -// ScheduleUser is used for scheduled queries. -const ScheduleUser string = "DTAIL-SCHEDULE-USER" +// BackgroundUser is used for non-interactive scheduled queries and log monitoring and such. +const BackgroundUser string = "DTAIL-BACKGROUND-USER" // Client holds a DTail client configuration. var Client *ClientConfig diff --git a/internal/config/server.go b/internal/config/server.go index bc2f40a..43b4c34 100644 --- a/internal/config/server.go +++ b/internal/config/server.go @@ -26,6 +26,19 @@ type Scheduled struct { AllowFrom []string `json:",omitempty"` } +// Monitoring on log files. +type Monitoring struct { + Name string + Enable bool + Files string + Query string + ExcludeRegexes []string `json:",omitempty"` + Outfile string `json:",omitempty"` + Discovery string `json:",omitempty"` + Servers []string `json:",omitempty"` + AllowFrom []string `json:",omitempty"` +} + // ServerConfig represents the server configuration. type ServerConfig struct { // The SSH server bind port. @@ -46,6 +59,8 @@ type ServerConfig struct { HostKeyBits int // Scheduled mapreduce jobs. Schedule []Scheduled `json:",omitempty"` + // Monitoring on log files. + Monitoring []Monitoring `json:",omitempty"` } // Create a new default server configuration. diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go index 7fb4c17..fdf8db2 100644 --- a/internal/mapr/aggregateset.go +++ b/internal/mapr/aggregateset.go @@ -2,9 +2,12 @@ package mapr import ( "context" + "encoding/base64" "fmt" "strconv" "strings" + + "github.com/mimecast/dtail/internal/io/logger" ) // AggregateSet represents aggregated key/value pairs from the @@ -82,6 +85,15 @@ func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- for k, v := range s.SValues { sb.WriteString(k) sb.WriteString("=") + if k == "$line" { + decoded, err := base64.StdEncoding.DecodeString(v) + if err != nil { + logger.Error("Unable to decode $line", err, v) + } + sb.WriteString(string(decoded)) + sb.WriteString("|") + continue + } sb.WriteString(v) sb.WriteString("|") } diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 9403aa9..80a464d 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -192,6 +192,12 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { var addedSample bool for _, sc := range a.query.Select { if val, ok := fields[sc.Field]; ok { + /* + if sc.Field == "$line" { + // Complete log line as to arrive untouched on the client side. + val = base64.StdEncoding.EncodeToString([]byte(val)) + } + */ if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil { logger.Error(err) continue diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go index db49d1b..8b073a6 100644 --- a/internal/server/scheduler.go +++ b/internal/server/scheduler.go @@ -16,9 +16,6 @@ import ( gossh "golang.org/x/crypto/ssh" ) -const authLength = 64 -const authCharset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@$%^&*()_+[]" - type scheduler struct { } @@ -79,7 +76,7 @@ func (s *scheduler) runJobs(ctx context.Context) { ServersStr: servers, What: files, Mode: omode.MapClient, - UserName: config.ScheduleUser, + UserName: config.BackgroundUser, } args.SSHAuthMethods = append(args.SSHAuthMethods, gossh.Password(scheduled.Name)) @@ -111,20 +108,3 @@ func (s *scheduler) runJobs(ctx context.Context) { logger.Info(logMessage) } } - -func fillDates(str string) string { - yyyesterday := time.Now().Add(3 * -24 * time.Hour).Format("20060102") - str = strings.ReplaceAll(str, "$yyyesterday", yyyesterday) - - yyesterday := time.Now().Add(2 * -24 * time.Hour).Format("20060102") - str = strings.ReplaceAll(str, "$yyesterday", yyesterday) - - yesterday := time.Now().Add(1 * -24 * time.Hour).Format("20060102") - str = strings.ReplaceAll(str, "$yesterday", yesterday) - - today := time.Now().Format("20060102") - str = strings.ReplaceAll(str, "$today", today) - - tomorrow := time.Now().Add(1 * 24 * time.Hour).Format("20060102") - return strings.ReplaceAll(str, "$tomorrow", tomorrow) -} diff --git a/internal/server/server.go b/internal/server/server.go index 8e791c8..0377598 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -32,6 +32,8 @@ type Server struct { tailLimiter chan struct{} // To run scheduled tasks (if configured) sched *scheduler + // Mointor log files for pattern (if configured) + mon *monitoring // Wait counter, e.g. there might be still subprocesses (forked by drun) to be killed. shutdownWaitFor chan struct{} // Background jobs @@ -48,6 +50,7 @@ func New() *Server { tailLimiter: make(chan struct{}, config.Server.MaxConcurrentTails), shutdownWaitFor: make(chan struct{}, 1000), sched: newScheduler(), + mon: newMonitoring(), background: background.New(), } @@ -77,6 +80,7 @@ func (s *Server) Start(ctx context.Context) int { go s.stats.start(ctx) go s.sched.start(ctx) + go s.mon.start(ctx) go s.listenerLoop(ctx, listener) select { @@ -246,16 +250,16 @@ func (s *Server) backgroundUserCallback(c gossh.ConnMetadata, authPayload []byte return nil, nil } - if user.Name == config.ScheduleUser && s.schedueleUserCanHaveSSHSession(c.RemoteAddr().String(), user, authInfo) { - logger.Debug(user, "Granting SSH connection to schedule user") + if user.Name == config.BackgroundUser && s.backgroundJobUserCanHaveSSHSession(c.RemoteAddr().String(), user, authInfo) { + logger.Debug(user, "Granting SSH connection to background user") return nil, nil } return nil, fmt.Errorf("user %s not authorized", user) } -func (s *Server) schedueleUserCanHaveSSHSession(addr string, user *user.User, jobName string) bool { - logger.Debug("schedueleUserCanHaveSSHSession", user, jobName) +func (s *Server) backgroundJobUserCanHaveSSHSession(addr string, user *user.User, jobName string) bool { + logger.Debug("backgroundJobUserCanHaveSSHSession", user, jobName) splitted := strings.Split(addr, ":") ip := splitted[0] @@ -271,7 +275,7 @@ func (s *Server) schedueleUserCanHaveSSHSession(addr string, user *user.User, jo } for _, myIP := range myIPs { - logger.Debug("schedueleUserCanHaveSSHSession", "Comparing IP addresses", ip, myIP.String()) + logger.Debug("backgroundJobUserCanHaveSSHSession", "Comparing IP addresses", ip, myIP.String()) if ip == myIP.String() { return true } diff --git a/internal/user/server/user.go b/internal/user/server/user.go index 47dc3f1..29158df 100644 --- a/internal/user/server/user.go +++ b/internal/user/server/user.go @@ -41,8 +41,8 @@ func (u *User) String() string { func (u *User) HasFilePermission(filePath, permissionType string) (hasPermission bool) { logger.Debug(u, filePath, permissionType, "Checking config permissions") - if u.Name == config.ScheduleUser { - // Schedule user has same permissions as dtail process itself. + if u.Name == config.BackgroundUser { + // Background user has same permissions as dtail process itself. return true } diff --git a/internal/version/version.go b/internal/version/version.go index f7dcb65..deffd22 100644 --- a/internal/version/version.go +++ b/internal/version/version.go @@ -11,11 +11,11 @@ const ( // Name of DTail. Name string = "DTail" // Version of DTail. - Version string = "2.4.0" + Version string = "2.5.0" // Additional information for DTail - Additional string = "" + Additional string = "develop" // ProtocolCompat -ibility version. - ProtocolCompat string = "2" + ProtocolCompat string = "3" ) // String representation of the DTail version. |
