Files
gwitt1Repo/data_collection_daily.py
2025-01-29 19:23:11 -05:00

234 lines
8.9 KiB
Python

import signal
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import threading
import time
import pandas as pd
from datetime import datetime, timedelta, timezone
from tqdm import tqdm # For progress bar
import os
class IBApi(EWrapper, EClient):
    """Combined IB client/wrapper that gathers historical bars into a DataFrame.

    Attributes (documented here as comments on the assignments below):
    ``data`` holds the bars of the request in progress, ``df`` holds every
    bar from requests that have finished, and ``data_retrieved`` is the flag
    callers poll to learn that a request has finished.
    """

    def __init__(self):
        # The same object is handed to EClient.__init__ as its argument.
        EClient.__init__(self, self)
        self.data = []  # bars received since the last historicalDataEnd
        self.df = pd.DataFrame()  # accumulated bars of all finished requests
        self.data_retrieved = False  # set to True by historicalDataEnd

    def historicalData(self, reqId, bar):
        """Record one received bar as a dict row in ``self.data``.

        Args:
            reqId: Identifier of the request the bar belongs to (not used).
            bar: Object exposing ``date``, ``open``, ``high``, ``low``,
                ``close`` and ``volume`` attributes.
        """
        # Debug trace of every bar as it arrives.
        print(f"Received bar: Date={bar.date}, Open={bar.open}, High={bar.high}, Low={bar.low}, Close={bar.close}, Volume={bar.volume}")
        columns = ("Date", "Open", "High", "Low", "Close", "Volume")
        # Column "Open" is read from attribute ``open``, and so on.
        row = {column: getattr(bar, column.lower()) for column in columns}
        self.data.append(row)

    def historicalDataEnd(self, reqId, start, end):
        """Fold the buffered bars into ``self.df`` and flag the request as done.

        Args:
            reqId: Identifier of the finished request (not used).
            start: Start value reported for the request (only printed).
            end: End value reported for the request (only printed).
        """
        # Debug trace marking the end of a request.
        print(f"HistoricalDataEnd received. Start: {start}, End: {end}. Number of bars fetched: {len(self.data)}")
        received = pd.DataFrame(self.data)
        if received.empty:
            print("No data received in this request.")
        else:
            self.df = pd.concat([self.df, received], ignore_index=True)
        self.data_retrieved = True
        # Start the next request with an empty buffer.
        self.data = []
