from __future__ import annotations
"""Report Rendering & Metrics
Computes sprint metrics and renders leadership-ready reports in Markdown and HTML.
Responsibilities
- Velocity, type counts, reassignments, blockers, defects, dependencies
- Workload per member and board column/state aging
- Assemble final Markdown/HTML, optionally embedding AI commentary
"""
from collections import Counter
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
import io
import re
import unicodedata
import html as _html
from jinja2 import BaseLoader, Environment, select_autoescape
try:
from .utils import parse_date, days_between, field, as_float, DONE_STATES, WIP_STATES
from .config import env_bool, env_json
except ImportError: # direct script mode
from utils import parse_date, days_between, field, as_float, DONE_STATES, WIP_STATES # type: ignore
try:
from config import env_bool, env_json # type: ignore
except ImportError:
import os
def env_bool(key: str, default: bool = False) -> bool:
val = os.environ.get(key, "").lower()
return val in ("true", "1", "yes", "on") if val else default
def env_json(key: str, default=None):
raw = os.environ.get(key)
if not raw:
return default
try:
import json as _json
data = _json.loads(raw)
if isinstance(data, dict):
return data
except Exception:
return default
return default
def compute_velocity(items: List[Dict[str, Any]]):
"""Compute committed and completed story points.
Uses explicit committed/completed fields when present; otherwise infers
completion from items in done-like states. Returns (committed, completed).
"""
committed = 0.0
completed = 0.0
for r in items:
sp_committed = field(r, "story_points_committed", "committed_story_points")
sp_completed = field(r, "story_points_completed", "completed_story_points")
sp = field(r, "story_points", "story point", "effort")
state = str(field(r, "state", "board_column", "current_state", default="")).lower()
if sp_committed is not None:
committed += as_float(sp_committed, 0.0)
elif sp is not None:
committed += as_float(sp, 0.0)
if sp_completed is not None:
completed += as_float(sp_completed, 0.0)
else:
if state in DONE_STATES and sp is not None:
completed += as_float(sp, 0.0)
return committed, completed
def item_type_counts(items: List[Dict[str, Any]]) -> Counter:
"""Count items by work item type (e.g., Story/Bug/Task)."""
c = Counter()
for r in items:
t = str(field(r, "type", "work_item_type", default="unknown")).title()
c[t] += 1
return c
def summarize_reassignments(items: List[Dict[str, Any]]):
"""Extract current ownership and the latest reassignment event per item."""
try:
from .utils import parse_date as _pd # local import to avoid cycles
except ImportError: # script mode
from utils import parse_date as _pd # type: ignore
summary = []
for r in items:
reass = field(r, "reassignment_history", default=None)
if isinstance(reass, str):
try:
import json
reass = json.loads(reass)
except Exception:
reass = None
last = None
if isinstance(reass, list) and reass:
def _key(x):
return _pd(x.get("date")) or datetime.min.replace(tzinfo=timezone.utc)
last = sorted(reass, key=_key)[-1]
summary.append({
"id": field(r, "id"),
"title": field(r, "title"),
"type": field(r, "type", "work_item_type"),
"current_owner": field(r, "assigned_to", "assignee", "owner"),
"reassigned_from": (last or {}).get("from") if last else None,
"date_changed": (last or {}).get("date") if last else None,
"story_points": field(r, "story_points", "effort"),
})
return summary
def summarize_blockers(items: List[Dict[str, Any]], sprint_end):
"""List blocked items and categorize root causes into coarse buckets."""
from collections import Counter as _C
blocked_items = []
for r in items:
blocked = field(r, "blocked", "is_blocked", default=False)
if str(blocked).lower() in ("true", "1", "yes") or blocked is True:
reason = field(r, "blocked_reason", "blocker_reason", default="")
since = parse_date(field(r, "blocked_since", "blocker_since", default=None))
last_updated = parse_date(field(r, "last_updated_date", "changed_date"))
endref = sprint_end or last_updated or datetime.now(timezone.utc)
duration = days_between(since, endref)
blocked_items.append({
"id": field(r, "id"),
"title": field(r, "title"),
"owner": field(r, "assigned_to", "owner"),
"reason": reason,
"duration_days": duration,
})
pattern = _C()
for b in blocked_items:
reason = (b.get("reason") or "").lower()
if not reason:
continue
if any(k in reason for k in ["env", "environment", "test data", "data"]):
pattern["environment/test data"] += 1
elif any(k in reason for k in ["design", "ux", "requirement", "spec", "approval"]):
pattern["requirements/design/approvals"] += 1
elif any(k in reason for k in ["review", "code review", "pr", "qa", "testing"]):
pattern["reviews/testing"] += 1
elif any(k in reason for k in ["dep", "dependency", "api", "integration"]):
pattern["dependencies/integration"] += 1
else:
pattern["other"] += 1
return blocked_items, pattern
def summarize_defects(
items: List[Dict[str, Any]],
now: datetime,
*,
window_start: Optional[datetime] = None,
window_end: Optional[datetime] = None,
iteration_name: Optional[str] = None,
):
"""Summarize defects with iteration-aware board columns and closed stats."""
defects = [r for r in items if str(field(r, "type", "work_item_type", default="")).lower() == "bug"]
open_defects: List[Dict[str, Any]] = []
open_details: List[Dict[str, Any]] = []
focus_details: List[Dict[str, Any]] = []
all_details: List[Dict[str, Any]] = []
ages: List[float] = []
high_priority = 0
new_in_window_total = 0
new_in_window_open = 0
closed_total = 0
closed_in_window = 0
window_enabled = window_start is not None or window_end is not None
iteration_mismatch: List[Dict[str, Any]] = []
iteration_windows = _iteration_windows_from_env()
normalized_current_iter = _normalize_iteration_key(iteration_name)
iteration_board: Dict[str, Dict[str, Any]] = {}
board_states_seen: set = set()
focus_state_labels = {"Active", "In Progress"}
def created_in_window(created_dt: Optional[datetime]) -> bool:
if not created_dt:
return False
if window_start and created_dt < window_start:
return False
if window_end and created_dt > window_end:
return False
return True
def in_window_date(dt: Optional[datetime]) -> bool:
if not dt or not window_enabled:
return False
if window_start and dt < window_start:
return False
if window_end and dt > window_end:
return False
return True
for r in defects:
state_raw = str(field(r, "state", "current_state", default="") or "")
board_hint = field(r, "board_column", default=None)
canonical_state = _canonical_state_label(board_hint or state_raw)
board_states_seen.add(canonical_state)
state_lower = state_raw.strip().lower()
created = parse_date(field(r, "created_date", "created", default=None))
age = days_between(created, now)
if age is not None:
ages.append(age)
is_open = state_lower not in DONE_STATES
iteration_path = field(
r, "iteration_path", "iteration", "System.IterationPath", "system.iterationpath", default=""
) or ""
iteration_name_full = iteration_path or "(Unassigned)"
iteration_label = _short_iteration_name(iteration_path)
normalized_bug_iter = _normalize_iteration_key(iteration_path)
owner = field(r, "assigned_to", "owner", default="-")
completed_dt = _completion_date_from_record(r, current_state_lower=state_lower)
detail = {
"id": field(r, "id"),
"title": field(r, "title"),
"priority": field(r, "priority", default="-"),
"severity": field(r, "severity", default="-"),
"state": state_raw or "-",
"state_label": canonical_state,
"board_column": board_hint,
"assigned_to": owner,
"iteration": iteration_name_full,
"iteration_label": iteration_label,
"area": field(r, "area_path", "System.AreaPath", "system.areapath", default="-"),
"created_date": created,
"age_days": age,
"done_date": completed_dt,
}
all_details.append(detail)
board_entry = iteration_board.get(iteration_name_full)
if not board_entry:
board_entry = {
"name": iteration_name_full,
"label": iteration_label,
"normalized": normalized_bug_iter,
"counts": Counter(),
}
iteration_board[iteration_name_full] = board_entry
board_entry["counts"][canonical_state] += 1
if is_open:
open_defects.append(r)
open_details.append(detail)
if canonical_state in focus_state_labels:
focus_details.append(detail)
else:
closed_total += 1
if in_window_date(completed_dt):
closed_in_window += 1
pr = field(r, "priority", default="")
try:
prn = int(str(pr).strip().lstrip("Pp"))
if prn <= 2:
high_priority += 1
except Exception:
if str(pr).strip().upper() in ("P0", "P1", "BLOCKER", "CRITICAL", "HIGH"):
high_priority += 1
if window_enabled and created_in_window(created):
new_in_window_total += 1
if is_open:
new_in_window_open += 1
mismatch = False
if window_enabled and created_in_window(created):
entry = iteration_windows.get(normalized_bug_iter)
if entry:
start, end = entry
if (start and created and created < start) or (end and created and created > end):
mismatch = True
else:
if normalized_bug_iter and normalized_current_iter and normalized_bug_iter != normalized_current_iter:
mismatch = True
if mismatch:
iteration_mismatch.append({
"id": field(r, "id"),
"title": field(r, "title"),
"created_date": created,
"iteration": iteration_name_full or "(unknown)",
"state": state_raw or "-",
})
state_set = {s for s in board_states_seen if s}
base_order = ["New", "Active", "In Progress", "Resolved", "Closed"]
ordered_columns = [col for col in base_order if col in state_set]
remaining_states = sorted(state_set - set(ordered_columns))
ordered_columns.extend(remaining_states)
board_rows = []
if ordered_columns and iteration_board:
sorted_entries = sorted(
iteration_board.values(),
key=lambda entry: _iteration_sort_key(entry["name"], iteration_windows, normalized_current_iter),
)
for entry in sorted_entries:
counts = {col: entry["counts"].get(col, 0) for col in ordered_columns}
board_rows.append({
"iteration": entry["name"],
"iteration_label": entry["label"],
"counts": counts,
"total": sum(counts.values()),
})
avg_age = sum(ages) / len(ages) if ages else 0.0
carryover_open = (len(open_defects) - new_in_window_open) if window_enabled else None
closed_in_window_val = closed_in_window if window_enabled else None
return {
"total": len(defects),
"open": len(open_defects),
"avg_age_days": avg_age,
"high_priority": high_priority,
"top_aged": sorted(
defects,
key=lambda r: days_between(parse_date(field(r, "created_date")), now) or -1,
reverse=True,
)[:5],
"new_in_window_total": new_in_window_total if window_enabled else None,
"new_in_window_open": new_in_window_open if window_enabled else None,
"carryover_open": carryover_open,
"open_details": open_details,
"focus_open_details": focus_details,
"all_details": all_details,
"iteration_mismatch": iteration_mismatch,
"board_columns": ordered_columns,
"board_rows": board_rows,
"closed_total": closed_total,
"closed_in_window": closed_in_window_val,
}
def summarize_dependencies(items: List[Dict[str, Any]]):
"""Collect dependency links and flag cross-team dependencies when possible."""
deps: List[Dict[str, Any]] = []
for r in items:
linked = field(r, "dependencies", default=None)
if isinstance(linked, str):
try:
import json
linked = json.loads(linked)
except Exception:
try:
from .utils import _as_list
except ImportError:
from utils import _as_list # type: ignore
linked = _as_list(linked)
try:
from .utils import _as_list
except ImportError:
from utils import _as_list # type: ignore
for d in _as_list(linked):
if isinstance(d, dict):
deps.append({
"item_id": field(r, "id"),
"item_title": field(r, "title"),
"dependency_id": d.get("id"),
"dependency_title": d.get("title"),
"dependency_team": d.get("team"),
"cross_team": (d.get("team") and d.get("team") != field(r, "team")),
})
else:
deps.append({
"item_id": field(r, "id"),
"item_title": field(r, "title"),
"dependency_id": d,
"dependency_title": None,
"dependency_team": None,
"cross_team": None,
})
return deps
def summarize_workload(items: List[Dict[str, Any]]):
"""Aggregate workload stats per member: items, SP, active WIP, blocked."""
from collections import defaultdict
try:
from .utils import WIP_STATES
except ImportError:
from utils import WIP_STATES # type: ignore
per_user = defaultdict(lambda: {"items": 0, "story_points": 0.0, "wip": 0, "blocked": 0})
for r in items:
owner = field(r, "assigned_to", "owner", default="Unassigned") or "Unassigned"
state = str(field(r, "state", "board_column", "current_state", default="")).lower()
sp = as_float(field(r, "story_points", "effort"), 0.0)
blocked = field(r, "blocked", default=False)
per_user[owner]["items"] += 1
per_user[owner]["story_points"] += sp
if state in WIP_STATES:
per_user[owner]["wip"] += 1
if str(blocked).lower() in ("true", "1", "yes") or blocked is True:
per_user[owner]["blocked"] += 1
return per_user
def summarize_aging(items: List[Dict[str, Any]], now: datetime):
"""Compute average and longest days per board column/state.
Prefers explicit column/state history; falls back to time-in-state when available.
"""
from collections import defaultdict
per_col_durations = defaultdict(list)
for r in items:
hist = field(r, "column_history", "state_history", default=None)
if isinstance(hist, str):
try:
import json
hist = json.loads(hist)
except Exception:
hist = None
if isinstance(hist, list) and hist:
for h in hist:
col = str(h.get("column") or h.get("state") or "Unknown")
entered = parse_date(h.get("entered"))
exited = parse_date(h.get("exited")) or now
dur = days_between(entered, exited)
if dur is not None:
per_col_durations[col].append(dur)
else:
col = str(field(r, "state", "board_column", default="Unknown"))
entered = parse_date(field(r, "state_entered_date", "column_entered_date", default=None))
dur = days_between(entered, now)
if dur is not None:
per_col_durations[col].append(dur)
summary = []
for col, vals in per_col_durations.items():
avg_days = sum(vals) / len(vals) if vals else 0.0
longest = max(vals) if vals else 0.0
summary.append({"column": col, "avg_days": avg_days, "longest": longest})
order = ["To Do", "In Progress", "Testing", "Done"]
def sort_key(x):
name = x["column"]
try:
return (order.index(name), name)
except ValueError:
return (len(order) + 1, name)
summary.sort(key=sort_key)
return summary
def _items_by_type(items: List[Dict[str, Any]], type_names: List[str]) -> List[Dict[str, Any]]:
wanted = {name.lower() for name in type_names}
filtered = []
for r in items:
t = str(field(r, "type", "work_item_type", default="")).lower()
if t in wanted:
filtered.append(r)
return filtered
def _render_aging_table_html(
aging_rows: List[Dict[str, Any]],
*,
table_style: str,
th_style: str,
td_style: str,
) -> str:
if not aging_rows:
return ""
rows = []
for a in aging_rows:
rows.append(
f"<tr><td style=\"{td_style}\">{_html.escape(str(a['column']))}</td>"
f"<td style=\"{td_style}\">{_html.escape(safe_float_str(a['avg_days']))}</td>"
f"<td style=\"{td_style}\">{_html.escape(safe_float_str(a['longest']))}</td>"
f"<td style=\"{td_style}\"></td></tr>"
)
body = "\n".join(rows)
return (
f"<table style=\"{table_style}\">"
f"<tr><th style=\"{th_style}\">Board Column</th><th style=\"{th_style}\">Avg Days</th>"
f"<th style=\"{th_style}\">Longest Aging</th><th style=\"{th_style}\">Comments</th></tr>"
f"{body}</table>"
)
def safe_float_str(x):
"""Render a float to one decimal place or '-' if None/invalid."""
if x is None:
return "-"
try:
return f"{float(x):.1f}"
except Exception:
return "-"
_AI_WHITESPACE_RE = re.compile(r"\s+")
EXEC_SUMMARY_ICON_LOOKUP = {
"executive summary": "\U0001F4AC",
"summary": "\U0001F4AC",
"highlight": "\u2728",
"highlights": "\u2728",
"win": "\U0001F389",
"wins": "\U0001F389",
"risk": "\u26A0",
"risks": "\u26A0",
"concern": "\u26A0",
"watch": "\u23F3",
"focus": "\U0001F4CC",
"next sprint": "\u27A1",
"next": "\u27A1",
"action": "\u270D",
"actions": "\u270D",
"recommendation": "\U0001F4CC",
"recommendations": "\U0001F4CC",
"block": "\u274C",
"blocks": "\u274C",
"issue": "\u2757",
"issues": "\u2757",
}
_DEFAULT_SECTION_ICON = "\u2728"
_SECONDARY_BULLET_ICON = "\u2022"
_ICON_FONT_STACK = "font-family:'Segoe UI Emoji','Segoe UI Symbol','Segoe UI',sans-serif;"
def _normalize_iteration_key(name: Optional[str]) -> str:
return str(name or "").strip().lower()
def _short_iteration_name(name: Optional[str]) -> str:
"""Return a compact iteration label (last path segment)."""
if not name:
return "(Unassigned)"
parts = re.split(r"[\\/]", str(name))
for part in reversed(parts):
part = part.strip()
if part:
return part
trimmed = str(name).strip()
return trimmed or "(Unassigned)"
def _iteration_windows_from_env() -> Dict[str, Tuple[Optional[datetime], Optional[datetime]]]:
windows: Dict[str, Tuple[Optional[datetime], Optional[datetime]]] = {}
raw = env_json("ITERATION_DATE_MAP", {}) or {}
if not isinstance(raw, dict):
return windows
for key, val in raw.items():
if not isinstance(val, dict):
continue
start = parse_date(val.get("start"))
end = parse_date(val.get("end"))
aliases: List[str] = []
if isinstance(key, str):
aliases.append(key)
parts = re.split(r"[\\/]", key)
if parts:
aliases.append(parts[-1])
else:
aliases.append(str(key))
for alias in aliases:
normalized = _normalize_iteration_key(alias)
if not normalized:
continue
if normalized not in windows:
windows[normalized] = (start, end)
return windows
def _canonical_state_label(raw_state: Optional[str]) -> str:
"""Normalize a work item state or board column into board-friendly buckets."""
text = str(raw_state or "").strip()
if not text:
return "Unspecified"
lowered = text.lower()
if lowered in ("new", "todo", "to do", "proposed", "backlog", "open"):
return "New"
if lowered in ("active", "committed", "accepted", "ready"):
return "Active"
if "progress" in lowered or lowered in ("doing", "development", "implementing", "dev", "working"):
return "In Progress"
if lowered in ("resolved", "qa", "testing", "test", "verify", "verification", "ready for test"):
return "Resolved"
if lowered in DONE_STATES or lowered in ("closed", "done", "completed", "removed"):
return "Closed"
return text.title()
def _completion_date_from_record(record: Dict[str, Any], *, current_state_lower: str) -> Optional[datetime]:
"""Best-effort completion timestamp for a defect (closed/resolved)."""
for key in ("closed_date", "resolved_date"):
dt = parse_date(field(record, key, default=None))
if dt:
return dt
if current_state_lower in DONE_STATES:
state_change = parse_date(field(record, "state_change_date", default=None))
if state_change:
return state_change
history = field(record, "column_history", "state_history", default=None)
if isinstance(history, str):
try:
import json
history = json.loads(history)
except Exception:
history = None
if isinstance(history, list):
done_dates: List[datetime] = []
for entry in history:
state = str(entry.get("state") or "").strip().lower()
if state and state in DONE_STATES:
entered = parse_date(entry.get("entered"))
if entered:
done_dates.append(entered)
if done_dates:
return sorted(done_dates)[0]
return None
def _iteration_sort_key(
name: str,
iteration_windows: Dict[str, Tuple[Optional[datetime], Optional[datetime]]],
normalized_current: str,
) -> Tuple[int, datetime, str]:
"""Sort iterations roughly by start date, prioritizing the current sprint."""
normalized = _normalize_iteration_key(name)
window = iteration_windows.get(normalized)
start = window[0] if window else None
priority = 0 if normalized and normalized == normalized_current else 1
fallback = datetime.max.replace(tzinfo=timezone.utc)
label = _short_iteration_name(name)
return (priority, start or fallback, label.lower())
_LABEL_VALUE_RE = re.compile(r"^\s*([^:]{2,80}):\s*(.+)$")
_SUMMARY_SENTENCE_SPLIT_RE = re.compile(r"(?<=[.!?])\s+")
_INLINE_BULLET_FIX_RE = re.compile(r":\s*-\s*")
_INLINE_SYMBOL_BULLET_RE = re.compile(r"(?<!\n)([-*\u2022\u25AA\u25AB])\s+")
_INLINE_BULLET_CHAIN_RE = re.compile(r"(?<=\S)\s+[-\u2013\u2014]\s+(?=[A-Z0-9])")
_INLINE_SYMBOL_CHAIN_RE = re.compile(r"(?<=\S)\s+([\u2022\u25AA\u25AB])\s+(?=[A-Z0-9])")
_SUBPOINT_DELIM_RE = re.compile(r"(?:;|\u2022|\u2219|\u00B7|\s[-\u2013\u2014]\s)")
_NUMBERED_BULLET_RE = re.compile(r"^\d+[\.\)]\s+")
_EMOJI_SHORTCODES = {
":rocket:": "🚀",
":warning:": "⚠️",
":dart:": "🎯",
":sparkles:": "✨",
":trophy:": "🏆",
":target:": "🎯",
":check:": "✅",
":x:": "❌",
}
_EMOJI_SHORTCODE_RE = re.compile(
"|".join(sorted((re.escape(k) for k in _EMOJI_SHORTCODES), key=len, reverse=True))
) if _EMOJI_SHORTCODES else None
_HTML_TAG_SPLIT_RE = re.compile(r"(<[^>]+>)")
_NUMERIC_TOKEN_RE = re.compile(r"\b\d[\d,]*(?:\.\d+)?%?\b")
def _replace_emoji_shortcodes(text: Optional[str]) -> str:
if not text:
return "" if text is None else text
if not _EMOJI_SHORTCODE_RE:
return text
return _EMOJI_SHORTCODE_RE.sub(lambda m: _EMOJI_SHORTCODES.get(m.group(0), m.group(0)), text)
def _emphasize_numbers_html(text: Optional[str]) -> str:
if not text:
return "" if text is None else text
strong_depth = 0
parts: List[str] = []
for chunk in _HTML_TAG_SPLIT_RE.split(text):
if not chunk:
continue
if chunk.startswith("<"):
lowered = chunk.lower()
if lowered.startswith("<strong"):
strong_depth += 1
elif lowered.startswith("</strong"):
strong_depth = max(0, strong_depth - 1)
parts.append(chunk)
continue
if strong_depth > 0:
parts.append(chunk)
continue
parts.append(_NUMERIC_TOKEN_RE.sub(r"<strong>\g<0></strong>", chunk))
return "".join(parts)
def _apply_presentational_filters(text: Optional[str]) -> str:
if text is None:
return ""
with_emoji = _replace_emoji_shortcodes(text)
return _emphasize_numbers_html(with_emoji)
def _label_span(label: str) -> str:
label = label.strip()
if not label:
return ""
return f"<span style='font-weight:600;color:#111827;'>{_html.escape(label)}</span>"
def _format_label_value_html(text: str) -> str:
"""Return escaped HTML with bolded label when pattern resembles `Label: value`."""
line = text.strip()
if not line:
return ""
match = _LABEL_VALUE_RE.match(line)
if match:
label, rest = match.groups()
label = label.strip()
rest = rest.strip()
if label and rest:
return f"{_label_span(label)}: {_html.escape(rest)}"
return _html.escape(line)
def _split_subpoint_candidates(text: str) -> List[str]:
values: List[str] = []
if not text:
return values
for part in _SUBPOINT_DELIM_RE.split(text):
cleaned = part.strip(" -\u2022\u2219\u00b7;")
if not cleaned:
continue
segments = [
seg.strip()
for seg in _SUMMARY_SENTENCE_SPLIT_RE.split(cleaned)
if seg.strip()
]
if not segments:
continue
values.extend(_format_label_value_html(seg) for seg in segments)
return values
def _clean_ai_line(line: str) -> str:
"""Trim control characters and collapse whitespace for AI text lines."""
if not line:
return ""
try:
decoded = _html.unescape(str(line))
except Exception:
decoded = str(line)
decoded = decoded.replace("
", " ").replace("
", " ").replace(" ", " ")
decoded = decoded.replace("&", "&").replace("<", "<").replace(">", ">")
decoded = decoded.replace(""", '"').replace("'", "'")
normalized = unicodedata.normalize("NFKC", decoded)
normalized = normalized.replace("\u00a0", " ").replace("\ufeff", "")
filtered = "".join(
ch for ch in normalized
if unicodedata.category(ch)[0] != "C" or ch in ("\t", "\n", "\r")
)
filtered = filtered.strip()
return _AI_WHITESPACE_RE.sub(" ", filtered)
def _prepare_ai_sections(ai_text: Optional[str]) -> List[Dict[str, Any]]:
"""Split AI commentary into titled sections based on Markdown headings."""
if not ai_text:
return []
cleaned_lines = [_clean_ai_line(ln) for ln in ai_text.splitlines()]
cleaned = [ln for ln in cleaned_lines if ln]
if not cleaned:
return []
sections: List[Dict[str, Any]] = []
current_title = "Highlights"
current_lines: List[str] = []
def _flush():
nonlocal current_lines
if current_lines:
sections.append({"title": current_title, "lines": list(current_lines)})
current_lines.clear()
for line in cleaned:
stripped = line.lstrip("#").strip()
if line.startswith("##") and stripped:
_flush()
current_title = stripped
continue
if line.startswith("#") and stripped and not sections:
_flush()
current_title = stripped
continue
current_lines.append(line)
_flush()
if not sections:
fallback_lines: List[str] = []
fallback_title = "Highlights"
for line in cleaned:
if line.startswith("#"):
maybe = line.lstrip("#").strip()
if maybe:
fallback_title = maybe
else:
fallback_lines.append(line)
sections.append({"title": fallback_title, "lines": fallback_lines or cleaned})
return sections
def _format_paragraph_html(text: str) -> str:
"""Soft-wrap long sentences with <br> separators for readability."""
if not text:
return ""
segments = [
seg.strip()
for seg in _SUMMARY_SENTENCE_SPLIT_RE.split(text)
if seg.strip()
]
if len(segments) <= 1:
return _format_label_value_html(text)
return "<br>".join(_format_label_value_html(seg) for seg in segments)
def _normalize_ai_text_blocks(text: str) -> str:
"""Encourage AI bullet formatting by inserting newlines before inline dash bullets."""
if not text:
return text
fixed = _INLINE_BULLET_FIX_RE.sub(":\n- ", text)
fixed = _INLINE_SYMBOL_BULLET_RE.sub(lambda m: f"\n{m.group(1)} ", fixed)
fixed = _INLINE_BULLET_CHAIN_RE.sub("\n- ", fixed)
fixed = _INLINE_SYMBOL_CHAIN_RE.sub(lambda m: f"\n{m.group(1)} ", fixed)
return fixed
def _summary_points(summary_text: str) -> List[Dict[str, Any]]:
"""Break the fallback summary into structured bullet-friendly snippets."""
if not summary_text:
return []
points: List[Dict[str, Any]] = []
for chunk in _SUMMARY_SENTENCE_SPLIT_RE.split(summary_text):
cleaned = chunk.strip().strip('.')
if not cleaned:
continue
match = _LABEL_VALUE_RE.match(cleaned)
if match:
label, rest = match.groups()
subparts = _split_subpoint_candidates(rest)
if subparts:
points.append({"text": _label_span(label), "subpoints": subparts})
else:
points.append({"text": _format_label_value_html(cleaned), "subpoints": []})
continue
subparts = _split_subpoint_candidates(cleaned)
if len(subparts) > 1:
points.append({"text": subparts[0], "subpoints": subparts[1:]})
else:
points.append({"text": _html.escape(cleaned), "subpoints": []})
return points
def _render_html_bullet_list(
points: List[Dict[str, Any]],
*,
font_size: str = "14px",
color: str = "#334155",
) -> str:
"""Render nested bullet HTML from structured points."""
def _normalize_point(point: Any) -> Dict[str, Any]:
if isinstance(point, dict):
text = _apply_presentational_filters(point.get("text", ""))
subs = [_normalize_point(sub) for sub in point.get("subpoints") or []]
else:
text = _apply_presentational_filters(str(point))
subs = []
return {"text": text, "subpoints": subs}
def _render(items: List[Dict[str, Any]], level: int) -> str:
if not items:
return ""
margin = 0 if level == 0 else 16
padding = 0 if level == 0 else 16
top_margin = 0 if level == 0 else 6
list_style = "none" if level == 0 else "disc"
html_parts = [
"<ul style="
f"'margin:{top_margin}px 0 0 {margin}px;padding-left:{padding}px;"
f"list-style-type:{list_style};"
f"font-size:{font_size};line-height:1.6;color:{color};"
"font-family:Segoe UI,Arial,Helvetica,sans-serif;white-space:normal;'>"
]
for item in items:
li_style = "list-style:none;" if level == 0 else ""
html_parts.append(
"<li style='margin-bottom:8px;word-break:break-word;"
f"{li_style}'>"
)
html_parts.append(item.get("text", ""))
subs = item.get("subpoints") or []
if subs:
html_parts.append(_render(subs, level + 1))
html_parts.append("</li>")
html_parts.append("</ul>")
return "".join(html_parts)
prepared_points = [_normalize_point(pt) for pt in points]
return _render(prepared_points, 0)
def _icon_for_section(title: str, index: int) -> str:
"""Choose a simple icon (HTML entity) for a commentary card."""
lower = title.lower()
for key, icon in EXEC_SUMMARY_ICON_LOOKUP.items():
if key in lower:
return icon
return _DEFAULT_SECTION_ICON if index == 0 else _SECONDARY_BULLET_ICON
_JINJA_ENV = Environment(
loader=BaseLoader(),
autoescape=select_autoescape(
enabled_extensions=("html",),
default_for_string=True,
default=True,
),
)
_COMMENTARY_TEMPLATE = _JINJA_ENV.from_string(
"""
{% for section in sections %}
{% set section_idx = loop.index0 %}
<div style="border-radius:10px;border:{% if section_idx == 0 %}2px solid #3b82f6{% else %}1px solid #cbd5e1{% endif %};background:{% if section_idx == 0 %}#f0f7ff{% else %}#f8fafc{% endif %};padding:18px 20px;margin-bottom:16px;">
<div style="display:flex;align-items:center;gap:8px;margin-bottom:10px;">
<span style="font-size:{% if section_idx == 0 %}24px{% else %}20px{% endif %};vertical-align:middle;font-family:'Segoe UI Emoji','Segoe UI Symbol','Segoe UI',sans-serif;">{{ section.icon }}</span>
<span style="font-weight:700;font-size:{% if section_idx == 0 %}18px{% else %}16px{% endif %};color:#0f172a;line-height:1.3;">{{ section.title | safe }}</span>
</div>
{% if section.paragraphs %}
{% for para in section.paragraphs %}
<p style="margin:0 0 8px 0;font-size:{% if section_idx == 0 %}15px{% else %}14px{% endif %};line-height:1.7;color:#1e293b;font-family:Segoe UI,Arial,Helvetica,sans-serif;">{{ para | safe }}</p>
{% endfor %}
{% endif %}
{% if section.bullets %}
<ul style="margin:0;padding-left:0;list-style:none;font-size:{% if section_idx == 0 %}15px{% else %}14px{% endif %};line-height:1.7;color:#0f172a;font-family:Segoe UI,Arial,Helvetica,sans-serif;">
{% for item in section.bullets %}
<li style="margin-bottom:10px;list-style:none;">
{{ item.text | safe }}
{% if item.subpoints %}
<ul style="margin:6px 0 0 18px;padding-left:18px;list-style:disc;font-size:13px;line-height:1.6;color:#334155;">
{% for sub in item.subpoints %}
<li style="margin-bottom:4px;">{{ sub | safe }}</li>
{% endfor %}
</ul>
{% endif %}
</li>
{% endfor %}
</ul>
{% endif %}
</div>
{% endfor %}
"""
)
def _default_commentary_summary(
iter_name: str,
velocity_pct: float,
reass: List[Dict[str, Any]],
items: List[Dict[str, Any]],
deps: List[Dict[str, Any]],
) -> str:
"""Produce a fallback text summary when AI commentary is unavailable."""
name = iter_name or "Sprint"
if velocity_pct >= 85:
achievement = "strong delivery vs commitment"
elif velocity_pct >= 70:
achievement = "solid delivery with room to improve"
else:
achievement = "below target delivery; review commitment and flow"
risks = []
if any(field(r, "blocked", default=False) for r in items):
risks.append("recurring blockers")
if any(
(field(r, "priority", default="") in ("P0", "P1", 0, 1, 2))
for r in items
if str(field(r, "type", default="")).lower() == "bug"
):
risks.append("aging high-priority defects")
if any(d.get("cross_team") for d in deps):
risks.append("cross-team dependencies")
reassign_trend = (
"limited reassignments"
if sum(1 for r in reass if r.get("reassigned_from")) == 0
else "mid-sprint reassignments for load balancing"
)
summary = (
f"{name} shows {achievement}. "
f"Key risks: {', '.join(risks) if risks else 'no major risks detected'}. "
f"Ownership shifts: {reassign_trend}. "
"Recommendations: stabilize test environments, tighten dependency planning, "
"and align commitment with demonstrated velocity."
)
return summary
def _format_ai_commentary_block(ai_text: Optional[str]) -> Optional[str]:
"""Convert AI commentary text into Outlook-friendly HTML cards.
Args:
ai_text: The extracted commentary text from generate_ai_commentary()
(this is the content extracted from raw_text in ai.py)
Uses the AI commentary text directly, parsing sections based on markdown
headings (##) as expected from the AI prompt structure. The text should
already be extracted from JSON response format if needed.
"""
if not ai_text:
return None
normalized = _normalize_ai_text_blocks(ai_text)
sections = _prepare_ai_sections(normalized)
if not sections:
return None
payload = []
bullet_prefixes = ("- ", "* ", "• ", "▪ ", "▫ ", "– ", "— ")
def _make_bullet(text: str, subpoints: Optional[List[str]] = None) -> Dict[str, Any]:
return {
"text": _apply_presentational_filters(text),
"subpoints": [_apply_presentational_filters(sp) for sp in (subpoints or [])],
}
for idx, section in enumerate(sections):
title = section.get("title") or "Highlights"
paragraphs: List[str] = []
bullets: List[Dict[str, Any]] = []
for raw_line in section.get("lines") or []:
line = raw_line.strip()
if not line:
continue
bullet_entry: Optional[Dict[str, Any]] = None
num_match = _NUMBERED_BULLET_RE.match(line)
if num_match:
candidate = line[num_match.end():].strip()
if candidate:
bullet_entry = _make_bullet(_format_paragraph_html(candidate))
if not bullet_entry:
for prefix in bullet_prefixes:
if line.startswith(prefix):
cleaned = line[len(prefix):].strip()
if cleaned:
bullet_entry = _make_bullet(_format_paragraph_html(cleaned))
break
if not bullet_entry and ":" in line:
head, tail = line.split(":", 1)
candidates = _split_subpoint_candidates(tail)
if candidates:
bullet_entry = _make_bullet(_label_span(head), candidates)
if not bullet_entry:
candidates = _split_subpoint_candidates(line)
if len(candidates) > 1:
bullet_entry = _make_bullet(candidates[0], candidates[1:])
if bullet_entry:
bullets.append(bullet_entry)
continue
if line.startswith(("**", "__")) and line.endswith(("**", "__")):
cleaned = line.strip("*_ ").strip()
if cleaned:
paragraphs.append(_apply_presentational_filters(_format_paragraph_html(cleaned)))
else:
paragraphs.append(_apply_presentational_filters(_format_paragraph_html(line)))
if not paragraphs and not bullets:
continue
payload.append({
"title": _apply_presentational_filters(_html.escape(title)),
"icon": _icon_for_section(title, idx),
"paragraphs": paragraphs,
"bullets": bullets,
})
if not payload:
return None
return _COMMENTARY_TEMPLATE.render(sections=payload)
if not payload:
return None
return _COMMENTARY_TEMPLATE.render(sections=payload)
def _render_executive_summary_block(
summary_text: str,
ai_text: Optional[str],
*,
ai_enabled: bool,
) -> str:
"""Render the executive summary with optional AI commentary stacked underneath."""
summary_icon = EXEC_SUMMARY_ICON_LOOKUP.get("summary", _DEFAULT_SECTION_ICON)
summary_points = _summary_points(summary_text)
if summary_points:
summary_body = _render_html_bullet_list(summary_points)
else:
summary_body = (
"<p style='margin:0;font-size:14px;line-height:1.7;color:#0f172a;"
"font-family:Segoe UI,Arial,Helvetica,sans-serif;'>"
f"{_apply_presentational_filters(_format_paragraph_html(summary_text))}</p>"
)
summary_card = (
"<div style='border-radius:12px;border:1px solid #cbd5e1;background:#f8fafc;"
"padding:18px 20px;margin-bottom:16px;'>"
"<div style='display:flex;align-items:center;gap:8px;font-weight:700;font-size:16px;"
"color:#0f172a;line-height:1.3;'>"
f"<span style='font-size:20px;vertical-align:middle;{_ICON_FONT_STACK}'>{summary_icon}</span>"
"<span>Executive Summary</span>"
"</div>"
f"<div style='margin-top:12px;'>{summary_body}</div>"
"</div>"
)
if not ai_enabled:
return summary_card
ai_sections = _format_ai_commentary_block(ai_text) if ai_text else None
if ai_sections:
return "\n".join([summary_card, ai_sections])
if ai_text:
# AI returned text but not the expected section format - keep it readable
ai_fallback_card = (
"<div style='border-radius:12px;border:1px solid #3b82f6;background:#e0edff;"
"padding:18px 20px;margin-bottom:16px;'>"
"<div style='display:flex;align-items:center;gap:8px;font-weight:700;font-size:16px;"
"color:#1d4ed8;'>"
f"<span style='font-size:20px;{_ICON_FONT_STACK}'>{_DEFAULT_SECTION_ICON}</span>"
"<span>AI Highlights</span>"
"</div>"
f"<div style='margin-top:12px;font-size:14px;line-height:1.7;color:#0f172a;font-family:Segoe UI,Arial,Helvetica,sans-serif;white-space:pre-wrap;'>"
f"{_apply_presentational_filters(_html.escape(ai_text))}</div>"
"</div>"
)
return "\n".join([summary_card, ai_fallback_card])
no_ai_card = (
"<div style='border-radius:10px;border:1px dashed #cbd5e1;background:#f8fafc;"
"padding:14px 16px;margin-bottom:16px;'>"
"<div style='font-size:14px;line-height:1.6;color:#6b7280;font-family:Segoe UI,Arial,Helvetica,sans-serif;'>"
"AI commentary not available"
"</div>"
"</div>"
)
return "\n".join([summary_card, no_ai_card])
def render_report(
params: Dict[str, str],
items: List[Dict[str, Any]],
ai_commentary: Optional[str] = None,
) -> str:
"""Render a clean Markdown report from inputs and computed metrics."""
now = datetime.now(timezone.utc)
sprint_start = parse_date(params.get("start"))
sprint_end = parse_date(params.get("end"))
iter_name = params.get("iteration") or ""
proj = params.get("project") or "Project"
type_counts = item_type_counts(items)
total_items = sum(type_counts.values())
committed, completed = compute_velocity(items)
velocity_pct = (completed / committed * 100.0) if committed > 0 else 0.0
last_updated_dates = [parse_date(field(r, "last_updated_date", "changed_date", default=None)) for r in items]
last_updated = max([d for d in last_updated_dates if d], default=now)
reass = summarize_reassignments(items)
blocked_items, blocker_patterns = summarize_blockers(items, sprint_end)
defects = summarize_defects(
items,
now,
window_start=sprint_start,
window_end=sprint_end,
iteration_name=iter_name,
)
deps = summarize_dependencies(items)
workload = summarize_workload(items)
aging = summarize_aging(items, now)
story_items = _items_by_type(items, ["user story", "story"])
bug_items = _items_by_type(items, ["bug"])
story_aging = summarize_aging(story_items, now) if story_items else []
bug_aging = summarize_aging(bug_items, now) if bug_items else []
lines: List[str] = []
header = f"## {iter_name or 'Sprint'} Summary — {proj} ({params.get('start','?')}–{params.get('end','?')})"
lines.append(header)
lines.append("")
lines.append("### Sprint Overview")
type_breakdown = ", ".join([f"{k if k.endswith('s') else k + 's'}: {v}" for k, v in type_counts.items()])
lines.append(f"- Total Work Items: {total_items} ({type_breakdown})")
lines.append(f"- Velocity: {int(round(committed))} SP committed / {int(round(completed))} SP completed ({int(round(velocity_pct))}%)")
lines.append(f"- Last Updated: {last_updated.date()}")
lines.append("")
lines.append("### Ownership & Reassignments")
lines.append("| User Story | Current Owner | Reassigned From | Date Changed | Story Points |")
lines.append("|------------|---------------|-----------------|--------------|--------------|")
for r in reass:
lines.append(
f"| {r['id']} | {r.get('current_owner') or '-'} | {r.get('reassigned_from') or '-'} | "
f"{(r.get('date_changed') or '-')} | {r.get('story_points') or '-'} |")
mid_reassigns = sum(1 for r in reass if r.get('reassigned_from'))
if mid_reassigns:
lines.append(f"> {mid_reassigns} reassignments occurred (typical causes: blocker mitigation, load balancing).")
else:
lines.append("> No mid-sprint reassignments detected.")
lines.append("")
lines.append("### Blockers & Root Causes")
if blocked_items:
lines.append("| Work Item | Title | Owner | Duration (days) | Reason |")
lines.append("|-----------|-------|-------|------------------|--------|")
for b in blocked_items:
d = b.get("duration_days")
dur = safe_float_str(d) if d is not None else "-"
lines.append(f"| {b['id']} | {b['title']} | {b.get('owner') or '-'} | {dur} | {b.get('reason') or '-'} |")
if blocker_patterns:
top = ", ".join([f"{k}: {v}" for k, v in blocker_patterns.most_common()])
lines.append(f"> Blocker patterns suggest: {top}.")
else:
lines.append("- No blocked items reported.")
lines.append("")
lines.append("### Defect Insights")
lines.append(f"- {defects['total']} total defects; average age {safe_float_str(defects['avg_age_days'])} days")
lines.append(f"- {defects['open']} open; {defects['high_priority']} high-priority")
if defects.get("new_in_window_total") is not None:
lines.append(
f"- {defects['new_in_window_total']} defects created this sprint; "
f"{defects['new_in_window_open']} still open"
)
carry = defects.get("carryover_open")
if carry is not None:
lines.append(f"- {carry} open defects carried over from prior sprints")
closed_total = defects.get("closed_total")
if closed_total is not None:
closed_line = f"- {closed_total} defects closed to date"
closed_in_window = defects.get("closed_in_window")
if closed_in_window is not None:
closed_line += f" ({closed_in_window} closed within this sprint)"
lines.append(closed_line)
board_cols = defects.get("board_columns") or []
board_rows = defects.get("board_rows") or []
if board_cols and board_rows:
lines.append("")
header_cells = ["Iteration"] + board_cols + ["Total"]
header = "| " + " | ".join(header_cells) + " |"
divider_cells = ["-----------"] + ["---"] * len(board_cols) + ["---"]
divider = "| " + " | ".join(divider_cells) + " |"
lines.append(header)
lines.append(divider)
for row in board_rows:
row_cells = [row.get("iteration_label") or row.get("iteration") or "-"]
row_cells.extend(str(row["counts"].get(col, 0)) for col in board_cols)
row_cells.append(str(row.get("total", 0)))
lines.append("| " + " | ".join(row_cells) + " |")
if defects["total"] > 0 and defects["top_aged"]:
lines.append("")
lines.append("| Defect | Title | Age (days) | Priority | State |")
lines.append("|--------|-------|------------|----------|-------|")
for d in defects["top_aged"]:
age = days_between(parse_date(field(d, "created_date")), now)
pr = field(d, "priority", default="-")
st = field(d, "state", "board_column", "current_state", default="-")
lines.append(f"| {field(d,'id')} | {field(d,'title')} | {safe_float_str(age)} | {pr} | {st} |")
focus_items = defects.get("focus_open_details") or []
if focus_items:
lines.append("")
lines.append("> Active/In Progress defects needing attention:")
max_focus = 10
for detail in focus_items[:max_focus]:
age = safe_float_str(detail.get("age_days"))
lines.append(
f"> - {detail.get('id')}: {detail.get('title')} "
f"({detail.get('assigned_to') or '-'}, {detail.get('iteration_label')}, {age}d)"
)
if len(focus_items) > max_focus:
lines.append(f"> - ... {len(focus_items) - max_focus} more in Excel details")
mismatches = defects.get("iteration_mismatch") or []
if mismatches:
lines.append("")
lines.append("> ⚠️ Defects created in this sprint but tagged to a different iteration:")
for mis in mismatches[:10]:
created = mis.get("created_date")
created_str = created.date() if isinstance(created, datetime) else created
lines.append(f"> - {mis['id']}: {mis['title']} (created {created_str}, state {mis['state']}, iteration {mis.get('iteration')})")
if len(mismatches) > 10:
lines.append(f"> - … {len(mismatches) - 10} more")
lines.append("")
lines.append("### Velocity & Story Points")
lines.append("| Sprint | Committed | Completed | Velocity % |")
lines.append("|--------|-----------|-----------|------------|")
lines.append(f"| {iter_name or 'Sprint'} | {int(round(committed))} | {int(round(completed))} | {int(round(velocity_pct))}% |")
lines.append("")
lines.append("### Dependencies")
if deps:
lines.append("| Item | Depends On | Title | Team | Cross-team |")
lines.append("|------|------------|-------|------|-----------|")
for d in deps[:50]:
lines.append(
f"| {d['item_id']} | {d.get('dependency_id') or '-'} | {d.get('dependency_title') or ''} | "
f"{d.get('dependency_team') or '-'} | {('Yes' if d.get('cross_team') else 'No') if d.get('cross_team') is not None else '-'} |"
)
cross_team = sum(1 for d in deps if d.get("cross_team"))
lines.append(f"> {len(deps)} dependencies tracked ({cross_team} cross-team).")
else:
lines.append("- No dependencies recorded in data.")
lines.append("")
lines.append("### <u><strong>Team Workload Summary</strong></u>")
lines.append("| Member | Work Items | Story Points | Active WIP | Blocked |")
lines.append("|--------|------------|--------------|------------|---------|")
for user, stats in sorted(workload.items(), key=lambda kv: (-kv[1]["story_points"], kv[0])):
lines.append(f"| {user} | {stats['items']} | {int(round(stats['story_points']))} | {stats['wip']} | {stats['blocked']} |")
lines.append("")
lines.append("### <u><strong>Sprint Aging Summary</strong></u>")
if aging:
lines.append("| Board Column | Avg Days | Longest Aging | Comments |")
lines.append("|--------------|----------|---------------|----------|")
for a in aging:
lines.append(f"| {a['column']} | {safe_float_str(a['avg_days'])} | {safe_float_str(a['longest'])} | |")
else:
lines.append("- No column/state history provided; aging summary not available.")
lines.append("")
if story_aging:
lines.append("#### Story Aging (User Stories)")
lines.append("| Board Column | Avg Days | Longest Aging | Comments |")
lines.append("|--------------|----------|---------------|----------|")
for a in story_aging:
lines.append(f"| {a['column']} | {safe_float_str(a['avg_days'])} | {safe_float_str(a['longest'])} | |")
lines.append("")
if bug_aging:
lines.append("#### Bug Aging")
lines.append("| Board Column | Avg Days | Longest Aging | Comments |")
lines.append("|--------------|----------|---------------|----------|")
for a in bug_aging:
lines.append(f"| {a['column']} | {safe_float_str(a['avg_days'])} | {safe_float_str(a['longest'])} | |")
lines.append("")
lines.append("### Commentary")
# Check if AI is enabled via environment variable
ai_enabled = env_bool("AI_ENABLED", False)
if ai_enabled:
# AI is enabled - use raw response directly if available
if ai_commentary:
md_sections = _prepare_ai_sections(ai_commentary)
if md_sections:
for section in md_sections:
title = section.get("title") or "Highlights"
lines.append(f"#### {title}")
for entry in section.get("lines") or []:
lines.append(entry)
lines.append("")
else:
# If no sections found, display raw commentary
lines.append(ai_commentary)
lines.append("")
else:
# AI enabled but commentary not available
lines.append("> AI commentary not available")
lines.append("")
else:
# AI not enabled - use fallback summary
fallback_text = _default_commentary_summary(iter_name, velocity_pct, reass, items, deps)
lines.append(f"> {fallback_text}")
lines.append("")
return "\n".join(lines) + "\n"
def render_report_html(
params: Dict[str, str],
items: List[Dict[str, Any]],
ai_commentary: Optional[str] = None,
*,
compact: bool = False,
) -> str:
"""Render the report as styled HTML suitable for email clients."""
now = datetime.now(timezone.utc)
sprint_start = parse_date(params.get("start"))
sprint_end = parse_date(params.get("end"))
iter_name = params.get("iteration") or ""
proj = params.get("project") or "Project"
date_range = f"{params.get('start','?')} - {params.get('end','?')}"
type_counts = item_type_counts(items)
total_items = sum(type_counts.values())
committed, completed = compute_velocity(items)
velocity_pct = (completed / committed * 100.0) if committed > 0 else 0.0
last_updated_dates = [parse_date(field(r, "last_updated_date", "changed_date", default=None)) for r in items]
last_updated = max([d for d in last_updated_dates if d], default=now)
reass = summarize_reassignments(items)
blocked_items, blocker_patterns = summarize_blockers(items, sprint_end)
defects = summarize_defects(
items,
now,
window_start=sprint_start,
window_end=sprint_end,
iteration_name=iter_name,
)
deps = summarize_dependencies(items)
workload = summarize_workload(items)
aging = summarize_aging(items, now)
story_items = _items_by_type(items, ["user story", "story"])
bug_items = _items_by_type(items, ["bug"])
story_aging = summarize_aging(story_items, now) if story_items else []
bug_aging = summarize_aging(bug_items, now) if bug_items else []
def _level_num(val: Optional[float], warn: float, bad: float, *, higher_is_better: bool) -> Optional[str]:
if val is None:
return None
try:
v = float(val)
except Exception:
return None
if higher_is_better:
if v >= bad:
return "good"
if v >= warn:
return "warn"
return "bad"
if v >= bad:
return "bad"
if v >= warn:
return "warn"
return "good"
def _bg_style(level: Optional[str]) -> str:
if level == "bad":
return "background:#fee2e2;"
if level == "warn":
return "background:#fef3c7;"
if level == "good":
return "background:#ecfdf5;"
return ""
def _fg_color(level: Optional[str]) -> str:
if level == "bad":
return "#b91c1c"
if level == "warn":
return "#b45309"
if level == "good":
return "#047857"
return "#111"
style = "body{font-family:Segoe UI,Arial,Helvetica,sans-serif;color:#111;margin:0;padding:20px;background:#ffffff;}"
table_style = "border-collapse:collapse;width:100%;mso-table-lspace:0pt;mso-table-rspace:0pt;margin-top:8px;margin-bottom:8px;"
wrap = "word-wrap:break-word;word-break:break-word;white-space:normal;"
th_style = f"border:1px solid #d1d5db;padding:10px 12px;text-align:left;font-size:13px;vertical-align:top;background:#f9fafb;font-weight:600;color:#1f2937;{wrap}"
td_style = f"border:1px solid #e5e7eb;padding:10px 12px;text-align:left;font-size:13px;vertical-align:top;color:#374151;{wrap}"
h2_style = "margin:0 0 8px 0;font-size:24px;font-weight:700;color:#111827;line-height:1.2;"
h3_style = "margin:32px 0 12px 0;font-size:18px;font-weight:600;color:#1f2937;border-bottom:2px solid #e5e7eb;padding-bottom:6px;"
note_style = "color:#6b7280;font-size:12px;margin:8px 0;line-height:1.5;"
muted_style = "color:#6b7280;font-size:13px;"
def esc(x: Any) -> str:
return _html.escape(str(x) if x is not None else "-")
type_breakdown = ", ".join([f"{k if k.endswith('s') else k + 's'}: {v}" for k, v in type_counts.items()])
vel_level = _level_num(velocity_pct, warn=70, bad=85, higher_is_better=True)
vel_color = _fg_color(vel_level)
vel_bg = _bg_style(vel_level)
overview_html = f"""
<table role="presentation" style="{table_style}">
<tr>
<th style="{th_style}">Work Items</th>
<th style="{th_style}">Committed</th>
<th style="{th_style}">Completed</th>
<th style="{th_style}">Velocity</th>
</tr>
<tr>
<td style="{td_style}">
<div style="font-size:16px;font-weight:600;color:#111827;">{total_items}</div>
<div style="{muted_style};margin-top:4px;">{esc(type_breakdown)}</div>
</td>
<td style="{td_style}">
<div style="font-size:16px;font-weight:600;color:#111827;">{int(round(committed))} SP</div>
</td>
<td style="{td_style}">
<div style="font-size:16px;font-weight:600;color:#111827;">{int(round(completed))} SP</div>
</td>
<td style="{td_style}{vel_bg}">
<div style="font-size:18px;font-weight:700;color:{vel_color};">{int(round(velocity_pct))}%</div>
</td>
</tr>
</table>
<div style="{note_style}">Last Updated: {last_updated.date()}</div>
"""
rows_reassign = "\n".join(
f"<tr><td style='{td_style}'>{esc(r['id'])}</td>"
f"<td style='{td_style}'>{esc(r.get('current_owner') or '-')}</td>"
f"<td style='{td_style}'>{esc(r.get('reassigned_from') or '-')}</td>"
f"<td style='{td_style}'>{esc(r.get('date_changed') or '-')}</td>"
f"<td style='{td_style}'>{esc(r.get('story_points') or '-')}</td></tr>"
for r in reass
)
mid_reassigns = sum(1 for r in reass if r.get("reassigned_from"))
reass_note = (
f"<div style='{note_style}'>{mid_reassigns} reassignments occurred (blocker mitigation, load balancing).</div>"
if mid_reassigns
else f"<div style='{note_style}'>No mid-sprint reassignments detected.</div>"
)
reass_html = f"""
<table style="{table_style}">
<tr><th style="{th_style}">User Story</th><th style="{th_style}">Current Owner</th><th style="{th_style}">Reassigned From</th><th style="{th_style}">Date Changed</th><th style="{th_style}">Story Points</th></tr>
{rows_reassign}
</table>
{reass_note}
"""
if blocked_items:
durations = [b.get("duration_days") for b in blocked_items if b.get("duration_days") is not None]
avg_dur = safe_float_str(sum(durations) / len(durations)) if durations else "-"
max_dur = safe_float_str(max(durations)) if durations else "-"
patterns = ", ".join([f"{k}: {v}" for k, v in blocker_patterns.most_common()]) if blocker_patterns else "-"
blockers_html = f"""
<table role="presentation" style="{table_style}">
<tr><th style="{th_style}">Blocked items</th><th style="{th_style}">Avg duration</th><th style="{th_style}">Max duration</th><th style="{th_style}">Top reasons</th></tr>
<tr><td style="{td_style}">{len(blocked_items)}</td><td style="{td_style}">{avg_dur} d</td><td style="{td_style}">{max_dur} d</td><td style="{td_style}">{esc(patterns)}</td></tr>
</table>
"""
else:
blockers_html = f"<div style='{note_style}'>No blocked items reported.</div>"
top_rows = []
for d in defects.get("top_aged") or []:
age = days_between(parse_date(field(d, "created_date")), now)
lvl = _level_num(age, warn=7, bad=14, higher_is_better=False)
cell_style = _bg_style(lvl)
top_rows.append(
f"<tr><td style='{td_style}'>{esc(field(d,'id'))}</td>"
f"<td style='{td_style}'>{esc(field(d,'title'))}</td>"
f"<td style='{td_style}{cell_style}'>{esc(safe_float_str(age))}</td>"
f"<td style='{td_style}'>{esc(field(d,'priority', default='-'))}</td>"
f"<td style='{td_style}'>{esc(field(d,'state','board_column','current_state', default='-'))}</td></tr>"
)
aged_rows = "\n".join(top_rows)
note_parts: List[str] = []
if defects.get("new_in_window_total") is not None:
carry = defects.get("carryover_open")
carry_text = "-" if carry is None else str(carry)
note_parts.append(
f"New this sprint: {defects['new_in_window_total']} defects "
f"({defects['new_in_window_open']} still open). Carryover open: {carry_text}"
)
closed_total = defects.get("closed_total")
if closed_total is not None:
closed_line = f"Closed to date: {closed_total}"
closed_in_window = defects.get("closed_in_window")
if closed_in_window is not None:
closed_line += f" ({closed_in_window} closed this sprint)"
note_parts.append(closed_line)
defect_note = ""
if note_parts:
defect_note = f"<div style=\"{note_style}\">{' • '.join(note_parts)}</div>"
mismatch_html = ""
mismatches = defects.get("iteration_mismatch") or []
if mismatches:
bullets = []
for mis in mismatches[:10]:
created = mis.get("created_date")
created_str = created.strftime("%Y-%m-%d") if isinstance(created, datetime) else str(created or "-")
bullets.append(
"<li>"
f"#{_html.escape(str(mis['id']))}: {_html.escape(str(mis['title']))} "
f"(created {created_str}, state {mis['state']}, iteration { _html.escape(str(mis.get('iteration') or '-')) })"
"</li>"
)
extra = ""
if len(mismatches) > 10:
extra = f"<li>… {len(mismatches) - 10} more</li>"
mismatch_html = (
f"<div style=\"{note_style}\">⚠️ Created this sprint but tagged to another iteration:</div>"
f"<ul style=\"margin:4px 0 8px 18px;padding-left:18px;\">{''.join(bullets)}{extra}</ul>"
)
board_cols = defects.get("board_columns") or []
board_rows = defects.get("board_rows") or []
board_html = ""
if board_cols and board_rows:
if compact:
board_html = f"<div style=\"{note_style}\">Iteration board snapshot is attached in Excel.</div>"
else:
header_cells = "".join(f"<th style=\"{th_style}\">{_html.escape(col)}</th>" for col in board_cols)
rows_html = []
for row in board_rows:
cols_html = "".join(
f"<td style=\"{td_style}\">{int(row['counts'].get(col, 0))}</td>"
for col in board_cols
)
rows_html.append(
f"<tr><td style=\"{td_style}\">{esc(row.get('iteration_label') or row.get('iteration') or '-')}</td>"
f"{cols_html}"
f"<td style=\"{td_style}\">{int(row.get('total', 0))}</td></tr>"
)
board_html = (
f"<table style=\"{table_style}\">"
f"<tr><th style=\"{th_style}\">Iteration</th>{header_cells}<th style=\"{th_style}\">Total</th></tr>"
f"{''.join(rows_html)}</table>"
)
top_table_html = ""
if aged_rows:
if compact:
top_table_html = f"<div style=\"{note_style}\">Top aging defects are listed in the Excel attachment.</div>"
else:
top_table_html = (
f"<table style=\"{table_style}\">"
f"<tr><th style=\"{th_style}\">Defect</th><th style=\"{th_style}\">Title</th>"
f"<th style=\"{th_style}\">Age (days)</th><th style=\"{th_style}\">Priority</th>"
f"<th style=\"{th_style}\">State</th></tr>{aged_rows}</table>"
)
focus_html = ""
if not compact:
focus_items = defects.get("focus_open_details") or []
if focus_items:
focus_entries = []
max_focus = 8
for detail in focus_items[:max_focus]:
age = safe_float_str(detail.get("age_days"))
focus_entries.append(
"<li>"
f"{_html.escape(str(detail.get('id')))}: {_html.escape(str(detail.get('title')))} "
f"({_html.escape(str(detail.get('assigned_to') or '-'))}, "
f"{_html.escape(str(detail.get('iteration_label') or detail.get('iteration') or '-'))}, "
f"{age}d)</li>"
)
if len(focus_items) > max_focus:
focus_entries.append(f"<li>... {len(focus_items) - max_focus} more in Excel details</li>")
focus_html = (
f"<div style=\"{note_style}\">Active/In Progress defects needing attention:</div>"
f"<ul style=\"margin:4px 0 8px 18px;padding-left:18px;\">{''.join(focus_entries)}</ul>"
)
compact_extra = ""
if compact:
compact_extra = f"<div style=\"{note_style}\">Detailed defect owners and board columns are attached in Excel.</div>"
defects_html = f"""
<table role="presentation" style="{table_style}">
<tr>
<th style="{th_style}">Total defects</th>
<th style="{th_style}">Open</th>
<th style="{th_style}">High priority</th>
<th style="{th_style}">Avg age</th>
</tr>
<tr>
<td style="{td_style}">{defects['total']}</td>
<td style="{td_style}">{defects['open']}</td>
<td style="{td_style}">{defects['high_priority']}</td>
<td style="{td_style}">{safe_float_str(defects['avg_age_days'])} d</td>
</tr>
</table>
{defect_note}
{board_html}
{top_table_html}
{focus_html}
{mismatch_html}
{compact_extra}
"""
velocity_html = f"""
<table style="{table_style}">
<tr><th style="{th_style}">Sprint</th><th style="{th_style}">Committed</th><th style="{th_style}">Completed</th><th style="{th_style}">Velocity %</th></tr>
<tr><td style="{td_style}">{esc(iter_name or 'Sprint')}</td><td style="{td_style}">{int(round(committed))}</td><td style="{td_style}">{int(round(completed))}</td><td style="{td_style}">{int(round(velocity_pct))}%</td></tr>
</table>
"""
if deps:
dep_rows = []
for d in deps[:100]:
ct = d.get("cross_team")
lvl = "warn" if ct else None
cell_style = _bg_style(lvl)
dep_rows.append(
f"<tr><td style='{td_style}'>{esc(d['item_id'])}</td>"
f"<td style='{td_style}'>{esc(d.get('dependency_id') or '-')}</td>"
f"<td style='{td_style}'>{esc(d.get('dependency_title') or '')}</td>"
f"<td style='{td_style}'>{esc(d.get('dependency_team') or '-')}</td>"
f"<td style='{td_style}{cell_style}'>{esc('Yes' if ct else 'No') if ct is not None else '-'}</td></tr>"
)
dep_rows_html = "\n".join(dep_rows)
cross_team = sum(1 for d in deps if d.get("cross_team"))
deps_html = f"""
<table style="{table_style}">
<tr><th style="{th_style}">Item</th><th style="{th_style}">Depends On</th><th style="{th_style}">Title</th><th style="{th_style}">Team</th><th style="{th_style}">Cross-team</th></tr>
{dep_rows_html}
</table>
<div style="{note_style}">{len(deps)} dependencies tracked ({cross_team} cross-team).</div>
"""
else:
deps_html = f"<div style='{note_style}'>No dependencies recorded in data.</div>"
wl_rows = []
for user, stats in sorted(workload.items(), key=lambda kv: (-kv[1]["story_points"], kv[0])):
lvl = _level_num(stats["wip"], warn=3, bad=5, higher_is_better=False)
cell_style = _bg_style(lvl)
wl_rows.append(
f"<tr><td style='{td_style}'>{esc(user)}</td><td style='{td_style}'>{stats['items']}</td>"
f"<td style='{td_style}'>{int(round(stats['story_points']))}</td>"
f"<td style='{td_style}{cell_style}'>{stats['wip']}</td>"
f"<td style='{td_style}'>{stats['blocked']}</td></tr>"
)
wl_rows_html = "\n".join(wl_rows)
workload_html = f"""
<table style="{table_style}">
<tr><th style="{th_style}">Member</th><th style="{th_style}">Work Items</th><th style="{th_style}">Story Points</th><th style="{th_style}">Active WIP</th><th style="{th_style}">Blocked</th></tr>
{wl_rows_html}
</table>
"""
if aging:
aging_html = _render_aging_table_html(aging, table_style=table_style, th_style=th_style, td_style=td_style)
else:
aging_html = f"<div style='{note_style}'>No column/state history provided; aging summary not available.</div>"
if story_aging:
story_aging_html = _render_aging_table_html(story_aging, table_style=table_style, th_style=th_style, td_style=td_style)
else:
story_aging_html = f"<div style='{note_style}'>No user story aging data available.</div>"
if bug_aging:
bug_aging_html = _render_aging_table_html(bug_aging, table_style=table_style, th_style=th_style, td_style=td_style)
else:
bug_aging_html = f"<div style='{note_style}'>No bug aging data available.</div>"
# Build the executive summary stack (baseline summary + optional AI analysis)
ai_enabled = env_bool("AI_ENABLED", False)
fallback_text = _default_commentary_summary(iter_name, velocity_pct, reass, items, deps)
commentary_html = _render_executive_summary_block(
fallback_text,
ai_commentary,
ai_enabled=ai_enabled,
)
html = f"""
<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/xhtml" xmlns:vimport html as _html="urn:schemas-microsoft-com:vml" xmlns:o="urn:schemas-microsoft-com:office:office">
<head>
<meta charset="utf-8">
<meta http-equiv="x-ua-compatible" content="ie=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="x-apple-disable-message-reformatting">
<!--[if mso]>
<noscript>
<xml>
<o:OfficeDocumentSettings>
<o:PixelsPerInch>96</o:PixelsPerInch>
</o:OfficeDocumentSettings>
</xml>
</noscript>
<![endif]-->
<style>{style}</style>
<title>{_html.escape(iter_name or 'Sprint')} Summary — {_html.escape(proj)}</title>
</head>
<body style="font-family:Segoe UI,Arial,Helvetica,sans-serif;color:#111;margin:0;padding:20px;background:#ffffff;">
<table role="presentation" cellpadding="0" cellspacing="0" border="0" width="100%" style="max-width:900px;margin:0 auto;">
<tr>
<td>
<h2 style="{h2_style}">{_html.escape(iter_name or 'Sprint')} Summary — {_html.escape(proj)}</h2>
<div style="{muted_style};margin-bottom:24px;">{_html.escape(date_range)}</div>
<h3 style="{h3_style}">Executive Commentary</h3>
{commentary_html}
<h3 style="{h3_style}">Sprint Overview</h3>
{overview_html}
<h3 style="{h3_style}">Ownership & Reassignments</h3>
{('<div style="' + note_style + '">See attached Excel for details.</div>') if compact else reass_html}
<h3 style="{h3_style}">Blockers & Root Causes</h3>
{blockers_html}
<h3 style="{h3_style}">Defect Insights</h3>
{defects_html}
<h3 style="{h3_style}">Velocity & Story Points</h3>
{velocity_html}
<h3 style="{h3_style}">Dependencies</h3>
{deps_html}
<h3 style="{h3_style}"><span style="text-decoration:underline;font-weight:700;">Team Workload Summary</span></h3>
{('<div style="' + note_style + '">See attached Excel for details.</div>') if compact else workload_html}
<h3 style="{h3_style}"><span style="text-decoration:underline;font-weight:700;">Sprint Aging Summary</span></h3>
{aging_html}
<h3 style="{h3_style}">Story Aging (User Stories)</h3>
{story_aging_html}
<h3 style="{h3_style}">Bug Aging</h3>
{bug_aging_html}
</td>
</tr>
</table>
</body>
</html>
"""
return html
def build_excel_details(items: List[Dict[str, Any]]) -> bytes:
"""Build an Excel workbook (XLSX) with detailed tables for
- Ownership & Reassignments
- Team Workload Summary
Returns the workbook as bytes.
"""
try:
from openpyxl import Workbook
from openpyxl.utils import get_column_letter
from openpyxl.styles import Font, Alignment
except Exception as exc: # pragma: no cover
raise RuntimeError("openpyxl is required to build Excel attachments") from exc
wb = Workbook()
# First sheet: Ownership & Reassignments
ws1 = wb.active
ws1.title = "Ownership & Reassignments"
headers1 = [
"Work Item", "Title", "Type", "Current Owner", "Reassigned From", "Date Changed", "Story Points"
]
ws1.append(headers1)
bold = Font(bold=True)
wrap = Alignment(wrapText=True, vertical="top")
for c_idx in range(1, len(headers1)+1):
ws1.cell(row=1, column=c_idx).font = bold
# rows
for r in summarize_reassignments(items):
ws1.append([
r.get("id"), r.get("title"), r.get("type"), r.get("current_owner"),
r.get("reassigned_from"), r.get("date_changed"), r.get("story_points"),
])
# widths & wrapping
widths1 = [12, 60, 16, 24, 24, 20, 14]
for i, w in enumerate(widths1, start=1):
ws1.column_dimensions[get_column_letter(i)].width = w
for row in ws1.iter_rows(min_row=2):
for cell in row:
cell.alignment = wrap
# Second sheet: Team Workload Summary
ws2 = wb.create_sheet(title="Team Workload")
headers2 = ["Member", "Work Items", "Story Points", "Active WIP", "Blocked"]
ws2.append(headers2)
for c_idx in range(1, len(headers2)+1):
ws2.cell(row=1, column=c_idx).font = bold
wl = summarize_workload(items)
for user, stats in sorted(wl.items(), key=lambda kv: (-kv[1]["story_points"], kv[0])):
ws2.append([user, stats["items"], stats["story_points"], stats["wip"], stats["blocked"]])
widths2 = [28, 12, 14, 12, 12]
for i, w in enumerate(widths2, start=1):
ws2.column_dimensions[get_column_letter(i)].width = w
# Third sheet: Defects (developer triage view)
now = datetime.now(timezone.utc)
defects_summary = summarize_defects(items, now)
ws3 = wb.create_sheet(title="Defects")
headers3 = [
"Work Item",
"Title",
"Priority",
"Severity",
"State",
"Iteration",
"Owner",
"Age (days)",
"Created",
"Closed",
"Area",
]
ws3.append(headers3)
for c_idx in range(1, len(headers3) + 1):
ws3.cell(row=1, column=c_idx).font = bold
defect_rows = defects_summary.get("all_details") or defects_summary.get("open_details") or []
for d in defect_rows:
created = d.get("created_date")
created_str = created.strftime("%Y-%m-%d") if isinstance(created, datetime) else str(created or "-")
closed = d.get("done_date")
closed_str = closed.strftime("%Y-%m-%d") if isinstance(closed, datetime) else str(closed or "-")
ws3.append([
d.get("id"),
d.get("title"),
d.get("priority"),
d.get("severity"),
d.get("state"),
d.get("iteration_label") or d.get("iteration"),
d.get("assigned_to"),
d.get("age_days"),
created_str,
closed_str,
d.get("area"),
])
widths3 = [12, 60, 12, 12, 14, 28, 24, 12, 14, 14, 28]
for i, w in enumerate(widths3, start=1):
ws3.column_dimensions[get_column_letter(i)].width = w
for row in ws3.iter_rows(min_row=2):
for cell in row:
cell.alignment = wrap
# Fourth sheet: Defect board snapshot per iteration
board_cols = defects_summary.get("board_columns") or []
board_rows = defects_summary.get("board_rows") or []
ws4 = None
if board_cols and board_rows:
ws4 = wb.create_sheet(title="Defect Board")
headers4 = ["Iteration"] + board_cols + ["Total"]
ws4.append(headers4)
for c_idx in range(1, len(headers4) + 1):
ws4.cell(row=1, column=c_idx).font = bold
for row in board_rows:
values = [row.get("iteration") or row.get("iteration_label")]
values.extend(int(row["counts"].get(col, 0)) for col in board_cols)
values.append(int(row.get("total", 0)))
ws4.append(values)
widths4 = [28] + [12] * len(board_cols) + [12]
for i, w in enumerate(widths4, start=1):
ws4.column_dimensions[get_column_letter(i)].width = w
# Freeze header rows
ws1.freeze_panes = "A2"
ws2.freeze_panes = "A2"
ws3.freeze_panes = "A2"
if ws4 is not None:
ws4.freeze_panes = "A2"
# Save to bytes
buf = io.BytesIO()
wb.save(buf)
return buf.getvalue()