diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-03 20:04:49 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-03 20:04:49 +0200 |
| commit | 9c7b4517c3d7bf73a0f0b32df1bc061c9e8d8120 (patch) | |
| tree | 5544688f2ab15b21eb35da9fa96a592e0dff2d4c | |
| parent | 1207e1e274288cdd4f52636740760705c0f6329d (diff) | |
refactor(storage): separate DB operations into storage package (task 334)
| -rw-r--r-- | internal/goprecords/aggregate.go | 9 | ||||
| -rw-r--r-- | internal/goprecords/aggregate_map.go | 10 | ||||
| -rw-r--r-- | internal/goprecords/db.go | 140 | ||||
| -rw-r--r-- | internal/storage/db.go | 189 |
4 files changed, 216 insertions, 132 deletions
diff --git a/internal/goprecords/aggregate.go b/internal/goprecords/aggregate.go index e75d486..7f7528c 100644 --- a/internal/goprecords/aggregate.go +++ b/internal/goprecords/aggregate.go @@ -96,15 +96,6 @@ func processRecordsFile(ctx context.Context, path, host string, out *Aggregates) return nil } -func getOrNewAggregate(m map[string]*Aggregate, name string) *Aggregate { - if a, ok := m[name]; ok { - return a - } - a := NewAggregate(name) - m[name] = a - return a -} - func lastKernelFromFile(path string) (string, error) { f, err := os.Open(path) if err != nil { diff --git a/internal/goprecords/aggregate_map.go b/internal/goprecords/aggregate_map.go new file mode 100644 index 0000000..5c4f49d --- /dev/null +++ b/internal/goprecords/aggregate_map.go @@ -0,0 +1,10 @@ +package goprecords + +func getOrNewAggregate(m map[string]*Aggregate, name string) *Aggregate { + if a, ok := m[name]; ok { + return a + } + a := NewAggregate(name) + m[name] = a + return a +} diff --git a/internal/goprecords/db.go b/internal/goprecords/db.go index 8fa8b60..a07f6be 100644 --- a/internal/goprecords/db.go +++ b/internal/goprecords/db.go @@ -1,135 +1,40 @@ package goprecords import ( - "bufio" "context" "database/sql" - "fmt" - "os" - "path/filepath" - "strings" + "github.com/goprecords/internal/storage" _ "modernc.org/sqlite" ) -const schemaSQL = ` -CREATE TABLE IF NOT EXISTS record ( - host TEXT NOT NULL, - uptime_sec INTEGER NOT NULL, - boot_time INTEGER NOT NULL, - os TEXT NOT NULL, - os_kernel_name TEXT NOT NULL, - os_kernel_major TEXT NOT NULL -); -CREATE INDEX IF NOT EXISTS idx_record_host ON record(host); -CREATE INDEX IF NOT EXISTS idx_record_os ON record(os); -CREATE INDEX IF NOT EXISTS idx_record_os_kernel_name ON record(os_kernel_name); -CREATE INDEX IF NOT EXISTS idx_record_os_kernel_major ON record(os_kernel_major); -` - // OpenDB opens the SQLite database at path, creating the file if needed. func OpenDB(path string) (*sql.DB, error) { - db, err := sql.Open("sqlite", path) - if err != nil { - return nil, err - } - if _, err := db.Exec("PRAGMA foreign_keys = OFF"); err != nil { - db.Close() - return nil, err - } - return db, nil + return storage.Open(path) } // CreateSchema creates the record table and indexes (idempotent). func CreateSchema(ctx context.Context, db *sql.DB) error { - _, err := db.ExecContext(ctx, schemaSQL) - return err + return storage.CreateSchema(ctx, db) } // ResetRecords removes all rows so import is repeatable. func ResetRecords(ctx context.Context, db *sql.DB) error { - _, err := db.ExecContext(ctx, "DELETE FROM record") - return err + return storage.ResetRecords(ctx, db) } // ImportFromDir reads all .records files from statsDir and inserts into the DB. // Resets the record table first so the run is repeatable. func ImportFromDir(ctx context.Context, db *sql.DB, statsDir string) error { - if err := ResetRecords(ctx, db); err != nil { - return fmt.Errorf("reset records: %w", err) - } - entries, err := os.ReadDir(statsDir) - if err != nil { - return fmt.Errorf("read dir: %w", err) - } - - // Use a transaction for better performance with multiple inserts - tx, err := db.BeginTx(ctx, nil) - if err != nil { - return fmt.Errorf("begin transaction: %w", err) - } - defer tx.Rollback() - - insert, err := tx.PrepareContext(ctx, "INSERT INTO record (host, uptime_sec, boot_time, os, os_kernel_name, os_kernel_major) VALUES (?, ?, ?, ?, ?, ?)") - if err != nil { - return fmt.Errorf("prepare insert: %w", err) - } - defer insert.Close() - - for _, e := range entries { - if e.IsDir() || !strings.HasSuffix(e.Name(), ".records") { - continue - } - path := filepath.Join(statsDir, e.Name()) - info, err := os.Stat(path) - if err != nil || info.Size() == 0 { - continue - } - host := strings.TrimSuffix(e.Name(), filepath.Ext(e.Name())) - if idx := strings.Index(host, "."); idx > 0 { - host = host[:idx] - } - if err := importFile(ctx, insert, path, host); err != nil { - return err - } - } - - if err := tx.Commit(); err != nil { - return fmt.Errorf("commit transaction: %w", err) - } - return nil -} - -func importFile(ctx context.Context, insert *sql.Stmt, path, host string) error { - f, err := os.Open(path) - if err != nil { - return fmt.Errorf("open %s: %w", path, err) - } - defer f.Close() - sc := bufio.NewScanner(f) - for sc.Scan() { - rec, ok := parseRecordLine(sc.Text()) - if !ok { - continue - } - if _, err := insert.ExecContext(ctx, host, rec.Uptime, rec.BootTime, rec.OS, rec.KernelName, rec.KernelMajor); err != nil { - return fmt.Errorf("insert: %w", err) - } - } - if err := sc.Err(); err != nil { - return fmt.Errorf("scan %s: %w", path, err) - } - return nil + return storage.ImportFromDir(ctx, db, statsDir) } // LoadAggregates reads all rows from the DB and builds Aggregates (same shape as file-based aggregation). func LoadAggregates(ctx context.Context, db *sql.DB) (*Aggregates, error) { - rows, err := db.QueryContext(ctx, "SELECT host, uptime_sec, boot_time, os, os_kernel_name, os_kernel_major FROM record ORDER BY host, boot_time") + records, err := storage.LoadRecords(ctx, db) if err != nil { - return nil, fmt.Errorf("query: %w", err) + return nil, err } - defer rows.Close() - out := &Aggregates{ Host: make(map[string]*HostAggregate), Kernel: make(map[string]*Aggregate), @@ -139,29 +44,18 @@ func LoadAggregates(ctx context.Context, db *sql.DB) (*Aggregates, error) { hostMaxBoot := make(map[string]int64) hostLastKernel := make(map[string]string) - for rows.Next() { - var host string - var uptimeSec, bootTime int64 - var osStr, osKernelName, osKernelMajor string - if err := rows.Scan(&host, &uptimeSec, &bootTime, &osStr, &osKernelName, &osKernelMajor); err != nil { - return nil, fmt.Errorf("scan row: %w", err) + for _, rec := range records { + if rec.BootTime >= uint64(hostMaxBoot[rec.Host]) { + hostMaxBoot[rec.Host] = int64(rec.BootTime) + hostLastKernel[rec.Host] = rec.OS } - uptime := uint64(uptimeSec) - boot := uint64(bootTime) - if boot >= uint64(hostMaxBoot[host]) { - hostMaxBoot[host] = int64(boot) - hostLastKernel[host] = osStr + if _, ok := out.Host[rec.Host]; !ok { + out.Host[rec.Host] = NewHostAggregate(rec.Host, "") } - if _, ok := out.Host[host]; !ok { - out.Host[host] = NewHostAggregate(host, "") - } - out.Host[host].AddRecord(uptime, boot) - getOrNewAggregate(out.Kernel, osStr).AddRecord(uptime, boot) - getOrNewAggregate(out.KernelName, osKernelName).AddRecord(uptime, boot) - getOrNewAggregate(out.KernelMajor, osKernelMajor).AddRecord(uptime, boot) - } - if err := rows.Err(); err != nil { - return nil, fmt.Errorf("rows: %w", err) + out.Host[rec.Host].AddRecord(rec.Uptime, rec.BootTime) + getOrNewAggregate(out.Kernel, rec.OS).AddRecord(rec.Uptime, rec.BootTime) + getOrNewAggregate(out.KernelName, rec.KernelName).AddRecord(rec.Uptime, rec.BootTime) + getOrNewAggregate(out.KernelMajor, rec.KernelMajor).AddRecord(rec.Uptime, rec.BootTime) } for host, h := range out.Host { h.LastKernel = hostLastKernel[host] diff --git a/internal/storage/db.go b/internal/storage/db.go new file mode 100644 index 0000000..127f45d --- /dev/null +++ b/internal/storage/db.go @@ -0,0 +1,189 @@ +package storage + +import ( + "bufio" + "context" + "database/sql" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + + _ "modernc.org/sqlite" +) + +const schemaSQL = ` +CREATE TABLE IF NOT EXISTS record ( + host TEXT NOT NULL, + uptime_sec INTEGER NOT NULL, + boot_time INTEGER NOT NULL, + os TEXT NOT NULL, + os_kernel_name TEXT NOT NULL, + os_kernel_major TEXT NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_record_host ON record(host); +CREATE INDEX IF NOT EXISTS idx_record_os ON record(os); +CREATE INDEX IF NOT EXISTS idx_record_os_kernel_name ON record(os_kernel_name); +CREATE INDEX IF NOT EXISTS idx_record_os_kernel_major ON record(os_kernel_major); +` + +type Record struct { + Host string + Uptime uint64 + BootTime uint64 + OS string + KernelName string + KernelMajor string +} + +func Open(path string) (*sql.DB, error) { + db, err := sql.Open("sqlite", path) + if err != nil { + return nil, err + } + if _, err := db.Exec("PRAGMA foreign_keys = OFF"); err != nil { + db.Close() + return nil, err + } + return db, nil +} + +func CreateSchema(ctx context.Context, db *sql.DB) error { + _, err := db.ExecContext(ctx, schemaSQL) + return err +} + +func ResetRecords(ctx context.Context, db *sql.DB) error { + _, err := db.ExecContext(ctx, "DELETE FROM record") + return err +} + +func ImportFromDir(ctx context.Context, db *sql.DB, statsDir string) error { + if err := ResetRecords(ctx, db); err != nil { + return fmt.Errorf("reset records: %w", err) + } + entries, err := os.ReadDir(statsDir) + if err != nil { + return fmt.Errorf("read dir: %w", err) + } + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("begin transaction: %w", err) + } + defer tx.Rollback() + insert, err := tx.PrepareContext(ctx, "INSERT INTO record (host, uptime_sec, boot_time, os, os_kernel_name, os_kernel_major) VALUES (?, ?, ?, ?, ?, ?)") + if err != nil { + return fmt.Errorf("prepare insert: %w", err) + } + defer insert.Close() + for _, e := range entries { + if e.IsDir() || !strings.HasSuffix(e.Name(), ".records") { + continue + } + path := filepath.Join(statsDir, e.Name()) + info, err := os.Stat(path) + if err != nil || info.Size() == 0 { + continue + } + host := strings.TrimSuffix(e.Name(), filepath.Ext(e.Name())) + if idx := strings.Index(host, "."); idx > 0 { + host = host[:idx] + } + if err := importFile(ctx, insert, path, host); err != nil { + return err + } + } + if err := tx.Commit(); err != nil { + return fmt.Errorf("commit transaction: %w", err) + } + return nil +} + +func LoadRecords(ctx context.Context, db *sql.DB) ([]Record, error) { + rows, err := db.QueryContext(ctx, "SELECT host, uptime_sec, boot_time, os, os_kernel_name, os_kernel_major FROM record ORDER BY host, boot_time") + if err != nil { + return nil, fmt.Errorf("query: %w", err) + } + defer rows.Close() + var out []Record + for rows.Next() { + var rec Record + var uptimeSec, bootTime int64 + if err := rows.Scan(&rec.Host, &uptimeSec, &bootTime, &rec.OS, &rec.KernelName, &rec.KernelMajor); err != nil { + return nil, fmt.Errorf("scan row: %w", err) + } + rec.Uptime = uint64(uptimeSec) + rec.BootTime = uint64(bootTime) + out = append(out, rec) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("rows: %w", err) + } + return out, nil +} + +type recordLine struct { + Uptime uint64 + BootTime uint64 + OS string + KernelName string + KernelMajor string +} + +func importFile(ctx context.Context, insert *sql.Stmt, path, host string) error { + f, err := os.Open(path) + if err != nil { + return fmt.Errorf("open %s: %w", path, err) + } + defer f.Close() + sc := bufio.NewScanner(f) + for sc.Scan() { + rec, ok := parseRecordLine(sc.Text()) + if !ok { + continue + } + if _, err := insert.ExecContext(ctx, host, rec.Uptime, rec.BootTime, rec.OS, rec.KernelName, rec.KernelMajor); err != nil { + return fmt.Errorf("insert: %w", err) + } + } + if err := sc.Err(); err != nil { + return fmt.Errorf("scan %s: %w", path, err) + } + return nil +} + +func parseRecordLine(line string) (recordLine, bool) { + line = strings.TrimSpace(line) + if line == "" { + return recordLine{}, false + } + parts := strings.SplitN(line, ":", 3) + if len(parts) != 3 { + return recordLine{}, false + } + uptime, _ := strconv.ParseUint(parts[0], 10, 64) + bootTime, _ := strconv.ParseUint(parts[1], 10, 64) + osStr := parts[2] + kernelName := osStr + if i := strings.Index(osStr, " "); i > 0 { + kernelName = osStr[:i] + } + kernelMajor := kernelName + " " + rest := osStr + if i := strings.Index(osStr, " "); i >= 0 { + rest = osStr[i+1:] + } + if j := strings.Index(rest, "."); j >= 0 { + kernelMajor += rest[:j] + "..." + } else { + kernelMajor += rest + "..." + } + return recordLine{ + Uptime: uptime, + BootTime: bootTime, + OS: osStr, + KernelName: kernelName, + KernelMajor: kernelMajor, + }, true +} |
