From 3d5b3a39494499fd055a75ef662c989467ac94a8 Mon Sep 17 00:00:00 2001
From: kleinpanic
Date: Tue, 15 Apr 2025 02:24:21 +0000
Subject: [PATCH] Pull historical MES futures data

---
 IB_Gateway/test3.py | 273 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 273 insertions(+)
 create mode 100644 IB_Gateway/test3.py

diff --git a/IB_Gateway/test3.py b/IB_Gateway/test3.py
new file mode 100644
index 0000000..77cb71c
--- /dev/null
+++ b/IB_Gateway/test3.py
@@ -0,0 +1,273 @@
+import threading
+import time
+import csv
+import json
+from datetime import datetime, timedelta
+from ibapi.client import EClient
+from ibapi.wrapper import EWrapper
+from ibapi.contract import Contract
+
+################################################################################
+# 1) CONTRACT DEFINITION & ROLLOVER
+################################################################################
+
+# Approximate 3rd-Friday expiration dates for quarterly MES futures, June 2022 -> Mar 2025
+quarter_expirations = {
+    "202206": datetime(2022, 6, 17),
+    "202209": datetime(2022, 9, 16),
+    "202212": datetime(2022, 12, 16),
+    "202303": datetime(2023, 3, 17),
+    "202306": datetime(2023, 6, 16),
+    "202309": datetime(2023, 9, 15),
+    "202312": datetime(2023, 12, 15),
+    "202403": datetime(2024, 3, 15),
+    "202406": datetime(2024, 6, 21),
+    "202409": datetime(2024, 9, 20),
+    "202412": datetime(2024, 12, 20),
+    "202503": datetime(2025, 3, 21),
+}
+
+# Build “rolling windows”: from (previous expiration + 1 day) to (this expiration)
+sorted_contracts = sorted(quarter_expirations.items(), key=lambda kv: kv[0])
+rolling_windows = []
+prev_exp = None
+for c_str, exp_date in sorted_contracts:
+    if prev_exp is None:
+        # For the first contract, assume it becomes active ~90 days before expiration
+        start_active = exp_date - timedelta(days=90)
+    else:
+        start_active = prev_exp + timedelta(days=1)
+    end_active = exp_date
+    prev_exp = exp_date
+    rolling_windows.append((c_str, start_active, end_active))
+
+# Our overall request window
+overall_start = datetime(2022, 4, 1)
+overall_end = datetime(2025, 4, 1)
+
+def get_contract_intervals():
+    """
+    Return a list of (contractMonth, interval_start, interval_end)
+    that fall within [overall_start, overall_end].
+    """
+    intervals = []
+    for (c_str, c_start, c_end) in rolling_windows:
+        if c_end <= overall_start:
+            continue
+        if c_start >= overall_end:
+            continue
+        eff_start = max(c_start, overall_start)
+        eff_end = min(c_end, overall_end)
+        if eff_start < eff_end:
+            intervals.append((c_str, eff_start, eff_end))
+    return intervals
+
+
+################################################################################
+# 2) IB APP & CONNECTION
+################################################################################
+
+class HistoricalApp(EWrapper, EClient):
+    def __init__(self):
+        EClient.__init__(self, self)
+        self.connection_ready = threading.Event()
+        self.data_event = threading.Event()
+        self.current_req_id = None
+        self.historical_data = []
+        self.error_flag = False
+
+    def error(self, reqId, errorCode, errorString, advancedOrderRejectJson=""):
+        # Filter routine data-farm status messages
+        if errorCode in (2104, 2106, 2107, 2108):
+            return
+
+        print(f"ERROR {reqId} {errorCode}: {errorString}")
+        # Connection failure
+        if reqId == -1 and errorCode == 502:
+            self.connection_ready.set()
+            self.error_flag = True
+            return
+        # If it's for our current request, flag the error and unblock the waiter
+        if reqId == self.current_req_id:
+            self.error_flag = True
+            self.data_event.set()
+
+    def nextValidId(self, orderId):
+        print(f"Connected. NextValidId={orderId}")
+        self.connection_ready.set()
+
+    def historicalData(self, reqId, bar):
+        if reqId == self.current_req_id:
+            self.historical_data.append(bar)
+
+    def historicalDataEnd(self, reqId, start, end):
+        if reqId == self.current_req_id:
+            print("  Chunk complete.")
+            self.data_event.set()
+
+def ensure_connected(app, timeout=15):
+    if not app.connection_ready.wait(timeout=timeout):
+        raise RuntimeError("Timed out waiting for IB connection.")
+
+
+################################################################################
+# 3) MAIN SCRIPT: FETCH 5-MINUTE RTH BARS, CASTING DECIMALS TO FLOAT, SAVE JSON/CSV
+################################################################################
+
+def main():
+    intervals = get_contract_intervals()
+
+    # Start up the IB API
+    app = HistoricalApp()
+    print("Connecting to IB Gateway (paper port=4002)...")
+    app.connect("127.0.0.1", 4002, clientId=101)
+
+    api_thread = threading.Thread(target=app.run, daemon=True)
+    api_thread.start()
+
+    try:
+        ensure_connected(app, 15)
+        if app.error_flag:
+            print("Connection error. Exiting.")
+            app.disconnect()
+            return
+    except RuntimeError as e:
+        print(e)
+        app.disconnect()
+        return
+
+    all_bars = []
+    req_id = 1
+
+    for (c_month, start_dt, end_dt) in intervals:
+        contract = Contract()
+        contract.symbol = "MES"
+        contract.secType = "FUT"
+        contract.exchange = "CME"  # or GLOBEX if that works better for you
+        contract.currency = "USD"
+        contract.includeExpired = True
+        contract.lastTradeDateOrContractMonth = c_month  # e.g. "202306"
+
+        # Iterate in ~30-day chunks
+        chunk_start = start_dt
+        print(f"\nContract {c_month}, active {start_dt.date()} -> {end_dt.date()} (RTH).")
+
+        while chunk_start < end_dt:
+            chunk_end = chunk_start + timedelta(days=29)
+            if chunk_end > end_dt:
+                chunk_end = end_dt
+
+            # IB UTC format => "YYYYMMDD-HH:mm:ss", e.g. "20230430-00:00:00"
+            end_str = chunk_end.strftime("%Y%m%d-00:00:00")
+            days_count = (chunk_end - chunk_start).days + 1
+            duration_str = f"{days_count} D"
+
+            # Prepare request
+            app.historical_data.clear()
+            app.error_flag = False
+            app.data_event.clear()
+            app.current_req_id = req_id
+
+            print(f"  Requesting {c_month} from {chunk_start.date()} to {chunk_end.date()}"
+                  f" -> end='{end_str}', dur='{duration_str}'")
+
+            app.reqHistoricalData(
+                reqId=req_id,
+                contract=contract,
+                endDateTime=end_str,
+                durationStr=duration_str,
+                barSizeSetting="5 mins",
+                whatToShow="TRADES",
+                useRTH=1,  # Only regular trading hours
+                formatDate=1,
+                keepUpToDate=False,
+                chartOptions=[]
+            )
+
+            finished = app.data_event.wait(60)
+            if not finished:
+                print(f"  Timeout on request {req_id}. Breaking out for contract {c_month}.")
+                break
+            if app.error_flag:
+                print(f"  Error on request {req_id}. Aborting {c_month}.")
+                break
+
+            # Store chunk data
+            for bar in app.historical_data:
+                # Convert bar fields from Decimal -> float so they serialize cleanly
+                try:
+                    o = float(bar.open)
+                    h = float(bar.high)
+                    l = float(bar.low)
+                    c_ = float(bar.close)
+                    v = float(bar.volume)  # or int(bar.volume) if you prefer
+                except (TypeError, ValueError):
+                    # fall back to strings if a field cannot be cast
+                    o = str(bar.open)
+                    h = str(bar.high)
+                    l = str(bar.low)
+                    c_ = str(bar.close)
+                    v = str(bar.volume)
+
+                all_bars.append({
+                    "contract": c_month,
+                    "barDate": bar.date,  # might include a time zone suffix
+                    "open": o,
+                    "high": h,
+                    "low": l,
+                    "close": c_,
+                    "volume": v
+                })
+
+            req_id += 1
+            chunk_start = chunk_end + timedelta(days=1)
+            time.sleep(2)  # pacing
+
+    # Done
+    app.disconnect()
+    api_thread.join(2)
+
+    # Parse bar.date so bars sort chronologically within each contract.
+    # Bars may arrive as "YYYYMMDD", "YYYYMMDD HH:MM:SS", or "YYYYMMDD HH:MM:SS US/Central".
+    def parse_bar_date(d_str):
+        d_str = " ".join(d_str.strip().split())  # collapse repeated spaces
+        parts = d_str.split()
+        if len(parts) == 1:
+            # daily => "YYYYMMDD"
+            return datetime.strptime(d_str, "%Y%m%d")
+        elif len(parts) == 3:
+            # e.g. "20230810 09:30:00 US/Central"
+            dt_part = " ".join(parts[:2])  # "20230810 09:30:00"
+            return datetime.strptime(dt_part, "%Y%m%d %H:%M:%S")
+        else:
+            # e.g. "20230810 09:30:00"
+            return datetime.strptime(d_str, "%Y%m%d %H:%M:%S")
+
+    all_bars.sort(key=lambda x: (x["contract"], parse_bar_date(x["barDate"])))
+
+    # Save to JSON
+    json_file = "MES_5min_data.json"
+    print(f"\nSaving {len(all_bars)} bars to {json_file} ...")
+    with open(json_file, "w") as jf:
+        json.dump(all_bars, jf, indent=2)  # plain floats/strings serialize without a custom encoder
+
+    # Save to CSV
+    csv_file = "MES_5min_data.csv"
+    print(f"Saving {len(all_bars)} bars to {csv_file} ...")
+    with open(csv_file, "w", newline="") as cf:
+        writer = csv.writer(cf)
+        writer.writerow(["contract", "barDate", "open", "high", "low", "close", "volume"])
+        for row in all_bars:
+            writer.writerow([
+                row["contract"], row["barDate"],
+                row["open"], row["high"],
+                row["low"], row["close"],
+                row["volume"]
+            ])
+
+    print(f"Done! Retrieved {len(all_bars)} total bars.")
+
+
+if __name__ == "__main__":
+    main()
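
Note (not part of the patch): a quick sanity check on the files the script writes might look like the sketch below. It assumes pandas is installed and that MES_5min_data.csv sits in the working directory; the date handling mirrors the parse_bar_date() helper above and is only illustrative.

    import pandas as pd

    # Read the contract column as text so "202306" is not coerced to an integer.
    df = pd.read_csv("MES_5min_data.csv", dtype={"contract": str})

    def parse_bar_date(d):
        # Keep "YYYYMMDD" or "YYYYMMDD HH:MM:SS" and drop any trailing zone name.
        parts = str(d).split()[:2]
        fmt = "%Y%m%d %H:%M:%S" if len(parts) == 2 else "%Y%m%d"
        return pd.to_datetime(" ".join(parts), format=fmt)

    df["barDate"] = df["barDate"].map(parse_bar_date)
    df = df.sort_values(["contract", "barDate"])

    # One row per contract: first/last bar timestamps and bar counts.
    print(df.groupby("contract")["barDate"].agg(["min", "max", "count"]))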