diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2020-09-04 14:42:38 +0300 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2020-09-04 14:42:38 +0300 |
| commit | b45e7d33de6f2e5089ef5fa6cd8220bcfe4ccae2 (patch) | |
| tree | eccadaab2b051496a76e2a642766d77e6634b456 /internal | |
| parent | 47401f94f6c429cb17a0fb32fb06d8b29156f396 (diff) | |
server side support for new regex package
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/clients/args.go | 3 | ||||
| -rw-r--r-- | internal/clients/baseclient.go | 6 | ||||
| -rw-r--r-- | internal/clients/catclient.go | 3 | ||||
| -rw-r--r-- | internal/clients/grepclient.go | 4 | ||||
| -rw-r--r-- | internal/clients/maprclient.go | 6 | ||||
| -rw-r--r-- | internal/io/fs/filereader.go | 3 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 27 | ||||
| -rw-r--r-- | internal/regex/regex.go | 21 | ||||
| -rw-r--r-- | internal/regex/regex_test.go | 2 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 31 |
10 files changed, 60 insertions, 46 deletions
diff --git a/internal/clients/args.go b/internal/clients/args.go index 1145a4b..34fcfa2 100644 --- a/internal/clients/args.go +++ b/internal/clients/args.go @@ -13,7 +13,8 @@ type Args struct { UserName string What string Arguments []string - Regex string + RegexStr string + RegexInvert bool TrustAllHosts bool Discovery string ConnectionsPerCPU int diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 7c6bce5..102fd7c 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -50,7 +50,11 @@ func (c *baseClient) init(maker maker) { c.connections = append(c.connections, c.makeConnection(server, c.sshAuthMethods, c.hostKeyCallback)) } - regex, err := regex.New(c.Args.Regex, regex.Default) + flag := regex.Default + if c.Args.RegexInvert { + flag = regex.Invert + } + regex, err := regex.New(c.Args.RegexStr, flag) if err != nil { logger.FatalExit(c.Regex, "invalid regex!", err, regex) } diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go index 7fd6bdc..f089e32 100644 --- a/internal/clients/catclient.go +++ b/internal/clients/catclient.go @@ -17,11 +17,10 @@ type CatClient struct { // NewCatClient returns a new cat client. func NewCatClient(args Args) (*CatClient, error) { - if args.Regex != "" { + if args.RegexStr != "" { return nil, errors.New("Can't use regex with 'cat' operating mode") } - args.Regex = "." args.Mode = omode.CatClient c := CatClient{ diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go index 8d11458..9f6b666 100644 --- a/internal/clients/grepclient.go +++ b/internal/clients/grepclient.go @@ -17,7 +17,7 @@ type GrepClient struct { // NewGrepClient creates a new grep client. func NewGrepClient(args Args) (*GrepClient, error) { - if args.Regex == "" { + if args.RegexStr == "" { return nil, errors.New("No regex specified, use '-regex' flag") } args.Mode = omode.GrepClient @@ -41,7 +41,7 @@ func (c GrepClient) makeHandler(server string) handlers.Handler { func (c GrepClient) makeCommands() (commands []string) { for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s %s regex %s", c.Mode.String(), file, c.Regex)) + commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize())) } return } diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index c6c341b..85fa7ee 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -79,11 +79,11 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (* switch c.query.Table { case "", ".": - c.Regex = "." + c.RegexStr = "." case "*": - c.Regex = fmt.Sprintf("\\|MAPREDUCE:\\|") + c.RegexStr = fmt.Sprintf("\\|MAPREDUCE:\\|") default: - c.Regex = fmt.Sprintf("\\|MAPREDUCE:%s\\|", c.query.Table) + c.RegexStr = fmt.Sprintf("\\|MAPREDUCE:%s\\|", c.query.Table) } c.globalGroup = mapr.NewGlobalGroupSet() diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go index 05e58a1..0774837 100644 --- a/internal/io/fs/filereader.go +++ b/internal/io/fs/filereader.go @@ -4,11 +4,12 @@ import ( "context" "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/regex" ) // FileReader is the interface used on the dtail server to read/cat/grep/mapr... a file. type FileReader interface { - Start(ctx context.Context, lines chan<- line.Line, regex string) error + Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error FilePath() string Retry() bool } diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index cb16ec1..6757bd6 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -8,13 +8,13 @@ import ( "fmt" "io" "os" - "regexp" "strings" "sync" "time" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/regex" "github.com/DataDog/zstd" ) @@ -25,8 +25,6 @@ type readFile struct { stats // Path of log file to tail. filePath string - // Only consider all log lines matching this regular expression. - re *regexp.Regexp // The glob identifier of the file. globID string // Channel to send a server message to the dtail client @@ -61,7 +59,7 @@ func (f readFile) Retry() bool { } // Start tailing a log file. -func (f readFile) Start(ctx context.Context, lines chan<- line.Line, regex string) error { +func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error { logger.Debug("readFile", f) defer func() { select { @@ -98,7 +96,7 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, regex strin wg.Add(1) go f.periodicTruncateCheck(ctx, truncate) - go f.filter(ctx, &wg, rawLines, lines, regex) + go f.filter(ctx, &wg, rawLines, lines, re) err = f.read(ctx, fd, rawLines, truncate) close(rawLines) @@ -229,20 +227,9 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t } // Filter log lines matching a given regular expression. -func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, regex string) { +func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { defer wg.Done() - if regex == "" { - regex = "." - } - - re, err := regexp.Compile(regex) - if err != nil { - logger.Error(regex, "Can't compile regex, using '.' instead", err) - re = regexp.MustCompile(".") - } - f.re = re - for { select { case line, ok := <-rawLines: @@ -250,7 +237,7 @@ func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-cha if !ok { return } - if filteredLine, ok := f.transmittable(line, len(lines), cap(lines)); ok { + if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok { select { case lines <- filteredLine: case <-ctx.Done(): @@ -261,10 +248,10 @@ func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-cha } } -func (f readFile) transmittable(lineBytes []byte, length, capacity int) (line.Line, bool) { +func (f readFile) transmittable(lineBytes []byte, length, capacity int, re regex.Regex) (line.Line, bool) { var read line.Line - if !f.re.Match(lineBytes) { + if !re.Match(lineBytes) { f.updateLineNotMatched() f.updateLineNotTransmitted() return read, false diff --git a/internal/regex/regex.go b/internal/regex/regex.go index 800bc31..e870797 100644 --- a/internal/regex/regex.go +++ b/internal/regex/regex.go @@ -27,6 +27,9 @@ func NewNoop() Regex { } func New(regexStr string, flag Flag) (Regex, error) { + if regexStr == "" || regexStr == "." || regexStr == ".*" { + return NewNoop(), nil + } return new(regexStr, []Flag{flag}) } @@ -46,11 +49,24 @@ func new(regexStr string, flags []Flag) (Regex, error) { return r, nil } +func (r Regex) Match(bytes []byte) bool { + switch r.flags[0] { + case Default: + return r.re.Match(bytes) + case Invert: + return !r.re.Match(bytes) + case Noop: + return true + default: + return false + } +} + func (r Regex) MatchString(str string) bool { switch r.flags[0] { case Default: return r.re.MatchString(str) - case Negate: + case Invert: return !r.re.MatchString(str) case Noop: return true @@ -72,7 +88,8 @@ func Deserialize(str string) (Regex, error) { // Get regex string s := strings.SplitN(str, " ", 2) if len(s) < 2 { - return Regex{}, fmt.Errorf("unable to deserialize regex '%s'", str) + logger.Debug("Using noop regex", str) + return NewNoop(), nil } flagsStr := s[0] diff --git a/internal/regex/regex_test.go b/internal/regex/regex_test.go index acdd518..a5e7faf 100644 --- a/internal/regex/regex_test.go +++ b/internal/regex/regex_test.go @@ -29,7 +29,7 @@ func TestRegex(t *testing.T) { r2.String(), r.String()) } - r, err = New(".hello", Negate) + r, err = New(".hello", Invert) if err != nil { t.Errorf("unable to create regex: %v\n", err) } diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index d4c871c..d59afe7 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -10,6 +10,7 @@ import ( "github.com/mimecast/dtail/internal/io/fs" "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/omode" + "github.com/mimecast/dtail/internal/regex" ) type readCommand struct { @@ -25,21 +26,25 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand { } func (r *readCommand) Start(ctx context.Context, argc int, args []string) { - regex := "." + re := regex.NewNoop() + if argc >= 4 { - switch args[2] { + deserializedRegex, err := regex.Deserialize(strings.Join(args[3:], " ")) + if err != nil { + logger.Error(err) + r.server.sendServerMessage(logger.Error(r.server.user, commandParseWarning, err)) + return } - regex = strings.Join(args[3:], " ") - logger.Debug("Joined regex", regex) + re = deserializedRegex } if argc < 3 { r.server.sendServerMessage(logger.Warn(r.server.user, commandParseWarning, args, argc)) return } - r.readGlob(ctx, args[1], regex) + r.readGlob(ctx, args[1], re) } -func (r *readCommand) readGlob(ctx context.Context, glob string, regex string) { +func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex) { retryInterval := time.Second * 5 glob = filepath.Clean(glob) @@ -70,23 +75,23 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, regex string) { continue } - r.readFiles(ctx, paths, glob, regex, retryInterval) + r.readFiles(ctx, paths, glob, re, retryInterval) break } } -func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string, regex string, retryInterval time.Duration) { +func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string, re regex.Regex, retryInterval time.Duration) { var wg sync.WaitGroup wg.Add(len(paths)) for _, path := range paths { - go r.readFileIfPermissions(ctx, &wg, path, glob, regex) + go r.readFileIfPermissions(ctx, &wg, path, glob, re) } wg.Wait() } -func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGroup, path, glob, regex string) { +func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGroup, path, glob string, re regex.Regex) { defer wg.Done() globID := r.makeGlobID(path, glob) @@ -96,10 +101,10 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGr return } - r.readFile(ctx, path, globID, regex) + r.readFile(ctx, path, globID, re) } -func (r *readCommand) readFile(ctx context.Context, path, globID, regex string) { +func (r *readCommand) readFile(ctx context.Context, path, globID string, re regex.Regex) { logger.Info(r.server.user, "Start reading file", path, globID) var reader fs.FileReader @@ -120,7 +125,7 @@ func (r *readCommand) readFile(ctx context.Context, path, globID, regex string) } for { - if err := reader.Start(ctx, lines, regex); err != nil { + if err := reader.Start(ctx, lines, re); err != nil { logger.Error(r.server.user, path, globID, err) } |
