From eeb84aa63b6f39b332f5e1eb03f8dd2cf6c9ff78 Mon Sep 17 00:00:00 2001 From: Yijia-Xiao Date: Sun, 14 Jun 2026 07:23:19 +0000 Subject: [PATCH] fix(reddit): go RSS-first with 429 backoff and robust transport errors The JSON search endpoint is reliably WAF-blocked (403) for public clients, so probing it on every call doubled request volume against Reddit's per-IP rate limit and tripped 429 on the RSS fallback, blanking the sentiment feed. Fetch the Atom/RSS feed directly (JSON kept as an opt-in path that still degrades to RSS on 403), back off once on a 429 honouring Retry-After, and pace requests a little wider. Also broaden the error handling to catch http.client chunked transfer errors (IncompleteRead/BadStatusLine) alongside OSError, which on their own slipped through and crashed the pipeline. --- tests/test_reddit_fallback.py | 118 +++++++++++++++++++++++++----- tradingagents/dataflows/reddit.py | 96 ++++++++++++++++++------ 2 files changed, 171 insertions(+), 43 deletions(-) diff --git a/tests/test_reddit_fallback.py b/tests/test_reddit_fallback.py index ecb944f71..7fd98a90c 100644 --- a/tests/test_reddit_fallback.py +++ b/tests/test_reddit_fallback.py @@ -1,7 +1,9 @@ -"""Tests for the Reddit RSS/Atom fallback when the JSON endpoint 403s (#862).""" +"""Tests for the RSS-first Reddit fetcher, its 429 backoff, the opt-in JSON +path's degradation (#862), and chunked-transfer error handling (#1024).""" from __future__ import annotations +import http.client from unittest.mock import patch from urllib.error import HTTPError @@ -9,7 +11,6 @@ import pytest from tradingagents.dataflows import reddit - _SAMPLE_ATOM = """ @@ -26,6 +27,30 @@ _SAMPLE_ATOM = """ """ +def _resp(read_fn): + """A minimal context-manager response whose read() runs ``read_fn``.""" + class _Resp: + def __enter__(self_inner): + return self_inner + + def __exit__(self_inner, *a): + return False + + def read(self_inner): + return read_fn() + return _Resp() + + +def _atom_resp(): + return 
_resp(lambda: _SAMPLE_ATOM.encode("utf-8")) + + +def _raise(exc): + def _r(): + raise exc + return _resp(_r) + + @pytest.mark.unit class TestIsoToTimestamp: def test_parses_offset_and_z(self): @@ -48,19 +73,9 @@ class TestStripHtml: @pytest.mark.unit -class TestRssFallbackParsing: - def _patch_rss_response(self, xml_bytes): - class _Resp: - def __enter__(self_inner): - return self_inner - def __exit__(self_inner, *a): - return False - def read(self_inner): - return xml_bytes - return patch.object(reddit, "urlopen", return_value=_Resp()) - +class TestRssParsing: def test_parses_atom_entries(self): - with self._patch_rss_response(_SAMPLE_ATOM.encode("utf-8")): + with patch.object(reddit, "urlopen", return_value=_atom_resp()): posts = reddit._fetch_subreddit_rss("NVDA", "stocks", limit=5, timeout=5.0) assert len(posts) == 2 assert posts[0]["title"] == "NVDA earnings beat, stock pops" @@ -71,21 +86,84 @@ class TestRssFallbackParsing: assert "datacenter unit" in posts[0]["selftext"] def test_malformed_xml_fails_open(self): - with self._patch_rss_response(b"<>"): + with patch.object(reddit, "urlopen", return_value=_resp(lambda: b"<>")): assert reddit._fetch_subreddit_rss("NVDA", "stocks", 5, 5.0) == [] @pytest.mark.unit -class TestJsonFallsBackToRss: - def test_403_triggers_rss(self): - err = HTTPError("url", 403, "Blocked", {}, None) - with patch.object(reddit, "urlopen", side_effect=err), \ - patch.object(reddit, "_fetch_subreddit_rss", return_value=[{"title": "x", "source": "rss", "score": None, "num_comments": None, "created_utc": None, "selftext": ""}]) as rss: +class TestFetchSubredditIsRssFirst: + """The default per-subreddit fetch goes straight to RSS — it must not hit + the WAF-blocked JSON endpoint, which only burned rate-limit budget.""" + + def test_delegates_to_rss_without_touching_json(self): + sentinel = [{"title": "x", "source": "rss", "score": None, + "num_comments": None, "created_utc": None, "selftext": ""}] + with patch.object(reddit, 
"_fetch_subreddit_rss", return_value=sentinel) as rss, \ + patch.object(reddit, "urlopen", + side_effect=AssertionError("JSON endpoint must not be called")): out = reddit._fetch_subreddit("NVDA", "stocks", 5, 5.0) rss.assert_called_once() + assert out is sentinel + + +@pytest.mark.unit +class TestJsonPathFallsBackToRss: + """The opt-in JSON path still degrades to RSS on a 403 (kept for #862).""" + + def test_403_triggers_rss(self): + err = HTTPError("url", 403, "Blocked", {}, None) + rss_posts = [{"title": "x", "source": "rss", "score": None, + "num_comments": None, "created_utc": None, "selftext": ""}] + with patch.object(reddit, "urlopen", side_effect=err), \ + patch.object(reddit, "_fetch_subreddit_rss", return_value=rss_posts) as rss: + out = reddit._fetch_subreddit_json("NVDA", "stocks", 5, 5.0) + rss.assert_called_once() assert out and out[0]["source"] == "rss" +@pytest.mark.unit +class TestRss429Backoff: + def test_429_then_success_retries_once(self): + err = HTTPError("url", 429, "Too Many Requests", {}, None) + with patch.object(reddit, "urlopen", side_effect=[err, _atom_resp()]) as op, \ + patch.object(reddit.time, "sleep") as slept: + posts = reddit._fetch_subreddit_rss("NVDA", "stocks", 5, 5.0) + assert op.call_count == 2 # original + exactly one retry + slept.assert_called_once() # backed off before retrying + assert len(posts) == 2 + + def test_429_twice_gives_up_after_one_retry(self): + err = HTTPError("url", 429, "Too Many Requests", {}, None) + with patch.object(reddit, "urlopen", side_effect=[err, err]) as op, \ + patch.object(reddit.time, "sleep"): + posts = reddit._fetch_subreddit_rss("NVDA", "stocks", 5, 5.0) + assert op.call_count == 2 # one retry, then gives up cleanly + assert posts == [] + + def test_retry_after_header_is_honoured(self): + err = HTTPError("url", 429, "Too Many Requests", {"Retry-After": "12"}, None) + with patch.object(reddit, "urlopen", side_effect=[err, _atom_resp()]), \ + patch.object(reddit.time, "sleep") as slept: + 
reddit._fetch_subreddit_rss("NVDA", "stocks", 5, 5.0) + slept.assert_called_once_with(12.0) + + +@pytest.mark.unit +class TestChunkedTransferErrorsHandled: + """IncompleteRead/BadStatusLine come from http.client and are NOT + OSErrors, so they were previously uncaught and crashed the pipeline (#1024).""" + + def test_rss_incomplete_read_degrades_to_empty(self): + with patch.object(reddit, "urlopen", return_value=_raise(http.client.IncompleteRead(b""))): + assert reddit._fetch_subreddit_rss("NVDA", "stocks", 5, 5.0) == [] + + def test_json_incomplete_read_falls_back_to_rss(self): + with patch.object(reddit, "urlopen", return_value=_raise(http.client.IncompleteRead(b""))), \ + patch.object(reddit, "_fetch_subreddit_rss", return_value=[]) as rss: + reddit._fetch_subreddit_json("NVDA", "stocks", 5, 5.0) + rss.assert_called_once() + + @pytest.mark.unit class TestFormatterHandlesRssPosts: def test_rss_posts_omit_fake_counts_and_note_source(self): diff --git a/tradingagents/dataflows/reddit.py b/tradingagents/dataflows/reddit.py index 82303d9b4..bfb87700b 100644 --- a/tradingagents/dataflows/reddit.py +++ b/tradingagents/dataflows/reddit.py @@ -1,31 +1,32 @@ """Reddit search fetcher for ticker-specific discussion posts. -Primary path is Reddit's public JSON search endpoint -(``reddit.com/r/{sub}/search.json``), which carries the richest data -(score, comment count, body). Reddit's WAF increasingly returns -``HTTP 403 Blocked`` on that endpoint (issue #862), so when the JSON request -fails we transparently fall back to the public Atom/RSS search feed -(``/search.rss``). The RSS feed is gated less aggressively and serves the -same descriptive User-Agent we already send; the fallback lacks score / -comment counts, so RSS-sourced posts are marked and the formatter omits those -metrics rather than printing fake zeros. +Default path is Reddit's public Atom/RSS search feed +(``reddit.com/r/{sub}/search.rss``).
The richer JSON search endpoint +(``/search.json``) is reliably WAF-blocked (``HTTP 403``) for public clients +(issue #862), and probing it on every call only doubled our request volume +against Reddit's per-IP rate limit — tripping ``429`` on the RSS fallback — so +it is kept (``_fetch_subreddit_json``) but not used by default. On a 429 we back +off once (honouring ``Retry-After``). RSS lacks score / comment counts, so those +posts are marked and the formatter omits the metrics rather than printing fake +zeros. -No API key required either way. Returns formatted plaintext blocks ready for -prompt injection and degrades gracefully — returns a placeholder string -rather than raising, so callers never special-case missing data. +No API key required. Returns formatted plaintext blocks ready for prompt +injection and degrades gracefully — returns a placeholder string rather than +raising, so callers never special-case missing data. """ from __future__ import annotations import html +import http.client import json import logging import re import time import xml.etree.ElementTree as ET +from collections.abc import Iterable from datetime import datetime -from typing import Iterable, Optional -from urllib.error import HTTPError, URLError +from urllib.error import HTTPError from urllib.parse import urlencode from urllib.request import Request, urlopen @@ -56,7 +57,7 @@ def _search_qs(ticker: str, limit: int) -> str: }) -def _iso_to_timestamp(iso_str: Optional[str]) -> Optional[float]: +def _iso_to_timestamp(iso_str: str | None) -> float | None: """Parse an Atom ``published`` timestamp to a UTC epoch, or None.""" if not iso_str: return None @@ -78,23 +79,48 @@ def _strip_html(content: str) -> str: return " ".join(html.unescape(text).split()) +def _retry_after_seconds(exc: HTTPError) -> float | None: + """Seconds to wait from a 429's ``Retry-After`` header, capped at 30s.""" + try: + val = exc.headers.get("Retry-After") if getattr(exc, "headers", None) else None + return 
min(float(val), 30.0) if val else None + except (ValueError, TypeError, AttributeError): + return None + + def _fetch_subreddit_rss( ticker: str, sub: str, limit: int, timeout: float, + _retry: bool = True, ) -> list[dict]: - """Fallback path: parse the public Atom search feed for a subreddit. + """Default path: parse the public Atom search feed for a subreddit. Carries no score / comment counts, so those fields are left None and the - post is tagged ``source="rss"`` for honest display. + post is tagged ``source="rss"`` for honest display. On a 429 (Reddit's + per-IP rate limit) we back off once — honouring ``Retry-After`` when + present — before giving up, so a transient burst doesn't blank the feed. """ url = _RSS.format(sub=sub, qs=_search_qs(ticker, limit)) req = Request(url, headers={"User-Agent": _UA}) try: with urlopen(req, timeout=timeout) as resp: root = ET.fromstring(resp.read()) - except (HTTPError, URLError, TimeoutError, ET.ParseError) as exc: + except HTTPError as exc: + if exc.code == 429 and _retry: + wait = _retry_after_seconds(exc) or 5.0 + logger.warning( + "Reddit RSS 429 for r/%s · %s — backing off %.1fs then retrying once", + sub, ticker, wait, + ) + time.sleep(wait) + return _fetch_subreddit_rss(ticker, sub, limit, timeout, _retry=False) + logger.warning("Reddit RSS fetch failed for r/%s · %s: %s", sub, ticker, exc) + return [] + except (OSError, http.client.HTTPException, ET.ParseError) as exc: + # OSError covers URLError/TimeoutError/connection resets; HTTPException + # covers chunked-transfer errors (IncompleteRead/BadStatusLine, #1024). logger.warning("Reddit RSS fetch failed for r/%s · %s: %s", sub, ticker, exc) return [] @@ -116,12 +142,20 @@ def _fetch_subreddit_rss( return posts -def _fetch_subreddit( +def _fetch_subreddit_json( ticker: str, sub: str, limit: int, timeout: float, ) -> list[dict]: + """Richer JSON search path (carries score / comment counts). 
+ + Reddit's WAF currently returns ``403 Blocked`` on this endpoint for + non-OAuth clients (issue #862), so it is NOT used by default — calling it on + every request only doubled our volume against the per-IP rate limit and + triggered 429s on the RSS fallback. Kept for the day the WAF relaxes or an + OAuth token is wired in; degrades to RSS on failure. + """ url = _API.format(sub=sub, qs=_search_qs(ticker, limit)) req = Request(url, headers={"User-Agent": _UA, "Accept": "application/json"}) try: @@ -129,7 +163,7 @@ def _fetch_subreddit( payload = json.loads(resp.read()) children = (payload.get("data") or {}).get("children") or [] return [c.get("data", {}) for c in children if isinstance(c, dict)] - except (HTTPError, URLError, json.JSONDecodeError, TimeoutError) as exc: + except (OSError, http.client.HTTPException, json.JSONDecodeError) as exc: logger.warning( "Reddit JSON fetch failed for r/%s · %s: %s — falling back to RSS feed.", sub, ticker, exc, @@ -137,18 +171,34 @@ def _fetch_subreddit( return _fetch_subreddit_rss(ticker, sub, limit, timeout) +def _fetch_subreddit( + ticker: str, + sub: str, + limit: int, + timeout: float, +) -> list[dict]: + """Fetch one subreddit, RSS-first. + + The JSON search endpoint is reliably WAF-blocked (403) for public clients, + so we go straight to the RSS feed — which serves our identified User-Agent + reliably — halving our request volume against Reddit's per-IP rate limit. + """ + return _fetch_subreddit_rss(ticker, sub, limit, timeout) + + def fetch_reddit_posts( ticker: str, subreddits: Iterable[str] = DEFAULT_SUBREDDITS, limit_per_sub: int = 5, timeout: float = 10.0, - inter_request_delay: float = 0.4, + inter_request_delay: float = 1.0, ) -> str: """Fetch recent Reddit posts mentioning ``ticker`` across finance subreddits and return them as a formatted plaintext block. - ``inter_request_delay`` keeps us under Reddit's public rate limit - (~10 req/min per IP) even if the caller queries many subreddits. 
+ ``inter_request_delay`` paces the (now RSS-only) per-subreddit requests to + stay under Reddit's public per-IP rate limit; combined with the RSS-first + path it makes 429s rare even when several analyses run back-to-back. """ blocks = [] total_posts = 0