diff options
| author | Paul Buetow <paul@buetow.org> | 2026-04-14 11:19:50 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-04-14 11:19:50 +0300 |
| commit | 55593e14ee2a4225d1db1058da9d8d1f663225b6 (patch) | |
| tree | 5fdd9830dd6411f348be28dbed2085e1806148ce /internal | |
| parent | 2bc4e64acf93f04c8871d964d75f041ada57f89d (diff) | |
refactor: use fs.FS for aggregate and DB import (ask v3)
Decouple Aggregator and ImportFromDir from direct os.Open by routing
through io/fs: NewAggregatorFS, ImportFromFS, recordsdir.ListNonEmptyFilesFS.
NewAggregator and ImportFromDir wrap os.DirFS for backward compatibility.
Add fstest.MapFS tests for recordsdir, aggregate, and ImportFromFS.
Made-with: Cursor
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/goprecords/aggregate.go | 33 | ||||
| -rw-r--r-- | internal/goprecords/aggregate_test.go | 50 | ||||
| -rw-r--r-- | internal/goprecords/db.go | 7 | ||||
| -rw-r--r-- | internal/goprecords/db_test.go | 33 | ||||
| -rw-r--r-- | internal/recordsdir/recordsdir.go | 45 | ||||
| -rw-r--r-- | internal/recordsdir/recordsdir_test.go | 16 | ||||
| -rw-r--r-- | internal/storage/db.go | 18 |
7 files changed, 170 insertions, 32 deletions
diff --git a/internal/goprecords/aggregate.go b/internal/goprecords/aggregate.go index 31a0e1c..aa9c3fe 100644 --- a/internal/goprecords/aggregate.go +++ b/internal/goprecords/aggregate.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "fmt" + "io/fs" "os" "codeberg.org/snonux/goprecords/internal/recordline" @@ -20,12 +21,18 @@ type Aggregates struct { // Aggregator reads .records files from a directory and builds Aggregates. type Aggregator struct { - statsDir string + fsys fs.FS + root string } // NewAggregator returns an Aggregator for the given stats directory. func NewAggregator(statsDir string) *Aggregator { - return &Aggregator{statsDir: statsDir} + return NewAggregatorFS(os.DirFS(statsDir)) +} + +// NewAggregatorFS returns an Aggregator that reads non-empty .records files from the root of fsys. +func NewAggregatorFS(fsys fs.FS) *Aggregator { + return &Aggregator{fsys: fsys, root: "."} } // Aggregate reads all .records files and returns aggregated data. @@ -36,32 +43,32 @@ func (ag *Aggregator) Aggregate(ctx context.Context) (*Aggregates, error) { KernelMajor: make(map[string]*Aggregate), KernelName: make(map[string]*Aggregate), } - files, err := recordsdir.ListNonEmptyFiles(ag.statsDir) + files, err := recordsdir.ListNonEmptyFilesFS(ag.fsys, ag.root) if err != nil { return nil, fmt.Errorf("read stats dir: %w", err) } for _, f := range files { host := f.Host - path := f.Path + relPath := f.Path if _, exists := out.Host[host]; exists { return nil, fmt.Errorf("record file for %s already processed - duplicate inputs?", host) } - lastKernel, err := lastKernelFromFile(ctx, path) + lastKernel, err := lastKernelFromFile(ctx, ag.fsys, relPath) if err != nil { - return nil, fmt.Errorf("last kernel %s: %w", path, err) + return nil, fmt.Errorf("last kernel %s: %w", relPath, err) } out.Host[host] = NewHostAggregate(host, lastKernel) - if err := processRecordsFile(ctx, path, host, out); err != nil { + if err := processRecordsFile(ctx, ag.fsys, relPath, host, out); err != nil { return nil, err } } return out, nil } -func processRecordsFile(ctx context.Context, path, host string, out *Aggregates) error { - f, err := os.Open(path) +func processRecordsFile(ctx context.Context, fsys fs.FS, relPath, host string, out *Aggregates) error { + f, err := fsys.Open(relPath) if err != nil { - return fmt.Errorf("open %s: %w", path, err) + return fmt.Errorf("open %s: %w", relPath, err) } defer f.Close() @@ -82,13 +89,13 @@ func processRecordsFile(ctx context.Context, path, host string, out *Aggregates) getOrNewAggregate(out.KernelMajor, rec.KernelMajor).AddRecord(rec.Uptime, rec.BootTime) } if err := sc.Err(); err != nil { - return fmt.Errorf("scan %s: %w", path, err) + return fmt.Errorf("scan %s: %w", relPath, err) } return nil } -func lastKernelFromFile(ctx context.Context, path string) (string, error) { - f, err := os.Open(path) +func lastKernelFromFile(ctx context.Context, fsys fs.FS, relPath string) (string, error) { + f, err := fsys.Open(relPath) if err != nil { return "", err } diff --git a/internal/goprecords/aggregate_test.go b/internal/goprecords/aggregate_test.go index dfc9c91..c7e75b4 100644 --- a/internal/goprecords/aggregate_test.go +++ b/internal/goprecords/aggregate_test.go @@ -5,12 +5,29 @@ import ( "os" "path/filepath" "testing" + "testing/fstest" ) -func TestNewAggregator(t *testing.T) { - agg := NewAggregator("./test") - if agg.statsDir != "./test" { - t.Errorf("expected statsDir ./test, got %q", agg.statsDir) +func TestNewAggregatorMatchesDirFS(t *testing.T) { + tmpDir := t.TempDir() + content := []byte("86400:1000000:Linux 5.10.0-test\n") + if err := os.WriteFile(filepath.Join(tmpDir, "h1.records"), content, 0o644); err != nil { + t.Fatal(err) + } + ctx := context.Background() + a, err := NewAggregator(tmpDir).Aggregate(ctx) + if err != nil { + t.Fatalf("NewAggregator: %v", err) + } + b, err := NewAggregatorFS(os.DirFS(tmpDir)).Aggregate(ctx) + if err != nil { + t.Fatalf("NewAggregatorFS: %v", err) + } + if len(a.Host) != 1 || len(b.Host) != 1 { + t.Fatalf("hosts: a=%d b=%d", len(a.Host), len(b.Host)) + } + if a.Host["h1"].Boots != b.Host["h1"].Boots || a.Host["h1"].Uptime != b.Host["h1"].Uptime { + t.Fatalf("mismatch: %#v vs %#v", a.Host["h1"], b.Host["h1"]) } } @@ -24,6 +41,23 @@ func TestAggregateInvalidDir(t *testing.T) { } } +func TestAggregateMapFS(t *testing.T) { + m := fstest.MapFS{ + "box.records": &fstest.MapFile{ + Data: []byte("86400:1000000:Linux 5.10.0-test\n86400:1000001:Linux 5.11.0-test\n"), + Mode: 0o644, + }, + } + aggs, err := NewAggregatorFS(m).Aggregate(context.Background()) + if err != nil { + t.Fatalf("Aggregate: %v", err) + } + h := aggs.Host["box"] + if h == nil || h.Boots != 2 || h.LastKernel != "Linux 5.11.0-test" { + t.Fatalf("host box: %#v", h) + } +} + func TestAggregateFixtures(t *testing.T) { fixturesPath := "fixtures" if _, err := os.Stat(fixturesPath); err != nil { @@ -118,7 +152,9 @@ func TestLastKernelFromFile(t *testing.T) { t.Skipf("skipping test, fixture file not found") } - kernel, err := lastKernelFromFile(context.Background(), testFile) + fixturesDir := filepath.Dir(testFile) + base := filepath.Base(testFile) + kernel, err := lastKernelFromFile(context.Background(), os.DirFS(fixturesDir), base) if err != nil { t.Fatalf("failed to get last kernel: %v", err) } @@ -129,7 +165,7 @@ func TestLastKernelFromFile(t *testing.T) { } func TestLastKernelFromFileNonExistent(t *testing.T) { - _, err := lastKernelFromFile(context.Background(), "/nonexistent/file.records") + _, err := lastKernelFromFile(context.Background(), os.DirFS(t.TempDir()), "missing.records") if err == nil { t.Error("expected error for non-existent file") } @@ -158,7 +194,7 @@ func TestProcessRecordsFile(t *testing.T) { aggs.Host["test"] = NewHostAggregate("test", "") ctx := context.Background() - err := processRecordsFile(ctx, testFile, "test", aggs) + err := processRecordsFile(ctx, os.DirFS(tmpDir), "test.records", "test", aggs) if err != nil { t.Fatalf("failed to process records: %v", err) diff --git a/internal/goprecords/db.go b/internal/goprecords/db.go index 904923e..2bcfb78 100644 --- a/internal/goprecords/db.go +++ b/internal/goprecords/db.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "io/fs" "codeberg.org/snonux/goprecords/internal/storage" _ "modernc.org/sqlite" @@ -30,6 +31,12 @@ func ImportFromDir(ctx context.Context, db *sql.DB, statsDir string) error { return storage.ImportFromDir(ctx, db, statsDir) } +// ImportFromFS reads all non-empty .records files from the root of fsys and inserts into the DB. +// Resets the record table first so the run is repeatable. +func ImportFromFS(ctx context.Context, db *sql.DB, fsys fs.FS) error { + return storage.ImportFromFS(ctx, db, fsys) +} + // LoadAggregates reads all rows from the DB and builds Aggregates (same shape as file-based aggregation). func LoadAggregates(ctx context.Context, db *sql.DB) (*Aggregates, error) { records, err := storage.LoadRecords(ctx, db) diff --git a/internal/goprecords/db_test.go b/internal/goprecords/db_test.go index 204d0ca..0edbae9 100644 --- a/internal/goprecords/db_test.go +++ b/internal/goprecords/db_test.go @@ -7,6 +7,7 @@ import ( "path/filepath" "strings" "testing" + "testing/fstest" ) func TestOpenDB(t *testing.T) { @@ -133,6 +134,38 @@ func TestImportFromDir(t *testing.T) { } } +func TestImportFromFS_MapFS(t *testing.T) { + tmpDir := t.TempDir() + dbPath := filepath.Join(tmpDir, "test.db") + db, err := OpenDB(context.Background(), dbPath) + if err != nil { + t.Fatalf("open DB: %v", err) + } + defer db.Close() + ctx := context.Background() + if err := CreateSchema(ctx, db); err != nil { + t.Fatalf("schema: %v", err) + } + m := fstest.MapFS{ + "testhost.records": &fstest.MapFile{ + Data: []byte("86400:1000000:Linux 5.10.0-test\n" + + "86400:1000001:Linux 5.10.0-test\n" + + "86400:1000002:Linux 5.10.0-test\n"), + Mode: 0o644, + }, + } + if err := ImportFromFS(ctx, db, m); err != nil { + t.Fatalf("ImportFromFS: %v", err) + } + var count int + if err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM record").Scan(&count); err != nil { + t.Fatal(err) + } + if count != 3 { + t.Errorf("count = %d, want 3", count) + } +} + func TestImportFromDirInvalidPath(t *testing.T) { tmpDir := t.TempDir() dbPath := filepath.Join(tmpDir, "test.db") diff --git a/internal/recordsdir/recordsdir.go b/internal/recordsdir/recordsdir.go index 9f3ce5b..94f7f0e 100644 --- a/internal/recordsdir/recordsdir.go +++ b/internal/recordsdir/recordsdir.go @@ -1,7 +1,9 @@ package recordsdir import ( + "io/fs" "os" + "path" "path/filepath" "strings" ) @@ -19,22 +21,53 @@ func HostFromFileName(name string) string { return host } -func ListNonEmptyFiles(dir string) ([]Entry, error) { - entries, err := os.ReadDir(dir) +func listRecordsFileNames(fsys fs.FS, root string) ([]string, error) { + entries, err := fs.ReadDir(fsys, root) if err != nil { return nil, err } - var out []Entry + var names []string for _, e := range entries { if e.IsDir() || !strings.HasSuffix(e.Name(), ".records") { continue } - path := filepath.Join(dir, e.Name()) - info, err := os.Stat(path) + rel := path.Join(root, e.Name()) + info, err := fs.Stat(fsys, rel) if err != nil || info.Size() == 0 { continue } - out = append(out, Entry{Path: path, Host: HostFromFileName(e.Name())}) + names = append(names, e.Name()) + } + return names, nil +} + +// ListNonEmptyFilesFS returns non-empty .records files under root within fsys. +func ListNonEmptyFilesFS(fsys fs.FS, root string) ([]Entry, error) { + names, err := listRecordsFileNames(fsys, root) + if err != nil { + return nil, err + } + var out []Entry + for _, name := range names { + out = append(out, Entry{ + Path: path.Join(root, name), + Host: HostFromFileName(name), + }) + } + return out, nil +} + +func ListNonEmptyFiles(dir string) ([]Entry, error) { + names, err := listRecordsFileNames(os.DirFS(dir), ".") + if err != nil { + return nil, err + } + var out []Entry + for _, name := range names { + out = append(out, Entry{ + Path: filepath.Join(dir, name), + Host: HostFromFileName(name), + }) } return out, nil } diff --git a/internal/recordsdir/recordsdir_test.go b/internal/recordsdir/recordsdir_test.go index bee8d67..5e72d9d 100644 --- a/internal/recordsdir/recordsdir_test.go +++ b/internal/recordsdir/recordsdir_test.go @@ -4,6 +4,7 @@ import ( "os" "path/filepath" "testing" + "testing/fstest" ) func TestHostFromFileName(t *testing.T) { @@ -56,3 +57,18 @@ func TestListNonEmptyFiles_ReadError(t *testing.T) { t.Fatal("expected error") } } + +func TestListNonEmptyFilesFS_MapFS(t *testing.T) { + m := fstest.MapFS{ + "skip.txt": &fstest.MapFile{Data: []byte("x"), Mode: 0o644}, + "empty.records": &fstest.MapFile{Data: nil, Mode: 0o644}, + "h1.records": &fstest.MapFile{Data: []byte("line\n"), Mode: 0o644}, + } + entries, err := ListNonEmptyFilesFS(m, ".") + if err != nil { + t.Fatal(err) + } + if len(entries) != 1 || entries[0].Host != "h1" || entries[0].Path != "h1.records" { + t.Fatalf("got %#v", entries) + } +} diff --git a/internal/storage/db.go b/internal/storage/db.go index fe789d6..45eb71a 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -5,6 +5,7 @@ import ( "context" "database/sql" "fmt" + "io/fs" "os" "codeberg.org/snonux/goprecords/internal/recordline" @@ -63,10 +64,15 @@ func ResetRecords(ctx context.Context, db *sql.DB) error { } func ImportFromDir(ctx context.Context, db *sql.DB, statsDir string) error { + return ImportFromFS(ctx, db, os.DirFS(statsDir)) +} + +// ImportFromFS reads non-empty .records files from the root of fsys into the database. +func ImportFromFS(ctx context.Context, db *sql.DB, fsys fs.FS) error { if err := ResetRecords(ctx, db); err != nil { return fmt.Errorf("reset records: %w", err) } - files, err := recordsdir.ListNonEmptyFiles(statsDir) + files, err := recordsdir.ListNonEmptyFilesFS(fsys, ".") if err != nil { return fmt.Errorf("read dir: %w", err) } @@ -81,7 +87,7 @@ func ImportFromDir(ctx context.Context, db *sql.DB, statsDir string) error { } defer insert.Close() for _, f := range files { - if err := importFile(ctx, insert, f.Path, f.Host); err != nil { + if err := importFile(ctx, insert, fsys, f.Path, f.Host); err != nil { return err } } @@ -119,10 +125,10 @@ func LoadRecords(ctx context.Context, db *sql.DB) ([]Record, error) { return out, nil } -func importFile(ctx context.Context, insert *sql.Stmt, path, host string) error { - f, err := os.Open(path) +func importFile(ctx context.Context, insert *sql.Stmt, fsys fs.FS, relPath, host string) error { + f, err := fsys.Open(relPath) if err != nil { - return fmt.Errorf("open %s: %w", path, err) + return fmt.Errorf("open %s: %w", relPath, err) } defer f.Close() sc := bufio.NewScanner(f) @@ -141,7 +147,7 @@ func importFile(ctx context.Context, insert *sql.Stmt, path, host string) error } } if err := sc.Err(); err != nil { - return fmt.Errorf("scan %s: %w", path, err) + return fmt.Errorf("scan %s: %w", relPath, err) } return nil } |
