feat: structured-output Portfolio Manager + 5-tier rating consistency (#434)

Three related changes that take the rating pipeline from heuristic-only
to type-safe at the source.

1) Research Manager prompt now uses the same 5-tier scale (Buy /
   Overweight / Hold / Underweight / Sell) as the Portfolio Manager,
   signal_processing, and the memory log.  The prior 3-tier wording
   (Buy / Sell / Hold) was the only remaining inconsistency in the
   pipeline.

2) Centralise the 5-tier vocabulary and the heuristic prose-rating
   parser into tradingagents/agents/utils/rating.py.  Both the memory
   log and the signal processor now share the same parser instead of
   duplicating regex and word-walker logic.

3) Make structured output a first-class part of the Portfolio Manager's
   primary call.  The PM uses llm.with_structured_output(PortfolioDecision)
   so each provider's native structured-output mode (json_schema for
   OpenAI/xAI, response_schema for Gemini, tool-use for Anthropic,
   function_calling for OpenAI-compatible providers) yields a typed
   Pydantic instance directly.  A render helper turns that instance back
   into the same markdown shape downstream consumers (memory log, CLI
   display, saved reports) already expect, so no other code has to know
   the PM now produces structured output.  Providers without structured
   support fall back gracefully to free-text + the deterministic
   heuristic.

   The previous SignalProcessor had been making a second LLM call to
   re-extract the rating from the PM's prose; that round-trip is now
   eliminated.  SignalProcessor is a thin adapter over parse_rating(),
   makes zero LLM calls, and stays for backwards compatibility with
   process_signal() callers.

Schema (PortfolioDecision) captures rating + executive_summary +
investment_thesis + optional price_target + time_horizon, with field
descriptions doubling as output instructions.  Agent prose remains the
primary artifact; structured output is layered onto the PM only because
it is the one agent whose output has machine-readable downstream
consumers.

15 new tests cover the heuristic parser (markdown-bold edge cases that
had no coverage before), the structured PM happy path, the free-text
fallback path, and that SignalProcessor never invokes the LLM.  Full
suite: 77 tests pass in ~2s without API keys.
This commit is contained in:
Yijia-Xiao
2026-04-25 19:57:26 +00:00
parent 4cbd4b086f
commit 0fda24515f
8 changed files with 399 additions and 87 deletions

View File

