Initial commit: skill-creator skill

This commit is contained in:
2026-03-22 11:43:42 +08:00
commit 2b3bee5e67
18 changed files with 5656 additions and 0 deletions

0
scripts/__init__.py Normal file
View File

View File

@@ -0,0 +1,401 @@
#!/usr/bin/env python3
"""
Aggregate individual run results into benchmark summary statistics.
Reads grading.json files from run directories and produces:
- run_summary with mean, stddev, min, max for each metric
- delta between with_skill and without_skill configurations
Usage:
python aggregate_benchmark.py <benchmark_dir>
Example:
python aggregate_benchmark.py benchmarks/2026-01-15T10-30-00/
The script supports two directory layouts:
Workspace layout (from skill-creator iterations):
<benchmark_dir>/
└── eval-N/
├── with_skill/
│ ├── run-1/grading.json
│ └── run-2/grading.json
└── without_skill/
├── run-1/grading.json
└── run-2/grading.json
Legacy layout (with runs/ subdirectory):
<benchmark_dir>/
└── runs/
└── eval-N/
├── with_skill/
│ └── run-1/grading.json
└── without_skill/
└── run-1/grading.json
"""
import argparse
import json
import math
import sys
from datetime import datetime, timezone
from pathlib import Path
def calculate_stats(values: list[float]) -> dict:
"""Calculate mean, stddev, min, max for a list of values."""
if not values:
return {"mean": 0.0, "stddev": 0.0, "min": 0.0, "max": 0.0}
n = len(values)
mean = sum(values) / n
if n > 1:
variance = sum((x - mean) ** 2 for x in values) / (n - 1)
stddev = math.sqrt(variance)
else:
stddev = 0.0
return {
"mean": round(mean, 4),
"stddev": round(stddev, 4),
"min": round(min(values), 4),
"max": round(max(values), 4)
}
def load_run_results(benchmark_dir: Path) -> dict:
"""
Load all run results from a benchmark directory.
Returns dict keyed by config name (e.g. "with_skill"/"without_skill",
or "new_skill"/"old_skill"), each containing a list of run results.
"""
# Support both layouts: eval dirs directly under benchmark_dir, or under runs/
runs_dir = benchmark_dir / "runs"
if runs_dir.exists():
search_dir = runs_dir
elif list(benchmark_dir.glob("eval-*")):
search_dir = benchmark_dir
else:
print(f"No eval directories found in {benchmark_dir} or {benchmark_dir / 'runs'}")
return {}
results: dict[str, list] = {}
for eval_idx, eval_dir in enumerate(sorted(search_dir.glob("eval-*"))):
metadata_path = eval_dir / "eval_metadata.json"
if metadata_path.exists():
try:
with open(metadata_path) as mf:
eval_id = json.load(mf).get("eval_id", eval_idx)
except (json.JSONDecodeError, OSError):
eval_id = eval_idx
else:
try:
eval_id = int(eval_dir.name.split("-")[1])
except ValueError:
eval_id = eval_idx
# Discover config directories dynamically rather than hardcoding names
for config_dir in sorted(eval_dir.iterdir()):
if not config_dir.is_dir():
continue
# Skip non-config directories (inputs, outputs, etc.)
if not list(config_dir.glob("run-*")):
continue
config = config_dir.name
if config not in results:
results[config] = []
for run_dir in sorted(config_dir.glob("run-*")):
run_number = int(run_dir.name.split("-")[1])
grading_file = run_dir / "grading.json"
if not grading_file.exists():
print(f"Warning: grading.json not found in {run_dir}")
continue
try:
with open(grading_file) as f:
grading = json.load(f)
except json.JSONDecodeError as e:
print(f"Warning: Invalid JSON in {grading_file}: {e}")
continue
# Extract metrics
result = {
"eval_id": eval_id,
"run_number": run_number,
"pass_rate": grading.get("summary", {}).get("pass_rate", 0.0),
"passed": grading.get("summary", {}).get("passed", 0),
"failed": grading.get("summary", {}).get("failed", 0),
"total": grading.get("summary", {}).get("total", 0),
}
# Extract timing — check grading.json first, then sibling timing.json
timing = grading.get("timing", {})
result["time_seconds"] = timing.get("total_duration_seconds", 0.0)
timing_file = run_dir / "timing.json"
if result["time_seconds"] == 0.0 and timing_file.exists():
try:
with open(timing_file) as tf:
timing_data = json.load(tf)
result["time_seconds"] = timing_data.get("total_duration_seconds", 0.0)
result["tokens"] = timing_data.get("total_tokens", 0)
except json.JSONDecodeError:
pass
# Extract metrics if available
metrics = grading.get("execution_metrics", {})
result["tool_calls"] = metrics.get("total_tool_calls", 0)
if not result.get("tokens"):
result["tokens"] = metrics.get("output_chars", 0)
result["errors"] = metrics.get("errors_encountered", 0)
# Extract expectations — viewer requires fields: text, passed, evidence
raw_expectations = grading.get("expectations", [])
for exp in raw_expectations:
if "text" not in exp or "passed" not in exp:
print(f"Warning: expectation in {grading_file} missing required fields (text, passed, evidence): {exp}")
result["expectations"] = raw_expectations
# Extract notes from user_notes_summary
notes_summary = grading.get("user_notes_summary", {})
notes = []
notes.extend(notes_summary.get("uncertainties", []))
notes.extend(notes_summary.get("needs_review", []))
notes.extend(notes_summary.get("workarounds", []))
result["notes"] = notes
results[config].append(result)
return results
def aggregate_results(results: dict) -> dict:
"""
Aggregate run results into summary statistics.
Returns run_summary with stats for each configuration and delta.
"""
run_summary = {}
configs = list(results.keys())
for config in configs:
runs = results.get(config, [])
if not runs:
run_summary[config] = {
"pass_rate": {"mean": 0.0, "stddev": 0.0, "min": 0.0, "max": 0.0},
"time_seconds": {"mean": 0.0, "stddev": 0.0, "min": 0.0, "max": 0.0},
"tokens": {"mean": 0, "stddev": 0, "min": 0, "max": 0}
}
continue
pass_rates = [r["pass_rate"] for r in runs]
times = [r["time_seconds"] for r in runs]
tokens = [r.get("tokens", 0) for r in runs]
run_summary[config] = {
"pass_rate": calculate_stats(pass_rates),
"time_seconds": calculate_stats(times),
"tokens": calculate_stats(tokens)
}
# Calculate delta between the first two configs (if two exist)
if len(configs) >= 2:
primary = run_summary.get(configs[0], {})
baseline = run_summary.get(configs[1], {})
else:
primary = run_summary.get(configs[0], {}) if configs else {}
baseline = {}
delta_pass_rate = primary.get("pass_rate", {}).get("mean", 0) - baseline.get("pass_rate", {}).get("mean", 0)
delta_time = primary.get("time_seconds", {}).get("mean", 0) - baseline.get("time_seconds", {}).get("mean", 0)
delta_tokens = primary.get("tokens", {}).get("mean", 0) - baseline.get("tokens", {}).get("mean", 0)
run_summary["delta"] = {
"pass_rate": f"{delta_pass_rate:+.2f}",
"time_seconds": f"{delta_time:+.1f}",
"tokens": f"{delta_tokens:+.0f}"
}
return run_summary
def generate_benchmark(benchmark_dir: Path, skill_name: str = "", skill_path: str = "") -> dict:
"""
Generate complete benchmark.json from run results.
"""
results = load_run_results(benchmark_dir)
run_summary = aggregate_results(results)
# Build runs array for benchmark.json
runs = []
for config in results:
for result in results[config]:
runs.append({
"eval_id": result["eval_id"],
"configuration": config,
"run_number": result["run_number"],
"result": {
"pass_rate": result["pass_rate"],
"passed": result["passed"],
"failed": result["failed"],
"total": result["total"],
"time_seconds": result["time_seconds"],
"tokens": result.get("tokens", 0),
"tool_calls": result.get("tool_calls", 0),
"errors": result.get("errors", 0)
},
"expectations": result["expectations"],
"notes": result["notes"]
})
# Determine eval IDs from results
eval_ids = sorted(set(
r["eval_id"]
for config in results.values()
for r in config
))
benchmark = {
"metadata": {
"skill_name": skill_name or "<skill-name>",
"skill_path": skill_path or "<path/to/skill>",
"executor_model": "<model-name>",
"analyzer_model": "<model-name>",
"timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"evals_run": eval_ids,
"runs_per_configuration": 3
},
"runs": runs,
"run_summary": run_summary,
"notes": [] # To be filled by analyzer
}
return benchmark
def generate_markdown(benchmark: dict) -> str:
"""Generate human-readable benchmark.md from benchmark data."""
metadata = benchmark["metadata"]
run_summary = benchmark["run_summary"]
# Determine config names (excluding "delta")
configs = [k for k in run_summary if k != "delta"]
config_a = configs[0] if len(configs) >= 1 else "config_a"
config_b = configs[1] if len(configs) >= 2 else "config_b"
label_a = config_a.replace("_", " ").title()
label_b = config_b.replace("_", " ").title()
lines = [
f"# Skill Benchmark: {metadata['skill_name']}",
"",
f"**Model**: {metadata['executor_model']}",
f"**Date**: {metadata['timestamp']}",
f"**Evals**: {', '.join(map(str, metadata['evals_run']))} ({metadata['runs_per_configuration']} runs each per configuration)",
"",
"## Summary",
"",
f"| Metric | {label_a} | {label_b} | Delta |",
"|--------|------------|---------------|-------|",
]
a_summary = run_summary.get(config_a, {})
b_summary = run_summary.get(config_b, {})
delta = run_summary.get("delta", {})
# Format pass rate
a_pr = a_summary.get("pass_rate", {})
b_pr = b_summary.get("pass_rate", {})
lines.append(f"| Pass Rate | {a_pr.get('mean', 0)*100:.0f}% ± {a_pr.get('stddev', 0)*100:.0f}% | {b_pr.get('mean', 0)*100:.0f}% ± {b_pr.get('stddev', 0)*100:.0f}% | {delta.get('pass_rate', '')} |")
# Format time
a_time = a_summary.get("time_seconds", {})
b_time = b_summary.get("time_seconds", {})
lines.append(f"| Time | {a_time.get('mean', 0):.1f}s ± {a_time.get('stddev', 0):.1f}s | {b_time.get('mean', 0):.1f}s ± {b_time.get('stddev', 0):.1f}s | {delta.get('time_seconds', '')}s |")
# Format tokens
a_tokens = a_summary.get("tokens", {})
b_tokens = b_summary.get("tokens", {})
lines.append(f"| Tokens | {a_tokens.get('mean', 0):.0f} ± {a_tokens.get('stddev', 0):.0f} | {b_tokens.get('mean', 0):.0f} ± {b_tokens.get('stddev', 0):.0f} | {delta.get('tokens', '')} |")
# Notes section
if benchmark.get("notes"):
lines.extend([
"",
"## Notes",
""
])
for note in benchmark["notes"]:
lines.append(f"- {note}")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(
description="Aggregate benchmark run results into summary statistics"
)
parser.add_argument(
"benchmark_dir",
type=Path,
help="Path to the benchmark directory"
)
parser.add_argument(
"--skill-name",
default="",
help="Name of the skill being benchmarked"
)
parser.add_argument(
"--skill-path",
default="",
help="Path to the skill being benchmarked"
)
parser.add_argument(
"--output", "-o",
type=Path,
help="Output path for benchmark.json (default: <benchmark_dir>/benchmark.json)"
)
args = parser.parse_args()
if not args.benchmark_dir.exists():
print(f"Directory not found: {args.benchmark_dir}")
sys.exit(1)
# Generate benchmark
benchmark = generate_benchmark(args.benchmark_dir, args.skill_name, args.skill_path)
# Determine output paths
output_json = args.output or (args.benchmark_dir / "benchmark.json")
output_md = output_json.with_suffix(".md")
# Write benchmark.json
with open(output_json, "w") as f:
json.dump(benchmark, f, indent=2)
print(f"Generated: {output_json}")
# Write benchmark.md
markdown = generate_markdown(benchmark)
with open(output_md, "w") as f:
f.write(markdown)
print(f"Generated: {output_md}")
# Print summary
run_summary = benchmark["run_summary"]
configs = [k for k in run_summary if k != "delta"]
delta = run_summary.get("delta", {})
print(f"\nSummary:")
for config in configs:
pr = run_summary[config]["pass_rate"]["mean"]
label = config.replace("_", " ").title()
print(f" {label}: {pr*100:.1f}% pass rate")
print(f" Delta: {delta.get('pass_rate', '')}")
if __name__ == "__main__":
main()

