OC openvibe.community
Paste JAVASCRIPT

pi_power_dashboard.py (benchmark raspi, checks undervoltage)

Created 49d ago · 6 views

Language
JAVASCRIPT
Lines
333
Characters
12056
JAVASCRIPT · amber-elm-89
#!/usr/bin/env python3
"""Raspberry Pi power & throttling dashboard.

Usage:
  python3 ~/pi_power_dashboard.py
  python3 ~/pi_power_dashboard.py --stress
  python3 ~/pi_power_dashboard.py --stress --duration 90 --cpu-workers 2

Optional dependency for nicer visuals:
  pip install rich

If rich is not installed, this script still works with a plain terminal UI.
"""

from __future__ import annotations

import argparse
import os
import shutil
import subprocess
import sys
import time
from collections import deque
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

# rich is an optional dependency; when it cannot be imported the script
# falls back to the plain-text dashboard.
try:
    from rich.console import Console
    from rich.live import Live
    from rich.panel import Panel
    from rich.table import Table
    from rich.text import Text
    RICH_AVAILABLE = True
except ImportError:
    RICH_AVAILABLE = False

# Shared rich console, or None when rich is not installed.
console = Console() if RICH_AVAILABLE else None

# Maximum number of samples kept in the rolling history deque.
HISTORY_SIZE = 60

@dataclass
class Sample:
    """One set of readings collected from ``vcgencmd``."""

    timestamp: datetime  # datetime.now() at the moment the sample was assembled
    temperature: float  # degrees Celsius, parsed from "measure_temp"
    voltage: float  # volts, parsed from "measure_volts core"
    frequency: int  # ARM clock in Hz, parsed from "measure_clock arm"
    throttled: int  # raw bitmask parsed from "get_throttled"


def run_command(cmd: list[str]) -> str:
    """Run *cmd* and return its stdout with surrounding whitespace stripped.

    stderr is discarded. Both failure modes are normalised to ``RuntimeError``
    so callers need a single ``except`` clause; the original exception is kept
    as ``__cause__`` for debugging.

    Raises:
        RuntimeError: if the executable does not exist or exits non-zero.
    """
    try:
        output = subprocess.check_output(cmd, stderr=subprocess.DEVNULL, text=True)
    except FileNotFoundError as exc:
        raise RuntimeError(f"Command not found: {cmd[0]}") from exc
    except subprocess.CalledProcessError as exc:
        raise RuntimeError(f"Command failed: {' '.join(cmd)} ({exc.returncode})") from exc
    return output.strip()


def parse_temp(text: str) -> float:
    """Convert a ``measure_temp`` reply such as ``temp=68.1'C`` to a float."""
    prefix = "temp="
    reading = text[len(prefix):] if text.startswith(prefix) else text
    # Trailing unit characters (' and C) are stripped before conversion.
    return float(reading.rstrip("'C"))


def parse_volts(text: str) -> float:
    """Convert a ``measure_volts`` reply such as ``volt=0.9700V`` to a float."""
    prefix = "volt="
    reading = text[len(prefix):] if text.startswith(prefix) else text
    # Drop the trailing unit letter before conversion.
    return float(reading.rstrip("V"))


def parse_clock(text: str) -> int:
    """Convert a ``measure_clock`` reply such as ``frequency(48)=2000478464`` to an int."""
    _, separator, value = text.partition("=")
    # Without an "=" the whole string is treated as the number.
    return int(value if separator else text)


def parse_throttled(text: str) -> int:
    """Convert a ``get_throttled`` reply such as ``throttled=0x50000`` to an int bitmask."""
    _, separator, value = text.partition("=")
    # The value is hexadecimal; without an "=" the whole string is parsed.
    return int(value if separator else text, 16)


def from_vcgencmd() -> Sample:
    """Query ``vcgencmd`` for temperature, core voltage, ARM clock and throttle state.

    All four commands are run first; the timestamp is taken afterwards and the
    raw replies are then parsed into a ``Sample``.
    """
    queries = (
        ("measure_temp",),
        ("measure_volts", "core"),
        ("measure_clock", "arm"),
        ("get_throttled",),
    )
    temp_out, volt_out, clock_out, throttle_out = (
        run_command(["vcgencmd", *query]) for query in queries
    )
    taken_at = datetime.now()
    return Sample(
        timestamp=taken_at,
        temperature=parse_temp(temp_out),
        voltage=parse_volts(volt_out),
        frequency=parse_clock(clock_out),
        throttled=parse_throttled(throttle_out),
    )


