fix(reddit): go RSS-first with 429 backoff and robust transport errors

The JSON search endpoint is reliably WAF-blocked (403) for public clients, so
probing it on every call doubled request volume against Reddit's per-IP rate
limit and tripped 429 on the RSS fallback, blanking the sentiment feed. Fetch
the Atom/RSS feed directly (JSON kept as an opt-in path that still degrades to
RSS on 403), back off once on a 429 honouring Retry-After, and pace requests a
little wider. Also broaden the error handling to catch http.client.HTTPException
(IncompleteRead/BadStatusLine) alongside OSError: these are not OSError
subclasses, so they previously slipped through and crashed the pipeline.
This commit is contained in:
Yijia-Xiao
2026-06-14 07:23:19 +00:00
parent 9fd54f8368
commit eeb84aa63b
2 changed files with 171 additions and 43 deletions

View File

@@ -1,7 +1,9 @@
"""Tests for the Reddit RSS/Atom fallback when the JSON endpoint 403s (#862).""" """Tests for the RSS-first Reddit fetcher, its 429 backoff, the opt-in JSON
path's degradation (#862), and chunked-transfer error handling (#1024)."""
from __future__ import annotations from __future__ import annotations
import http.client
from unittest.mock import patch from unittest.mock import patch
from urllib.error import HTTPError from urllib.error import HTTPError
@@ -9,7 +11,6 @@ import pytest
from tradingagents.dataflows import reddit from tradingagents.dataflows import reddit
_SAMPLE_ATOM = """<?xml version="1.0" encoding="UTF-8"?> _SAMPLE_ATOM = """<?xml version="1.0" encoding="UTF-8"?>
<feed xmlns="http://www.w3.org/2005/Atom"> <feed xmlns="http://www.w3.org/2005/Atom">
<entry> <entry>
@@ -26,6 +27,30 @@ _SAMPLE_ATOM = """<?xml version="1.0" encoding="UTF-8"?>
""" """
def _resp(read_fn):
"""A minimal context-manager response whose read() runs ``read_fn``."""
class _Resp:
def __enter__(self_inner):
return self_inner
def __exit__(self_inner, *a):
return False
def read(self_inner):
return read_fn()
return _Resp()
def _atom_resp():
    """Return a fake response whose ``read()`` yields the sample Atom feed as UTF-8 bytes."""
    feed_bytes = _SAMPLE_ATOM.encode("utf-8")
    return _resp(lambda: feed_bytes)
def _raise(exc):
    """Return a fake response whose ``read()`` raises ``exc`` instead of returning data."""

    def _explode():
        raise exc

    return _resp(_explode)
@pytest.mark.unit @pytest.mark.unit
class TestIsoToTimestamp: class TestIsoToTimestamp:
def test_parses_offset_and_z(self): def test_parses_offset_and_z(self):
@@ -48,19 +73,9 @@ class TestStripHtml:
@pytest.mark.unit @pytest.mark.unit
class TestRssFallbackParsing: class TestRssParsing:
def _patch_rss_response(self, xml_bytes):
class _Resp:
def __enter__(self_inner):
return self_inner
def __exit__(self_inner, *a):
return False
def read(self_inner):
return xml_bytes
return patch.object(reddit, "urlopen", return_value=_Resp())
def test_parses_atom_entries(self): def test_parses_atom_entries(self):
with self._patch_rss_response(_SAMPLE_ATOM.encode("utf-8")): with patch.object(reddit, "urlopen", return_value=_atom_resp()):
posts = reddit._fetch_subreddit_rss("NVDA", "stocks", limit=5, timeout=5.0) posts = reddit._fetch_subreddit_rss("NVDA", "stocks", limit=5, timeout=5.0)
assert len(posts) == 2 assert len(posts) == 2
assert posts[0]["title"] == "NVDA earnings beat, stock pops" assert posts[0]["title"] == "NVDA earnings beat, stock pops"
@@ -71,21 +86,84 @@ class TestRssFallbackParsing:
assert "datacenter unit" in posts[0]["selftext"] assert "datacenter unit" in posts[0]["selftext"]
def test_malformed_xml_fails_open(self): def test_malformed_xml_fails_open(self):
with self._patch_rss_response(b"<<not xml>>"): with patch.object(reddit, "urlopen", return_value=_resp(lambda: b"<<not xml>>")):
assert reddit._fetch_subreddit_rss("NVDA", "stocks", 5, 5.0) == [] assert reddit._fetch_subreddit_rss("NVDA", "stocks", 5, 5.0) == []
@pytest.mark.unit @pytest.mark.unit
class TestJsonFallsBackToRss: class TestFetchSubredditIsRssFirst:
def test_403_triggers_rss(self): """The default per-subreddit fetch goes straight to RSS — it must not hit
err = HTTPError("url", 403, "Blocked", {}, None) the WAF-blocked JSON endpoint, which only burned rate-limit budget."""
with patch.object(reddit, "urlopen", side_effect=err), \
patch.object(reddit, "_fetch_subreddit_rss", return_value=[{"title": "x", "source": "rss", "score": None, "num_comments": None, "created_utc": None, "selftext": ""}]) as rss: def test_delegates_to_rss_without_touching_json(self):
sentinel = [{"title": "x", "source": "rss", "score": None,
"num_comments": None, "created_utc": None, "selftext": ""}]
with patch.object(reddit, "_fetch_subreddit_rss", return_value=sentinel) as rss, \
patch.object(reddit, "urlopen",
side_effect=AssertionError("JSON endpoint must not be called")):
out = reddit._fetch_subreddit("NVDA", "stocks", 5, 5.0) out = reddit._fetch_subreddit("NVDA", "stocks", 5, 5.0)
rss.assert_called_once() rss.assert_called_once()
assert out is sentinel
@pytest.mark.unit
class TestJsonPathFallsBackToRss:
"""The opt-in JSON path still degrades to RSS on a 403 (kept for #862)."""
def test_403_triggers_rss(self):
err = HTTPError("url", 403, "Blocked", {}, None)
rss_posts = [{"title": "x", "source": "rss", "score": None,
"num_comments": None, "created_utc": None, "selftext": ""}]
with patch.object(reddit, "urlopen", side_effect=err), \
patch.object(reddit, "_fetch_subreddit_rss", return_value=rss_posts) as rss:
out = reddit._fetch_subreddit_json("NVDA", "stocks", 5, 5.0)
rss.assert_called_once()
assert out and out[0]["source"] == "rss" assert out and out[0]["source"] == "rss"
@pytest.mark.unit
class TestRss429Backoff:
    """The RSS fetcher backs off once on HTTP 429, then either succeeds or gives up."""

    def test_429_then_success_retries_once(self):
        rate_limited = HTTPError("url", 429, "Too Many Requests", {}, None)
        responses = [rate_limited, _atom_resp()]
        with patch.object(reddit, "urlopen", side_effect=responses) as fake_open, \
                patch.object(reddit.time, "sleep") as fake_sleep:
            result = reddit._fetch_subreddit_rss("NVDA", "stocks", 5, 5.0)
        # First attempt plus exactly one retry.
        assert fake_open.call_count == 2
        # A pause must precede the retry.
        fake_sleep.assert_called_once()
        assert len(result) == 2

    def test_429_twice_gives_up_after_one_retry(self):
        rate_limited = HTTPError("url", 429, "Too Many Requests", {}, None)
        responses = [rate_limited, rate_limited]
        with patch.object(reddit, "urlopen", side_effect=responses) as fake_open, \
                patch.object(reddit.time, "sleep"):
            result = reddit._fetch_subreddit_rss("NVDA", "stocks", 5, 5.0)
        # No third attempt: a second 429 ends the fetch with an empty result.
        assert fake_open.call_count == 2
        assert result == []

    def test_retry_after_header_is_honoured(self):
        rate_limited = HTTPError(
            "url", 429, "Too Many Requests", {"Retry-After": "12"}, None
        )
        responses = [rate_limited, _atom_resp()]
        with patch.object(reddit, "urlopen", side_effect=responses), \
                patch.object(reddit.time, "sleep") as fake_sleep:
            reddit._fetch_subreddit_rss("NVDA", "stocks", 5, 5.0)
        # The server-advertised delay is used verbatim (as a float).
        fake_sleep.assert_called_once_with(12.0)
@pytest.mark.unit
class TestChunkedTransferErrorsHandled:
    """``http.client`` transport errors must degrade gracefully (#1024).

    IncompleteRead and BadStatusLine derive from ``http.client.HTTPException``,
    NOT from OSError, so a handler that only catches OSError lets them escape
    and crash the pipeline. (RemoteDisconnected is additionally a
    ConnectionResetError, i.e. already an OSError.)
    """

    def test_rss_incomplete_read_degrades_to_empty(self):
        # IncompleteRead is raised while reading the body, i.e. from read().
        with patch.object(reddit, "urlopen", return_value=_raise(http.client.IncompleteRead(b""))):
            assert reddit._fetch_subreddit_rss("NVDA", "stocks", 5, 5.0) == []

    def test_rss_bad_status_line_degrades_to_empty(self):
        # BadStatusLine is raised while parsing the response head, so it
        # surfaces from urlopen() itself rather than from read().
        with patch.object(reddit, "urlopen", side_effect=http.client.BadStatusLine("garbage")):
            assert reddit._fetch_subreddit_rss("NVDA", "stocks", 5, 5.0) == []

    def test_json_incomplete_read_falls_back_to_rss(self):
        with patch.object(reddit, "urlopen", return_value=_raise(http.client.IncompleteRead(b""))), \
                patch.object(reddit, "_fetch_subreddit_rss", return_value=[]) as rss:
            reddit._fetch_subreddit_json("NVDA", "stocks", 5, 5.0)
        rss.assert_called_once()
@pytest.mark.unit @pytest.mark.unit
class TestFormatterHandlesRssPosts: class TestFormatterHandlesRssPosts:
def test_rss_posts_omit_fake_counts_and_note_source(self): def test_rss_posts_omit_fake_counts_and_note_source(self):

View File

@@ -1,31 +1,32 @@
"""Reddit search fetcher for ticker-specific discussion posts. """Reddit search fetcher for ticker-specific discussion posts.
Primary path is Reddit's public JSON search endpoint Default path is Reddit's public Atom/RSS search feed
(``reddit.com/r/{sub}/search.json``), which carries the richest data (``reddit.com/r/{sub}/search.rss``). The richer JSON search endpoint
(score, comment count, body). Reddit's WAF increasingly returns (``/search.json``) is reliably WAF-blocked (``HTTP 403``) for public clients
``HTTP 403 Blocked`` on that endpoint (issue #862), so when the JSON request (issue #862), and probing it on every call only doubled our request volume
fails we transparently fall back to the public Atom/RSS search feed against Reddit's per-IP rate limit — tripping ``429`` on the RSS fallback — so
(``/search.rss``). The RSS feed is gated less aggressively and serves the it is kept (``_fetch_subreddit_json``) but not used by default. On a 429 we back
same descriptive User-Agent we already send; the fallback lacks score / off once (honouring ``Retry-After``). RSS lacks score / comment counts, so those
comment counts, so RSS-sourced posts are marked and the formatter omits those posts are marked and the formatter omits the metrics rather than printing fake
metrics rather than printing fake zeros. zeros.
No API key required either way. Returns formatted plaintext blocks ready for No API key required. Returns formatted plaintext blocks ready for prompt
prompt injection and degrades gracefully — returns a placeholder string injection and degrades gracefully — returns a placeholder string rather than
rather than raising, so callers never special-case missing data. raising, so callers never special-case missing data.
""" """
from __future__ import annotations from __future__ import annotations
import html import html
import http.client
import json import json
import logging import logging
import re import re
import time import time
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from collections.abc import Iterable
from datetime import datetime from datetime import datetime
from typing import Iterable, Optional from urllib.error import HTTPError
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode from urllib.parse import urlencode
from urllib.request import Request, urlopen from urllib.request import Request, urlopen
@@ -56,7 +57,7 @@ def _search_qs(ticker: str, limit: int) -> str:
}) })
def _iso_to_timestamp(iso_str: Optional[str]) -> Optional[float]: def _iso_to_timestamp(iso_str: str | None) -> float | None:
"""Parse an Atom ``published`` timestamp to a UTC epoch, or None.""" """Parse an Atom ``published`` timestamp to a UTC epoch, or None."""
if not iso_str: if not iso_str:
return None return None
@@ -78,23 +79,48 @@ def _strip_html(content: str) -> str:
return " ".join(html.unescape(text).split()) return " ".join(html.unescape(text).split())
def _retry_after_seconds(exc: HTTPError) -> float | None:
"""Seconds to wait from a 429's ``Retry-After`` header, capped at 30s."""
try:
val = exc.headers.get("Retry-After") if getattr(exc, "headers", None) else None
return min(float(val), 30.0) if val else None
except (ValueError, TypeError, AttributeError):
return None
def _fetch_subreddit_rss( def _fetch_subreddit_rss(
ticker: str, ticker: str,
sub: str, sub: str,
limit: int, limit: int,
timeout: float, timeout: float,
_retry: bool = True,
) -> list[dict]: ) -> list[dict]:
"""Fallback path: parse the public Atom search feed for a subreddit. """Default path: parse the public Atom search feed for a subreddit.
Carries no score / comment counts, so those fields are left None and the Carries no score / comment counts, so those fields are left None and the
post is tagged ``source="rss"`` for honest display. post is tagged ``source="rss"`` for honest display. On a 429 (Reddit's
per-IP rate limit) we back off once — honouring ``Retry-After`` when
present — before giving up, so a transient burst doesn't blank the feed.
""" """
url = _RSS.format(sub=sub, qs=_search_qs(ticker, limit)) url = _RSS.format(sub=sub, qs=_search_qs(ticker, limit))
req = Request(url, headers={"User-Agent": _UA}) req = Request(url, headers={"User-Agent": _UA})
try: try:
with urlopen(req, timeout=timeout) as resp: with urlopen(req, timeout=timeout) as resp:
root = ET.fromstring(resp.read()) root = ET.fromstring(resp.read())
except (HTTPError, URLError, TimeoutError, ET.ParseError) as exc: except HTTPError as exc:
if exc.code == 429 and _retry:
wait = _retry_after_seconds(exc) or 5.0
logger.warning(
"Reddit RSS 429 for r/%s · %s — backing off %.1fs then retrying once",
sub, ticker, wait,
)
time.sleep(wait)
return _fetch_subreddit_rss(ticker, sub, limit, timeout, _retry=False)
logger.warning("Reddit RSS fetch failed for r/%s · %s: %s", sub, ticker, exc)
return []
except (OSError, http.client.HTTPException, ET.ParseError) as exc:
# OSError covers URLError/TimeoutError/connection resets; HTTPException
# covers chunked-transfer errors (IncompleteRead/BadStatusLine, #1024).
logger.warning("Reddit RSS fetch failed for r/%s · %s: %s", sub, ticker, exc) logger.warning("Reddit RSS fetch failed for r/%s · %s: %s", sub, ticker, exc)
return [] return []
@@ -116,12 +142,20 @@ def _fetch_subreddit_rss(
return posts return posts
def _fetch_subreddit( def _fetch_subreddit_json(
ticker: str, ticker: str,
sub: str, sub: str,
limit: int, limit: int,
timeout: float, timeout: float,
) -> list[dict]: ) -> list[dict]:
"""Richer JSON search path (carries score / comment counts).
Reddit's WAF currently returns ``403 Blocked`` on this endpoint for
non-OAuth clients (issue #862), so it is NOT used by default — calling it on
every request only doubled our volume against the per-IP rate limit and
triggered 429s on the RSS fallback. Kept for the day the WAF relaxes or an
OAuth token is wired in; degrades to RSS on failure.
"""
url = _API.format(sub=sub, qs=_search_qs(ticker, limit)) url = _API.format(sub=sub, qs=_search_qs(ticker, limit))
req = Request(url, headers={"User-Agent": _UA, "Accept": "application/json"}) req = Request(url, headers={"User-Agent": _UA, "Accept": "application/json"})
try: try:
@@ -129,7 +163,7 @@ def _fetch_subreddit(
payload = json.loads(resp.read()) payload = json.loads(resp.read())
children = (payload.get("data") or {}).get("children") or [] children = (payload.get("data") or {}).get("children") or []
return [c.get("data", {}) for c in children if isinstance(c, dict)] return [c.get("data", {}) for c in children if isinstance(c, dict)]
except (HTTPError, URLError, json.JSONDecodeError, TimeoutError) as exc: except (OSError, http.client.HTTPException, json.JSONDecodeError) as exc:
logger.warning( logger.warning(
"Reddit JSON fetch failed for r/%s · %s: %s — falling back to RSS feed.", "Reddit JSON fetch failed for r/%s · %s: %s — falling back to RSS feed.",
sub, ticker, exc, sub, ticker, exc,
@@ -137,18 +171,34 @@ def _fetch_subreddit(
return _fetch_subreddit_rss(ticker, sub, limit, timeout) return _fetch_subreddit_rss(ticker, sub, limit, timeout)
def _fetch_subreddit(
    ticker: str,
    sub: str,
    limit: int,
    timeout: float,
) -> list[dict]:
    """Fetch posts for one subreddit via the RSS path only.

    This is a thin delegate to ``_fetch_subreddit_rss``; the JSON search
    endpoint is never contacted from here. Per the module's notes that
    endpoint is WAF-blocked (403) for public clients, so skipping it avoids
    spending a second request per subreddit against the per-IP rate limit.
    """
    rss_posts = _fetch_subreddit_rss(ticker, sub, limit, timeout)
    return rss_posts
def fetch_reddit_posts( def fetch_reddit_posts(
ticker: str, ticker: str,
subreddits: Iterable[str] = DEFAULT_SUBREDDITS, subreddits: Iterable[str] = DEFAULT_SUBREDDITS,
limit_per_sub: int = 5, limit_per_sub: int = 5,
timeout: float = 10.0, timeout: float = 10.0,
inter_request_delay: float = 0.4, inter_request_delay: float = 1.0,
) -> str: ) -> str:
"""Fetch recent Reddit posts mentioning ``ticker`` across finance """Fetch recent Reddit posts mentioning ``ticker`` across finance
subreddits and return them as a formatted plaintext block. subreddits and return them as a formatted plaintext block.
``inter_request_delay`` keeps us under Reddit's public rate limit ``inter_request_delay`` paces the (now RSS-only) per-subreddit requests to
(~10 req/min per IP) even if the caller queries many subreddits. stay under Reddit's public per-IP rate limit; combined with the RSS-first
path it makes 429s rare even when several analyses run back-to-back.
""" """
blocks = [] blocks = []
total_posts = 0 total_posts = 0