1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
|
package storage
import (
"bufio"
"context"
"database/sql"
"fmt"
"io/fs"
"os"
"codeberg.org/snonux/goprecords/internal/recordline"
"codeberg.org/snonux/goprecords/internal/recordsdir"
_ "modernc.org/sqlite"
)
// schemaSQL is the DDL batch executed by CreateSchema. It creates the record
// table plus single-column indexes on host, os, os_kernel_name and
// os_kernel_major. Every statement uses IF NOT EXISTS, so objects that are
// already present are left as they are.
const schemaSQL = `
CREATE TABLE IF NOT EXISTS record (
host TEXT NOT NULL,
uptime_sec INTEGER NOT NULL,
boot_time INTEGER NOT NULL,
os TEXT NOT NULL,
os_kernel_name TEXT NOT NULL,
os_kernel_major TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_record_host ON record(host);
CREATE INDEX IF NOT EXISTS idx_record_os ON record(os);
CREATE INDEX IF NOT EXISTS idx_record_os_kernel_name ON record(os_kernel_name);
CREATE INDEX IF NOT EXISTS idx_record_os_kernel_major ON record(os_kernel_major);
`
// Record is one uptimed boot row stored in the record table.
// Each field maps to one column of that table.
type Record struct {
// Host is the value of the host column.
Host string
// Uptime is the value of the uptime_sec column (presumably seconds, per the
// column name).
Uptime uint64
// BootTime is the value of the boot_time column.
// NOTE(review): presumably a Unix timestamp; confirm against the producer.
BootTime uint64
// OS is the value of the os column.
OS string
// KernelName is the value of the os_kernel_name column.
KernelName string
// KernelMajor is the value of the os_kernel_major column.
KernelMajor string
}
// Open returns a database handle for the SQLite file at path.
//
// After creating the handle it pings the database to verify connectivity and
// then issues "PRAGMA foreign_keys = OFF". If either of those steps fails the
// handle is closed and a wrapped error is returned; the caller owns the
// returned handle only when the error is nil.
//
// NOTE(review): the pragma is sent through the pooled handle, and SQLite
// pragmas are generally scoped to a single connection; confirm this is
// sufficient if the pool ever opens additional connections.
func Open(ctx context.Context, path string) (*sql.DB, error) {
	handle, openErr := sql.Open("sqlite", path)
	if openErr != nil {
		return nil, fmt.Errorf("open sqlite: %w", openErr)
	}

	// abort releases the handle before reporting a failed setup stage.
	abort := func(stage string, cause error) (*sql.DB, error) {
		handle.Close()
		return nil, fmt.Errorf("%s: %w", stage, cause)
	}

	if pingErr := handle.PingContext(ctx); pingErr != nil {
		return abort("ping sqlite", pingErr)
	}
	if _, pragmaErr := handle.ExecContext(ctx, "PRAGMA foreign_keys = OFF"); pragmaErr != nil {
		return abort("pragma foreign_keys", pragmaErr)
	}
	return handle, nil
}
// CreateSchema executes schemaSQL against db, creating the record table and
// its indexes when they are missing. The error from the database is returned
// unwrapped.
func CreateSchema(ctx context.Context, db *sql.DB) error {
	if _, execErr := db.ExecContext(ctx, schemaSQL); execErr != nil {
		return execErr
	}
	return nil
}
// ResetRecords removes every row from the record table, leaving the table
// itself in place. The error from the database is returned unwrapped.
func ResetRecords(ctx context.Context, db *sql.DB) error {
	const wipeStmt = "DELETE FROM record"

	if _, execErr := db.ExecContext(ctx, wipeStmt); execErr != nil {
		return execErr
	}
	return nil
}
// ImportFromDir is the on-disk convenience form of ImportFromFS: it exposes
// statsDir as a file system via os.DirFS and imports the non-empty .records
// files found there into the database, replacing existing rows.
func ImportFromDir(ctx context.Context, db *sql.DB, statsDir string) error {
	root := os.DirFS(statsDir)
	return ImportFromFS(ctx, db, root)
}
// ImportFromFS reads non-empty .records files from the root of fsys into the database.
func ImportFromFS(ctx context.Context, db *sql.DB, fsys fs.FS) error {
if err := ResetRecords(ctx, db); err != nil {
return fmt.Errorf("reset records: %w", err)
}
files, err := recordsdir.ListNonEmptyFilesFS(fsys, ".")
if err != nil {
return fmt.Errorf("read dir: %w", err)
}
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
}
defer tx.Rollback()
insert, err := tx.PrepareContext(ctx, "INSERT INTO record (host, uptime_sec, boot_time, os, os_kernel_name, os_kernel_major) VALUES (?, ?, ?, ?, ?, ?)")
if err != nil {
return fmt.Errorf("prepare insert: %w", err)
}
defer insert.Close()
for _, f := range files {
if err := importFile(ctx, insert, fsys, f.Path, f.Host); err != nil {
return err
}
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("commit transaction: %w", err)
}
return nil
}
// LoadRecords reads the whole record table and returns it as a slice ordered
// by host and then boot time.
//
// A COUNT(*) query runs first, purely to size the result slice up front; the
// rows themselves come from a second query. The uptime_sec and boot_time
// columns are scanned as int64 and converted to uint64 without a range check.
// Cancellation of ctx is checked before each row is scanned and reported as
// ctx.Err(); every other failure is returned wrapped with the failing stage.
func LoadRecords(ctx context.Context, db *sql.DB) ([]Record, error) {
	const (
		countQuery  = "SELECT COUNT(*) FROM record"
		selectQuery = "SELECT host, uptime_sec, boot_time, os, os_kernel_name, os_kernel_major FROM record ORDER BY host, boot_time"
	)

	var total int
	if err := db.QueryRowContext(ctx, countQuery).Scan(&total); err != nil {
		return nil, fmt.Errorf("count records: %w", err)
	}

	rows, err := db.QueryContext(ctx, selectQuery)
	if err != nil {
		return nil, fmt.Errorf("query: %w", err)
	}
	defer rows.Close()

	records := make([]Record, 0, total)
	for rows.Next() {
		// Stop early once the caller has given up.
		if cancelErr := ctx.Err(); cancelErr != nil {
			return nil, cancelErr
		}

		var (
			host, osName, kernelName, kernelMajor string
			uptimeSec, bootTime                   int64
		)
		if err := rows.Scan(&host, &uptimeSec, &bootTime, &osName, &kernelName, &kernelMajor); err != nil {
			return nil, fmt.Errorf("scan row: %w", err)
		}

		records = append(records, Record{
			Host:        host,
			Uptime:      uint64(uptimeSec),
			BootTime:    uint64(bootTime),
			OS:          osName,
			KernelName:  kernelName,
			KernelMajor: kernelMajor,
		})
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("rows: %w", err)
	}
	return records, nil
}
// importFile reads relPath from fsys line by line and executes the prepared
// insert statement once per line that recordline.Parse accepts, storing host
// alongside the parsed fields. Lines the parser rejects are skipped without
// an error.
//
// Cancellation of ctx is checked before each line is parsed and reported as
// ctx.Err(). Open, insert and scanner failures are returned wrapped; the file
// is closed when the function returns.
func importFile(ctx context.Context, insert *sql.Stmt, fsys fs.FS, relPath, host string) error {
	src, openErr := fsys.Open(relPath)
	if openErr != nil {
		return fmt.Errorf("open %s: %w", relPath, openErr)
	}
	defer src.Close()

	lines := bufio.NewScanner(src)
	for lines.Scan() {
		// Stop early once the caller has given up.
		if cancelErr := ctx.Err(); cancelErr != nil {
			return cancelErr
		}

		parsed, accepted := recordline.Parse(lines.Text())
		if !accepted {
			continue
		}

		_, execErr := insert.ExecContext(ctx, host, parsed.Uptime, parsed.BootTime, parsed.OS, parsed.KernelName, parsed.KernelMajor)
		if execErr != nil {
			return fmt.Errorf("insert: %w", execErr)
		}
	}

	if scanErr := lines.Err(); scanErr != nil {
		return fmt.Errorf("scan %s: %w", relPath, scanErr)
	}
	return nil
}
|