"""Download 5-minute MES futures bars through ib_insync and save them as JSON.

The requested period is split evenly across several worker threads. Each
worker opens its own IB connection, walks the quarterly contracts that cover
its slice and collects the bars of each contract up to its expiration.
"""

import asyncio
import datetime
import json
import logging
import math
from concurrent.futures import ThreadPoolExecutor, as_completed
from zoneinfo import ZoneInfo

from ib_insync import IB, Future, util  # noqa: F401 (util is currently unused)

# Logger setup
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S")
)
logger.addHandler(handler)

TZ = ZoneInfo("US/Eastern")
CONFIG = {"MAX_WORKERS": 4, "REQUEST_DELAY": 0.2, "BASE_CLIENT_ID": 100}

# Quarterly expiration months and the futures month code used in local symbols.
_MONTH_CODES = {3: "H", 6: "M", 9: "U", 12: "Z"}


def get_third_friday(year, month):
    """Return 16:00 (in ``TZ``) on the third Friday of the given year/month.

    NOTE(review): 16:00 Eastern is used as the expiration cut-off throughout
    this script; confirm it against the exchange's actual last-trade time.
    """
    first_of_month = datetime.date(year, month, 1)
    # weekday() == 4 is Friday; this is the distance to the first Friday.
    days_to_first_friday = (4 - first_of_month.weekday()) % 7
    third_friday = first_of_month + datetime.timedelta(days=days_to_first_friday + 14)
    return datetime.datetime.combine(third_friday, datetime.time(16, 0), tzinfo=TZ)


def get_contract_for_date(date):
    """Return the MES contract active on the given date.

    Args:
        date: A datetime; a naive value is interpreted as being in ``TZ``.

    Returns:
        A ``(label, expiration)`` tuple where ``label`` looks like
        ``"MES035"`` (two-digit month plus last digit of the year) and
        ``expiration`` is the first quarterly expiration strictly after
        ``date``.
    """
    if date.tzinfo is None:
        # Expirations are timezone-aware; comparing with a naive value raises.
        date = date.replace(tzinfo=TZ)

    year = date.year
    for month in _MONTH_CODES:
        expiration = get_third_friday(year, month)
        if expiration > date:
            return f"MES{month:02d}{year % 10}", expiration

    # Past the December expiration: roll to next year's March contract.
    return f"MES03{(year + 1) % 10}", get_third_friday(year + 1, 3)


def get_mes_contract(ib_conn, cm, expiration):
    """Define an MES contract and validate it with ``reqContractDetails``.

    Args:
        ib_conn: A connected ``IB`` instance.
        cm: Contract label from ``get_contract_for_date``; used for logging
            only, the local symbol is derived from ``expiration``.
        expiration: Expiration datetime of the contract; its month must be one
            of the quarterly months (3, 6, 9, 12), otherwise ``KeyError``.

    Returns:
        The first qualified contract, or ``None`` if nothing matched.
    """
    # e.g. March 2025 -> MESH5
    local_symbol = f"MES{_MONTH_CODES[expiration.month]}{expiration.year % 10}"
    logger.debug("Resolving %s as local symbol %s", cm, local_symbol)

    contract = Future(
        symbol='MES',
        lastTradeDateOrContractMonth=expiration.strftime("%Y%m%d"),
        localSymbol=local_symbol,
        exchange='CME',
        currency='USD',
        multiplier=5,
    )
    # Needed so that already expired contracts can still be looked up.
    contract.includeExpired = True

    details = ib_conn.reqContractDetails(contract)
    return details[0].contract if details else None


def get_data_chunk(ib_conn, contract, start_dt, end_dt):
    """Request 5-minute TRADES bars ending at ``end_dt`` for ``contract``.

    The duration is rounded up to whole days, so the result can contain bars
    earlier than ``start_dt``; callers filter to the exact window they need.

    Returns:
        The bars with non-zero volume, or an empty list when the range is
        empty or nothing was returned.
    """
    span_seconds = (end_dt - start_dt).total_seconds()
    if span_seconds <= 0:
        return []

    # Round up (minimum one day): truncating would request "0 D" for short
    # ranges and drop the partial first day of longer ones.
    duration_days = max(1, math.ceil(span_seconds / 86400))

    bars = ib_conn.reqHistoricalData(
        contract,
        # Pass the datetime itself so ib_insync formats it with its timezone;
        # a bare "YYYYMMDD HH:MM:SS" string is read in the TWS login timezone.
        endDateTime=end_dt,
        durationStr=f"{duration_days} D",
        barSizeSetting="5 mins",
        whatToShow="TRADES",
        useRTH=False,
        # 2 -> intraday bar dates come back as UTC-aware datetimes.
        formatDate=2,
    )
    return [b for b in bars if b.volume > 0] if bars else []  # Filter zero-volume bars


