"""Fetch historical 5-minute regular-trading-hours bars for rolling MES futures
contracts from Interactive Brokers, and save the result as JSON and CSV."""
# Standard library
import threading
import time
import csv
import json
from datetime import datetime, timedelta

# Interactive Brokers Python API (third-party `ibapi` package)
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
################################################################################
# 1) CONTRACT DEFINITION & ROLLOVER
################################################################################

# Approximate 3rd-Friday expiration dates for quarterly MES futures from June 2022 -> Mar 2025
quarter_expirations = {
    "202206": datetime(2022, 6, 17),
    "202209": datetime(2022, 9, 16),
    "202212": datetime(2022, 12, 16),
    "202303": datetime(2023, 3, 17),
    "202306": datetime(2023, 6, 16),
    "202309": datetime(2023, 9, 15),
    "202312": datetime(2023, 12, 15),
    "202403": datetime(2024, 3, 15),
    "202406": datetime(2024, 6, 21),
    "202409": datetime(2024, 9, 20),
    "202412": datetime(2024, 12, 20),
    "202503": datetime(2025, 3, 21),
}

# Build "rolling windows": each contract is treated as active from the day
# after its predecessor expired through its own expiration date.
# (The "YYYYMM" keys sort lexicographically in chronological order.)
sorted_contracts = sorted(quarter_expirations.items())
rolling_windows = []
prev_exp = None
for month_code, expiry in sorted_contracts:
    # The first contract has no predecessor; assume ~90 days of activity.
    active_from = (
        expiry - timedelta(days=90)
        if prev_exp is None
        else prev_exp + timedelta(days=1)
    )
    rolling_windows.append((month_code, active_from, expiry))
    prev_exp = expiry

# Our overall request window
overall_start = datetime(2022, 4, 1)
overall_end = datetime(2025, 4, 1)
def get_contract_intervals(windows=None, window_start=None, window_end=None):
    """
    Return a list of (contractMonth, interval_start, interval_end) tuples
    clipped to the requested window.

    Generalized to accept explicit inputs; the no-argument call is unchanged
    and uses the module-level rollover data.

    Parameters
    ----------
    windows : list[tuple[str, datetime, datetime]], optional
        Contract activity windows; defaults to module-level ``rolling_windows``.
    window_start, window_end : datetime, optional
        Bounds of the request window; default to module-level
        ``overall_start`` / ``overall_end``.

    Returns
    -------
    list[tuple[str, datetime, datetime]]
        Only non-empty intersections of each contract window with the
        request window.
    """
    if windows is None:
        windows = rolling_windows
    lo = overall_start if window_start is None else window_start
    hi = overall_end if window_end is None else window_end

    intervals = []
    for c_str, c_start, c_end in windows:
        # Skip contracts entirely outside the request window.
        if c_end <= lo:
            continue
        if c_start >= hi:
            continue
        eff_start = max(c_start, lo)
        eff_end = min(c_end, hi)
        if eff_start < eff_end:
            intervals.append((c_str, eff_start, eff_end))
    return intervals
################################################################################
# 2) IB APP & CONNECTION
################################################################################

class HistoricalApp(EWrapper, EClient):
    """Minimal IB client that serializes one historical-data request at a time.

    The IB callbacks run on the EClient network thread; the main thread
    coordinates with it through ``connection_ready`` and ``data_event``.
    """

    def __init__(self):
        EClient.__init__(self, self)
        self.connection_ready = threading.Event()  # set once the connection handshake finishes
        self.data_event = threading.Event()        # set when the current request completes/fails
        self.current_req_id = None                 # reqId of the in-flight request
        self.historical_data = []                  # bars collected for the current request
        self.error_flag = False                    # True if the current request (or connect) errored

    def error(self, reqId, errorCode, errorString, advancedOrderRejectJson=""):
        # Filter typical farm messages
        if errorCode in (2104, 2106, 2107, 2108):
            return

        print(f"ERROR {reqId} {errorCode}: {errorString}")
        # Connection failure. BUGFIX: set error_flag BEFORE signaling the
        # event — the original signaled first, so the thread woken in
        # ensure_connected() could read a stale error_flag == False.
        if reqId == -1 and errorCode == 502:
            self.error_flag = True
            self.connection_ready.set()
            return
        # If it's for our current request, flag the failure and wake the waiter.
        if reqId == self.current_req_id:
            self.error_flag = True
            self.data_event.set()

    def nextValidId(self, orderId):
        # First callback after a successful connection.
        print(f"Connected. NextValidId={orderId}")
        self.connection_ready.set()

    def historicalData(self, reqId, bar):
        # Collect bars only for the request we are currently waiting on.
        if reqId == self.current_req_id:
            self.historical_data.append(bar)

    def historicalDataEnd(self, reqId, start, end):
        # End-of-stream marker for the current request.
        if reqId == self.current_req_id:
            print(" Chunk complete.")
            self.data_event.set()
def ensure_connected(app, timeout=15):
    """Block until the app's connection handshake completes.

    Raises RuntimeError if ``connection_ready`` is not signaled within
    ``timeout`` seconds.
    """
    became_ready = app.connection_ready.wait(timeout=timeout)
    if not became_ready:
        raise RuntimeError("Timed out waiting for IB connection.")
################################################################################
# 3) MAIN SCRIPT: FETCH 5-MINUTE RTH BARS, CASTING DECIMALS TO FLOAT, SAVE JSON/CSV
################################################################################

