# Initial Plan to Add Traces
This plan outlines the general strategy for integrating comprehensive trace collection across all modules of the Agento system.
## I. Core Tracing Infrastructure (`agento-streamlit/streamlit_app/utils/tracing_utils.py`)

- **File Location and Initialization:**
  - Create `agento-streamlit/streamlit_app/utils/tracing_utils.py`.
  - Ensure `agento-streamlit/streamlit_app/utils/__init__.py` exists (can be empty) to make `utils` a package.
- **`tracing_utils.py` Contents:**
  - **Imports:** `json`, `os`, `csv`, `datetime`, `typing` (List, Dict, Any, Optional), `pydantic` (BaseModel), `agents` (TraceProcessor, Span, Trace, add_trace_processor), `agents.processors.OTLPHTTPTraceSpanProcessor`.
  - **Directory Setup:** Define `TRACES_ROOT_DIR` pointing correctly to `agento-streamlit/traces/`. Create subdirectories: `raw_sdk_spans/`, `eval_sets/`, `otel_spans/` (if using file-based OTel).

```python
# Inside agento-streamlit/streamlit_app/utils/tracing_utils.py
TRACES_ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "traces"))
RAW_SDK_SPANS_DIR = os.path.join(TRACES_ROOT_DIR, "raw_sdk_spans")
EVAL_SETS_DIR = os.path.join(TRACES_ROOT_DIR, "eval_sets")
# OTEL_SPANS_DIR = os.path.join(TRACES_ROOT_DIR, "otel_spans")  # If writing OTel to file

os.makedirs(RAW_SDK_SPANS_DIR, exist_ok=True)
os.makedirs(EVAL_SETS_DIR, exist_ok=True)
# os.makedirs(OTEL_SPANS_DIR, exist_ok=True)
```
- **`LLMCallRecord(BaseModel)`:**
  - Fields: `trace_id`, `span_id`, `parent_span_id`, `workflow_name`, `module_name`, `agent_name`, `timestamp`, `model`, `system_prompt` (string), `full_input_prompt` (string, reconstructed concatenation), `input_tool_results_json` (JSON string of tool results provided as input to this LLM call), `llm_output_text`, `output_tool_calls_json` (JSON string of tool calls generated by this LLM call), `prompt_tokens`, `completion_tokens`, `total_tokens`, `latency_ms`, `cost_usd`, `expected_output` (blank string).
- **`AgentoTraceProcessor(TraceProcessor)` Class:**
  - `__init__(self, module_name: str, run_id: str)`: Stores `module_name`, `run_id`. Initializes buffers (`llm_calls_buffer`, `raw_spans_buffer`). Initializes file path attributes to `None`.
  - `_calculate_cost(model: Optional[str], total_tokens: Optional[int]) -> Optional[float]`: Helper method. Include a placeholder cost table:

```python
COST_PER_MODEL = {
    "gpt-4": {"prompt": 0.03 / 1000, "completion": 0.06 / 1000},  # Example
    "gpt-4o": {"prompt": 0.005 / 1000, "completion": 0.015 / 1000},
    "gpt-3.5-turbo": {"prompt": 0.0005 / 1000, "completion": 0.0015 / 1000},
    # Add other models used by Agento
}

# In _calculate_cost, use prompt_tokens and completion_tokens for more accuracy if available.
# For simplicity here, using total_tokens and an assumed average or input cost:
if model and total_tokens:
    model_key_part = model.lower()
    for known_model, costs in COST_PER_MODEL.items():
        if known_model in model_key_part:  # Simple check
            # Fallback to prompt cost if only total tokens are available, for simplicity
            return total_tokens * costs.get("prompt", 0.01 / 1000)
return None
```
- **`process_span(self, span: Span)`:**
  - Append `span.model_dump(exclude_none=True)` to `self.raw_spans_buffer`.
  - Detect whether it is an LLM generation span (e.g., by checking `span.attributes.get("openai.event_type") == "LLM_RUN"`, `span.name`, or `span.attributes.get("event.name") == "llm.generation"`).
  - If it is an LLM span, extract data, reconstruct `system_prompt` and `full_input_prompt` (concatenating messages like "role: content\nrole: content"), calculate `latency_ms` (from `span.start_time`, `span.end_time`) and `cost_usd`. Populate an `LLMCallRecord` and append it to `self.llm_calls_buffer`.
- **`process_trace(self, trace: Trace)`:** This method is called when a full trace (potentially containing multiple spans) is completed. This is the primary place to write files for the entire run/workflow.
  - Iterate through `trace.spans`. For each `span_obj` in `trace.spans`:
    - Add `span_obj.model_dump(exclude_none=True)` to a temporary list for the raw SDK JSONL file for this specific trace.
    - If it is an LLM span, extract data into an `LLMCallRecord` and add it to a temporary list for the eval CSV for this specific trace.
  - **Write Raw SDK Spans (JSONL):** Filename `raw_sdk_spans_{self.module_name}_{self.run_id}_{trace.trace_id}.jsonl`. Write each buffered raw span (from the temporary list for this trace) as a JSON line. Store the path in `self.raw_sdk_trace_file_path`.
  - **Write Eval Set (CSV):** Filename `eval_data_{self.module_name}_{self.run_id}_{trace.trace_id}.csv`. Write buffered `LLMCallRecord` instances (from the temporary list for this trace) to CSV. Store the path in `self.eval_csv_file_path`. **Correction:** the user wants one CSV per module run, so the CSV writing should buffer across all traces within a single `AgentoTraceProcessor` instance and write at the very end (e.g., in a `shutdown` or `finalize_module_run` method called explicitly).
  - **Revised approach for `process_trace` and file writing:** `process_span` buffers everything. A new method `finalize_and_write_files(self)` will be called explicitly by the module's `run_module_X` function after the `async with agent_trace(...)` block. This `finalize_and_write_files` will write one raw JSONL file (containing all spans from the run) and one CSV file (containing all LLM calls from the run).
- **`finalize_and_write_files(self)`:**
  - **Raw SDK Spans (JSONL):** Filename `raw_sdk_spans_{self.module_name}_{self.run_id}.jsonl`. Write `self.raw_spans_buffer`.
  - **Eval Set (CSV):** Filename `eval_data_{self.module_name}_{self.run_id}.csv`. Write `self.llm_calls_buffer`.
- **`get_generated_file_paths(self) -> Dict[str, Optional[str]]`:** Returns a dict of file paths.
- **`init_tracing(module_name: str, run_id: str) -> AgentoTraceProcessor` Function:**
  - Creates an `AgentoTraceProcessor` instance.
  - Calls `add_trace_processor(instance)`.
  - Checks the `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` env var. If set, creates an `OTLPHTTPTraceSpanProcessor` and calls `add_trace_processor()` for it too.
  - Returns the `AgentoTraceProcessor` instance.
## II. Module Integration Pattern (`moduleX.py`)

- **Import:** `from streamlit_app.utils.tracing_utils import init_tracing`.
- **In `run_module_X`:**
  - Generate `run_id = datetime.now().strftime("%Y%m%d%H%M%S%f")`.
  - `trace_processor = init_tracing(module_name="moduleX", run_id=run_id)`.
  - Wrap core logic: `async with agents.trace(f"ModuleX_Run_{run_id}"): ...`.
- **End of `run_module_X`** (inside the main try, before returning):
  - `trace_processor.finalize_and_write_files()`.
  - `trace_files = trace_processor.get_generated_file_paths()`.
  - Return `trace_files` (e.g., as part of a dictionary or tuple with module output). A minimal sketch of the whole pattern follows below.
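To make the pattern concrete, here is a minimal sketch (not the final Module 1 code, which appears in Step 3 below). It assumes the `init_tracing` helper from Section I; `run_agents_for_module` is a hypothetical placeholder for the module's existing `Runner.run`-based agent logic.

```python
from datetime import datetime

from agents import trace as agent_trace_context
from streamlit_app.utils.tracing_utils import init_tracing


async def run_module_X(user_goal: str) -> dict:
    run_id = datetime.now().strftime("%Y%m%d%H%M%S%f")
    trace_processor = init_tracing(module_name="moduleX", run_id=run_id)
    try:
        # Name the overall workflow; spans from Runner.run calls inside this
        # block are picked up by the registered trace processors.
        async with agent_trace_context(f"ModuleX_Run_{run_id}"):
            await run_agents_for_module(user_goal)  # hypothetical: the module's existing agent calls
    finally:
        # Always flush buffered spans/LLM calls to the JSONL and CSV files.
        trace_processor.finalize_and_write_files()
    return trace_processor.get_generated_file_paths()
```

The `try/finally` keeps file writing independent of whether the core logic succeeds, which is the same design used in the detailed Module 1 integration in Step 3.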
## III. Streamlit UI Integration Pattern (`pages/X_Module_Y.py`)

- Adapt `run_module_async` to capture `trace_files` from the backend `run_module_X`.
- Store `trace_files` in `st.session_state.current_logs[module_name]['trace_files_info']`.
- Add download buttons for "Raw SDK Spans (JSONL)" and "Eval Data (CSV)", checking for file existence and handling `None` gracefully. Use `os.path.basename` for downloaded filenames. `st.download_button` takes `data` as a string or bytes; the file content will be read first. A minimal sketch follows this list.
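As a rough illustration of this pattern (the full Module 1 version appears in Step 4), a single download button with the existence checks might look like the following; it assumes the trace file paths were already stored in session state as described above.

```python
import os
import streamlit as st

# Assumes the backend stored its trace file paths under this session-state key.
trace_files = st.session_state.get("current_logs", {}).get("module1", {}).get("trace_files_info") or {}
jsonl_path = trace_files.get("raw_sdk_spans_jsonl")

if jsonl_path and os.path.exists(jsonl_path):
    with open(jsonl_path, "r", encoding="utf-8") as f:
        st.download_button(
            label="Download Raw SDK Spans (JSONL)",
            data=f.read(),  # st.download_button needs a string or bytes
            file_name=os.path.basename(jsonl_path),
            mime="application/jsonl",
        )
else:
    st.caption("Raw SDK span file not available yet.")
```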
## IV. Directory Structure

- `agento-streamlit/`
  - `traces/`
    - `raw_sdk_spans/`
    - `eval_sets/`
  - `streamlit_app/`
    - `utils/`
      - `__init__.py`
      - `tracing_utils.py`
      - ... (other utils)
    - `pages/`
    - `app.py`
  - `moduleX.py`
## V. OpenAI Dashboard Traces

- `set_tracing_disabled(True)` will not be called. Using `add_trace_processor` allows the SDK's default processor (which sends traces to OpenAI if an API key is set) to coexist with the custom processors.
**Goal:** Implement trace collection for `module1.py`, making trace files (raw SDK spans as JSONL, LLM calls for evaluation as CSV) available for download in the Streamlit UI.

**Pre-requisites:**
- Basic Python and file system understanding.
- Project cloned and virtual environment set up.
- Read the "Full Plan to Add Traces to Each Module" (Part 1 above) for context.
## Step 1: Create `__init__.py` for Utilities

- **Action:** If it doesn't exist, create an empty file named `__init__.py` inside the `agento-streamlit/streamlit_app/utils/` directory.
- **Purpose:** This makes the `utils` directory a Python package, allowing you to import modules from it.
- **File:** `agento-streamlit/streamlit_app/utils/__init__.py`

```python
# This file can be empty
```
## Step 2: Create `tracing_utils.py`

- **Action:** Create a new Python file named `tracing_utils.py` inside the `agento-streamlit/streamlit_app/utils/` directory.
- **File:** `agento-streamlit/streamlit_app/utils/tracing_utils.py`
- **Content:** Copy and paste the following code into this file:
```python
import json
import os
import csv
from datetime import datetime
from typing import List, Dict, Any, Optional

from pydantic import BaseModel

from agents import TraceProcessor, Span, Trace, add_trace_processor
from agents.processors import OTLPHTTPTraceSpanProcessor  # For OpenTelemetry

# --- Directory Setup ---
# Correctly navigate up two levels from utils to agento-streamlit, then into traces
TRACES_ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "traces"))
RAW_SDK_SPANS_DIR = os.path.join(TRACES_ROOT_DIR, "raw_sdk_spans")
EVAL_SETS_DIR = os.path.join(TRACES_ROOT_DIR, "eval_sets")
os.makedirs(RAW_SDK_SPANS_DIR, exist_ok=True)
os.makedirs(EVAL_SETS_DIR, exist_ok=True)


# --- Pydantic Model for CSV Data ---
class LLMCallRecord(BaseModel):
    trace_id: str
    span_id: str
    parent_span_id: Optional[str] = None
    workflow_name: str
    module_name: str
    agent_name: Optional[str] = None
    timestamp: str  # ISO format
    model: Optional[str] = None
    system_prompt: Optional[str] = None
    full_input_prompt: str
    input_tool_results_json: Optional[str] = None  # JSON string of tool results part of input
    llm_output_text: Optional[str] = None
    output_tool_calls_json: Optional[str] = None  # JSON string of tool calls generated
    prompt_tokens: Optional[int] = None
    completion_tokens: Optional[int] = None
    total_tokens: Optional[int] = None
    latency_ms: Optional[float] = None
    cost_usd: Optional[float] = None
    expected_output: str = ""


# --- Cost Estimation ---
# (Simplified: uses total_tokens and an average rate if specific prompt/completion tokens aren't parsed)
COST_PER_MODEL_PER_TOKEN = {
    "gpt-4o": {"prompt": 0.005 / 1000, "completion": 0.015 / 1000, "default_avg": 0.010 / 1000},
    "gpt-4": {"prompt": 0.03 / 1000, "completion": 0.06 / 1000, "default_avg": 0.045 / 1000},
    "gpt-3.5-turbo": {"prompt": 0.0005 / 1000, "completion": 0.0015 / 1000, "default_avg": 0.001 / 1000},
    # Add other models as needed
}


def _calculate_cost(model_name: Optional[str], prompt_tokens: Optional[int], completion_tokens: Optional[int], total_tokens: Optional[int]) -> Optional[float]:
    if not model_name:
        return None
    normalized_model_name = model_name.lower()
    for model_key, costs in COST_PER_MODEL_PER_TOKEN.items():
        if model_key in normalized_model_name:
            if prompt_tokens is not None and completion_tokens is not None:
                return (prompt_tokens * costs["prompt"]) + (completion_tokens * costs["completion"])
            elif total_tokens is not None:  # Fallback if only total tokens available
                return total_tokens * costs["default_avg"]
    return None  # Model not in our cost list or no token info


# --- Custom Trace Processor ---
class AgentoTraceProcessor(TraceProcessor):
    def __init__(self, module_name: str, run_id: str):
        self.module_name = module_name
        self.run_id = run_id  # Unique ID for this module execution
        self.llm_calls_buffer: List[LLMCallRecord] = []
        self.raw_spans_buffer: List[Dict] = []  # Buffer for all spans in this run
        # File paths will be set when files are written
        self.raw_sdk_jsonl_file_path: Optional[str] = None
        self.eval_csv_file_path: Optional[str] = None
        self.current_workflow_name: Optional[str] = None  # To be set by agent_trace context

    def process_span(self, span: Span):
        """Buffers raw span data and extracts LLM call details."""
        # Buffer all raw spans
        self.raw_spans_buffer.append(span.model_dump(exclude_none=True))

        # If a workflow name isn't set yet from trace, try to get it from span's trace context
        if not self.current_workflow_name and span.trace:
            self.current_workflow_name = span.trace.workflow_name

        # Check for LLM Generation Span (more robust checks)
        is_llm_span = False
        operation_name = span.attributes.get("operation.name", "").lower()  # OpenAI SDK might use this
        event_type = span.attributes.get("openai.event_type", "").lower()  # Newer SDK versions
        span_name = span.name.lower()

        if "llm" in operation_name or "llm_run" in event_type or "llm.generation" in span_name or "generation" in span_name:
            # Heuristic: if "messages" or "prompt" in attributes, likely an LLM call
            if "messages" in span.attributes or "input" in span.attributes or "prompt" in span.attributes:
                is_llm_span = True
        if span.span_data and span.span_data.__class__.__name__ == 'GenerationSpanData':  # Legacy check
            is_llm_span = True

        if is_llm_span:
            attributes = span.attributes

            # Reconstruct input prompts
            system_prompt_content = None
            full_input_parts = []
            input_tool_results_list = []
            input_messages = attributes.get("input", attributes.get("messages"))  # Try both keys

            if isinstance(input_messages, list):  # Chat model style
                for msg in input_messages:
                    role = msg.get("role", "unknown")
                    content = msg.get("content", "")
                    if isinstance(content, list):  # Handle content that might be a list (e.g. vision models)
                        content_str_parts = []
                        for item in content:
                            if isinstance(item, dict) and "type" in item:
                                if item["type"] == "text":
                                    content_str_parts.append(item.get("text", ""))
                                elif item["type"] == "image_url":
                                    content_str_parts.append(f"[Image: {item.get('image_url', {}).get('url', '')[:50]}...]")
                                else:
                                    content_str_parts.append(f"[{item['type']}]")
                            else:  # Fallback for unexpected content item structure
                                content_str_parts.append(str(item))
                        content = "\n".join(content_str_parts)

                    full_input_parts.append(f"{role}: {content}")
                    if role == "system":
                        system_prompt_content = content
                    if role == "tool":
                        tool_call_id = msg.get("tool_call_id", "unknown_tool_call")
                        input_tool_results_list.append({"tool_call_id": tool_call_id, "output": content})
            elif isinstance(input_messages, str):  # Completion model style
                full_input_parts.append(input_messages)

            full_input_prompt_str = "\n".join(full_input_parts)
            input_tool_results_json_str = json.dumps(input_tool_results_list) if input_tool_results_list else None

            # Extract output
            llm_output_text_content = None
            output_tool_calls_list = []
            raw_output = attributes.get("output")
            if isinstance(raw_output, dict):  # Often the case for chat completions
                choices = raw_output.get("choices", [])
                if choices and isinstance(choices, list) and choices[0].get("message"):
                    message = choices[0]["message"]
                    llm_output_text_content = message.get("content")
                    if message.get("tool_calls"):
                        output_tool_calls_list = message["tool_calls"]
            elif isinstance(raw_output, str):
                llm_output_text_content = raw_output

            output_tool_calls_json_str = json.dumps(output_tool_calls_list) if output_tool_calls_list else None

            # Usage and timing
            usage = attributes.get("usage", {})
            prompt_tokens = usage.get("prompt_tokens")
            completion_tokens = usage.get("completion_tokens")
            total_tokens = usage.get("total_tokens")

            latency = None
            if span.start_time and span.end_time:
                latency = (span.end_time - span.start_time).total_seconds() * 1000

            model_name = attributes.get("model", attributes.get("openai.llm.model"))
            cost = _calculate_cost(model_name, prompt_tokens, completion_tokens, total_tokens)

            record = LLMCallRecord(
                trace_id=span.trace_id,
                span_id=span.span_id,
                parent_span_id=span.parent_id,
                workflow_name=self.current_workflow_name or f"{self.module_name}_Workflow_{self.run_id}",
                module_name=self.module_name,
                agent_name=span.attributes.get("agent.name", span.attributes.get("agent_name")),  # Try common keys
                timestamp=span.end_time.isoformat() if span.end_time else datetime.now().isoformat(),
                model=model_name,
                system_prompt=system_prompt_content,
                full_input_prompt=full_input_prompt_str,
                input_tool_results_json=input_tool_results_json_str,
                llm_output_text=llm_output_text_content,
                output_tool_calls_json=output_tool_calls_json_str,
                prompt_tokens=prompt_tokens,
                completion_tokens=completion_tokens,
                total_tokens=total_tokens,
                latency_ms=latency,
                cost_usd=cost,
                expected_output=""  # Blank as requested
            )
            self.llm_calls_buffer.append(record)

    def process_trace(self, trace: Trace):
        """This method is called when a Trace object is finalized by the SDK.

        We'll use it to ensure the workflow name is captured from the Trace object.
        The main file writing will happen in finalize_and_write_files.
        """
        if trace and trace.workflow_name:
            self.current_workflow_name = trace.workflow_name
        # Individual spans within this trace would have already been processed by process_span.

    def finalize_and_write_files(self):
        """Writes all buffered data to their respective files for the current module run."""
        # 1. Write Raw SDK Spans (JSONL)
        # One file per module run, containing all spans from that run.
        self.raw_sdk_jsonl_file_path = os.path.join(RAW_SDK_SPANS_DIR, f"raw_sdk_spans_{self.module_name}_{self.run_id}.jsonl")
        with open(self.raw_sdk_jsonl_file_path, 'w', encoding='utf-8') as f:
            for span_dict in self.raw_spans_buffer:
                f.write(json.dumps(span_dict) + '\n')
        print(f"Saved raw SDK spans to: {self.raw_sdk_jsonl_file_path}")  # For debugging

        # 2. Write Eval Set (CSV)
        # One file per module run, containing all LLM calls from that run.
        if self.llm_calls_buffer:
            self.eval_csv_file_path = os.path.join(EVAL_SETS_DIR, f"eval_data_{self.module_name}_{self.run_id}.csv")
            field_names = LLMCallRecord.model_fields.keys()
            with open(self.eval_csv_file_path, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=field_names)
                writer.writeheader()
                for record in self.llm_calls_buffer:
                    writer.writerow(record.model_dump(exclude_none=True))
            print(f"Saved eval data to: {self.eval_csv_file_path}")  # For debugging

        # Clear buffers for this run
        self.raw_spans_buffer = []
        self.llm_calls_buffer = []

    def get_generated_file_paths(self) -> Dict[str, Optional[str]]:
        return {
            "raw_sdk_spans_jsonl": self.raw_sdk_jsonl_file_path,
            "eval_data_csv": self.eval_csv_file_path,
        }


# --- Global Tracing Setup Function ---
def init_tracing(module_name: str, run_id: str) -> AgentoTraceProcessor:
    """Initializes and registers trace processors for the current module run."""
    # Create and register our custom processor for local file saving
    agento_file_processor = AgentoTraceProcessor(module_name=module_name, run_id=run_id)
    add_trace_processor(agento_file_processor)
    print(f"Registered AgentoTraceProcessor for {module_name}, run {run_id}")

    # Check for OpenTelemetry Exporter Endpoint environment variable
    otel_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT")
    if otel_endpoint:
        try:
            # This processor is from the openai-agents SDK itself
            otel_sdk_processor = OTLPHTTPTraceSpanProcessor(endpoint=otel_endpoint)
            add_trace_processor(otel_sdk_processor)
            print(f"Registered OTLPHTTPTraceSpanProcessor to endpoint: {otel_endpoint}")
        except Exception as e:
            print(f"Failed to initialize OTLPHTTPTraceSpanProcessor: {e}")
    else:
        print("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT not set. Skipping OTLPHTTPTraceSpanProcessor.")

    return agento_file_processor  # Return our custom processor instance
```
- **Review:**
  - `TRACES_ROOT_DIR` now correctly points to `agento-streamlit/traces/`.
  - `LLMCallRecord` includes all requested fields.
  - `_calculate_cost` has a placeholder table.
  - `process_span` buffers raw spans and extracts LLM data. Robust LLM span detection is attempted.
  - `finalize_and_write_files` is added and called explicitly to write one JSONL and one CSV per module run.
  - `init_tracing` sets up both `AgentoTraceProcessor` and `OTLPHTTPTraceSpanProcessor` (if the endpoint is configured).
## Step 3: Modify `agento-streamlit/module1.py`

- **Add/Update Imports:** At the top of `module1.py`:

```python
from datetime import datetime  # Make sure this is imported

from agents import trace as agent_trace_context  # For naming the workflow trace

# Corrected import path, assuming streamlit_app is in sys.path or structure allows this
from streamlit_app.utils.tracing_utils import init_tracing
```
- **Update `run_module_1` Function:**
  - **Modify Signature:**

```python
async def run_module_1(user_goal: str, output_file: str) -> Optional[Dict[str, Any]]:  # Returns dict of trace file paths
```

  - **Initialize Tracing** (at the beginning of the function):

```python
module_name = "module1"
run_id = datetime.now().strftime("%Y%m%d%H%M%S%f")  # Unique ID for this specific run

# trace_processor will be our AgentoTraceProcessor instance
trace_processor = init_tracing(module_name=module_name, run_id=run_id)
```
  - **Wrap Core Logic** (main agent calls):

```python
# ... (after trace_processor is initialized)
final_module_output_data_dict = None  # To store the module's own output data as a dict
generated_trace_files = None

# This outer try-finally ensures trace files are written even if an error occurs in the core logic
try:
    # This context manager from the 'agents' SDK helps name the overall trace/workflow
    async with agent_trace_context(f"{module_name}_MainWorkflow_{run_id}"):
        # All Runner.run calls hereafter will be part of this named trace
        # and processed by registered trace processors (including our AgentoTraceProcessor)
        log_info(f"Starting Module 1 with goal: {user_goal}", truncate=True)

        # ...
        # (YOUR EXISTING MODULE 1 LOGIC: search_agent, generate_criteria_agent calls, etc.)
        # Make sure `Runner.run` is used for agent calls so the SDK picks them up.
        # For example:
        # search_result = await Runner.run(search_agent, input=..., context=context)
        # ...
        # module_1_output_pydantic = Module1Output(goal=..., success_criteria=..., selected_criteria=...)
        # ...
        # (Ensure module_1_output_pydantic is defined and populated by your existing logic)

        if 'module_1_output_pydantic' in locals() and module_1_output_pydantic:
            final_module_output_data_dict = module_1_output_pydantic.model_dump()
        else:
            # Fallback or error if module_1_output_pydantic wasn't created
            log_info("Warning: module_1_output_pydantic not created as expected.", truncate=False)
            # Create a minimal error structure or handle appropriately
            final_module_output_data_dict = {"error": "Module 1 core logic did not produce output."}
    # --- End of `async with agent_trace_context` block ---

except Exception as e_core_logic:
    logger.error(f"Error during Module 1 core logic (within agent_trace_context): {e_core_logic}")
    verbose_logger.error(f"Error during Module 1 core logic: {e_core_logic}", exc_info=True)
    # final_module_output_data_dict might be None or partial.
    # Still proceed to finalize traces with whatever was captured.
    if final_module_output_data_dict is None:
        final_module_output_data_dict = {"error": f"Core logic failed: {e_core_logic}"}

finally:
    # This block executes regardless of success or failure in the try block above.
    # Crucial for ensuring trace files are written.
    if trace_processor:
        try:
            trace_processor.finalize_and_write_files()
            generated_trace_files = trace_processor.get_generated_file_paths()
            log_info(f"Trace files finalized for Module 1, Run ID {run_id}: {generated_trace_files}", truncate=True)
        except Exception as e_finalize:
            logger.error(f"Error finalizing/writing trace files for Module 1, Run ID {run_id}: {e_finalize}")
            verbose_logger.error(f"Error finalizing/writing trace files: {e_finalize}", exc_info=True)

# --- Save the module's own output (this is separate from traces) ---
# This part should happen *after* the core logic and trace finalization
if final_module_output_data_dict:
    try:
        # Ensure output_file (for the module's primary JSON output) is written.
        # This is the file Streamlit reads.
        module_output_dir = os.path.dirname(output_file)
        os.makedirs(module_output_dir, exist_ok=True)

        # Timestamp for the module's primary output file (distinct from run_id for traces)
        module_output_timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        base_output_filename, output_ext = os.path.splitext(os.path.basename(output_file))

        # Write the main output file that Streamlit expects
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(final_module_output_data_dict, f, indent=4)
        log_info(f"Module 1 primary output saved to {output_file}", truncate=True)

        # Write the timestamped version of the module's primary output
        timestamped_module_output_filename = os.path.join(module_output_dir, f"{base_output_filename}_{module_output_timestamp}{output_ext}")
        with open(timestamped_module_output_filename, "w", encoding="utf-8") as f_ts:
            json.dump(final_module_output_data_dict, f_ts, indent=4)
        log_info(f"Timestamped Module 1 primary output saved to {timestamped_module_output_filename}", truncate=True)
    except Exception as e_save_output:
        logger.error(f"Error saving Module 1 primary output file: {e_save_output}")
        verbose_logger.error(f"Error saving Module 1 primary output: {e_save_output}", exc_info=True)
        # If saving the primary output fails, it's a significant issue.
        # We might still have trace files, so we'll return them.
else:
    log_info("No final module output data to save for Module 1.", truncate=False)

return generated_trace_files  # Return the dictionary of trace file paths
```
- **Explanation for Junior Dev:**
  - We get a unique `run_id` for this specific execution of Module 1.
  - `init_tracing` sets up our `AgentoTraceProcessor` so it starts listening for trace events from the OpenAI Agents SDK.
  - `async with agent_trace_context(...)` gives a name to the overall operation of this `run_module_1` call. This name will appear in the traces.
  - All calls to `await Runner.run(...)` inside this `async with` block will automatically generate spans that our `AgentoTraceProcessor`'s `process_span` method will capture.
  - The `finally` block ensures that `trace_processor.finalize_and_write_files()` is called. This is where all buffered raw spans are written to a `.jsonl` file and all buffered LLM call records are written to a `.csv` file for this run.
  - The function now returns `generated_trace_files`, a dictionary containing the paths to these newly created trace files. The Streamlit page will use these paths to offer downloads.
  - The module's own primary JSON output (e.g., `module1_output.json`) is still saved as before, since Streamlit relies on reading this file.
## Step 4: Update Streamlit Page for Module 1 (`agento-streamlit/streamlit_app/pages/2_Module_1_Criteria.py`)

- **Modify `run_module_async` and its Call:**
  - The `run_module_1` function (from `module1.py`) now returns the dictionary of trace file paths. Your `run_module_async` wrapper in the Streamlit page needs to capture and return this.

```python
# In agento-streamlit/streamlit_app/pages/2_Module_1_Criteria.py

# Inside the `async def run_module_async():` function:
# ...
# with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture):
#     # MODIFIED LINE:
#     # run_module_1 now returns a dictionary of trace file paths or None
#     returned_value_from_backend = await run_module_1(user_goal, output_file)
# ...
# with debug_container:
#     st.write("Step 5: run_module_1 completed")
#     # MODIFIED LINE: Log what was returned
#     st.code(f"Return value from backend run_module_1: {returned_value_from_backend}")
# # MODIFIED LINE: Return this dictionary (or None if it failed)
# return returned_value_from_backend
# ...

# In the "if st.button('🚀 Run Module 1', ...):" block:
# ...
# # MODIFIED LINE: Capture the dictionary of trace file paths (or None)
# trace_files_info_dict_or_none = run_async_function(run_module_async())
# ...
```
- **Store Trace File Paths in Session State:**
  - After `output_data = json.loads(content)` (where you load the module's main output), add logic to store the `trace_files_info_dict_or_none`.

```python
# In agento-streamlit/streamlit_app/pages/2_Module_1_Criteria.py
# Inside the "if st.button(...)" block, after reading `output_file` content into `output_data`
# ...
# save_module_output('module1', output_data)  # This is for the module's primary output

# ADD THIS BLOCK for storing trace file information:
if trace_files_info_dict_or_none:  # Check if it's not None
    if 'current_logs' not in st.session_state:
        st.session_state.current_logs = {}
    if 'module1' not in st.session_state.current_logs:
        st.session_state.current_logs['module1'] = {}

    # Store the dictionary of trace file paths
    st.session_state.current_logs['module1']['trace_files_info'] = trace_files_info_dict_or_none

    with debug_container:  # Or st.sidebar.write for less intrusive debug
        st.write("Debug: Stored trace_files_info for module1 into session state.")
        st.json(trace_files_info_dict_or_none)
else:
    with debug_container:
        st.warning("Debug: No trace_files_info_dict returned or it was None. Trace files might not have been generated.")

# ...
# Your existing save_logs('module1', standard_log, verbose_log) can remain as is for stdout/stderr logs.
```
- **Add Download Buttons for Traces:**
  - In the "Output section" -> "Downloads" part of `2_Module_1_Criteria.py`:

```python
# In agento-streamlit/streamlit_app/pages/2_Module_1_Criteria.py
# ...
# # Download options
# st.subheader("📥 Downloads")
# col1, col2, col3 = st.columns(3)  # Your existing columns for module output & logs
#
# with col1:
#     download_json(output_data, "module1_output.json")  # Existing
#
# logs = st.session_state.current_logs.get('module1', {})  # Existing
# with col2:
#     if logs.get('standard'):
#         download_text(logs['standard'], "module1_standard.log", "📥 Download Standard Log")  # Existing
# with col3:
#     if logs.get('verbose'):
#         download_text(logs['verbose'], "module1_verbose.log", "📥 Download Verbose Log")  # Existing

# ADD NEW SUBHEADER AND COLUMNS FOR TRACE DOWNLOADS:
st.markdown("---")  # Visual separator
st.subheader("📊 Trace File Downloads")
trace_dl_cols = st.columns(2)  # Using 2 columns for trace files for now

# Retrieve trace file info from session state
module1_logs_session = st.session_state.current_logs.get('module1', {})
trace_files_info_from_session = module1_logs_session.get('trace_files_info')

if trace_files_info_from_session and isinstance(trace_files_info_from_session, dict):
    # Download Raw SDK Spans (JSONL)
    raw_sdk_path = trace_files_info_from_session.get("raw_sdk_spans_jsonl")
    if raw_sdk_path and os.path.exists(raw_sdk_path):
        with open(raw_sdk_path, 'r', encoding='utf-8') as f_raw_sdk:
            raw_sdk_content = f_raw_sdk.read()  # Read content for download button
        with trace_dl_cols[0]:
            st.download_button(  # Use st.download_button directly
                label="📥 Download Raw SDK Spans (JSONL)",
                data=raw_sdk_content,
                file_name=os.path.basename(raw_sdk_path),  # Dynamic filename
                mime='application/jsonl',
                key=f"download_raw_sdk_{os.path.basename(raw_sdk_path)}"  # Unique key
            )
    elif raw_sdk_path:  # Path was provided but file doesn't exist
        with trace_dl_cols[0]:
            st.caption(f"File not found: {os.path.basename(raw_sdk_path)}")

    # Download Eval Data (CSV)
    eval_csv_path = trace_files_info_from_session.get("eval_data_csv")
    if eval_csv_path and os.path.exists(eval_csv_path):
        with open(eval_csv_path, 'r', encoding='utf-8') as f_eval_csv:
            eval_csv_content = f_eval_csv.read()  # Read content for download button
        with trace_dl_cols[1]:
            st.download_button(
                label="📥 Download Eval Data (CSV)",
                data=eval_csv_content,
                file_name=os.path.basename(eval_csv_path),  # Dynamic filename
                mime='text/csv',
                key=f"download_eval_csv_{os.path.basename(eval_csv_path)}"  # Unique key
            )
    elif eval_csv_path:  # Path was provided but file doesn't exist
        with trace_dl_cols[1]:
            st.caption(f"File not found: {os.path.basename(eval_csv_path)}")
else:
    st.caption("Trace files for Module 1 are not yet available or failed to generate.")
```
- **Developer Note:** `st.download_button` needs the actual `data` (as a string or bytes), so we read the file content before passing it. The `file_name` parameter sets the name the user sees when downloading; using `os.path.basename` makes sure we get just the filename. The `key` parameter is important for Streamlit to differentiate buttons when multiple exist.
## Step 5: Testing and Verification

- **Environment Variable for OTel (Optional):**
  - If you want to test OTel exporting to an HTTP endpoint (e.g., a local Jaeger, SigNoz, or OpenTelemetry Collector), set this environment variable before starting Streamlit: `export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT="http://localhost:4318/v1/traces"` (replace the URL if your collector uses a different one).
  - If this variable is not set, `init_tracing` will skip adding the `OTLPHTTPTraceSpanProcessor`, and only local file traces from `AgentoTraceProcessor` will be active.
- **Run the Streamlit Application:**
  - Open your terminal.
  - Navigate to the `prototypejam-agento_app/agento-streamlit/streamlit_app/` directory.
  - Run the command: `streamlit run app.py`
- **Perform Module 1 Test:**
  - In your web browser, go to the Streamlit app (usually `http://localhost:8501`).
  - Navigate to the "API Configuration" page from the sidebar and enter your OpenAI API key.
  - Navigate to the "Module 1 Criteria" page.
  - Enter a test goal in the text area (e.g., "Develop a personal finance tracker app").
  - Click the "🚀 Run Module 1" button.
- **Observe and Verify:**
  - **UI Feedback:** The Streamlit page should show spinners and status updates. Check the "Debug Information" expander for step-by-step logs if you added them, especially for trace file info storage.
  - **Terminal Logs:** Check the terminal where you ran `streamlit run app.py`. You should see `print` statements from `tracing_utils.py` (like "Registered AgentoTraceProcessor..." and "Saved raw SDK spans to...").
  - **File System (`agento-streamlit/traces/` directory):**
    - After the module run completes, go to the `agento-streamlit/traces/` directory in your project.
    - You should see `raw_sdk_spans/` and `eval_sets/` subdirectories.
    - Inside `raw_sdk_spans/`, look for a file like `raw_sdk_spans_module1_<run_id>.jsonl`.
    - Inside `eval_sets/`, look for a file like `eval_data_module1_<run_id>.csv`.
    - The `<run_id>` part will be a timestamp like `20231027123045123456`.
  - **Streamlit Download Buttons:**
    - On the Module 1 page, scroll down to the "Output" section, then to "Trace File Downloads".
    - You should see the buttons "📥 Download Raw SDK Spans (JSONL)" and "📥 Download Eval Data (CSV)".
    - Click each button. The files should download with the correct names.
- **Inspect Downloaded/Generated Files:**
  - `raw_sdk_spans_module1_....jsonl`: Open this file. Each line should be a JSON object representing one span captured by the OpenAI Agents SDK during the Module 1 run. Look for spans related to `SearchAgent`, `CriteriaGenerator`, and `CriteriaEvaluator`.
  - `eval_data_module1_....csv`: Open this with a spreadsheet program or text editor.
    - Check the headers: they should match the fields in your `LLMCallRecord` Pydantic model (e.g., `trace_id`, `span_id`, `workflow_name`, `module_name`, `agent_name`, `timestamp`, `model`, `system_prompt`, `full_input_prompt`, `input_tool_results_json`, `llm_output_text`, `output_tool_calls_json`, `prompt_tokens`, `completion_tokens`, `total_tokens`, `latency_ms`, `cost_usd`, `expected_output`).
    - Each row should correspond to an LLM call made by an agent in Module 1.
    - Verify `system_prompt` and `full_input_prompt` look correct (system prompt captured, messages concatenated).
    - Verify `llm_output_text` contains the agent's response.
    - Check if `model`, token counts, `latency_ms`, and `cost_usd` are populated (cost might be `0.0` or `None` if the model isn't in your `COST_PER_MODEL_PER_TOKEN` table or tokens are zero).
    - The `expected_output` column must be present and blank.
    - Verify `output_tool_calls_json` is populated if an agent decided to call a tool (e.g., `SearchAgent` calling `WebSearchTool`).
    - Verify `input_tool_results_json` is populated if an agent received tool results as input (less common in Module 1's current structure for the primary LLM calls, but good to check).
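As a rough aid for this inspection step, the following standard-library sketch loads both files and prints a few key columns per LLM call. The paths are placeholders: substitute the actual `<run_id>` printed in your terminal, and run it from the `agento-streamlit/` directory.

```python
import csv
import json

# Placeholder paths: fill in the actual run_id from the terminal output.
jsonl_path = "traces/raw_sdk_spans/raw_sdk_spans_module1_<run_id>.jsonl"
csv_path = "traces/eval_sets/eval_data_module1_<run_id>.csv"

# Each JSONL line is one span dict as dumped by AgentoTraceProcessor.
with open(jsonl_path, encoding="utf-8") as f:
    spans = [json.loads(line) for line in f]
print(f"{len(spans)} spans captured")

# Each CSV row is one LLMCallRecord; spot-check the fields described above.
with open(csv_path, newline="", encoding="utf-8") as f:
    for row in csv.DictReader(f):
        print(row["agent_name"], row["model"], row["total_tokens"], row["latency_ms"])
        assert row["expected_output"] == ""  # must be present and blank
```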
## Step 6: Debugging Tips for the Junior Developer

- **Start Simple:** If things don't work, temporarily simplify `AgentoTraceProcessor`. For example, in `process_span`, just `print(span.model_dump())` to see what data you're getting before trying to parse it in detail.
- **Check Paths Carefully:** Path issues are common. Use `print(os.path.abspath(TRACES_ROOT_DIR))` in `tracing_utils.py` to confirm where it thinks it's writing files.
- **Incremental Testing:** Test the `tracing_utils.py` logic independently if possible by creating mock `Span` and `Trace` objects (see the sketch after this list).
- **Python Imports:** If you get `ModuleNotFoundError`, it's likely an issue with how Python is finding your files.
  - When running `streamlit run app.py` from `agento-streamlit/streamlit_app/`, imports like `from utils.tracing_utils import ...` should work because `streamlit_app/` is effectively added to `sys.path`.
  - The import `from streamlit_app.utils.tracing_utils import init_tracing` in `module1.py` assumes that the `agento-streamlit` directory (the parent of `streamlit_app` and `module1.py`) is part of `PYTHONPATH`, or that `module1.py` is run in a way that lets Python find `streamlit_app` as a package. This is usually handled by how Streamlit invokes the backend scripts.
- **Small Steps:** Make one change from the plan, then test. Don't try to implement everything at once.
- **Read Error Messages:** Python error messages are very helpful. Read them carefully.
- **Pydantic Errors:** If you get a `ValidationError` from Pydantic, it means the data being passed to `LLMCallRecord(...)` doesn't match the expected types or is missing required fields. Print the dictionary you're trying to validate.
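In the spirit of the Incremental Testing tip above, here is a rough standalone check. It deliberately skips the SDK and mock spans and only exercises the planned `_calculate_cost` helper plus the file-writing path by populating the buffers by hand; it assumes `agento-streamlit/` is on `PYTHONPATH` (see the Python Imports tip) and that `tracing_utils.py` matches the Step 2 listing.

```python
# Standalone sanity check for tracing_utils.py (run from agento-streamlit/).
from datetime import datetime

from streamlit_app.utils.tracing_utils import (
    AgentoTraceProcessor,
    LLMCallRecord,
    _calculate_cost,
)

# Cost helper: a known model with explicit token counts should return a number,
# an unknown model should return None.
assert _calculate_cost("gpt-4o", prompt_tokens=100, completion_tokens=50, total_tokens=150) is not None
assert _calculate_cost("unknown-model", 100, 50, 150) is None

# File writing: hand-build one record and write the JSONL/CSV pair.
processor = AgentoTraceProcessor(module_name="module1_test", run_id="manualtest")
processor.raw_spans_buffer.append({"span_id": "span-1", "name": "fake_generation"})
processor.llm_calls_buffer.append(
    LLMCallRecord(
        trace_id="trace-1",
        span_id="span-1",
        workflow_name="manual_test",
        module_name="module1_test",
        timestamp=datetime.now().isoformat(),
        full_input_prompt="system: test\nuser: hello",
    )
)
processor.finalize_and_write_files()
print(processor.get_generated_file_paths())
```

If this writes the two files and prints their paths, the local-file side of the plan works; any remaining issues are then in span detection or in how the SDK delivers spans to `process_span`.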
This detailed plan for Module 1 should guide the junior developer through the implementation. The key is careful creation of `tracing_utils.py` and then correctly integrating its initialization and file finalization steps into `module1.py` and its Streamlit page.