import threading
import time
import csv
import json
from datetime import datetime, timedelta

from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract

################################################################################
# 1) CONTRACT DEFINITION & ROLLOVER
################################################################################

# Approximate 3rd-Friday expiration dates for quarterly MES futures, June 2022 -> Mar 2025
quarter_expirations = {
    "202206": datetime(2022, 6, 17),
    "202209": datetime(2022, 9, 16),
    "202212": datetime(2022, 12, 16),
    "202303": datetime(2023, 3, 17),
    "202306": datetime(2023, 6, 16),
    "202309": datetime(2023, 9, 15),
    "202312": datetime(2023, 12, 15),
    "202403": datetime(2024, 3, 15),
    "202406": datetime(2024, 6, 21),
    "202409": datetime(2024, 9, 20),
    "202412": datetime(2024, 12, 20),
    "202503": datetime(2025, 3, 21),
}

# Build "rolling windows": from (previous expiration + 1 day) to (this expiration)
sorted_contracts = sorted(quarter_expirations.items(), key=lambda kv: kv[0])
rolling_windows = []
prev_exp = None
for c_str, exp_date in sorted_contracts:
    if prev_exp is None:
        # For the first contract, assume it becomes active ~90 days before expiration
        start_active = exp_date - timedelta(days=90)
    else:
        start_active = prev_exp + timedelta(days=1)
    end_active = exp_date
    prev_exp = exp_date
    rolling_windows.append((c_str, start_active, end_active))

# Our overall request window
overall_start = datetime(2022, 4, 1)
overall_end = datetime(2025, 4, 1)


def get_contract_intervals():
    """
    Return a list of (contractMonth, interval_start, interval_end)
    that fall within [overall_start, overall_end].
    """
    intervals = []
    for (c_str, c_start, c_end) in rolling_windows:
        if c_end <= overall_start:
            continue
        if c_start >= overall_end:
            continue
        eff_start = max(c_start, overall_start)
        eff_end = min(c_end, overall_end)
        if eff_start < eff_end:
            intervals.append((c_str, eff_start, eff_end))
    return intervals


################################################################################
# 2) IB APP & CONNECTION
################################################################################

class HistoricalApp(EWrapper, EClient):
    def __init__(self):
        EClient.__init__(self, self)
        self.connection_ready = threading.Event()
        self.data_event = threading.Event()
        self.current_req_id = None
        self.historical_data = []
        self.error_flag = False

    def error(self, reqId, errorCode, errorString, advancedOrderRejectJson=""):
        # Filter typical data-farm status messages
        if errorCode in (2104, 2106, 2107, 2108):
            return
        print(f"ERROR {reqId} {errorCode}: {errorString}")
        # Connection failure
        if reqId == -1 and errorCode == 502:
            self.connection_ready.set()
            self.error_flag = True
            return
        # If it's for our current request, set error_flag
        if reqId == self.current_req_id:
            self.error_flag = True
            self.data_event.set()

    def nextValidId(self, orderId):
        print(f"Connected. NextValidId={orderId}")
        self.connection_ready.set()

    def historicalData(self, reqId, bar):
        if reqId == self.current_req_id:
            self.historical_data.append(bar)

    def historicalDataEnd(self, reqId, start, end):
        if reqId == self.current_req_id:
            print("    Chunk complete.")
            self.data_event.set()


def ensure_connected(app, timeout=15):
    if not app.connection_ready.wait(timeout=timeout):
        raise RuntimeError("Timed out waiting for IB connection.")


################################################################################
# 3) MAIN SCRIPT: FETCH 5-MINUTE RTH BARS, CASTING DECIMALS TO FLOAT, SAVE JSON/CSV
################################################################################

