summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <git@mx.buetow.org>2021-02-07 10:43:10 +0000
committerPaul Buetow <git@mx.buetow.org>2021-02-07 10:43:10 +0000
commit07b9fd5044a4eb470a74048bf2878bc9d75afa1d (patch)
treeeca342e9c7ea67925e242aaab70597af8d4df480
parent742e6c444f7236ca3c9953050b0704bc88283ed3 (diff)
add rbuffer data structure
-rw-r--r--internal/datas/rbuffer.go49
-rw-r--r--internal/datas/rbuffer_test.go106
-rw-r--r--internal/io/fs/readfile.go68
3 files changed, 210 insertions, 13 deletions
diff --git a/internal/datas/rbuffer.go b/internal/datas/rbuffer.go
new file mode 100644
index 0000000..df8f622
--- /dev/null
+++ b/internal/datas/rbuffer.go
@@ -0,0 +1,49 @@
+package datas
+
+import "fmt"
+
+// RBuffer is a simple circular string ring buffer data structure.
+type RBuffer struct {
+ Capacity int
+ size int
+ readPos int
+ writePos int
+ data []string
+}
+
+// NewRBuffer creates a new string ring buffer.
+func NewRBuffer(capacity int) (*RBuffer, error) {
+ if capacity < 1 {
+ return nil, fmt.Errorf("RBuffer capacity must not be less than 1")
+ }
+
+ r := RBuffer{
+ Capacity: capacity,
+ size: capacity + 1,
+ data: make([]string, capacity+1),
+ }
+
+ return &r, nil
+}
+
+// Add a value.
+func (r *RBuffer) Add(value string) {
+ r.data[r.writePos] = value
+ r.writePos = (r.writePos + 1) % r.size
+
+ if r.writePos == r.readPos {
+ r.readPos = (r.readPos + 1) % r.size
+ }
+}
+
+// Get a value.
+func (r *RBuffer) Get() (string, bool) {
+ if r.readPos == r.writePos {
+ // RBuffer is empty.
+ return "", false
+ }
+
+ value := r.data[r.readPos]
+ r.readPos = (r.readPos + 1) % r.size
+ return value, true
+}
diff --git a/internal/datas/rbuffer_test.go b/internal/datas/rbuffer_test.go
new file mode 100644
index 0000000..456511a
--- /dev/null
+++ b/internal/datas/rbuffer_test.go
@@ -0,0 +1,106 @@
+package datas
+
+import (
+ "fmt"
+ "math/rand"
+ "testing"
+ "time"
+)
+
+func TestRBufferOneElement(t *testing.T) {
+ r, err := NewRBuffer(1)
+ if err != nil {
+ t.Errorf("Expected error creating ring buffer with capacity 1")
+ }
+
+ testRBufferValues(t, r, []string{"Hello world"})
+ testRBufferValues(t, r, []string{"Hello world", "Hello universe"})
+}
+
+func TestRBuffer(t *testing.T) {
+ if _, err := NewRBuffer(0); err == nil {
+ t.Errorf("Expected error creating ring buffer with capacity 0")
+ }
+
+ r, err := NewRBuffer(10)
+ if err != nil {
+ t.Errorf("Error creating ring buffer with capacity 10: %v", err)
+ }
+
+ fiveValues := []string{
+ "42 is the answer!",
+ "Scroption: Get over here!",
+ "Have you swiped your nectar card?",
+ "Please mind the gap between the train and the platform!",
+ "Visit DTail at https://dtail.dev",
+ }
+ testRBufferValues(t, r, fiveValues)
+
+ moreFiveValues := []string{
+ "I love Golang",
+ "As a contrast, I also love Perl",
+ "Mimecast: Stop Bad Things From Happening to Good Organizations",
+ "We are the Buetow Brothers",
+ "London is calling",
+ }
+ tenValues := append(fiveValues, moreFiveValues...)
+ testRBufferValues(t, r, tenValues)
+}
+
+func TestRandomRBuffer(t *testing.T) {
+ for i := 0; i < 100; i++ {
+ testRandomRBuffer(t)
+ }
+}
+
+func testRandomRBuffer(t *testing.T) {
+ rand.Seed(time.Now().UnixNano())
+
+ maxCapacity := 1000
+ minCapacity := 1
+ capacity := rand.Intn(maxCapacity-minCapacity) + minCapacity
+ r, err := NewRBuffer(capacity)
+ if err != nil {
+ t.Errorf("Error creating ring buffer with capacity %d: %v", capacity, err)
+ }
+
+ numValues := rand.Intn(capacity * 2)
+ values := make([]string, numValues)
+ for i := 0; i < numValues; i++ {
+ values = append(values, fmt.Sprintf("%d.%d", i, rand.Int()))
+ }
+
+ testRBufferValues(t, r, values)
+}
+
+func testRBufferValues(t *testing.T, r *RBuffer, values []string) {
+ value, ok := r.Get()
+ if ok {
+ t.Errorf("Expected not ok reading from empty ring buffer but got ok and value '%s'", value)
+ }
+
+ for _, value := range values {
+ r.Add(value)
+ }
+
+ expectedValues := values
+ overCapacity := len(values) - r.Capacity
+ if overCapacity > 0 {
+ expectedValues = values[overCapacity:]
+ }
+
+ for _, expected := range expectedValues {
+ value, ok := r.Get()
+ if !ok {
+ t.Errorf("Expected value '%s' but got nothing", expected)
+ }
+ if value != expected {
+ t.Errorf("Expected value '%s' but got value '%v'", expected, value)
+ }
+ }
+
+ value, ok = r.Get()
+ if ok {
+ t.Errorf("Expected not ok reading from empty ring buffer but got ok and value '%s'", value)
+ }
+}
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 4b2af7c..4ac82d8 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -231,54 +231,96 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
func (f readFile) filter(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
defer wg.Done()
- /*
- beforeContext := make([]string, lContext.BeforeContext)
- afterContext := make([]string, lContext.AfterContext)
- */
+ for {
+ select {
+ case rawLine, ok := <-rawLines:
+ f.updatePosition()
+ if !ok {
+ return
+ }
+
+ line, _, transmittable := f.lineTransmittable(rawLine, len(lines), cap(lines), re)
+ if transmittable {
+ select {
+ case lines <- line:
+ continue
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+ }
+}
+
+// Filter log lines matching a given regular expression.
+func (f readFile) filterWithLContext(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
+ defer wg.Done()
+
+ var bPos, bCount int
+ before := make([]*[]byte, lContext.BeforeContext)
for {
select {
- case line, ok := <-rawLines:
+ case rawLine, ok := <-rawLines:
f.updatePosition()
if !ok {
return
}
- if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok {
+
+ if lContext.BeforeContext > 0 {
+ before[bPos] = &rawLine
+ bPos = (bPos + 1) % lContext.BeforeContext
+ if bCount < lContext.BeforeContext {
+ bCount++
+ }
+ }
+
+ line, _, transmittable := f.lineTransmittable(rawLine, len(lines), cap(lines), re)
+ if transmittable {
+ if lContext.BeforeContext > 0 {
+ for bCount > 0 {
+ bCount--
+ }
+ }
select {
- case lines <- filteredLine:
+ case lines <- line:
+ continue
case <-ctx.Done():
return
}
}
+ // before[bPos] = line
+ // bPos = (bPos+1) % lContext.BeforeContext
+ // bCount = (bCount+1) % lContext.BeforeContext
}
}
}
-func (f readFile) transmittable(lineBytes []byte, length, capacity int, re regex.Regex) (line.Line, bool) {
+func (f readFile) lineTransmittable(rawLine []byte, length, capacity int, re regex.Regex) (line.Line, bool, bool) {
var read line.Line
- if !re.Match(lineBytes) {
+ if !re.Match(rawLine) {
f.updateLineNotMatched()
f.updateLineNotTransmitted()
- return read, false
+ return read, false, false
}
f.updateLineMatched()
// Can we actually send more messages, channel capacity reached?
if f.canSkipLines && length >= capacity {
f.updateLineNotTransmitted()
- return read, false
+ return read, true, false
}
f.updateLineTransmitted()
read = line.Line{
- Content: lineBytes,
+ Content: rawLine,
SourceID: f.globID,
Count: f.totalLineCount(),
TransmittedPerc: f.transmittedPerc(),
}
- return read, true
+ return read, true, true
}
// Check wether log file is truncated. Returns nil if not.