Paste
JAVASCRIPT
pi_power_dashboard.py (benchmark raspi, checks undervoltage)
Created 49d ago · 6 views
Language
JAVASCRIPT
Lines
333
Characters
12056
JAVASCRIPT · amber-elm-89
#!/usr/bin/env python3
"""Raspberry Pi power & throttling dashboard.
Usage:
python3 ~/pi_power_dashboard.py
python3 ~/pi_power_dashboard.py --stress
python3 ~/pi_power_dashboard.py --stress --duration 90 --cpu-workers 2
Optional dependency for nicer visuals:
pip install rich
If rich is not installed, this script still works with a plain terminal UI.
"""
from __future__ import annotations
import argparse
import os
import shutil
import subprocess
import sys
import time
from collections import deque
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
# rich is an optional dependency: when it cannot be imported the script falls
# back to a plain-text terminal UI, so remember whether the import worked.
try:
    from rich.console import Console
    from rich.live import Live
    from rich.panel import Panel
    from rich.table import Table
    from rich.text import Text
except ImportError:
    RICH_AVAILABLE = False
    console = None
else:
    RICH_AVAILABLE = True
    # Shared console used by the live dashboard.
    console = Console()

# Maximum number of samples retained for the trend sparklines.
HISTORY_SIZE = 60
@dataclass
class Sample:
    """One snapshot of the values reported by ``vcgencmd``."""

    # Wall-clock time at which the snapshot was taken.
    timestamp: datetime
    # Parsed output of ``vcgencmd measure_temp``.
    temperature: float
    # Parsed output of ``vcgencmd measure_volts core``.
    voltage: float
    # Parsed output of ``vcgencmd measure_clock arm``.
    frequency: int
    # Parsed bit field from ``vcgencmd get_throttled``.
    throttled: int
def run_command(cmd: list[str]) -> str:
    """Run *cmd* and return its standard output with surrounding whitespace removed.

    Standard error is discarded.

    Args:
        cmd: Program name followed by its arguments.

    Returns:
        The command's stdout, stripped.

    Raises:
        RuntimeError: If the executable is missing, cannot be started, or
            exits with a non-zero status. The underlying exception is chained
            as ``__cause__`` so the original failure stays visible.
    """
    try:
        output = subprocess.check_output(cmd, stderr=subprocess.DEVNULL, text=True)
    except FileNotFoundError as exc:
        raise RuntimeError(f"Command not found: {cmd[0]}") from exc
    except subprocess.CalledProcessError as exc:
        raise RuntimeError(f"Command failed: {' '.join(cmd)} ({exc.returncode})") from exc
    except OSError as exc:
        # e.g. PermissionError: report it like every other command failure
        # so callers that only handle RuntimeError keep running.
        raise RuntimeError(f"Command could not be run: {' '.join(cmd)} ({exc})") from exc
    return output.strip()
def parse_temp(text: str) -> float:
    """Convert a ``measure_temp`` reply such as ``temp=68.1'C`` to a float."""
    prefix = "temp="
    reading = text[len(prefix):] if text.startswith(prefix) else text
    # Drop any trailing apostrophe / "C" characters left from the unit suffix.
    return float(reading.rstrip("'C"))
def parse_volts(text: str) -> float:
    """Convert a ``measure_volts`` reply such as ``volt=0.9700V`` to a float."""
    prefix = "volt="
    reading = text[len(prefix):] if text.startswith(prefix) else text
    # Drop the trailing unit letter(s).
    return float(reading.rstrip("V"))
def parse_clock(text: str) -> int:
    """Convert a ``measure_clock`` reply such as ``frequency(48)=2000478464`` to an int."""
    # Everything after the first "=" is the number; without "=" use the text as-is.
    _, separator, tail = text.partition("=")
    return int(tail if separator else text)
def parse_throttled(text: str) -> int:
    """Convert a ``get_throttled`` reply such as ``throttled=0x50000`` to an int."""
    # Everything after the first "=" is the hexadecimal value.
    _, separator, tail = text.partition("=")
    return int(tail if separator else text, 16)