def main():
    """Fetch 5-minute RTH TRADES bars for each rolling MES contract and save
    the combined result to MES_5min_data.json and MES_5min_data.csv.

    BUGFIX: the Decimal->float fallback used a bare ``except:``, which also
    swallows KeyboardInterrupt/SystemExit; narrowed to (TypeError, ValueError).
    """
    intervals = get_contract_intervals()

    # Start up the IB API
    app = HistoricalApp()
    print("Connecting to IB Gateway (paper port=4002)...")
    app.connect("127.0.0.1", 4002, clientId=101)

    # EClient.run() blocks, so it runs on its own daemon thread.
    api_thread = threading.Thread(target=app.run, daemon=True)
    api_thread.start()

    try:
        ensure_connected(app, 15)
        if app.error_flag:
            print("Connection error. Exiting.")
            app.disconnect()
            return
    except RuntimeError as e:
        print(e)
        app.disconnect()
        return

    all_bars = []
    req_id = 1

    for (c_month, start_dt, end_dt) in intervals:
        contract = Contract()
        contract.symbol = "MES"
        contract.secType = "FUT"
        contract.exchange = "CME"  # or GLOBEX if that works better for you
        contract.currency = "USD"
        contract.includeExpired = True  # required to query already-expired contracts
        contract.lastTradeDateOrContractMonth = c_month  # e.g. "202306"

        # Iterate in ~30-day chunks
        chunk_start = start_dt
        print(f"\nContract {c_month}, active {start_dt.date()} -> {end_dt.date()} (RTH).")

        while chunk_start < end_dt:
            chunk_end = chunk_start + timedelta(days=29)
            if chunk_end > end_dt:
                chunk_end = end_dt

            # IB UTC format => "YYYYMMDD-HH:mm:ss", e.g. "20230430-00:00:00"
            end_str = chunk_end.strftime("%Y%m%d-00:00:00")
            days_count = (chunk_end - chunk_start).days + 1
            duration_str = f"{days_count} D"
            # NOTE(review): the +1 makes each request reach back one day before
            # chunk_start, so the very first chunk fetches one extra day.

            # Reset per-request state shared with the API callback thread.
            app.historical_data.clear()
            app.error_flag = False
            app.data_event.clear()
            app.current_req_id = req_id

            print(f" Requesting {c_month} from {chunk_start.date()} to {chunk_end.date()}"
                  f" -> end='{end_str}', dur='{duration_str}'")

            app.reqHistoricalData(
                reqId=req_id,
                contract=contract,
                endDateTime=end_str,
                durationStr=duration_str,
                barSizeSetting="5 mins",
                whatToShow="TRADES",
                useRTH=1,  # Only regular trading hours
                formatDate=1,
                keepUpToDate=False,
                chartOptions=[]
            )

            # Wait up to 60 s for historicalDataEnd (or an error callback).
            finished = app.data_event.wait(60)
            if not finished:
                print(f" Timeout on request {req_id}. Breaking out for contract {c_month}.")
                break
            if app.error_flag:
                print(f" Error on request {req_id}. Aborting {c_month}.")
                break

            # Store chunk data
            for bar in app.historical_data:
                # Convert bar fields from Decimal -> float, volume -> int (if safe)
                try:
                    o = float(bar.open)
                    h = float(bar.high)
                    l = float(bar.low)
                    c_ = float(bar.close)
                    v = float(bar.volume)  # or int(bar.volume) if you prefer
                except (TypeError, ValueError):
                    # fallback to string if something is weird
                    o = str(bar.open)
                    h = str(bar.high)
                    l = str(bar.low)
                    c_ = str(bar.close)
                    v = str(bar.volume)

                all_bars.append({
                    "contract": c_month,
                    "barDate": bar.date,  # might include time zone suffix
                    "open": o,
                    "high": h,
                    "low": l,
                    "close": c_,
                    "volume": v
                })

            req_id += 1
            chunk_start = chunk_end + timedelta(days=1)
            time.sleep(2)  # pacing: stay inside IB's historical-data rate limits

    # Done
    app.disconnect()
    api_thread.join(2)

    # We'll parse bar.date if needed to sort chronologically
    # Some bars have "YYYYMMDD HH:MM:SS" or "YYYYMMDD" or "YYYYMMDD HH:MM:SS US/Central"
    def parse_bar_date(d_str):
        d_str = " ".join(d_str.strip().split())  # remove double spaces
        parts = d_str.split()
        if len(parts) == 1:
            # daily => "YYYYMMDD"
            return datetime.strptime(d_str, "%Y%m%d")
        elif len(parts) == 3:
            # e.g. "20230810 09:30:00 US/Central" (time zone suffix dropped)
            dt_part = " ".join(parts[:2])  # "20230810 09:30:00"
            return datetime.strptime(dt_part, "%Y%m%d %H:%M:%S")
        else:
            # e.g. "20230810 09:30:00"
            return datetime.strptime(d_str, "%Y%m%d %H:%M:%S")

    all_bars.sort(key=lambda x: (x["contract"], parse_bar_date(x["barDate"])))

    # Save to JSON
    json_file = "MES_5min_data.json"
    print(f"\nSaving {len(all_bars)} bars to {json_file} ...")
    with open(json_file, "w") as jf:
        json.dump(all_bars, jf, indent=2)  # no error with floats

    # Save to CSV
    csv_file = "MES_5min_data.csv"
    print(f"Saving {len(all_bars)} bars to {csv_file} ...")
    with open(csv_file, "w", newline="") as cf:
        writer = csv.writer(cf)
        writer.writerow(["contract","barDate","open","high","low","close","volume"])
        for row in all_bars:
            writer.writerow([
                row["contract"], row["barDate"],
                row["open"], row["high"],
                row["low"], row["close"],
                row["volume"]
            ])

    print(f"Done! Retrieved {len(all_bars)} total bars.")
# Standard script entry point: run only when executed directly, not on import.
if __name__ == "__main__":
    main()