class IBApp:
    """Driver that connects an ``IBApi`` client and downloads daily bars.

    The wrapped client is available as ``self.app``. Historical data is
    requested in one-year chunks, walking backwards from an end date.
    """

    # Request id used for every historical-data request issued by this class.
    _HISTORICAL_REQ_ID = 1
    # strptime layouts tried, in order, when reading a stored bar date.
    _BAR_DATE_FORMATS = (
        "%Y%m%d %H:%M:%S",
        "%Y-%m-%d %H:%M:%S",
        "%Y%m%d",
        "%Y-%m-%d",
    )
    # Number of yearly chunks fetched when no usable date span is supplied.
    _DEFAULT_CHUNKS = 3

    def __init__(self):
        self.app = IBApi()

    def connect(self):
        """Open the API connection and start the client loop in a daemon thread.

        Prints a warning instead of the success message when the client does
        not report an established connection after the grace period.
        """
        # Connect to IB API (ensure IB Gateway or TWS is running)
        print("Connecting to IB API...")
        self.app.connect("127.0.0.1", 4002, clientId=1)
        # Start the API thread
        thread = threading.Thread(target=self.run_app, daemon=True)
        thread.start()
        time.sleep(1)  # Allow time for connection
        if self.app.isConnected():
            print("Connected to IB API.")
        else:
            print("Warning: IB API connection was not established; requests will time out.")

    def run_app(self):
        """Thread target: delegate to the client's ``run()``."""
        self.app.run()

    def request_data(self, contract, end_date, duration, bar_size, timeout=60.0):
        """Issue one historical-data request and wait for it to finish.

        Args:
            contract: Contract object describing the instrument.
            end_date: End date/time string passed as ``endDateTime``.
            duration: Duration string passed as ``durationStr``.
            bar_size: Bar size string passed as ``barSizeSetting``.
            timeout: Maximum number of seconds to wait for the client's
                ``data_retrieved`` flag. ``None`` waits indefinitely.

        Raises:
            TimeoutError: If the flag is not set within ``timeout`` seconds.
                The pending request is cancelled before raising.
        """
        print(f"Requesting data: endDateTime={end_date}, durationStr={duration}, barSizeSetting={bar_size}")
        # Clear any stale completion flag left behind by an earlier request.
        self.app.data_retrieved = False
        self.app.reqHistoricalData(
            reqId=self._HISTORICAL_REQ_ID,
            contract=contract,
            endDateTime=end_date,
            durationStr=duration,
            barSizeSetting=bar_size,
            whatToShow="TRADES",
            useRTH=1,  # Use regular trading hours
            formatDate=1,
            keepUpToDate=False,
            chartOptions=[]
        )
        # Poll the completion flag, but never beyond the deadline: without it
        # a rejected request or a dropped connection would block forever.
        deadline = None if timeout is None else time.monotonic() + timeout
        while not self.app.data_retrieved:
            if deadline is None:
                time.sleep(0.1)
                continue
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                self.app.cancelHistoricalData(self._HISTORICAL_REQ_ID)
                raise TimeoutError(
                    f"No historical data end marker within {timeout} seconds "
                    f"(endDateTime={end_date})"
                )
            time.sleep(min(0.1, remaining))
        self.app.data_retrieved = False  # Reset flag for next request

    @staticmethod
    def _count_yearly_chunks(start_date, end_date, chunk, default):
        """Return how many ``chunk``-long requests cover start_date..end_date.

        Falls back to ``default`` when the span cannot be computed (missing
        or incompatible values) or is not positive.
        """
        try:
            span = end_date - start_date
            if span <= timedelta(0):
                return default
            return -(-span // chunk)  # ceiling division on timedeltas
        except TypeError:
            return default

    @staticmethod
    def _parse_bar_date(value):
        """Parse a stored bar date into a UTC-aware datetime, or return None.

        Runs of whitespace are collapsed first. Values without an offset are
        taken to be UTC; values with an offset are converted to UTC.
        """
        text = " ".join(str(value).split())
        for layout in IBApp._BAR_DATE_FORMATS:
            try:
                return datetime.strptime(text, layout).replace(tzinfo=timezone.utc)
            except ValueError:
                continue
        # Last resort: ISO strings such as '2024-01-02 00:00:00+00:00'.
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    def fetch_historical_data_yearly(self, symbol, sec_type, exchange, currency, start_date, end_date, bar_size="1 day"):
        """Fetch historical data in one-year requests, newest chunk first.

        Args:
            symbol: Instrument symbol.
            sec_type: Security type.
            exchange: Exchange name.
            currency: Currency code.
            start_date: Earliest datetime to cover. Together with ``end_date``
                it determines the number of yearly chunks; when the span is
                unusable, three chunks are fetched.
            end_date: Datetime at which the most recent chunk ends.
            bar_size: Bar size string for every request.

        Any exception (including a request timeout) is printed and ends the
        fetch; data from chunks that already completed is kept by the client.
        """
        try:
            contract = Contract()
            contract.symbol = symbol
            contract.secType = sec_type
            contract.exchange = exchange
            contract.currency = currency
            delta = timedelta(days=365)
            total_years = self._count_yearly_chunks(start_date, end_date, delta, self._DEFAULT_CHUNKS)
            current_end_date = end_date
            # The request string is labelled UTC, so express aware datetimes
            # in UTC; naive datetimes are passed through unchanged.
            if current_end_date.tzinfo is not None:
                current_end_date = current_end_date.astimezone(timezone.utc)
            with tqdm(total=total_years, desc="Fetching Data", unit="year") as pbar:
                for _ in range(total_years):
                    end_date_str = current_end_date.strftime("%Y%m%d %H:%M:%S UTC")
                    self.request_data(contract, end_date_str, "1 Y", bar_size)
                    pbar.update(1)
                    current_end_date -= delta
                    time.sleep(1)  # Respect IB API pacing
        except Exception as e:
            print(f"Error fetching data: {e}")

    def fetch_historical_data(self, symbol, sec_type, exchange, currency, existing_df=None):
        """Fetch three years of daily bars for the given symbol.

        Args:
            symbol: Instrument symbol.
            sec_type: Security type.
            exchange: Exchange name.
            currency: Currency code.
            existing_df: Optional DataFrame with a ``Date`` column. It is only
                inspected to report the latest stored date that is not in the
                future; the full three years are fetched either way. If the
                date in its last row cannot be parsed, an error is printed
                and nothing is fetched.

        Any exception is printed rather than propagated.
        """
        try:
            bar_size = "1 day"  # Set bar size to 1 day for daily data
            end_date = datetime.now(timezone.utc)
            if existing_df is not None and not existing_df.empty:
                last_date_str = " ".join(str(existing_df['Date'].iloc[-1]).split())
                if self._parse_bar_date(last_date_str) is None:
                    print(f"Error parsing last_date_str: {last_date_str}")
                    return
                # Ignore unparseable rows and rows dated in the future.
                valid_dates = [
                    parsed
                    for parsed in map(self._parse_bar_date, existing_df['Date'])
                    if parsed is not None and parsed <= end_date
                ]
                if valid_dates:
                    last_date = max(valid_dates)
                    print(f"Last valid date after cleaning: {last_date.strftime('%Y-%m-%d %H:%M:%S')}")
                else:
                    print("Existing data contains no dates at or before the current time.")
            start_date = end_date - timedelta(days=365 * 3)
            self.fetch_historical_data_yearly(symbol, sec_type, exchange, currency, start_date, end_date, bar_size)
        except Exception as e:
            print(f"Error fetching data: {e}")

    def disconnect(self):
        """Close the API connection."""
        self.app.disconnect()
        print("Disconnected from IB API.")
def get_user_input():
    """Prompt for a stock symbol and return the contract details for it.

    Returns:
        A ``(symbol, sec_type, exchange, currency)`` tuple in which the
        symbol is the stripped, upper-cased user entry and the other three
        values are fixed to ``"STK"``, ``"SMART"`` and ``"USD"``. Returns
        ``None`` (after printing the reason) when the entry is blank or
        reading the input fails.
    """
    print("Provide the stock details for historical data retrieval.")
    try:
        ticker = input("Enter the stock symbol (e.g., 'AAPL'): ").strip().upper()
        if ticker == "":
            raise ValueError("Stock symbol is required. Please try again.")
    except Exception as err:
        print(f"Input Error: {err}")
        return None
    # Security type, routing and currency are not asked for; they are fixed.
    return ticker, "STK", "SMART", "USD"
def graceful_exit(signal_received, frame):
    """Signal handler: disconnect the module-level ``app`` and exit with status 0.

    Args:
        signal_received: Signal number delivered to the handler (not used).
        frame: Stack frame active when the signal arrived (not used).

    Raises:
        SystemExit: Always, with exit code 0.
    """
    print("\nTerminating program...")
    app.disconnect()
    # Raise SystemExit directly: the ``exit`` helper is injected by the
    # ``site`` module for interactive use and is not guaranteed to exist.
    raise SystemExit(0)
# Script entry: connect, ask for a symbol, download three years of daily bars
# and write them to "<SYMBOL>_3yr_daily_data.csv" in the current directory.

# Create the application before installing the handler, because the handler
# looks up the module-level name `app` when Ctrl+C arrives.
app = IBApp()
# Handle graceful exit on Ctrl+C
signal.signal(signal.SIGINT, graceful_exit)
app.connect()
try:
    user_input = get_user_input()
    if user_input:
        symbol, sec_type, exchange, currency = user_input
        # Define the filename (save directly in current directory)
        filename = f"{symbol}_3yr_daily_data.csv"
        # Fetch historical data
        app.fetch_historical_data(symbol, sec_type, exchange, currency)
        # Retrieve fetched data
        data = app.app.df
        if not data.empty:
            print(f"Number of data points fetched: {len(data)}")
            # Parse the 'Date' column; unparseable values become NaT
            data['Date'] = pd.to_datetime(data['Date'], errors='coerce')
            # Check if timezone is present; if not, localize to UTC
            if data['Date'].dt.tz is None:
                data['Date'] = data['Date'].dt.tz_localize(timezone.utc, ambiguous='NaT', nonexistent='NaT')
            # Remove any rows with NaT in 'Date'
            data.dropna(subset=['Date'], inplace=True)
            # Consecutive yearly requests can return the same boundary bars;
            # keep a single row per date.
            data.drop_duplicates(subset=['Date'], inplace=True)
            # Sort by 'Date' ascending
            data.sort_values(by='Date', inplace=True)
            # Reset index
            data.reset_index(drop=True, inplace=True)
            # Save to CSV
            data.to_csv(filename, index=False)
            print(f"Data saved to {filename}.")
            print(data.head())
        else:
            print("No data retrieved.")
except Exception as e:
    print(f"Error: {e}")
finally:
    # Always release the connection, whether the run succeeded or failed.
    app.disconnect()