@@ -5,6 +5,7 @@ import pandas as pd
from unittest.mock import MagicMock, patch
from tradingagents.agents.utils.memory import TradingMemoryLog
from tradingagents.agents.schemas import PortfolioDecision, PortfolioRating
from tradingagents.graph.reflection import Reflector
from tradingagents.graph.trading_graph import TradingAgentsGraph
from tradingagents.graph.propagation import Propagator
@@ -82,6 +83,25 @@ def _make_pm_state(past_context=""):
}
def _structured_pm_llm(captured: dict, decision: PortfolioDecision | None = None):
"""Build a MagicMock LLM whose with_structured_output binding captures the
prompt and returns a real PortfolioDecision (so render_pm_decision works).
"""
if decision is None:
decision = PortfolioDecision(
rating=PortfolioRating.HOLD,
executive_summary="Hold the position; await catalyst.",
investment_thesis="Balanced view; neither side carried the debate.",
)
structured = MagicMock()
structured.invoke.side_effect = lambda prompt: (
captured.__setitem__("prompt", prompt) or decision
)
llm = MagicMock()
llm.with_structured_output.return_value = structured
return llm
# ---------------------------------------------------------------------------
# Core: storage and read path
# ---------------------------------------------------------------------------
@@ -518,29 +538,55 @@ class TestPortfolioManagerInjection:
def test_pm_prompt_includes_past_context(self):
captured = {}
# NOTE(review): diff residue — the three mock_llm lines below are the
# removed pre-change setup; they are superseded by the
# _structured_pm_llm(...) helper a few lines further down.
mock_llm = MagicMock()
mock_llm.invoke.side_effect = lambda prompt: (
captured.__setitem__("prompt", prompt) or MagicMock(content="Rating: Hold\nHold.")
)
pm_node = create_portfolio_manager(mock_llm)
llm = _structured_pm_llm(captured)
pm_node = create_portfolio_manager(llm)
state = _make_pm_state(past_context="[2026-01-05 | NVDA | Buy | +5.0% | +2.0% | 5d]\nGreat call.")
pm_node(state)
# Prompt must carry the lessons header, the stored entry text, and the
# thesis wording that references prior lessons.
assert "Lessons from prior decisions and outcomes" in captured["prompt"]
assert "Great call." in captured["prompt"]
assert "and the lessons from prior decisions" in captured["prompt"]
def test_pm_no_past_context_no_section(self):
"""PM prompt omits the lessons section entirely when past_context is empty."""
captured = {}
# NOTE(review): diff residue — the mock_llm setup below is the removed
# pre-change version, superseded by _structured_pm_llm.
mock_llm = MagicMock()
mock_llm.invoke.side_effect = lambda prompt: (
captured.__setitem__("prompt", prompt) or MagicMock(content="Rating: Hold\nHold.")
)
pm_node = create_portfolio_manager(mock_llm)
llm = _structured_pm_llm(captured)
pm_node = create_portfolio_manager(llm)
state = _make_pm_state(past_context="")
pm_node(state)
assert "Lessons from prior decisions" not in captured["prompt"]
assert "and the lessons from prior decisions" not in captured["prompt"]
def test_pm_returns_rendered_markdown_with_rating(self):
    """A typed PortfolioDecision must round-trip into markdown that the
    memory log, signal processor, and CLI display can all parse without
    any extra LLM call."""
    prompt_capture = {}
    full_decision = PortfolioDecision(
        rating=PortfolioRating.OVERWEIGHT,
        executive_summary="Build position gradually over the next two weeks.",
        investment_thesis="AI capex cycle remains intact; institutional flows constructive.",
        price_target=215.0,
        time_horizon="3-6 months",
    )
    node = create_portfolio_manager(_structured_pm_llm(prompt_capture, full_decision))
    rendered = node(_make_pm_state())["final_trade_decision"]
    # Every rendered section header/value pair must survive the round-trip.
    for fragment in (
        "**Rating**: Overweight",
        "**Executive Summary**: Build position gradually",
        "**Investment Thesis**: AI capex cycle",
        "**Price Target**: 215.0",
        "**Time Horizon**: 3-6 months",
    ):
        assert fragment in rendered
def test_pm_falls_back_to_freetext_when_structured_unavailable(self):
    """Providers without with_structured_output support must not block the
    pipeline: the agent falls back to a plain invoke and passes the model's
    prose through unchanged."""
    prose = "**Rating**: Sell\n\nExit ahead of guidance."
    fallback_llm = MagicMock()
    fallback_llm.with_structured_output.side_effect = NotImplementedError("provider unsupported")
    fallback_llm.invoke.return_value = MagicMock(content=prose)
    node = create_portfolio_manager(fallback_llm)
    outcome = node(_make_pm_state())
    assert outcome["final_trade_decision"] == prose
# get_past_context ordering and limits

View File

