diff options
| author | Paul Buetow <git@mx.buetow.org> | 2021-02-07 10:43:10 +0000 |
|---|---|---|
| committer | Paul Buetow <git@mx.buetow.org> | 2021-02-07 10:43:10 +0000 |
| commit | 07b9fd5044a4eb470a74048bf2878bc9d75afa1d (patch) | |
| tree | eca342e9c7ea67925e242aaab70597af8d4df480 | |
| parent | 742e6c444f7236ca3c9953050b0704bc88283ed3 (diff) | |
add rbuffer data structure
| -rw-r--r-- | internal/datas/rbuffer.go | 49 | ||||
| -rw-r--r-- | internal/datas/rbuffer_test.go | 106 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 68 |
3 files changed, 210 insertions, 13 deletions
diff --git a/internal/datas/rbuffer.go b/internal/datas/rbuffer.go new file mode 100644 index 0000000..df8f622 --- /dev/null +++ b/internal/datas/rbuffer.go @@ -0,0 +1,49 @@ +package datas + +import "fmt" + +// RBuffer is a simple circular string ring buffer data structure. +type RBuffer struct { + Capacity int + size int + readPos int + writePos int + data []string +} + +// NewRBuffer creates a new string ring buffer. +func NewRBuffer(capacity int) (*RBuffer, error) { + if capacity < 1 { + return nil, fmt.Errorf("RBuffer capacity must not be less than 1") + } + + r := RBuffer{ + Capacity: capacity, + size: capacity + 1, + data: make([]string, capacity+1), + } + + return &r, nil +} + +// Add a value. +func (r *RBuffer) Add(value string) { + r.data[r.writePos] = value + r.writePos = (r.writePos + 1) % r.size + + if r.writePos == r.readPos { + r.readPos = (r.readPos + 1) % r.size + } +} + +// Get a value. +func (r *RBuffer) Get() (string, bool) { + if r.readPos == r.writePos { + // RBuffer is empty. + return "", false + } + + value := r.data[r.readPos] + r.readPos = (r.readPos + 1) % r.size + return value, true +} diff --git a/internal/datas/rbuffer_test.go b/internal/datas/rbuffer_test.go new file mode 100644 index 0000000..456511a --- /dev/null +++ b/internal/datas/rbuffer_test.go @@ -0,0 +1,106 @@ +package datas + +import ( + "fmt" + "math/rand" + "testing" + "time" +) + +func TestRBufferOneElement(t *testing.T) { + r, err := NewRBuffer(1) + if err != nil { + t.Errorf("Expected error creating ring buffer with capacity 1") + } + + testRBufferValues(t, r, []string{"Hello world"}) + testRBufferValues(t, r, []string{"Hello world", "Hello universe"}) +} + +func TestRBuffer(t *testing.T) { + if _, err := NewRBuffer(0); err == nil { + t.Errorf("Expected error creating ring buffer with capacity 0") + } + + r, err := NewRBuffer(10) + if err != nil { + t.Errorf("Error creating ring buffer with capacity 10: %v", err) + } + + fiveValues := []string{ + "42 is the answer!", + "Scroption: Get over here!", + "Have you swiped your nectar card?", + "Please mind the gap between the train and the platform!", + "Visit DTail at https://dtail.dev", + } + testRBufferValues(t, r, fiveValues) + + moreFiveValues := []string{ + "I love Golang", + "As a contrast, I also love Perl", + "Mimecast: Stop Bad Things From Happening to Good Organizations", + "We are the Buetow Brothers", + "London is calling", + } + tenValues := append(fiveValues, moreFiveValues...) + testRBufferValues(t, r, tenValues) +} + +func TestRandomRBuffer(t *testing.T) { + for i := 0; i < 100; i++ { + testRandomRBuffer(t) + } +} + +func testRandomRBuffer(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + + maxCapacity := 1000 + minCapacity := 1 + capacity := rand.Intn(maxCapacity-minCapacity) + minCapacity + r, err := NewRBuffer(capacity) + if err != nil { + t.Errorf("Error creating ring buffer with capacity %d: %v", capacity, err) + } + + numValues := rand.Intn(capacity * 2) + values := make([]string, numValues) + for i := 0; i < numValues; i++ { + values = append(values, fmt.Sprintf("%d.%d", i, rand.Int())) + } + + testRBufferValues(t, r, values) +} + +func testRBufferValues(t *testing.T, r *RBuffer, values []string) { + value, ok := r.Get() + if ok { + t.Errorf("Expected not ok reading from empty ring buffer but got ok and value '%s'", value) + } + + for _, value := range values { + r.Add(value) + } + + expectedValues := values + overCapacity := len(values) - r.Capacity + if overCapacity > 0 { + expectedValues = values[overCapacity:] + } + + for _, expected := range expectedValues { + value, ok := r.Get() + if !ok { + t.Errorf("Expected value '%s' but got nothing", expected) + } + if value != expected { + t.Errorf("Expected value '%s' but got value '%v'", expected, value) + } + } + + value, ok = r.Get() + if ok { + t.Errorf("Expected not ok reading from empty ring buffer but got ok and value '%s'", value) + } +} diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 4b2af7c..4ac82d8 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -231,54 +231,96 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t func (f readFile) filter(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { defer wg.Done() - /* - beforeContext := make([]string, lContext.BeforeContext) - afterContext := make([]string, lContext.AfterContext) - */ + for { + select { + case rawLine, ok := <-rawLines: + f.updatePosition() + if !ok { + return + } + + line, _, transmittable := f.lineTransmittable(rawLine, len(lines), cap(lines), re) + if transmittable { + select { + case lines <- line: + continue + case <-ctx.Done(): + return + } + } + } + } +} + +// Filter log lines matching a given regular expression. +func (f readFile) filterWithLContext(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { + defer wg.Done() + + var bPos, bCount int + before := make([]*[]byte, lContext.BeforeContext) for { select { - case line, ok := <-rawLines: + case rawLine, ok := <-rawLines: f.updatePosition() if !ok { return } - if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok { + + if lContext.BeforeContext > 0 { + before[bPos] = &rawLine + bPos = (bPos + 1) % lContext.BeforeContext + if bCount < lContext.BeforeContext { + bCount++ + } + } + + line, _, transmittable := f.lineTransmittable(rawLine, len(lines), cap(lines), re) + if transmittable { + if lContext.BeforeContext > 0 { + for bCount > 0 { + bCount-- + } + } select { - case lines <- filteredLine: + case lines <- line: + continue case <-ctx.Done(): return } } + // before[bPos] = line + // bPos = (bPos+1) % lContext.BeforeContext + // bCount = (bCount+1) % lContext.BeforeContext } } } -func (f readFile) transmittable(lineBytes []byte, length, capacity int, re regex.Regex) (line.Line, bool) { +func (f readFile) lineTransmittable(rawLine []byte, length, capacity int, re regex.Regex) (line.Line, bool, bool) { var read line.Line - if !re.Match(lineBytes) { + if !re.Match(rawLine) { f.updateLineNotMatched() f.updateLineNotTransmitted() - return read, false + return read, false, false } f.updateLineMatched() // Can we actually send more messages, channel capacity reached? if f.canSkipLines && length >= capacity { f.updateLineNotTransmitted() - return read, false + return read, true, false } f.updateLineTransmitted() read = line.Line{ - Content: lineBytes, + Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: f.transmittedPerc(), } - return read, true + return read, true, true } // Check wether log file is truncated. Returns nil if not. |
