Files
gwitt1Repo/IB_Gateway/attempttofixgwittdata.py

130 lines
4.9 KiB
Python

import json
import datetime
from zoneinfo import ZoneInfo
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import asyncio
from ib_insync import IB, Future, util
# Logger setup (unchanged from your code)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S"))
logger.addHandler(handler)
TZ = ZoneInfo("US/Eastern")
CONFIG = {"MAX_WORKERS": 4, "REQUEST_DELAY": 0.2, "BASE_CLIENT_ID": 100}
def get_third_friday(year, month):
"""Return the third Friday of the given year/month."""
fridays = [d for d in range(1, 32) if datetime.date(year, month, d).weekday() == 4
if datetime.date(year, month, d).month == month]
dt = datetime.datetime.combine(fridays[2] if len(fridays) >= 3 else fridays[-1],
datetime.time(16, 0), tzinfo=TZ)
return dt
def get_contract_for_date(date):
"""Return the MES contract active on the given date."""
months = [3, 6, 9, 12]
year = date.year
month = date.month
# Find the next expiration after the date
for m in months:
exp = get_third_friday(year, m)
if exp > date:
return f"MES{m:02d}{year % 10}", exp
# If past December, roll to next year's March
return f"MES03{(year + 1) % 10}", get_third_friday(year + 1, 3)
def get_mes_contract(ib_conn, cm, expiration):
"""Define and validate an MES contract."""
local_symbol = f"MES{cm[-3:]}{cm[:4][-1]}" # e.g., MESH5
contract = Future(
symbol='MES',
lastTradeDateOrContractMonth=expiration.strftime("%Y%m%d"),
localSymbol=local_symbol,
exchange='CME',
currency='USD',
multiplier=5
)
contract.includeExpired = True
details = ib_conn.reqContractDetails(contract)
return details[0].contract if details else None
def get_data_chunk(ib_conn, contract, start_dt, end_dt):
"""Request historical data for a specific date range."""
bars = ib_conn.reqHistoricalData(
contract,
endDateTime=end_dt.strftime("%Y%m%d %H:%M:%S"),
durationStr=f"{int((end_dt - start_dt).total_seconds() / 86400)} D",
barSizeSetting="5 mins",
whatToShow="TRADES",
useRTH=False,
formatDate=1
)
return [b for b in bars if b.volume > 0] if bars else [] # Filter zero-volume bars
def process_date_range(start_dt, end_dt, client_id):
"""Process data for a date range, dynamically selecting contracts."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
ib_conn = IB()
ib_conn.connect('127.0.0.1', 4002, clientId=client_id)
bars_list = []
current_dt = start_dt
while current_dt < end_dt:
cm, expiration = get_contract_for_date(current_dt)
contract = get_mes_contract(ib_conn, cm, expiration)
if not contract:
logger.error(f"Client {client_id}: No contract for {cm}")
current_dt = expiration + datetime.timedelta(days=1)
continue
chunk_end = min(expiration, end_dt)
logger.info(f"Client {client_id}: Pulling {contract.localSymbol} from {current_dt} to {chunk_end}")
bars = get_data_chunk(ib_conn, contract, current_dt, chunk_end)
time.sleep(CONFIG["REQUEST_DELAY"])
bars_list.extend({
'date': bar.date.astimezone(TZ).strftime("%Y-%m-%d %H:%M:%S %Z"),
'open': bar.open,
'high': bar.high,
'low': bar.low,
'close': bar.close,
'volume': bar.volume,
'contract': contract.localSymbol
} for bar in bars if start_dt <= bar.date.astimezone(TZ) <= end_dt)
current_dt = expiration + datetime.timedelta(days=1)
ib_conn.disconnect()
return bars_list
if __name__ == "__main__":
start_date = datetime.datetime.now(TZ) - datetime.timedelta(days=3*365)
end_date = datetime.datetime.now(TZ)
logger.info(f"Retrieving MES data from {start_date} to {end_date}")
all_bars = []
with ThreadPoolExecutor(max_workers=CONFIG["MAX_WORKERS"]) as executor:
chunk_size = (end_date - start_date) / CONFIG["MAX_WORKERS"]
futures = [
executor.submit(
process_date_range,
start_date + i * chunk_size,
start_date + (i + 1) * chunk_size if i < CONFIG["MAX_WORKERS"] - 1 else end_date,
CONFIG["BASE_CLIENT_ID"] + i
)
for i in range(CONFIG["MAX_WORKERS"])
]
for future in as_completed(futures):
all_bars.extend(future.result())
final_data = sorted(all_bars, key=lambda x: x['date'])
with open("mes_5min_data_dynamic.json", "w") as f:
json.dump(final_data, f, indent=4)
logger.info(f"Saved {len(final_data)} bars to mes_5min_data_dynamic.json")