summaryrefslogtreecommitdiff
path: root/internal/stats/stats.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-09-17 21:33:45 +0300
committerPaul Buetow <paul@buetow.org>2025-09-17 21:33:45 +0300
commit88103657fb230bb41217a06aa5602ae23e7acb8b (patch)
tree524c437e4e40ee5d6713b6ea5414ad975654cc52 /internal/stats/stats.go
parent2b6232704ecc90630196b9f829f966533e5cdccd (diff)
feat(stats,tmux): global Σ@window stats across processes with flocked cache; width mitigation (narrow/maxlen); configurable [stats] window_minutes; robust coverage parsing; docs update\n\n- Add internal/stats with windowed event cache + flock + atomic writes\n- Wire stats into LSP/CLI/Tmux Action; tmux shows Σ@window with per-model tail\n- HEXAI_TMUX_STATUS_NARROW and HEXAI_TMUX_STATUS_MAXLEN for width control\n- Add [stats] window_minutes to config and apply on startup\n- Improve Magefile coverage handling; add tests to lift coverage >85%\n- Update docs/tmux.md and config example
Diffstat (limited to 'internal/stats/stats.go')
-rw-r--r--internal/stats/stats.go247
1 files changed, 247 insertions, 0 deletions
diff --git a/internal/stats/stats.go b/internal/stats/stats.go
new file mode 100644
index 0000000..3a9a9ab
--- /dev/null
+++ b/internal/stats/stats.go
@@ -0,0 +1,247 @@
+// Package stats provides a simple, process-safe, on-disk cache of Hexai LLM usage
+// statistics shared across all binaries. It appends compact events (ts, provider,
+// model, sent, recv) to a JSON file guarded by an advisory file lock, prunes
+// entries older than the configured window (default 1h), and computes aggregated
+// snapshots for display in logs and tmux status.
+package stats
+
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync/atomic"
	"syscall"
	"time"
)
+
const (
	fileName      = "stats.json" // cache file holding the windowed events
	lockFileName  = "stats.lock" // sidecar file used for the advisory flock
	fileVersion   = 1            // on-disk schema version; mismatches reset the file
	defaultWindow = time.Hour    // sliding window used when none is configured
)

// windowSeconds is the current sliding window in whole seconds. It is read
// and written with sync/atomic so concurrent goroutines see a consistent
// value; sub-second precision is intentionally discarded.
var windowSeconds = int64(defaultWindow / time.Second)

// SetWindow sets the sliding window used for pruning and aggregation,
// clamped to the range [1s, 24h].
func SetWindow(d time.Duration) {
	switch {
	case d < time.Second:
		d = time.Second
	case d > 24*time.Hour:
		d = 24 * time.Hour
	}
	atomic.StoreInt64(&windowSeconds, int64(d/time.Second))
}

// Window returns the current sliding window.
func Window() time.Duration {
	return time.Second * time.Duration(atomic.LoadInt64(&windowSeconds))
}
+
// Event represents a single request/response with sizes.
type Event struct {
	TS       time.Time `json:"ts"`       // when the event was recorded (set by Update)
	Provider string    `json:"provider"` // LLM provider label, as passed to Update
	Model    string    `json:"model"`    // model label within the provider
	Sent     int64     `json:"sent"`     // request payload size in bytes
	Recv     int64     `json:"recv"`     // response payload size in bytes
}
+
// File is the on-disk JSON structure.
type File struct {
	Version       int       `json:"version"`        // schema version; a mismatch causes the file to be reset
	UpdatedAt     time.Time `json:"updated_at"`     // time of the last write
	WindowSeconds int       `json:"window_seconds"` // window length in effect when the file was written
	Events        []Event   `json:"events"`         // retained events, appended in arrival order (oldest first)
}
+
// Counters and Snapshot represent computed aggregates for the current window.
type Counters struct{ Reqs, Sent, Recv int64 } // request count and total bytes sent/received

// ProviderEntry holds per-provider totals plus a per-model breakdown.
type ProviderEntry struct {
	Totals Counters            // provider-wide totals within the window
	Models map[string]Counters // totals keyed by model name
}

