summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/dcat/main.go3
-rw-r--r--cmd/dgrep/main.go3
-rw-r--r--cmd/dmap/main.go8
-rw-r--r--cmd/dserver/main.go3
-rw-r--r--cmd/dtail/main.go16
-rw-r--r--cmd/dtailhealthcheck/main.go20
-rw-r--r--integrationtests/dcat.txt.expected (renamed from integrationtests/testdata.txt)0
-rw-r--r--integrationtests/dcat_test.go2
-rw-r--r--internal/clients/maprclient.go11
-rw-r--r--internal/config/args.go4
-rw-r--r--internal/config/common.go14
-rw-r--r--internal/config/config.go27
-rw-r--r--internal/config/initializer.go131
-rw-r--r--internal/io/dlog/dlog.go53
-rw-r--r--internal/io/dlog/level.go95
-rw-r--r--internal/io/dlog/loggers/factory.go36
-rw-r--r--internal/mapr/logformat/default_test.go2
-rw-r--r--internal/server/continuous.go4
-rw-r--r--internal/server/handlers/basehandler.go2
-rw-r--r--internal/server/scheduler.go4
20 files changed, 242 insertions, 196 deletions
diff --git a/cmd/dcat/main.go b/cmd/dcat/main.go
index ee851ab..662a50d 100644
--- a/cmd/dcat/main.go
+++ b/cmd/dcat/main.go
@@ -39,6 +39,7 @@ func main() {
flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
flag.StringVar(&args.LogDir, "logDir", "~/log", "Log dir")
+ flag.StringVar(&args.Logger, "logger", config.DefaultClientLogger, "Logger name")
flag.StringVar(&args.LogLevel, "logLevel", "", "Log level")
flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key")
flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect")
@@ -58,7 +59,7 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
- dlog.Start(ctx, &wg, source.Client, config.Common.LogLevel)
+ dlog.Start(ctx, &wg, source.Client)
if pprof > -1 {
// For debugging purposes only
diff --git a/cmd/dgrep/main.go b/cmd/dgrep/main.go
index 95db6f0..529331d 100644
--- a/cmd/dgrep/main.go
+++ b/cmd/dgrep/main.go
@@ -34,6 +34,7 @@ func main() {
flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
flag.StringVar(&args.LogDir, "logDir", "~/log", "Log dir")
+ flag.StringVar(&args.Logger, "logger", config.DefaultClientLogger, "Logger name")
flag.StringVar(&args.LogLevel, "logLevel", "", "Log level")
flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key")
flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression")
@@ -55,7 +56,7 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
- dlog.Start(ctx, &wg, source.Client, args.LogLevel)
+ dlog.Start(ctx, &wg, source.Client)
if grep != "" {
args.RegexStr = grep
diff --git a/cmd/dmap/main.go b/cmd/dmap/main.go
index d32ccb0..acc1dc6 100644
--- a/cmd/dmap/main.go
+++ b/cmd/dmap/main.go
@@ -19,7 +19,6 @@ import (
// The evil begins here.
func main() {
var displayVersion bool
- var queryStr string
args := config.Args{
Mode: omode.MapClient,
@@ -38,12 +37,13 @@ func main() {
flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
flag.StringVar(&args.LogDir, "logDir", "~/log", "Log dir")
+ flag.StringVar(&args.Logger, "logger", config.DefaultClientLogger, "Logger name")
flag.StringVar(&args.LogLevel, "logLevel", "", "Log level")
flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key")
+ flag.StringVar(&args.QueryStr, "query", "", "Map reduce query")
flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect")
flag.StringVar(&args.UserName, "user", userName, "Your system user name")
flag.StringVar(&args.What, "files", "", "File(s) to read")
- flag.StringVar(&queryStr, "query", "", "Map reduce query")
flag.Parse()
config.Setup(source.Client, &args, flag.Args())
@@ -58,9 +58,9 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
- dlog.Start(ctx, &wg, source.Client, config.Common.LogLevel)
+ dlog.Start(ctx, &wg, source.Client)
- client, err := clients.NewMaprClient(args, queryStr, clients.DefaultMode)
+ client, err := clients.NewMaprClient(args, clients.DefaultMode)
if err != nil {
dlog.Client.FatalPanic(err)
}
diff --git a/cmd/dserver/main.go b/cmd/dserver/main.go
index c1db2f2..780c6d5 100644
--- a/cmd/dserver/main.go
+++ b/cmd/dserver/main.go
@@ -39,6 +39,7 @@ func main() {
flag.IntVar(&shutdownAfter, "shutdownAfter", 0, "Shutdown after so many seconds")
flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path")
flag.StringVar(&args.LogDir, "logDir", "", "Log dir")
+ flag.StringVar(&args.Logger, "logger", config.DefaultServerLogger, "Logger name")
flag.StringVar(&args.LogLevel, "logLevel", "", "Log level")
flag.Parse()
@@ -68,7 +69,7 @@ func main() {
var wg sync.WaitGroup
wg.Add(1)
- dlog.Start(ctx, &wg, source.Server, config.Common.LogLevel)
+ dlog.Start(ctx, &wg, source.Server)
if config.ServerRelaxedAuthEnable {
dlog.Server.Fatal("SSH relaxed-auth mode enabled")
diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go
index 301fc08..adfeaa5 100644
--- a/cmd/dtail/main.go
+++ b/cmd/dtail/main.go
@@ -31,7 +31,6 @@ func main() {
var displayVersion bool
var grep string
var pprof int
- var queryStr string
var shutdownAfter int
userName := user.Name()
@@ -53,14 +52,15 @@ func main() {
flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
flag.StringVar(&args.LogDir, "logDir", "~/log", "Log dir")
+ flag.StringVar(&args.Logger, "logger", config.DefaultClientLogger, "Logger name")
flag.StringVar(&args.LogLevel, "logLevel", "", "Log level")
flag.StringVar(&args.PrivateKeyPathFile, "key", "", "Path to private key")
+ flag.StringVar(&args.QueryStr, "query", "", "Map reduce query")
flag.StringVar(&args.RegexStr, "regex", ".", "Regular expression")
flag.StringVar(&args.ServersStr, "servers", "", "Remote servers to connect")
flag.StringVar(&args.UserName, "user", userName, "Your system user name")
flag.StringVar(&args.What, "files", "", "File(s) to read")
flag.StringVar(&grep, "grep", "", "Alias for -regex")
- flag.StringVar(&queryStr, "query", "", "Map reduce query")
flag.Parse()
if grep != "" {
@@ -71,15 +71,15 @@ func main() {
version.PrintAndExit()
}
if !args.Spartan {
- if !checkHealth {
- version.Print()
- }
if displayWideColorTable {
color.TablePrintAndExit(true)
}
if displayColorTable {
color.TablePrintAndExit(false)
}
+ if !checkHealth {
+ version.Print()
+ }
}
ctx, cancel := context.WithCancel(context.Background())
@@ -90,7 +90,7 @@ func main() {
var wg sync.WaitGroup
wg.Add(1)
- dlog.Start(ctx, &wg, source.Client, config.Common.LogLevel)
+ dlog.Start(ctx, &wg, source.Client)
if checkHealth {
fmt.Println("WARN: DTail health check has moved to separate binary dtailhealtcheck - please adjust the monitoring scripts!")
@@ -109,13 +109,13 @@ func main() {
var err error
args.Mode = omode.TailClient
- switch queryStr {
+ switch args.QueryStr {
case "":
if client, err = clients.NewTailClient(args); err != nil {
panic(err)
}
default:
- if client, err = clients.NewMaprClient(args, queryStr, clients.DefaultMode); err != nil {
+ if client, err = clients.NewMaprClient(args, clients.DefaultMode); err != nil {
panic(err)
}
}
diff --git a/cmd/dtailhealthcheck/main.go b/cmd/dtailhealthcheck/main.go
index e0cb795..71c162e 100644
--- a/cmd/dtailhealthcheck/main.go
+++ b/cmd/dtailhealthcheck/main.go
@@ -3,9 +3,14 @@ package main
import (
"context"
"flag"
+ "fmt"
"os"
"sync"
+ "net/http"
+ _ "net/http"
+ _ "net/http/pprof"
+
"github.com/mimecast/dtail/internal/clients"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
@@ -16,8 +21,14 @@ import (
// The evil begins here.
func main() {
var args config.Args
+ var pprof int
+
+ flag.IntVar(&pprof, "pprof", -1, "Start PProf server this port")
+ flag.StringVar(&args.Logger, "logger", config.DefaultHealthCheckLogger, "Logger name")
+ flag.StringVar(&args.LogLevel, "logLevel", "", "Log level")
flag.StringVar(&args.ServersStr, "server", "", "Remote server to connect")
flag.Parse()
+
config.Setup(source.HealthCheck, &args, flag.Args())
ctx, cancel := context.WithCancel(context.Background())
@@ -25,7 +36,14 @@ func main() {
var wg sync.WaitGroup
wg.Add(1)
- dlog.Start(ctx, &wg, source.HealthCheck, config.Common.LogLevel)
+ if pprof > -1 {
+ // For debugging purposes only
+ pprofArgs := fmt.Sprintf("0.0.0.0:%d", pprof)
+ go http.ListenAndServe(pprofArgs, nil)
+ dlog.Client.Info("Started PProf", pprofArgs)
+ }
+
+ dlog.Start(ctx, &wg, source.HealthCheck)
healthClient, _ := clients.NewHealthClient(args)
os.Exit(healthClient.Start(ctx, signal.NoCh(ctx)))
}
diff --git a/integrationtests/testdata.txt b/integrationtests/dcat.txt.expected
index 9e80424..9e80424 100644
--- a/integrationtests/testdata.txt
+++ b/integrationtests/dcat.txt.expected
diff --git a/integrationtests/dcat_test.go b/integrationtests/dcat_test.go
index 370ea25..4394552 100644
--- a/integrationtests/dcat_test.go
+++ b/integrationtests/dcat_test.go
@@ -6,7 +6,7 @@ import (
)
func TestDCat(t *testing.T) {
- testdataFile := "testdata.txt"
+ testdataFile := "dcat.txt.expected"
stdoutFile := "dcat.out"
if err := runCommand(t, "../dcat", []string{"-spartan", testdataFile}, stdoutFile); err != nil {
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 92bbe39..412a219 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -31,8 +31,6 @@ const (
// MaprClient is used for running mapreduce aggregations on remote files.
type MaprClient struct {
baseClient
- // Query string for mapr aggregations
- queryStr string
// Global group set for merged mapr aggregation results
globalGroup *mapr.GlobalGroupSet
// The query object (constructed from queryStr)
@@ -44,14 +42,14 @@ type MaprClient struct {
}
// NewMaprClient returns a new mapreduce client.
-func NewMaprClient(args config.Args, queryStr string, maprClientMode MaprClientMode) (*MaprClient, error) {
- if queryStr == "" {
+func NewMaprClient(args config.Args, maprClientMode MaprClientMode) (*MaprClient, error) {
+ if args.QueryStr == "" {
return nil, errors.New("No mapreduce query specified, use '-query' flag")
}
- query, err := mapr.NewQuery(queryStr)
+ query, err := mapr.NewQuery(args.QueryStr)
if err != nil {
- dlog.Client.FatalPanic(queryStr, "Can't parse mapr query", err)
+ dlog.Client.FatalPanic(args.QueryStr, "Can't parse mapr query", err)
}
// Don't retry connection if in tail mode and no outfile specified.
@@ -77,7 +75,6 @@ func NewMaprClient(args config.Args, queryStr string, maprClientMode MaprClientM
retry: retry,
},
query: query,
- queryStr: queryStr,
cumulative: cumulative,
}
diff --git a/internal/config/args.go b/internal/config/args.go
index 3e2eb1f..a671ae3 100644
--- a/internal/config/args.go
+++ b/internal/config/args.go
@@ -17,10 +17,12 @@ type Args struct {
ConnectionsPerCPU int
Discovery string
LogDir string
+ Logger string
LogLevel string
Mode omode.Mode
NoColor bool
PrivateKeyPathFile string
+ QueryStr string
Quiet bool
RegexInvert bool
RegexStr string
@@ -42,6 +44,7 @@ func (a *Args) String() string {
sb.WriteString("Args(")
sb.WriteString(fmt.Sprintf("%s:%s,", "LogDir", a.LogDir))
+ sb.WriteString(fmt.Sprintf("%s:%s,", "Logger", a.Logger))
sb.WriteString(fmt.Sprintf("%s:%s,", "LogLevel", a.LogLevel))
sb.WriteString(fmt.Sprintf("%s:%v,", "Arguments", a.Arguments))
sb.WriteString(fmt.Sprintf("%s:%v,", "ConfigFile", a.ConfigFile))
@@ -50,6 +53,7 @@ func (a *Args) String() string {
sb.WriteString(fmt.Sprintf("%s:%v,", "Mode", a.Mode))
sb.WriteString(fmt.Sprintf("%s:%v,", "NoColor", a.NoColor))
sb.WriteString(fmt.Sprintf("%s:%v,", "PrivateKeyPathFile", a.PrivateKeyPathFile))
+ sb.WriteString(fmt.Sprintf("%s:%v,", "QueryStr", a.QueryStr))
sb.WriteString(fmt.Sprintf("%s:%v,", "Quiet", a.Quiet))
sb.WriteString(fmt.Sprintf("%s:%v,", "RegexInvert", a.RegexInvert))
sb.WriteString(fmt.Sprintf("%s:%v,", "RegexStr", a.RegexStr))
diff --git a/internal/config/common.go b/internal/config/common.go
index 5e81bc9..9d22c95 100644
--- a/internal/config/common.go
+++ b/internal/config/common.go
@@ -6,14 +6,14 @@ type CommonConfig struct {
SSHPort int
// Enable experimental features (mainly for dev purposes)
ExperimentalFeaturesEnable bool `json:",omitempty"`
+ // LogDir defines the log directory.
+ LogDir string
+ // Logger defines the name of the logger implementation.
+ Logger string
// LogLevel defines how much is logged.
LogLevel string `json:",omitempty"`
- // The log strategy to use, one of
- // stdout: only log to stdout (useful when used with systemd)
- // daily: create a log file for every day
+ // LogStrategy defines the log rotation strategy.
LogStrategy string
- // The log directory
- LogDir string
// The cache directory
CacheDir string
// The temp directory
@@ -26,7 +26,9 @@ func newDefaultCommonConfig() *CommonConfig {
SSHPort: DefaultSSHPort,
ExperimentalFeaturesEnable: false,
LogDir: "log",
- LogLevel: "INFO",
+ Logger: "stdout",
+ LogLevel: DefaultLogLevel,
+ LogStrategy: "daily",
CacheDir: "cache",
TmpDir: "/tmp",
}
diff --git a/internal/config/config.go b/internal/config/config.go
index f216688..077a658 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -15,6 +15,14 @@ const (
DefaultConnectionsPerCPU int = 10
// DTailSSHServerDefaultPort is the default DServer port.
DefaultSSHPort int = 2222
+ // DefaultLogLevel specifies the default log level (obviously)
+ DefaultLogLevel string = "INFO"
+ // DefaultClientLogger specifies the default logger for the client commands.
+ DefaultClientLogger string = "fout"
+ // DefaultServerLogger specifies the default logger for dtail server.
+ DefaultServerLogger string = "file"
+ // DefaultHealthCheckLogger specifies the default logger used for health checks.
+ DefaultHealthCheckLogger string = "none"
)
// Client holds a DTail client configuration.
@@ -33,12 +41,15 @@ func Setup(sourceProcess source.Source, args *Args, additionalArgs []string) {
Server: newDefaultServerConfig(),
Client: newDefaultClientConfig(),
}
- initializer.parseConfig(args)
- Client, Server, Common = initializer.transformConfig(
- sourceProcess,
- args, additionalArgs,
- initializer.Client,
- initializer.Server,
- initializer.Common,
- )
+ if err := initializer.parseConfig(args); err != nil {
+ panic(err)
+ }
+ if err := initializer.transformConfig(sourceProcess, args, additionalArgs); err != nil {
+ panic(err)
+ }
+
+ // Make config accessible globally
+ Server = initializer.Server
+ Client = initializer.Client
+ Common = initializer.Common
}
diff --git a/internal/config/initializer.go b/internal/config/initializer.go
index ec758c8..5247699 100644
--- a/internal/config/initializer.go
+++ b/internal/config/initializer.go
@@ -18,14 +18,15 @@ type initializer struct {
Client *ClientConfig
}
-func (c *initializer) parseConfig(args *Args) {
+type transformCb func(*initializer, *Args, []string) error
+
+func (c *initializer) parseConfig(args *Args) error {
if strings.ToUpper(args.ConfigFile) == "NONE" {
- return
+ return nil
}
if args.ConfigFile != "" {
- c.parseSpecificConfig(args.ConfigFile)
- return
+ return c.parseSpecificConfig(args.ConfigFile)
}
if homeDir, err := os.UserHomeDir(); err != nil {
@@ -38,85 +39,119 @@ func (c *initializer) parseConfig(args *Args) {
}
}
}
+
+ return nil
}
-func (c *initializer) parseSpecificConfig(configFile string) {
+func (c *initializer) parseSpecificConfig(configFile string) error {
fd, err := os.Open(configFile)
if err != nil {
- panic(fmt.Sprintf("Unable to read config file: %v", err))
+ return fmt.Errorf("Unable to read config file: %v", err)
}
defer fd.Close()
cfgBytes, err := ioutil.ReadAll(fd)
if err != nil {
- panic(fmt.Sprintf("Unable to read config file %s: %v", configFile, err))
+ return fmt.Errorf("Unable to read config file %s: %v", configFile, err)
}
- err = json.Unmarshal([]byte(cfgBytes), c)
- if err != nil {
- panic(fmt.Sprintf("Unable to parse config file %s: %v", configFile, err))
+ if err := json.Unmarshal([]byte(cfgBytes), c); err != nil {
+ return fmt.Errorf("Unable to parse config file %s: %v", configFile, err)
}
+
+ return nil
}
-func (c *initializer) transformConfig(sourceProcess source.Source, args *Args, additionalArgs []string,
- client *ClientConfig, server *ServerConfig, common *CommonConfig) (*ClientConfig, *ServerConfig, *CommonConfig) {
- if args.LogDir != "" {
- common.LogDir = args.LogDir
- }
- if strings.Contains(common.LogDir, "~/") {
- homeDir, err := os.UserHomeDir()
- if err != nil {
- panic(err)
- }
- common.LogDir = strings.ReplaceAll(common.LogDir, "~/", fmt.Sprintf("%s/", homeDir))
- }
- if common.LogStrategy == "" {
- common.LogStrategy = "daily"
+func (i *initializer) transformConfig(sourceProcess source.Source, args *Args, additionalArgs []string) error {
+
+ switch sourceProcess {
+ case source.Server:
+ return i.optimusPrime(transformServer, args, additionalArgs)
+ case source.Client:
+ return i.optimusPrime(transformClient, args, additionalArgs)
+ case source.HealthCheck:
+ return i.optimusPrime(transformHealthCheck, args, additionalArgs)
+ default:
+ return fmt.Errorf("Unable to transform config, unknown source '%s'", sourceProcess)
}
+}
- if args.Spartan {
- args.Quiet = true
- args.NoColor = true
- if args.LogLevel == "" {
- args.LogLevel = "ERROR"
- }
+func (i *initializer) optimusPrime(sourceCb transformCb, args *Args, additionalArgs []string) error {
+ // Copy args to config objects.
+ if args.SSHPort != DefaultSSHPort {
+ i.Common.SSHPort = args.SSHPort
+ }
+ if args.LogLevel != DefaultLogLevel {
+ i.Common.LogLevel = args.LogLevel
}
if args.NoColor {
- client.TermColorsEnable = false
+ i.Client.TermColorsEnable = false
}
-
- if args.LogLevel != "" {
- common.LogLevel = args.LogLevel
- } else if sourceProcess == source.Client && args.ServersStr == "" && args.Discovery == "" {
- // We are in serverless mode. Default log level is WARN.
- common.LogLevel = "WARN"
+ if args.LogDir != "" {
+ i.Common.LogDir = args.LogDir
+ }
+ if args.Logger != "" {
+ i.Common.Logger = args.Logger
}
- if args.SSHPort != DefaultSSHPort {
- common.SSHPort = args.SSHPort
+ // Setup log directory.
+ if strings.Contains(i.Common.LogDir, "~/") {
+ homeDir, err := os.UserHomeDir()
+ if err != nil {
+ panic(err)
+ }
+ i.Common.LogDir = strings.ReplaceAll(i.Common.LogDir, "~/", fmt.Sprintf("%s/", homeDir))
}
+ // Serverless mode.
if args.Discovery == "" && (args.ServersStr == "" ||
strings.ToLower(args.ServersStr) == "serverless") {
// We are not connecting to any servers.
args.Serverless = true
+ i.Common.LogLevel = "WARN"
}
- if sourceProcess == source.HealthCheck {
- args.TrustAllHosts = true
- if !args.Serverless && strings.ToLower(args.ServersStr) == "" {
- args.ServersStr = fmt.Sprintf("localhost:%d", DefaultSSHPort)
+ // Source type specific transormations.
+ sourceCb(i, args, additionalArgs)
+
+ // Spartan mode.
+ if args.Spartan {
+ args.Quiet = true
+ args.NoColor = true
+ i.Client.TermColorsEnable = false
+ if args.LogLevel == "" {
+ args.LogLevel = "ERROR"
+ i.Common.LogLevel = "ERROR"
}
}
-
- // Interpret additional args as file list.
+ // Interpret additional args as file list or as query.
if args.What == "" {
var files []string
- for _, file := range flag.Args() {
- files = append(files, file)
+ for _, arg := range flag.Args() {
+ if args.QueryStr == "" && strings.Contains(strings.ToLower(arg), "select ") {
+ args.QueryStr = arg
+ continue
+ }
+ files = append(files, arg)
}
args.What = strings.Join(files, ",")
}
- return client, server, common
+ return nil
+}
+
+func transformClient(i *initializer, args *Args, additionalArgs []string) error {
+ return nil
+}
+
+func transformServer(i *initializer, args *Args, additionalArgs []string) error {
+ return nil
+}
+
+func transformHealthCheck(i *initializer, args *Args, additionalArgs []string) error {
+ args.TrustAllHosts = true
+ if !args.Serverless && args.ServersStr == "" {
+ args.ServersStr = fmt.Sprintf("localhost:%d", DefaultSSHPort)
+ }
+ return nil
}
diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go
index 2ae3b04..f3774ba 100644
--- a/internal/io/dlog/dlog.go
+++ b/internal/io/dlog/dlog.go
@@ -33,7 +33,7 @@ var mutex sync.Mutex
var started bool
// Start logger(s).
-func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source, logLevel string) {
+func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source) {
mutex.Lock()
defer mutex.Unlock()
@@ -41,27 +41,18 @@ func Start(ctx context.Context, wg *sync.WaitGroup, sourceProcess source.Source,
Common.FatalPanic("Logger already started")
}
- strategy := loggers.GetStrategy(config.Common.LogStrategy)
- level := newLevel(logLevel)
-
switch sourceProcess {
case source.Client:
- // This is a DTail client process running.
- impl := loggers.FOUT
- Client = New(source.Client, source.Client, level, impl, strategy)
- Server = New(source.Client, source.Server, level, impl, strategy)
+ Client = New(source.Client, source.Client)
+ Server = New(source.Client, source.Server)
Common = Client
case source.Server:
- // This is a DTail server process running.
- impl := loggers.FILE
- Client = New(source.Server, source.Client, level, impl, strategy)
- Server = New(source.Server, source.Server, level, impl, strategy)
+ Client = New(source.Server, source.Client)
+ Server = New(source.Server, source.Server)
Common = Server
case source.HealthCheck:
- // Health check isn't logging anything.
- impl := loggers.NONE
- Client = New(source.HealthCheck, source.Client, level, impl, strategy)
- Server = New(source.HealthCheck, source.Server, level, impl, strategy)
+ Client = New(source.HealthCheck, source.Client)
+ Server = New(source.HealthCheck, source.Server)
Common = Client
}
@@ -93,16 +84,20 @@ type DLog struct {
}
// New creates a new DTail logger.
-func New(sourceProcess, sourcePackage source.Source, maxLevel level, impl loggers.Impl, strategy loggers.Strategy) *DLog {
+func New(sourceProcess, sourcePackage source.Source) *DLog {
hostname, err := os.Hostname()
if err != nil {
panic(err)
}
+ strategy := loggers.GetStrategy(config.Common.LogStrategy)
+ loggerName := config.Common.Logger
+ level := newLevel(config.Common.LogLevel)
+
return &DLog{
- logger: loggers.Factory(sourceProcess.String(), impl, strategy),
+ logger: loggers.Factory(sourceProcess.String(), loggerName, strategy),
sourceProcess: sourceProcess,
sourcePackage: sourcePackage,
- maxLevel: maxLevel,
+ maxLevel: level,
hostname: hostname,
}
}
@@ -168,7 +163,7 @@ func (d *DLog) writeArgStrings(sb *strings.Builder, args []interface{}) {
}
func (d *DLog) FatalPanic(args ...interface{}) {
- d.log(FATAL, args)
+ d.log(Fatal, args)
d.Flush()
var sb strings.Builder
@@ -177,37 +172,37 @@ func (d *DLog) FatalPanic(args ...interface{}) {
}
func (d *DLog) Fatal(args ...interface{}) string {
- return d.log(FATAL, args)
+ return d.log(Fatal, args)
}
func (d *DLog) Error(args ...interface{}) string {
- return d.log(ERROR, args)
+ return d.log(Error, args)
}
func (d *DLog) Warn(args ...interface{}) string {
- return d.log(WARN, args)
+ return d.log(Warn, args)
}
func (d *DLog) Info(args ...interface{}) string {
- return d.log(INFO, args)
+ return d.log(Info, args)
}
func (d *DLog) Verbose(args ...interface{}) string {
- return d.log(VERBOSE, args)
+ return d.log(Verbose, args)
}
func (d *DLog) Debug(args ...interface{}) string {
- return d.log(DEBUG, args)
+ return d.log(Debug, args)
}
func (d *DLog) Trace(args ...interface{}) string {
_, file, line, _ := runtime.Caller(1)
args = append(args, fmt.Sprintf("at %s:%d", file, line))
- return d.log(TRACE, args)
+ return d.log(Trace, args)
}
func (d *DLog) Devel(args ...interface{}) string {
- return d.log(DEVEL, args)
+ return d.log(Devel, args)
}
func (d *DLog) Raw(message string) string {
@@ -260,7 +255,7 @@ func (d *DLog) Mapreduce(table string, data map[string]interface{}) string {
args[i] = fmt.Sprintf("%s=%v", k, v)
i++
}
- return d.log(INFO, args)
+ return d.log(Info, args)
}
func (d *DLog) Flush() { d.logger.Flush() }
diff --git a/internal/io/dlog/level.go b/internal/io/dlog/level.go
index 84550f0..248ad83 100644
--- a/internal/io/dlog/level.go
+++ b/internal/io/dlog/level.go
@@ -8,80 +8,69 @@ import (
type level int
const (
- FATAL level = iota
- ERROR level = iota
- WARN level = iota
- INFO level = iota
- DEFAULT level = iota
- VERBOSE level = iota
- DEBUG level = iota
- DEVEL level = iota
- TRACE level = iota
- ALL level = iota
+ Fatal level = iota
+ Error level = iota
+ Warn level = iota
+ Info level = iota
+ Default level = iota
+ Verbose level = iota
+ Debug level = iota
+ Devel level = iota
+ Trace level = iota
+ All level = iota
)
-var allLevels = []level{
- FATAL,
- ERROR,
- WARN,
- INFO,
- DEFAULT,
- VERBOSE,
- DEBUG,
- DEVEL,
- TRACE,
- ALL,
-}
+var allLevels = []level{Fatal, Error, Warn, Info, Default, Verbose, Debug, Devel, Trace, All}
func newLevel(l string) level {
- switch strings.ToUpper(l) {
- case "FATAL":
- return FATAL
- case "ERROR":
- return ERROR
- case "WARN":
- return WARN
- case "INFO":
- return INFO
+ switch strings.ToLower(l) {
+ case "fatal":
+ return Fatal
+ case "error":
+ return Error
+ case "warn":
+ return Warn
+ case "info":
+ return Info
case "":
fallthrough
- case "DEFAULT":
- return DEFAULT
- case "VERBOSE":
- return VERBOSE
- case "DEBUG":
- return DEBUG
- case "DEVEL":
- return DEVEL
- case "TRACE":
- return TRACE
- case "ALL":
- return ALL
+ case "default":
+ return Default
+ case "verbose":
+ return Verbose
+ case "debug":
+ return Debug
+ case "devel":
+ return Devel
+ case "trace":
+ return Trace
+ case "all":
+ return All
}
panic(fmt.Sprintf("Unknown log level %s, must be one of: %v", l, allLevels))
}
func (l level) String() string {
switch l {
- case FATAL:
+ case Fatal:
return "FATAL"
- case ERROR:
+ case Error:
return "ERROR"
- case WARN:
+ case Warn:
return "WARN"
- case INFO:
+ case Info:
return "INFO"
- case DEFAULT:
+ case Default:
return "DEFAULT"
- case VERBOSE:
+ case Verbose:
return "VERBOSE"
- case DEBUG:
+ case Debug:
return "DEBUG"
- case DEVEL:
+ case Devel:
return "DEVEL"
- case TRACE:
+ case Trace:
return "TRACE"
- case ALL:
+ case All:
return "ALL"
}
diff --git a/internal/io/dlog/loggers/factory.go b/internal/io/dlog/loggers/factory.go
index 8697dc4..dda3ee6 100644
--- a/internal/io/dlog/loggers/factory.go
+++ b/internal/io/dlog/loggers/factory.go
@@ -2,45 +2,38 @@ package loggers
import (
"fmt"
+ "strings"
"sync"
)
-type Impl int
-
-const (
- NONE Impl = iota
- STDOUT Impl = iota
- FILE Impl = iota
- FOUT Impl = iota
-)
-
var factoryMap map[string]Logger
var factoryMutex sync.Mutex
-func Factory(name string, impl Impl, strategy Strategy) Logger {
+func Factory(sourceName, loggerName string, rotationStrategy Strategy) Logger {
factoryMutex.Lock()
defer factoryMutex.Unlock()
- id := fmt.Sprintf("name:%s,fileBase:%s,impl:%v", name, strategy.FileBase, impl)
-
+ id := fmt.Sprintf("sourceName:%s,fileBase:%s,loggerName:%s", sourceName, rotationStrategy.FileBase, loggerName)
if factoryMap == nil {
factoryMap = make(map[string]Logger)
}
singleton, ok := factoryMap[id]
if !ok {
- switch impl {
- case NONE:
+ switch strings.ToLower(loggerName) {
+ case "none":
singleton = none{}
- case STDOUT:
+ case "stdout":
singleton = newStdout()
factoryMap[id] = singleton
- case FILE:
- singleton = newFile(strategy)
+ case "file":
+ singleton = newFile(rotationStrategy)
factoryMap[id] = singleton
- case FOUT:
- singleton = newFout(strategy)
+ case "fout":
+ singleton = newFout(rotationStrategy)
factoryMap[id] = singleton
+ default:
+ panic(fmt.Sprintf("Unsupported logger type '%s'", loggerName))
}
}
@@ -53,8 +46,7 @@ func FactoryRotate() {
if factoryMap == nil {
return
}
-
- for _, impl := range factoryMap {
- impl.Rotate()
+ for _, logger := range factoryMap {
+ logger.Rotate()
}
}
diff --git a/internal/mapr/logformat/default_test.go b/internal/mapr/logformat/default_test.go
index 02c03a3..a777156 100644
--- a/internal/mapr/logformat/default_test.go
+++ b/internal/mapr/logformat/default_test.go
@@ -32,7 +32,7 @@ func TestDefaultLogFormat(t *testing.T) {
if val, ok := fields["$severity"]; !ok {
t.Errorf("Expected field '$severity', but no such field there in '%s'\n", input)
} else if val != "INFO" {
- t.Errorf("Expected 'INFO' stored in field '$severity', but got '%s' in '%s'\n", val, input)
+ t.Errorf("Expected 'Info' stored in field '$severity', but got '%s' in '%s'\n", val, input)
}
if val, ok := fields["$time"]; !ok {
diff --git a/internal/server/continuous.go b/internal/server/continuous.go
index 87c8889..5f84afc 100644
--- a/internal/server/continuous.go
+++ b/internal/server/continuous.go
@@ -71,8 +71,8 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) {
args.SSHAuthMethods = append(args.SSHAuthMethods, gossh.Password(job.Name))
- query := fmt.Sprintf("%s outfile %s", job.Query, outfile)
- client, err := clients.NewMaprClient(args, query, clients.NonCumulativeMode)
+ args.QueryStr = fmt.Sprintf("%s outfile %s", job.Query, outfile)
+ client, err := clients.NewMaprClient(args, clients.NonCumulativeMode)
if err != nil {
dlog.Server.Error(fmt.Sprintf("Unable to create job %s", job.Name), err)
return
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index 4fa8f00..f73f82e 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -252,7 +252,7 @@ func (h *baseHandler) flush() {
for i := 0; i < 10; i++ {
if numUnsentMessages() == 0 {
- dlog.Server.Debug(h.user, "All lines sent", fmt.Sprintf("%p", h))
+ dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h))
return
}
dlog.Server.Debug(h.user, "Still lines to be sent")
diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go
index 64e6573..ccb2225 100644
--- a/internal/server/scheduler.go
+++ b/internal/server/scheduler.go
@@ -82,8 +82,8 @@ func (s *scheduler) runJobs(ctx context.Context) {
args.SSHAuthMethods = append(args.SSHAuthMethods, gossh.Password(job.Name))
- query := fmt.Sprintf("%s outfile %s", job.Query, outfile)
- client, err := clients.NewMaprClient(args, query, clients.CumulativeMode)
+ args.QueryStr = fmt.Sprintf("%s outfile %s", job.Query, outfile)
+ client, err := clients.NewMaprClient(args, clients.CumulativeMode)
if err != nil {
dlog.Server.Error(fmt.Sprintf("Unable to create job %s", job.Name), err)
continue