def format_flag(throttled: int, bit: int, label: str) -> str:
    """Render ``label`` followed by a tick if *bit* is set in *throttled*, else a dot."""
    mark = "✔" if throttled & (1 << bit) else "·"
    return f"{label}:{mark}"


def describe_throttling(throttled: int) -> str:
    """Return a one-line summary of every documented ``get_throttled`` flag.

    Bit positions follow the Raspberry Pi ``vcgencmd get_throttled`` table:
    bits 0-3 describe the current state and bits 16-19 record that the same
    condition has occurred since boot.

    Args:
        throttled: raw bitmask as returned by ``get_throttled``.

    Returns:
        The flags formatted by ``format_flag`` and joined with two spaces.
    """
    flag_bits = (
        (0, "UNDERVOLT_ACTIVE"),
        (1, "ARM_FREQ_CAPPED"),
        (2, "THROTTLE_ACTIVE"),
        (3, "SOFT_TEMP_LIMIT_ACTIVE"),
        (16, "UNDERVOLT_HISTORY"),
        (17, "ARM_FREQ_CAP_HISTORY"),
        (18, "THROTTLE_HISTORY"),
        (19, "SOFT_TEMP_LIMIT_HISTORY"),
    )
    return "  ".join(format_flag(throttled, bit, label) for bit, label in flag_bits)


def build_stress_cmd(cpu_workers: int, duration: int) -> list[str]:
    """Assemble the ``stress-ng`` argv: N CPU workers, one I/O worker, fixed timeout."""
    argv = ["stress-ng"]
    argv += ["--cpu", str(cpu_workers)]
    argv += ["--io", "1"]
    argv += ["--timeout", f"{duration}s"]
    return argv


def start_stress_test(cpu_workers: int, duration: int) -> subprocess.Popen:
    """Launch ``stress-ng`` in the background with its output discarded.

    Returns the ``Popen`` handle so the caller can poll for completion.
    """
    devnull = subprocess.DEVNULL
    return subprocess.Popen(
        build_stress_cmd(cpu_workers, duration),
        stdout=devnull,
        stderr=devnull,
    )


def stress_status_message(proc: Optional[subprocess.Popen], duration: int) -> str:
    """Describe the stress test state: ``idle``, ``running (<duration>s)`` or ``finished``."""
    if proc is not None:
        # poll() returns None while the child process is still alive.
        still_running = proc.poll() is None
        return f"running ({duration}s)" if still_running else "finished"
    return "idle"


def stress_outcome_message(initial: int, latest: int) -> str:
    """Summarise what changed in the throttle bitmask over a stress run.

    History bits are reported only when they are set in *latest* but were not
    already set in *initial*; active bits are reported whenever they are set
    in *latest*.

    Bit layout (``vcgencmd get_throttled``): 0x1 under-voltage, 0x2 ARM
    frequency capped, 0x4 currently throttled; 0x10000 / 0x20000 / 0x40000 are
    the matching "has occurred" bits. Frequency capping is reported together
    with throttling here, so the firmware's actual "currently throttled" bit
    (0x4) is no longer missed.

    Args:
        initial: bitmask sampled at the start of the stress run.
        latest: bitmask sampled when the stress run ended.

    Returns:
        A comma-separated list of findings, or a fixed "nothing detected" text.
    """
    undervolt_active = 0x1
    throttle_active = 0x2 | 0x4
    undervolt_history = 0x10000
    throttle_history = 0x20000 | 0x40000

    new_history = latest & ~initial
    checks = (
        (new_history & undervolt_history, "new undervolt history"),
        (new_history & throttle_history, "new throttle history"),
        (latest & undervolt_active, "active undervolt"),
        (latest & throttle_active, "active throttle"),
    )
    messages = [text for hit, text in checks if hit]
    if not messages:
        return "no new undervolt/throttle detected"
    return ", ".join(messages)


