diff options
| -rw-r--r-- | cmd/dcat/main.go | 3 | ||||
| -rw-r--r-- | cmd/dgrep/main.go | 3 | ||||
| -rw-r--r-- | cmd/dmap/main.go | 8 | ||||
| -rw-r--r-- | cmd/dserver/main.go | 3 | ||||
| -rw-r--r-- | cmd/dtail/main.go | 16 | ||||
| -rw-r--r-- | cmd/dtailhealthcheck/main.go | 20 | ||||
| -rw-r--r-- | integrationtests/dcat.txt.expected (renamed from integrationtests/testdata.txt) | 0 | ||||
| -rw-r--r-- | integrationtests/dcat_test.go | 2 | ||||
| -rw-r--r-- | internal/clients/maprclient.go | 11 | ||||
| -rw-r--r-- | internal/config/args.go | 4 | ||||
| -rw-r--r-- | internal/config/common.go | 14 | ||||
| -rw-r--r-- | internal/config/config.go | 27 | ||||
| -rw-r--r-- | internal/config/initializer.go | 131 | ||||
| -rw-r--r-- | internal/io/dlog/dlog.go | 53 | ||||
| -rw-r--r-- | internal/io/dlog/level.go | 95 | ||||
| -rw-r--r-- | internal/io/dlog/loggers/factory.go | 36 | ||||
| -rw-r--r-- | internal/mapr/logformat/default_test.go | 2 | ||||
| -rw-r--r-- | internal/server/continuous.go | 4 | ||||
| -rw-r--r-- | internal/server/handlers/basehandler.go | 2 | ||||
| -rw-r--r-- | internal/server/scheduler.go | 4 |
20 files changed, 242 insertions, 196 deletions
diff --git a/cmd/dcat/main.go b/cmd/dcat/main.go index ee851ab..662a50d 100644 --- a/cmd/dcat/main.go +++ b/cmd/dcat/main.go @@ -39,6 +39,7 @@ func main() { flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path") flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method") flag.StringVar(&args.LogDir, "logDir", "~/log", "Log dir") + flag.StringVar(&args.Logger, "logger", config.DefaultClientLogger, "Logger name") flag.StringVar(&args.LogLevel, "logLevel", "", "Log level") flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key") flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect") @@ -58,7 +59,7 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup wg.Add(1) - dlog.Start(ctx, &wg, source.Client, config.Common.LogLevel) + dlog.Start(ctx, &wg, source.Client) if pprof > -1 { // For debugging purposes only diff --git a/cmd/dgrep/main.go b/cmd/dgrep/main.go index 95db6f0..529331d 100644 --- a/cmd/dgrep/main.go +++ b/cmd/dgrep/main.go @@ -34,6 +34,7 @@ func main() { flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path") flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method") flag.StringVar(&args.LogDir, "logDir", "~/log", "Log dir") + flag.StringVar(&args.Logger, "logger", config.DefaultClientLogger, "Logger name") flag.StringVar(&args.LogLevel, "logLevel", "", "Log level") flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key") flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression") @@ -55,7 +56,7 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup wg.Add(1) - dlog.Start(ctx, &wg, source.Client, args.LogLevel) + dlog.Start(ctx, &wg, source.Client) if grep != "" { args.RegexStr = grep diff --git a/cmd/dmap/main.go b/cmd/dmap/main.go index d32ccb0..acc1dc6 100644 --- a/cmd/dmap/main.go +++ b/cmd/dmap/main.go @@ -19,7 +19,6 @@ import ( // The evil begins here. func main() { var displayVersion bool - var queryStr string args := config.Args{ Mode: omode.MapClient, @@ -38,12 +37,13 @@ func main() { flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path") flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method") flag.StringVar(&args.LogDir, "logDir", "~/log", "Log dir") + flag.StringVar(&args.Logger, "logger", config.DefaultClientLogger, "Logger name") flag.StringVar(&args.LogLevel, "logLevel", "", "Log level") flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key") + flag.StringVar(&args.QueryStr, "query", "", "Map reduce query") flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect") flag.StringVar(&args.UserName, "user", userName, "Your system user name") flag.StringVar(&args.What, "files", "", "File(s) to read") - flag.StringVar(&queryStr, "query", "", "Map reduce query") flag.Parse() config.Setup(source.Client, &args, flag.Args()) @@ -58,9 +58,9 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup wg.Add(1) - dlog.Start(ctx, &wg, source.Client, config.Common.LogLevel) + dlog.Start(ctx, &wg, source.Client) - client, err := clients.NewMaprClient(args, queryStr, clients.DefaultMode) + client, err := clients.NewMaprClient(args, clients.DefaultMode) if err != nil { dlog.Client.FatalPanic(err) } diff --git a/cmd/dserver/main.go b/cmd/dserver/main.go index c1db2f2..780c6d5 100644 --- a/cmd/dserver/main.go +++ b/cmd/dserver/main.go @@ -39,6 +39,7 @@ func main() { flag.IntVar(&shutdownAfter, "shutdownAfter", 0, "Shutdown after so many seconds") flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path") flag.StringVar(&args.LogDir, "logDir", "", "Log dir") + flag.StringVar(&args.Logger, "logger", config.DefaultServerLogger, "Logger name") flag.StringVar(&args.LogLevel, "logLevel", "", "Log level") flag.Parse() @@ -68,7 +69,7 @@ func main() { var wg sync.WaitGroup wg.Add(1) - dlog.Start(ctx, &wg, source.Server, config.Common.LogLevel) + dlog.Start(ctx, &wg, source.Server) if config.ServerRelaxedAuthEnable { dlog.Server.Fatal("SSH relaxed-auth mode enabled") diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go index 301fc08..adfeaa5 100644 --- a/cmd/dtail/main.go +++ b/cmd/dtail/main.go @@ -31,7 +31,6 @@ func main() { var displayVersion bool var grep string var pprof int - var queryStr string var shutdownAfter int userName := user.Name() @@ -53,14 +52,15 @@ func main() { flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path") flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method") flag.StringVar(&args.LogDir, "logDir", "~/log", "Log dir") + flag.StringVar(&args.Logger, "logger", config.DefaultClientLogger, "Logger name") flag.StringVar(&args.LogLevel, "logLevel", "", "Log level") flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key") + flag.StringVar(&args.QueryStr, "query", "", "Map reduce query") flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression") flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect") flag.StringVar(&args.UserName, "user", userName, "Your system user name") flag.StringVar(&args.What, "files", "", "File(s) to read") flag.StringVar(&grep, "grep", "", "Alias for -regex") - flag.StringVar(&queryStr, "query", "", "Map reduce query") flag.Parse() if grep != "" { @@ -71,15 +71,15 @@ func main() { version.PrintAndExit() } if !args.Spartan { - if !checkHealth { - version.Print() - } if displayWideColorTable { color.TablePrintAndExit(true) } if displayColorTable { color.TablePrintAndExit(false) } + if !checkHealth { + version.Print() + } } ctx, cancel := context.WithCancel(context.Background()) @@ -90,7 +90,7 @@ func main() { var wg sync.WaitGroup wg.Add(1) - dlog.Start(ctx, &wg, source.Client, config.Common.LogLevel) + dlog.Start(ctx, &wg, source.Client) if checkHealth { fmt.Println("WARN: DTail health check has moved to separate binary dtailhealtcheck - please adjust the monitoring scripts!") @@ -109,13 +109,13 @@ func main() { var err error args.Mode = omode.TailClient - switch queryStr { + switch args.QueryStr { case "": if client, err = clients.NewTailClient(args); err != nil { panic(err) } default: - if client, err = clients.NewMaprClient(args, queryStr, clients.DefaultMode); err != nil { + if client, err = clients.NewMaprClient(args, clients.DefaultMode); err != nil { panic(err) } } diff --git a/cmd/dtailhealthcheck/main.go b/cmd/dtailhealthcheck/main.go index e0cb795..71c162e 100644 --- a/cmd/dtailhealthcheck/main.go +++ b/cmd/dtailhealthcheck/main.go @@ -3,9 +3,14 @@ package main import ( "context" "flag" + "fmt" "os" "sync" + "net/http" + _ "net/http" + _ "net/http/pprof" + "github.com/mimecast/dtail/internal/clients" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" @@ -16,8 +21,14 @@ import ( // The evil begins here. func main() { var args config.Args + var pprof int + + flag.IntVar(&pprof, "pprof", -1, "Start PProf server this port") + flag.StringVar(&args.Logger, "logger", config.DefaultHealthCheckLogger, "Logger name") + flag.StringVar(&args.LogLevel, "logLevel", "", "Log level") flag.StringVar(&args.ServersStr, "server", "", "Remote server to connect") flag.Parse() + config.Setup(source.HealthCheck, &args, flag.Args()) ctx, cancel := context.WithCancel(context.Background()) @@ -25,7 +36,14 @@ func main() { var wg sync.WaitGroup wg.Add(1) - dlog.Start(ctx, &wg, source.HealthCheck, config.Common.LogLevel) + if pprof > -1 { + // For debugging purposes only + pprofArgs := fmt.Sprintf("0.0.0.0:%d", pprof) + go http.ListenAndServe(pprofArgs, nil) + dlog.Client.Info("Started PProf", pprofArgs) + } + + dlog.Start(ctx, &wg, source.HealthCheck) healthClient, _ := clients.NewHealthClient(args) os.Exit(healthClient.Start(ctx, signal.NoCh(ctx))) } diff --git a/integrationtests/testdata.txt b/integrationtests/dcat.txt.expected index 9e80424..9e80424 100644 --- a/integrationtests/testdata.txt +++ b/integrationtests/dcat.txt.expected diff --git a/integrationtests/dcat_test.go b/integrationtests/dcat_test.go index 370ea25..4394552 100644 --- a/integrationtests/dcat_test.go +++ b/integrationtests/dcat_test.go @@ -6,7 +6,7 @@ import ( ) func TestDCat(t *testing.T) { - testdataFile := "testdata.txt" + testdataFile := "dcat.txt.expected" stdoutFile := "dcat.out" if err := runCommand(t, "../dcat", []string{"-spartan", testdataFile}, stdoutFile); err != nil { diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 92bbe39..412a219 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -31,8 +31,6 @@ const ( // MaprClient is used for running mapreduce aggregations on remote files. type MaprClient struct { baseClient - // Query string for mapr aggregations - queryStr string // Global group set for merged mapr aggregation results globalGroup *mapr.GlobalGroupSet // The query object (constructed from queryStr) @@ -44,14 +42,14 @@ type MaprClient struct { } // NewMaprClient returns a new mapreduce client. -func NewMaprClient(args config.Args, queryStr string, maprClientMode MaprClientMode) (*MaprClient, error) { - if queryStr == "" { +func NewMaprClient(args config.Args, maprClientMode MaprClientMode) (*MaprClient, error) { + if args.QueryStr == "" { return nil, errors.New("No mapreduce query specified, use '-query' flag") } - query, err := mapr.NewQuery(queryStr) + query, err := mapr.NewQuery(args.QueryStr) if err != nil { - dlog.Client.FatalPanic(queryStr, "Can't parse mapr query", err) + dlog.Client.FatalPanic(args.QueryStr, "Can't parse mapr query", err) } // Don't retry connection if in tail mode and no outfile specified. @@ -77,7 +75,6 @@ func NewMaprClient(args config.Args, queryStr string, maprClientMode MaprClientM retry: retry, }, query: query, - queryStr: queryStr, cumulative: cumulative, } diff --git a/internal/config/args.go b/internal/config/args.go index 3e2eb1f..a671ae3 100644 --- a/internal/config/args.go +++ b/internal/config/args.go @@ -17,10 +17,12 @@ type Args struct { ConnectionsPerCPU int Discovery string LogDir string + Logger string LogLevel string Mode omode.Mode NoColor bool PrivateKeyPathFile string + QueryStr string Quiet bool RegexInvert bool RegexStr string @@ -42,6 +44,7 @@ func (a *Args) String() string { sb.WriteString("Args(") sb.WriteString(fmt.Sprintf("%s:%s,", "LogDir", a.LogDir)) + sb.WriteString(fmt.Sprintf("%s:%s,", "Logger", a.Logger)) sb.WriteString(fmt.Sprintf("%s:%s,", "LogLevel", a.LogLevel)) sb.WriteString(fmt.Sprintf("%s:%v,", "Arguments", a.Arguments)) sb.WriteString(fmt.Sprintf("%s:%v,", "ConfigFile", a.ConfigFile)) @@ -50,6 +53,7 @@ func (a *Args) String() string { sb.WriteString(fmt.Sprintf("%s:%v,", "Mode", a.Mode)) sb.WriteString(fmt.Sprintf("%s:%v,", "NoColor", a.NoColor)) sb.WriteString(fmt.Sprintf("%s:%v,", "PrivateKeyPathFile", a.PrivateKeyPathFile)) + sb.WriteString(fmt.Sprintf("%s:%v,", "QueryStr", a.QueryStr)) sb.WriteString(fmt.Sprintf("%s:%v,", "Quiet", a.Quiet)) sb.WriteString(fmt.Sprintf("%s:%v,", "RegexInvert", a.RegexInvert)) sb.WriteString(fmt.Sprintf("%s:%v,", "RegexStr", a.RegexStr)) diff --git a/internal/config/common.go b/internal/config/common.go index 5e81bc9..9d22c95 100644 --- a/internal/config/common.go +++ b/internal/config/common.go @@ -6,14 +6,14 @@ type CommonConfig struct { SSHPort int // Enable experimental features (mainly for dev purposes) ExperimentalFeaturesEnable bool `json:",omitempty"` + // LogDir defines the log directory. + LogDir string + // Logger defines the name of the logger implementation. + Logger string // LogLevel defines how much is logged. LogLevel string `json:",omitempty"` - // The log strategy to use, one of - // stdout: only log to stdout (useful when used with systemd) - // daily: create a log file for every day + // LogStrategy defines the log rotation strategy. LogStrategy string - // The log directory - LogDir string // The cache directory CacheDir string // The temp directory @@ -26,7 +26,9 @@ func newDefaultCommonConfig() *CommonConfig { SSHPort: DefaultSSHPort, ExperimentalFeaturesEnable: false, LogDir: "log", - LogLevel: "INFO", + Logger: "stdout", + LogLevel: DefaultLogLevel, + LogStrategy: "daily", CacheDir: "cache", TmpDir: "/tmp", } diff --git a/internal/config/config.go b/internal/config/config.go index f216688..077a658 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,6 +15,14 @@ const ( DefaultConnectionsPerCPU int = 10 // DTailSSHServerDefaultPort is the default DServer port. DefaultSSHPort int = 2222 + // DefaultLogLevel specifies the default log level (obviously) + DefaultLogLevel string = "INFO" + // DefaultClientLogger specifies the default logger for the client commands. + DefaultClientLogger string = "fout" + // DefaultServerLogger specifies the default logger for dtail server. + DefaultServerLogger string = "file" + // DefaultHealthCheckLogger specifies the default logger used for health checks. + DefaultHealthCheckLogger string = "none" ) // Client holds a DTail client configuration. @@ -33,12 +41,15 @@ func Setup(sourceProcess source.Source, args *Args, additionalArgs []string) { Server: newDefaultServerConfig(), Client: newDefaultClientConfig(), } - initializer.parseConfig(args) - Client, Server, Common = initializer.transformConfig( - sourceProcess, - args, additionalArgs, - initializer.Client, - initializer.Server, - initializer.Common, - ) + if err := initializer.parseConfig(args); err != nil { + panic(err) + } + if err := initializer.transformConfig(sourceProcess, args, additionalArgs); err != nil { + panic(err) + } + + // Make config accessible globally + Server = initializer.Server + Client = initializer.Client + Common = initializer.Common } diff --git a/internal/config/initializer.go b/internal/config/initializer.go index ec758c8..5247699 100644 --- a/internal/config/initializer.go +++ b/internal/config/initializer.go @@ -18,14 +18,15 @@ type initializer struct { Client *ClientConfig } -func (c *initializer) parseConfig(args *Args) { +type transformCb func(*initializer, *Args, []string) error + +func (c *initializer) parseConfig(args *Args) error { if strings.ToUpper(args.ConfigFile) == "NONE" { - return + return nil } if args.ConfigFile != "" { - c.parseSpecificConfig(args.ConfigFile) - return + return c.parseSpecificConfig(args.ConfigFile) } if homeDir, err := os.UserHomeDir(); err != nil { @@ -38,85 +39,119 @@ func (c *initializer) parseConfig(args *Args) { } } } + + return nil } -func (c *initializer) parseSpecificConfig(configFile string) { +func (c *initializer) parseSpecificConfig(configFile string) error { fd, err := os.Open(configFile) if err != nil { - panic(fmt.Sprintf("Unable to read config file: %v", err)) + return fmt.Errorf("Unable to read config file: %v", err) } defer fd.Close() cfgBytes, err := ioutil.ReadAll(fd) if err != nil { - panic(fmt.Sprintf("Unable to read config file %s: %v", configFile, err)) + return fmt.Errorf("Unable to read config file %s: %v", configFile, err) } - err = json.Unmarshal([]byte(cfgBytes), c) - if err != nil { - panic(fmt.Sprintf("Unable to parse config file %s: %v", configFile, err)) + if err := json.Unmarshal([]byte(cfgBytes), c); err != nil { + return fmt.Errorf("Unable to parse config file %s: %v", configFile, err) } + + return nil } -func (c *initializer) transformConfig(sourceProcess source.Source, args *Args, additionalArgs []string, - client *ClientConfig, server *ServerConfig, common *CommonConfig) (*ClientConfig, *ServerConfig, *CommonConfig) { - if args.LogDir != "" { - common.LogDir = args.LogDir - } - if strings.Contains(common.LogDir, "~/") { - homeDir, err := os.UserHomeDir() - if err != nil { - panic(err) - } - common.LogDir = strings.ReplaceAll(common.LogDir, "~/", fmt.Sprintf("%s/", homeDir)) - } - if common.LogStrategy == "" { - common.LogStrategy = "daily" +func (i *initializer) transformConfig(sourceProcess source.Source, args *Args, additionalArgs []string) error { + + switch sourceProcess { + case source.Server: + return i.optimusPrime(transformServer, args, additionalArgs) + case source.Client: + return i.optimusPrime(transformClient, args, additionalArgs) + case source.HealthCheck: + return i.optimusPrime(transformHealthCheck, args, additionalArgs) + default: + return fmt.Errorf("Unable to transform config, unknown source '%s'", sourceProcess) } +} - if args.Spartan { - args.Quiet = true - args.NoColor = true - if args.LogLevel == "" { - args.LogLevel = "ERROR" - } +func (i *initializer) optimusPrime(sourceCb transformCb, args *Args, additionalArgs []string) error { + // Copy args to config objects. + if args.SSHPort != DefaultSSHPort { + i.Common.SSHPort = args.SSHPort + } + if args.LogLevel != DefaultLogLevel { + i.Common.LogLevel = args.LogLevel } if args.NoColor { - client.TermColorsEnable = false + i.Client.TermColorsEnable = false } - - if args.LogLevel != "" { - common.LogLevel = args.LogLevel - } else if sourceProcess == source.Client && args.ServersStr == "" && args.Discovery == "" { - // We are in serverless mode. Default log level is WARN. - common.LogLevel = "WARN" + if args.LogDir != "" { + i.Common.LogDir = args.LogDir + } + if args.Logger != "" { + i.Common.Logger = args.Logger } - if args.SSHPort != DefaultSSHPort { - common.SSHPort = args.SSHPort + // Setup log directory. + if strings.Contains(i.Common.LogDir, "~/") { + homeDir, err := os.UserHomeDir() + if err != nil { + panic(err) + } + i.Common.LogDir = strings.ReplaceAll(i.Common.LogDir, "~/", fmt.Sprintf("%s/", homeDir)) } + // Serverless mode. if args.Discovery == "" && (args.ServersStr == "" || strings.ToLower(args.ServersStr) == "serverless") { // We are not connecting to any servers. args.Serverless = true + i.Common.LogLevel = "WARN" } - if sourceProcess == source.HealthCheck { - args.TrustAllHosts = true - if !args.Serverless && strings.ToLower(args.ServersStr) == "" { - args.ServersStr = fmt.Sprintf("localhost:%d", DefaultSSHPort) + // Source type specific transormations. + sourceCb(i, args, additionalArgs) + + // Spartan mode. + if args.Spartan { + args.Quiet = true + args.NoColor = true + i.Client.TermColorsEnable = false + if args.LogLevel == "" { + args.LogLevel = "ERROR" + i.Common.LogLevel = "ERROR" } } - - // Interpret additional args as file list. + // Interpret additional args as file list or as query. if args.What == "" { var files []string - for _, file := range flag.Args() { - files = append(files, file) + for _, arg := range flag.Args() { + if args.QueryStr == "" && strings.Contains(strings.ToLower(arg), "select ") { + args.QueryStr = arg + continue + } + files = append(files, arg) } args.What = strings.Join(files, ",") } - return client, server, common + return nil +} + +func transformClient(i *initializer, args *Args, additionalArgs []string) error { + return nil +} + +func transformServer(i *initializer, args *Args, additionalArgs []string) error { + return nil +} + +func transformHealthCheck(i *initializer, args *Args, additionalArgs []string) error { + args.TrustAllHosts = true + if !args.Serverless && args.ServersStr == "" { + args.ServersStr = fmt.Sprintf("localhost:%d", DefaultSSHPort) + } + return nil } diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index 2ae3b04..f3774ba 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -33,7 +33,7 @@ var mutex sync.Mutex var started bool // Start logger(s). -func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source, logLevel string) { +func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source) { mutex.Lock() defer mutex.Unlock() @@ -41,27 +41,18 @@ func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source, Common.FatalPanic("Logger already started") } - strategy := loggers.GetStrategy(config.Common.LogStrategy) - level := newLevel(logLevel) - switch sourceProcess { case source.Client: - // This is a DTail client process running. - impl := loggers.FOUT - Client = New(source.Client, source.Client, level, impl, strategy) - Server = New(source.Client, source.Server, level, impl, strategy) + Client = New(source.Client, source.Client) + Server = New(source.Client, source.Server) Common = Client case source.Server: - // This is a DTail server process running. - impl := loggers.FILE - Client = New(source.Server, source.Client, level, impl, strategy) - Server = New(source.Server, source.Server, level, impl, strategy) + Client = New(source.Server, source.Client) + Server = New(source.Server, source.Server) Common = Server case source.HealthCheck: - // Health check isn't logging anything. - impl := loggers.NONE - Client = New(source.HealthCheck, source.Client, level, impl, strategy) - Server = New(source.HealthCheck, source.Server, level, impl, strategy) + Client = New(source.HealthCheck, source.Client) + Server = New(source.HealthCheck, source.Server) Common = Client } @@ -93,16 +84,20 @@ type DLog struct { } // New creates a new DTail logger. -func New(sourceProcess, sourcePackage source.Source, maxLevel level, impl loggers.Impl, strategy loggers.Strategy) *DLog { +func New(sourceProcess, sourcePackage source.Source) *DLog { hostname, err := os.Hostname() if err != nil { panic(err) } + strategy := loggers.GetStrategy(config.Common.LogStrategy) + loggerName := config.Common.Logger + level := newLevel(config.Common.LogLevel) + return &DLog{ - logger: loggers.Factory(sourceProcess.String(), impl, strategy), + logger: loggers.Factory(sourceProcess.String(), loggerName, strategy), sourceProcess: sourceProcess, sourcePackage: sourcePackage, - maxLevel: maxLevel, + maxLevel: level, hostname: hostname, } } @@ -168,7 +163,7 @@ func (d *DLog) writeArgStrings(sb *strings.Builder, args []interface{}) { } func (d *DLog) FatalPanic(args ...interface{}) { - d.log(FATAL, args) + d.log(Fatal, args) d.Flush() var sb strings.Builder @@ -177,37 +172,37 @@ func (d *DLog) FatalPanic(args ...interface{}) { } func (d *DLog) Fatal(args ...interface{}) string { - return d.log(FATAL, args) + return d.log(Fatal, args) } func (d *DLog) Error(args ...interface{}) string { - return d.log(ERROR, args) + return d.log(Error, args) } func (d *DLog) Warn(args ...interface{}) string { - return d.log(WARN, args) + return d.log(Warn, args) } func (d *DLog) Info(args ...interface{}) string { - return d.log(INFO, args) + return d.log(Info, args) } func (d *DLog) Verbose(args ...interface{}) string { - return d.log(VERBOSE, args) + return d.log(Verbose, args) } func (d *DLog) Debug(args ...interface{}) string { - return d.log(DEBUG, args) + return d.log(Debug, args) } func (d *DLog) Trace(args ...interface{}) string { _, file, line, _ := runtime.Caller(1) args = append(args, fmt.Sprintf("at %s:%d", file, line)) - return d.log(TRACE, args) + return d.log(Trace, args) } func (d *DLog) Devel(args ...interface{}) string { - return d.log(DEVEL, args) + return d.log(Devel, args) } func (d *DLog) Raw(message string) string { @@ -260,7 +255,7 @@ func (d *DLog) Mapreduce(table string, data map[string]interface{}) string { args[i] = fmt.Sprintf("%s=%v", k, v) i++ } - return d.log(INFO, args) + return d.log(Info, args) } func (d *DLog) Flush() { d.logger.Flush() } diff --git a/internal/io/dlog/level.go b/internal/io/dlog/level.go index 84550f0..248ad83 100644 --- a/internal/io/dlog/level.go +++ b/internal/io/dlog/level.go @@ -8,80 +8,69 @@ import ( type level int const ( - FATAL level = iota - ERROR level = iota - WARN level = iota - INFO level = iota - DEFAULT level = iota - VERBOSE level = iota - DEBUG level = iota - DEVEL level = iota - TRACE level = iota - ALL level = iota + Fatal level = iota + Error level = iota + Warn level = iota + Info level = iota + Default level = iota + Verbose level = iota + Debug level = iota + Devel level = iota + Trace level = iota + All level = iota ) -var allLevels = []level{ - FATAL, - ERROR, - WARN, - INFO, - DEFAULT, - VERBOSE, - DEBUG, - DEVEL, - TRACE, - ALL, -} +var allLevels = []level{Fatal, Error, Warn, Info, Default, Verbose, Debug, Devel, Trace, All} func newLevel(l string) level { - switch strings.ToUpper(l) { - case "FATAL": - return FATAL - case "ERROR": - return ERROR - case "WARN": - return WARN - case "INFO": - return INFO + switch strings.ToLower(l) { + case "fatal": + return Fatal + case "error": + return Error + case "warn": + return Warn + case "info": + return Info case "": fallthrough - case "DEFAULT": - return DEFAULT - case "VERBOSE": - return VERBOSE - case "DEBUG": - return DEBUG - case "DEVEL": - return DEVEL - case "TRACE": - return TRACE - case "ALL": - return ALL + case "default": + return Default + case "verbose": + return Verbose + case "debug": + return Debug + case "devel": + return Devel + case "trace": + return Trace + case "all": + return All } panic(fmt.Sprintf("Unknown log level %s, must be one of: %v", l, allLevels)) } func (l level) String() string { switch l { - case FATAL: + case Fatal: return "FATAL" - case ERROR: + case Error: return "ERROR" - case WARN: + case Warn: return "WARN" - case INFO: + case Info: return "INFO" - case DEFAULT: + case Default: return "DEFAULT" - case VERBOSE: + case Verbose: return "VERBOSE" - case DEBUG: + case Debug: return "DEBUG" - case DEVEL: + case Devel: return "DEVEL" - case TRACE: + case Trace: return "TRACE" - case ALL: + case All: return "ALL" } diff --git a/internal/io/dlog/loggers/factory.go b/internal/io/dlog/loggers/factory.go index 8697dc4..dda3ee6 100644 --- a/internal/io/dlog/loggers/factory.go +++ b/internal/io/dlog/loggers/factory.go @@ -2,45 +2,38 @@ package loggers import ( "fmt" + "strings" "sync" ) -type Impl int - -const ( - NONE Impl = iota - STDOUT Impl = iota - FILE Impl = iota - FOUT Impl = iota -) - var factoryMap map[string]Logger var factoryMutex sync.Mutex -func Factory(name string, impl Impl, strategy Strategy) Logger { +func Factory(sourceName, loggerName string, rotationStrategy Strategy) Logger { factoryMutex.Lock() defer factoryMutex.Unlock() - id := fmt.Sprintf("name:%s,fileBase:%s,impl:%v", name, strategy.FileBase, impl) - + id := fmt.Sprintf("sourceName:%s,fileBase:%s,loggerName:%s", sourceName, rotationStrategy.FileBase, loggerName) if factoryMap == nil { factoryMap = make(map[string]Logger) } singleton, ok := factoryMap[id] if !ok { - switch impl { - case NONE: + switch strings.ToLower(loggerName) { + case "none": singleton = none{} - case STDOUT: + case "stdout": singleton = newStdout() factoryMap[id] = singleton - case FILE: - singleton = newFile(strategy) + case "file": + singleton = newFile(rotationStrategy) factoryMap[id] = singleton - case FOUT: - singleton = newFout(strategy) + case "fout": + singleton = newFout(rotationStrategy) factoryMap[id] = singleton + default: + panic(fmt.Sprintf("Unsupported logger type '%s'", loggerName)) } } @@ -53,8 +46,7 @@ func FactoryRotate() { if factoryMap == nil { return } - - for _, impl := range factoryMap { - impl.Rotate() + for _, logger := range factoryMap { + logger.Rotate() } } diff --git a/internal/mapr/logformat/default_test.go b/internal/mapr/logformat/default_test.go index 02c03a3..a777156 100644 --- a/internal/mapr/logformat/default_test.go +++ b/internal/mapr/logformat/default_test.go @@ -32,7 +32,7 @@ func TestDefaultLogFormat(t *testing.T) { if val, ok := fields["$severity"]; !ok { t.Errorf("Expected field '$severity', but no such field there in '%s'\n", input) } else if val != "INFO" { - t.Errorf("Expected 'INFO' stored in field '$severity', but got '%s' in '%s'\n", val, input) + t.Errorf("Expected 'Info' stored in field '$severity', but got '%s' in '%s'\n", val, input) } if val, ok := fields["$time"]; !ok { diff --git a/internal/server/continuous.go b/internal/server/continuous.go index 87c8889..5f84afc 100644 --- a/internal/server/continuous.go +++ b/internal/server/continuous.go @@ -71,8 +71,8 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) { args.SSHAuthMethods = append(args.SSHAuthMethods, gossh.Password(job.Name)) - query := fmt.Sprintf("%s outfile %s", job.Query, outfile) - client, err := clients.NewMaprClient(args, query, clients.NonCumulativeMode) + args.QueryStr = fmt.Sprintf("%s outfile %s", job.Query, outfile) + client, err := clients.NewMaprClient(args, clients.NonCumulativeMode) if err != nil { dlog.Server.Error(fmt.Sprintf("Unable to create job %s", job.Name), err) return diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index 4fa8f00..f73f82e 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -252,7 +252,7 @@ func (h *baseHandler) flush() { for i := 0; i < 10; i++ { if numUnsentMessages() == 0 { - dlog.Server.Debug(h.user, "All lines sent", fmt.Sprintf("%p", h)) + dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h)) return } dlog.Server.Debug(h.user, "Still lines to be sent") diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go index 64e6573..ccb2225 100644 --- a/internal/server/scheduler.go +++ b/internal/server/scheduler.go @@ -82,8 +82,8 @@ func (s *scheduler) runJobs(ctx context.Context) { args.SSHAuthMethods = append(args.SSHAuthMethods, gossh.Password(job.Name)) - query := fmt.Sprintf("%s outfile %s", job.Query, outfile) - client, err := clients.NewMaprClient(args, query, clients.CumulativeMode) + args.QueryStr = fmt.Sprintf("%s outfile %s", job.Query, outfile) + client, err := clients.NewMaprClient(args, clients.CumulativeMode) if err != nil { dlog.Server.Error(fmt.Sprintf("Unable to create job %s", job.Name), err) continue |