def from_vcgencmd() -> Sample:
    """Query ``vcgencmd`` four times and bundle the parsed readings into a Sample.

    Returns:
        A Sample stamped with the current local time.

    Raises:
        RuntimeError: If a vcgencmd invocation fails (raised by run_command)
            or if its output cannot be parsed. Parse failures are converted
            from ValueError so that the monitoring loop, which only handles
            RuntimeError, reports them and retries instead of crashing.
    """
    temp_raw = run_command(["vcgencmd", "measure_temp"])
    volt_raw = run_command(["vcgencmd", "measure_volts", "core"])
    freq_raw = run_command(["vcgencmd", "measure_clock", "arm"])
    throttled_raw = run_command(["vcgencmd", "get_throttled"])
    try:
        return Sample(
            timestamp=datetime.now(),
            temperature=parse_temp(temp_raw),
            voltage=parse_volts(volt_raw),
            frequency=parse_clock(freq_raw),
            throttled=parse_throttled(throttled_raw),
        )
    except ValueError as exc:
        raise RuntimeError(f"Unexpected vcgencmd output: {exc}") from exc
def format_flag(throttled: int, bit: int, label: str) -> str:
    """Render ``label`` followed by a check mark if *bit* is set in *throttled*, else a dot."""
    marker = "✔" if throttled & (1 << bit) else "·"
    return f"{label}:{marker}"
def describe_throttling(throttled: int) -> str:
    """Return a one-line summary of the ``get_throttled`` status bits.

    The labels follow the bit layout documented for ``vcgencmd get_throttled``:
    bits 0-3 describe the current state (under-voltage, ARM frequency capped,
    throttled, soft temperature limit) and bits 16-19 record whether the same
    condition has occurred since boot.

    Args:
        throttled: Integer value parsed from ``vcgencmd get_throttled``.

    Returns:
        Space-separated ``LABEL:mark`` entries produced by format_flag.
    """
    flag_bits = (
        (0, "UNDERVOLT_ACTIVE"),
        (1, "FREQ_CAP_ACTIVE"),
        (2, "THROTTLE_ACTIVE"),
        (3, "SOFT_TEMP_LIMIT_ACTIVE"),
        (16, "UNDERVOLT_HISTORY"),
        (17, "FREQ_CAP_HISTORY"),
        (18, "THROTTLE_HISTORY"),
        (19, "SOFT_TEMP_LIMIT_HISTORY"),
    )
    return " ".join(format_flag(throttled, bit, label) for bit, label in flag_bits)
def build_stress_cmd(cpu_workers: int, duration: int) -> list[str]:
    """Assemble the ``stress-ng`` argument vector.

    The command runs *cpu_workers* CPU stressors plus one I/O stressor and
    stops after *duration* seconds.
    """
    command = ["stress-ng"]
    command += ["--cpu", str(cpu_workers)]
    command += ["--io", "1"]
    command += ["--timeout", f"{duration}s"]
    return command
