Files
gwitt1Repo/IB_Gateway/test3.py
2025-04-15 02:24:21 +00:00

274 lines
9.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import threading
import time
import csv
import json
from datetime import datetime, timedelta
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
################################################################################
# 1) CONTRACT DEFINITION & ROLLOVER
################################################################################
# Approximate 3rd-Friday expiration dates for quarterly MES futures from June 2022 -> Mar 2025
quarter_expirations = {
"202206": datetime(2022, 6, 17),
"202209": datetime(2022, 9, 16),
"202212": datetime(2022, 12, 16),
"202303": datetime(2023, 3, 17),
"202306": datetime(2023, 6, 16),
"202309": datetime(2023, 9, 15),
"202312": datetime(2023, 12, 15),
"202403": datetime(2024, 3, 15),
"202406": datetime(2024, 6, 21),
"202409": datetime(2024, 9, 20),
"202412": datetime(2024, 12, 20),
"202503": datetime(2025, 3, 21),
}
# Build “rolling windows”: from (previous expiration + 1 day) to (this expiration)
sorted_contracts = sorted(quarter_expirations.items(), key=lambda kv: kv[0])
rolling_windows = []
prev_exp = None
for c_str, exp_date in sorted_contracts:
if prev_exp is None:
# For the first contract, just assume it might start ~90 days before
start_active = exp_date - timedelta(days=90)
else:
start_active = prev_exp + timedelta(days=1)
end_active = exp_date
prev_exp = exp_date
rolling_windows.append((c_str, start_active, end_active))
# Our overall request window
overall_start = datetime(2022, 4, 1)
overall_end = datetime(2025, 4, 1)
def get_contract_intervals():
"""
Return a list of (contractMonth, interval_start, interval_end)
that fall within [overall_start, overall_end].
"""
intervals = []
for (c_str, c_start, c_end) in rolling_windows:
if c_end <= overall_start:
continue
if c_start >= overall_end:
continue
eff_start = max(c_start, overall_start)
eff_end = min(c_end, overall_end)
if eff_start < eff_end:
intervals.append((c_str, eff_start, eff_end))
return intervals
################################################################################
# 2) IB APP & CONNECTION
################################################################################
class HistoricalApp(EWrapper, EClient):
def __init__(self):
EClient.__init__(self, self)
self.connection_ready = threading.Event()
self.data_event = threading.Event()
self.current_req_id = None
self.historical_data = []
self.error_flag = False
def error(self, reqId, errorCode, errorString, advancedOrderRejectJson=""):
# Filter typical farm messages
if errorCode in (2104, 2106, 2107, 2108):
return
print(f"ERROR {reqId} {errorCode}: {errorString}")
# Connection failure
if reqId == -1 and errorCode == 502:
self.connection_ready.set()
self.error_flag = True
return
# If it's for our current request, set error_flag
if reqId == self.current_req_id:
self.error_flag = True
self.data_event.set()
def nextValidId(self, orderId):
print(f"Connected. NextValidId={orderId}")
self.connection_ready.set()
def historicalData(self, reqId, bar):
if reqId == self.current_req_id:
self.historical_data.append(bar)
def historicalDataEnd(self, reqId, start, end):
if reqId == self.current_req_id:
print(" Chunk complete.")
self.data_event.set()
def ensure_connected(app, timeout=15):
if not app.connection_ready.wait(timeout=timeout):
raise RuntimeError("Timed out waiting for IB connection.")
################################################################################
# 3) MAIN SCRIPT: FETCH 5-MINUTE RTH BARS, CASTING DECIMALS TO FLOAT, SAVE JSON/CSV
################################################################################
def main():
intervals = get_contract_intervals()
# Start up the IB API
app = HistoricalApp()
print("Connecting to IB Gateway (paper port=4002)...")
app.connect("127.0.0.1", 4002, clientId=101)
api_thread = threading.Thread(target=app.run, daemon=True)
api_thread.start()
try:
ensure_connected(app, 15)
if app.error_flag:
print("Connection error. Exiting.")
app.disconnect()
return
except RuntimeError as e:
print(e)
app.disconnect()
return
all_bars = []
req_id = 1
for (c_month, start_dt, end_dt) in intervals:
contract = Contract()
contract.symbol = "MES"
contract.secType = "FUT"
contract.exchange = "CME" # or GLOBEX if that works better for you
contract.currency = "USD"
contract.includeExpired = True
contract.lastTradeDateOrContractMonth = c_month # e.g. "202306"
# Iterate in ~30-day chunks
chunk_start = start_dt
print(f"\nContract {c_month}, active {start_dt.date()} -> {end_dt.date()} (RTH).")
while chunk_start < end_dt:
chunk_end = chunk_start + timedelta(days=29)
if chunk_end > end_dt:
chunk_end = end_dt
# IB UTC format => "YYYYMMDD-HH:mm:ss", e.g. "20230430-00:00:00"
end_str = chunk_end.strftime("%Y%m%d-00:00:00")
days_count = (chunk_end - chunk_start).days + 1
duration_str = f"{days_count} D"
# Prepare request
app.historical_data.clear()
app.error_flag = False
app.data_event.clear()
app.current_req_id = req_id
print(f" Requesting {c_month} from {chunk_start.date()} to {chunk_end.date()}"
f" -> end='{end_str}', dur='{duration_str}'")
app.reqHistoricalData(
reqId=req_id,
contract=contract,
endDateTime=end_str,
durationStr=duration_str,
barSizeSetting="5 mins",
whatToShow="TRADES",
useRTH=1, # Only regular trading hours
formatDate=1,
keepUpToDate=False,
chartOptions=[]
)
finished = app.data_event.wait(60)
if not finished:
print(f" Timeout on request {req_id}. Breaking out for contract {c_month}.")
break
if app.error_flag:
print(f" Error on request {req_id}. Aborting {c_month}.")
break
# Store chunk data
for bar in app.historical_data:
# Convert bar fields from Decimal -> float, volume -> int (if safe)
try:
o = float(bar.open)
h = float(bar.high)
l = float(bar.low)
c_ = float(bar.close)
v = float(bar.volume) # or int(bar.volume) if you prefer
except:
# fallback to string if something is weird
o = str(bar.open)
h = str(bar.high)
l = str(bar.low)
c_ = str(bar.close)
v = str(bar.volume)
all_bars.append({
"contract": c_month,
"barDate": bar.date, # might include time zone suffix
"open": o,
"high": h,
"low": l,
"close": c_,
"volume": v
})
req_id += 1
chunk_start = chunk_end + timedelta(days=1)
time.sleep(2) # pacing
# Done
app.disconnect()
api_thread.join(2)
# Well parse bar.date if needed to sort chronologically
# Some bars have "YYYYMMDD HH:MM:SS" or "YYYYMMDD" or "YYYYMMDD HH:MM:SS US/Central"
def parse_bar_date(d_str):
d_str = " ".join(d_str.strip().split()) # remove double spaces
parts = d_str.split()
if len(parts) == 1:
# daily => "YYYYMMDD"
return datetime.strptime(d_str, "%Y%m%d")
elif len(parts) == 3:
# e.g. "20230810 09:30:00 US/Central"
dt_part = " ".join(parts[:2]) # "20230810 09:30:00"
return datetime.strptime(dt_part, "%Y%m%d %H:%M:%S")
else:
# e.g. "20230810 09:30:00"
return datetime.strptime(d_str, "%Y%m%d %H:%M:%S")
all_bars.sort(key=lambda x: (x["contract"], parse_bar_date(x["barDate"])))
# Save to JSON
json_file = "MES_5min_data.json"
print(f"\nSaving {len(all_bars)} bars to {json_file} ...")
with open(json_file, "w") as jf:
json.dump(all_bars, jf, indent=2) # no error with floats
# Save to CSV
csv_file = "MES_5min_data.csv"
print(f"Saving {len(all_bars)} bars to {csv_file} ...")
with open(csv_file, "w", newline="") as cf:
writer = csv.writer(cf)
writer.writerow(["contract","barDate","open","high","low","close","volume"])
for row in all_bars:
writer.writerow([
row["contract"], row["barDate"],
row["open"], row["high"],
row["low"], row["close"],
row["volume"]
])
print(f"Done! Retrieved {len(all_bars)} total bars.")
if __name__ == "__main__":
main()