Merge #487 — analyst execution planning and timing hooks

refactor(graph): add analyst execution planning and timing hooks
This commit is contained in:
Yijia Xiao
2026-05-17 00:01:46 -07:00
committed by GitHub
6 changed files with 271 additions and 57 deletions

View File

@@ -74,6 +74,7 @@ DEFAULT_CONFIG = _apply_env_overrides({
"max_debate_rounds": 1,
"max_risk_discuss_rounds": 1,
"max_recur_limit": 100,
"analyst_concurrency_limit": 1,
# News / data fetching parameters
# Increase for longer lookback strategies or to broaden macro coverage;
# decrease to reduce token usage in agent prompts.

View File

@@ -0,0 +1,136 @@
from dataclasses import dataclass
from time import monotonic
from typing import Dict, Iterable, List, Optional
@dataclass(frozen=True)
class AnalystNodeSpec:
key: str
agent_node: str
clear_node: str
tool_node: str
report_key: str
@dataclass(frozen=True)
class AnalystExecutionPlan:
specs: List[AnalystNodeSpec]
concurrency_limit: int
ANALYST_NODE_SPECS: Dict[str, AnalystNodeSpec] = {
"market": AnalystNodeSpec(
key="market",
agent_node="Market Analyst",
clear_node="Msg Clear Market",
tool_node="tools_market",
report_key="market_report",
),
"social": AnalystNodeSpec(
key="social",
agent_node="Social Analyst",
clear_node="Msg Clear Social",
tool_node="tools_social",
report_key="sentiment_report",
),
"news": AnalystNodeSpec(
key="news",
agent_node="News Analyst",
clear_node="Msg Clear News",
tool_node="tools_news",
report_key="news_report",
),
"fundamentals": AnalystNodeSpec(
key="fundamentals",
agent_node="Fundamentals Analyst",
clear_node="Msg Clear Fundamentals",
tool_node="tools_fundamentals",
report_key="fundamentals_report",
),
}
def build_analyst_execution_plan(
selected_analysts: Iterable[str],
concurrency_limit: int = 1,
) -> AnalystExecutionPlan:
if concurrency_limit < 1:
raise ValueError("analyst concurrency limit must be >= 1")
specs: List[AnalystNodeSpec] = []
for analyst_key in selected_analysts:
spec = ANALYST_NODE_SPECS.get(analyst_key)
if spec is None:
raise ValueError(f"unknown analyst key: {analyst_key}")
specs.append(spec)
if not specs:
raise ValueError("at least one analyst must be selected")
return AnalystExecutionPlan(specs=specs, concurrency_limit=concurrency_limit)
def get_initial_analyst_node(plan: AnalystExecutionPlan) -> str:
return plan.specs[0].agent_node
class AnalystWallTimeTracker:
def __init__(self, plan: AnalystExecutionPlan):
self.plan = plan
self._started_at: Dict[str, float] = {}
self._wall_times: Dict[str, float] = {}
def mark_started(self, analyst_key: str, started_at: Optional[float] = None) -> None:
if analyst_key not in ANALYST_NODE_SPECS:
raise ValueError(f"unknown analyst key: {analyst_key}")
self._started_at.setdefault(analyst_key, monotonic() if started_at is None else started_at)
def mark_completed(
self,
analyst_key: str,
completed_at: Optional[float] = None,
) -> None:
if analyst_key not in ANALYST_NODE_SPECS:
raise ValueError(f"unknown analyst key: {analyst_key}")
if analyst_key in self._wall_times:
return
started_at = self._started_at.get(analyst_key)
if started_at is None:
return
finished_at = monotonic() if completed_at is None else completed_at
self._wall_times[analyst_key] = max(0.0, finished_at - started_at)
def get_wall_times(self) -> Dict[str, float]:
return dict(self._wall_times)
def format_summary(self) -> str:
parts = []
for spec in self.plan.specs:
duration = self._wall_times.get(spec.key)
if duration is not None:
label = spec.agent_node.removesuffix(" Analyst")
parts.append(f"{label} {duration:.2f}s")
if not parts:
return "Analyst wall time: pending"
return "Analyst wall time: " + " | ".join(parts)
def sync_analyst_tracker_from_chunk(
tracker: AnalystWallTimeTracker,
chunk: Dict[str, str],
now: Optional[float] = None,
) -> None:
current_time = monotonic() if now is None else now
active_found = False
for spec in tracker.plan.specs:
has_report = bool(chunk.get(spec.report_key))
if has_report:
tracker.mark_started(spec.key, started_at=current_time)
tracker.mark_completed(spec.key, completed_at=current_time)
continue
if not active_found:
tracker.mark_started(spec.key, started_at=current_time)
active_found = True

View File

@@ -7,6 +7,7 @@ from langgraph.prebuilt import ToolNode
from tradingagents.agents import *
from tradingagents.agents.utils.agent_states import AgentState
from .analyst_execution import build_analyst_execution_plan
from .conditional_logic import ConditionalLogic
@@ -19,12 +20,14 @@ class GraphSetup:
deep_thinking_llm: Any,
tool_nodes: Dict[str, ToolNode],
conditional_logic: ConditionalLogic,
analyst_concurrency_limit: int = 1,
):
"""Initialize with required components."""
self.quick_thinking_llm = quick_thinking_llm
self.deep_thinking_llm = deep_thinking_llm
self.tool_nodes = tool_nodes
self.conditional_logic = conditional_logic
self.analyst_concurrency_limit = analyst_concurrency_limit
def setup_graph(
self, selected_analysts=["market", "social", "news", "fundamentals"]
@@ -38,45 +41,17 @@ class GraphSetup:
- "news": News analyst
- "fundamentals": Fundamentals analyst
"""
if len(selected_analysts) == 0:
raise ValueError("Trading Agents Graph Setup Error: no analysts selected!")
plan = build_analyst_execution_plan(
selected_analysts,
concurrency_limit=self.analyst_concurrency_limit,
)
# Create analyst nodes
analyst_nodes = {}
delete_nodes = {}
tool_nodes = {}
if "market" in selected_analysts:
analyst_nodes["market"] = create_market_analyst(
self.quick_thinking_llm
)
delete_nodes["market"] = create_msg_delete()
tool_nodes["market"] = self.tool_nodes["market"]
if "social" in selected_analysts:
# "social" selector key preserved for back-compat with existing
# user configs; the underlying agent has been renamed to
# sentiment_analyst (the old name advertised social-media data
# the agent never had access to — see issue #557).
analyst_nodes["social"] = create_sentiment_analyst(
self.quick_thinking_llm
)
delete_nodes["social"] = create_msg_delete()
tool_nodes["social"] = self.tool_nodes["social"]
if "news" in selected_analysts:
analyst_nodes["news"] = create_news_analyst(
self.quick_thinking_llm
)
delete_nodes["news"] = create_msg_delete()
tool_nodes["news"] = self.tool_nodes["news"]
if "fundamentals" in selected_analysts:
analyst_nodes["fundamentals"] = create_fundamentals_analyst(
self.quick_thinking_llm
)
delete_nodes["fundamentals"] = create_msg_delete()
tool_nodes["fundamentals"] = self.tool_nodes["fundamentals"]
analyst_factories = {
"market": lambda: create_market_analyst(self.quick_thinking_llm),
"social": lambda: create_sentiment_analyst(self.quick_thinking_llm),
"news": lambda: create_news_analyst(self.quick_thinking_llm),
"fundamentals": lambda: create_fundamentals_analyst(self.quick_thinking_llm),
}
# Create researcher and manager nodes
bull_researcher_node = create_bull_researcher(self.quick_thinking_llm)
@@ -94,12 +69,10 @@ class GraphSetup:
workflow = StateGraph(AgentState)
# Add analyst nodes to the graph
for analyst_type, node in analyst_nodes.items():
workflow.add_node(f"{analyst_type.capitalize()} Analyst", node)
workflow.add_node(
f"Msg Clear {analyst_type.capitalize()}", delete_nodes[analyst_type]
)
workflow.add_node(f"tools_{analyst_type}", tool_nodes[analyst_type])
for spec in plan.specs:
workflow.add_node(spec.agent_node, analyst_factories[spec.key]())
workflow.add_node(spec.clear_node, create_msg_delete())
workflow.add_node(spec.tool_node, self.tool_nodes[spec.key])
# Add other nodes
workflow.add_node("Bull Researcher", bull_researcher_node)
@@ -113,27 +86,25 @@ class GraphSetup:
# Define edges
# Start with the first analyst
first_analyst = selected_analysts[0]
workflow.add_edge(START, f"{first_analyst.capitalize()} Analyst")
workflow.add_edge(START, plan.specs[0].agent_node)
# Connect analysts in sequence
for i, analyst_type in enumerate(selected_analysts):
current_analyst = f"{analyst_type.capitalize()} Analyst"
current_tools = f"tools_{analyst_type}"
current_clear = f"Msg Clear {analyst_type.capitalize()}"
for i, spec in enumerate(plan.specs):
current_analyst = spec.agent_node
current_tools = spec.tool_node
current_clear = spec.clear_node
# Add conditional edges for current analyst
workflow.add_conditional_edges(
current_analyst,
getattr(self.conditional_logic, f"should_continue_{analyst_type}"),
getattr(self.conditional_logic, f"should_continue_{spec.key}"),
[current_tools, current_clear],
)
workflow.add_edge(current_tools, current_analyst)
# Connect to next analyst or to Bull Researcher if this is the last analyst
if i < len(selected_analysts) - 1:
next_analyst = f"{selected_analysts[i+1].capitalize()} Analyst"
workflow.add_edge(current_clear, next_analyst)
if i < len(plan.specs) - 1:
workflow.add_edge(current_clear, plan.specs[i + 1].agent_node)
else:
workflow.add_edge(current_clear, "Bull Researcher")

View File

@@ -114,6 +114,7 @@ class TradingAgentsGraph:
self.deep_thinking_llm,
self.tool_nodes,
self.conditional_logic,
analyst_concurrency_limit=self.config.get("analyst_concurrency_limit", 1),
)
self.propagator = Propagator(