326
scripts/generate_report.py Normal file
View File

@@ -0,0 +1,326 @@
#!/usr/bin/env python3
"""Generate an HTML report from run_loop.py output.
Takes the JSON output from run_loop.py and generates a visual HTML report
showing each description attempt with check/x for each test case.
Distinguishes between train and test queries.
"""
import argparse
import html
import json
import sys
from pathlib import Path
def generate_html(data: dict, auto_refresh: bool = False, skill_name: str = "") -> str:
"""Generate HTML report from loop output data. If auto_refresh is True, adds a meta refresh tag."""
history = data.get("history", [])
holdout = data.get("holdout", 0)
title_prefix = html.escape(skill_name + " \u2014 ") if skill_name else ""
# Get all unique queries from train and test sets, with should_trigger info
train_queries: list[dict] = []
test_queries: list[dict] = []
if history:
for r in history[0].get("train_results", history[0].get("results", [])):
train_queries.append({"query": r["query"], "should_trigger": r.get("should_trigger", True)})
if history[0].get("test_results"):
for r in history[0].get("test_results", []):
test_queries.append({"query": r["query"], "should_trigger": r.get("should_trigger", True)})
refresh_tag = ' <meta http-equiv="refresh" content="5">\n' if auto_refresh else ""
html_parts = ["""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
""" + refresh_tag + """ <title>""" + title_prefix + """Skill Description Optimization</title>
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=Poppins:wght@500;600&family=Lora:wght@400;500&display=swap" rel="stylesheet">
<style>
body {
font-family: 'Lora', Georgia, serif;
max-width: 100%;
margin: 0 auto;
padding: 20px;
background: #faf9f5;
color: #141413;
}
h1 { font-family: 'Poppins', sans-serif; color: #141413; }
.explainer {
background: white;
padding: 15px;
border-radius: 6px;
margin-bottom: 20px;
border: 1px solid #e8e6dc;
color: #b0aea5;
font-size: 0.875rem;
line-height: 1.6;
}
.summary {
background: white;
padding: 15px;
border-radius: 6px;
margin-bottom: 20px;
border: 1px solid #e8e6dc;
}
.summary p { margin: 5px 0; }
.best { color: #788c5d; font-weight: bold; }
.table-container {
overflow-x: auto;
width: 100%;
}
table {
border-collapse: collapse;
background: white;
border: 1px solid #e8e6dc;
border-radius: 6px;
font-size: 12px;
min-width: 100%;
}
th, td {
padding: 8px;
text-align: left;
border: 1px solid #e8e6dc;
white-space: normal;
word-wrap: break-word;
}
th {
font-family: 'Poppins', sans-serif;
background: #141413;
color: #faf9f5;
font-weight: 500;
}
th.test-col {
background: #6a9bcc;
}
th.query-col { min-width: 200px; }
td.description {
font-family: monospace;
font-size: 11px;
word-wrap: break-word;
max-width: 400px;
}
td.result {
text-align: center;
font-size: 16px;
min-width: 40px;
}
td.test-result {
background: #f0f6fc;
}
.pass { color: #788c5d; }
.fail { color: #c44; }
.rate {
font-size: 9px;
color: #b0aea5;
display: block;
}
tr:hover { background: #faf9f5; }
.score {
display: inline-block;
padding: 2px 6px;
border-radius: 4px;
font-weight: bold;
font-size: 11px;
}
.score-good { background: #eef2e8; color: #788c5d; }
.score-ok { background: #fef3c7; color: #d97706; }
.score-bad { background: #fceaea; color: #c44; }
.train-label { color: #b0aea5; font-size: 10px; }
.test-label { color: #6a9bcc; font-size: 10px; font-weight: bold; }
.best-row { background: #f5f8f2; }
th.positive-col { border-bottom: 3px solid #788c5d; }
th.negative-col { border-bottom: 3px solid #c44; }
th.test-col.positive-col { border-bottom: 3px solid #788c5d; }
th.test-col.negative-col { border-bottom: 3px solid #c44; }
.legend { font-family: 'Poppins', sans-serif; display: flex; gap: 20px; margin-bottom: 10px; font-size: 13px; align-items: center; }
.legend-item { display: flex; align-items: center; gap: 6px; }
.legend-swatch { width: 16px; height: 16px; border-radius: 3px; display: inline-block; }
.swatch-positive { background: #141413; border-bottom: 3px solid #788c5d; }
.swatch-negative { background: #141413; border-bottom: 3px solid #c44; }
.swatch-test { background: #6a9bcc; }
.swatch-train { background: #141413; }
</style>
</head>
<body>
<h1>""" + title_prefix + """Skill Description Optimization</h1>
<div class="explainer">
<strong>Optimizing your skill's description.</strong> This page updates automatically as Claude tests different versions of your skill's description. Each row is an iteration — a new description attempt. The columns show test queries: green checkmarks mean the skill triggered correctly (or correctly didn't trigger), red crosses mean it got it wrong. The "Train" score shows performance on queries used to improve the description; the "Test" score shows performance on held-out queries the optimizer hasn't seen. When it's done, Claude will apply the best-performing description to your skill.
</div>
"""]
# Summary section
best_test_score = data.get('best_test_score')
best_train_score = data.get('best_train_score')
html_parts.append(f"""
<div class="summary">
<p><strong>Original:</strong> {html.escape(data.get('original_description', 'N/A'))}</p>
<p class="best"><strong>Best:</strong> {html.escape(data.get('best_description', 'N/A'))}</p>
<p><strong>Best Score:</strong> {data.get('best_score', 'N/A')} {'(test)' if best_test_score else '(train)'}</p>
<p><strong>Iterations:</strong> {data.get('iterations_run', 0)} | <strong>Train:</strong> {data.get('train_size', '?')} | <strong>Test:</strong> {data.get('test_size', '?')}</p>
</div>
""")
# Legend
html_parts.append("""
<div class="legend">
<span style="font-weight:600">Query columns:</span>
<span class="legend-item"><span class="legend-swatch swatch-positive"></span> Should trigger</span>
<span class="legend-item"><span class="legend-swatch swatch-negative"></span> Should NOT trigger</span>
<span class="legend-item"><span class="legend-swatch swatch-train"></span> Train</span>
<span class="legend-item"><span class="legend-swatch swatch-test"></span> Test</span>
</div>
""")
# Table header
html_parts.append("""
<div class="table-container">
<table>
<thead>
<tr>
<th>Iter</th>
<th>Train</th>
<th>Test</th>
<th class="query-col">Description</th>
""")
# Add column headers for train queries
for qinfo in train_queries:
polarity = "positive-col" if qinfo["should_trigger"] else "negative-col"
html_parts.append(f' <th class="{polarity}">{html.escape(qinfo["query"])}</th>\n')
# Add column headers for test queries (different color)
for qinfo in test_queries:
polarity = "positive-col" if qinfo["should_trigger"] else "negative-col"
html_parts.append(f' <th class="test-col {polarity}">{html.escape(qinfo["query"])}</th>\n')
html_parts.append(""" </tr>
</thead>
<tbody>
""")
# Find best iteration for highlighting
if test_queries:
best_iter = max(history, key=lambda h: h.get("test_passed") or 0).get("iteration")
else:
best_iter = max(history, key=lambda h: h.get("train_passed", h.get("passed", 0))).get("iteration")
# Add rows for each iteration
for h in history:
iteration = h.get("iteration", "?")
train_passed = h.get("train_passed", h.get("passed", 0))
train_total = h.get("train_total", h.get("total", 0))
test_passed = h.get("test_passed")
test_total = h.get("test_total")
description = h.get("description", "")
train_results = h.get("train_results", h.get("results", []))
test_results = h.get("test_results", [])
# Create lookups for results by query
train_by_query = {r["query"]: r for r in train_results}
test_by_query = {r["query"]: r for r in test_results} if test_results else {}
# Compute aggregate correct/total runs across all retries
def aggregate_runs(results: list[dict]) -> tuple[int, int]:
correct = 0
total = 0
for r in results:
runs = r.get("runs", 0)
triggers = r.get("triggers", 0)
total += runs
if r.get("should_trigger", True):
correct += triggers
else:
correct += runs - triggers
return correct, total
train_correct, train_runs = aggregate_runs(train_results)
test_correct, test_runs = aggregate_runs(test_results)
# Determine score classes
def score_class(correct: int, total: int) -> str:
if total > 0:
ratio = correct / total
if ratio >= 0.8:
return "score-good"
elif ratio >= 0.5:
return "score-ok"
return "score-bad"
train_class = score_class(train_correct, train_runs)
test_class = score_class(test_correct, test_runs)
row_class = "best-row" if iteration == best_iter else ""
html_parts.append(f""" <tr class="{row_class}">
<td>{iteration}</td>
<td><span class="score {train_class}">{train_correct}/{train_runs}</span></td>
<td><span class="score {test_class}">{test_correct}/{test_runs}</span></td>
<td class="description">{html.escape(description)}</td>
""")
# Add result for each train query
for qinfo in train_queries:
r = train_by_query.get(qinfo["query"], {})
did_pass = r.get("pass", False)
triggers = r.get("triggers", 0)
runs = r.get("runs", 0)
icon = "" if did_pass else ""
css_class = "pass" if did_pass else "fail"
html_parts.append(f' <td class="result {css_class}">{icon}<span class="rate">{triggers}/{runs}</span></td>\n')
# Add result for each test query (with different background)
for qinfo in test_queries:
r = test_by_query.get(qinfo["query"], {})
did_pass = r.get("pass", False)
triggers = r.get("triggers", 0)
runs = r.get("runs", 0)
icon = "" if did_pass else ""
css_class = "pass" if did_pass else "fail"
html_parts.append(f' <td class="result test-result {css_class}">{icon}<span class="rate">{triggers}/{runs}</span></td>\n')
html_parts.append(" </tr>\n")
html_parts.append(""" </tbody>
</table>
</div>
""")
html_parts.append("""
</body>
</html>
""")
return "".join(html_parts)
def main():
parser = argparse.ArgumentParser(description="Generate HTML report from run_loop output")
parser.add_argument("input", help="Path to JSON output from run_loop.py (or - for stdin)")
parser.add_argument("-o", "--output", default=None, help="Output HTML file (default: stdout)")
parser.add_argument("--skill-name", default="", help="Skill name to include in the report title")
args = parser.parse_args()
if args.input == "-":
data = json.load(sys.stdin)
else:
data = json.loads(Path(args.input).read_text())
html_output = generate_html(data, skill_name=args.skill_name)
if args.output:
Path(args.output).write_text(html_output)
print(f"Report written to {args.output}", file=sys.stderr)
else:
print(html_output)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,247 @@
#!/usr/bin/env python3
"""Improve a skill description based on eval results.
Takes eval results (from run_eval.py) and generates an improved description
by calling `claude -p` as a subprocess (same auth pattern as run_eval.py —
uses the session's Claude Code auth, no separate ANTHROPIC_API_KEY needed).
"""
import argparse
import json
import os
import re
import subprocess
import sys
from pathlib import Path
from scripts.utils import parse_skill_md
def _call_claude(prompt: str, model: str | None, timeout: int = 300) -> str:
"""Run `claude -p` with the prompt on stdin and return the text response.
Prompt goes over stdin (not argv) because it embeds the full SKILL.md
body and can easily exceed comfortable argv length.
"""
cmd = ["claude", "-p", "--output-format", "text"]
if model:
cmd.extend(["--model", model])
# Remove CLAUDECODE env var to allow nesting claude -p inside a
# Claude Code session. The guard is for interactive terminal conflicts;
# programmatic subprocess usage is safe. Same pattern as run_eval.py.
env = {k: v for k, v in os.environ.items() if k != "CLAUDECODE"}
result = subprocess.run(
cmd,
input=prompt,
capture_output=True,
text=True,
env=env,
timeout=timeout,
)
if result.returncode != 0:
raise RuntimeError(
f"claude -p exited {result.returncode}\nstderr: {result.stderr}"
)
return result.stdout
def improve_description(
skill_name: str,
skill_content: str,
current_description: str,
eval_results: dict,
history: list[dict],
model: str,
test_results: dict | None = None,
log_dir: Path | None = None,
iteration: int | None = None,
) -> str:
"""Call Claude to improve the description based on eval results."""
failed_triggers = [
r for r in eval_results["results"]
if r["should_trigger"] and not r["pass"]
]
false_triggers = [
r for r in eval_results["results"]
if not r["should_trigger"] and not r["pass"]
]
# Build scores summary
train_score = f"{eval_results['summary']['passed']}/{eval_results['summary']['total']}"
if test_results:
test_score = f"{test_results['summary']['passed']}/{test_results['summary']['total']}"
scores_summary = f"Train: {train_score}, Test: {test_score}"
else:
scores_summary = f"Train: {train_score}"
prompt = f"""You are optimizing a skill description for a Claude Code skill called "{skill_name}". A "skill" is sort of like a prompt, but with progressive disclosure -- there's a title and description that Claude sees when deciding whether to use the skill, and then if it does use the skill, it reads the .md file which has lots more details and potentially links to other resources in the skill folder like helper files and scripts and additional documentation or examples.
The description appears in Claude's "available_skills" list. When a user sends a query, Claude decides whether to invoke the skill based solely on the title and on this description. Your goal is to write a description that triggers for relevant queries, and doesn't trigger for irrelevant ones.
Here's the current description:
<current_description>
"{current_description}"
</current_description>
Current scores ({scores_summary}):
<scores_summary>
"""
if failed_triggers:
prompt += "FAILED TO TRIGGER (should have triggered but didn't):\n"
for r in failed_triggers:
prompt += f' - "{r["query"]}" (triggered {r["triggers"]}/{r["runs"]} times)\n'
prompt += "\n"
if false_triggers:
prompt += "FALSE TRIGGERS (triggered but shouldn't have):\n"
for r in false_triggers:
prompt += f' - "{r["query"]}" (triggered {r["triggers"]}/{r["runs"]} times)\n'
prompt += "\n"
if history:
prompt += "PREVIOUS ATTEMPTS (do NOT repeat these — try something structurally different):\n\n"
for h in history:
train_s = f"{h.get('train_passed', h.get('passed', 0))}/{h.get('train_total', h.get('total', 0))}"
test_s = f"{h.get('test_passed', '?')}/{h.get('test_total', '?')}" if h.get('test_passed') is not None else None
score_str = f"train={train_s}" + (f", test={test_s}" if test_s else "")
prompt += f'<attempt {score_str}>\n'
prompt += f'Description: "{h["description"]}"\n'
if "results" in h:
prompt += "Train results:\n"
for r in h["results"]:
status = "PASS" if r["pass"] else "FAIL"
prompt += f' [{status}] "{r["query"][:80]}" (triggered {r["triggers"]}/{r["runs"]})\n'
if h.get("note"):
prompt += f'Note: {h["note"]}\n'
prompt += "</attempt>\n\n"
prompt += f"""</scores_summary>
Skill content (for context on what the skill does):
<skill_content>
{skill_content}
</skill_content>
Based on the failures, write a new and improved description that is more likely to trigger correctly. When I say "based on the failures", it's a bit of a tricky line to walk because we don't want to overfit to the specific cases you're seeing. So what I DON'T want you to do is produce an ever-expanding list of specific queries that this skill should or shouldn't trigger for. Instead, try to generalize from the failures to broader categories of user intent and situations where this skill would be useful or not useful. The reason for this is twofold:
1. Avoid overfitting
2. The list might get loooong and it's injected into ALL queries and there might be a lot of skills, so we don't want to blow too much space on any given description.
Concretely, your description should not be more than about 100-200 words, even if that comes at the cost of accuracy. There is a hard limit of 1024 characters — descriptions over that will be truncated, so stay comfortably under it.
Here are some tips that we've found to work well in writing these descriptions:
- The skill should be phrased in the imperative -- "Use this skill for" rather than "this skill does"
- The skill description should focus on the user's intent, what they are trying to achieve, vs. the implementation details of how the skill works.
- The description competes with other skills for Claude's attention — make it distinctive and immediately recognizable.
- If you're getting lots of failures after repeated attempts, change things up. Try different sentence structures or wordings.
I'd encourage you to be creative and mix up the style in different iterations since you'll have multiple opportunities to try different approaches and we'll just grab the highest-scoring one at the end.
Please respond with only the new description text in <new_description> tags, nothing else."""
text = _call_claude(prompt, model)
match = re.search(r"<new_description>(.*?)</new_description>", text, re.DOTALL)
description = match.group(1).strip().strip('"') if match else text.strip().strip('"')
transcript: dict = {
"iteration": iteration,
"prompt": prompt,
"response": text,
"parsed_description": description,
"char_count": len(description),
"over_limit": len(description) > 1024,
}
# Safety net: the prompt already states the 1024-char hard limit, but if
# the model blew past it anyway, make one fresh single-turn call that
# quotes the too-long version and asks for a shorter rewrite. (The old
# SDK path did this as a true multi-turn; `claude -p` is one-shot, so we
# inline the prior output into the new prompt instead.)
if len(description) > 1024:
shorten_prompt = (
f"{prompt}\n\n"
f"---\n\n"
f"A previous attempt produced this description, which at "
f"{len(description)} characters is over the 1024-character hard limit:\n\n"
f'"{description}"\n\n'
f"Rewrite it to be under 1024 characters while keeping the most "
f"important trigger words and intent coverage. Respond with only "
f"the new description in <new_description> tags."
)
shorten_text = _call_claude(shorten_prompt, model)
match = re.search(r"<new_description>(.*?)</new_description>", shorten_text, re.DOTALL)
shortened = match.group(1).strip().strip('"') if match else shorten_text.strip().strip('"')
transcript["rewrite_prompt"] = shorten_prompt
transcript["rewrite_response"] = shorten_text
transcript["rewrite_description"] = shortened
transcript["rewrite_char_count"] = len(shortened)
description = shortened
transcript["final_description"] = description
if log_dir:
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / f"improve_iter_{iteration or 'unknown'}.json"
log_file.write_text(json.dumps(transcript, indent=2))
return description
def main():
parser = argparse.ArgumentParser(description="Improve a skill description based on eval results")
parser.add_argument("--eval-results", required=True, help="Path to eval results JSON (from run_eval.py)")
parser.add_argument("--skill-path", required=True, help="Path to skill directory")
parser.add_argument("--history", default=None, help="Path to history JSON (previous attempts)")
parser.add_argument("--model", required=True, help="Model for improvement")
parser.add_argument("--verbose", action="store_true", help="Print thinking to stderr")
args = parser.parse_args()
skill_path = Path(args.skill_path)
if not (skill_path / "SKILL.md").exists():
print(f"Error: No SKILL.md found at {skill_path}", file=sys.stderr)
sys.exit(1)
eval_results = json.loads(Path(args.eval_results).read_text())
history = []
if args.history:
history = json.loads(Path(args.history).read_text())
name, _, content = parse_skill_md(skill_path)
current_description = eval_results["description"]
if args.verbose:
print(f"Current: {current_description}", file=sys.stderr)
print(f"Score: {eval_results['summary']['passed']}/{eval_results['summary']['total']}", file=sys.stderr)
new_description = improve_description(
skill_name=name,
skill_content=content,
current_description=current_description,
eval_results=eval_results,
history=history,
model=args.model,
)
if args.verbose:
print(f"Improved: {new_description}", file=sys.stderr)
# Output as JSON with both the new description and updated history
output = {
"description": new_description,
"history": history + [{
"description": current_description,
"passed": eval_results["summary"]["passed"],
"failed": eval_results["summary"]["failed"],
"total": eval_results["summary"]["total"],
"results": eval_results["results"],
}],
}
print(json.dumps(output, indent=2))
if __name__ == "__main__":
main()

