import json import datetime import logging import time from zoneinfo import ZoneInfo from concurrent.futures import ThreadPoolExecutor, as_completed import asyncio # For new event loops in threads from ib_insync import IB, Future, util # Try to import tabulate for a nice summary table try: from tabulate import tabulate HAS_TABULATE = True except ImportError: HAS_TABULATE = False # --- CONFIGURATION --- CONFIG = { "MAX_WORKERS": 4, # Number of threads (concurrent workers) "REQUEST_DELAY": 0.2, # Delay in seconds between historical data requests "BASE_CLIENT_ID": 100, # Starting clientId; each thread gets a unique clientId } # --- Custom Colored Logger Setup --- class ColoredFormatter(logging.Formatter): COLORS = { logging.DEBUG: "\033[36m", # Cyan logging.INFO: "\033[32m", # Green logging.WARNING: "\033[33m", # Yellow logging.ERROR: "\033[31m", # Red logging.CRITICAL: "\033[41m", # Red background } RESET = "\033[0m" def format(self, record): color = self.COLORS.get(record.levelno, self.RESET) record.msg = f"{color}{record.msg}{self.RESET}" return super().format(record) logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) handler = logging.StreamHandler() formatter = ColoredFormatter("%(asctime)s [%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S") handler.setFormatter(formatter) logger.addHandler(handler) # --- Global Time Zone --- TZ = ZoneInfo("US/Eastern") # --- Functions --- def get_third_friday(year, month): """Return the third Friday of the given year/month as a datetime with TZ.""" fridays = [] for day in range(1, 32): try: d = datetime.date(year, month, day) except ValueError: break if d.weekday() == 4: fridays.append(d) if len(fridays) >= 3: dt = datetime.datetime.combine(fridays[2], datetime.time(16, 0)) elif fridays: dt = datetime.datetime.combine(fridays[-1], datetime.time(16, 0)) else: dt = datetime.datetime(year, month, 1, 16, 0) return dt.replace(tzinfo=TZ) def generate_contract_months(start_date, end_date): """Generate a sorted list of contract month strings ('YYYYMM') between start_date and end_date.""" months = [3, 6, 9, 12] result = [] for year in range(start_date.year, end_date.year + 2): for m in months: dt = datetime.datetime(year, m, 1, tzinfo=TZ) if dt <= end_date: result.append(f"{year}{m:02d}") return sorted(set(result)) def get_data_chunk(ib_conn, contract, end_dt, duration_str="1 W"): """Request a chunk of historical data for the given contract ending at end_dt using ib_conn.""" try: bars = ib_conn.reqHistoricalData( contract, endDateTime=end_dt.strftime("%Y%m%d %H:%M:%S"), durationStr=duration_str, barSizeSetting="5 mins", whatToShow="TRADES", useRTH=False, formatDate=1 ) return bars except Exception as e: logger.error(f"Error retrieving data for {contract.localSymbol} ending at {end_dt}: {e}") return None def getMESContract(ib_conn, cm, contract_expiration): """ Try several MES contract definitions for the given contract month (cm) and expiration date. Return a tuple (contract, variant_used) or (None, None) if none match. """ expiration_str = contract_expiration.strftime("%Y%m%d") variants = [] # Variant 1: Full expiration date, exchange GLOBEX. contract1 = Future( symbol='MES', lastTradeDateOrContractMonth=expiration_str, exchange='GLOBEX', currency='USD', multiplier=5 ) contract1.includeExpired = True variants.append(("Variant 1: full expiration, GLOBEX", contract1)) # Variant 2: Full expiration date, exchange CME. contract2 = Future( symbol='MES', lastTradeDateOrContractMonth=expiration_str, exchange='CME', currency='USD', multiplier=5 ) contract2.includeExpired = True variants.append(("Variant 2: full expiration, CME", contract2)) # Variant 3: Contract month, exchange GLOBEX, add tradingClass. contract3 = Future( symbol='MES', lastTradeDateOrContractMonth=cm, exchange='GLOBEX', currency='USD', multiplier=5, tradingClass='MES' ) contract3.includeExpired = True variants.append(("Variant 3: contract month, GLOBEX, tradingClass", contract3)) # Variant 4: Contract month, exchange CME, add tradingClass. contract4 = Future( symbol='MES', lastTradeDateOrContractMonth=cm, exchange='CME', currency='USD', multiplier=5, tradingClass='MES' ) contract4.includeExpired = True variants.append(("Variant 4: contract month, CME, tradingClass", contract4)) # Variant 5: Contract month, exchange GLOBEX, with computed localSymbol. month_codes = {1:'F', 2:'G', 3:'H', 4:'J', 5:'K', 6:'M', 7:'N', 8:'Q', 9:'U', 10:'V', 11:'X', 12:'Z'} year = int(cm[:4]) month = int(cm[4:]) local_symbol = f"MES{month_codes.get(month, '')}{str(year)[-1]}" contract5 = Future( symbol='MES', lastTradeDateOrContractMonth=cm, localSymbol=local_symbol, exchange='GLOBEX', currency='USD', multiplier=5 ) contract5.includeExpired = True variants.append(("Variant 5: contract month, GLOBEX, localSymbol", contract5)) for variant_desc, contract in variants: logger.info(f"Trying {variant_desc} for {cm} (expiration: {expiration_str})...") details = ib_conn.reqContractDetails(contract) if details: logger.info(f"Success with {variant_desc}: {details[0].contract}") return details[0].contract, variant_desc else: logger.info(f"{variant_desc} did not return any details.") return None, None def process_month(cm, start_date, end_date, request_delay, client_id): """ Process a single contract month: - Create a new event loop and IB connection (with unique client_id). - Retrieve a valid MES contract. - Request historical data in chunks. - Return a summary dict and list of retrieved bars. """ # Create and set a new event loop in this thread loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) summary = { "Month": cm, "Status": "", "Variant Used": "", "LocalSymbol": "", "Bars Retrieved": 0, "Reason": "" } bars_list = [] try: ib_conn = IB() ib_conn.connect('127.0.0.1', 4002, clientId=client_id) except Exception as e: summary["Status"] = "Skipped" summary["Reason"] = f"Connection error: {e}" logger.error(f"Client {client_id}: Connection error for month {cm}: {e}") return summary, bars_list year = int(cm[:4]) month = int(cm[4:]) contract_expiration = get_third_friday(year, month) contract, variant_used = getMESContract(ib_conn, cm, contract_expiration) if not contract: summary["Status"] = "Skipped" summary["Reason"] = "No valid MES contract found" logger.warning(f"Client {client_id}: No valid MES contract found for {cm}. Skipping.") ib_conn.disconnect() return summary, bars_list summary["Status"] = "Processed" summary["Variant Used"] = variant_used summary["LocalSymbol"] = contract.localSymbol logger.info(f"Client {client_id}: Processing contract {contract.localSymbol} for month {cm} (expiration approx: {contract_expiration.strftime('%Y-%m-%d %H:%M:%S %Z')})") contract_end = min(end_date, contract_expiration) current_end = contract_end chunk_duration = datetime.timedelta(weeks=1) while current_end > start_date: logger.info(f"Client {client_id}: Requesting {contract.localSymbol} data ending at {current_end.strftime('%Y-%m-%d %H:%M:%S %Z')}") bars = get_data_chunk(ib_conn, contract, current_end, duration_str="1 W") time.sleep(request_delay) # Rate limiting if bars is None: logger.error(f"Client {client_id}: Error retrieving chunk; moving to next week.") current_end -= chunk_duration continue if not bars: logger.info(f"Client {client_id}: No data returned for period ending at {current_end.strftime('%Y-%m-%d %H:%M:%S %Z')}; ending requests.") break for bar in bars: bar_time = bar.date.astimezone(TZ) if start_date <= bar_time <= end_date: bars_list.append({ 'date': bar_time.strftime("%Y-%m-%d %H:%M:%S %Z"), 'open': bar.open, 'high': bar.high, 'low': bar.low, 'close': bar.close, 'volume': bar.volume, 'contract': contract.localSymbol }) earliest_time = min(bar.date.astimezone(TZ) for bar in bars) new_end = earliest_time - datetime.timedelta(seconds=1) if new_end >= current_end: break current_end = new_end summary["Bars Retrieved"] = len(bars_list) if len(bars_list) == 0: summary["Reason"] = "No data returned for this contract period" ib_conn.disconnect() return summary, bars_list # --- Main Execution --- if __name__ == "__main__": overall_start_time = time.time() logger.info("Starting retrieval process...") end_date = datetime.datetime.now(TZ) start_date = end_date - datetime.timedelta(days=3*365) logger.info(f"Retrieving MES data from {start_date.strftime('%Y-%m-%d %H:%M:%S %Z')} to {end_date.strftime('%Y-%m-%d %H:%M:%S %Z')}") contract_months = generate_contract_months(start_date, end_date) logger.info(f"Contract months to process: {contract_months}") all_month_summaries = [] all_bars = [] with ThreadPoolExecutor(max_workers=CONFIG["MAX_WORKERS"]) as executor: futures = [] for i, cm in enumerate(contract_months): client_id = CONFIG["BASE_CLIENT_ID"] + i futures.append(executor.submit(process_month, cm, start_date, end_date, CONFIG["REQUEST_DELAY"], client_id)) for future in as_completed(futures): summary, bars = future.result() all_month_summaries.append(summary) all_bars.extend(bars) final_data = sorted(all_bars, key=lambda x: x['date']) expected_bars = int((end_date - start_date).total_seconds() / (5 * 60)) output_filename = "mes_5min_data.json" if final_data: try: with open(output_filename, "w") as f: json.dump(final_data, f, indent=4) logger.info(f"Data successfully saved to {output_filename}") except Exception as e: logger.error(f"Error writing to JSON file: {e}") else: logger.info("No data retrieved. File not saved.") # Build final summary table. summary_rows = [] for s in all_month_summaries: summary_rows.append([s["Month"], s["Status"], s["Variant Used"], s["LocalSymbol"], s["Bars Retrieved"], s["Reason"]]) headers = ["Contract Month", "Status", "Variant Used", "LocalSymbol", "Bars Retrieved", "Reason"] if HAS_TABULATE: table = tabulate(summary_rows, headers=headers, tablefmt="grid") else: header_line = " | ".join(headers) separator = "-" * len(header_line) table = header_line + "\n" + separator + "\n" for row in summary_rows: table += " | ".join(str(item) for item in row) + "\n" logger.info("\n" + table) processed = len([s for s in all_month_summaries if s["Status"] == "Processed"]) skipped = len([s for s in all_month_summaries if s["Status"] == "Skipped"]) logger.info(f"Total contract months processed: {len(contract_months)}") logger.info(f" Processed: {processed} Skipped: {skipped}") logger.info(f"Total bars retrieved: {len(final_data)} (expected ~{expected_bars})") overall_end_time = time.time() runtime_sec = overall_end_time - overall_start_time runtime_str = str(datetime.timedelta(seconds=int(runtime_sec))) logger.info(f"Total runtime: {runtime_str}")