@@ -0,0 +1,90 @@
"""Tests for the shared rating heuristic and the SignalProcessor adapter.
The Portfolio Manager produces a typed PortfolioDecision via structured
output and renders it to markdown that always contains a ``**Rating**: X``
header. The deterministic heuristic in ``tradingagents.agents.utils.rating``
is therefore sufficient to extract the rating downstream — no second LLM
call is needed — and SignalProcessor is now a thin adapter that delegates
to it.
"""
import pytest
from tradingagents.agents.utils.rating import RATINGS_5_TIER, parse_rating
from tradingagents.graph.signal_processing import SignalProcessor
# ---------------------------------------------------------------------------
# Heuristic parser
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestParseRating:
    """Heuristic parser: explicit labels, markdown tolerance, and defaults."""

    def test_explicit_label_buy(self):
        text = "Rating: Buy\nReasoning here."
        assert parse_rating(text) == "Buy"

    def test_explicit_label_overweight(self):
        text = "Rating: Overweight\nDetails."
        assert parse_rating(text) == "Overweight"

    def test_explicit_label_with_markdown_bold_value(self):
        # Regression: Rating: **Sell** — markdown around the value.
        text = "Rating: **Sell**\nExit immediately."
        assert parse_rating(text) == "Sell"

    def test_explicit_label_with_markdown_bold_label(self):
        text = "**Rating**: Underweight\nTrim exposure."
        assert parse_rating(text) == "Underweight"

    def test_rendered_pm_markdown_shape(self):
        # The exact shape produced by render_pm_decision must always parse.
        rendered = (
            "**Rating**: Buy\n\n"
            "**Executive Summary**: Enter at $189-192, 6% portfolio cap.\n\n"
            "**Investment Thesis**: AI capex cycle intact; institutional flows constructive."
        )
        assert parse_rating(rendered) == "Buy"

    def test_explicit_label_wins_over_prose_with_markdown(self):
        prose = (
            "The buy thesis is weakened by guidance.\n"
            "Rating: **Sell**\n"
            "Exit before earnings."
        )
        assert parse_rating(prose) == "Sell"

    def test_no_rating_returns_default(self):
        assert parse_rating("No clear directional signal at this time.") == "Hold"

    def test_no_rating_custom_default(self):
        assert parse_rating("Plain prose.", default="Underweight") == "Underweight"

    def test_all_five_tiers_recognised(self):
        for tier in RATINGS_5_TIER:
            assert parse_rating(f"Rating: {tier}") == tier
# ---------------------------------------------------------------------------
# SignalProcessor: thin adapter over the heuristic
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestSignalProcessor:
    """SignalProcessor delegates to the shared heuristic — no LLM involved."""

    def test_returns_rating_from_pm_markdown(self):
        rendered = "**Rating**: Overweight\n\n**Executive Summary**: Build gradually."
        assert SignalProcessor().process_signal(rendered) == "Overweight"

    def test_makes_no_llm_calls(self):
        """SignalProcessor must not invoke the LLM it was constructed with —
        the rating is parseable from the rendered PM markdown directly."""
        from unittest.mock import MagicMock

        fake_llm = MagicMock()
        SignalProcessor(fake_llm).process_signal("Rating: Buy\nDetails.")
        fake_llm.invoke.assert_not_called()
        fake_llm.with_structured_output.assert_not_called()

    def test_default_when_no_rating_present(self):
        prose = "Plain prose without a recommendation."
        assert SignalProcessor().process_signal(prose) == "Hold"

View File