136
scripts/package_skill.py Normal file
View File

@@ -0,0 +1,136 @@
#!/usr/bin/env python3
"""
Skill Packager - Creates a distributable .skill file of a skill folder
Usage:
python utils/package_skill.py <path/to/skill-folder> [output-directory]
Example:
python utils/package_skill.py skills/public/my-skill
python utils/package_skill.py skills/public/my-skill ./dist
"""
import fnmatch
import sys
import zipfile
from pathlib import Path
from scripts.quick_validate import validate_skill
# Patterns to exclude when packaging skills.
EXCLUDE_DIRS = {"__pycache__", "node_modules"}
EXCLUDE_GLOBS = {"*.pyc"}
EXCLUDE_FILES = {".DS_Store"}
# Directories excluded only at the skill root (not when nested deeper).
ROOT_EXCLUDE_DIRS = {"evals"}
def should_exclude(rel_path: Path) -> bool:
"""Check if a path should be excluded from packaging."""
parts = rel_path.parts
if any(part in EXCLUDE_DIRS for part in parts):
return True
# rel_path is relative to skill_path.parent, so parts[0] is the skill
# folder name and parts[1] (if present) is the first subdir.
if len(parts) > 1 and parts[1] in ROOT_EXCLUDE_DIRS:
return True
name = rel_path.name
if name in EXCLUDE_FILES:
return True
return any(fnmatch.fnmatch(name, pat) for pat in EXCLUDE_GLOBS)
def package_skill(skill_path, output_dir=None):
"""
Package a skill folder into a .skill file.
Args:
skill_path: Path to the skill folder
output_dir: Optional output directory for the .skill file (defaults to current directory)
Returns:
Path to the created .skill file, or None if error
"""
skill_path = Path(skill_path).resolve()
# Validate skill folder exists
if not skill_path.exists():
print(f"❌ Error: Skill folder not found: {skill_path}")
return None
if not skill_path.is_dir():
print(f"❌ Error: Path is not a directory: {skill_path}")
return None
# Validate SKILL.md exists
skill_md = skill_path / "SKILL.md"
if not skill_md.exists():
print(f"❌ Error: SKILL.md not found in {skill_path}")
return None
# Run validation before packaging
print("🔍 Validating skill...")
valid, message = validate_skill(skill_path)
if not valid:
print(f"❌ Validation failed: {message}")
print(" Please fix the validation errors before packaging.")
return None
print(f"{message}\n")
# Determine output location
skill_name = skill_path.name
if output_dir:
output_path = Path(output_dir).resolve()
output_path.mkdir(parents=True, exist_ok=True)
else:
output_path = Path.cwd()
skill_filename = output_path / f"{skill_name}.skill"
# Create the .skill file (zip format)
try:
with zipfile.ZipFile(skill_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
# Walk through the skill directory, excluding build artifacts
for file_path in skill_path.rglob('*'):
if not file_path.is_file():
continue
arcname = file_path.relative_to(skill_path.parent)
if should_exclude(arcname):
print(f" Skipped: {arcname}")
continue
zipf.write(file_path, arcname)
print(f" Added: {arcname}")
print(f"\n✅ Successfully packaged skill to: {skill_filename}")
return skill_filename
except Exception as e:
print(f"❌ Error creating .skill file: {e}")
return None
def main():
if len(sys.argv) < 2:
print("Usage: python utils/package_skill.py <path/to/skill-folder> [output-directory]")
print("\nExample:")
print(" python utils/package_skill.py skills/public/my-skill")
print(" python utils/package_skill.py skills/public/my-skill ./dist")
sys.exit(1)
skill_path = sys.argv[1]
output_dir = sys.argv[2] if len(sys.argv) > 2 else None
print(f"📦 Packaging skill: {skill_path}")
if output_dir:
print(f" Output directory: {output_dir}")
print()
result = package_skill(skill_path, output_dir)
if result:
sys.exit(0)
else:
sys.exit(1)
if __name__ == "__main__":
main()

103
scripts/quick_validate.py Normal file
View File

@@ -0,0 +1,103 @@
#!/usr/bin/env python3
"""
Quick validation script for skills - minimal version
"""
import sys
import os
import re
import yaml
from pathlib import Path
def validate_skill(skill_path):
"""Basic validation of a skill"""
skill_path = Path(skill_path)
# Check SKILL.md exists
skill_md = skill_path / 'SKILL.md'
if not skill_md.exists():
return False, "SKILL.md not found"
# Read and validate frontmatter
content = skill_md.read_text()
if not content.startswith('---'):
return False, "No YAML frontmatter found"
# Extract frontmatter
match = re.match(r'^---\n(.*?)\n---', content, re.DOTALL)
if not match:
return False, "Invalid frontmatter format"
frontmatter_text = match.group(1)
# Parse YAML frontmatter
try:
frontmatter = yaml.safe_load(frontmatter_text)
if not isinstance(frontmatter, dict):
return False, "Frontmatter must be a YAML dictionary"
except yaml.YAMLError as e:
return False, f"Invalid YAML in frontmatter: {e}"
# Define allowed properties
ALLOWED_PROPERTIES = {'name', 'description', 'license', 'allowed-tools', 'metadata', 'compatibility'}
# Check for unexpected properties (excluding nested keys under metadata)
unexpected_keys = set(frontmatter.keys()) - ALLOWED_PROPERTIES
if unexpected_keys:
return False, (
f"Unexpected key(s) in SKILL.md frontmatter: {', '.join(sorted(unexpected_keys))}. "
f"Allowed properties are: {', '.join(sorted(ALLOWED_PROPERTIES))}"
)
# Check required fields
if 'name' not in frontmatter:
return False, "Missing 'name' in frontmatter"
if 'description' not in frontmatter:
return False, "Missing 'description' in frontmatter"
# Extract name for validation
name = frontmatter.get('name', '')
if not isinstance(name, str):
return False, f"Name must be a string, got {type(name).__name__}"
name = name.strip()
if name:
# Check naming convention (kebab-case: lowercase with hyphens)
if not re.match(r'^[a-z0-9-]+$', name):
return False, f"Name '{name}' should be kebab-case (lowercase letters, digits, and hyphens only)"
if name.startswith('-') or name.endswith('-') or '--' in name:
return False, f"Name '{name}' cannot start/end with hyphen or contain consecutive hyphens"
# Check name length (max 64 characters per spec)
if len(name) > 64:
return False, f"Name is too long ({len(name)} characters). Maximum is 64 characters."
# Extract and validate description
description = frontmatter.get('description', '')
if not isinstance(description, str):
return False, f"Description must be a string, got {type(description).__name__}"
description = description.strip()
if description:
# Check for angle brackets
if '<' in description or '>' in description:
return False, "Description cannot contain angle brackets (< or >)"
# Check description length (max 1024 characters per spec)
if len(description) > 1024:
return False, f"Description is too long ({len(description)} characters). Maximum is 1024 characters."
# Validate compatibility field if present (optional)
compatibility = frontmatter.get('compatibility', '')
if compatibility:
if not isinstance(compatibility, str):
return False, f"Compatibility must be a string, got {type(compatibility).__name__}"
if len(compatibility) > 500:
return False, f"Compatibility is too long ({len(compatibility)} characters). Maximum is 500 characters."
return True, "Skill is valid!"
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: python quick_validate.py <skill_directory>")
sys.exit(1)
valid, message = validate_skill(sys.argv[1])
print(message)
sys.exit(0 if valid else 1)

310
scripts/run_eval.py Normal file
View File

@@ -0,0 +1,310 @@
#!/usr/bin/env python3
"""Run trigger evaluation for a skill description.
Tests whether a skill's description causes Claude to trigger (read the skill)
for a set of queries. Outputs results as JSON.
"""
import argparse
import json
import os
import select
import subprocess
import sys
import time
import uuid
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from scripts.utils import parse_skill_md
def find_project_root() -> Path:
"""Find the project root by walking up from cwd looking for .claude/.
Mimics how Claude Code discovers its project root, so the command file
we create ends up where claude -p will look for it.
"""
current = Path.cwd()
for parent in [current, *current.parents]:
if (parent / ".claude").is_dir():
return parent
return current
def run_single_query(
query: str,
skill_name: str,
skill_description: str,
timeout: int,
project_root: str,
model: str | None = None,
) -> bool:
"""Run a single query and return whether the skill was triggered.
Creates a command file in .claude/commands/ so it appears in Claude's
available_skills list, then runs `claude -p` with the raw query.
Uses --include-partial-messages to detect triggering early from
stream events (content_block_start) rather than waiting for the
full assistant message, which only arrives after tool execution.
"""
unique_id = uuid.uuid4().hex[:8]
clean_name = f"{skill_name}-skill-{unique_id}"
project_commands_dir = Path(project_root) / ".claude" / "commands"
command_file = project_commands_dir / f"{clean_name}.md"
try:
project_commands_dir.mkdir(parents=True, exist_ok=True)
# Use YAML block scalar to avoid breaking on quotes in description
indented_desc = "\n ".join(skill_description.split("\n"))
command_content = (
f"---\n"
f"description: |\n"
f" {indented_desc}\n"
f"---\n\n"
f"# {skill_name}\n\n"
f"This skill handles: {skill_description}\n"
)
command_file.write_text(command_content)
cmd = [
"claude",
"-p", query,
"--output-format", "stream-json",
"--verbose",
"--include-partial-messages",
]
if model:
cmd.extend(["--model", model])
# Remove CLAUDECODE env var to allow nesting claude -p inside a
# Claude Code session. The guard is for interactive terminal conflicts;
# programmatic subprocess usage is safe.
env = {k: v for k, v in os.environ.items() if k != "CLAUDECODE"}
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
cwd=project_root,
env=env,
)
triggered = False
start_time = time.time()
buffer = ""
# Track state for stream event detection
pending_tool_name = None
accumulated_json = ""
try:
while time.time() - start_time < timeout:
if process.poll() is not None:
remaining = process.stdout.read()
if remaining:
buffer += remaining.decode("utf-8", errors="replace")
break
ready, _, _ = select.select([process.stdout], [], [], 1.0)
if not ready:
continue
chunk = os.read(process.stdout.fileno(), 8192)
if not chunk:
break
buffer += chunk.decode("utf-8", errors="replace")
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.strip()
if not line:
continue
try:
event = json.loads(line)
except json.JSONDecodeError:
continue
# Early detection via stream events
if event.get("type") == "stream_event":
se = event.get("event", {})
se_type = se.get("type", "")
if se_type == "content_block_start":
cb = se.get("content_block", {})
if cb.get("type") == "tool_use":
tool_name = cb.get("name", "")
if tool_name in ("Skill", "Read"):
pending_tool_name = tool_name
accumulated_json = ""
else:
return False
elif se_type == "content_block_delta" and pending_tool_name:
delta = se.get("delta", {})
if delta.get("type") == "input_json_delta":
accumulated_json += delta.get("partial_json", "")
if clean_name in accumulated_json:
return True
elif se_type in ("content_block_stop", "message_stop"):
if pending_tool_name:
return clean_name in accumulated_json
if se_type == "message_stop":
return False
# Fallback: full assistant message
elif event.get("type") == "assistant":
message = event.get("message", {})
for content_item in message.get("content", []):
if content_item.get("type") != "tool_use":
continue
tool_name = content_item.get("name", "")
tool_input = content_item.get("input", {})
if tool_name == "Skill" and clean_name in tool_input.get("skill", ""):
triggered = True
elif tool_name == "Read" and clean_name in tool_input.get("file_path", ""):
triggered = True
return triggered
elif event.get("type") == "result":
return triggered
finally:
# Clean up process on any exit path (return, exception, timeout)
if process.poll() is None:
process.kill()
process.wait()
return triggered
finally:
if command_file.exists():
command_file.unlink()
def run_eval(
eval_set: list[dict],
skill_name: str,
description: str,
num_workers: int,
timeout: int,
project_root: Path,
runs_per_query: int = 1,
trigger_threshold: float = 0.5,
model: str | None = None,
) -> dict:
"""Run the full eval set and return results."""
results = []
with ProcessPoolExecutor(max_workers=num_workers) as executor:
future_to_info = {}
for item in eval_set:
for run_idx in range(runs_per_query):
future = executor.submit(
run_single_query,
item["query"],
skill_name,
description,
timeout,
str(project_root),
model,
)
future_to_info[future] = (item, run_idx)
query_triggers: dict[str, list[bool]] = {}
query_items: dict[str, dict] = {}
for future in as_completed(future_to_info):
item, _ = future_to_info[future]
query = item["query"]
query_items[query] = item
if query not in query_triggers:
query_triggers[query] = []
try:
query_triggers[query].append(future.result())
except Exception as e:
print(f"Warning: query failed: {e}", file=sys.stderr)
query_triggers[query].append(False)
for query, triggers in query_triggers.items():
item = query_items[query]
trigger_rate = sum(triggers) / len(triggers)
should_trigger = item["should_trigger"]
if should_trigger:
did_pass = trigger_rate >= trigger_threshold
else:
did_pass = trigger_rate < trigger_threshold
results.append({
"query": query,
"should_trigger": should_trigger,
"trigger_rate": trigger_rate,
"triggers": sum(triggers),
"runs": len(triggers),
"pass": did_pass,
})
passed = sum(1 for r in results if r["pass"])
total = len(results)
return {
"skill_name": skill_name,
"description": description,
"results": results,
"summary": {
"total": total,
"passed": passed,
"failed": total - passed,
},
}
def main():
parser = argparse.ArgumentParser(description="Run trigger evaluation for a skill description")
parser.add_argument("--eval-set", required=True, help="Path to eval set JSON file")
parser.add_argument("--skill-path", required=True, help="Path to skill directory")
parser.add_argument("--description", default=None, help="Override description to test")
parser.add_argument("--num-workers", type=int, default=10, help="Number of parallel workers")
parser.add_argument("--timeout", type=int, default=30, help="Timeout per query in seconds")
parser.add_argument("--runs-per-query", type=int, default=3, help="Number of runs per query")
parser.add_argument("--trigger-threshold", type=float, default=0.5, help="Trigger rate threshold")
parser.add_argument("--model", default=None, help="Model to use for claude -p (default: user's configured model)")
parser.add_argument("--verbose", action="store_true", help="Print progress to stderr")
args = parser.parse_args()
eval_set = json.loads(Path(args.eval_set).read_text())
skill_path = Path(args.skill_path)
if not (skill_path / "SKILL.md").exists():
print(f"Error: No SKILL.md found at {skill_path}", file=sys.stderr)
sys.exit(1)
name, original_description, content = parse_skill_md(skill_path)
description = args.description or original_description
project_root = find_project_root()
if args.verbose:
print(f"Evaluating: {description}", file=sys.stderr)
output = run_eval(
eval_set=eval_set,
skill_name=name,
description=description,
num_workers=args.num_workers,
timeout=args.timeout,
project_root=project_root,
runs_per_query=args.runs_per_query,
trigger_threshold=args.trigger_threshold,
model=args.model,
)
if args.verbose:
summary = output["summary"]
print(f"Results: {summary['passed']}/{summary['total']} passed", file=sys.stderr)
for r in output["results"]:
status = "PASS" if r["pass"] else "FAIL"
rate_str = f"{r['triggers']}/{r['runs']}"
print(f" [{status}] rate={rate_str} expected={r['should_trigger']}: {r['query'][:70]}", file=sys.stderr)
print(json.dumps(output, indent=2))
if __name__ == "__main__":
main()