def throttled_explanation(throttled: int) -> str:
    """Return a one-sentence human interpretation of the throttle bitmask.

    Conditions are checked from most to least urgent: active under-voltage,
    active throttling (with or without under-voltage history), under-voltage
    history, throttle history, and finally "nothing detected".

    Bit layout (``vcgencmd get_throttled``): 0x1 under-voltage, 0x2 ARM
    frequency capped, 0x4 currently throttled; 0x10000 / 0x20000 / 0x40000 are
    the matching "has occurred" bits. Frequency capping is treated as a form
    of throttling, so both 0x2 and 0x4 count as an active throttle.

    Args:
        throttled: raw bitmask as returned by ``get_throttled``.
    """
    undervolt_active = 0x1
    throttle_active = 0x2 | 0x4
    undervolt_history = 0x10000
    throttle_history = 0x20000 | 0x40000

    if throttled & undervolt_active:
        return "Active undervolt -> likely power/cable issue."
    if throttled & throttle_active:
        if throttled & undervolt_history:
            return "Active throttle plus undervolt history -> likely power issue."
        return "Active throttle only -> may be load-related rather than a direct power fault."
    if throttled & undervolt_history:
        return "Undervolt history recorded -> power supply or cable is suspect."
    if throttled & throttle_history:
        return "Throttle history recorded -> load or power issue may have occurred."
    return "No undervolt or throttle condition detected."


def gauge(value: float, minimum: float, maximum: float, width: int = 30) -> str:
    """Draw a fixed-width ASCII bar for *value* within ``[minimum, maximum]``.

    The fill fraction is clamped to 0..1; a degenerate range
    (``maximum <= minimum``) yields an empty bar. The value itself is appended
    with one decimal place.

    Args:
        value: reading to display.
        minimum: value that maps to an empty bar.
        maximum: value that maps to a full bar.
        width: number of cells between the brackets.

    Returns:
        A string such as ``[#####-----] 60.0``.
    """
    if maximum > minimum:
        fraction = max(0.0, min(1.0, (value - minimum) / (maximum - minimum)))
    else:
        fraction = 0.0
    filled = int(fraction * width)
    # The original chose between two identical strings depending on whether
    # rich was available; the output never differed, so the branch is gone.
    return f"[{'#' * filled}{'-' * (width - filled)}] {value:.1f}"


def build_dashboard(history: deque[Sample], stress_message: str) -> Table:
    """Build the two-panel rich layout for the newest sample and recent trends.

    The left panel shows the latest readings with gauges and the decoded
    throttle flags; the right panel shows sparklines once more than one sample
    has been collected.

    Args:
        history: collected samples, newest last; must not be empty.
        stress_message: status text for the "Stress" row.

    Returns:
        A rich ``Table`` grid holding the "Current" and "Trends" panels.
    """
    # Only reachable when rich is installed, so a local import is safe.
    from rich.markup import escape

    sample = history[-1]
    table = Table.grid(expand=True)
    table.add_column(ratio=1)
    table.add_column(ratio=1)

    left = Table.grid()
    left.add_row("[bold cyan]Raspberry Pi Power Dashboard[/bold cyan]")
    left.add_row(f"[bold]Time:[/bold] {sample.timestamp:%Y-%m-%d %H:%M:%S}")
    left.add_row(f"[bold]Stress:[/bold] {stress_message}")
    left.add_row(f"[bold]Temp:[/bold] {sample.temperature:.1f}°C")
    # Gauge text looks like "[####----] 68.1". Rich parses a bracket group that
    # starts with "#" as a style tag and drops it, so the bar must be escaped.
    left.add_row(escape(gauge(sample.temperature, 30.0, 90.0)))
    left.add_row(f"[bold]Voltage:[/bold] {sample.voltage:.4f} V")
    left.add_row(escape(gauge(sample.voltage, 0.90, 1.20)))
    left.add_row(f"[bold]CPU freq:[/bold] {sample.frequency / 1_000_000:.1f} MHz")
    left.add_row(escape(gauge(sample.frequency / 1_000_000, 500.0, 2000.0)))
    left.add_row(f"[bold]Throttled:[/bold] 0x{sample.throttled:08x}")
    left.add_row(describe_throttling(sample.throttled))
    left.add_row(f"[italic yellow]{throttled_explanation(sample.throttled)}[/italic yellow]")

    right = Table.grid()
    right.add_row("[bold]History (last 60 samples)[/bold]")
    if len(history) > 1:
        temps = [s.temperature for s in history]
        volts = [s.voltage for s in history]
        freqs = [s.frequency / 1_000_000 for s in history]
        # Number of set throttle bits per sample, as a rough severity trace.
        flag_counts = [float(s.throttled.bit_count()) for s in history]
        right.add_row("[green]Temp:[/green] " + sparkline(temps, 30))
        right.add_row("[yellow]Volt:[/yellow] " + sparkline(volts, 30))
        right.add_row("[magenta]Freq:[/magenta] " + sparkline(freqs, 30))
        right.add_row("[red]Thrott:[/red] " + sparkline(flag_counts, 30))
    else:
        right.add_row("waiting for more samples...")

    table.add_row(Panel(left, title="Current"), Panel(right, title="Trends"))
    return table