def process_date_range(start_dt, end_dt, client_id):
    """Collect bars for ``[start_dt, end_dt)``, dynamically selecting contracts.

    Intended to run in a worker thread: it installs its own asyncio event loop
    for that thread and opens a dedicated IB connection with ``client_id``.

    Returns:
        A list of dicts with the keys ``date``, ``open``, ``high``, ``low``,
        ``close``, ``volume`` and ``contract``.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    ib_conn = IB()
    bars_list = []
    try:
        ib_conn.connect('127.0.0.1', 4002, clientId=client_id)

        current_dt = start_dt
        while current_dt < end_dt:
            cm, expiration = get_contract_for_date(current_dt)
            contract = get_mes_contract(ib_conn, cm, expiration)
            if not contract:
                logger.error(f"Client {client_id}: No contract for {cm}")
                current_dt = expiration
                continue

            chunk_end = min(expiration, end_dt)
            logger.info(
                f"Client {client_id}: Pulling {contract.localSymbol} "
                f"from {current_dt} to {chunk_end}"
            )
            bars = get_data_chunk(ib_conn, contract, current_dt, chunk_end)
            # IB.sleep keeps the connection's event loop serviced while waiting.
            ib_conn.sleep(CONFIG["REQUEST_DELAY"])

            for bar in bars:
                bar_time = bar.date.astimezone(TZ)
                # Half-open window: each timestamp belongs to exactly one
                # contract and to exactly one worker's range.
                if current_dt <= bar_time < chunk_end:
                    bars_list.append({
                        'date': bar_time.strftime("%Y-%m-%d %H:%M:%S %Z"),
                        'open': bar.open,
                        'high': bar.high,
                        'low': bar.low,
                        'close': bar.close,
                        'volume': bar.volume,
                        'contract': contract.localSymbol,
                    })

            # get_contract_for_date picks the first expiration strictly after
            # its argument, so continuing from the expiration itself selects
            # the next contract without leaving a gap.
            current_dt = expiration
    finally:
        if ib_conn.isConnected():
            ib_conn.disconnect()
        asyncio.set_event_loop(None)
        loop.close()
    return bars_list


def main():
    """Download the last three years of MES bars and write them to JSON."""
    end_date = datetime.datetime.now(TZ)
    start_date = end_date - datetime.timedelta(days=3 * 365)
    logger.info(f"Retrieving MES data from {start_date} to {end_date}")

    workers = CONFIG["MAX_WORKERS"]
    chunk_size = (end_date - start_date) / workers
    # workers + 1 boundaries; the last one is end_date itself so accumulated
    # rounding in the timedelta division cannot shorten the final slice.
    boundaries = [start_date + i * chunk_size for i in range(workers)]
    boundaries.append(end_date)

    all_bars = []
    failed_workers = 0
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {
            executor.submit(
                process_date_range,
                boundaries[i],
                boundaries[i + 1],
                CONFIG["BASE_CLIENT_ID"] + i,
            ): i
            for i in range(workers)
        }
        for future in as_completed(futures):
            try:
                all_bars.extend(future.result())
            except Exception:
                # Keep what the other workers downloaded instead of losing
                # everything because a single slice failed.
                failed_workers += 1
                logger.exception(
                    "Worker %d failed (%s to %s)",
                    futures[future],
                    boundaries[futures[future]],
                    boundaries[futures[future] + 1],
                )

    if failed_workers:
        logger.warning(
            "%d of %d workers failed; the saved data is incomplete",
            failed_workers,
            workers,
        )

    final_data = sorted(all_bars, key=lambda x: x['date'])
    with open("mes_5min_data_dynamic.json", "w", encoding="utf-8") as f:
        json.dump(final_data, f, indent=4)
    logger.info(f"Saved {len(final_data)} bars to mes_5min_data_dynamic.json")


if __name__ == "__main__":
    main()