summaryrefslogtreecommitdiff
path: root/lib/hyperstack/watcher.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/hyperstack/watcher.rb')
-rw-r--r--lib/hyperstack/watcher.rb425
1 files changed, 425 insertions, 0 deletions
diff --git a/lib/hyperstack/watcher.rb b/lib/hyperstack/watcher.rb
new file mode 100644
index 0000000..de3d71e
--- /dev/null
+++ b/lib/hyperstack/watcher.rb
@@ -0,0 +1,425 @@
+# frozen_string_literal: true
+
+require 'json'
+require 'net/http'
+require 'open3'
+require 'socket'
+require 'timeout'
+
+module HyperstackVM
+ class VllmWatcher
+ REFRESH_INTERVAL = 5
+
+ # ANSI escape helpers
+ BOLD = "\033[1m"
+ DIM = "\033[2m"
+ GREEN = "\033[32m"
+ YELLOW = "\033[33m"
+ CYAN = "\033[36m"
+ RED = "\033[31m"
+ RESET = "\033[0m"
+ CLEAR = "\033[2J\033[H"
+
+ # Snapshot of one VM's stats at a point in time.
+ # service_type is :vllm or :comfyui — controls which metrics section is rendered.
+ VmSnapshot = Struct.new(
+ :label, :wg_host, :service_type,
+ :vllm_model, :container_name,
+ :metrics, :gpus,
+ :vllm_error, :gpu_error,
+ :fetched_at,
+ keyword_init: true
+ )
+
+ # Parsed per-GPU row from nvidia-smi.
+ GpuInfo = Struct.new(
+ :index, :name, :temp_c, :util_pct, :power_w,
+ :mem_used_mib, :mem_total_mib,
+ keyword_init: true
+ )
+
+ def initialize(config_loaders:)
+ @config_loaders = config_loaders
+ end
+
+ # Runs the watch loop until the user presses Ctrl-C.
+ def run
+ $stdout.print "\033[?25l" # hide cursor
+ loop do
+ snapshots = fetch_all_parallel
+ draw(snapshots)
+ sleep REFRESH_INTERVAL
+ end
+ rescue Interrupt
+ nil
+ ensure
+ $stdout.print "\033[?25h\n" # restore cursor
+ end
+
+ private
+
+ # Fetches stats for every VM concurrently and returns an array of VmSnapshot.
+ def fetch_all_parallel
+ threads = @config_loaders.map { |loader| Thread.new { fetch_vm(loader) } }
+ threads.map(&:value)
+ end
+
+ # Fetches GPU stats and service stats for a single VM via one SSH session.
+ # Routes to fetch_comfyui_vm or fetch_vllm_vm based on config.
+ def fetch_vm(loader)
+ config = loader.config
+ label = File.basename(loader.path, '.toml')
+ wg_host = config.wireguard_gateway_hostname
+ state = load_state(config.state_file)
+
+ unless state
+ svc = config.comfyui_install_enabled? ? :comfyui : :vllm
+ return VmSnapshot.new(label: label, wg_host: wg_host, service_type: svc,
+ vllm_model: nil, container_name: nil,
+ metrics: nil, gpus: nil,
+ vllm_error: 'no state file', gpu_error: nil,
+ fetched_at: Time.now)
+ end
+
+ if config.comfyui_install_enabled?
+ fetch_comfyui_vm(config, label, wg_host)
+ else
+ fetch_vllm_vm(config, label, wg_host, state)
+ end
+ rescue StandardError => e
+ VmSnapshot.new(label: label || '?', wg_host: wg_host || '?', service_type: :vllm,
+ vllm_model: nil, container_name: nil,
+ metrics: nil, gpus: nil,
+ vllm_error: e.message, gpu_error: nil,
+ fetched_at: Time.now)
+ end
+
+ # Fetches GPU + vLLM container stats for a vLLM VM.
+ def fetch_vllm_vm(config, label, wg_host, state)
+ vllm_model = state['vllm_model'] || config.vllm_model
+ container_name = state['vllm_container_name'] || config.vllm_container_name
+
+ gpus, metrics, ssh_error = fetch_vm_stats(config, wg_host, container_name)
+
+ VmSnapshot.new(label: label, wg_host: wg_host, service_type: :vllm,
+ vllm_model: vllm_model, container_name: container_name,
+ metrics: metrics, gpus: gpus,
+ vllm_error: ssh_error, gpu_error: ssh_error,
+ fetched_at: Time.now)
+ end
+
+ # Fetches GPU + ComfyUI queue stats for a ComfyUI VM.
+ # Returns queue running/pending counts and total outputs produced so far.
+ def fetch_comfyui_vm(config, label, wg_host)
+ gpus, metrics, ssh_error = fetch_comfyui_stats(config, wg_host, config.comfyui_port)
+
+ VmSnapshot.new(label: label, wg_host: wg_host, service_type: :comfyui,
+ vllm_model: nil, container_name: nil,
+ metrics: metrics, gpus: gpus,
+ vllm_error: ssh_error, gpu_error: ssh_error,
+ fetched_at: Time.now)
+ end
+
+ def load_state(path)
+ JSON.parse(File.read(path))
+ rescue Errno::ENOENT, JSON::ParserError
+ nil
+ end
+
+ # Single SSH call: nvidia-smi + ComfyUI queue + output file count.
+ # Sections separated by sentinel lines so we can split the output cleanly.
+ # Returns [gpus, metrics_hash, error_or_nil].
+ def fetch_comfyui_stats(config, wg_host, port)
+ gpu_query = 'index,name,temperature.gpu,utilization.gpu,power.draw,memory.used,memory.total'
+ script = <<~BASH
+ nvidia-smi --query-gpu=#{gpu_query} --format=csv,noheader,nounits
+ echo ===COMFYUI===
+ curl -s http://localhost:#{port}/queue 2>/dev/null
+ echo
+ echo ===HISTORY===
+ curl -s http://localhost:#{port}/history 2>/dev/null | python3 -c \
+ "import json,sys; h=json.load(sys.stdin); print(len(h))" 2>/dev/null || echo 0
+ BASH
+
+ ssh = build_ssh_command(config, wg_host)
+ stdout, stderr, status = Timeout.timeout(15) { Open3.capture3(*ssh, stdin_data: script) }
+ return [nil, nil, "exit #{status.exitstatus}: #{stderr.strip}"] unless status.success?
+
+ gpu_section, rest = stdout.split("===COMFYUI===\n", 2)
+ queue_section, hist_section = rest.to_s.split("===HISTORY===\n", 2)
+ gpus = parse_nvidia_smi(gpu_section.to_s)
+ metrics = parse_comfyui_queue(queue_section.to_s.strip, hist_section.to_s.strip)
+ [gpus, metrics, nil]
+ end
+
+ # Parse ComfyUI /queue JSON into a plain Hash.
+ def parse_comfyui_queue(queue_json, history_count_str)
+ q = begin
+ JSON.parse(queue_json)
+ rescue StandardError
+ {}
+ end
+ {
+ 'queue_running' => Array(q['queue_running']).size,
+ 'queue_pending' => Array(q['queue_pending']).size,
+ 'history_count' => history_count_str.to_i
+ }
+ end
+
+ # Single SSH call that runs nvidia-smi and tails the vLLM container logs.
+ # The two sections are separated by a sentinel line so we can split them.
+ # Returns [gpus, metrics, error_or_nil].
+ def fetch_vm_stats(config, wg_host, container_name)
+ gpu_query = 'index,name,temperature.gpu,utilization.gpu,power.draw,memory.used,memory.total'
+ # --tail 200 instead of --since N so we always get the last stats line
+ # even when the VM has been idle for longer than the refresh interval.
+ script = <<~BASH
+ nvidia-smi --query-gpu=#{gpu_query} --format=csv,noheader,nounits
+ echo ===VLLM===
+ docker logs --tail 200 #{container_name} 2>&1 | grep 'Engine 0' | tail -1
+ BASH
+
+ ssh = build_ssh_command(config, wg_host)
+ stdout, stderr, status = Timeout.timeout(15) { Open3.capture3(*ssh, stdin_data: script) }
+ return [nil, nil, "exit #{status.exitstatus}: #{stderr.strip}"] unless status.success?
+
+ gpu_section, vllm_section = stdout.split("===VLLM===\n", 2)
+ gpus = parse_nvidia_smi(gpu_section.to_s)
+ metrics = parse_engine_log_line(vllm_section.to_s.strip)
+ [gpus, metrics, nil]
+ end
+
+ # Parse a vLLM "Engine 0" log line into a plain Hash.
+ # Actual log format (loggers.py):
+ # (APIServer pid=1) INFO ... [loggers.py:259] Engine 000:
+ # Avg prompt throughput: 6154.6 tokens/s,
+ # Avg generation throughput: 27.4 tokens/s,
+ # Running: 1 reqs, Waiting: 0 reqs,
+ # GPU KV cache usage: 0.7%, Prefix cache hit rate: 0.0%
+ # Returns an empty hash when no matching line was found (container still loading).
+ def parse_engine_log_line(line)
+ return {} if line.empty?
+
+ {
+ 'avg_prompt_throughput' => extract_float(line, /Avg prompt throughput:\s*([\d.]+)/),
+ 'avg_generation_throughput' => extract_float(line, /Avg generation throughput:\s*([\d.]+)/),
+ 'running' => extract_float(line, /Running:\s*(\d+)\s*reqs/),
+ 'pending' => extract_float(line, /Waiting:\s*(\d+)\s*reqs/),
+ 'swapped' => extract_float(line, /Swapped:\s*(\d+)\s*reqs/),
+ 'gpu_cache_usage_pct' => extract_float(line, /GPU KV cache usage:\s*([\d.]+)%/),
+ 'gpu_prefix_cache_hit_rate_pct' => extract_float(line, /Prefix cache hit rate:\s*([\d.]+)%/)
+ }.compact
+ end
+
+ def extract_float(text, pattern)
+ m = text.match(pattern)
+ m ? m[1].to_f : nil
+ end
+
+ # Build an SSH command array for the watcher.
+ # Uses accept-new rather than yes because the known-hosts file was populated
+ # with the VM's public IP during provisioning; the WireGuard hostname
+ # (hyperstack1.wg1 etc.) won't be in it yet. accept-new auto-trusts the first
+ # connection and caches the key — safe here because we're connecting over the
+ # already-authenticated WireGuard tunnel.
+ def build_ssh_command(config, host)
+ cmd = [
+ 'ssh',
+ '-o', 'BatchMode=yes',
+ '-o', 'StrictHostKeyChecking=accept-new',
+ '-o', "UserKnownHostsFile=#{config.ssh_known_hosts_path}",
+ '-o', "ConnectTimeout=#{config.ssh_connect_timeout}",
+ '-p', config.ssh_port.to_s
+ ]
+ key = config.ssh_private_key_path
+ cmd.concat(['-i', key]) if File.exist?(key)
+ cmd << "#{config.ssh_username}@#{host}"
+ cmd << 'bash -se'
+ cmd
+ end
+
+ def parse_nvidia_smi(text)
+ text.lines.filter_map do |line|
+ parts = line.strip.split(',').map(&:strip)
+ next if parts.length < 7
+
+ GpuInfo.new(
+ index: parts[0].to_i,
+ name: parts[1],
+ temp_c: parts[2].to_f,
+ util_pct: parts[3].to_f,
+ power_w: parts[4].to_f,
+ mem_used_mib: parts[5].to_f,
+ mem_total_mib: parts[6].to_f
+ )
+ end
+ end
+
+ # ── Rendering ────────────────────────────────────────────────────────────
+
+ # Clears the screen and redraws the full dashboard for all VMs.
+ def draw(snapshots)
+ time_str = Time.now.strftime('%H:%M:%S')
+ header = "#{BOLD}#{CYAN}VM watch#{RESET} " \
+ "#{DIM}#{time_str} Ctrl-C to stop " \
+ "refreshing every #{REFRESH_INTERVAL}s#{RESET}"
+
+ panels = snapshots.map { |snap| render_vm(snap) }
+
+ if panels.size >= 2
+ # Lay out VM panels side-by-side, padding each to its own visible width
+ # so the separator column stays aligned regardless of content length.
+ panel_widths = panels.map { |p| p.map { |l| strip_ansi(l).length }.max.to_i }
+ max_rows = panels.map(&:size).max
+ panels.each { |p| p.fill('', p.size...max_rows) }
+ sep = " #{DIM}│#{RESET} "
+
+ panel_lines = (0...max_rows).map do |i|
+ panels.each_with_index.map do |panel, j|
+ cell = panel[i] || ''
+ # Pad every column except the last so the separator stays in column.
+ next cell if j == panels.size - 1
+
+ visible_len = strip_ansi(cell).length
+ cell + ' ' * [panel_widths[j] - visible_len, 0].max
+ end.join(sep)
+ end
+
+ rule_w = [strip_ansi(panel_lines.first || '').length, 72].max
+ rule = DIM + ('─' * rule_w) + RESET
+ lines = [header, rule, *panel_lines, '']
+ else
+ # Single VM: simple vertical layout.
+ rule = DIM + ('─' * 72) + RESET
+ lines = [header, rule]
+ panels.each do |p|
+ lines << ''
+ lines.concat(p)
+ end
+ lines << ''
+ end
+
+ $stdout.write(CLEAR + lines.join("\n"))
+ $stdout.flush
+ end
+
+ # Width of the label column used in every metric row, keeping bars aligned.
+ LABEL_W = 10
+
+ # Renders a single VM panel as an array of strings (one per display line).
+ # Routes the service-specific metrics section based on service_type.
+ def render_vm(snap)
+ lines = []
+
+ svc_label = snap.service_type == :comfyui ? "#{DIM}ComfyUI#{RESET}" : ''
+ model_label = snap.vllm_model ? DIM + snap.vllm_model.split('/').last + RESET : svc_label
+ lines << "#{BOLD}#{snap.label}#{RESET} #{DIM}#{snap.wg_host}#{RESET} #{model_label}"
+
+ # Both GPU and service stats come from the same SSH call; show one error if it failed.
+ if snap.gpu_error
+ lines << " #{RED}#{snap.gpu_error}#{RESET}"
+ else
+ snap.gpus&.each do |gpu|
+ mem_pct = gpu.mem_total_mib > 0 ? (gpu.mem_used_mib / gpu.mem_total_mib * 100.0) : 0.0
+ lines << format(' GPU%-2d %-26s %3.0f°C %5.0fW',
+ gpu.index, gpu.name, gpu.temp_c, gpu.power_w)
+ lines << bar_row('util', gpu.util_pct)
+ lines << bar_row('VRAM', mem_pct)
+ end
+ if snap.service_type == :comfyui
+ lines.concat(render_comfyui_metrics(snap.metrics))
+ elsif snap.metrics&.any?
+ lines.concat(render_vllm_metrics(snap.metrics))
+ elsif snap.metrics && snap.metrics.empty?
+ lines << " #{DIM}(no Engine log line yet — container may still be loading)#{RESET}"
+ end
+ end
+
+ lines
+ end
+
+ # Formats ComfyUI queue stats into display lines.
+ def render_comfyui_metrics(m)
+ return [" #{DIM}(no ComfyUI stats)#{RESET}"] unless m&.any?
+
+ running = m['queue_running'].to_i
+ pending = m['queue_pending'].to_i
+ history = m['history_count'].to_i
+
+ q_str = running > 0 ? "#{GREEN}#{running} running#{RESET}" : "#{DIM}idle#{RESET}"
+ q_str += " #{pending} queued" if pending > 0
+ [
+ row('queue', q_str),
+ row('completed', "#{history} jobs total")
+ ]
+ end
+
+ # Formats the vLLM engine log stats into display lines.
+ # All values come directly from the "Engine 0" log line that vLLM emits
+ # every few seconds, so tok/s figures are the rolling averages vLLM computes
+ # internally — no client-side rate derivation needed.
+ def render_vllm_metrics(m)
+ lines = []
+
+ # Throughput: rolling averages already computed by vLLM
+ prefill_tps = m['avg_prompt_throughput']
+ decode_tps = m['avg_generation_throughput']
+ tput_parts = []
+ tput_parts << "prefill #{format('%.1f', prefill_tps)} tok/s" if prefill_tps
+ tput_parts << "decode #{format('%.1f', decode_tps)} tok/s" if decode_tps
+ lines << row('throughput', tput_parts.empty? ? 'n/a' : tput_parts.join(' '))
+
+ # Request queue depth
+ running = m['running']
+ swapped = m['swapped']
+ pending = m['pending']
+ q_parts = []
+ q_parts << "#{running.to_i} running" if running
+ q_parts << "#{pending.to_i} waiting" if pending
+ q_parts << "#{swapped.to_i} swapped" if swapped && swapped > 0
+ lines << row('requests', q_parts.empty? ? 'n/a' : q_parts.join(' '))
+
+ # KV-cache fill and prefix-cache hit rate, each with an aligned bar
+ gpu_cache = m['gpu_cache_usage_pct']
+ hit_rate_gpu = m['gpu_prefix_cache_hit_rate_pct']
+ lines << bar_row('KV cache', gpu_cache) if gpu_cache
+ lines << bar_row('cache hits', hit_rate_gpu) if hit_rate_gpu
+
+ lines
+ end
+
+ # Formats one metric row: fixed-width label then value, giving all rows the same indent.
+ def row(label, value)
+ " #{label.ljust(LABEL_W)} #{value}"
+ end
+
+ # Formats one bar row: fixed-width label, proportional bar, percentage number.
+ # All bar rows share the same column for '[', aligning bars across GPU and vLLM sections.
+ def bar_row(label, pct)
+ row(label, "#{pct_bar(pct, 10)} #{format('%5.1f', pct)}%")
+ end
+
+ # Renders a proportional bar for any percentage (0–100).
+ # Colour: green below 50%, yellow 50–79%, red 80%+.
+ def pct_bar(pct, width)
+ filled = [(pct / 100.0 * width).round, width].min
+ color = if pct >= 80
+ RED
+ else
+ pct >= 50 ? YELLOW : GREEN
+ end
+ "[#{color}#{'█' * filled}#{RESET}#{' ' * (width - filled)}]"
+ end
+
+ # Strips ANSI escape sequences to measure the visible length of a string.
+ def strip_ansi(str)
+ str.gsub(/\033\[[0-9;]*m/, '')
+ end
+
+ # Formats an integer with thousands separators, e.g. 1234567 → "1,234,567".
+ def fmt_num(n)
+ n.to_i.to_s.reverse.scan(/\d{1,3}/).join(',').reverse
+ end
+ end
+
+end