summaryrefslogtreecommitdiff
path: root/internal/collector/collector.go
blob: 5c99347f0eed0fc8f7e9bd61da0099ed31756171 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package collector

import (
	"bufio"
	"bytes"
	"context"
	"fmt"
	"os/exec"
	"strings"
	"time"

	"codeberg.org/snonux/loadbars/internal/config"
)

// StatsStore is the interface for receiving parsed stats (implemented by app).
type StatsStore interface {
	SetLoadAvg(host, load1, load5, load15 string)
	SetCPU(host, name string, line CPULine)
	SetMem(host, key string, value int64)
	SetNet(host, iface string, net NetLine, stamp float64)
	SetDisk(host, device string, disk DiskLine, stamp float64)
}

// Run starts a collector for one host: runs the embedded remote script (local or over SSH)
// and parses the output stream into store. Host may be "host" or "host:user".
// It runs until ctx is cancelled or the command exits.
func Run(ctx context.Context, host string, cfg *config.Config, store StatsStore) error {
	hostKey, user := splitHostUser(host)

	// Select script: only Linux is supported for local monitoring.
	scriptBytes := LinuxScript
	if isLocal(hostKey) {
		scriptBytes = getLocalScript()
		if scriptBytes == nil {
			return fmt.Errorf("%s: local stats gathering requires Linux with /proc filesystem", hostKey)
		}
	}

	var (
		scanner *bufio.Scanner
		cmd     *exec.Cmd
		err     error
	)
	if isLocal(hostKey) {
		scanner, cmd, err = startLocalScanner(ctx, scriptBytes)
	} else {
		scanner, cmd, err = startRemoteScanner(ctx, hostKey, user, scriptBytes, cfg)
	}
	if err != nil {
		return fmt.Errorf("%s: %w", hostKey, err)
	}
	defer cmd.Wait()

	return parseCollectorStream(ctx, scanner, hostKey, store)
}

// startLocalScanner spawns "bash -s" with scriptBytes on stdin and returns a scanner
// over its stdout along with the Cmd for deferred Wait.
func startLocalScanner(ctx context.Context, scriptBytes []byte) (*bufio.Scanner, *exec.Cmd, error) {
	cmd := exec.CommandContext(ctx, "bash", "-s")
	cmd.Stdin = bytes.NewReader(scriptBytes)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return nil, nil, err
	}
	if err := cmd.Start(); err != nil {
		return nil, nil, err
	}
	return bufio.NewScanner(stdout), cmd, nil
}

// startRemoteScanner spawns "ssh host bash -s" with scriptBytes on stdin and returns
// a scanner over its stdout along with the Cmd for deferred Wait.
func startRemoteScanner(ctx context.Context, hostKey, user string, scriptBytes []byte, cfg *config.Config) (*bufio.Scanner, *exec.Cmd, error) {
	args := []string{"-o", "StrictHostKeyChecking=no"}
	if cfg.SSHOpts != "" {
		args = append(args, strings.Fields(cfg.SSHOpts)...)
	}
	if user != "" {
		args = append(args, "-l", user)
	}
	args = append(args, hostKey, "bash -s")
	cmd := exec.CommandContext(ctx, "ssh", args...)
	cmd.Stdin = bytes.NewReader(scriptBytes)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return nil, nil, err
	}
	if err := cmd.Start(); err != nil {
		return nil, nil, err
	}
	return bufio.NewScanner(stdout), cmd, nil
}

// parseCollectorStream reads lines from scanner and dispatches parsed stats to store.
// Always collects all CPU lines (cpu, cpu0, cpu1, ...) so display can toggle per-core
// view with key 1 without a reconnect.
func parseCollectorStream(ctx context.Context, scanner *bufio.Scanner, hostKey string, store StatsStore) error {
	mode := ""
	for scanner.Scan() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		line := strings.TrimSpace(scanner.Text())
		if strings.HasPrefix(line, "M ") {
			mode = line
			continue
		}
		dispatchCollectorLine(mode, line, hostKey, store)
	}
	if err := scanner.Err(); err != nil {
		return fmt.Errorf("%s: read: %w", hostKey, err)
	}
	return nil
}

// dispatchCollectorLine routes one parsed line to the appropriate store setter
// based on the current protocol mode marker.
func dispatchCollectorLine(mode, line, hostKey string, store StatsStore) {
	switch mode {
	case ModeLoadAvg:
		l := ParseLoadAvg(line)
		store.SetLoadAvg(hostKey, l.Load1, l.Load5, l.Load15)
	case ModeMemStats:
		if mem, ok := ParseMemLine(line); ok {
			store.SetMem(hostKey, mem.Key, mem.Value)
		}
	case ModeNetStats:
		if idx := strings.Index(line, ":"); idx >= 0 {
			iface := strings.TrimSpace(line[:idx])
			net, err := ParseNetLine(iface + ":" + line[idx+1:])
			if err == nil {
				store.SetNet(hostKey, net.Iface, net, float64(time.Now().UnixNano())/1e9)
			}
		}
	case ModeDiskStats:
		if d, err := ParseDiskLine(line); err == nil {
			store.SetDisk(hostKey, d.Device, d, float64(time.Now().UnixNano())/1e9)
		}
	case ModeCPUStats:
		if strings.HasPrefix(line, "cpu") {
			if cu, err := ParseCPULine(line); err == nil {
				store.SetCPU(hostKey, cu.Name, cu)
			}
		}
	}
}

// splitHostUser splits "host:user" into (host, user). If no colon, returns (host, "").
func splitHostUser(host string) (h, u string) {
	idx := strings.Index(host, ":")
	if idx < 0 {
		return strings.TrimSpace(host), ""
	}
	return strings.TrimSpace(host[:idx]), strings.TrimSpace(host[idx+1:])
}

func isLocal(h string) bool {
	return h == "localhost" || h == "127.0.0.1"
}

// getLocalScript returns the appropriate script for the local OS
func getLocalScript() []byte {
	// Check if /proc exists (Linux/Unix)
	if _, err := exec.Command("test", "-d", "/proc").CombinedOutput(); err == nil {
		return LinuxScript
	}
	// /proc not found - unsupported OS for local stats gathering
	return nil
}