328
scripts/run_loop.py Normal file
View File

@@ -0,0 +1,328 @@
#!/usr/bin/env python3
"""Run the eval + improve loop until all pass or max iterations reached.
Combines run_eval.py and improve_description.py in a loop, tracking history
and returning the best description found. Supports train/test split to prevent
overfitting.
"""
import argparse
import json
import random
import sys
import tempfile
import time
import webbrowser
from pathlib import Path
from scripts.generate_report import generate_html
from scripts.improve_description import improve_description
from scripts.run_eval import find_project_root, run_eval
from scripts.utils import parse_skill_md
def split_eval_set(eval_set: list[dict], holdout: float, seed: int = 42) -> tuple[list[dict], list[dict]]:
"""Split eval set into train and test sets, stratified by should_trigger."""
random.seed(seed)
# Separate by should_trigger
trigger = [e for e in eval_set if e["should_trigger"]]
no_trigger = [e for e in eval_set if not e["should_trigger"]]
# Shuffle each group
random.shuffle(trigger)
random.shuffle(no_trigger)
# Calculate split points
n_trigger_test = max(1, int(len(trigger) * holdout))
n_no_trigger_test = max(1, int(len(no_trigger) * holdout))
# Split
test_set = trigger[:n_trigger_test] + no_trigger[:n_no_trigger_test]
train_set = trigger[n_trigger_test:] + no_trigger[n_no_trigger_test:]
return train_set, test_set
def run_loop(
eval_set: list[dict],
skill_path: Path,
description_override: str | None,
num_workers: int,
timeout: int,
max_iterations: int,
runs_per_query: int,
trigger_threshold: float,
holdout: float,
model: str,
verbose: bool,
live_report_path: Path | None = None,
log_dir: Path | None = None,
) -> dict:
"""Run the eval + improvement loop."""
project_root = find_project_root()
name, original_description, content = parse_skill_md(skill_path)
current_description = description_override or original_description
# Split into train/test if holdout > 0
if holdout > 0:
train_set, test_set = split_eval_set(eval_set, holdout)
if verbose:
print(f"Split: {len(train_set)} train, {len(test_set)} test (holdout={holdout})", file=sys.stderr)
else:
train_set = eval_set
test_set = []
history = []
exit_reason = "unknown"
for iteration in range(1, max_iterations + 1):
if verbose:
print(f"\n{'='*60}", file=sys.stderr)
print(f"Iteration {iteration}/{max_iterations}", file=sys.stderr)
print(f"Description: {current_description}", file=sys.stderr)
print(f"{'='*60}", file=sys.stderr)
# Evaluate train + test together in one batch for parallelism
all_queries = train_set + test_set
t0 = time.time()
all_results = run_eval(
eval_set=all_queries,
skill_name=name,
description=current_description,
num_workers=num_workers,
timeout=timeout,
project_root=project_root,
runs_per_query=runs_per_query,
trigger_threshold=trigger_threshold,
model=model,
)
eval_elapsed = time.time() - t0
# Split results back into train/test by matching queries
train_queries_set = {q["query"] for q in train_set}
train_result_list = [r for r in all_results["results"] if r["query"] in train_queries_set]
test_result_list = [r for r in all_results["results"] if r["query"] not in train_queries_set]
train_passed = sum(1 for r in train_result_list if r["pass"])
train_total = len(train_result_list)
train_summary = {"passed": train_passed, "failed": train_total - train_passed, "total": train_total}
train_results = {"results": train_result_list, "summary": train_summary}
if test_set:
test_passed = sum(1 for r in test_result_list if r["pass"])
test_total = len(test_result_list)
test_summary = {"passed": test_passed, "failed": test_total - test_passed, "total": test_total}
test_results = {"results": test_result_list, "summary": test_summary}
else:
test_results = None
test_summary = None
history.append({
"iteration": iteration,
"description": current_description,
"train_passed": train_summary["passed"],
"train_failed": train_summary["failed"],
"train_total": train_summary["total"],
"train_results": train_results["results"],
"test_passed": test_summary["passed"] if test_summary else None,
"test_failed": test_summary["failed"] if test_summary else None,
"test_total": test_summary["total"] if test_summary else None,
"test_results": test_results["results"] if test_results else None,
# For backward compat with report generator
"passed": train_summary["passed"],
"failed": train_summary["failed"],
"total": train_summary["total"],
"results": train_results["results"],
})
# Write live report if path provided
if live_report_path:
partial_output = {
"original_description": original_description,
"best_description": current_description,
"best_score": "in progress",
"iterations_run": len(history),
"holdout": holdout,
"train_size": len(train_set),
"test_size": len(test_set),
"history": history,
}
live_report_path.write_text(generate_html(partial_output, auto_refresh=True, skill_name=name))
if verbose:
def print_eval_stats(label, results, elapsed):
pos = [r for r in results if r["should_trigger"]]
neg = [r for r in results if not r["should_trigger"]]
tp = sum(r["triggers"] for r in pos)
pos_runs = sum(r["runs"] for r in pos)
fn = pos_runs - tp
fp = sum(r["triggers"] for r in neg)
neg_runs = sum(r["runs"] for r in neg)
tn = neg_runs - fp
total = tp + tn + fp + fn
precision = tp / (tp + fp) if (tp + fp) > 0 else 1.0
recall = tp / (tp + fn) if (tp + fn) > 0 else 1.0
accuracy = (tp + tn) / total if total > 0 else 0.0
print(f"{label}: {tp+tn}/{total} correct, precision={precision:.0%} recall={recall:.0%} accuracy={accuracy:.0%} ({elapsed:.1f}s)", file=sys.stderr)
for r in results:
status = "PASS" if r["pass"] else "FAIL"
rate_str = f"{r['triggers']}/{r['runs']}"
print(f" [{status}] rate={rate_str} expected={r['should_trigger']}: {r['query'][:60]}", file=sys.stderr)
print_eval_stats("Train", train_results["results"], eval_elapsed)
if test_summary:
print_eval_stats("Test ", test_results["results"], 0)
if train_summary["failed"] == 0:
exit_reason = f"all_passed (iteration {iteration})"
if verbose:
print(f"\nAll train queries passed on iteration {iteration}!", file=sys.stderr)
break
if iteration == max_iterations:
exit_reason = f"max_iterations ({max_iterations})"
if verbose:
print(f"\nMax iterations reached ({max_iterations}).", file=sys.stderr)
break
# Improve the description based on train results
if verbose:
print(f"\nImproving description...", file=sys.stderr)
t0 = time.time()
# Strip test scores from history so improvement model can't see them
blinded_history = [
{k: v for k, v in h.items() if not k.startswith("test_")}
for h in history
]
new_description = improve_description(
skill_name=name,
skill_content=content,
current_description=current_description,
eval_results=train_results,
history=blinded_history,
model=model,
log_dir=log_dir,
iteration=iteration,
)
improve_elapsed = time.time() - t0
if verbose:
print(f"Proposed ({improve_elapsed:.1f}s): {new_description}", file=sys.stderr)
current_description = new_description
# Find the best iteration by TEST score (or train if no test set)
if test_set:
best = max(history, key=lambda h: h["test_passed"] or 0)
best_score = f"{best['test_passed']}/{best['test_total']}"
else:
best = max(history, key=lambda h: h["train_passed"])
best_score = f"{best['train_passed']}/{best['train_total']}"
if verbose:
print(f"\nExit reason: {exit_reason}", file=sys.stderr)
print(f"Best score: {best_score} (iteration {best['iteration']})", file=sys.stderr)
return {
"exit_reason": exit_reason,
"original_description": original_description,
"best_description": best["description"],
"best_score": best_score,
"best_train_score": f"{best['train_passed']}/{best['train_total']}",
"best_test_score": f"{best['test_passed']}/{best['test_total']}" if test_set else None,
"final_description": current_description,
"iterations_run": len(history),
"holdout": holdout,
"train_size": len(train_set),
"test_size": len(test_set),
"history": history,
}
def main():
parser = argparse.ArgumentParser(description="Run eval + improve loop")
parser.add_argument("--eval-set", required=True, help="Path to eval set JSON file")
parser.add_argument("--skill-path", required=True, help="Path to skill directory")
parser.add_argument("--description", default=None, help="Override starting description")
parser.add_argument("--num-workers", type=int, default=10, help="Number of parallel workers")
parser.add_argument("--timeout", type=int, default=30, help="Timeout per query in seconds")
parser.add_argument("--max-iterations", type=int, default=5, help="Max improvement iterations")
parser.add_argument("--runs-per-query", type=int, default=3, help="Number of runs per query")
parser.add_argument("--trigger-threshold", type=float, default=0.5, help="Trigger rate threshold")
parser.add_argument("--holdout", type=float, default=0.4, help="Fraction of eval set to hold out for testing (0 to disable)")
parser.add_argument("--model", required=True, help="Model for improvement")
parser.add_argument("--verbose", action="store_true", help="Print progress to stderr")
parser.add_argument("--report", default="auto", help="Generate HTML report at this path (default: 'auto' for temp file, 'none' to disable)")
parser.add_argument("--results-dir", default=None, help="Save all outputs (results.json, report.html, log.txt) to a timestamped subdirectory here")
args = parser.parse_args()
eval_set = json.loads(Path(args.eval_set).read_text())
skill_path = Path(args.skill_path)
if not (skill_path / "SKILL.md").exists():
print(f"Error: No SKILL.md found at {skill_path}", file=sys.stderr)
sys.exit(1)
name, _, _ = parse_skill_md(skill_path)
# Set up live report path
if args.report != "none":
if args.report == "auto":
timestamp = time.strftime("%Y%m%d_%H%M%S")
live_report_path = Path(tempfile.gettempdir()) / f"skill_description_report_{skill_path.name}_{timestamp}.html"
else:
live_report_path = Path(args.report)
# Open the report immediately so the user can watch
live_report_path.write_text("<html><body><h1>Starting optimization loop...</h1><meta http-equiv='refresh' content='5'></body></html>")
webbrowser.open(str(live_report_path))
else:
live_report_path = None
# Determine output directory (create before run_loop so logs can be written)
if args.results_dir:
timestamp = time.strftime("%Y-%m-%d_%H%M%S")
results_dir = Path(args.results_dir) / timestamp
results_dir.mkdir(parents=True, exist_ok=True)
else:
results_dir = None
log_dir = results_dir / "logs" if results_dir else None
output = run_loop(
eval_set=eval_set,
skill_path=skill_path,
description_override=args.description,
num_workers=args.num_workers,
timeout=args.timeout,
max_iterations=args.max_iterations,
runs_per_query=args.runs_per_query,
trigger_threshold=args.trigger_threshold,
holdout=args.holdout,
model=args.model,
verbose=args.verbose,
live_report_path=live_report_path,
log_dir=log_dir,
)
# Save JSON output
json_output = json.dumps(output, indent=2)
print(json_output)
if results_dir:
(results_dir / "results.json").write_text(json_output)
# Write final HTML report (without auto-refresh)
if live_report_path:
live_report_path.write_text(generate_html(output, auto_refresh=False, skill_name=name))
print(f"\nReport: {live_report_path}", file=sys.stderr)
if results_dir and live_report_path:
(results_dir / "report.html").write_text(generate_html(output, auto_refresh=False, skill_name=name))
if results_dir:
print(f"Results saved to: {results_dir}", file=sys.stderr)
if __name__ == "__main__":
main()