def sparkline(values: list[float], width: int = 32) -> str:
    """Render the last *width* values as a row of block characters.

    Scaling uses the minimum and maximum of the whole list, not just the
    displayed tail. An empty list yields an empty string.
    """
    if not values:
        return ""
    glyphs = "▁▂▃▄▅▆▇█"
    top = len(glyphs) - 1
    floor = min(values)
    # Guard against a zero range when all values are equal.
    spread = max(max(values) - floor, 1e-6)
    cells = []
    for reading in values[-width:]:
        cells.append(glyphs[int((reading - floor) / spread * top)])
    return "".join(cells)


def plain_dashboard(history: deque[Sample], stress_message: str) -> str:
    """Render the dashboard as plain text for terminals without rich.

    Args:
        history: collected samples, newest last; must not be empty.
        stress_message: status text for the "Stress" line.

    Returns:
        The newline-joined dashboard. A "History" section with sparklines is
        appended once more than one sample is available.
    """
    sample = history[-1]
    mhz = sample.frequency / 1_000_000
    lines = [
        "Raspberry Pi Power Dashboard",
        "=============================",
        f"Time: {sample.timestamp:%Y-%m-%d %H:%M:%S}",
        f"Stress: {stress_message}",
        f"Temp: {sample.temperature:.1f}°C  {gauge(sample.temperature, 30.0, 90.0)}",
        f"Volt: {sample.voltage:.4f} V  {gauge(sample.voltage, 0.90, 1.20)}",
        f"Freq: {mhz:.1f} MHz  {gauge(mhz, 500.0, 2000.0)}",
        f"Throttled: 0x{sample.throttled:08x}",
        describe_throttling(sample.throttled),
        throttled_explanation(sample.throttled),
    ]
    if len(history) > 1:
        lines.append("")
        lines.append("History:")
        lines.append(f" Temp: {sparkline([s.temperature for s in history])}")
        lines.append(f" Volt: {sparkline([s.voltage for s in history])}")
        lines.append(f" Freq: {sparkline([s.frequency / 1_000_000 for s in history])}")
        # Count set bits of each sample's throttle mask. The original called
        # bit_count() on the Sample object itself, which raises AttributeError.
        lines.append(f" Th: {sparkline([float(s.throttled.bit_count()) for s in history])}")
    return "\n".join(lines)


def parse_args() -> argparse.Namespace:
    """Parse the command line: ``--stress``, ``--duration`` and ``--cpu-workers``."""
    cli = argparse.ArgumentParser(description="Raspberry Pi power & stress dashboard")
    options = (
        ("--stress", {"action": "store_true", "help": "Run a stress-ng CPU stress test while monitoring"}),
        ("--duration", {"type": int, "default": 60, "help": "Stress test duration in seconds"}),
        ("--cpu-workers", {"type": int, "default": 4, "help": "Number of stress-ng CPU workers"}),
    )
    for flag, settings in options:
        cli.add_argument(flag, **settings)
    return cli.parse_args()


def _render_plain(history: deque[Sample], stress_message: str) -> None:
    """Clear the terminal and print the plain-text dashboard."""
    os.system("clear")
    print(plain_dashboard(history, stress_message))