def start_stress_test(cpu_workers: int, duration: int) -> subprocess.Popen:
    """Launch ``stress-ng`` in the background and return its process handle.

    The child's stdout and stderr are discarded so they do not disturb the
    dashboard output.
    """
    return subprocess.Popen(
        build_stress_cmd(cpu_workers, duration),
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
def stress_status_message(proc: Optional[subprocess.Popen], duration: int) -> str:
    """Describe the stress test state: ``idle``, ``running (<duration>s)`` or ``finished``."""
    if proc is None:
        return "idle"
    # poll() returns None while the child process has not exited yet.
    still_running = proc.poll() is None
    return f"running ({duration}s)" if still_running else "finished"
def stress_outcome_message(initial: int, latest: int) -> str:
    """Summarise what changed in the throttled bit field during a stress run.

    Args:
        initial: ``get_throttled`` value captured when the stress test began.
        latest: ``get_throttled`` value captured when it ended.

    Returns:
        A comma-separated list of findings, or
        ``"no new undervolt/throttle detected"`` when there are none.
    """
    # Bit layout per the vcgencmd get_throttled documentation. Bit 2 (0x4) is
    # "currently throttled"; bit 1 (0x2) is only "ARM frequency capped".
    undervolt_now = 0x1
    throttle_now = 0x4
    undervolt_seen = 0x10000
    throttle_seen = 0x40000

    # History bits that were clear before the run and are set afterwards.
    newly_set = latest & ~initial
    checks = (
        (newly_set & undervolt_seen, "new undervolt history"),
        (newly_set & throttle_seen, "new throttle history"),
        (latest & undervolt_now, "active undervolt"),
        (latest & throttle_now, "active throttle"),
    )
    messages = [text for hit, text in checks if hit]
    if not messages:
        return "no new undervolt/throttle detected"
    return ", ".join(messages)
def throttled_explanation(throttled: int) -> str:
    """Return a one-sentence interpretation of the ``get_throttled`` value.

    Conditions are checked from most to least severe and the first match wins:
    active under-voltage, active throttling, under-voltage history, throttle
    history.

    Args:
        throttled: Integer value parsed from ``vcgencmd get_throttled``.
    """
    # Bit layout per the vcgencmd get_throttled documentation. Bit 2 (0x4) is
    # "currently throttled"; bit 1 (0x2) is only "ARM frequency capped".
    undervolt_now = bool(throttled & 0x1)
    throttle_now = bool(throttled & 0x4)
    undervolt_seen = bool(throttled & 0x10000)
    throttle_seen = bool(throttled & 0x40000)

    if undervolt_now:
        return "Active undervolt -> likely power/cable issue."
    if throttle_now:
        if undervolt_seen:
            return "Active throttle plus undervolt history -> likely power issue."
        return "Active throttle only -> may be load-related rather than a direct power fault."
    if undervolt_seen:
        return "Undervolt history recorded -> power supply or cable is suspect."
    if throttle_seen:
        return "Throttle history recorded -> load or power issue may have occurred."
    return "No undervolt or throttle condition detected."
def gauge(value: float, minimum: float, maximum: float, width: int = 30) -> str:
    """Render *value* as a fixed-width text bar followed by the number.

    Args:
        value: Reading to display.
        minimum: Value that maps to an empty bar.
        maximum: Value that maps to a full bar.
        width: Number of characters inside the brackets.

    Returns:
        A string such as ``[#####-----] 60.0``. Readings outside the range are
        clamped, and a non-positive range yields an empty bar. When rich is in
        use, the opening bracket is backslash-escaped if needed so the bar is
        displayed literally.
    """
    if maximum > minimum:
        pct = max(0.0, min(1.0, (value - minimum) / (maximum - minimum)))
    else:
        pct = 0.0
    filled = int(pct * width)
    empty = width - filled
    # rich treats "[#...]" as a markup tag (the syntax used for hex colours)
    # and would drop the bar from the output. A bar that starts with "-" is
    # not tag-like, so it must stay unescaped or the backslash would show.
    opening = "\\[" if RICH_AVAILABLE and filled > 0 else "["
    return f"{opening}{'#' * filled}{'-' * empty}] {value:.1f}"
def build_dashboard(history: deque[Sample], stress_message: str) -> Table:
    """Build the rich layout: current readings on the left, trends on the right.

    Args:
        history: Collected samples, newest last; must contain at least one.
        stress_message: Text describing the stress test state.

    Returns:
        A two-column rich grid holding a "Current" and a "Trends" panel.
    """
    sample = history[-1]
    freq_mhz = sample.frequency / 1_000_000

    table = Table.grid(expand=True)
    table.add_column(ratio=1)
    table.add_column(ratio=1)

    left = Table.grid()
    left.add_row("[bold cyan]Raspberry Pi Power Dashboard[/bold cyan]")
    left.add_row(f"[bold]Time:[/bold] {sample.timestamp:%Y-%m-%d %H:%M:%S}")
    left.add_row(f"[bold]Stress:[/bold] {stress_message}")
    left.add_row(f"[bold]Temp:[/bold] {sample.temperature:.1f}°C")
    left.add_row(gauge(sample.temperature, 30.0, 90.0))
    left.add_row(f"[bold]Voltage:[/bold] {sample.voltage:.4f} V")
    left.add_row(gauge(sample.voltage, 0.90, 1.20))
    left.add_row(f"[bold]CPU freq:[/bold] {freq_mhz:.1f} MHz")
    left.add_row(gauge(freq_mhz, 500.0, 2000.0))
    left.add_row(f"[bold]Throttled:[/bold] 0x{sample.throttled:08x}")
    left.add_row(describe_throttling(sample.throttled))
    left.add_row(f"[italic yellow]{throttled_explanation(sample.throttled)}[/italic yellow]")

    right = Table.grid()
    right.add_row("[bold]History (last 60 samples)[/bold]")
    if len(history) > 1:
        temps = [s.temperature for s in history]
        volts = [s.voltage for s in history]
        freqs = [s.frequency / 1_000_000 for s in history]
        # Number of set status bits per sample. bin().count("1") is used
        # instead of int.bit_count(), which only exists on Python 3.10+.
        flag_counts = [float(bin(s.throttled).count("1")) for s in history]
        right.add_row("[green]Temp:[/green] " + sparkline(temps, 30))
        right.add_row("[yellow]Volt:[/yellow] " + sparkline(volts, 30))
        right.add_row("[magenta]Freq:[/magenta] " + sparkline(freqs, 30))
        right.add_row("[red]Thrott:[/red] " + sparkline(flag_counts, 30))
    else:
        right.add_row("waiting for more samples...")

    table.add_row(Panel(left, title="Current"), Panel(right, title="Trends"))
    return table
def sparkline(values: list[float], width: int = 32) -> str:
    """Draw the last *width* entries of *values* as a row of block characters.

    Bar heights are scaled between the minimum and maximum of the whole
    *values* list; an empty list gives an empty string.
    """
    if not values:
        return ""
    bars = "▁▂▃▄▅▆▇█"
    top_index = len(bars) - 1
    lo, hi = min(values), max(values)
    # Guard against division by zero when every value is identical.
    span = max(hi - lo, 1e-6)
    glyphs = []
    for v in values[-width:]:
        glyphs.append(bars[int((v - lo) / span * top_index)])
    return "".join(glyphs)
def plain_dashboard(history: deque[Sample], stress_message: str) -> str:
    """Render the dashboard as plain text for terminals without rich.

    Args:
        history: Collected samples, newest last; must contain at least one.
        stress_message: Text describing the stress test state.

    Returns:
        The newline-joined report; a history section with sparklines is
        appended once more than one sample is available.
    """
    sample = history[-1]
    freq_mhz = sample.frequency / 1_000_000
    lines = [
        "Raspberry Pi Power Dashboard",
        "=============================",
        f"Time: {sample.timestamp:%Y-%m-%d %H:%M:%S}",
        f"Stress: {stress_message}",
        f"Temp: {sample.temperature:.1f}°C {gauge(sample.temperature, 30.0, 90.0)}",
        f"Volt: {sample.voltage:.4f} V {gauge(sample.voltage, 0.90, 1.20)}",
        f"Freq: {freq_mhz:.1f} MHz {gauge(freq_mhz, 500.0, 2000.0)}",
        f"Throttled: 0x{sample.throttled:08x}",
        describe_throttling(sample.throttled),
        throttled_explanation(sample.throttled),
    ]
    if len(history) > 1:
        # Count set status bits from each sample's throttled field; the
        # history holds Sample objects, not ints. bin().count("1") also works
        # on Python versions that lack int.bit_count().
        flag_counts = [float(bin(s.throttled).count("1")) for s in history]
        lines.append("")
        lines.append("History:")
        lines.append(f" Temp: {sparkline([s.temperature for s in history])}")
        lines.append(f" Volt: {sparkline([s.voltage for s in history])}")
        lines.append(f" Freq: {sparkline([s.frequency / 1_000_000 for s in history])}")
        lines.append(f" Th: {sparkline(flag_counts)}")
    return "\n".join(lines)
def parse_args() -> argparse.Namespace:
    """Parse the command-line options (``--stress``, ``--duration``, ``--cpu-workers``)."""
    cli = argparse.ArgumentParser(description="Raspberry Pi power & stress dashboard")
    cli.add_argument(
        "--stress",
        action="store_true",
        help="Run a stress-ng CPU stress test while monitoring",
    )
    cli.add_argument(
        "--duration",
        type=int,
        default=60,
        help="Stress test duration in seconds",
    )
    cli.add_argument(
        "--cpu-workers",
        type=int,
        default=4,
        help="Number of stress-ng CPU workers",
    )
    options = cli.parse_args()
    return options
def _stop_stress(proc: Optional[subprocess.Popen]) -> None:
    """Terminate the stress-ng child if it is still running."""
    if proc is None or proc.poll() is not None:
        return
    proc.terminate()
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()


def main() -> int:
    """Run the dashboard until interrupted.

    Optionally starts a stress-ng run, then samples vcgencmd in a loop and
    redraws either the rich dashboard or the plain-text fallback.

    Returns:
        1 if a required tool (vcgencmd, or stress-ng when ``--stress`` is
        given) is missing; 0 when the user stops the dashboard with Ctrl-C.
    """
    # Parse first so that --help works even on machines without vcgencmd.
    args = parse_args()
    if not shutil.which("vcgencmd"):
        print("This script requires vcgencmd on Raspberry Pi.")
        return 1

    stress_proc: Optional[subprocess.Popen] = None
    if args.stress:
        if not shutil.which("stress-ng"):
            print("Stress test requested, but stress-ng is not installed.")
            print("Install it with: sudo apt install stress-ng")
            return 1
        stress_proc = start_stress_test(args.cpu_workers, args.duration)

    history: deque[Sample] = deque(maxlen=HISTORY_SIZE)
    stress_initial_throttled: Optional[int] = None
    stress_summary: Optional[str] = None

    def take_sample() -> str:
        """Record one sample and return the current stress status text."""
        nonlocal stress_initial_throttled, stress_summary
        sample = from_vcgencmd()
        history.append(sample)
        if stress_proc is not None:
            # The first successful sample is the baseline for the summary.
            if stress_initial_throttled is None:
                stress_initial_throttled = sample.throttled
            # Compute the summary once, on the first sample after the run
            # ends. The process handle is kept so the status reads "finished"
            # rather than "idle".
            if stress_summary is None and stress_proc.poll() is not None:
                stress_summary = stress_outcome_message(stress_initial_throttled, sample.throttled)
        message = stress_status_message(stress_proc, args.duration)
        if stress_summary is not None:
            message += f" -> {stress_summary}"
        return message

    try:
        if RICH_AVAILABLE:
            with Live(console=console, refresh_per_second=2) as live:
                while True:
                    try:
                        stress_message = take_sample()
                    except RuntimeError as exc:
                        live.update(Panel(str(exc), title="Error", style="red"))
                        time.sleep(2)
                        continue
                    live.update(build_dashboard(history, stress_message))
                    time.sleep(1.5)
        else:
            print("rich not installed; using plain terminal output.")
            while True:
                try:
                    stress_message = take_sample()
                except RuntimeError as exc:
                    print(f"ERROR: {exc}")
                    time.sleep(2)
                    continue
                os.system("clear")
                print(plain_dashboard(history, stress_message))
                time.sleep(1.5)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to leave the dashboard.
        return 0
    finally:
        # Do not leave a stress run behind when the dashboard exits.
        _stop_stress(stress_proc)
if __name__ == "__main__":
raise SystemExit(main())
More pastes
Browse all →
TEXT
27 views · 27d ago
TEXT
25 views · 28d ago
TEXT
14 views · 28d ago
TEXT
13 views · 28d ago
TEXT
15 views · 30d ago
TEXT
12 views · 30d ago
TEXT
13 views · 30d ago
TEXT
19 views · 31d ago
TEXT
15 views · 31d ago
TEXT
19 views · 31d ago
TEXT
11 views · 31d ago
TEXT
13 views · 32d ago
TEXT
9 views · 33d ago
TEXT
8 views · 33d ago
TEXT
14 views · 33d ago
TEXT
10 views · 33d ago