130 lines
4.9 KiB
Python
130 lines
4.9 KiB
Python
import asyncio
import datetime
import json
import logging
import math
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from zoneinfo import ZoneInfo

from ib_insync import IB, Future, util
|
|
|
|
# Module-level logger: INFO and above, each record prefixed with a timestamp and level.
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter(
        fmt="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
)
logger.setLevel(logging.INFO)
logger.addHandler(handler)
|
|
|
|
# Time zone used for contract expirations and for formatting bar timestamps.
# "America/New_York" is the canonical IANA key; "US/Eastern" is only a legacy
# alias and is missing from some tzdata installations.
TZ = ZoneInfo("America/New_York")

# Downloader settings:
#   MAX_WORKERS    - number of worker threads, and number of slices the date range is split into
#   REQUEST_DELAY  - seconds to pause after each historical-data request
#   BASE_CLIENT_ID - IB clientId of worker 0; worker i connects with BASE_CLIENT_ID + i
CONFIG = {"MAX_WORKERS": 4, "REQUEST_DELAY": 0.2, "BASE_CLIENT_ID": 100}
|
|
|
|
def get_third_friday(year, month, tz=None):
    """Return 16:00 on the third Friday of ``year``/``month`` as an aware datetime.

    Args:
        year: Calendar year, e.g. 2025.
        month: Calendar month, 1-12.
        tz: tzinfo to attach to the result; defaults to the module-level ``TZ``.

    Returns:
        ``datetime.datetime`` at 16:00 (in ``tz``) on the month's third Friday.

    NOTE(review): CME equity-index futures are believed to stop trading at 09:30 ET
    on this day, so 16:00 may be later than the true last trade; confirm the
    intended cutoff.
    """
    if tz is None:
        tz = TZ
    first = datetime.date(year, month, 1)
    # Days from the 1st to the first Friday (weekday() == 4), then two more weeks.
    # Arithmetic avoids building invalid dates such as June 31.
    to_first_friday = (4 - first.weekday()) % 7
    third_friday = first + datetime.timedelta(days=to_first_friday + 14)
    return datetime.datetime.combine(third_friday, datetime.time(16, 0), tzinfo=tz)
|
|
|
|
def get_contract_for_date(date):
    """Return ``(local_symbol, expiration)`` for the MES quarterly contract active at ``date``.

    The active contract is the first March/June/September/December contract whose
    expiration (from ``get_third_friday``) is strictly after ``date``. An instant
    exactly at an expiration therefore maps to the *following* contract.

    Args:
        date: ``datetime.datetime``; a naive value is interpreted in ``TZ``.

    Returns:
        Tuple of the IB local symbol (e.g. ``"MESH5"`` for March 2025) and that
        contract's expiration datetime.
    """
    # CME month codes for the quarterly expiration cycle.
    quarterly = ((3, "H"), (6, "M"), (9, "U"), (12, "Z"))
    if date.tzinfo is None:
        # Comparing naive and aware datetimes raises TypeError.
        date = date.replace(tzinfo=TZ)

    for month, code in quarterly:
        expiration = get_third_friday(date.year, month)
        if expiration > date:
            return f"MES{code}{date.year % 10}", expiration

    # Past December's expiration: roll to next year's March contract.
    next_year = date.year + 1
    return f"MESH{next_year % 10}", get_third_friday(next_year, 3)
|
|
|
|
def get_mes_contract(ib_conn, cm, expiration):
    """Build the MES future expiring at ``expiration`` and resolve it through IB.

    Args:
        ib_conn: Connected ``ib_insync.IB`` instance.
        cm: Contract label from ``get_contract_for_date``. Kept for interface
            compatibility; the local symbol is derived from ``expiration`` so it
            does not depend on the label's format.
        expiration: Contract expiration datetime (third Friday of the month).

    Returns:
        The contract from the first ``reqContractDetails`` result, or ``None``
        when IB returns no match.
    """
    # CME futures month codes indexed by month - 1 (F = Jan ... Z = Dec).
    month_code = "FGHJKMNQUVXZ"[expiration.month - 1]
    local_symbol = f"MES{month_code}{expiration.year % 10}"  # e.g., MESH5
    contract = Future(
        symbol='MES',
        lastTradeDateOrContractMonth=expiration.strftime("%Y%m%d"),
        localSymbol=local_symbol,
        exchange='CME',
        currency='USD',
        # ib_insync declares Contract.multiplier as a string field.
        multiplier='5',
    )
    # Lets IB resolve contracts that have already expired (needed for history).
    contract.includeExpired = True
    details = ib_conn.reqContractDetails(contract)
    return details[0].contract if details else None
