diff options
| -rw-r--r-- | go.mod | 2 | ||||
| -rw-r--r-- | go.sum | 4 | ||||
| -rw-r--r-- | internal/eventfilter.go | 2 | ||||
| -rw-r--r-- | internal/flamegraph/collapsed.go | 95 | ||||
| -rw-r--r-- | internal/flamegraph/flamegraph.go | 124 | ||||
| -rw-r--r-- | internal/flamegraph/worker.go | 52 |
6 files changed, 178 insertions, 101 deletions
@@ -1,5 +1,5 @@ module ior -go 1.18 +go 1.23 require github.com/aquasecurity/libbpfgo v0.6.0-libbpf-1.3.0.20240111220235-90dbffffbdab @@ -1,6 +1,10 @@ github.com/aquasecurity/libbpfgo v0.6.0-libbpf-1.3.0.20240111220235-90dbffffbdab h1:w74AraWsnj+AgEOk2uERlLtECCWutMtuwCGCCWzpBBs= github.com/aquasecurity/libbpfgo v0.6.0-libbpf-1.3.0.20240111220235-90dbffffbdab/go.mod h1:0rEApF1YBHGuZ4C8OYI9q5oDBVpgqtRqYATePl9mCDk= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/eventfilter.go b/internal/eventfilter.go index ae58cb2..c249f20 100644 --- a/internal/eventfilter.go +++ b/internal/eventfilter.go @@ -8,7 +8,7 @@ import ( "strings" ) -// TODO: Move to event package +// TODO: Move to event package? type eventFilter struct { commFilterEnable bool commFilter string diff --git a/internal/flamegraph/collapsed.go b/internal/flamegraph/collapsed.go new file mode 100644 index 0000000..f61d917 --- /dev/null +++ b/internal/flamegraph/collapsed.go @@ -0,0 +1,95 @@ +package flamegraph + +import ( + "fmt" + "ior/internal/types" + "os" + "strings" + "sync" +) + +type counter struct { + count uint64 + duration uint64 +} + +func (c *counter) merge(other counter) { + c.count += other.count + c.duration += other.duration +} + +type collapsed map[string]map[types.TraceId]counter + +// TODO: Unit test this +func (c collapsed) merge(other collapsed) (merged int) { + for k, v := range other { + if _, ok := c[k]; !ok { + c[k] = make(map[types.TraceId]counter) + } + for traceId, cnt := range v { + if existingCnt, ok := c[k][traceId]; ok { + existingCnt.merge(cnt) + merged++ + c[k][traceId] = existingCnt + continue + } + c[k][traceId] = cnt + } + } + return +} + +func (c collapsed) dump() { + var wg sync.WaitGroup + wg.Add(4) + + go c.dumpBy(&wg, "ior-by-path-count-flamegraph.collapsed", true, func(cnt counter) uint64 { + return cnt.count + }) + go c.dumpBy(&wg, "ior-by-path-duration-flamegraph.collapsed", true, func(cnt counter) uint64 { + return cnt.duration + }) + go c.dumpBy(&wg, "ior-by-syscall-count-flamegraph.collapsed", false, func(cnt counter) uint64 { + return cnt.count + }) + go c.dumpBy(&wg, "ior-by-syscall-duration-flamegraph.collapsed", false, func(cnt counter) uint64 { + return cnt.duration + }) + + wg.Wait() +} + +func (c collapsed) dumpBy(wg *sync.WaitGroup, outfile string, syscallAtTop bool, by func(counter) uint64) { + defer wg.Done() + + fmt.Println("Dumping", outfile) + file, err := os.Create(outfile) + if err != nil { + panic(err) + } + defer file.Close() + + for path, value := range c { + var sb strings.Builder + + for i, part := range strings.Split(path, "/") { + if i > 1 { + sb.WriteString(";") + sb.WriteString("/") + } + sb.WriteString(part) + } + + for traceId, cnt := range value { + var err error + if syscallAtTop { + _, err = fmt.Fprintf(file, "%s;syscall`%s %v\n", sb.String(), traceId.Name(), by(cnt)) + } else { + _, err = fmt.Fprintf(file, "syscall`%s;%s %v\n", traceId.Name(), sb.String(), by(cnt)) + } + if err != nil { + panic(err) + } + } + } +} diff --git a/internal/flamegraph/flamegraph.go b/internal/flamegraph/flamegraph.go index 9c51cb2..223389d 100644 --- a/internal/flamegraph/flamegraph.go +++ b/internal/flamegraph/flamegraph.go @@ -4,122 +4,48 @@ import ( "context" "fmt" "ior/internal/event" - "ior/internal/types" - "os" - "strings" + "runtime" "sync" - "time" ) -type counter struct { - count uint64 - duration uint64 -} - // TODO: Add Command in path! Make it configurable? comm/syscall/path, or path/syscall/comm, etc... // TODO: Idea, show time spent between the syscalls (off syscalls) as well, but in a different color -// TODO: Profile for CPU usage. If too slow, can fan out into multiple maps and -// then merge at the end the maps. type Flamegraph struct { - collapsed map[string]map[types.TraceId]counter - Ch chan *event.Pair - Done chan struct{} + Ch chan *event.Pair + Done chan struct{} + workers []worker } func New() Flamegraph { - return Flamegraph{ - collapsed: make(map[string]map[types.TraceId]counter), - Ch: make(chan *event.Pair, 4096), - Done: make(chan struct{}), + f := Flamegraph{ + Ch: make(chan *event.Pair, 4096), + Done: make(chan struct{}), + } + for range runtime.NumCPU() / 2 { + f.workers = append(f.workers, newWorker()) } + return f } func (f Flamegraph) Start(ctx context.Context) { go func() { - for { - select { - case ev := <-f.Ch: - filePath := ev.File.Name() - pathMap, ok := f.collapsed[filePath] - if !ok { - pathMap = make(map[types.TraceId]counter) - } - - traceId := ev.EnterEv.GetTraceId() - cnt := pathMap[traceId] - cnt.count++ - cnt.duration += ev.Duration - pathMap[traceId] = cnt - - f.collapsed[filePath] = pathMap - ev.RecyclePrev() - - default: - select { - case <-ctx.Done(): - defer close(f.Done) - fmt.Println("Flamegraph processed last event") - f.dumpCollapsed() - return - default: - time.Sleep(time.Millisecond * 10) - } - } - } - }() -} - -func (f Flamegraph) dumpCollapsed() { - var wg sync.WaitGroup - wg.Add(4) + defer close(f.Done) + var wg sync.WaitGroup + wg.Add(len(f.workers)) - go f.dumpBy(&wg, "ior-by-path-count-flamegraph.collapsed", true, func(cnt counter) uint64 { - return cnt.count - }) - go f.dumpBy(&wg, "ior-by-path-duration-flamegraph.collapsed", true, func(cnt counter) uint64 { - return cnt.duration - }) - go f.dumpBy(&wg, "ior-by-syscall-count-flamegraph.collapsed", false, func(cnt counter) uint64 { - return cnt.count - }) - go f.dumpBy(&wg, "ior-by-syscall-duration-flamegraph.collapsed", false, func(cnt counter) uint64 { - return cnt.duration - }) - - wg.Wait() -} - -func (f Flamegraph) dumpBy(wg *sync.WaitGroup, outfile string, syscallAtTop bool, by func(counter) uint64) { - defer wg.Done() - - fmt.Println("Dumping", outfile) - file, err := os.Create(outfile) - if err != nil { - panic(err) - } - defer file.Close() - - for path, value := range f.collapsed { - var sb strings.Builder - - for i, part := range strings.Split(path, "/") { - if i > 1 { - sb.WriteString(";") - sb.WriteString("/") - } - sb.WriteString(part) + for i, worker := range f.workers { + fmt.Println("Starting flamegraph worker", i) + go worker.run(ctx, &wg, f.Ch) } + wg.Wait() - for traceId, cnt := range value { - var err error - if syscallAtTop { - _, err = fmt.Fprintf(file, "%s;syscall`%s %v\n", sb.String(), traceId.Name(), by(cnt)) - } else { - _, err = fmt.Fprintf(file, "syscall`%s;%s %v\n", traceId.Name(), sb.String(), by(cnt)) - } - if err != nil { - panic(err) + collapsed := f.workers[0].collapsed + if len(f.workers) > 1 { + for i, c := range f.workers[1:] { + fmt.Println("Worker", i+1, "merged", collapsed.merge(c.collapsed), + "counters =>", len(collapsed), "total counters") } } - } + collapsed.dump() + }() } diff --git a/internal/flamegraph/worker.go b/internal/flamegraph/worker.go new file mode 100644 index 0000000..e590163 --- /dev/null +++ b/internal/flamegraph/worker.go @@ -0,0 +1,52 @@ +package flamegraph + +import ( + "context" + "ior/internal/event" + "ior/internal/types" + "sync" + "time" +) + +type worker struct { + collapsed collapsed + done chan struct{} +} + +func newWorker() worker { + return worker{collapsed: make(collapsed)} +} + +// Run until ch is closed or has no more events and ctx is done. +func (w worker) run(ctx context.Context, wg *sync.WaitGroup, ch <-chan *event.Pair) { + defer wg.Done() + + for { + select { + case ev := <-ch: + filePath := ev.File.Name() + pathMap, ok := w.collapsed[filePath] + if !ok { + pathMap = make(map[types.TraceId]counter) + } + + traceId := ev.EnterEv.GetTraceId() + cnt := pathMap[traceId] + cnt.count++ + cnt.duration += ev.Duration + pathMap[traceId] = cnt + + w.collapsed[filePath] = pathMap + // TODO: Enable Go race detector + ev.Recycle() + + default: + select { + case <-ctx.Done(): + return + default: + time.Sleep(time.Millisecond * 10) + } + } + } +} |
