summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-03 20:04:49 +0200
committerPaul Buetow <paul@buetow.org>2026-03-03 20:04:49 +0200
commit9c7b4517c3d7bf73a0f0b32df1bc061c9e8d8120 (patch)
tree5544688f2ab15b21eb35da9fa96a592e0dff2d4c
parent1207e1e274288cdd4f52636740760705c0f6329d (diff)
refactor(storage): separate DB operations into storage package (task 334)
-rw-r--r--internal/goprecords/aggregate.go9
-rw-r--r--internal/goprecords/aggregate_map.go10
-rw-r--r--internal/goprecords/db.go140
-rw-r--r--internal/storage/db.go189
4 files changed, 216 insertions, 132 deletions
diff --git a/internal/goprecords/aggregate.go b/internal/goprecords/aggregate.go
index e75d486..7f7528c 100644
--- a/internal/goprecords/aggregate.go
+++ b/internal/goprecords/aggregate.go
@@ -96,15 +96,6 @@ func processRecordsFile(ctx context.Context, path, host string, out *Aggregates)
return nil
}
-func getOrNewAggregate(m map[string]*Aggregate, name string) *Aggregate {
- if a, ok := m[name]; ok {
- return a
- }
- a := NewAggregate(name)
- m[name] = a
- return a
-}
-
func lastKernelFromFile(path string) (string, error) {
f, err := os.Open(path)
if err != nil {
diff --git a/internal/goprecords/aggregate_map.go b/internal/goprecords/aggregate_map.go
new file mode 100644
index 0000000..5c4f49d
--- /dev/null
+++ b/internal/goprecords/aggregate_map.go
@@ -0,0 +1,10 @@
+package goprecords
+
+func getOrNewAggregate(m map[string]*Aggregate, name string) *Aggregate {
+ if a, ok := m[name]; ok {
+ return a
+ }
+ a := NewAggregate(name)
+ m[name] = a
+ return a
+}
diff --git a/internal/goprecords/db.go b/internal/goprecords/db.go
index 8fa8b60..a07f6be 100644
--- a/internal/goprecords/db.go
+++ b/internal/goprecords/db.go
@@ -1,135 +1,40 @@
package goprecords
import (
- "bufio"
"context"
"database/sql"
- "fmt"
- "os"
- "path/filepath"
- "strings"
+ "github.com/goprecords/internal/storage"
_ "modernc.org/sqlite"
)
-const schemaSQL = `
-CREATE TABLE IF NOT EXISTS record (
- host TEXT NOT NULL,
- uptime_sec INTEGER NOT NULL,
- boot_time INTEGER NOT NULL,
- os TEXT NOT NULL,
- os_kernel_name TEXT NOT NULL,
- os_kernel_major TEXT NOT NULL
-);
-CREATE INDEX IF NOT EXISTS idx_record_host ON record(host);
-CREATE INDEX IF NOT EXISTS idx_record_os ON record(os);
-CREATE INDEX IF NOT EXISTS idx_record_os_kernel_name ON record(os_kernel_name);
-CREATE INDEX IF NOT EXISTS idx_record_os_kernel_major ON record(os_kernel_major);
-`
-
// OpenDB opens the SQLite database at path, creating the file if needed.
func OpenDB(path string) (*sql.DB, error) {
- db, err := sql.Open("sqlite", path)
- if err != nil {
- return nil, err
- }
- if _, err := db.Exec("PRAGMA foreign_keys = OFF"); err != nil {
- db.Close()
- return nil, err
- }
- return db, nil
+ return storage.Open(path)
}
// CreateSchema creates the record table and indexes (idempotent).
func CreateSchema(ctx context.Context, db *sql.DB) error {
- _, err := db.ExecContext(ctx, schemaSQL)
- return err
+ return storage.CreateSchema(ctx, db)
}
// ResetRecords removes all rows so import is repeatable.
func ResetRecords(ctx context.Context, db *sql.DB) error {
- _, err := db.ExecContext(ctx, "DELETE FROM record")
- return err
+ return storage.ResetRecords(ctx, db)
}
// ImportFromDir reads all .records files from statsDir and inserts into the DB.
// Resets the record table first so the run is repeatable.
func ImportFromDir(ctx context.Context, db *sql.DB, statsDir string) error {
- if err := ResetRecords(ctx, db); err != nil {
- return fmt.Errorf("reset records: %w", err)
- }
- entries, err := os.ReadDir(statsDir)
- if err != nil {
- return fmt.Errorf("read dir: %w", err)
- }
-
- // Use a transaction for better performance with multiple inserts
- tx, err := db.BeginTx(ctx, nil)
- if err != nil {
- return fmt.Errorf("begin transaction: %w", err)
- }
- defer tx.Rollback()
-
- insert, err := tx.PrepareContext(ctx, "INSERT INTO record (host, uptime_sec, boot_time, os, os_kernel_name, os_kernel_major) VALUES (?, ?, ?, ?, ?, ?)")
- if err != nil {
- return fmt.Errorf("prepare insert: %w", err)
- }
- defer insert.Close()
-
- for _, e := range entries {
- if e.IsDir() || !strings.HasSuffix(e.Name(), ".records") {
- continue
- }
- path := filepath.Join(statsDir, e.Name())
- info, err := os.Stat(path)
- if err != nil || info.Size() == 0 {
- continue
- }
- host := strings.TrimSuffix(e.Name(), filepath.Ext(e.Name()))
- if idx := strings.Index(host, "."); idx > 0 {
- host = host[:idx]
- }
- if err := importFile(ctx, insert, path, host); err != nil {
- return err
- }
- }
-
- if err := tx.Commit(); err != nil {
- return fmt.Errorf("commit transaction: %w", err)
- }
- return nil
-}
-
-func importFile(ctx context.Context, insert *sql.Stmt, path, host string) error {
- f, err := os.Open(path)
- if err != nil {
- return fmt.Errorf("open %s: %w", path, err)
- }
- defer f.Close()
- sc := bufio.NewScanner(f)
- for sc.Scan() {
- rec, ok := parseRecordLine(sc.Text())
- if !ok {
- continue
- }
- if _, err := insert.ExecContext(ctx, host, rec.Uptime, rec.BootTime, rec.OS, rec.KernelName, rec.KernelMajor); err != nil {
- return fmt.Errorf("insert: %w", err)
- }
- }
- if err := sc.Err(); err != nil {
- return fmt.Errorf("scan %s: %w", path, err)
- }
- return nil
+ return storage.ImportFromDir(ctx, db, statsDir)
}
// LoadAggregates reads all rows from the DB and builds Aggregates (same shape as file-based aggregation).
func LoadAggregates(ctx context.Context, db *sql.DB) (*Aggregates, error) {
- rows, err := db.QueryContext(ctx, "SELECT host, uptime_sec, boot_time, os, os_kernel_name, os_kernel_major FROM record ORDER BY host, boot_time")
+ records, err := storage.LoadRecords(ctx, db)
if err != nil {
- return nil, fmt.Errorf("query: %w", err)
+ return nil, err
}
- defer rows.Close()
-
out := &Aggregates{
Host: make(map[string]*HostAggregate),
Kernel: make(map[string]*Aggregate),
@@ -139,29 +44,18 @@ func LoadAggregates(ctx context.Context, db *sql.DB) (*Aggregates, error) {
hostMaxBoot := make(map[string]int64)
hostLastKernel := make(map[string]string)
- for rows.Next() {
- var host string
- var uptimeSec, bootTime int64
- var osStr, osKernelName, osKernelMajor string
- if err := rows.Scan(&host, &uptimeSec, &bootTime, &osStr, &osKernelName, &osKernelMajor); err != nil {
- return nil, fmt.Errorf("scan row: %w", err)
+ for _, rec := range records {
+ if rec.BootTime >= uint64(hostMaxBoot[rec.Host]) {
+ hostMaxBoot[rec.Host] = int64(rec.BootTime)
+ hostLastKernel[rec.Host] = rec.OS
}
- uptime := uint64(uptimeSec)
- boot := uint64(bootTime)
- if boot >= uint64(hostMaxBoot[host]) {
- hostMaxBoot[host] = int64(boot)
- hostLastKernel[host] = osStr
+ if _, ok := out.Host[rec.Host]; !ok {
+ out.Host[rec.Host] = NewHostAggregate(rec.Host, "")
}
- if _, ok := out.Host[host]; !ok {
- out.Host[host] = NewHostAggregate(host, "")
- }
- out.Host[host].AddRecord(uptime, boot)
- getOrNewAggregate(out.Kernel, osStr).AddRecord(uptime, boot)
- getOrNewAggregate(out.KernelName, osKernelName).AddRecord(uptime, boot)
- getOrNewAggregate(out.KernelMajor, osKernelMajor).AddRecord(uptime, boot)
- }
- if err := rows.Err(); err != nil {
- return nil, fmt.Errorf("rows: %w", err)
+ out.Host[rec.Host].AddRecord(rec.Uptime, rec.BootTime)
+ getOrNewAggregate(out.Kernel, rec.OS).AddRecord(rec.Uptime, rec.BootTime)
+ getOrNewAggregate(out.KernelName, rec.KernelName).AddRecord(rec.Uptime, rec.BootTime)
+ getOrNewAggregate(out.KernelMajor, rec.KernelMajor).AddRecord(rec.Uptime, rec.BootTime)
}
for host, h := range out.Host {
h.LastKernel = hostLastKernel[host]
diff --git a/internal/storage/db.go b/internal/storage/db.go
new file mode 100644
index 0000000..127f45d
--- /dev/null
+++ b/internal/storage/db.go
@@ -0,0 +1,189 @@
+package storage
+
+import (
+ "bufio"
+ "context"
+ "database/sql"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strconv"
+ "strings"
+
+ _ "modernc.org/sqlite"
+)
+
+const schemaSQL = `
+CREATE TABLE IF NOT EXISTS record (
+ host TEXT NOT NULL,
+ uptime_sec INTEGER NOT NULL,
+ boot_time INTEGER NOT NULL,
+ os TEXT NOT NULL,
+ os_kernel_name TEXT NOT NULL,
+ os_kernel_major TEXT NOT NULL
+);
+CREATE INDEX IF NOT EXISTS idx_record_host ON record(host);
+CREATE INDEX IF NOT EXISTS idx_record_os ON record(os);
+CREATE INDEX IF NOT EXISTS idx_record_os_kernel_name ON record(os_kernel_name);
+CREATE INDEX IF NOT EXISTS idx_record_os_kernel_major ON record(os_kernel_major);
+`
+
+type Record struct {
+ Host string
+ Uptime uint64
+ BootTime uint64
+ OS string
+ KernelName string
+ KernelMajor string
+}
+
+func Open(path string) (*sql.DB, error) {
+ db, err := sql.Open("sqlite", path)
+ if err != nil {
+ return nil, err
+ }
+ if _, err := db.Exec("PRAGMA foreign_keys = OFF"); err != nil {
+ db.Close()
+ return nil, err
+ }
+ return db, nil
+}
+
+func CreateSchema(ctx context.Context, db *sql.DB) error {
+ _, err := db.ExecContext(ctx, schemaSQL)
+ return err
+}
+
+func ResetRecords(ctx context.Context, db *sql.DB) error {
+ _, err := db.ExecContext(ctx, "DELETE FROM record")
+ return err
+}
+
+func ImportFromDir(ctx context.Context, db *sql.DB, statsDir string) error {
+ if err := ResetRecords(ctx, db); err != nil {
+ return fmt.Errorf("reset records: %w", err)
+ }
+ entries, err := os.ReadDir(statsDir)
+ if err != nil {
+ return fmt.Errorf("read dir: %w", err)
+ }
+ tx, err := db.BeginTx(ctx, nil)
+ if err != nil {
+ return fmt.Errorf("begin transaction: %w", err)
+ }
+ defer tx.Rollback()
+ insert, err := tx.PrepareContext(ctx, "INSERT INTO record (host, uptime_sec, boot_time, os, os_kernel_name, os_kernel_major) VALUES (?, ?, ?, ?, ?, ?)")
+ if err != nil {
+ return fmt.Errorf("prepare insert: %w", err)
+ }
+ defer insert.Close()
+ for _, e := range entries {
+ if e.IsDir() || !strings.HasSuffix(e.Name(), ".records") {
+ continue
+ }
+ path := filepath.Join(statsDir, e.Name())
+ info, err := os.Stat(path)
+ if err != nil || info.Size() == 0 {
+ continue
+ }
+ host := strings.TrimSuffix(e.Name(), filepath.Ext(e.Name()))
+ if idx := strings.Index(host, "."); idx > 0 {
+ host = host[:idx]
+ }
+ if err := importFile(ctx, insert, path, host); err != nil {
+ return err
+ }
+ }
+ if err := tx.Commit(); err != nil {
+ return fmt.Errorf("commit transaction: %w", err)
+ }
+ return nil
+}
+
+func LoadRecords(ctx context.Context, db *sql.DB) ([]Record, error) {
+ rows, err := db.QueryContext(ctx, "SELECT host, uptime_sec, boot_time, os, os_kernel_name, os_kernel_major FROM record ORDER BY host, boot_time")
+ if err != nil {
+ return nil, fmt.Errorf("query: %w", err)
+ }
+ defer rows.Close()
+ var out []Record
+ for rows.Next() {
+ var rec Record
+ var uptimeSec, bootTime int64
+ if err := rows.Scan(&rec.Host, &uptimeSec, &bootTime, &rec.OS, &rec.KernelName, &rec.KernelMajor); err != nil {
+ return nil, fmt.Errorf("scan row: %w", err)
+ }
+ rec.Uptime = uint64(uptimeSec)
+ rec.BootTime = uint64(bootTime)
+ out = append(out, rec)
+ }
+ if err := rows.Err(); err != nil {
+ return nil, fmt.Errorf("rows: %w", err)
+ }
+ return out, nil
+}
+
+type recordLine struct {
+ Uptime uint64
+ BootTime uint64
+ OS string
+ KernelName string
+ KernelMajor string
+}
+
+func importFile(ctx context.Context, insert *sql.Stmt, path, host string) error {
+ f, err := os.Open(path)
+ if err != nil {
+ return fmt.Errorf("open %s: %w", path, err)
+ }
+ defer f.Close()
+ sc := bufio.NewScanner(f)
+ for sc.Scan() {
+ rec, ok := parseRecordLine(sc.Text())
+ if !ok {
+ continue
+ }
+ if _, err := insert.ExecContext(ctx, host, rec.Uptime, rec.BootTime, rec.OS, rec.KernelName, rec.KernelMajor); err != nil {
+ return fmt.Errorf("insert: %w", err)
+ }
+ }
+ if err := sc.Err(); err != nil {
+ return fmt.Errorf("scan %s: %w", path, err)
+ }
+ return nil
+}
+
+func parseRecordLine(line string) (recordLine, bool) {
+ line = strings.TrimSpace(line)
+ if line == "" {
+ return recordLine{}, false
+ }
+ parts := strings.SplitN(line, ":", 3)
+ if len(parts) != 3 {
+ return recordLine{}, false
+ }
+ uptime, _ := strconv.ParseUint(parts[0], 10, 64)
+ bootTime, _ := strconv.ParseUint(parts[1], 10, 64)
+ osStr := parts[2]
+ kernelName := osStr
+ if i := strings.Index(osStr, " "); i > 0 {
+ kernelName = osStr[:i]
+ }
+ kernelMajor := kernelName + " "
+ rest := osStr
+ if i := strings.Index(osStr, " "); i >= 0 {
+ rest = osStr[i+1:]
+ }
+ if j := strings.Index(rest, "."); j >= 0 {
+ kernelMajor += rest[:j] + "..."
+ } else {
+ kernelMajor += rest + "..."
+ }
+ return recordLine{
+ Uptime: uptime,
+ BootTime: bootTime,
+ OS: osStr,
+ KernelName: kernelName,
+ KernelMajor: kernelMajor,
+ }, true
+}