|
|
|
|
def get_data_chunk(ib_conn, contract, start_dt, end_dt):
    """Request 5-minute TRADES bars for ``contract`` ending at ``end_dt``.

    The look-back is a whole number of days, rounded up so that the window reaches
    at least back to ``start_dt``. IB may therefore return some bars earlier than
    ``start_dt``, and callers are expected to filter by time. Bars with zero volume
    are dropped.

    Args:
        ib_conn: Connected ``ib_insync.IB`` instance.
        contract: Resolved contract to query.
        start_dt: Earliest instant wanted (timezone-aware).
        end_dt: End of the request window (timezone-aware).

    Returns:
        List of bars with ``volume > 0``, or ``[]`` when IB returns nothing.
    """
    # Round up: truncating drops the partial leading day, and "0 D" is not a valid duration.
    days = max(1, math.ceil((end_dt - start_dt) / datetime.timedelta(days=1)))
    bars = ib_conn.reqHistoricalData(
        contract,
        # Pass the aware datetime so ib_insync handles its time zone, instead of a
        # bare wall-clock string that IB would read in the TWS login time zone.
        endDateTime=end_dt,
        durationStr=f"{days} D",
        barSizeSetting="5 mins",
        whatToShow="TRADES",
        useRTH=False,
        # 2 = timezone-aware UTC bar dates (1 gives naive TWS-local datetimes).
        formatDate=2,
    )
    return [b for b in bars if b.volume > 0] if bars else []  # Filter zero-volume bars
|
|
|
|
def _bar_to_record(bar, bar_time, local_symbol):
    """Convert one bar (with its ``TZ``-local timestamp) into the dict written to JSON."""
    return {
        'date': bar_time.strftime("%Y-%m-%d %H:%M:%S %Z"),
        'open': bar.open,
        'high': bar.high,
        'low': bar.low,
        'close': bar.close,
        'volume': bar.volume,
        'contract': local_symbol,
    }


def process_date_range(start_dt, end_dt, client_id, host='127.0.0.1', port=4002):
    """Download 5-minute MES bars for ``[start_dt, end_dt)`` over a dedicated IB connection.

    Designed to run in a worker thread. It installs a fresh asyncio event loop for
    the calling thread, connects with ``client_id``, and walks the range one
    quarterly contract at a time, switching at each expiration reported by
    ``get_contract_for_date``.

    Args:
        start_dt: Inclusive start of the range (timezone-aware).
        end_dt: Exclusive end of the range (timezone-aware).
        client_id: IB API client id; must differ between concurrent connections.
        host: TWS / IB Gateway host.
        port: TWS / IB Gateway API port.

    Returns:
        List of bar dicts (keys: date, open, high, low, close, volume, contract),
        contract by contract, in the order IB returned the bars.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    ib_conn = IB()
    bars_list = []
    try:
        ib_conn.connect(host, port, clientId=client_id)

        current_dt = start_dt
        while current_dt < end_dt:
            cm, expiration = get_contract_for_date(current_dt)
            chunk_end = min(expiration, end_dt)

            contract = get_mes_contract(ib_conn, cm, expiration)
            if not contract:
                logger.error(f"Client {client_id}: No contract for {cm}")
                # Resume exactly at the expiration: get_contract_for_date() maps that
                # instant to the next contract, so the loop always advances and no
                # time is skipped.
                current_dt = chunk_end
                continue

            logger.info(f"Client {client_id}: Pulling {contract.localSymbol} from {current_dt} to {chunk_end}")
            bars = get_data_chunk(ib_conn, contract, current_dt, chunk_end)
            # IB.sleep keeps the event loop running; time.sleep would block it.
            ib_conn.sleep(CONFIG["REQUEST_DELAY"])

            for bar in bars:
                # NOTE(review): correct for aware bar dates; a naive date would be
                # read as system-local time here. Confirm formatDate=2 upstream.
                bar_time = bar.date.astimezone(TZ)
                # Keep only this contract's half-open slice, so neither adjacent
                # contracts nor adjacent workers emit the same bar twice.
                if current_dt <= bar_time < chunk_end:
                    bars_list.append(_bar_to_record(bar, bar_time, contract.localSymbol))

            current_dt = chunk_end
    finally:
        # Release the API connection even when a request fails.
        ib_conn.disconnect()

    return bars_list
|
|
|
|
if __name__ == "__main__":
    # Sample the clock once so the overall window and the per-worker split agree.
    end_date = datetime.datetime.now(TZ)
    start_date = end_date - datetime.timedelta(days=3*365)
    # NOTE(review): IB is commonly documented to serve expired-futures history only
    # for about two years after expiry, so the oldest part of this window may come
    # back empty. Confirm against the TWS API docs.
    logger.info(f"Retrieving MES data from {start_date} to {end_date}")

    workers = CONFIG["MAX_WORKERS"]
    chunk_size = (end_date - start_date) / workers
    # Contiguous slices; the last one ends exactly at end_date so rounding in
    # chunk_size cannot lose the tail.
    bounds = [
        (start_date + i * chunk_size,
         start_date + (i + 1) * chunk_size if i < workers - 1 else end_date)
        for i in range(workers)
    ]

    all_bars = []
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # Each concurrent IB connection needs its own clientId.
        futures = [
            executor.submit(process_date_range, lo, hi, CONFIG["BASE_CLIENT_ID"] + i)
            for i, (lo, hi) in enumerate(bounds)
        ]
        for future in as_completed(futures):
            all_bars.extend(future.result())

    final_data = sorted(all_bars, key=lambda x: x['date'])
    with open("mes_5min_data_dynamic.json", "w", encoding="utf-8") as f:
        json.dump(final_data, f, indent=4)
    logger.info(f"Saved {len(final_data)} bars to mes_5min_data_dynamic.json")
|