updated modelgen, decoupling tester

This commit is contained in:
2026-04-12 03:07:25 -03:00
parent af06309dad
commit 85a856b7ac
58 changed files with 4770 additions and 625 deletions

330
ctrl/spr.py Executable file
View File

@@ -0,0 +1,330 @@
#!/usr/bin/env python3
"""
spr - Soleprint component manager
Manages distributable components from the spr repo into target folders.
Runs from spr/. Consuming projects have no awareness of spr — they just
commit whatever lands in the target folder.
Usage:
python ctrl/spr.py list
python ctrl/spr.py sync soleprint-ui ~/wdir/unt/ui/framework
python ctrl/spr.py watch soleprint-ui ~/wdir/unt/ui/framework # ctrl+c to stop
python ctrl/spr.py publish soleprint-ui ~/wdir/mpr/ui/framework
python ctrl/spr.py diff soleprint-ui ~/wdir/mpr/ui/framework
"""
import argparse
import json
import logging
import shutil
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
log = logging.getLogger("spr")
SPR_HOME = Path(__file__).resolve().parent.parent
REGISTRY_PATH = SPR_HOME / "registry.json"
EXCLUDE = {
"node_modules",
"__pycache__",
".venv",
"dist",
".git",
"*.egg-info",
".spr",
"pnpm-lock.yaml",
}
def load_registry():
if not REGISTRY_PATH.exists():
log.error("registry not found: %s", REGISTRY_PATH)
sys.exit(1)
return json.loads(REGISTRY_PATH.read_text())
def resolve_component(registry, name):
if name not in registry:
log.error("unknown component: %s", name)
log.info("available: %s", ", ".join(registry.keys()))
sys.exit(1)
entry = registry[name]
source = SPR_HOME / entry["path"]
if not source.is_dir():
log.error("source not found: %s", source)
sys.exit(1)
return entry["type"], source
def get_version(comp_type, source):
try:
if comp_type == "npm":
pkg = json.loads((source / "package.json").read_text())
return pkg.get("version", "?")
elif comp_type == "pip":
for line in (source / "pyproject.toml").read_text().splitlines():
if line.strip().startswith("version"):
return line.split('"')[1]
except FileNotFoundError:
pass
return "?"
def get_sha():
try:
result = subprocess.run(
["git", "rev-parse", "--short", "HEAD"],
cwd=SPR_HOME,
capture_output=True,
text=True,
)
return result.stdout.strip() if result.returncode == 0 else "unknown"
except FileNotFoundError:
return "unknown"
def should_exclude(path, base):
"""Check if a path should be excluded from sync."""
rel = path.relative_to(base)
for part in rel.parts:
if part in EXCLUDE:
return True
for pattern in EXCLUDE:
if "*" in pattern and part.endswith(pattern.replace("*", "")):
return True
return False
def copy_tree(src, dst):
"""Copy src to dst, excluding build artifacts. Returns count of files copied."""
count = 0
dst.mkdir(parents=True, exist_ok=True)
for item in src.iterdir():
if should_exclude(item, src.parent):
continue
target = dst / item.name
if item.is_dir():
count += copy_tree(item, target)
else:
target.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(item, target)
count += 1
return count
def sync_tree(src, dst):
"""Bidirectional sync — newest file wins. Returns (fwd, rev) counts."""
fwd = _sync_one_way(src, dst)
rev = _sync_one_way(dst, src)
return fwd, rev
def _sync_one_way(src, dst):
"""Copy files from src to dst only if src is newer."""
count = 0
if not src.is_dir():
return count
dst.mkdir(parents=True, exist_ok=True)
for item in src.iterdir():
if should_exclude(item, src.parent):
continue
target = dst / item.name
if item.is_dir():
count += _sync_one_way(item, target)
else:
if not target.exists() or item.stat().st_mtime > target.stat().st_mtime:
target.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(item, target)
count += 1
# Remove files in dst that don't exist in src
if dst.is_dir():
for item in dst.iterdir():
if should_exclude(item, dst.parent):
continue
counterpart = src / item.name
if not counterpart.exists():
if item.is_dir():
shutil.rmtree(item)
else:
item.unlink()
count += 1
return count
def write_stamp(dest, name, comp_type, source, mode):
version = get_version(comp_type, source)
sha = get_sha()
stamp = dest / ".spr"
stamp.write_text(
f"name={name}\n"
f"version={version}\n"
f"type={comp_type}\n"
f"sha={sha}\n"
f"mode={mode}\n"
f"updated={datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')}\n"
f"source={source}\n"
)
# ── commands ─────────────────────────────────────────────────
def cmd_list(args):
registry = load_registry()
log.info("Components (%s):\n", REGISTRY_PATH)
for name, entry in registry.items():
comp_type = entry["type"]
source = SPR_HOME / entry["path"]
version = get_version(comp_type, source)
log.info(" %s %s v%-10s %s", f"{name:<25}", f"{comp_type:<5}", version, entry["path"])
def cmd_publish(args):
registry = load_registry()
comp_type, source = resolve_component(registry, args.component)
dest = Path(args.dest).resolve()
if dest.exists():
shutil.rmtree(dest)
count = copy_tree(source, dest)
write_stamp(dest, args.component, comp_type, source, "published")
version = get_version(comp_type, source)
sha = get_sha()
log.info("%s v%s (%s) -> %s (%d files)", args.component, version, sha, dest, count)
def cmd_sync(args):
registry = load_registry()
comp_type, source = resolve_component(registry, args.component)
dest = Path(args.dest).resolve()
dest.mkdir(parents=True, exist_ok=True)
fwd, rev = sync_tree(source, dest)
write_stamp(dest, args.component, comp_type, source, "synced")
log.info("%s synced (%d fwd, %d rev)", args.component, fwd, rev)
def cmd_watch(args):
registry = load_registry()
comp_type, source = resolve_component(registry, args.component)
dest = Path(args.dest).resolve()
dest.mkdir(parents=True, exist_ok=True)
interval = args.interval
# Initial sync
fwd, rev = sync_tree(source, dest)
write_stamp(dest, args.component, comp_type, source, "watching")
log.info("%s initial sync (%d fwd, %d rev)", args.component, fwd, rev)
log.info(" source: %s", source)
log.info(" dest: %s", dest)
log.info(" watching every %ds — ctrl+c to stop", interval)
try:
while True:
time.sleep(interval)
fwd, rev = sync_tree(source, dest)
if fwd or rev:
log.info("synced (%d fwd, %d rev)", fwd, rev)
except KeyboardInterrupt:
# Final sync
fwd, rev = sync_tree(source, dest)
if fwd or rev:
log.info("final sync (%d fwd, %d rev)", fwd, rev)
log.info("stopped")
def cmd_diff(args):
registry = load_registry()
_, source = resolve_component(registry, args.component)
dest = Path(args.dest).resolve()
if not dest.exists():
log.error("%s not found at %s", args.component, dest)
sys.exit(1)
result = subprocess.run(
[
"diff",
"-rq",
"--exclude=node_modules",
"--exclude=__pycache__",
"--exclude=.venv",
"--exclude=dist",
"--exclude=*.egg-info",
"--exclude=.git",
"--exclude=.spr",
"--exclude=pnpm-lock.yaml",
str(source),
str(dest),
],
capture_output=True,
text=True,
)
if result.stdout:
log.info("\n%s", result.stdout.rstrip())
else:
log.info("%s is in sync", args.component)
# ── main ─────────────────────────────────────────────────────
def main():
logging.basicConfig(
level=logging.DEBUG if "--debug" in sys.argv else logging.INFO,
format="%(levelname)s %(message)s" if "--debug" in sys.argv else ":: %(message)s",
)
sys.argv = [a for a in sys.argv if a != "--debug"]
parser = argparse.ArgumentParser(
prog="spr",
description="Soleprint component manager",
)
sub = parser.add_subparsers(dest="command")
sub.add_parser("list", help="show available components")
for cmd in ("publish", "sync", "diff"):
p = sub.add_parser(cmd)
p.add_argument("component", help="component name")
p.add_argument("dest", help="target folder path")
p = sub.add_parser("watch", help="continuous two-way sync (foreground, ctrl+c to stop)")
p.add_argument("component", help="component name")
p.add_argument("dest", help="target folder path")
p.add_argument("-i", "--interval", type=int, default=2, help="poll interval in seconds (default: 2)")
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(0)
{
"list": cmd_list,
"publish": cmd_publish,
"sync": cmd_sync,
"watch": cmd_watch,
"diff": cmd_diff,
}[args.command](args)
if __name__ == "__main__":
main()