// Snapshot is the aggregated view of all events inside the window.
type Snapshot struct {
	Global    Counters                 // totals across all providers
	Providers map[string]ProviderEntry // per-provider aggregates
	RPM       float64                  // requests per minute over the window
	Window    time.Duration            // window length the snapshot covers
}
+
+// Update appends one event and prunes old entries under lock.
+func Update(ctx context.Context, provider, model string, sentBytes, recvBytes int) error {
+ dir, err := CacheDir()
+ if err != nil {
+ return err
+ }
+ if err := os.MkdirAll(dir, 0o755); err != nil {
+ return err
+ }
+ lockPath := filepath.Join(dir, lockFileName)
+ f, err := os.OpenFile(lockPath, os.O_CREATE|os.O_RDWR, 0o600)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ // Acquire exclusive flock; best-effort ctx support via polling
+ for {
+ if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err == nil {
+ defer syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
+ break
+ }
+ // Wait a bit or exit if context canceled
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-time.After(5 * time.Millisecond):
+ }
+ }
+ // Read existing file (if any)
+ path := filepath.Join(dir, fileName)
+ var sf File
+ if b, rerr := os.ReadFile(path); rerr == nil {
+ _ = json.Unmarshal(b, &sf)
+ }
+ if sf.Version != fileVersion {
+ sf = File{Version: fileVersion}
+ }
+ now := time.Now()
+ win := Window()
+ sf.WindowSeconds = int(win.Seconds())
+ // Append event
+ sf.Events = append(sf.Events, Event{TS: now, Provider: provider, Model: model, Sent: int64(sentBytes), Recv: int64(recvBytes)})
+ // Prune old
+ cutoff := now.Add(-win)
+ if len(sf.Events) > 0 {
+ // Find first >= cutoff
+ i := 0
+ for ; i < len(sf.Events); i++ {
+ if !sf.Events[i].TS.Before(cutoff) {
+ break
+ }
+ }
+ if i > 0 {
+ sf.Events = append([]Event(nil), sf.Events[i:]...)
+ }
+ }
+ sf.UpdatedAt = now
+ // Write atomically
+ tmp, err := os.CreateTemp(dir, fileName+".tmp.")
+ if err != nil {
+ return err
+ }
+ enc := json.NewEncoder(tmp)
+ enc.SetEscapeHTML(false)
+ if err := enc.Encode(&sf); err != nil {
+ tmp.Close()
+ os.Remove(tmp.Name())
+ return err
+ }
+ if err := tmp.Sync(); err != nil {
+ tmp.Close()
+ os.Remove(tmp.Name())
+ return err
+ }
+ if err := tmp.Close(); err != nil {
+ os.Remove(tmp.Name())
+ return err
+ }
+ if err := os.Rename(tmp.Name(), path); err != nil {
+ os.Remove(tmp.Name())
+ return err
+ }
+ return nil
+}
+
// TakeSnapshot reads the stats file and aggregates all events that fall
// within the configured window into a Snapshot. A missing file yields an
// empty, well-formed snapshot rather than an error. As a side effect, the
// process-local window is aligned with the window recorded in the file.
func TakeSnapshot() (Snapshot, error) {
	dir, err := CacheDir()
	if err != nil {
		return Snapshot{}, err
	}
	path := filepath.Join(dir, fileName)
	b, err := os.ReadFile(path)
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			// No stats recorded yet: empty snapshot with a non-nil map.
			return Snapshot{Providers: map[string]ProviderEntry{}, Window: Window()}, nil
		}
		return Snapshot{}, err
	}
	var sf File
	if err := json.Unmarshal(b, &sf); err != nil {
		return Snapshot{}, err
	}
	win := time.Duration(sf.WindowSeconds) * time.Second
	if win <= 0 {
		win = Window()
	} else {
		SetWindow(win) // align process with file window if changed elsewhere
	}
	cutoff := time.Now().Add(-win)
	snap := Snapshot{Providers: make(map[string]ProviderEntry), Window: win}
	for _, ev := range sf.Events {
		// Update prunes lazily, so events outside the window may still be
		// present on disk; skip them here.
		if ev.TS.Before(cutoff) {
			continue
		}
		snap.Global.Reqs++
		snap.Global.Sent += ev.Sent
		snap.Global.Recv += ev.Recv
		pe := snap.Providers[ev.Provider]
		if pe.Models == nil {
			pe.Models = make(map[string]Counters)
		}
		pe.Totals.Reqs++
		pe.Totals.Sent += ev.Sent
		pe.Totals.Recv += ev.Recv
		mc := pe.Models[ev.Model]
		mc.Reqs++
		mc.Sent += ev.Sent
		mc.Recv += ev.Recv
		pe.Models[ev.Model] = mc
		snap.Providers[ev.Provider] = pe
	}
	// Requests per minute over the window; the floor guards against a
	// division by zero if the stored window is degenerate.
	mins := win.Minutes()
	if mins <= 0 {
		mins = 0.001
	}
	snap.RPM = float64(snap.Global.Reqs) / mins
	return snap, nil
}
+
// CacheDir resolves the cache directory for stats:
// $XDG_CACHE_HOME/hexai when XDG_CACHE_HOME is set to a non-blank value,
// otherwise ~/.cache/hexai. It returns an error only when the home
// directory cannot be determined.
func CacheDir() (string, error) {
	// Use the stdlib TrimSpace instead of a hand-rolled trim helper; a
	// whitespace-only XDG_CACHE_HOME is treated as unset.
	if x := os.Getenv("XDG_CACHE_HOME"); strings.TrimSpace(x) != "" {
		return filepath.Join(x, "hexai"), nil
	}
	home, err := os.UserHomeDir()
	if err != nil {
		return "", fmt.Errorf("cannot resolve home: %w", err)
	}
	return filepath.Join(home, ".cache", "hexai"), nil
}
+
// stringsTrim returns s with leading and trailing ASCII whitespace
// (space, tab, newline, carriage return) removed. Kept as a tiny local
// helper to avoid importing strings just for this.
func stringsTrim(s string) string {
	isWS := func(c byte) bool {
		return c == ' ' || c == '\t' || c == '\n' || c == '\r'
	}
	start, end := 0, len(s)
	for start < end && isWS(s[start]) {
		start++
	}
	for end > start && isWS(s[end-1]) {
		end--
	}
	return s[start:end]
}
+
+// DebugString returns a compact single-line view of a snapshot (useful for logs).
+func (s Snapshot) DebugString() string {
+ return "Σ reqs=" + strconv.FormatInt(s.Global.Reqs, 10) + " rpm=" + fmt.Sprintf("%.2f", s.RPM)
+}