testing griffins priviledges
This commit is contained in:
129
IB_Gateway/attempttofixgwittdata.py
Normal file
129
IB_Gateway/attempttofixgwittdata.py
Normal file
@@ -0,0 +1,129 @@
|
||||
import json
|
||||
import datetime
|
||||
from zoneinfo import ZoneInfo
|
||||
import logging
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
import asyncio
|
||||
from ib_insync import IB, Future, util
|
||||
|
||||
# Logger setup (unchanged from your code)
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.INFO)
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S"))
|
||||
logger.addHandler(handler)
|
||||
|
||||
TZ = ZoneInfo("US/Eastern")
|
||||
CONFIG = {"MAX_WORKERS": 4, "REQUEST_DELAY": 0.2, "BASE_CLIENT_ID": 100}
|
||||
|
||||
def get_third_friday(year, month):
|
||||
"""Return the third Friday of the given year/month."""
|
||||
fridays = [d for d in range(1, 32) if datetime.date(year, month, d).weekday() == 4
|
||||
if datetime.date(year, month, d).month == month]
|
||||
dt = datetime.datetime.combine(fridays[2] if len(fridays) >= 3 else fridays[-1],
|
||||
datetime.time(16, 0), tzinfo=TZ)
|
||||
return dt
|
||||
|
||||
def get_contract_for_date(date):
|
||||
"""Return the MES contract active on the given date."""
|
||||
months = [3, 6, 9, 12]
|
||||
year = date.year
|
||||
month = date.month
|
||||
# Find the next expiration after the date
|
||||
for m in months:
|
||||
exp = get_third_friday(year, m)
|
||||
if exp > date:
|
||||
return f"MES{m:02d}{year % 10}", exp
|
||||
# If past December, roll to next year's March
|
||||
return f"MES03{(year + 1) % 10}", get_third_friday(year + 1, 3)
|
||||
|
||||
def get_mes_contract(ib_conn, cm, expiration):
|
||||
"""Define and validate an MES contract."""
|
||||
local_symbol = f"MES{cm[-3:]}{cm[:4][-1]}" # e.g., MESH5
|
||||
contract = Future(
|
||||
symbol='MES',
|
||||
lastTradeDateOrContractMonth=expiration.strftime("%Y%m%d"),
|
||||
localSymbol=local_symbol,
|
||||
exchange='CME',
|
||||
currency='USD',
|
||||
multiplier=5
|
||||
)
|
||||
contract.includeExpired = True
|
||||
details = ib_conn.reqContractDetails(contract)
|
||||
return details[0].contract if details else None
|
||||
|
||||
def get_data_chunk(ib_conn, contract, start_dt, end_dt):
|
||||
"""Request historical data for a specific date range."""
|
||||
bars = ib_conn.reqHistoricalData(
|
||||
contract,
|
||||
endDateTime=end_dt.strftime("%Y%m%d %H:%M:%S"),
|
||||
durationStr=f"{int((end_dt - start_dt).total_seconds() / 86400)} D",
|
||||
barSizeSetting="5 mins",
|
||||
whatToShow="TRADES",
|
||||
useRTH=False,
|
||||
formatDate=1
|
||||
)
|
||||
return [b for b in bars if b.volume > 0] if bars else [] # Filter zero-volume bars
|
||||
|
||||
def process_date_range(start_dt, end_dt, client_id):
|
||||
"""Process data for a date range, dynamically selecting contracts."""
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
ib_conn = IB()
|
||||
ib_conn.connect('127.0.0.1', 4002, clientId=client_id)
|
||||
|
||||
bars_list = []
|
||||
current_dt = start_dt
|
||||
while current_dt < end_dt:
|
||||
cm, expiration = get_contract_for_date(current_dt)
|
||||
contract = get_mes_contract(ib_conn, cm, expiration)
|
||||
if not contract:
|
||||
logger.error(f"Client {client_id}: No contract for {cm}")
|
||||
current_dt = expiration + datetime.timedelta(days=1)
|
||||
continue
|
||||
|
||||
chunk_end = min(expiration, end_dt)
|
||||
logger.info(f"Client {client_id}: Pulling {contract.localSymbol} from {current_dt} to {chunk_end}")
|
||||
bars = get_data_chunk(ib_conn, contract, current_dt, chunk_end)
|
||||
time.sleep(CONFIG["REQUEST_DELAY"])
|
||||
|
||||
bars_list.extend({
|
||||
'date': bar.date.astimezone(TZ).strftime("%Y-%m-%d %H:%M:%S %Z"),
|
||||
'open': bar.open,
|
||||
'high': bar.high,
|
||||
'low': bar.low,
|
||||
'close': bar.close,
|
||||
'volume': bar.volume,
|
||||
'contract': contract.localSymbol
|
||||
} for bar in bars if start_dt <= bar.date.astimezone(TZ) <= end_dt)
|
||||
|
||||
current_dt = expiration + datetime.timedelta(days=1)
|
||||
|
||||
ib_conn.disconnect()
|
||||
return bars_list
|
||||
|
||||
if __name__ == "__main__":
|
||||
start_date = datetime.datetime.now(TZ) - datetime.timedelta(days=3*365)
|
||||
end_date = datetime.datetime.now(TZ)
|
||||
logger.info(f"Retrieving MES data from {start_date} to {end_date}")
|
||||
|
||||
all_bars = []
|
||||
with ThreadPoolExecutor(max_workers=CONFIG["MAX_WORKERS"]) as executor:
|
||||
chunk_size = (end_date - start_date) / CONFIG["MAX_WORKERS"]
|
||||
futures = [
|
||||
executor.submit(
|
||||
process_date_range,
|
||||
start_date + i * chunk_size,
|
||||
start_date + (i + 1) * chunk_size if i < CONFIG["MAX_WORKERS"] - 1 else end_date,
|
||||
CONFIG["BASE_CLIENT_ID"] + i
|
||||
)
|
||||
for i in range(CONFIG["MAX_WORKERS"])
|
||||
]
|
||||
for future in as_completed(futures):
|
||||
all_bars.extend(future.result())
|
||||
|
||||
final_data = sorted(all_bars, key=lambda x: x['date'])
|
||||
with open("mes_5min_data_dynamic.json", "w") as f:
|
||||
json.dump(final_data, f, indent=4)
|
||||
logger.info(f"Saved {len(final_data)} bars to mes_5min_data_dynamic.json")
|
||||
Reference in New Issue
Block a user