Files
gwitt1Repo/recent_data_pull.py
2025-01-29 19:23:11 -05:00

171 lines
5.7 KiB
Python

import signal
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import threading
import time
import pandas as pd
from datetime import datetime, timedelta, timezone
import os
class IBApi(EWrapper, EClient):
def __init__(self):
EClient.__init__(self, self)
self.data = []
self.df = pd.DataFrame()
self.data_retrieved = False
def historicalData(self, reqId, bar):
# Debug: Print each received bar
print(f"Received bar: Date={bar.date}, Open={bar.open}, High={bar.high}, Low={bar.low}, Close={bar.close}, Volume={bar.volume}")
self.data.append({
"Date": bar.date,
"Open": bar.open,
"High": bar.high,
"Low": bar.low,
"Close": bar.close,
"Volume": bar.volume
})
def historicalDataEnd(self, reqId, start, end):
# Debug: Indicate end of data reception
print(f"HistoricalDataEnd received. Start: {start}, End: {end}. Number of bars fetched: {len(self.data)}")
chunk_df = pd.DataFrame(self.data)
if not chunk_df.empty:
self.df = pd.concat([self.df, chunk_df], ignore_index=True)
else:
print("No data received in this request.")
self.data_retrieved = True
self.data = [] # Reset data list for next request
class IBApp:
def __init__(self):
self.app = IBApi()
def connect(self):
# Connect to IB API (ensure IB Gateway or TWS is running)
print("Connecting to IB API...")
self.app.connect("127.0.0.1", 4002, clientId=1)
# Start the API thread
thread = threading.Thread(target=self.run_app, daemon=True)
thread.start()
time.sleep(1) # Allow time for connection
print("Connected to IB API.")
def run_app(self):
self.app.run()
def request_data(self, contract, end_date, duration, bar_size):
# Request historical data
print(f"Requesting data: endDateTime={end_date}, durationStr={duration}, barSizeSetting={bar_size}")
self.app.reqHistoricalData(
reqId=1,
contract=contract,
endDateTime=end_date,
durationStr=duration,
barSizeSetting=bar_size,
whatToShow="TRADES",
useRTH=1, # Use regular trading hours
formatDate=1,
keepUpToDate=False,
chartOptions=[]
)
# Wait until data is retrieved
while not self.app.data_retrieved:
time.sleep(0.1)
self.app.data_retrieved = False # Reset flag for next request
def fetch_recent_data(self, symbol, sec_type, exchange, currency):
try:
# Define the contract
contract = Contract()
contract.symbol = symbol
contract.secType = sec_type
contract.exchange = exchange
contract.currency = currency
# Set duration and bar size for last 2 days
duration = "2 D" # 2 days
bar_size = "1 min" # 1-minute intervals
# Set end_date to now in UTC
end_date = datetime.now(timezone.utc)
end_date_str = end_date.strftime("%Y%m%d %H:%M:%S UTC")
print(f"Fetching data up to {end_date_str} for the last {duration} with bar size {bar_size}")
self.request_data(contract, end_date_str, duration, bar_size)
except Exception as e:
print(f"Error fetching data: {e}")
def disconnect(self):
self.app.disconnect()
print("Disconnected from IB API.")
def get_user_input():
print("Provide the stock details for historical data retrieval.")
try:
symbol = input("Enter the stock symbol (e.g., 'AAPL'): ").strip().upper()
sec_type = "STK" # Automatically set to Stock
exchange = "SMART" # Automatically set to SMART routing
currency = "USD" # Automatically set to USD
if not symbol:
raise ValueError("Stock symbol is required. Please try again.")
return symbol, sec_type, exchange, currency
except Exception as e:
print(f"Input Error: {e}")
return None
def graceful_exit(signal_received, frame):
print("\nTerminating program...")
app.disconnect()
exit(0)
# Handle graceful exit on Ctrl+C
signal.signal(signal.SIGINT, graceful_exit)
# Initialize and connect the IBApp
app = IBApp()
app.connect()
try:
user_input = get_user_input()
if user_input:
symbol, sec_type, exchange, currency = user_input
# Define the filename (save directly in current directory)
filename = f"{symbol}_recent_data.csv"
# Fetch recent data (last 2 days)
app.fetch_recent_data(symbol, sec_type, exchange, currency)
# Retrieve fetched data
data = app.app.df
if not data.empty:
print(f"Number of data points fetched: {len(data)}")
# Clean and parse the 'Date' column
# Attempt multiple formats
data['Date'] = pd.to_datetime(data['Date'], errors='coerce')
# Check if timezone is present; if not, localize to UTC
if data['Date'].dt.tz is None:
data['Date'] = data['Date'].dt.tz_localize(timezone.utc, ambiguous='NaT', nonexistent='NaT')
# Remove any rows with NaT in 'Date'
data.dropna(subset=['Date'], inplace=True)
# Sort by 'Date' ascending
data.sort_values(by='Date', inplace=True)
# Save to CSV
data.to_csv(filename, index=False)
print(f"Data saved to {filename}.")
print(data.tail())
else:
print("No new data fetched.")
except Exception as e:
print(f"Error: {e}")
finally:
app.disconnect()