summaryrefslogtreecommitdiff
path: root/internal/storage/db.go
blob: 45eb71a6272a3664949c9fff349077b7682acca4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package storage

import (
	"bufio"
	"context"
	"database/sql"
	"fmt"
	"io/fs"
	"os"

	"codeberg.org/snonux/goprecords/internal/recordline"
	"codeberg.org/snonux/goprecords/internal/recordsdir"
	_ "modernc.org/sqlite"
)

// schemaSQL is the DDL executed by CreateSchema. It declares the single
// record table (six NOT NULL columns) and one index each on host, os,
// os_kernel_name and os_kernel_major. Every statement uses IF NOT EXISTS.
const schemaSQL = `
CREATE TABLE IF NOT EXISTS record (
	host TEXT NOT NULL,
	uptime_sec INTEGER NOT NULL,
	boot_time INTEGER NOT NULL,
	os TEXT NOT NULL,
	os_kernel_name TEXT NOT NULL,
	os_kernel_major TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_record_host ON record(host);
CREATE INDEX IF NOT EXISTS idx_record_os ON record(os);
CREATE INDEX IF NOT EXISTS idx_record_os_kernel_name ON record(os_kernel_name);
CREATE INDEX IF NOT EXISTS idx_record_os_kernel_major ON record(os_kernel_major);
`

// Record is one row of the record table as returned by LoadRecords.
type Record struct {
	// Host is read from the host column.
	Host        string
	// Uptime is read from the uptime_sec column (presumably seconds, going
	// by the column name; confirm against the record file format).
	Uptime      uint64
	// BootTime is read from the boot_time column.
	// NOTE(review): unit/epoch is not visible here; confirm against the writer.
	BootTime    uint64
	// OS is read from the os column.
	OS          string
	// KernelName is read from the os_kernel_name column.
	KernelName  string
	// KernelMajor is read from the os_kernel_major column.
	KernelMajor string
}

// Open opens the SQLite database at path using the "sqlite" driver, pings it
// with ctx to verify it is reachable, and then issues
// "PRAGMA foreign_keys = OFF".
//
// On success the caller owns the returned handle and must close it. If the
// ping or the pragma fails, the handle is closed here and a wrapped error is
// returned together with a nil *sql.DB.
func Open(ctx context.Context, path string) (*sql.DB, error) {
	conn, openErr := sql.Open("sqlite", path)
	if openErr != nil {
		return nil, fmt.Errorf("open sqlite: %w", openErr)
	}

	if pingErr := conn.PingContext(ctx); pingErr != nil {
		conn.Close()
		return nil, fmt.Errorf("ping sqlite: %w", pingErr)
	}

	_, pragmaErr := conn.ExecContext(ctx, "PRAGMA foreign_keys = OFF")
	if pragmaErr != nil {
		conn.Close()
		return nil, fmt.Errorf("pragma foreign_keys: %w", pragmaErr)
	}

	return conn, nil
}

// CreateSchema executes schemaSQL against db, creating the record table and
// its indexes. The error from the driver is returned unwrapped.
func CreateSchema(ctx context.Context, db *sql.DB) error {
	if _, execErr := db.ExecContext(ctx, schemaSQL); execErr != nil {
		return execErr
	}
	return nil
}

// ResetRecords deletes every row from the record table. The error from the
// driver is returned unwrapped.
func ResetRecords(ctx context.Context, db *sql.DB) error {
	if _, execErr := db.ExecContext(ctx, "DELETE FROM record"); execErr != nil {
		return execErr
	}
	return nil
}

// ImportFromDir imports the records found in the directory statsDir by
// exposing it as an fs.FS via os.DirFS and delegating to ImportFromFS.
func ImportFromDir(ctx context.Context, db *sql.DB, statsDir string) error {
	root := os.DirFS(statsDir)
	return ImportFromFS(ctx, db, root)
}

// ImportFromFS reads non-empty .records files from the root of fsys into the database.
func ImportFromFS(ctx context.Context, db *sql.DB, fsys fs.FS) error {
	if err := ResetRecords(ctx, db); err != nil {
		return fmt.Errorf("reset records: %w", err)
	}
	files, err := recordsdir.ListNonEmptyFilesFS(fsys, ".")
	if err != nil {
		return fmt.Errorf("read dir: %w", err)
	}
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin transaction: %w", err)
	}
	defer tx.Rollback()
	insert, err := tx.PrepareContext(ctx, "INSERT INTO record (host, uptime_sec, boot_time, os, os_kernel_name, os_kernel_major) VALUES (?, ?, ?, ?, ?, ?)")
	if err != nil {
		return fmt.Errorf("prepare insert: %w", err)
	}
	defer insert.Close()
	for _, f := range files {
		if err := importFile(ctx, insert, fsys, f.Path, f.Host); err != nil {
			return err
		}
	}
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit transaction: %w", err)
	}
	return nil
}

// LoadRecords returns every row of the record table, ordered by host and
// then by boot_time.
//
// The integer columns are scanned as int64 and converted to the uint64
// fields of Record. If ctx is done while rows are being read, the partial
// result is discarded and ctx.Err() is returned. An empty table yields a nil
// slice and a nil error.
func LoadRecords(ctx context.Context, db *sql.DB) ([]Record, error) {
	const query = "SELECT host, uptime_sec, boot_time, os, os_kernel_name, os_kernel_major FROM record ORDER BY host, boot_time"

	rows, queryErr := db.QueryContext(ctx, query)
	if queryErr != nil {
		return nil, fmt.Errorf("query: %w", queryErr)
	}
	defer rows.Close()

	var records []Record
	for rows.Next() {
		// Stop between rows as soon as the context has been cancelled.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, ctxErr
		}

		var (
			host, osName, kernelName, kernelMajor string
			uptimeSec, bootTime                   int64
		)
		scanErr := rows.Scan(&host, &uptimeSec, &bootTime, &osName, &kernelName, &kernelMajor)
		if scanErr != nil {
			return nil, fmt.Errorf("scan row: %w", scanErr)
		}

		records = append(records, Record{
			Host:        host,
			Uptime:      uint64(uptimeSec),
			BootTime:    uint64(bootTime),
			OS:          osName,
			KernelName:  kernelName,
			KernelMajor: kernelMajor,
		})
	}

	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("rows: %w", iterErr)
	}
	return records, nil
}

// importFile reads relPath from fsys line by line and executes the prepared
// insert statement once for every line that recordline.Parse accepts, using
// host as the value for the host column. Lines that do not parse are skipped.
//
// It returns ctx.Err() if ctx is done between lines, and a wrapped error if
// the file cannot be opened, an insert fails, or the scanner reports an error.
func importFile(ctx context.Context, insert *sql.Stmt, fsys fs.FS, relPath, host string) error {
	src, openErr := fsys.Open(relPath)
	if openErr != nil {
		return fmt.Errorf("open %s: %w", relPath, openErr)
	}
	defer src.Close()

	lines := bufio.NewScanner(src)
	for lines.Scan() {
		// Stop between lines as soon as the context has been cancelled.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		parsed, valid := recordline.Parse(lines.Text())
		if valid {
			_, execErr := insert.ExecContext(ctx, host, parsed.Uptime, parsed.BootTime, parsed.OS, parsed.KernelName, parsed.KernelMajor)
			if execErr != nil {
				return fmt.Errorf("insert: %w", execErr)
			}
		}
	}

	if scanErr := lines.Err(); scanErr != nil {
		return fmt.Errorf("scan %s: %w", relPath, scanErr)
	}
	return nil
}