def _stop_stress_test(proc: Optional[subprocess.Popen]) -> None:
    """Terminate the stress-ng child if it is still running."""
    if proc is None or proc.poll() is not None:
        return
    proc.terminate()
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        # The child ignored SIGTERM; force it so it does not outlive us.
        proc.kill()
        proc.wait()


def _monitor(args: argparse.Namespace, stress_proc: Optional[subprocess.Popen], render, report_error) -> None:
    """Sample ``vcgencmd`` forever and hand each frame to *render*.

    Args:
        args: parsed command-line options (``stress`` and ``duration`` are read).
        stress_proc: running stress-ng process, or None when no test was requested.
        render: callable ``(history, stress_message)`` that displays one frame.
        report_error: callable ``(exc)`` that displays a sampling failure.
    """
    history: deque[Sample] = deque(maxlen=HISTORY_SIZE)
    stress_initial_throttled: Optional[int] = None
    stress_summary: Optional[str] = None

    while True:
        try:
            sample = from_vcgencmd()
        except (RuntimeError, ValueError) as exc:
            # RuntimeError: command missing or failed.
            # ValueError: the reply could not be parsed.
            # Either way, show it and retry instead of crashing.
            report_error(exc)
            time.sleep(2)
            continue

        history.append(sample)
        if args.stress and stress_initial_throttled is None:
            # First good sample is the baseline for the stress summary.
            stress_initial_throttled = sample.throttled

        stress_done = stress_proc is not None and stress_proc.poll() is not None
        if stress_done and stress_summary is None and stress_initial_throttled is not None:
            stress_summary = stress_outcome_message(stress_initial_throttled, sample.throttled)

        # The process handle is kept after it exits so the status reads
        # "finished" rather than falling back to "idle".
        stress_message = stress_status_message(stress_proc, args.duration)
        if stress_summary is not None:
            stress_message += f" -> {stress_summary}"

        render(history, stress_message)
        time.sleep(1.5)


def main() -> int:
    """Run the dashboard until interrupted.

    Returns:
        1 when ``vcgencmd`` is missing, or when ``--stress`` was requested and
        ``stress-ng`` is missing; 0 after the user stops the dashboard with
        Ctrl-C.
    """
    if not shutil.which("vcgencmd"):
        print("This script requires vcgencmd on Raspberry Pi.")
        return 1

    args = parse_args()
    stress_proc: Optional[subprocess.Popen] = None
    if args.stress:
        if not shutil.which("stress-ng"):
            print("Stress test requested, but stress-ng is not installed.")
            print("Install it with: sudo apt install stress-ng")
            return 1
        stress_proc = start_stress_test(args.cpu_workers, args.duration)

    try:
        if RICH_AVAILABLE:
            with Live(console=console, refresh_per_second=2) as live:
                _monitor(
                    args,
                    stress_proc,
                    render=lambda history, message: live.update(build_dashboard(history, message)),
                    report_error=lambda exc: live.update(Panel(str(exc), title="Error", style="red")),
                )
        else:
            print("rich not installed; using plain terminal output.")
            _monitor(
                args,
                stress_proc,
                render=_render_plain,
                report_error=lambda exc: print(f"ERROR: {exc}"),
            )
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to quit; exit quietly instead of a traceback.
        pass
    finally:
        _stop_stress_test(stress_proc)

    return 0

# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
← All pastes
Community pulse
More pastes Browse all →
TEXT twin birdies 27 views · 27d ago TEXT The Jeffrey Twins Grabbing 25 views · 28d ago TEXT Screenshot 14 views · 28d ago TEXT Screenshot 13 views · 28d ago TEXT Screenshot 15 views · 30d ago TEXT Guy belongs into Circus with that trick 12 views · 30d ago TEXT SDXL Sucks ass 13 views · 30d ago TEXT Screenshot 19 views · 31d ago TEXT Screenshot 15 views · 31d ago TEXT Korean Streamer 19 views · 31d ago TEXT Some beautiful Art 11 views · 31d ago TEXT Some kind of Demo account? 13 views · 32d ago TEXT Screenshot 9 views · 33d ago TEXT Screenshot 8 views · 33d ago TEXT Screenshot 14 views · 33d ago TEXT Screenshot 10 views · 33d ago