diff options
Diffstat (limited to 'lib/hyperstack/watcher.rb')
| -rw-r--r-- | lib/hyperstack/watcher.rb | 425 |
1 files changed, 425 insertions, 0 deletions
diff --git a/lib/hyperstack/watcher.rb b/lib/hyperstack/watcher.rb new file mode 100644 index 0000000..de3d71e --- /dev/null +++ b/lib/hyperstack/watcher.rb @@ -0,0 +1,425 @@ +# frozen_string_literal: true + +require 'json' +require 'net/http' +require 'open3' +require 'socket' +require 'timeout' + +module HyperstackVM + class VllmWatcher + REFRESH_INTERVAL = 5 + + # ANSI escape helpers + BOLD = "\033[1m" + DIM = "\033[2m" + GREEN = "\033[32m" + YELLOW = "\033[33m" + CYAN = "\033[36m" + RED = "\033[31m" + RESET = "\033[0m" + CLEAR = "\033[2J\033[H" + + # Snapshot of one VM's stats at a point in time. + # service_type is :vllm or :comfyui — controls which metrics section is rendered. + VmSnapshot = Struct.new( + :label, :wg_host, :service_type, + :vllm_model, :container_name, + :metrics, :gpus, + :vllm_error, :gpu_error, + :fetched_at, + keyword_init: true + ) + + # Parsed per-GPU row from nvidia-smi. + GpuInfo = Struct.new( + :index, :name, :temp_c, :util_pct, :power_w, + :mem_used_mib, :mem_total_mib, + keyword_init: true + ) + + def initialize(config_loaders:) + @config_loaders = config_loaders + end + + # Runs the watch loop until the user presses Ctrl-C. + def run + $stdout.print "\033[?25l" # hide cursor + loop do + snapshots = fetch_all_parallel + draw(snapshots) + sleep REFRESH_INTERVAL + end + rescue Interrupt + nil + ensure + $stdout.print "\033[?25h\n" # restore cursor + end + + private + + # Fetches stats for every VM concurrently and returns an array of VmSnapshot. + def fetch_all_parallel + threads = @config_loaders.map { |loader| Thread.new { fetch_vm(loader) } } + threads.map(&:value) + end + + # Fetches GPU stats and service stats for a single VM via one SSH session. + # Routes to fetch_comfyui_vm or fetch_vllm_vm based on config. + def fetch_vm(loader) + config = loader.config + label = File.basename(loader.path, '.toml') + wg_host = config.wireguard_gateway_hostname + state = load_state(config.state_file) + + unless state + svc = config.comfyui_install_enabled? ? :comfyui : :vllm + return VmSnapshot.new(label: label, wg_host: wg_host, service_type: svc, + vllm_model: nil, container_name: nil, + metrics: nil, gpus: nil, + vllm_error: 'no state file', gpu_error: nil, + fetched_at: Time.now) + end + + if config.comfyui_install_enabled? + fetch_comfyui_vm(config, label, wg_host) + else + fetch_vllm_vm(config, label, wg_host, state) + end + rescue StandardError => e + VmSnapshot.new(label: label || '?', wg_host: wg_host || '?', service_type: :vllm, + vllm_model: nil, container_name: nil, + metrics: nil, gpus: nil, + vllm_error: e.message, gpu_error: nil, + fetched_at: Time.now) + end + + # Fetches GPU + vLLM container stats for a vLLM VM. + def fetch_vllm_vm(config, label, wg_host, state) + vllm_model = state['vllm_model'] || config.vllm_model + container_name = state['vllm_container_name'] || config.vllm_container_name + + gpus, metrics, ssh_error = fetch_vm_stats(config, wg_host, container_name) + + VmSnapshot.new(label: label, wg_host: wg_host, service_type: :vllm, + vllm_model: vllm_model, container_name: container_name, + metrics: metrics, gpus: gpus, + vllm_error: ssh_error, gpu_error: ssh_error, + fetched_at: Time.now) + end + + # Fetches GPU + ComfyUI queue stats for a ComfyUI VM. + # Returns queue running/pending counts and total outputs produced so far. + def fetch_comfyui_vm(config, label, wg_host) + gpus, metrics, ssh_error = fetch_comfyui_stats(config, wg_host, config.comfyui_port) + + VmSnapshot.new(label: label, wg_host: wg_host, service_type: :comfyui, + vllm_model: nil, container_name: nil, + metrics: metrics, gpus: gpus, + vllm_error: ssh_error, gpu_error: ssh_error, + fetched_at: Time.now) + end + + def load_state(path) + JSON.parse(File.read(path)) + rescue Errno::ENOENT, JSON::ParserError + nil + end + + # Single SSH call: nvidia-smi + ComfyUI queue + output file count. + # Sections separated by sentinel lines so we can split the output cleanly. + # Returns [gpus, metrics_hash, error_or_nil]. + def fetch_comfyui_stats(config, wg_host, port) + gpu_query = 'index,name,temperature.gpu,utilization.gpu,power.draw,memory.used,memory.total' + script = <<~BASH + nvidia-smi --query-gpu=#{gpu_query} --format=csv,noheader,nounits + echo ===COMFYUI=== + curl -s http://localhost:#{port}/queue 2>/dev/null + echo + echo ===HISTORY=== + curl -s http://localhost:#{port}/history 2>/dev/null | python3 -c \ + "import json,sys; h=json.load(sys.stdin); print(len(h))" 2>/dev/null || echo 0 + BASH + + ssh = build_ssh_command(config, wg_host) + stdout, stderr, status = Timeout.timeout(15) { Open3.capture3(*ssh, stdin_data: script) } + return [nil, nil, "exit #{status.exitstatus}: #{stderr.strip}"] unless status.success? + + gpu_section, rest = stdout.split("===COMFYUI===\n", 2) + queue_section, hist_section = rest.to_s.split("===HISTORY===\n", 2) + gpus = parse_nvidia_smi(gpu_section.to_s) + metrics = parse_comfyui_queue(queue_section.to_s.strip, hist_section.to_s.strip) + [gpus, metrics, nil] + end + + # Parse ComfyUI /queue JSON into a plain Hash. + def parse_comfyui_queue(queue_json, history_count_str) + q = begin + JSON.parse(queue_json) + rescue StandardError + {} + end + { + 'queue_running' => Array(q['queue_running']).size, + 'queue_pending' => Array(q['queue_pending']).size, + 'history_count' => history_count_str.to_i + } + end + + # Single SSH call that runs nvidia-smi and tails the vLLM container logs. + # The two sections are separated by a sentinel line so we can split them. + # Returns [gpus, metrics, error_or_nil]. + def fetch_vm_stats(config, wg_host, container_name) + gpu_query = 'index,name,temperature.gpu,utilization.gpu,power.draw,memory.used,memory.total' + # --tail 200 instead of --since N so we always get the last stats line + # even when the VM has been idle for longer than the refresh interval. + script = <<~BASH + nvidia-smi --query-gpu=#{gpu_query} --format=csv,noheader,nounits + echo ===VLLM=== + docker logs --tail 200 #{container_name} 2>&1 | grep 'Engine 0' | tail -1 + BASH + + ssh = build_ssh_command(config, wg_host) + stdout, stderr, status = Timeout.timeout(15) { Open3.capture3(*ssh, stdin_data: script) } + return [nil, nil, "exit #{status.exitstatus}: #{stderr.strip}"] unless status.success? + + gpu_section, vllm_section = stdout.split("===VLLM===\n", 2) + gpus = parse_nvidia_smi(gpu_section.to_s) + metrics = parse_engine_log_line(vllm_section.to_s.strip) + [gpus, metrics, nil] + end + + # Parse a vLLM "Engine 0" log line into a plain Hash. + # Actual log format (loggers.py): + # (APIServer pid=1) INFO ... [loggers.py:259] Engine 000: + # Avg prompt throughput: 6154.6 tokens/s, + # Avg generation throughput: 27.4 tokens/s, + # Running: 1 reqs, Waiting: 0 reqs, + # GPU KV cache usage: 0.7%, Prefix cache hit rate: 0.0% + # Returns an empty hash when no matching line was found (container still loading). + def parse_engine_log_line(line) + return {} if line.empty? + + { + 'avg_prompt_throughput' => extract_float(line, /Avg prompt throughput:\s*([\d.]+)/), + 'avg_generation_throughput' => extract_float(line, /Avg generation throughput:\s*([\d.]+)/), + 'running' => extract_float(line, /Running:\s*(\d+)\s*reqs/), + 'pending' => extract_float(line, /Waiting:\s*(\d+)\s*reqs/), + 'swapped' => extract_float(line, /Swapped:\s*(\d+)\s*reqs/), + 'gpu_cache_usage_pct' => extract_float(line, /GPU KV cache usage:\s*([\d.]+)%/), + 'gpu_prefix_cache_hit_rate_pct' => extract_float(line, /Prefix cache hit rate:\s*([\d.]+)%/) + }.compact + end + + def extract_float(text, pattern) + m = text.match(pattern) + m ? m[1].to_f : nil + end + + # Build an SSH command array for the watcher. + # Uses accept-new rather than yes because the known-hosts file was populated + # with the VM's public IP during provisioning; the WireGuard hostname + # (hyperstack1.wg1 etc.) won't be in it yet. accept-new auto-trusts the first + # connection and caches the key — safe here because we're connecting over the + # already-authenticated WireGuard tunnel. + def build_ssh_command(config, host) + cmd = [ + 'ssh', + '-o', 'BatchMode=yes', + '-o', 'StrictHostKeyChecking=accept-new', + '-o', "UserKnownHostsFile=#{config.ssh_known_hosts_path}", + '-o', "ConnectTimeout=#{config.ssh_connect_timeout}", + '-p', config.ssh_port.to_s + ] + key = config.ssh_private_key_path + cmd.concat(['-i', key]) if File.exist?(key) + cmd << "#{config.ssh_username}@#{host}" + cmd << 'bash -se' + cmd + end + + def parse_nvidia_smi(text) + text.lines.filter_map do |line| + parts = line.strip.split(',').map(&:strip) + next if parts.length < 7 + + GpuInfo.new( + index: parts[0].to_i, + name: parts[1], + temp_c: parts[2].to_f, + util_pct: parts[3].to_f, + power_w: parts[4].to_f, + mem_used_mib: parts[5].to_f, + mem_total_mib: parts[6].to_f + ) + end + end + + # ── Rendering ──────────────────────────────────────────────────────────── + + # Clears the screen and redraws the full dashboard for all VMs. + def draw(snapshots) + time_str = Time.now.strftime('%H:%M:%S') + header = "#{BOLD}#{CYAN}VM watch#{RESET} " \ + "#{DIM}#{time_str} Ctrl-C to stop " \ + "refreshing every #{REFRESH_INTERVAL}s#{RESET}" + + panels = snapshots.map { |snap| render_vm(snap) } + + if panels.size >= 2 + # Lay out VM panels side-by-side, padding each to its own visible width + # so the separator column stays aligned regardless of content length. + panel_widths = panels.map { |p| p.map { |l| strip_ansi(l).length }.max.to_i } + max_rows = panels.map(&:size).max + panels.each { |p| p.fill('', p.size...max_rows) } + sep = " #{DIM}│#{RESET} " + + panel_lines = (0...max_rows).map do |i| + panels.each_with_index.map do |panel, j| + cell = panel[i] || '' + # Pad every column except the last so the separator stays in column. + next cell if j == panels.size - 1 + + visible_len = strip_ansi(cell).length + cell + ' ' * [panel_widths[j] - visible_len, 0].max + end.join(sep) + end + + rule_w = [strip_ansi(panel_lines.first || '').length, 72].max + rule = DIM + ('─' * rule_w) + RESET + lines = [header, rule, *panel_lines, ''] + else + # Single VM: simple vertical layout. + rule = DIM + ('─' * 72) + RESET + lines = [header, rule] + panels.each do |p| + lines << '' + lines.concat(p) + end + lines << '' + end + + $stdout.write(CLEAR + lines.join("\n")) + $stdout.flush + end + + # Width of the label column used in every metric row, keeping bars aligned. + LABEL_W = 10 + + # Renders a single VM panel as an array of strings (one per display line). + # Routes the service-specific metrics section based on service_type. + def render_vm(snap) + lines = [] + + svc_label = snap.service_type == :comfyui ? "#{DIM}ComfyUI#{RESET}" : '' + model_label = snap.vllm_model ? DIM + snap.vllm_model.split('/').last + RESET : svc_label + lines << "#{BOLD}#{snap.label}#{RESET} #{DIM}#{snap.wg_host}#{RESET} #{model_label}" + + # Both GPU and service stats come from the same SSH call; show one error if it failed. + if snap.gpu_error + lines << " #{RED}#{snap.gpu_error}#{RESET}" + else + snap.gpus&.each do |gpu| + mem_pct = gpu.mem_total_mib > 0 ? (gpu.mem_used_mib / gpu.mem_total_mib * 100.0) : 0.0 + lines << format(' GPU%-2d %-26s %3.0f°C %5.0fW', + gpu.index, gpu.name, gpu.temp_c, gpu.power_w) + lines << bar_row('util', gpu.util_pct) + lines << bar_row('VRAM', mem_pct) + end + if snap.service_type == :comfyui + lines.concat(render_comfyui_metrics(snap.metrics)) + elsif snap.metrics&.any? + lines.concat(render_vllm_metrics(snap.metrics)) + elsif snap.metrics && snap.metrics.empty? + lines << " #{DIM}(no Engine log line yet — container may still be loading)#{RESET}" + end + end + + lines + end + + # Formats ComfyUI queue stats into display lines. + def render_comfyui_metrics(m) + return [" #{DIM}(no ComfyUI stats)#{RESET}"] unless m&.any? + + running = m['queue_running'].to_i + pending = m['queue_pending'].to_i + history = m['history_count'].to_i + + q_str = running > 0 ? "#{GREEN}#{running} running#{RESET}" : "#{DIM}idle#{RESET}" + q_str += " #{pending} queued" if pending > 0 + [ + row('queue', q_str), + row('completed', "#{history} jobs total") + ] + end + + # Formats the vLLM engine log stats into display lines. + # All values come directly from the "Engine 0" log line that vLLM emits + # every few seconds, so tok/s figures are the rolling averages vLLM computes + # internally — no client-side rate derivation needed. + def render_vllm_metrics(m) + lines = [] + + # Throughput: rolling averages already computed by vLLM + prefill_tps = m['avg_prompt_throughput'] + decode_tps = m['avg_generation_throughput'] + tput_parts = [] + tput_parts << "prefill #{format('%.1f', prefill_tps)} tok/s" if prefill_tps + tput_parts << "decode #{format('%.1f', decode_tps)} tok/s" if decode_tps + lines << row('throughput', tput_parts.empty? ? 'n/a' : tput_parts.join(' ')) + + # Request queue depth + running = m['running'] + swapped = m['swapped'] + pending = m['pending'] + q_parts = [] + q_parts << "#{running.to_i} running" if running + q_parts << "#{pending.to_i} waiting" if pending + q_parts << "#{swapped.to_i} swapped" if swapped && swapped > 0 + lines << row('requests', q_parts.empty? ? 'n/a' : q_parts.join(' ')) + + # KV-cache fill and prefix-cache hit rate, each with an aligned bar + gpu_cache = m['gpu_cache_usage_pct'] + hit_rate_gpu = m['gpu_prefix_cache_hit_rate_pct'] + lines << bar_row('KV cache', gpu_cache) if gpu_cache + lines << bar_row('cache hits', hit_rate_gpu) if hit_rate_gpu + + lines + end + + # Formats one metric row: fixed-width label then value, giving all rows the same indent. + def row(label, value) + " #{label.ljust(LABEL_W)} #{value}" + end + + # Formats one bar row: fixed-width label, proportional bar, percentage number. + # All bar rows share the same column for '[', aligning bars across GPU and vLLM sections. + def bar_row(label, pct) + row(label, "#{pct_bar(pct, 10)} #{format('%5.1f', pct)}%") + end + + # Renders a proportional bar for any percentage (0–100). + # Colour: green below 50%, yellow 50–79%, red 80%+. + def pct_bar(pct, width) + filled = [(pct / 100.0 * width).round, width].min + color = if pct >= 80 + RED + else + pct >= 50 ? YELLOW : GREEN + end + "[#{color}#{'█' * filled}#{RESET}#{' ' * (width - filled)}]" + end + + # Strips ANSI escape sequences to measure the visible length of a string. + def strip_ansi(str) + str.gsub(/\033\[[0-9;]*m/, '') + end + + # Formats an integer with thousands separators, e.g. 1234567 → "1,234,567". + def fmt_num(n) + n.to_i.to_s.reverse.scan(/\d{1,3}/).join(',').reverse + end + end + +end |
