From 075e9179c1593bbea4d0b416ba2e3c28ab346fa0 Mon Sep 17 00:00:00 2001 From: Timothy Date: Tue, 27 Jan 2026 10:11:54 -0800 Subject: [PATCH] fix: retry logic broken by merge conflict --- core/framework/graph/node.py | 191 ++++++++++++++++++++--------------- 1 file changed, 108 insertions(+), 83 deletions(-) diff --git a/core/framework/graph/node.py b/core/framework/graph/node.py index a11898b5..1677d0fb 100644 --- a/core/framework/graph/node.py +++ b/core/framework/graph/node.py @@ -807,97 +807,122 @@ Keep the same JSON structure but with shorter content values. f" ⚠ Response still truncated after {compaction_attempt} compaction attempts" ) - total_input_tokens += response.input_tokens - total_output_tokens += response.output_tokens + # Phase 2: Validation retry loop for Pydantic models + max_validation_retries = ctx.node_spec.max_validation_retries if ctx.node_spec.output_model else 0 + validation_attempt = 0 + total_input_tokens = 0 + total_output_tokens = 0 + current_messages = messages.copy() - # Log the response - response_preview = ( - response.content[:200] if len(response.content) > 200 else response.content - ) - if len(response.content) > 200: - response_preview += "..." - logger.info(f" ← Response: {response_preview}") + while True: + total_input_tokens += response.input_tokens + total_output_tokens += response.output_tokens - # If no output_model, break immediately (no validation needed) - if ctx.node_spec.output_model is None: - break + # Log the response + response_preview = ( + response.content[:200] if len(response.content) > 200 else response.content + ) + if len(response.content) > 200: + response_preview += "..." + logger.info(f" ← Response: {response_preview}") - # Try to parse and validate the response - try: - import json - parsed = self._extract_json(response.content, ctx.node_spec.output_keys) + # If no output_model, break immediately (no validation needed) + if ctx.node_spec.output_model is None: + break - if isinstance(parsed, dict): - from framework.graph.validator import OutputValidator - validator = OutputValidator() - validation_result, validated_model = validator.validate_with_pydantic( - parsed, ctx.node_spec.output_model - ) + # Try to parse and validate the response + try: + import json + parsed = self._extract_json(response.content, ctx.node_spec.output_keys) - if validation_result.success: - # Validation passed, break out of retry loop - model_name = ctx.node_spec.output_model.__name__ - logger.info(f" ✓ Pydantic validation passed for {model_name}") - break - else: - # Validation failed - validation_attempt += 1 + if isinstance(parsed, dict): + from framework.graph.validator import OutputValidator + validator = OutputValidator() + validation_result, validated_model = validator.validate_with_pydantic( + parsed, ctx.node_spec.output_model + ) - if validation_attempt <= max_validation_retries: - # Add validation feedback to messages and retry - feedback = validator.format_validation_feedback( - validation_result, ctx.node_spec.output_model - ) - logger.warning( - f" ⚠ Pydantic validation failed " - f"(attempt {validation_attempt}/{max_validation_retries}): " - f"{validation_result.error}" - ) - logger.info(" 🔄 Retrying with validation feedback...") - - # Add the assistant's failed response and feedback - current_messages.append({ - "role": "assistant", - "content": response.content - }) - current_messages.append({ - "role": "user", - "content": feedback - }) - continue # Retry the LLM call - else: - # Max retries exceeded - latency_ms = int((time.time() - start) * 1000) - err = validation_result.error - logger.error( - f" ✗ Pydantic validation failed after " - f"{max_validation_retries} retries: {err}" - ) - ctx.runtime.record_outcome( - decision_id=decision_id, - success=False, - error=f"Validation failed: {validation_result.error}", - tokens_used=total_input_tokens + total_output_tokens, - latency_ms=latency_ms, - ) - error_msg = ( - f"Pydantic validation failed after " - f"{max_validation_retries} retries: {err}" - ) - return NodeResult( - success=False, - error=error_msg, - output=parsed, - tokens_used=total_input_tokens + total_output_tokens, - latency_ms=latency_ms, - validation_errors=validation_result.errors, - ) - else: - # Not a dict, can't validate - break and let downstream handle + if validation_result.success: + # Validation passed, break out of retry loop + model_name = ctx.node_spec.output_model.__name__ + logger.info(f" ✓ Pydantic validation passed for {model_name}") break - except Exception: - # JSON extraction failed - break and let downstream handle + else: + # Validation failed + validation_attempt += 1 + + if validation_attempt <= max_validation_retries: + # Add validation feedback to messages and retry + feedback = validator.format_validation_feedback( + validation_result, ctx.node_spec.output_model + ) + logger.warning( + f" ⚠ Pydantic validation failed " + f"(attempt {validation_attempt}/{max_validation_retries}): " + f"{validation_result.error}" + ) + logger.info(" 🔄 Retrying with validation feedback...") + + # Add the assistant's failed response and feedback + current_messages.append({ + "role": "assistant", + "content": response.content + }) + current_messages.append({ + "role": "user", + "content": feedback + }) + + # Re-call LLM with feedback + if ctx.available_tools and self.tool_executor: + response = ctx.llm.complete_with_tools( + messages=current_messages, + system=system, + tools=ctx.available_tools, + tool_executor=executor, + max_tokens=ctx.max_tokens, + ) + else: + response = ctx.llm.complete( + messages=current_messages, + system=system, + json_mode=use_json_mode, + max_tokens=ctx.max_tokens, + ) + continue # Retry validation + else: + # Max retries exceeded + latency_ms = int((time.time() - start) * 1000) + err = validation_result.error + logger.error( + f" ✗ Pydantic validation failed after " + f"{max_validation_retries} retries: {err}" + ) + ctx.runtime.record_outcome( + decision_id=decision_id, + success=False, + error=f"Validation failed: {validation_result.error}", + tokens_used=total_input_tokens + total_output_tokens, + latency_ms=latency_ms, + ) + error_msg = ( + f"Pydantic validation failed after " + f"{max_validation_retries} retries: {err}" + ) + return NodeResult( + success=False, + error=error_msg, + output=parsed, + tokens_used=total_input_tokens + total_output_tokens, + latency_ms=latency_ms, + validation_errors=validation_result.errors, + ) + else: + # Not a dict, can't validate - break and let downstream handle break + except Exception: + # JSON extraction failed - break and let downstream handle + break latency_ms = int((time.time() - start) * 1000)