summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go.mod2
-rw-r--r--go.sum4
-rw-r--r--internal/eventfilter.go2
-rw-r--r--internal/flamegraph/collapsed.go95
-rw-r--r--internal/flamegraph/flamegraph.go124
-rw-r--r--internal/flamegraph/worker.go52
6 files changed, 178 insertions, 101 deletions
diff --git a/go.mod b/go.mod
index 51d190a..efcee17 100644
--- a/go.mod
+++ b/go.mod
@@ -1,5 +1,5 @@
module ior
-go 1.18
+go 1.23
require github.com/aquasecurity/libbpfgo v0.6.0-libbpf-1.3.0.20240111220235-90dbffffbdab
diff --git a/go.sum b/go.sum
index be464d6..ff45e3b 100644
--- a/go.sum
+++ b/go.sum
@@ -1,6 +1,10 @@
github.com/aquasecurity/libbpfgo v0.6.0-libbpf-1.3.0.20240111220235-90dbffffbdab h1:w74AraWsnj+AgEOk2uERlLtECCWutMtuwCGCCWzpBBs=
github.com/aquasecurity/libbpfgo v0.6.0-libbpf-1.3.0.20240111220235-90dbffffbdab/go.mod h1:0rEApF1YBHGuZ4C8OYI9q5oDBVpgqtRqYATePl9mCDk=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
+github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/internal/eventfilter.go b/internal/eventfilter.go
index ae58cb2..c249f20 100644
--- a/internal/eventfilter.go
+++ b/internal/eventfilter.go
@@ -8,7 +8,7 @@ import (
"strings"
)
-// TODO: Move to event package
+// TODO: Move to event package?
type eventFilter struct {
commFilterEnable bool
commFilter string
diff --git a/internal/flamegraph/collapsed.go b/internal/flamegraph/collapsed.go
new file mode 100644
index 0000000..f61d917
--- /dev/null
+++ b/internal/flamegraph/collapsed.go
@@ -0,0 +1,95 @@
+package flamegraph
+
+import (
+ "fmt"
+ "ior/internal/types"
+ "os"
+ "strings"
+ "sync"
+)
+
+type counter struct {
+ count uint64
+ duration uint64
+}
+
+func (c *counter) merge(other counter) {
+ c.count += other.count
+ c.duration += other.duration
+}
+
+type collapsed map[string]map[types.TraceId]counter
+
+// TODO: Unit test this
+func (c collapsed) merge(other collapsed) (merged int) {
+ for k, v := range other {
+ if _, ok := c[k]; !ok {
+ c[k] = make(map[types.TraceId]counter)
+ }
+ for traceId, cnt := range v {
+ if existingCnt, ok := c[k][traceId]; ok {
+ existingCnt.merge(cnt)
+ merged++
+ c[k][traceId] = existingCnt
+ continue
+ }
+ c[k][traceId] = cnt
+ }
+ }
+ return
+}
+
+func (c collapsed) dump() {
+ var wg sync.WaitGroup
+ wg.Add(4)
+
+ go c.dumpBy(&wg, "ior-by-path-count-flamegraph.collapsed", true, func(cnt counter) uint64 {
+ return cnt.count
+ })
+ go c.dumpBy(&wg, "ior-by-path-duration-flamegraph.collapsed", true, func(cnt counter) uint64 {
+ return cnt.duration
+ })
+ go c.dumpBy(&wg, "ior-by-syscall-count-flamegraph.collapsed", false, func(cnt counter) uint64 {
+ return cnt.count
+ })
+ go c.dumpBy(&wg, "ior-by-syscall-duration-flamegraph.collapsed", false, func(cnt counter) uint64 {
+ return cnt.duration
+ })
+
+ wg.Wait()
+}
+
+func (c collapsed) dumpBy(wg *sync.WaitGroup, outfile string, syscallAtTop bool, by func(counter) uint64) {
+ defer wg.Done()
+
+ fmt.Println("Dumping", outfile)
+ file, err := os.Create(outfile)
+ if err != nil {
+ panic(err)
+ }
+ defer file.Close()
+
+ for path, value := range c {
+ var sb strings.Builder
+
+ for i, part := range strings.Split(path, "/") {
+ if i > 1 {
+ sb.WriteString(";")
+ sb.WriteString("/")
+ }
+ sb.WriteString(part)
+ }
+
+ for traceId, cnt := range value {
+ var err error
+ if syscallAtTop {
+ _, err = fmt.Fprintf(file, "%s;syscall`%s %v\n", sb.String(), traceId.Name(), by(cnt))
+ } else {
+ _, err = fmt.Fprintf(file, "syscall`%s;%s %v\n", traceId.Name(), sb.String(), by(cnt))
+ }
+ if err != nil {
+ panic(err)
+ }
+ }
+ }
+}
diff --git a/internal/flamegraph/flamegraph.go b/internal/flamegraph/flamegraph.go
index 9c51cb2..223389d 100644
--- a/internal/flamegraph/flamegraph.go
+++ b/internal/flamegraph/flamegraph.go
@@ -4,122 +4,48 @@ import (
"context"
"fmt"
"ior/internal/event"
- "ior/internal/types"
- "os"
- "strings"
+ "runtime"
"sync"
- "time"
)
-type counter struct {
- count uint64
- duration uint64
-}
-
// TODO: Add Command in path! Make it configurable? comm/syscall/path, or path/syscall/comm, etc...
// TODO: Idea, show time spent between the syscalls (off syscalls) as well, but in a different color
-// TODO: Profile for CPU usage. If too slow, can fan out into multiple maps and
-// then merge at the end the maps.
type Flamegraph struct {
- collapsed map[string]map[types.TraceId]counter
- Ch chan *event.Pair
- Done chan struct{}
+ Ch chan *event.Pair
+ Done chan struct{}
+ workers []worker
}
func New() Flamegraph {
- return Flamegraph{
- collapsed: make(map[string]map[types.TraceId]counter),
- Ch: make(chan *event.Pair, 4096),
- Done: make(chan struct{}),
+ f := Flamegraph{
+ Ch: make(chan *event.Pair, 4096),
+ Done: make(chan struct{}),
+ }
+ for range runtime.NumCPU() / 2 {
+ f.workers = append(f.workers, newWorker())
}
+ return f
}
func (f Flamegraph) Start(ctx context.Context) {
go func() {
- for {
- select {
- case ev := <-f.Ch:
- filePath := ev.File.Name()
- pathMap, ok := f.collapsed[filePath]
- if !ok {
- pathMap = make(map[types.TraceId]counter)
- }
-
- traceId := ev.EnterEv.GetTraceId()
- cnt := pathMap[traceId]
- cnt.count++
- cnt.duration += ev.Duration
- pathMap[traceId] = cnt
-
- f.collapsed[filePath] = pathMap
- ev.RecyclePrev()
-
- default:
- select {
- case <-ctx.Done():
- defer close(f.Done)
- fmt.Println("Flamegraph processed last event")
- f.dumpCollapsed()
- return
- default:
- time.Sleep(time.Millisecond * 10)
- }
- }
- }
- }()
-}
-
-func (f Flamegraph) dumpCollapsed() {
- var wg sync.WaitGroup
- wg.Add(4)
+ defer close(f.Done)
+ var wg sync.WaitGroup
+ wg.Add(len(f.workers))
- go f.dumpBy(&wg, "ior-by-path-count-flamegraph.collapsed", true, func(cnt counter) uint64 {
- return cnt.count
- })
- go f.dumpBy(&wg, "ior-by-path-duration-flamegraph.collapsed", true, func(cnt counter) uint64 {
- return cnt.duration
- })
- go f.dumpBy(&wg, "ior-by-syscall-count-flamegraph.collapsed", false, func(cnt counter) uint64 {
- return cnt.count
- })
- go f.dumpBy(&wg, "ior-by-syscall-duration-flamegraph.collapsed", false, func(cnt counter) uint64 {
- return cnt.duration
- })
-
- wg.Wait()
-}
-
-func (f Flamegraph) dumpBy(wg *sync.WaitGroup, outfile string, syscallAtTop bool, by func(counter) uint64) {
- defer wg.Done()
-
- fmt.Println("Dumping", outfile)
- file, err := os.Create(outfile)
- if err != nil {
- panic(err)
- }
- defer file.Close()
-
- for path, value := range f.collapsed {
- var sb strings.Builder
-
- for i, part := range strings.Split(path, "/") {
- if i > 1 {
- sb.WriteString(";")
- sb.WriteString("/")
- }
- sb.WriteString(part)
+ for i, worker := range f.workers {
+ fmt.Println("Starting flamegraph worker", i)
+ go worker.run(ctx, &wg, f.Ch)
}
+ wg.Wait()
- for traceId, cnt := range value {
- var err error
- if syscallAtTop {
- _, err = fmt.Fprintf(file, "%s;syscall`%s %v\n", sb.String(), traceId.Name(), by(cnt))
- } else {
- _, err = fmt.Fprintf(file, "syscall`%s;%s %v\n", traceId.Name(), sb.String(), by(cnt))
- }
- if err != nil {
- panic(err)
+ collapsed := f.workers[0].collapsed
+ if len(f.workers) > 1 {
+ for i, c := range f.workers[1:] {
+ fmt.Println("Worker", i+1, "merged", collapsed.merge(c.collapsed),
+ "counters =>", len(collapsed), "total counters")
}
}
- }
+ collapsed.dump()
+ }()
}
diff --git a/internal/flamegraph/worker.go b/internal/flamegraph/worker.go
new file mode 100644
index 0000000..e590163
--- /dev/null
+++ b/internal/flamegraph/worker.go
@@ -0,0 +1,52 @@
+package flamegraph
+
+import (
+ "context"
+ "ior/internal/event"
+ "ior/internal/types"
+ "sync"
+ "time"
+)
+
+type worker struct {
+ collapsed collapsed
+ done chan struct{}
+}
+
+func newWorker() worker {
+ return worker{collapsed: make(collapsed)}
+}
+
+// Run until ch is closed or has no more events and ctx is done.
+func (w worker) run(ctx context.Context, wg *sync.WaitGroup, ch <-chan *event.Pair) {
+ defer wg.Done()
+
+ for {
+ select {
+ case ev := <-ch:
+ filePath := ev.File.Name()
+ pathMap, ok := w.collapsed[filePath]
+ if !ok {
+ pathMap = make(map[types.TraceId]counter)
+ }
+
+ traceId := ev.EnterEv.GetTraceId()
+ cnt := pathMap[traceId]
+ cnt.count++
+ cnt.duration += ev.Duration
+ pathMap[traceId] = cnt
+
+ w.collapsed[filePath] = pathMap
+ // TODO: Enable Go race detector
+ ev.Recycle()
+
+ default:
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ time.Sleep(time.Millisecond * 10)
+ }
+ }
+ }
+}