feat: structured-output Trader and Research Manager (#434, finishes the trio)

Extends the canonical structured-output pattern from the Portfolio Manager
to the other two decision-making agents.  Each of the three agents now
returns a typed Pydantic instance via llm.with_structured_output() in a
single primary call, and a render helper turns the result into the same
markdown shape downstream agents and saved reports already consume.

- ResearchPlan: 5-tier recommendation, conversational rationale, concrete
  strategic actions for the trader.
- TraderProposal: 3-tier action (transaction direction is naturally Buy /
  Hold / Sell — position sizing happens later at the Portfolio Manager),
  reasoning, and optional entry_price / stop_loss / position_sizing.
  Rendered output preserves the trailing "FINAL TRANSACTION PROPOSAL:
  **BUY/HOLD/SELL**" line for backward compatibility with the analyst
  stop-signal text.
- PortfolioDecision: 5-tier rating, executive summary, investment thesis,
  optional price_target / time_horizon (unchanged).

The shared try-structured-then-fallback pattern is extracted into
tradingagents/agents/utils/structured.py (bind_structured +
invoke_structured_or_freetext) so all three agents go through the same
code path and log the same warning when a provider lacks structured
output and the agent falls back to free-text generation.

Net effect for users: every saved markdown report (research/manager.md,
trading/trader.md, portfolio/decision.md) now has consistent section
headers across runs and providers, easier to scan.

Net effect for the runtime: the rating extraction round-trip is gone —
the rating comes from the structured response itself, not a second
LLM call. SignalProcessor was already simplified to a heuristic adapter
in the previous commit.

11 new tests in tests/test_structured_agents.py cover the Trader and
Research Manager render functions, structured-output happy paths, and
free-text fallback. Full suite: 88 tests pass in ~2s without API keys.
This commit is contained in:
Yijia-Xiao
2026-04-25 20:27:23 +00:00
parent 0fda24515f
commit bba147798f
6 changed files with 519 additions and 69 deletions

View File

@@ -0,0 +1,232 @@
"""Tests for structured-output agents (Trader and Research Manager).
The Portfolio Manager has its own coverage in tests/test_memory_log.py
(which exercises the full memory-log → PM injection cycle). This file
covers the parallel schemas, render functions, and graceful-fallback
behavior we added for the Trader and Research Manager so all three
decision-making agents share the same shape.
"""
from unittest.mock import MagicMock
import pytest
from tradingagents.agents.managers.research_manager import create_research_manager
from tradingagents.agents.schemas import (
PortfolioRating,
ResearchPlan,
TraderAction,
TraderProposal,
render_research_plan,
render_trader_proposal,
)
from tradingagents.agents.trader.trader import create_trader
# ---------------------------------------------------------------------------
# Render functions
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestRenderTraderProposal:
    def test_minimal_required_fields(self):
        proposal = TraderProposal(action=TraderAction.HOLD, reasoning="Balanced setup; no edge.")
        rendered = render_trader_proposal(proposal)
        assert "**Action**: Hold" in rendered
        assert "**Reasoning**: Balanced setup; no edge." in rendered
        # External tooling (analyst stop-signal text, anything that greps the
        # reports) relies on the trailing FINAL TRANSACTION PROPOSAL line.
        assert "FINAL TRANSACTION PROPOSAL: **HOLD**" in rendered

    def test_optional_fields_included_when_present(self):
        proposal = TraderProposal(
            action=TraderAction.BUY,
            reasoning="Strong technicals + fundamentals.",
            entry_price=189.5,
            stop_loss=178.0,
            position_sizing="6% of portfolio",
        )
        rendered = render_trader_proposal(proposal)
        for expected in (
            "**Action**: Buy",
            "**Entry Price**: 189.5",
            "**Stop Loss**: 178.0",
            "**Position Sizing**: 6% of portfolio",
            "FINAL TRANSACTION PROPOSAL: **BUY**",
        ):
            assert expected in rendered

    def test_optional_fields_omitted_when_absent(self):
        rendered = render_trader_proposal(
            TraderProposal(action=TraderAction.SELL, reasoning="Guidance cut.")
        )
        for header in ("Entry Price", "Stop Loss", "Position Sizing"):
            assert header not in rendered
        assert "FINAL TRANSACTION PROPOSAL: **SELL**" in rendered
@pytest.mark.unit
class TestRenderResearchPlan:
    def test_required_fields(self):
        plan = ResearchPlan(
            recommendation=PortfolioRating.OVERWEIGHT,
            rationale="Bull case carried; tailwinds intact.",
            strategic_actions="Build position over two weeks; cap at 5%.",
        )
        rendered = render_research_plan(plan)
        assert "**Recommendation**: Overweight" in rendered
        assert "**Rationale**: Bull case carried" in rendered
        assert "**Strategic Actions**: Build position" in rendered

    def test_all_5_tier_ratings_render(self):
        # Every enum member must surface verbatim in the rendered header line.
        for rating in PortfolioRating:
            plan = ResearchPlan(
                recommendation=rating,
                rationale="r",
                strategic_actions="s",
            )
            assert f"**Recommendation**: {rating.value}" in render_research_plan(plan)
# ---------------------------------------------------------------------------
# Trader agent: structured happy path + fallback
# ---------------------------------------------------------------------------
def _make_trader_state():
return {
"company_of_interest": "NVDA",
"investment_plan": "**Recommendation**: Buy\n**Rationale**: ...\n**Strategic Actions**: ...",
}
def _structured_trader_llm(captured: dict, proposal: TraderProposal | None = None):
    """Build a MagicMock LLM whose structured binding records the prompt.

    The binding's ``invoke`` stores its argument under ``captured["prompt"]``
    and returns a real TraderProposal so render_trader_proposal can run on it.
    """
    if proposal is None:
        proposal = TraderProposal(
            action=TraderAction.BUY,
            reasoning="Strong setup.",
        )

    def _invoke(prompt):
        captured["prompt"] = prompt
        return proposal

    structured = MagicMock()
    structured.invoke.side_effect = _invoke
    llm = MagicMock()
    llm.with_structured_output.return_value = structured
    return llm
@pytest.mark.unit
class TestTraderAgent:
    def test_structured_path_produces_rendered_markdown(self):
        captured = {}
        proposal = TraderProposal(
            action=TraderAction.BUY,
            reasoning="AI capex cycle intact; institutional flows constructive.",
            entry_price=189.5,
            stop_loss=178.0,
            position_sizing="6% of portfolio",
        )
        trader = create_trader(_structured_trader_llm(captured, proposal))
        result = trader(_make_trader_state())
        plan = result["trader_investment_plan"]
        for expected in (
            "**Action**: Buy",
            "**Entry Price**: 189.5",
            "FINAL TRANSACTION PROPOSAL: **BUY**",
        ):
            assert expected in plan
        # Downstream agents read the identical rendered markdown off messages.
        assert plan in result["messages"][0].content

    def test_prompt_includes_investment_plan(self):
        captured = {}
        trader = create_trader(_structured_trader_llm(captured))
        trader(_make_trader_state())
        # The investment plan travels in the user message of the prompt.
        messages = captured["prompt"]
        assert any("Proposed Investment Plan" in msg["content"] for msg in messages)

    def test_falls_back_to_freetext_when_structured_unavailable(self):
        plain_response = (
            "**Action**: Sell\n\nGuidance cut hits margins.\n\n"
            "FINAL TRANSACTION PROPOSAL: **SELL**"
        )
        llm = MagicMock()
        llm.with_structured_output.side_effect = NotImplementedError("provider unsupported")
        llm.invoke.return_value = MagicMock(content=plain_response)
        result = create_trader(llm)(_make_trader_state())
        assert result["trader_investment_plan"] == plain_response
# ---------------------------------------------------------------------------
# Research Manager agent: structured happy path + fallback
# ---------------------------------------------------------------------------
def _make_rm_state():
return {
"company_of_interest": "NVDA",
"investment_debate_state": {
"history": "Bull and bear arguments here.",
"bull_history": "Bull says...",
"bear_history": "Bear says...",
"current_response": "",
"judge_decision": "",
"count": 1,
},
}
def _structured_rm_llm(captured: dict, plan: ResearchPlan | None = None):
    """Build a MagicMock LLM whose structured binding records the prompt
    and returns a real ResearchPlan for render_research_plan to consume."""
    if plan is None:
        plan = ResearchPlan(
            recommendation=PortfolioRating.HOLD,
            rationale="Balanced view across both sides.",
            strategic_actions="Hold current position; reassess after earnings.",
        )

    def _invoke(prompt):
        captured["prompt"] = prompt
        return plan

    structured = MagicMock()
    structured.invoke.side_effect = _invoke
    llm = MagicMock()
    llm.with_structured_output.return_value = structured
    return llm
@pytest.mark.unit
class TestResearchManagerAgent:
    def test_structured_path_produces_rendered_markdown(self):
        captured = {}
        plan = ResearchPlan(
            recommendation=PortfolioRating.OVERWEIGHT,
            rationale="Bull case is stronger; AI tailwind intact.",
            strategic_actions="Build position gradually over two weeks.",
        )
        rm = create_research_manager(_structured_rm_llm(captured, plan))
        investment_plan = rm(_make_rm_state())["investment_plan"]
        for expected in (
            "**Recommendation**: Overweight",
            "**Rationale**: Bull case",
            "**Strategic Actions**: Build position",
        ):
            assert expected in investment_plan

    def test_prompt_uses_5_tier_rating_scale(self):
        """The RM prompt must list all five tiers so the schema enum matches user expectations."""
        captured = {}
        rm = create_research_manager(_structured_rm_llm(captured))
        rm(_make_rm_state())
        prompt = captured["prompt"]
        for tier in ("Buy", "Overweight", "Hold", "Underweight", "Sell"):
            assert f"**{tier}**" in prompt, f"missing {tier} in prompt"

    def test_falls_back_to_freetext_when_structured_unavailable(self):
        plain_response = "**Recommendation**: Sell\n\n**Rationale**: ...\n\n**Strategic Actions**: ..."
        llm = MagicMock()
        llm.with_structured_output.side_effect = NotImplementedError("provider unsupported")
        llm.invoke.return_value = MagicMock(content=plain_response)
        result = create_research_manager(llm)(_make_rm_state())
        assert result["investment_plan"] == plain_response

View File

@@ -5,35 +5,24 @@ Uses LangChain's ``with_structured_output`` so the LLM produces a typed
back to markdown for storage in ``final_trade_decision`` so memory log, back to markdown for storage in ``final_trade_decision`` so memory log,
CLI display, and saved reports continue to consume the same shape they do CLI display, and saved reports continue to consume the same shape they do
today. When a provider does not expose structured output, the agent falls today. When a provider does not expose structured output, the agent falls
back to a free-text invocation and the existing heuristic rating parser. back gracefully to free-text generation.
""" """
from __future__ import annotations from __future__ import annotations
import logging
from tradingagents.agents.schemas import PortfolioDecision, render_pm_decision from tradingagents.agents.schemas import PortfolioDecision, render_pm_decision
from tradingagents.agents.utils.agent_utils import ( from tradingagents.agents.utils.agent_utils import (
build_instrument_context, build_instrument_context,
get_language_instruction, get_language_instruction,
) )
from tradingagents.agents.utils.structured import (
logger = logging.getLogger(__name__) bind_structured,
invoke_structured_or_freetext,
)
def create_portfolio_manager(llm): def create_portfolio_manager(llm):
# Wrap once at agent construction; if the provider does not support structured_llm = bind_structured(llm, PortfolioDecision, "Portfolio Manager")
# structured output we keep ``structured_llm`` as None and use the
# free-text fallback for every call.
try:
structured_llm = llm.with_structured_output(PortfolioDecision)
except (NotImplementedError, AttributeError) as exc:
logger.warning(
"Portfolio Manager: provider does not support with_structured_output (%s); "
"falling back to free-text generation",
exc,
)
structured_llm = None
def portfolio_manager_node(state) -> dict: def portfolio_manager_node(state) -> dict:
instrument_context = build_instrument_context(state["company_of_interest"]) instrument_context = build_instrument_context(state["company_of_interest"])
@@ -74,7 +63,13 @@ def create_portfolio_manager(llm):
Be decisive and ground every conclusion in specific evidence from the analysts.{get_language_instruction()}""" Be decisive and ground every conclusion in specific evidence from the analysts.{get_language_instruction()}"""
final_trade_decision = _invoke_pm(structured_llm, llm, prompt) final_trade_decision = invoke_structured_or_freetext(
structured_llm,
llm,
prompt,
render_pm_decision,
"Portfolio Manager",
)
new_risk_debate_state = { new_risk_debate_state = {
"judge_decision": final_trade_decision, "judge_decision": final_trade_decision,
@@ -95,26 +90,3 @@ Be decisive and ground every conclusion in specific evidence from the analysts.{
} }
return portfolio_manager_node return portfolio_manager_node
def _invoke_pm(structured_llm, plain_llm, prompt: str) -> str:
"""Run the PM call and return the markdown-rendered decision.
Tries the structured-output path first; if it fails for any reason
(provider does not support it, model returns malformed JSON, network
glitch on the structured endpoint), falls back to the plain free-text
invocation so the pipeline still produces a result.
"""
if structured_llm is not None:
try:
decision = structured_llm.invoke(prompt)
return render_pm_decision(decision)
except Exception as exc:
logger.warning(
"Portfolio Manager: structured-output invocation failed (%s); "
"retrying once as free text",
exc,
)
response = plain_llm.invoke(prompt)
return response.content

View File

@@ -1,8 +1,18 @@
"""Research Manager: turns the bull/bear debate into a structured investment plan for the trader."""
from __future__ import annotations
from tradingagents.agents.schemas import ResearchPlan, render_research_plan
from tradingagents.agents.utils.agent_utils import build_instrument_context from tradingagents.agents.utils.agent_utils import build_instrument_context
from tradingagents.agents.utils.structured import (
bind_structured,
invoke_structured_or_freetext,
)
def create_research_manager(llm): def create_research_manager(llm):
structured_llm = bind_structured(llm, ResearchPlan, "Research Manager")
def research_manager_node(state) -> dict: def research_manager_node(state) -> dict:
instrument_context = build_instrument_context(state["company_of_interest"]) instrument_context = build_instrument_context(state["company_of_interest"])
history = state["investment_debate_state"].get("history", "") history = state["investment_debate_state"].get("history", "")
@@ -24,31 +34,31 @@ def create_research_manager(llm):
Commit to a clear stance whenever the debate's strongest arguments warrant one; reserve Hold for situations where the evidence on both sides is genuinely balanced. Commit to a clear stance whenever the debate's strongest arguments warrant one; reserve Hold for situations where the evidence on both sides is genuinely balanced.
**Required Output Structure:**
1. **Recommendation**: State one of Buy / Overweight / Hold / Underweight / Sell.
2. **Rationale**: Summarise the key points from both sides and explain which arguments led to this recommendation.
3. **Strategic Actions**: Concrete steps for the trader to implement the recommendation, including position sizing guidance consistent with the rating.
Present your analysis conversationally, as if speaking naturally to a teammate.
--- ---
**Debate History:** **Debate History:**
{history}""" {history}"""
response = llm.invoke(prompt)
investment_plan = invoke_structured_or_freetext(
structured_llm,
llm,
prompt,
render_research_plan,
"Research Manager",
)
new_investment_debate_state = { new_investment_debate_state = {
"judge_decision": response.content, "judge_decision": investment_plan,
"history": investment_debate_state.get("history", ""), "history": investment_debate_state.get("history", ""),
"bear_history": investment_debate_state.get("bear_history", ""), "bear_history": investment_debate_state.get("bear_history", ""),
"bull_history": investment_debate_state.get("bull_history", ""), "bull_history": investment_debate_state.get("bull_history", ""),
"current_response": response.content, "current_response": investment_plan,
"count": investment_debate_state["count"], "count": investment_debate_state["count"],
} }
return { return {
"investment_debate_state": new_investment_debate_state, "investment_debate_state": new_investment_debate_state,
"investment_plan": response.content, "investment_plan": investment_plan,
} }
return research_manager_node return research_manager_node

View File

@@ -1,15 +1,16 @@
"""Pydantic schemas used by agents that produce structured output. """Pydantic schemas used by agents that produce structured output.
The framework's primary artifact is still prose: each agent's natural-language The framework's primary artifact is still prose: each agent's natural-language
reasoning is what users read, what gets stored in the memory log, and what reasoning is what users read in the saved markdown reports and what the
gets saved as markdown reports. Structured output is layered onto agents downstream agents read as context. Structured output is layered onto the
whose results have downstream machine-readable consumers (currently only three decision-making agents (Research Manager, Trader, Portfolio Manager)
the Portfolio Manager) so that: so that:
- The rating is type-safe and never has to be regex-extracted - Their outputs follow consistent section headers across runs and providers
- Schema field descriptions become the model's output instructions
- Each provider's native structured-output mode is used (json_schema for - Each provider's native structured-output mode is used (json_schema for
OpenAI/xAI, response_schema for Gemini, tool-use for Anthropic) OpenAI/xAI, response_schema for Gemini, tool-use for Anthropic)
- Schema field descriptions become the model's output instructions, freeing
the prompt body to focus on context and the rating-scale guidance
- A render helper turns the parsed Pydantic instance back into the same - A render helper turns the parsed Pydantic instance back into the same
markdown shape the rest of the system already consumes, so display, markdown shape the rest of the system already consumes, so display,
memory log, and saved reports keep working unchanged memory log, and saved reports keep working unchanged
@@ -23,8 +24,13 @@ from typing import Optional
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
# ---------------------------------------------------------------------------
# Shared rating types
# ---------------------------------------------------------------------------
class PortfolioRating(str, Enum): class PortfolioRating(str, Enum):
"""5-tier portfolio rating used by the Research Manager and Portfolio Manager.""" """5-tier rating used by the Research Manager and Portfolio Manager."""
BUY = "Buy" BUY = "Buy"
OVERWEIGHT = "Overweight" OVERWEIGHT = "Overweight"
@@ -33,6 +39,135 @@ class PortfolioRating(str, Enum):
SELL = "Sell" SELL = "Sell"
class TraderAction(str, Enum):
"""3-tier transaction direction used by the Trader.
The Trader's job is to translate the Research Manager's investment plan
into a concrete transaction proposal: should the desk execute a Buy, a
Sell, or sit on Hold this round. Position sizing and the nuanced
Overweight / Underweight calls happen later at the Portfolio Manager.
"""
BUY = "Buy"
HOLD = "Hold"
SELL = "Sell"
# ---------------------------------------------------------------------------
# Research Manager
# ---------------------------------------------------------------------------
class ResearchPlan(BaseModel):
"""Structured investment plan produced by the Research Manager.
Hand-off to the Trader: the recommendation pins the directional view,
the rationale captures which side of the bull/bear debate carried the
argument, and the strategic actions translate that into concrete
instructions the trader can execute against.
"""
recommendation: PortfolioRating = Field(
description=(
"The investment recommendation. Exactly one of Buy / Overweight / "
"Hold / Underweight / Sell. Reserve Hold for situations where the "
"evidence on both sides is genuinely balanced; otherwise commit to "
"the side with the stronger arguments."
),
)
rationale: str = Field(
description=(
"Conversational summary of the key points from both sides of the "
"debate, ending with which arguments led to the recommendation. "
"Speak naturally, as if to a teammate."
),
)
strategic_actions: str = Field(
description=(
"Concrete steps for the trader to implement the recommendation, "
"including position sizing guidance consistent with the rating."
),
)
def render_research_plan(plan: ResearchPlan) -> str:
"""Render a ResearchPlan to markdown for storage and the trader's prompt context."""
return "\n".join([
f"**Recommendation**: {plan.recommendation.value}",
"",
f"**Rationale**: {plan.rationale}",
"",
f"**Strategic Actions**: {plan.strategic_actions}",
])
# ---------------------------------------------------------------------------
# Trader
# ---------------------------------------------------------------------------
class TraderProposal(BaseModel):
"""Structured transaction proposal produced by the Trader.
The trader reads the Research Manager's investment plan and the analyst
reports, then turns them into a concrete transaction: what action to
take, the reasoning that justifies it, and the practical levels for
entry, stop-loss, and sizing.
"""
action: TraderAction = Field(
description="The transaction direction. Exactly one of Buy / Hold / Sell.",
)
reasoning: str = Field(
description=(
"The case for this action, anchored in the analysts' reports and "
"the research plan. Two to four sentences."
),
)
entry_price: Optional[float] = Field(
default=None,
description="Optional entry price target in the instrument's quote currency.",
)
stop_loss: Optional[float] = Field(
default=None,
description="Optional stop-loss price in the instrument's quote currency.",
)
position_sizing: Optional[str] = Field(
default=None,
description="Optional sizing guidance, e.g. '5% of portfolio'.",
)
def render_trader_proposal(proposal: TraderProposal) -> str:
"""Render a TraderProposal to markdown.
The trailing ``FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL**`` line is
preserved for backward compatibility with the analyst stop-signal text
and any external code that greps for it.
"""
parts = [
f"**Action**: {proposal.action.value}",
"",
f"**Reasoning**: {proposal.reasoning}",
]
if proposal.entry_price is not None:
parts.extend(["", f"**Entry Price**: {proposal.entry_price}"])
if proposal.stop_loss is not None:
parts.extend(["", f"**Stop Loss**: {proposal.stop_loss}"])
if proposal.position_sizing:
parts.extend(["", f"**Position Sizing**: {proposal.position_sizing}"])
parts.extend([
"",
f"FINAL TRANSACTION PROPOSAL: **{proposal.action.value.upper()}**",
])
return "\n".join(parts)
# ---------------------------------------------------------------------------
# Portfolio Manager
# ---------------------------------------------------------------------------
class PortfolioDecision(BaseModel): class PortfolioDecision(BaseModel):
"""Structured output produced by the Portfolio Manager. """Structured output produced by the Portfolio Manager.

View File

@@ -1,32 +1,60 @@
"""Trader: turns the Research Manager's investment plan into a concrete transaction proposal."""
from __future__ import annotations
import functools import functools
from langchain_core.messages import AIMessage
from tradingagents.agents.schemas import TraderProposal, render_trader_proposal
from tradingagents.agents.utils.agent_utils import build_instrument_context from tradingagents.agents.utils.agent_utils import build_instrument_context
from tradingagents.agents.utils.structured import (
bind_structured,
invoke_structured_or_freetext,
)
def create_trader(llm): def create_trader(llm):
structured_llm = bind_structured(llm, TraderProposal, "Trader")
def trader_node(state, name): def trader_node(state, name):
company_name = state["company_of_interest"] company_name = state["company_of_interest"]
instrument_context = build_instrument_context(company_name) instrument_context = build_instrument_context(company_name)
investment_plan = state["investment_plan"] investment_plan = state["investment_plan"]
context = {
"role": "user",
"content": f"Based on a comprehensive analysis by a team of analysts, here is an investment plan tailored for {company_name}. {instrument_context} This plan incorporates insights from current technical market trends, macroeconomic indicators, and social media sentiment. Use this plan as a foundation for evaluating your next trading decision.\n\nProposed Investment Plan: {investment_plan}\n\nLeverage these insights to make an informed and strategic decision.",
}
messages = [ messages = [
{ {
"role": "system", "role": "system",
"content": "You are a trading agent analyzing market data to make investment decisions. Based on your analysis, provide a specific recommendation to buy, sell, or hold. End with a firm decision and always conclude your response with 'FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL**' to confirm your recommendation.", "content": (
"You are a trading agent analyzing market data to make investment decisions. "
"Based on your analysis, provide a specific recommendation to buy, sell, or hold. "
"Anchor your reasoning in the analysts' reports and the research plan."
),
},
{
"role": "user",
"content": (
f"Based on a comprehensive analysis by a team of analysts, here is an investment "
f"plan tailored for {company_name}. {instrument_context} This plan incorporates "
f"insights from current technical market trends, macroeconomic indicators, and "
f"social media sentiment. Use this plan as a foundation for evaluating your next "
f"trading decision.\n\nProposed Investment Plan: {investment_plan}\n\n"
f"Leverage these insights to make an informed and strategic decision."
),
}, },
context,
] ]
result = llm.invoke(messages) trader_plan = invoke_structured_or_freetext(
structured_llm,
llm,
messages,
render_trader_proposal,
"Trader",
)
return { return {
"messages": [result], "messages": [AIMessage(content=trader_plan)],
"trader_investment_plan": result.content, "trader_investment_plan": trader_plan,
"sender": name, "sender": name,
} }

View File

@@ -0,0 +1,73 @@
"""Shared helpers for invoking an agent with structured output and a graceful fallback.
The Portfolio Manager, Trader, and Research Manager all follow the same
canonical pattern:
1. At agent creation, wrap the LLM with ``with_structured_output(Schema)``
so the model returns a typed Pydantic instance. If the provider does
not support structured output (rare; mostly older Ollama models), the
wrap is skipped and the agent uses free-text generation instead.
2. At invocation, run the structured call and render the result back to
markdown. If the structured call itself fails for any reason
(malformed JSON from a weak model, transient provider issue), fall
back to a plain ``llm.invoke`` so the pipeline never blocks.
Centralising the pattern here keeps the agent factories small and ensures
all three agents log the same warnings when fallback fires.
"""
from __future__ import annotations
import logging
from typing import Any, Callable, Optional, TypeVar
from pydantic import BaseModel
# Module-level logger so fallback warnings are attributed to this module.
logger = logging.getLogger(__name__)
# Schema type produced by a structured-output binding (any Pydantic model).
T = TypeVar("T", bound=BaseModel)
def bind_structured(llm: Any, schema: type[T], agent_name: str) -> Optional[Any]:
    """Wrap *llm* with ``with_structured_output(schema)``; ``None`` if unsupported.

    A warning is logged when the binding fails so the user knows this agent
    will use free-text generation on every call rather than one-shot fallback.
    """
    try:
        bound = llm.with_structured_output(schema)
    except (NotImplementedError, AttributeError) as exc:
        logger.warning(
            "%s: provider does not support with_structured_output (%s); "
            "falling back to free-text generation",
            agent_name, exc,
        )
        return None
    return bound
def invoke_structured_or_freetext(
    structured_llm: Optional[Any],
    plain_llm: Any,
    prompt: Any,
    render: Callable[[T], str],
    agent_name: str,
) -> str:
    """Invoke with structured output when available; otherwise plain free text.

    ``prompt`` is whatever the underlying LLM accepts (a string, or a list of
    message dicts for chat models). The identical value is handed to the
    free-text path, so the fallback sees exactly the input the structured
    call did.
    """
    if structured_llm is not None:
        try:
            return render(structured_llm.invoke(prompt))
        except Exception as exc:
            # Any structured failure (malformed JSON, transient provider
            # issue) degrades to a single free-text retry below.
            logger.warning(
                "%s: structured-output invocation failed (%s); retrying once as free text",
                agent_name, exc,
            )
    return plain_llm.invoke(prompt).content