47
scripts/utils.py Normal file
View File

@@ -0,0 +1,47 @@
"""Shared utilities for skill-creator scripts."""
from pathlib import Path
def parse_skill_md(skill_path: Path) -> tuple[str, str, str]:
"""Parse a SKILL.md file, returning (name, description, full_content)."""
content = (skill_path / "SKILL.md").read_text()
lines = content.split("\n")
if lines[0].strip() != "---":
raise ValueError("SKILL.md missing frontmatter (no opening ---)")
end_idx = None
for i, line in enumerate(lines[1:], start=1):
if line.strip() == "---":
end_idx = i
break
if end_idx is None:
raise ValueError("SKILL.md missing frontmatter (no closing ---)")
name = ""
description = ""
frontmatter_lines = lines[1:end_idx]
i = 0
while i < len(frontmatter_lines):
line = frontmatter_lines[i]
if line.startswith("name:"):
name = line[len("name:"):].strip().strip('"').strip("'")
elif line.startswith("description:"):
value = line[len("description:"):].strip()
# Handle YAML multiline indicators (>, |, >-, |-)
if value in (">", "|", ">-", "|-"):
continuation_lines: list[str] = []
i += 1
while i < len(frontmatter_lines) and (frontmatter_lines[i].startswith(" ") or frontmatter_lines[i].startswith("\t")):
continuation_lines.append(frontmatter_lines[i].strip())
i += 1
description = " ".join(continuation_lines)
continue
else:
description = value.strip('"').strip("'")
i += 1
return name, description, content