organizing
This commit is contained in:
233
old_stuff/data_collection_daily.py
Normal file
233
old_stuff/data_collection_daily.py
Normal file
@@ -0,0 +1,233 @@
|
||||
import signal
|
||||
from ibapi.client import EClient
|
||||
from ibapi.wrapper import EWrapper
|
||||
from ibapi.contract import Contract
|
||||
import threading
|
||||
import time
|
||||
import pandas as pd
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from tqdm import tqdm # For progress bar
|
||||
import os
|
||||
|
||||
class IBApi(EWrapper, EClient):
|
||||
def __init__(self):
|
||||
EClient.__init__(self, self)
|
||||
self.data = []
|
||||
self.df = pd.DataFrame()
|
||||
self.data_retrieved = False
|
||||
|
||||
def historicalData(self, reqId, bar):
|
||||
# Debug: Print each received bar
|
||||
print(f"Received bar: Date={bar.date}, Open={bar.open}, High={bar.high}, Low={bar.low}, Close={bar.close}, Volume={bar.volume}")
|
||||
self.data.append({
|
||||
"Date": bar.date,
|
||||
"Open": bar.open,
|
||||
"High": bar.high,
|
||||
"Low": bar.low,
|
||||
"Close": bar.close,
|
||||
"Volume": bar.volume
|
||||
})
|
||||
|
||||
def historicalDataEnd(self, reqId, start, end):
|
||||
# Debug: Indicate end of data reception
|
||||
print(f"HistoricalDataEnd received. Start: {start}, End: {end}. Number of bars fetched: {len(self.data)}")
|
||||
chunk_df = pd.DataFrame(self.data)
|
||||
if not chunk_df.empty:
|
||||
self.df = pd.concat([self.df, chunk_df], ignore_index=True)
|
||||
else:
|
||||
print("No data received in this request.")
|
||||
self.data_retrieved = True
|
||||
self.data = [] # Reset data list for next request
|
||||
|
||||
class IBApp:
|
||||
def __init__(self):
|
||||
self.app = IBApi()
|
||||
|
||||
def connect(self):
|
||||
# Connect to IB API (ensure IB Gateway or TWS is running)
|
||||
print("Connecting to IB API...")
|
||||
self.app.connect("127.0.0.1", 4002, clientId=1)
|
||||
# Start the API thread
|
||||
thread = threading.Thread(target=self.run_app, daemon=True)
|
||||
thread.start()
|
||||
time.sleep(1) # Allow time for connection
|
||||
print("Connected to IB API.")
|
||||
|
||||
def run_app(self):
|
||||
self.app.run()
|
||||
|
||||
def request_data(self, contract, end_date, duration, bar_size):
|
||||
# Request historical data
|
||||
print(f"Requesting data: endDateTime={end_date}, durationStr={duration}, barSizeSetting={bar_size}")
|
||||
self.app.reqHistoricalData(
|
||||
reqId=1,
|
||||
contract=contract,
|
||||
endDateTime=end_date,
|
||||
durationStr=duration,
|
||||
barSizeSetting=bar_size,
|
||||
whatToShow="TRADES",
|
||||
useRTH=1, # Use regular trading hours
|
||||
formatDate=1,
|
||||
keepUpToDate=False,
|
||||
chartOptions=[]
|
||||
)
|
||||
# Wait until data is retrieved
|
||||
while not self.app.data_retrieved:
|
||||
time.sleep(0.1)
|
||||
self.app.data_retrieved = False # Reset flag for next request
|
||||
|
||||
def fetch_historical_data_yearly(self, symbol, sec_type, exchange, currency, start_date, end_date, bar_size="1 day"):
|
||||
"""
|
||||
Fetch historical data in yearly chunks to cover 3 years.
|
||||
"""
|
||||
try:
|
||||
contract = Contract()
|
||||
contract.symbol = symbol
|
||||
contract.secType = sec_type
|
||||
contract.exchange = exchange
|
||||
contract.currency = currency
|
||||
|
||||
delta = timedelta(days=365)
|
||||
current_end_date = end_date
|
||||
|
||||
total_years = 3 # Fetch 3 years of data
|
||||
with tqdm(total=total_years, desc="Fetching Data", unit="year") as pbar:
|
||||
for _ in range(total_years):
|
||||
current_start_date = current_end_date - delta
|
||||
end_date_str = current_end_date.strftime("%Y%m%d %H:%M:%S UTC")
|
||||
self.request_data(contract, end_date_str, "1 Y", bar_size)
|
||||
pbar.update(1)
|
||||
current_end_date = current_start_date
|
||||
time.sleep(1) # Respect IB API pacing
|
||||
except Exception as e:
|
||||
print(f"Error fetching data: {e}")
|
||||
|
||||
def fetch_historical_data(self, symbol, sec_type, exchange, currency, existing_df=None):
|
||||
"""
|
||||
Fetch historical data for the given symbol.
|
||||
If existing_df is provided, fetch data after the last date in existing_df.
|
||||
Otherwise, fetch the entire 3 years of data.
|
||||
"""
|
||||
try:
|
||||
contract = Contract()
|
||||
contract.symbol = symbol
|
||||
contract.secType = sec_type
|
||||
contract.exchange = exchange
|
||||
contract.currency = currency
|
||||
|
||||
bar_size = "1 day" # Set bar size to 1 day for daily data
|
||||
duration = "1 Y" # Fetch 1 year at a time
|
||||
|
||||
if existing_df is not None and not existing_df.empty:
|
||||
# Get the last date from existing data
|
||||
last_date_str = existing_df['Date'].iloc[-1]
|
||||
# Clean up the date string to have single space
|
||||
last_date_str = last_date_str.strip().replace(' ', ' ')
|
||||
# Parse the last date as timezone-aware datetime (assuming UTC)
|
||||
try:
|
||||
# Try parsing in 'YYYYMMDD HH:MM:SS' format
|
||||
last_date = datetime.strptime(last_date_str, "%Y%m%d %H:%M:%S").replace(tzinfo=timezone.utc)
|
||||
except ValueError:
|
||||
try:
|
||||
# If that fails, try 'YYYY-MM-DD HH:MM:SS' format
|
||||
last_date = datetime.strptime(last_date_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
|
||||
except ValueError:
|
||||
print(f"Error parsing last_date_str: {last_date_str}")
|
||||
return
|
||||
|
||||
# Remove any future dates if present
|
||||
current_time = datetime.now(timezone.utc)
|
||||
existing_df = existing_df[existing_df['Date'] <= current_time]
|
||||
print(f"Last valid date after cleaning: {last_date.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
|
||||
# Fetch new data in yearly chunks
|
||||
# Since we need 3 years of data, and assuming existing_df has some, adjust accordingly
|
||||
# For simplicity, fetch the entire 3 years again
|
||||
# Alternatively, fetch data from last_date forward
|
||||
|
||||
# Here, we'll fetch 3 years of data up to current_date
|
||||
end_date = datetime.now(timezone.utc)
|
||||
start_date = end_date - timedelta(days=365 * 3)
|
||||
self.fetch_historical_data_yearly(symbol, sec_type, exchange, currency, start_date, end_date, bar_size)
|
||||
else:
|
||||
# No existing data, fetch all 3 years
|
||||
end_date = datetime.now(timezone.utc)
|
||||
self.fetch_historical_data_yearly(symbol, sec_type, exchange, currency, end_date - timedelta(days=365*3), end_date, bar_size)
|
||||
except Exception as e:
|
||||
print(f"Error fetching data: {e}")
|
||||
|
||||
def disconnect(self):
|
||||
self.app.disconnect()
|
||||
print("Disconnected from IB API.")
|
||||
|
||||
def get_user_input():
|
||||
print("Provide the stock details for historical data retrieval.")
|
||||
try:
|
||||
symbol = input("Enter the stock symbol (e.g., 'AAPL'): ").strip().upper()
|
||||
sec_type = "STK" # Automatically set to Stock
|
||||
exchange = "SMART" # Automatically set to SMART routing
|
||||
currency = "USD" # Automatically set to USD
|
||||
|
||||
if not symbol:
|
||||
raise ValueError("Stock symbol is required. Please try again.")
|
||||
|
||||
return symbol, sec_type, exchange, currency
|
||||
except Exception as e:
|
||||
print(f"Input Error: {e}")
|
||||
return None
|
||||
|
||||
def graceful_exit(signal_received, frame):
|
||||
print("\nTerminating program...")
|
||||
app.disconnect()
|
||||
exit(0)
|
||||
|
||||
# Handle graceful exit on Ctrl+C
|
||||
signal.signal(signal.SIGINT, graceful_exit)
|
||||
|
||||
# Initialize and connect the IBApp
|
||||
app = IBApp()
|
||||
app.connect()
|
||||
|
||||
try:
|
||||
user_input = get_user_input()
|
||||
if user_input:
|
||||
symbol, sec_type, exchange, currency = user_input
|
||||
|
||||
# Define the filename (save directly in current directory)
|
||||
filename = f"{symbol}_3yr_daily_data.csv"
|
||||
|
||||
# Fetch historical data
|
||||
app.fetch_historical_data(symbol, sec_type, exchange, currency)
|
||||
|
||||
# Retrieve fetched data
|
||||
data = app.app.df
|
||||
if not data.empty:
|
||||
print(f"Number of data points fetched: {len(data)}")
|
||||
|
||||
# Clean and parse the 'Date' column
|
||||
# Attempt multiple formats
|
||||
data['Date'] = pd.to_datetime(data['Date'], errors='coerce')
|
||||
|
||||
# Check if timezone is present; if not, localize to UTC
|
||||
if data['Date'].dt.tz is None:
|
||||
data['Date'] = data['Date'].dt.tz_localize(timezone.utc, ambiguous='NaT', nonexistent='NaT')
|
||||
|
||||
# Remove any rows with NaT in 'Date'
|
||||
data.dropna(subset=['Date'], inplace=True)
|
||||
|
||||
# Sort by 'Date' ascending
|
||||
data.sort_values(by='Date', inplace=True)
|
||||
|
||||
# Reset index
|
||||
data.reset_index(drop=True, inplace=True)
|
||||
|
||||
# Save to CSV
|
||||
data.to_csv(filename, index=False)
|
||||
print(f"Data saved to {filename}.")
|
||||
print(data.head())
|
||||
else:
|
||||
print("No data retrieved.")
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
finally:
|
||||
app.disconnect()
|
||||
Reference in New Issue
Block a user