@@ -1,29 +1,53 @@
from tradingagents.agents.utils.agent_utils import build_instrument_context, get_language_instruction
"""Portfolio Manager: synthesises the risk-analyst debate into the final decision.
Uses LangChain's ``with_structured_output`` so the LLM produces a typed
``PortfolioDecision`` directly, in a single call. The result is rendered
back to markdown for storage in ``final_trade_decision`` so memory log,
CLI display, and saved reports continue to consume the same shape they do
today. When a provider does not expose structured output, the agent falls
back to a free-text invocation and the existing heuristic rating parser.
"""
from __future__ import annotations
import logging
from tradingagents.agents.schemas import PortfolioDecision, render_pm_decision
from tradingagents.agents.utils.agent_utils import (
build_instrument_context,
get_language_instruction,
)
logger = logging.getLogger(__name__)
def create_portfolio_manager(llm):
# NOTE(review): this span is a rendered diff — removed (pre-change) and
# added (post-change) lines are interleaved without +/- markers, so the
# text below is not directly runnable as shown.
def portfolio_manager_node(state) -> dict:
# ^ removed pre-change inner-def line; the post-change version appears
# below, after the one-time structured-output wrapping.
# Wrap once at agent construction; if the provider does not support
# structured output we keep ``structured_llm`` as None and use the
# free-text fallback for every call.
try:
structured_llm = llm.with_structured_output(PortfolioDecision)
except (NotImplementedError, AttributeError) as exc:
logger.warning(
"Portfolio Manager: provider does not support with_structured_output (%s); "
"falling back to free-text generation",
exc,
)
structured_llm = None
def portfolio_manager_node(state) -> dict:
instrument_context = build_instrument_context(state["company_of_interest"])
history = state["risk_debate_state"]["history"]
risk_debate_state = state["risk_debate_state"]
market_research_report = state["market_report"]
news_report = state["news_report"]
fundamentals_report = state["fundamentals_report"]
sentiment_report = state["sentiment_report"]
research_plan = state["investment_plan"]
trader_plan = state["trader_investment_plan"]
past_context = state.get("past_context", "")
lessons_line = (
f"- Lessons from prior decisions and outcomes:\n{past_context}\n"
if past_context else ""
)
# NOTE(review): the double `else` below is diff overlay — one branch is
# the removed pre-change value, the other the post-change value; the
# commit message says the "Required Output Structure" lines (including
# the old thesis instruction) move into the PortfolioDecision schema.
thesis_instruction = (
"3. **Investment Thesis**: Detailed reasoning anchored in the analysts' debate and the lessons from prior decisions."
if past_context
else "3. **Investment Thesis**: Detailed reasoning anchored in the analysts' debate."
else ""
)
prompt = f"""As the Portfolio Manager, synthesize the risk analysts' debate and deliver the final trading decision.
@@ -43,14 +67,6 @@ def create_portfolio_manager(llm):
- Research Manager's investment plan: **{research_plan}**
- Trader's transaction proposal: **{trader_plan}**
{lessons_line}
**Required Output Structure:**
1. **Rating**: State one of Buy / Overweight / Hold / Underweight / Sell.
2. **Executive Summary**: A concise action plan covering entry strategy, position sizing, key risk levels, and time horizon.
{thesis_instruction}
---
**Risk Analysts Debate History:**
{history}
@@ -58,10 +74,10 @@ def create_portfolio_manager(llm):
Be decisive and ground every conclusion in specific evidence from the analysts.{get_language_instruction()}"""
# NOTE(review): `response = llm.invoke(prompt)` is the removed direct
# call; `_invoke_pm` replaces it, trying structured output first with a
# free-text fallback.
response = llm.invoke(prompt)
final_trade_decision = _invoke_pm(structured_llm, llm, prompt)
new_risk_debate_state = {
# Old value used response.content; new value uses the rendered markdown.
"judge_decision": response.content,
"judge_decision": final_trade_decision,
"history": risk_debate_state["history"],
"aggressive_history": risk_debate_state["aggressive_history"],
"conservative_history": risk_debate_state["conservative_history"],
@@ -75,7 +91,30 @@ Be decisive and ground every conclusion in specific evidence from the analysts.{
return {
"risk_debate_state": new_risk_debate_state,
"final_trade_decision": response.content,
"final_trade_decision": final_trade_decision,
}
return portfolio_manager_node
def _invoke_pm(structured_llm, plain_llm, prompt: str) -> str:
"""Run the PM call and return the markdown-rendered decision.
Tries the structured-output path first; if it fails for any reason
(provider does not support it, model returns malformed JSON, network
glitch on the structured endpoint), falls back to the plain free-text
invocation so the pipeline still produces a result.
"""
if structured_llm is not None:
try:
decision = structured_llm.invoke(prompt)
return render_pm_decision(decision)
except Exception as exc:
logger.warning(
"Portfolio Manager: structured-output invocation failed (%s); "
"retrying once as free text",
exc,
)
response = plain_llm.invoke(prompt)
return response.content

View File

@@ -9,21 +9,31 @@ def create_research_manager(llm):
investment_debate_state = state["investment_debate_state"]
prompt = f"""As the portfolio manager and debate facilitator, your role is to critically evaluate this round of debate and make a definitive decision: align with the bear analyst, the bull analyst, or choose Hold only if it is strongly justified based on the arguments presented.
Summarize the key points from both sides concisely, focusing on the most compelling evidence or reasoning. Your recommendation—Buy, Sell, or Hold—must be clear and actionable. Avoid defaulting to Hold simply because both sides have valid points; commit to a stance grounded in the debate's strongest arguments.
Additionally, develop a detailed investment plan for the trader. This should include:
Your Recommendation: A decisive stance supported by the most convincing arguments.
Rationale: An explanation of why these arguments lead to your conclusion.
Strategic Actions: Concrete steps for implementing the recommendation.
Present your analysis conversationally, as if speaking naturally, without special formatting.
prompt = f"""As the Research Manager and debate facilitator, your role is to critically evaluate this round of debate and deliver a clear, actionable investment plan for the trader.
{instrument_context}
Here is the debate:
Debate History:
---
**Rating Scale** (use exactly one):
- **Buy**: Strong conviction in the bull thesis; recommend taking or growing the position
- **Overweight**: Constructive view; recommend gradually increasing exposure
- **Hold**: Balanced view; recommend maintaining the current position
- **Underweight**: Cautious view; recommend trimming exposure
- **Sell**: Strong conviction in the bear thesis; recommend exiting or avoiding the position
Commit to a clear stance whenever the debate's strongest arguments warrant one; reserve Hold for situations where the evidence on both sides is genuinely balanced.
**Required Output Structure:**
1. **Recommendation**: State one of Buy / Overweight / Hold / Underweight / Sell.
2. **Rationale**: Summarise the key points from both sides and explain which arguments led to this recommendation.
3. **Strategic Actions**: Concrete steps for the trader to implement the recommendation, including position sizing guidance consistent with the rating.
Present your analysis conversationally, as if speaking naturally to a teammate.
---
**Debate History:**
{history}"""
response = llm.invoke(prompt)

View File

@@ -0,0 +1,93 @@
"""Pydantic schemas used by agents that produce structured output.
The framework's primary artifact is still prose: each agent's natural-language
reasoning is what users read, what gets stored in the memory log, and what
gets saved as markdown reports. Structured output is layered onto agents
whose results have downstream machine-readable consumers (currently only
the Portfolio Manager) so that:
- The rating is type-safe and never has to be regex-extracted
- Schema field descriptions become the model's output instructions
- Each provider's native structured-output mode is used (json_schema for
OpenAI/xAI, response_schema for Gemini, tool-use for Anthropic)
- A render helper turns the parsed Pydantic instance back into the same
markdown shape the rest of the system already consumes, so display,
memory log, and saved reports keep working unchanged
"""
from __future__ import annotations
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field
class PortfolioRating(str, Enum):
    """5-tier portfolio rating used by the Research Manager and Portfolio Manager."""

    # Ordered most bullish to most bearish. The string values are exactly
    # what render_pm_decision writes into the markdown ("**Rating**: <value>")
    # and what the shared rating heuristic recognises.
    BUY = "Buy"
    OVERWEIGHT = "Overweight"
    HOLD = "Hold"
    UNDERWEIGHT = "Underweight"
    SELL = "Sell"
class PortfolioDecision(BaseModel):
    """Structured output produced by the Portfolio Manager.

    The model fills every field as part of its primary LLM call; no separate
    extraction pass is required. Field descriptions double as the model's
    output instructions, so the prompt body only needs to convey context and
    the rating-scale guidance.
    """

    # Required: the 5-tier enum; rendered as "**Rating**: <value>".
    rating: PortfolioRating = Field(
        description=(
            "The final position rating. Exactly one of Buy / Overweight / Hold / "
            "Underweight / Sell, picked based on the analysts' debate."
        ),
    )
    # Required prose sections; each rendered under its own bold markdown header.
    executive_summary: str = Field(
        description=(
            "A concise action plan covering entry strategy, position sizing, "
            "key risk levels, and time horizon. Two to four sentences."
        ),
    )
    investment_thesis: str = Field(
        description=(
            "Detailed reasoning anchored in specific evidence from the analysts' "
            "debate. If prior lessons are referenced in the prompt context, "
            "incorporate them; otherwise rely solely on the current analysis."
        ),
    )
    # Optional fields: render_pm_decision omits them from the markdown when
    # unset (None for price_target; None/empty for time_horizon).
    price_target: Optional[float] = Field(
        default=None,
        description="Optional target price in the instrument's quote currency.",
    )
    time_horizon: Optional[str] = Field(
        default=None,
        description="Optional recommended holding period, e.g. '3-6 months'.",
    )
def render_pm_decision(decision: PortfolioDecision) -> str:
    """Serialise a PortfolioDecision into the markdown the pipeline consumes.

    The section headers (``**Rating**``, ``**Executive Summary**``,
    ``**Investment Thesis**``) are load-bearing: the memory log, CLI display,
    and saved report files all parse this exact shape, so the rendered
    output must preserve them.
    """
    sections = [
        f"**Rating**: {decision.rating.value}",
        f"**Executive Summary**: {decision.executive_summary}",
        f"**Investment Thesis**: {decision.investment_thesis}",
    ]
    if decision.price_target is not None:
        sections.append(f"**Price Target**: {decision.price_target}")
    if decision.time_horizon:
        sections.append(f"**Time Horizon**: {decision.time_horizon}")
    # Blank line between sections — identical to joining ["A", "", "B"] on "\n".
    return "\n\n".join(sections)

View File

@@ -4,17 +4,17 @@ from typing import List, Optional
from pathlib import Path
import re
from tradingagents.agents.utils.rating import parse_rating
class TradingMemoryLog:
"""Append-only markdown log of trading decisions and reflections."""
# NOTE(review): diff residue — RATINGS and _RATING_LABEL_RE below are the
# removed pre-change duplicates; the vocabulary and label regex now live
# in tradingagents.agents.utils.rating.
RATINGS = {"buy", "overweight", "hold", "underweight", "sell"}
# HTML comment: cannot appear in LLM prose output, safe as a hard delimiter
_SEPARATOR = "\n\n<!-- ENTRY_END -->\n\n"
# Precompiled patterns — avoids re-compilation on every load_entries() call
_DECISION_RE = re.compile(r"DECISION:\n(.*?)(?=\nREFLECTION:|\Z)", re.DOTALL)
_REFLECTION_RE = re.compile(r"REFLECTION:\n(.*?)$", re.DOTALL)
_RATING_LABEL_RE = re.compile(r"rating.*?[:\-][\s*]*(\w+)", re.IGNORECASE)
def __init__(self, config: dict = None):
self._log_path = None
@@ -40,7 +40,7 @@ class TradingMemoryLog:
for line in raw.splitlines():
if line.startswith(f"[{trade_date} | {ticker} |") and line.endswith("| pending]"):
return
rating = self._parse_rating(final_trade_decision)
rating = parse_rating(final_trade_decision)
tag = f"[{trade_date} | {ticker} | {rating} | pending]"
entry = f"{tag}\n\nDECISION:\n{final_trade_decision}{self._SEPARATOR}"
with open(self._log_path, "a", encoding="utf-8") as f:
@@ -213,20 +213,6 @@ class TradingMemoryLog:
# --- Helpers ---
def _parse_rating(self, text: str) -> str:
# NOTE(review): removed by this commit — the same two-pass logic is
# centralised in tradingagents.agents.utils.rating.parse_rating, which
# the memory log now calls instead.
# First pass: explicit "Rating: X" label — search handles markdown bold/numbered lists
for line in text.splitlines():
m = self._RATING_LABEL_RE.search(line)
if m and m.group(1).lower() in self.RATINGS:
return m.group(1).capitalize()
# Fallback: first rating word found anywhere in the text
for line in text.splitlines():
for word in line.lower().split():
clean = word.strip("*:.,")
if clean in self.RATINGS:
return clean.capitalize()
return "Hold"
def _parse_entry(self, raw: str) -> Optional[dict]:
lines = raw.strip().splitlines()
if not lines:

View File

@@ -0,0 +1,50 @@
"""Shared 5-tier rating vocabulary and a deterministic heuristic parser.
The same five-tier scale (Buy, Overweight, Hold, Underweight, Sell) is used by:
- The Research Manager (investment plan recommendation)
- The Portfolio Manager (final position decision)
- The signal processor (rating extracted for downstream consumers)
- The memory log (rating tag stored alongside each decision entry)
Centralising it here avoids drift between those call sites.
"""
from __future__ import annotations
import re
from typing import Tuple
# Canonical, ordered 5-tier scale (most bullish to most bearish).
RATINGS_5_TIER: Tuple[str, ...] = (
    "Buy", "Overweight", "Hold", "Underweight", "Sell",
)

# Lowercased lookup set for case-insensitive membership tests.
_RATING_SET = set(map(str.lower, RATINGS_5_TIER))

# Matches "Rating: X" / "rating - X" / "Rating: **X**" — tolerates markdown
# bold wrappers and either a colon or hyphen separator.
_RATING_LABEL_RE = re.compile(r"rating.*?[:\-][\s*]*(\w+)", re.IGNORECASE)


def parse_rating(text: str, default: str = "Hold") -> str:
    """Heuristically extract a 5-tier rating from prose text.

    Two-pass strategy:

    1. Look for an explicit "Rating: X" label (tolerant of markdown bold).
    2. Fall back to the first 5-tier rating word found anywhere in the text.

    Returns a Title-cased rating string, or ``default`` when no rating word
    appears in *text*.
    """
    lines = text.splitlines()

    # Pass 1: an explicit label line wins over any rating word in prose.
    for line in lines:
        label_match = _RATING_LABEL_RE.search(line)
        if label_match is not None and label_match.group(1).lower() in _RATING_SET:
            return label_match.group(1).capitalize()

    # Pass 2: scan word by word, stripping markdown/punctuation wrappers.
    words = (word.strip("*:.,") for line in lines for word in line.lower().split())
    first_hit = next((word for word in words if word in _RATING_SET), None)
    return first_hit.capitalize() if first_hit is not None else default

View File

@@ -1,33 +1,31 @@
# TradingAgents/graph/signal_processing.py
"""Extract the 5-tier portfolio rating from the Portfolio Manager's decision.
The Portfolio Manager produces a typed ``PortfolioDecision`` via structured
output and renders it to markdown that always carries a ``**Rating**: X``
header (see :func:`tradingagents.agents.schemas.render_pm_decision`). The
deterministic heuristic in :mod:`tradingagents.agents.utils.rating` is more
than sufficient to extract that rating; no extra LLM call is needed.
This module exists for backwards compatibility with callers that expect a
``SignalProcessor.process_signal(text)`` interface.
"""
from __future__ import annotations
from typing import Any
from tradingagents.agents.utils.rating import parse_rating
class SignalProcessor:
# NOTE(review): diff residue — this span interleaves the removed pre-change
# class (LLM-backed extraction) with its post-change replacement (pure
# parse_rating delegation). Duplicated docstrings, __init__ signatures, and
# process_signal bodies below are the old/new pairs from the diff.
"""Processes trading signals to extract actionable decisions."""
"""Read the 5-tier rating out of a Portfolio Manager decision."""
def __init__(self, quick_thinking_llm: Any):
"""Initialize with an LLM for processing."""
def __init__(self, quick_thinking_llm: Any = None):
# The LLM argument is accepted for backwards compatibility but no
# longer used: the PM's structured output guarantees the rating is
# parseable from the rendered markdown without a second LLM call.
self.quick_thinking_llm = quick_thinking_llm
def process_signal(self, full_signal: str) -> str:
"""
Process a full trading signal to extract the core decision.
Args:
full_signal: Complete trading signal text
Returns:
Extracted rating (BUY, OVERWEIGHT, HOLD, UNDERWEIGHT, or SELL)
"""
# NOTE(review): the messages/LLM-invoke body below is the removed
# pre-change implementation; the one-line parse_rating delegation at
# the end is the new body.
messages = [
(
"system",
"You are an efficient assistant that extracts the trading decision from analyst reports. "
"Extract the rating as exactly one of: BUY, OVERWEIGHT, HOLD, UNDERWEIGHT, SELL. "
"Output only the single rating word, nothing else.",
),
("human", full_signal),
]
return self.quick_thinking_llm.invoke(messages).content
"""Return one of Buy / Overweight / Hold / Underweight / Sell."""
return parse_rating(full_signal)