refactor: clean up codebase and streamline documentation

- Remove debug prints from vendor routing (interface.py)
- Simplify vendor fallback to only handle rate limits
- Reorder CLI provider menu: OpenAI, Google, Anthropic, xAI, OpenRouter, Ollama
- Remove dead files: local.py, reddit_utils.py, openai.py, google.py, googlenews_utils.py, yfin_utils.py
Author: Yijia Xiao
Date: 2026-02-03 22:10:01 +00:00
Parent: 224941d8c2
Commit: 102b026d23

10 changed files with 30 additions and 1085 deletions

File: .env.example

@@ -1,2 +1,5 @@
-ALPHA_VANTAGE_API_KEY=alpha_vantage_api_key_placeholder
-OPENAI_API_KEY=openai_api_key_placeholder
+# LLM Providers (set the one you use)
+OPENAI_API_KEY=
+GOOGLE_API_KEY=
+ANTHROPIC_API_KEY=
+OPENROUTER_API_KEY=

File: README.md

@@ -114,21 +114,23 @@ pip install -r requirements.txt
### Required APIs
-You will need the OpenAI API for all the agents, and the [Alpha Vantage API](https://www.alphavantage.co/support/#api-key) for fundamental and news data (default configuration).
+TradingAgents supports multiple LLM providers. Set the API key for your chosen provider:
```bash
-export OPENAI_API_KEY=$YOUR_OPENAI_API_KEY
-export ALPHA_VANTAGE_API_KEY=$YOUR_ALPHA_VANTAGE_API_KEY
+export OPENAI_API_KEY=...        # OpenAI
+export GOOGLE_API_KEY=...        # Google (Gemini)
+export ANTHROPIC_API_KEY=...     # Anthropic (Claude)
+export OPENROUTER_API_KEY=...    # OpenRouter
+export ALPHA_VANTAGE_API_KEY=... # Alpha Vantage
```
-Alternatively, you can create a `.env` file in the project root with your API keys (see `.env.example` for reference):
+For local models, configure Ollama with `llm_provider: "ollama"` in your config.
+Alternatively, copy `.env.example` to `.env` and fill in your keys:
```bash
cp .env.example .env
# Edit .env with your actual API keys
```
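If you go the `.env` route, the keys still need to land in the process environment before the graph is built. A minimal sketch, assuming the `python-dotenv` package (an assumption; this diff does not name a loader):
```python
# Minimal sketch (assumes python-dotenv, which this commit does not reference):
import os

from dotenv import load_dotenv

load_dotenv()  # reads .env from the working directory, searching upward
assert os.getenv("OPENAI_API_KEY"), "set OPENAI_API_KEY in .env or the shell"
```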
**Note:** We are happy to partner with Alpha Vantage to provide robust API support for TradingAgents. You can get a free Alpha Vantage API key [here](https://www.alphavantage.co/support/#api-key); TradingAgents-sourced requests also have an increased rate limit of 60 requests per minute with no daily limits. Typically this quota is sufficient for performing complex tasks with TradingAgents, thanks to Alpha Vantage's open-source support program. If you prefer to use OpenAI for these data sources instead, you can modify the data vendor settings in `tradingagents/default_config.py`.
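For example, pointing those two categories at OpenAI can be done either by editing the defaults in that file or by overriding them at runtime. A sketch of the runtime override, using the `data_vendors` keys shown in the Python Usage example below:
```python
from tradingagents.default_config import DEFAULT_CONFIG

# Hypothetical override: serve fundamentals and news via OpenAI instead of
# Alpha Vantage. Rebuild the nested dict so the shared defaults stay untouched.
config = DEFAULT_CONFIG.copy()
config["data_vendors"] = {
    **config["data_vendors"],
    "fundamental_data": "openai",  # instead of "alpha_vantage"
    "news_data": "openai",
}
```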
### CLI Usage
You can also try out the CLI directly by running:
@@ -155,7 +157,7 @@ An interface will appear showing results as they load, letting you track the age
### Implementation Details
-We built TradingAgents with LangGraph to ensure flexibility and modularity. We utilize `o1-preview` and `gpt-4o` as our deep thinking and fast thinking LLMs for our experiments. However, for testing purposes, we recommend you use `o4-mini` and `gpt-4.1-mini` to save on costs, as our framework makes **lots of** API calls.
+We built TradingAgents with LangGraph to ensure flexibility and modularity. The framework supports multiple LLM providers: OpenAI, Google, Anthropic, OpenRouter, and Ollama.
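For instance, running both thinking models against a local Ollama server is a small config change, as a sketch (the `llama3.1` tag is an assumption; any locally pulled model should work):
```python
from tradingagents.default_config import DEFAULT_CONFIG

config = DEFAULT_CONFIG.copy()
config["llm_provider"] = "ollama"      # local models, per the note above
config["deep_think_llm"] = "llama3.1"  # assumption: example Ollama model tag
config["quick_think_llm"] = "llama3.1"
```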
### Python Usage
@@ -168,7 +170,7 @@ from tradingagents.default_config import DEFAULT_CONFIG
ta = TradingAgentsGraph(debug=True, config=DEFAULT_CONFIG.copy())
# forward propagate
-_, decision = ta.propagate("NVDA", "2024-05-10")
+_, decision = ta.propagate("NVDA", "2026-01-15")
print(decision)
```
@@ -178,31 +180,18 @@ You can also adjust the default configuration to set your own choice of LLMs, de
from tradingagents.graph.trading_graph import TradingAgentsGraph
from tradingagents.default_config import DEFAULT_CONFIG
# Create a custom config
config = DEFAULT_CONFIG.copy()
config["deep_think_llm"] = "gpt-4.1-nano" # Use a different model
config["quick_think_llm"] = "gpt-4.1-nano" # Use a different model
config["max_debate_rounds"] = 1 # Increase debate rounds
config["llm_provider"] = "openai" # openai, google, anthropic, openrouter, ollama
config["deep_think_llm"] = "gpt-5.2" # Model for complex reasoning
config["quick_think_llm"] = "gpt-5-mini" # Model for quick tasks
config["max_debate_rounds"] = 2
# Configure data vendors (default uses yfinance and Alpha Vantage)
config["data_vendors"] = {
"core_stock_apis": "yfinance", # Options: yfinance, alpha_vantage, local
"technical_indicators": "yfinance", # Options: yfinance, alpha_vantage, local
"fundamental_data": "alpha_vantage", # Options: openai, alpha_vantage, local
"news_data": "alpha_vantage", # Options: openai, alpha_vantage, google, local
}
# Initialize with custom config
ta = TradingAgentsGraph(debug=True, config=config)
# forward propagate
-_, decision = ta.propagate("NVDA", "2024-05-10")
+_, decision = ta.propagate("NVDA", "2026-01-15")
print(decision)
```
> The default configuration uses yfinance for stock price and technical data, and Alpha Vantage for fundamental and news data. For production use or if you encounter rate limits, consider upgrading to [Alpha Vantage Premium](https://www.alphavantage.co/premium/) for more stable and reliable data access. For offline experimentation, there's a local data vendor option that uses our **Tauric TradingDB**, a curated dataset for backtesting, though this is still in development. We're currently refining this dataset and plan to release it soon alongside our upcoming projects. Stay tuned!
-You can view the full list of configurations in `tradingagents/default_config.py`.
+See `tradingagents/default_config.py` for all configuration options.
## Contributing

File: cli/utils.py

@@ -257,8 +257,8 @@ def select_llm_provider() -> tuple[str, str]:
# Define LLM provider options with their corresponding API endpoints
BASE_URLS = [
("OpenAI", "https://api.openai.com/v1"),
("Anthropic", "https://api.anthropic.com/"),
("Google", "https://generativelanguage.googleapis.com/v1"),
("Anthropic", "https://api.anthropic.com/"),
("xAI", "https://api.x.ai/v1"),
("Openrouter", "https://openrouter.ai/api/v1"),
("Ollama", "http://localhost:11434/v1"),

File: tradingagents/dataflows/google.py (deleted)

@@ -1,30 +0,0 @@
from typing import Annotated
from datetime import datetime
from dateutil.relativedelta import relativedelta
from .googlenews_utils import getNewsData
def get_google_news(
query: Annotated[str, "Query to search with"],
curr_date: Annotated[str, "Curr date in yyyy-mm-dd format"],
look_back_days: Annotated[int, "how many days to look back"],
) -> str:
query = query.replace(" ", "+")
start_date = datetime.strptime(curr_date, "%Y-%m-%d")
before = start_date - relativedelta(days=look_back_days)
before = before.strftime("%Y-%m-%d")
news_results = getNewsData(query, before, curr_date)
news_str = ""
for news in news_results:
news_str += (
f"### {news['title']} (source: {news['source']}) \n\n{news['snippet']}\n\n"
)
if len(news_results) == 0:
return ""
return f"## {query} Google News, from {before} to {curr_date}:\n\n{news_str}"

File: tradingagents/dataflows/googlenews_utils.py (deleted)

@@ -1,108 +0,0 @@
import json
import requests
from bs4 import BeautifulSoup
from datetime import datetime
import time
import random
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
retry_if_result,
)
def is_rate_limited(response):
"""Check if the response indicates rate limiting (status code 429)"""
return response.status_code == 429
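# The decorator below retries when the *returned response* is a 429
# (result-based retry via retry_if_result), backing off exponentially
# between 4 and 60 seconds and giving up after 5 attempts.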
@retry(
retry=(retry_if_result(is_rate_limited)),
wait=wait_exponential(multiplier=1, min=4, max=60),
stop=stop_after_attempt(5),
)
def make_request(url, headers):
"""Make a request with retry logic for rate limiting"""
# Random delay before each request to avoid detection
time.sleep(random.uniform(2, 6))
response = requests.get(url, headers=headers)
return response
def getNewsData(query, start_date, end_date):
"""
Scrape Google News search results for a given query and date range.
query: str - search query
start_date: str - start date in the format yyyy-mm-dd or mm/dd/yyyy
end_date: str - end date in the format yyyy-mm-dd or mm/dd/yyyy
"""
if "-" in start_date:
start_date = datetime.strptime(start_date, "%Y-%m-%d")
start_date = start_date.strftime("%m/%d/%Y")
if "-" in end_date:
end_date = datetime.strptime(end_date, "%Y-%m-%d")
end_date = end_date.strftime("%m/%d/%Y")
headers = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/101.0.4951.54 Safari/537.36"
)
}
news_results = []
page = 0
while True:
offset = page * 10
url = (
f"https://www.google.com/search?q={query}"
f"&tbs=cdr:1,cd_min:{start_date},cd_max:{end_date}"
f"&tbm=nws&start={offset}"
)
try:
response = make_request(url, headers)
soup = BeautifulSoup(response.content, "html.parser")
results_on_page = soup.select("div.SoaBEf")
if not results_on_page:
break # No more results found
for el in results_on_page:
try:
link = el.find("a")["href"]
title = el.select_one("div.MBeuO").get_text()
snippet = el.select_one(".GI74Re").get_text()
date = el.select_one(".LfVVr").get_text()
source = el.select_one(".NUnG9d span").get_text()
news_results.append(
{
"link": link,
"title": title,
"snippet": snippet,
"date": date,
"source": source,
}
)
except Exception as e:
print(f"Error processing result: {e}")
# If one of the fields is not found, skip this result
continue
# Check for the "Next" link (pagination)
next_link = soup.find("a", id="pnnext")
if not next_link:
break
page += 1
except Exception as e:
print(f"Failed after multiple retries: {e}")
break
return news_results

File: tradingagents/dataflows/interface.py

@@ -135,103 +135,28 @@ def route_to_vendor(method: str, *args, **kwargs):
"""Route method calls to appropriate vendor implementation with fallback support."""
category = get_category_for_method(method)
vendor_config = get_vendor(category, method)
# Handle comma-separated vendors
primary_vendors = [v.strip() for v in vendor_config.split(',')]
if method not in VENDOR_METHODS:
raise ValueError(f"Method '{method}' not supported")
# Get all available vendors for this method for fallback
# Build fallback chain: primary vendors first, then remaining available vendors
all_available_vendors = list(VENDOR_METHODS[method].keys())
# Create fallback vendor list: primary vendors first, then remaining vendors as fallbacks
fallback_vendors = primary_vendors.copy()
for vendor in all_available_vendors:
if vendor not in fallback_vendors:
fallback_vendors.append(vendor)
# Debug: Print fallback ordering
primary_str = "".join(primary_vendors)
fallback_str = "".join(fallback_vendors)
print(f"DEBUG: {method} - Primary: [{primary_str}] | Full fallback order: [{fallback_str}]")
# Track results and execution state
results = []
vendor_attempt_count = 0
any_primary_vendor_attempted = False
successful_vendor = None
for vendor in fallback_vendors:
if vendor not in VENDOR_METHODS[method]:
if vendor in primary_vendors:
print(f"INFO: Vendor '{vendor}' not supported for method '{method}', falling back to next vendor")
continue
vendor_impl = VENDOR_METHODS[method][vendor]
is_primary_vendor = vendor in primary_vendors
vendor_attempt_count += 1
impl_func = vendor_impl[0] if isinstance(vendor_impl, list) else vendor_impl
# Track if we attempted any primary vendor
if is_primary_vendor:
any_primary_vendor_attempted = True
# Debug: Print current attempt
vendor_type = "PRIMARY" if is_primary_vendor else "FALLBACK"
print(f"DEBUG: Attempting {vendor_type} vendor '{vendor}' for {method} (attempt #{vendor_attempt_count})")
# Handle list of methods for a vendor
if isinstance(vendor_impl, list):
vendor_methods = [(impl, vendor) for impl in vendor_impl]
print(f"DEBUG: Vendor '{vendor}' has multiple implementations: {len(vendor_methods)} functions")
else:
vendor_methods = [(vendor_impl, vendor)]
# Run methods for this vendor
vendor_results = []
for impl_func, vendor_name in vendor_methods:
try:
print(f"DEBUG: Calling {impl_func.__name__} from vendor '{vendor_name}'...")
result = impl_func(*args, **kwargs)
vendor_results.append(result)
print(f"SUCCESS: {impl_func.__name__} from vendor '{vendor_name}' completed successfully")
return impl_func(*args, **kwargs)
except AlphaVantageRateLimitError:
continue # Only rate limits trigger fallback
except AlphaVantageRateLimitError as e:
if vendor == "alpha_vantage":
print(f"RATE_LIMIT: Alpha Vantage rate limit exceeded, falling back to next available vendor")
print(f"DEBUG: Rate limit details: {e}")
# Continue to next vendor for fallback
continue
except Exception as e:
# Log error but continue with other implementations
print(f"FAILED: {impl_func.__name__} from vendor '{vendor_name}' failed: {e}")
continue
# Add this vendor's results
if vendor_results:
results.extend(vendor_results)
successful_vendor = vendor
result_summary = f"Got {len(vendor_results)} result(s)"
print(f"SUCCESS: Vendor '{vendor}' succeeded - {result_summary}")
# Stopping logic: Stop after first successful vendor for single-vendor configs
# Multiple vendor configs (comma-separated) may want to collect from multiple sources
if len(primary_vendors) == 1:
print(f"DEBUG: Stopping after successful vendor '{vendor}' (single-vendor config)")
break
else:
print(f"FAILED: Vendor '{vendor}' produced no results")
# Final result summary
if not results:
print(f"FAILURE: All {vendor_attempt_count} vendor attempts failed for method '{method}'")
raise RuntimeError(f"All vendor implementations failed for method '{method}'")
else:
print(f"FINAL: Method '{method}' completed with {len(results)} result(s) from {vendor_attempt_count} vendor attempt(s)")
# Return single result if only one, otherwise concatenate as string
if len(results) == 1:
return results[0]
else:
# Convert all results to strings and concatenate
return '\n'.join(str(result) for result in results)
raise RuntimeError(f"No available vendor for '{method}'")

File: tradingagents/dataflows/local.py (deleted)

@@ -1,475 +0,0 @@
from typing import Annotated
import pandas as pd
import os
from .config import DATA_DIR
from datetime import datetime
from dateutil.relativedelta import relativedelta
import json
from .reddit_utils import fetch_top_from_category
from tqdm import tqdm
def get_YFin_data_window(
symbol: Annotated[str, "ticker symbol of the company"],
curr_date: Annotated[str, "Start date in yyyy-mm-dd format"],
look_back_days: Annotated[int, "how many days to look back"],
) -> str:
# calculate past days
date_obj = datetime.strptime(curr_date, "%Y-%m-%d")
before = date_obj - relativedelta(days=look_back_days)
start_date = before.strftime("%Y-%m-%d")
# read in data
data = pd.read_csv(
os.path.join(
DATA_DIR,
f"market_data/price_data/{symbol}-YFin-data-2015-01-01-2025-03-25.csv",
)
)
# Extract just the date part for comparison
data["DateOnly"] = data["Date"].str[:10]
# Filter data between the start and end dates (inclusive)
filtered_data = data[
(data["DateOnly"] >= start_date) & (data["DateOnly"] <= curr_date)
]
# Drop the temporary column we created
filtered_data = filtered_data.drop("DateOnly", axis=1)
# Set pandas display options to show the full DataFrame
with pd.option_context(
"display.max_rows", None, "display.max_columns", None, "display.width", None
):
df_string = filtered_data.to_string()
return (
f"## Raw Market Data for {symbol} from {start_date} to {curr_date}:\n\n"
+ df_string
)
def get_YFin_data(
symbol: Annotated[str, "ticker symbol of the company"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
) -> str:
# read in data
data = pd.read_csv(
os.path.join(
DATA_DIR,
f"market_data/price_data/{symbol}-YFin-data-2015-01-01-2025-03-25.csv",
)
)
if end_date > "2025-03-25":
raise Exception(
f"Get_YFin_Data: {end_date} is outside of the data range of 2015-01-01 to 2025-03-25"
)
# Extract just the date part for comparison
data["DateOnly"] = data["Date"].str[:10]
# Filter data between the start and end dates (inclusive)
filtered_data = data[
(data["DateOnly"] >= start_date) & (data["DateOnly"] <= end_date)
]
# Drop the temporary column we created
filtered_data = filtered_data.drop("DateOnly", axis=1)
# remove the index from the dataframe
filtered_data = filtered_data.reset_index(drop=True)
return filtered_data
def get_finnhub_news(
query: Annotated[str, "Search query or ticker symbol"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
):
"""
Retrieve news about a company within a time frame
Args
query (str): Search query or ticker symbol
start_date (str): Start date in yyyy-mm-dd format
end_date (str): End date in yyyy-mm-dd format
Returns
str: dataframe containing the news of the company in the time frame
"""
result = get_data_in_range(query, start_date, end_date, "news_data", DATA_DIR)
if len(result) == 0:
return ""
combined_result = ""
for day, data in result.items():
if len(data) == 0:
continue
for entry in data:
current_news = (
"### " + entry["headline"] + f" ({day})" + "\n" + entry["summary"]
)
combined_result += current_news + "\n\n"
return f"## {query} News, from {start_date} to {end_date}:\n" + str(combined_result)
def get_finnhub_company_insider_sentiment(
ticker: Annotated[str, "ticker symbol for the company"],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
"""
Retrieve insider sentiment about a company (retrieved from public SEC information) for the past 15 days
Args:
ticker (str): ticker symbol of the company
curr_date (str): current date you are trading on, yyyy-mm-dd
Returns:
str: a report of the sentiment in the past 15 days starting at curr_date
"""
date_obj = datetime.strptime(curr_date, "%Y-%m-%d")
before = date_obj - relativedelta(days=15) # Default 15 days lookback
before = before.strftime("%Y-%m-%d")
data = get_data_in_range(ticker, before, curr_date, "insider_senti", DATA_DIR)
if len(data) == 0:
return ""
result_str = ""
seen_dicts = []
for date, senti_list in data.items():
for entry in senti_list:
if entry not in seen_dicts:
result_str += f"### {entry['year']}-{entry['month']}:\nChange: {entry['change']}\nMonthly Share Purchase Ratio: {entry['mspr']}\n\n"
seen_dicts.append(entry)
return (
f"## {ticker} Insider Sentiment Data for {before} to {curr_date}:\n"
+ result_str
+ "The change field refers to the net buying/selling from all insiders' transactions. The mspr field refers to monthly share purchase ratio."
)
def get_finnhub_company_insider_transactions(
ticker: Annotated[str, "ticker symbol"],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
"""
Retrieve insider transaction information about a company (retrieved from public SEC information) for the past 15 days
Args:
ticker (str): ticker symbol of the company
curr_date (str): current date you are trading at, yyyy-mm-dd
Returns:
str: a report of the company's insider transaction/trading information in the past 15 days
"""
date_obj = datetime.strptime(curr_date, "%Y-%m-%d")
before = date_obj - relativedelta(days=15) # Default 15 days lookback
before = before.strftime("%Y-%m-%d")
data = get_data_in_range(ticker, before, curr_date, "insider_trans", DATA_DIR)
if len(data) == 0:
return ""
result_str = ""
seen_dicts = []
for date, senti_list in data.items():
for entry in senti_list:
if entry not in seen_dicts:
result_str += f"### Filing Date: {entry['filingDate']}, {entry['name']}:\nChange:{entry['change']}\nShares: {entry['share']}\nTransaction Price: {entry['transactionPrice']}\nTransaction Code: {entry['transactionCode']}\n\n"
seen_dicts.append(entry)
return (
f"## {ticker} insider transactions from {before} to {curr_date}:\n"
+ result_str
+ "The change field reflects the variation in share count—here a negative number indicates a reduction in holdings—while share specifies the total number of shares involved. The transactionPrice denotes the per-share price at which the trade was executed, and transactionDate marks when the transaction occurred. The name field identifies the insider making the trade, and transactionCode (e.g., S for sale) clarifies the nature of the transaction. FilingDate records when the transaction was officially reported, and the unique id links to the specific SEC filing, as indicated by the source. Additionally, the symbol ties the transaction to a particular company, isDerivative flags whether the trade involves derivative securities, and currency notes the currency context of the transaction."
)
def get_data_in_range(ticker, start_date, end_date, data_type, data_dir, period=None):
"""
Gets finnhub data saved and processed on disk.
Args:
start_date (str): Start date in YYYY-MM-DD format.
end_date (str): End date in YYYY-MM-DD format.
data_type (str): Type of data from finnhub to fetch. Can be insider_trans, SEC_filings, news_data, insider_senti, or fin_as_reported.
data_dir (str): Directory where the data is saved.
period (str): Default to none, if there is a period specified, should be annual or quarterly.
"""
if period:
data_path = os.path.join(
data_dir,
"finnhub_data",
data_type,
f"{ticker}_{period}_data_formatted.json",
)
else:
data_path = os.path.join(
data_dir, "finnhub_data", data_type, f"{ticker}_data_formatted.json"
)
data = open(data_path, "r")
data = json.load(data)
# filter keys (date, str in format YYYY-MM-DD) by the date range (str, str in format YYYY-MM-DD)
filtered_data = {}
for key, value in data.items():
if start_date <= key <= end_date and len(value) > 0:
filtered_data[key] = value
return filtered_data
def get_simfin_balance_sheet(
ticker: Annotated[str, "ticker symbol"],
freq: Annotated[
str,
"reporting frequency of the company's financial history: annual / quarterly",
],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
data_path = os.path.join(
DATA_DIR,
"fundamental_data",
"simfin_data_all",
"balance_sheet",
"companies",
"us",
f"us-balance-{freq}.csv",
)
df = pd.read_csv(data_path, sep=";")
# Convert date strings to datetime objects and remove any time components
df["Report Date"] = pd.to_datetime(df["Report Date"], utc=True).dt.normalize()
df["Publish Date"] = pd.to_datetime(df["Publish Date"], utc=True).dt.normalize()
# Convert the current date to datetime and normalize
curr_date_dt = pd.to_datetime(curr_date, utc=True).normalize()
# Filter the DataFrame for the given ticker and for reports that were published on or before the current date
filtered_df = df[(df["Ticker"] == ticker) & (df["Publish Date"] <= curr_date_dt)]
# Check if there are any available reports; if not, return a notification
if filtered_df.empty:
print("No balance sheet available before the given current date.")
return ""
# Get the most recent balance sheet by selecting the row with the latest Publish Date
latest_balance_sheet = filtered_df.loc[filtered_df["Publish Date"].idxmax()]
# drop the SimFinID column
latest_balance_sheet = latest_balance_sheet.drop("SimFinId")
return (
f"## {freq} balance sheet for {ticker} released on {str(latest_balance_sheet['Publish Date'])[0:10]}: \n"
+ str(latest_balance_sheet)
+ "\n\nThis includes metadata like reporting dates and currency, share details, and a breakdown of assets, liabilities, and equity. Assets are grouped as current (liquid items like cash and receivables) and noncurrent (long-term investments and property). Liabilities are split between short-term obligations and long-term debts, while equity reflects shareholder funds such as paid-in capital and retained earnings. Together, these components ensure that total assets equal the sum of liabilities and equity."
)
def get_simfin_cashflow(
ticker: Annotated[str, "ticker symbol"],
freq: Annotated[
str,
"reporting frequency of the company's financial history: annual / quarterly",
],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
data_path = os.path.join(
DATA_DIR,
"fundamental_data",
"simfin_data_all",
"cash_flow",
"companies",
"us",
f"us-cashflow-{freq}.csv",
)
df = pd.read_csv(data_path, sep=";")
# Convert date strings to datetime objects and remove any time components
df["Report Date"] = pd.to_datetime(df["Report Date"], utc=True).dt.normalize()
df["Publish Date"] = pd.to_datetime(df["Publish Date"], utc=True).dt.normalize()
# Convert the current date to datetime and normalize
curr_date_dt = pd.to_datetime(curr_date, utc=True).normalize()
# Filter the DataFrame for the given ticker and for reports that were published on or before the current date
filtered_df = df[(df["Ticker"] == ticker) & (df["Publish Date"] <= curr_date_dt)]
# Check if there are any available reports; if not, return a notification
if filtered_df.empty:
print("No cash flow statement available before the given current date.")
return ""
# Get the most recent cash flow statement by selecting the row with the latest Publish Date
latest_cash_flow = filtered_df.loc[filtered_df["Publish Date"].idxmax()]
# drop the SimFinID column
latest_cash_flow = latest_cash_flow.drop("SimFinId")
return (
f"## {freq} cash flow statement for {ticker} released on {str(latest_cash_flow['Publish Date'])[0:10]}: \n"
+ str(latest_cash_flow)
+ "\n\nThis includes metadata like reporting dates and currency, share details, and a breakdown of cash movements. Operating activities show cash generated from core business operations, including net income adjustments for non-cash items and working capital changes. Investing activities cover asset acquisitions/disposals and investments. Financing activities include debt transactions, equity issuances/repurchases, and dividend payments. The net change in cash represents the overall increase or decrease in the company's cash position during the reporting period."
)
def get_simfin_income_statements(
ticker: Annotated[str, "ticker symbol"],
freq: Annotated[
str,
"reporting frequency of the company's financial history: annual / quarterly",
],
curr_date: Annotated[str, "current date you are trading at, yyyy-mm-dd"],
):
data_path = os.path.join(
DATA_DIR,
"fundamental_data",
"simfin_data_all",
"income_statements",
"companies",
"us",
f"us-income-{freq}.csv",
)
df = pd.read_csv(data_path, sep=";")
# Convert date strings to datetime objects and remove any time components
df["Report Date"] = pd.to_datetime(df["Report Date"], utc=True).dt.normalize()
df["Publish Date"] = pd.to_datetime(df["Publish Date"], utc=True).dt.normalize()
# Convert the current date to datetime and normalize
curr_date_dt = pd.to_datetime(curr_date, utc=True).normalize()
# Filter the DataFrame for the given ticker and for reports that were published on or before the current date
filtered_df = df[(df["Ticker"] == ticker) & (df["Publish Date"] <= curr_date_dt)]
# Check if there are any available reports; if not, return a notification
if filtered_df.empty:
print("No income statement available before the given current date.")
return ""
# Get the most recent income statement by selecting the row with the latest Publish Date
latest_income = filtered_df.loc[filtered_df["Publish Date"].idxmax()]
# drop the SimFinID column
latest_income = latest_income.drop("SimFinId")
return (
f"## {freq} income statement for {ticker} released on {str(latest_income['Publish Date'])[0:10]}: \n"
+ str(latest_income)
+ "\n\nThis includes metadata like reporting dates and currency, share details, and a comprehensive breakdown of the company's financial performance. Starting with Revenue, it shows Cost of Revenue and resulting Gross Profit. Operating Expenses are detailed, including SG&A, R&D, and Depreciation. The statement then shows Operating Income, followed by non-operating items and Interest Expense, leading to Pretax Income. After accounting for Income Tax and any Extraordinary items, it concludes with Net Income, representing the company's bottom-line profit or loss for the period."
)
def get_reddit_global_news(
curr_date: Annotated[str, "Current date in yyyy-mm-dd format"],
look_back_days: Annotated[int, "Number of days to look back"] = 7,
limit: Annotated[int, "Maximum number of articles to return"] = 5,
) -> str:
"""
Retrieve the latest top reddit news
Args:
curr_date: Current date in yyyy-mm-dd format
look_back_days: Number of days to look back (default 7)
limit: Maximum number of articles to return (default 5)
Returns:
str: A formatted string containing the latest news articles posts on reddit
"""
curr_date_dt = datetime.strptime(curr_date, "%Y-%m-%d")
before = curr_date_dt - relativedelta(days=look_back_days)
before = before.strftime("%Y-%m-%d")
posts = []
# iterate from before to curr_date
curr_iter_date = datetime.strptime(before, "%Y-%m-%d")
total_iterations = (curr_date_dt - curr_iter_date).days + 1
pbar = tqdm(desc=f"Getting Global News on {curr_date}", total=total_iterations)
while curr_iter_date <= curr_date_dt:
curr_date_str = curr_iter_date.strftime("%Y-%m-%d")
fetch_result = fetch_top_from_category(
"global_news",
curr_date_str,
limit,
data_path=os.path.join(DATA_DIR, "reddit_data"),
)
posts.extend(fetch_result)
curr_iter_date += relativedelta(days=1)
pbar.update(1)
pbar.close()
if len(posts) == 0:
return ""
news_str = ""
for post in posts:
if post["content"] == "":
news_str += f"### {post['title']}\n\n"
else:
news_str += f"### {post['title']}\n\n{post['content']}\n\n"
return f"## Global News Reddit, from {before} to {curr_date}:\n{news_str}"
def get_reddit_company_news(
query: Annotated[str, "Search query or ticker symbol"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
) -> str:
"""
Retrieve the latest top reddit news
Args:
query: Search query or ticker symbol
start_date: Start date in yyyy-mm-dd format
end_date: End date in yyyy-mm-dd format
Returns:
str: A formatted string containing news articles posts on reddit
"""
start_date_dt = datetime.strptime(start_date, "%Y-%m-%d")
end_date_dt = datetime.strptime(end_date, "%Y-%m-%d")
posts = []
# iterate from start_date to end_date
curr_date = start_date_dt
total_iterations = (end_date_dt - curr_date).days + 1
pbar = tqdm(
desc=f"Getting Company News for {query} from {start_date} to {end_date}",
total=total_iterations,
)
while curr_date <= end_date_dt:
curr_date_str = curr_date.strftime("%Y-%m-%d")
fetch_result = fetch_top_from_category(
"company_news",
curr_date_str,
10, # max limit per day
query,
data_path=os.path.join(DATA_DIR, "reddit_data"),
)
posts.extend(fetch_result)
curr_date += relativedelta(days=1)
pbar.update(1)
pbar.close()
if len(posts) == 0:
return ""
news_str = ""
for post in posts:
if post["content"] == "":
news_str += f"### {post['title']}\n\n"
else:
news_str += f"### {post['title']}\n\n{post['content']}\n\n"
return f"##{query} News Reddit, from {start_date} to {end_date}:\n\n{news_str}"

File: tradingagents/dataflows/openai.py (deleted)

@@ -1,107 +0,0 @@
from openai import OpenAI
from .config import get_config
def get_stock_news_openai(query, start_date, end_date):
config = get_config()
client = OpenAI(base_url=config["backend_url"])
response = client.responses.create(
model=config["quick_think_llm"],
input=[
{
"role": "system",
"content": [
{
"type": "input_text",
"text": f"Can you search Social Media for {query} from {start_date} to {end_date}? Make sure you only get the data posted during that period.",
}
],
}
],
text={"format": {"type": "text"}},
reasoning={},
tools=[
{
"type": "web_search_preview",
"user_location": {"type": "approximate"},
"search_context_size": "low",
}
],
temperature=1,
max_output_tokens=4096,
top_p=1,
store=True,
)
return response.output[1].content[0].text
def get_global_news_openai(curr_date, look_back_days=7, limit=5):
config = get_config()
client = OpenAI(base_url=config["backend_url"])
response = client.responses.create(
model=config["quick_think_llm"],
input=[
{
"role": "system",
"content": [
{
"type": "input_text",
"text": f"Can you search global or macroeconomics news from {look_back_days} days before {curr_date} to {curr_date} that would be informative for trading purposes? Make sure you only get the data posted during that period. Limit the results to {limit} articles.",
}
],
}
],
text={"format": {"type": "text"}},
reasoning={},
tools=[
{
"type": "web_search_preview",
"user_location": {"type": "approximate"},
"search_context_size": "low",
}
],
temperature=1,
max_output_tokens=4096,
top_p=1,
store=True,
)
return response.output[1].content[0].text
def get_fundamentals_openai(ticker, curr_date):
config = get_config()
client = OpenAI(base_url=config["backend_url"])
response = client.responses.create(
model=config["quick_think_llm"],
input=[
{
"role": "system",
"content": [
{
"type": "input_text",
"text": f"Can you search Fundamental for discussions on {ticker} during of the month before {curr_date} to the month of {curr_date}. Make sure you only get the data posted during that period. List as a table, with PE/PS/Cash flow/ etc",
}
],
}
],
text={"format": {"type": "text"}},
reasoning={},
tools=[
{
"type": "web_search_preview",
"user_location": {"type": "approximate"},
"search_context_size": "low",
}
],
temperature=1,
max_output_tokens=4096,
top_p=1,
store=True,
)
return response.output[1].content[0].text

File: tradingagents/dataflows/reddit_utils.py (deleted)

@@ -1,135 +0,0 @@
import requests
import time
import json
from datetime import datetime, timedelta
from contextlib import contextmanager
from typing import Annotated
import os
import re
ticker_to_company = {
"AAPL": "Apple",
"MSFT": "Microsoft",
"GOOGL": "Google",
"AMZN": "Amazon",
"TSLA": "Tesla",
"NVDA": "Nvidia",
"TSM": "Taiwan Semiconductor Manufacturing Company OR TSMC",
"JPM": "JPMorgan Chase OR JP Morgan",
"JNJ": "Johnson & Johnson OR JNJ",
"V": "Visa",
"WMT": "Walmart",
"META": "Meta OR Facebook",
"AMD": "AMD",
"INTC": "Intel",
"QCOM": "Qualcomm",
"BABA": "Alibaba",
"ADBE": "Adobe",
"NFLX": "Netflix",
"CRM": "Salesforce",
"PYPL": "PayPal",
"PLTR": "Palantir",
"MU": "Micron",
"SQ": "Block OR Square",
"ZM": "Zoom",
"CSCO": "Cisco",
"SHOP": "Shopify",
"ORCL": "Oracle",
"X": "Twitter OR X",
"SPOT": "Spotify",
"AVGO": "Broadcom",
"ASML": "ASML ",
"TWLO": "Twilio",
"SNAP": "Snap Inc.",
"TEAM": "Atlassian",
"SQSP": "Squarespace",
"UBER": "Uber",
"ROKU": "Roku",
"PINS": "Pinterest",
}
def fetch_top_from_category(
category: Annotated[
str, "Category to fetch top post from. Collection of subreddits."
],
date: Annotated[str, "Date to fetch top posts from."],
max_limit: Annotated[int, "Maximum number of posts to fetch."],
query: Annotated[str, "Optional query to search for in the subreddit."] = None,
data_path: Annotated[
str,
"Path to the data folder. Default is 'reddit_data'.",
] = "reddit_data",
):
base_path = data_path
all_content = []
if max_limit < len(os.listdir(os.path.join(base_path, category))):
raise ValueError(
"REDDIT FETCHING ERROR: max limit is less than the number of files in the category. Will not be able to fetch any posts"
)
limit_per_subreddit = max_limit // len(
os.listdir(os.path.join(base_path, category))
)
for data_file in os.listdir(os.path.join(base_path, category)):
# check if data_file is a .jsonl file
if not data_file.endswith(".jsonl"):
continue
all_content_curr_subreddit = []
with open(os.path.join(base_path, category, data_file), "rb") as f:
for i, line in enumerate(f):
# skip empty lines
if not line.strip():
continue
parsed_line = json.loads(line)
# select only lines that are from the date
post_date = datetime.utcfromtimestamp(
parsed_line["created_utc"]
).strftime("%Y-%m-%d")
if post_date != date:
continue
# if is company_news, check that the title or the content has the company's name (query) mentioned
if "company" in category and query:
search_terms = []
if "OR" in ticker_to_company[query]:
search_terms = ticker_to_company[query].split(" OR ")
else:
search_terms = [ticker_to_company[query]]
search_terms.append(query)
found = False
for term in search_terms:
if re.search(
term, parsed_line["title"], re.IGNORECASE
) or re.search(term, parsed_line["selftext"], re.IGNORECASE):
found = True
break
if not found:
continue
post = {
"title": parsed_line["title"],
"content": parsed_line["selftext"],
"url": parsed_line["url"],
"upvotes": parsed_line["ups"],
"posted_date": post_date,
}
all_content_curr_subreddit.append(post)
# sort all_content_curr_subreddit by upvote_ratio in descending order
all_content_curr_subreddit.sort(key=lambda x: x["upvotes"], reverse=True)
all_content.extend(all_content_curr_subreddit[:limit_per_subreddit])
return all_content

File: tradingagents/dataflows/yfin_utils.py (deleted)

@@ -1,117 +0,0 @@
# Yahoo Finance data/stats utilities
import yfinance as yf
from typing import Annotated, Callable, Any, Optional
from pandas import DataFrame
import pandas as pd
from functools import wraps
from .utils import save_output, SavePathType, decorate_all_methods
def init_ticker(func: Callable) -> Callable:
"""Decorator to initialize yf.Ticker and pass it to the function."""
@wraps(func)
def wrapper(symbol: Annotated[str, "ticker symbol"], *args, **kwargs) -> Any:
ticker = yf.Ticker(symbol)
return func(ticker, *args, **kwargs)
return wrapper
@decorate_all_methods(init_ticker)
class YFinanceUtils:
def get_stock_data(
symbol: Annotated[str, "ticker symbol"],
start_date: Annotated[
str, "start date for retrieving stock price data, YYYY-mm-dd"
],
end_date: Annotated[
str, "end date for retrieving stock price data, YYYY-mm-dd"
],
save_path: SavePathType = None,
) -> DataFrame:
"""retrieve stock price data for designated ticker symbol"""
ticker = symbol
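# Note: init_ticker has already replaced `symbol` with a yf.Ticker instance,
# so `ticker` here is the Ticker object rather than the raw string.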
# add one day to the end_date so that the data range is inclusive
end_date = pd.to_datetime(end_date) + pd.DateOffset(days=1)
end_date = end_date.strftime("%Y-%m-%d")
stock_data = ticker.history(start=start_date, end=end_date)
# save_output(stock_data, f"Stock data for {ticker.ticker}", save_path)
return stock_data
def get_stock_info(
symbol: Annotated[str, "ticker symbol"],
) -> dict:
"""Fetches and returns latest stock information."""
ticker = symbol
stock_info = ticker.info
return stock_info
def get_company_info(
symbol: Annotated[str, "ticker symbol"],
save_path: Optional[str] = None,
) -> DataFrame:
"""Fetches and returns company information as a DataFrame."""
ticker = symbol
info = ticker.info
company_info = {
"Company Name": info.get("shortName", "N/A"),
"Industry": info.get("industry", "N/A"),
"Sector": info.get("sector", "N/A"),
"Country": info.get("country", "N/A"),
"Website": info.get("website", "N/A"),
}
company_info_df = DataFrame([company_info])
if save_path:
company_info_df.to_csv(save_path)
print(f"Company info for {ticker.ticker} saved to {save_path}")
return company_info_df
def get_stock_dividends(
symbol: Annotated[str, "ticker symbol"],
save_path: Optional[str] = None,
) -> DataFrame:
"""Fetches and returns the latest dividends data as a DataFrame."""
ticker = symbol
dividends = ticker.dividends
if save_path:
dividends.to_csv(save_path)
print(f"Dividends for {ticker.ticker} saved to {save_path}")
return dividends
def get_income_stmt(symbol: Annotated[str, "ticker symbol"]) -> DataFrame:
"""Fetches and returns the latest income statement of the company as a DataFrame."""
ticker = symbol
income_stmt = ticker.financials
return income_stmt
def get_balance_sheet(symbol: Annotated[str, "ticker symbol"]) -> DataFrame:
"""Fetches and returns the latest balance sheet of the company as a DataFrame."""
ticker = symbol
balance_sheet = ticker.balance_sheet
return balance_sheet
def get_cash_flow(symbol: Annotated[str, "ticker symbol"]) -> DataFrame:
"""Fetches and returns the latest cash flow statement of the company as a DataFrame."""
ticker = symbol
cash_flow = ticker.cashflow
return cash_flow
def get_analyst_recommendations(symbol: Annotated[str, "ticker symbol"]) -> tuple:
"""Fetches the latest analyst recommendations and returns the most common recommendation and its count."""
ticker = symbol
recommendations = ticker.recommendations
if recommendations.empty:
return None, 0 # No recommendations available
# Assuming 'period' column exists and needs to be excluded
row_0 = recommendations.iloc[0, 1:] # Exclude 'period' column if necessary
# Find the maximum voting result
max_votes = row_0.max()
majority_voting_result = row_0[row_0 == max_votes].index.tolist()
return majority_voting_result[0], max_votes