def main():
    intervals = get_contract_intervals()

    # Start up the IB API
    app = HistoricalApp()
    print("Connecting to IB Gateway (paper port=4002)...")
    app.connect("127.0.0.1", 4002, clientId=101)
    api_thread = threading.Thread(target=app.run, daemon=True)
    api_thread.start()

    try:
        ensure_connected(app, 15)
        if app.error_flag:
            print("Connection error. Exiting.")
            app.disconnect()
            return
    except RuntimeError as e:
        print(e)
        app.disconnect()
        return

    all_bars = []
    req_id = 1

    for (c_month, start_dt, end_dt) in intervals:
        contract = Contract()
        contract.symbol = "MES"
        contract.secType = "FUT"
        contract.exchange = "CME"  # or GLOBEX if that works better for you
        contract.currency = "USD"
        contract.includeExpired = True
        contract.lastTradeDateOrContractMonth = c_month  # e.g. "202306"

        # Iterate in ~30-day chunks
        chunk_start = start_dt
        print(f"\nContract {c_month}, active {start_dt.date()} -> {end_dt.date()} (RTH).")

        while chunk_start < end_dt:
            chunk_end = chunk_start + timedelta(days=29)
            if chunk_end > end_dt:
                chunk_end = end_dt

            # IB UTC format => "YYYYMMDD-HH:mm:ss", e.g. "20230430-00:00:00"
            end_str = chunk_end.strftime("%Y%m%d-00:00:00")
            days_count = (chunk_end - chunk_start).days + 1
            duration_str = f"{days_count} D"

            # Prepare request
            app.historical_data.clear()
            app.error_flag = False
            app.data_event.clear()
            app.current_req_id = req_id

            print(f"  Requesting {c_month} from {chunk_start.date()} to {chunk_end.date()}"
                  f" -> end='{end_str}', dur='{duration_str}'")

            app.reqHistoricalData(
                reqId=req_id,
                contract=contract,
                endDateTime=end_str,
                durationStr=duration_str,
                barSizeSetting="5 mins",
                whatToShow="TRADES",
                useRTH=1,  # Only regular trading hours
                formatDate=1,
                keepUpToDate=False,
                chartOptions=[]
            )

            finished = app.data_event.wait(60)
            if not finished:
                print(f"  Timeout on request {req_id}. Breaking out for contract {c_month}.")
                break
            if app.error_flag:
                print(f"  Error on request {req_id}. Aborting {c_month}.")
                break

            # Store chunk data
            for bar in app.historical_data:
                # Convert bar fields from Decimal -> float (use int(bar.volume) if you prefer)
                try:
                    o = float(bar.open)
                    h = float(bar.high)
                    l = float(bar.low)
                    c_ = float(bar.close)
                    v = float(bar.volume)
                except Exception:
                    # Fall back to strings if something is weird
                    o = str(bar.open)
                    h = str(bar.high)
                    l = str(bar.low)
                    c_ = str(bar.close)
                    v = str(bar.volume)

                all_bars.append({
                    "contract": c_month,
                    "barDate": bar.date,  # might include a time zone suffix
                    "open": o,
                    "high": h,
                    "low": l,
                    "close": c_,
                    "volume": v
                })

            req_id += 1
            chunk_start = chunk_end + timedelta(days=1)
            time.sleep(2)  # pacing

    # Done
    app.disconnect()
    api_thread.join(2)

    # Parse bar.date so bars can be sorted chronologically.
    # Bars arrive as "YYYYMMDD", "YYYYMMDD HH:MM:SS", or "YYYYMMDD HH:MM:SS US/Central".
    def parse_bar_date(d_str):
        d_str = " ".join(d_str.strip().split())  # collapse repeated spaces
        parts = d_str.split()
        if len(parts) == 1:
            # daily => "YYYYMMDD"
            return datetime.strptime(d_str, "%Y%m%d")
        elif len(parts) == 3:
            # e.g. "20230810 09:30:00 US/Central"
            dt_part = " ".join(parts[:2])  # "20230810 09:30:00"
            return datetime.strptime(dt_part, "%Y%m%d %H:%M:%S")
        else:
            # e.g. "20230810 09:30:00"
            return datetime.strptime(d_str, "%Y%m%d %H:%M:%S")

    all_bars.sort(key=lambda x: (x["contract"], parse_bar_date(x["barDate"])))

    # Save to JSON
    json_file = "MES_5min_data.json"
    print(f"\nSaving {len(all_bars)} bars to {json_file} ...")
    with open(json_file, "w") as jf:
        json.dump(all_bars, jf, indent=2)  # floats serialize without error

    # Save to CSV
    csv_file = "MES_5min_data.csv"
    print(f"Saving {len(all_bars)} bars to {csv_file} ...")
    with open(csv_file, "w", newline="") as cf:
        writer = csv.writer(cf)
        writer.writerow(["contract", "barDate", "open", "high", "low", "close", "volume"])
        for row in all_bars:
            writer.writerow([
                row["contract"],
                row["barDate"],
                row["open"],
                row["high"],
                row["low"],
                row["close"],
                row["volume"]
            ])

    print(f"Done! Retrieved {len(all_bars)} total bars.")


if __name__ == "__main__":
    main()