removed griffin-stuff

This commit is contained in:
klein panic
2025-01-29 19:24:55 -05:00
parent 5f8e3f4c68
commit c39d19a17a
28 changed files with 0 additions and 1935429 deletions

View File

@@ -1,246 +0,0 @@
import numpy as np
import pandas as pd
import yfinance as yf
from scipy.optimize import minimize
def ticker_info():
ticker = "gush"
return ticker.upper()
def fetch_expiration_dates(ticker):
print(f"Fetching available expiration dates for {ticker}...")
stock = yf.Ticker(ticker)
expiration_dates = stock.options
print(f"Available expiration dates: {expiration_dates}")
return expiration_dates
def select_expiration_date(expiration_dates):
print("Selecting the first available expiration date...")
expiration_date = expiration_dates[0]
print(f"Selected expiration date: {expiration_date}")
return expiration_date
def fetch_option_chain(ticker, expiration_date):
print(f"Fetching option chain for {ticker} with expiration date {expiration_date}...")
stock = yf.Ticker(ticker)
options_chain = stock.option_chain(expiration_date)
print("Option chain fetched successfully!")
return options_chain
def get_price_data(ticker, start_date, end_date):
print(f"Fetching price data for {ticker} from {start_date} to {end_date}...")
data = yf.download(ticker, start=start_date, end=end_date)
print(f"Price data fetched successfully for {ticker}!")
return data
def moving_average_strategy(data, short_window=20, long_window=50):
data['Short_MA'] = data['Close'].rolling(window=short_window).mean()
data['Long_MA'] = data['Close'].rolling(window=long_window).mean()
data['Signal'] = np.where(data['Short_MA'] > data['Long_MA'], 1, -1)
return data['Signal']
def rsi_strategy(data, window=14, overbought=70, oversold=30):
delta = data['Close'].diff(1)
gain = np.where(delta > 0, delta, 0).flatten() # Flatten to 1D array
loss = np.where(delta < 0, abs(delta), 0).flatten() # Flatten to 1D array
avg_gain = pd.Series(gain).rolling(window=window).mean()
avg_loss = pd.Series(loss).rolling(window=window).mean()
# Avoid division by zero by using np.where to replace 0 with np.nan in avg_loss
rs = avg_gain / np.where(avg_loss == 0, np.nan, avg_loss)
rsi = 100 - (100 / (1 + rs))
signal = np.where(rsi < oversold, 1, np.where(rsi > overbought, -1, 0))
return pd.Series(signal, index=data.index)
def bollinger_bands_strategy(data, window=20, num_std=2):
# Calculate moving average
data['Moving_Avg'] = data['Close'].rolling(window=window).mean()
# Calculate rolling standard deviation and force it to be a Series
rolling_std = data['Close'].rolling(window).std()
rolling_std = rolling_std.squeeze() # Ensure rolling_std is a Series
# Print shapes for debugging
print(f"Shape of Moving_Avg: {data['Moving_Avg'].shape}")
print(f"Shape of Rolling Std: {rolling_std.shape}")
# Calculate upper and lower bands
data['Band_Upper'] = data['Moving_Avg'] + (num_std * rolling_std)
data['Band_Lower'] = data['Moving_Avg'] - (num_std * rolling_std)
# Print shapes after assignments for debugging
print(f"Shape of Band_Upper: {data['Band_Upper'].shape}")
print(f"Shape of Band_Lower: {data['Band_Lower'].shape}")
# Check for NaN values
print(f"NaNs in Close: {data['Close'].isna().sum()}")
print(f"NaNs in Band_Upper: {data['Band_Upper'].isna().sum()}")
print(f"NaNs in Band_Lower: {data['Band_Lower'].isna().sum()}")
# Print the columns of the DataFrame
print(f"Columns in data before dropping NaNs: {data.columns.tolist()}")
# Optionally drop rows with NaNs
data = data.dropna(subset=['Close', 'Band_Upper', 'Band_Lower'])
# Generate signals based on the bands
signal = np.where(data['Close'] < data['Band_Lower'], 1,
np.where(data['Close'] > data['Band_Upper'], -1, 0))
return pd.Series(signal, index=data.index)
def generate_signals(data):
ma_signal = moving_average_strategy(data)
rsi_signal = rsi_strategy(data)
bollinger_signal = bollinger_bands_strategy(data)
return pd.DataFrame({'MA': ma_signal, 'RSI': rsi_signal, 'Bollinger': bollinger_signal})
def backtest_option_trades(option_chain, signals, stock_data):
"""
Backtest option trades based on the given signals and stock data.
"""
trades = []
current_position = None
# Ensure both stock_data and option_chain indices are sorted in ascending order
stock_data = stock_data.sort_index()
# Convert 'lastTradeDate' or any date-related columns to datetime in option_chain
if 'lastTradeDate' in option_chain.columns:
option_chain['lastTradeDate'] = pd.to_datetime(option_chain['lastTradeDate'])
option_chain = option_chain.set_index('lastTradeDate')
# If option_chain index isn't datetime, convert it to datetime (ensuring compatibility)
option_chain.index = pd.to_datetime(option_chain.index)
# Remove the timezone from option_chain index
option_chain.index = option_chain.index.tz_localize(None)
# Now reindex the option chain to match the stock data index (forward fill missing option prices)
option_chain = option_chain.sort_index()
option_chain = option_chain.reindex(stock_data.index, method='ffill')
for i in range(len(signals)):
if signals.iloc[i]['MA'] == 1 and current_position is None:
# BUY signal
entry_price = option_chain['lastPrice'].iloc[i]
if pd.isna(entry_price): # If price is nan, log the error and continue
print(f"Missing entry price on {stock_data.index[i]}, skipping trade.")
continue
entry_date = stock_data.index[i]
current_position = {
'entry_price': entry_price,
'entry_date': entry_date
}
print(f"BUY signal on {entry_date}: Entry Price = {entry_price}")
elif signals.iloc[i]['MA'] == -1 and current_position is not None:
# SELL signal
exit_price = option_chain['lastPrice'].iloc[i]
if pd.isna(exit_price): # If price is nan, log the error and continue
print(f"Missing exit price on {stock_data.index[i]}, skipping trade.")
continue
exit_date = stock_data.index[i]
pnl = (exit_price - current_position['entry_price']) * 100
print(f"SELL signal on {exit_date}: Exit Price = {exit_price}, P&L = {pnl}")
trades.append({
'entry_date': current_position['entry_date'],
'entry_price': current_position['entry_price'],
'exit_date': exit_date,
'exit_price': exit_price,
'pnl': pnl
})
current_position = None
cumulative_pnl = sum(trade['pnl'] for trade in trades)
total_wins = sum(1 for trade in trades if trade['pnl'] > 0)
total_trades = len(trades)
win_rate = total_wins / total_trades if total_trades > 0 else 0
return cumulative_pnl, trades, win_rate
def objective_function_profit(weights, strategy_signals, data, option_chain):
weights = np.array(weights)
weights /= np.sum(weights) # Normalize weights
weighted_signals = np.sum([signal * weight for signal, weight in zip(strategy_signals.T.values, weights)], axis=0)
# Since `backtest_option_trades` returns 3 values, we only unpack those
cumulative_pnl, _, _ = backtest_option_trades(option_chain, weighted_signals, data)
# Return negative cumulative P&L to maximize profit
return -cumulative_pnl
def optimize_weights(strategy_signals, data, option_chain):
initial_weights = [1 / len(strategy_signals.columns)] * len(strategy_signals.columns)
constraints = ({'type': 'eq', 'fun': lambda weights: np.sum(weights) - 1})
bounds = [(0, 1)] * len(strategy_signals.columns)
result = minimize(objective_function_profit, initial_weights, args=(strategy_signals, data, option_chain),
method='SLSQP', bounds=bounds, constraints=constraints)
return result.x # Optimal weights
def weighted_signal_combination(strategy_signals, weights):
weighted_signals = np.sum([signal * weight for signal, weight in zip(strategy_signals.T.values, weights)], axis=0)
return weighted_signals
def main_decision(weighted_signals):
last_signal = weighted_signals[-1] # Latest signal
if last_signal > 0:
return "BUY"
elif last_signal < 0:
return "SELL"
else:
return "HOLD"
def run_backtest():
ticker = ticker_info()
expiration_dates = fetch_expiration_dates(ticker)
expiration_date = select_expiration_date(expiration_dates)
options_chain = fetch_option_chain(ticker, expiration_date)
# Fetch training data
train_data = get_price_data(ticker, '2010-01-01', '2022-01-01')
# Generate signals
strategy_signals_train = generate_signals(train_data)
# Optimize weights
optimal_weights = optimize_weights(strategy_signals_train, train_data, options_chain.calls)
# Fetch test data
test_data = get_price_data(ticker, '2022-01-02', '2024-01-01')
# Generate test signals
strategy_signals_test = generate_signals(test_data)
# Combine signals and backtest
weighted_signals = weighted_signal_combination(strategy_signals_test, optimal_weights)
cumulative_pnl, trades, win_rate = backtest_option_trades(options_chain.calls, weighted_signals, test_data)
# Make final decision
decision = main_decision(weighted_signals)
print(f"Final decision: {decision}")
# Output results
print(f"Cumulative P&L: {cumulative_pnl}")
print(f"Win Rate: {win_rate * 100:.2f}%")
# Call the main function
run_backtest()

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,78 +0,0 @@
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.callbacks import EarlyStopping
# Load the training and testing data
training_data = pd.read_csv("3_years_training_data.csv")
testing_data = pd.read_csv("3_month_testing_data.csv")
# Drop unnecessary columns
training_data = training_data.drop(columns=["Unnamed: 0", "Date"])
testing_data = testing_data.drop(columns=["Unnamed: 0", "Date"])
# Create lagged features for the model
def create_lagged_features(data, n_lags=3):
df = data.copy()
for lag in range(1, n_lags + 1):
df[f'Close_lag_{lag}'] = df['Close'].shift(lag)
df.dropna(inplace=True) # Remove rows with NaN values due to shifting
return df
# Apply lagged features to the training and testing datasets
training_data = create_lagged_features(training_data)
testing_data = create_lagged_features(testing_data)
# Separate features and target
X_train = training_data.drop(columns=["Close"]).values
y_train = training_data["Close"].values
X_test = testing_data.drop(columns=["Close"]).values
y_test = testing_data["Close"].values
# Standardize the features
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
# Build the neural network model
model = Sequential([
Dense(64, activation='sigmoid', input_shape=(X_train.shape[1],)),
Dense(32, activation='sigmoid'),
Dense(16, activation='sigmoid'),
Dense(1) # Output layer for regression
])
# Compile the model
model.compile(optimizer='adam', loss='mse', metrics=['mae'])
# Use early stopping to prevent overfitting
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
# Train the model
history = model.fit(
X_train, y_train,
epochs=100,
batch_size=32,
validation_split=0.2,
callbacks=[early_stopping],
verbose=1
)
# Evaluate the model on the test set
y_pred = model.predict(X_test).flatten()
mse = mean_squared_error(y_test, y_pred)
mae = mean_absolute_error(y_test, y_pred)
print(f"Neural Network MSE: {mse:.2f}")
print(f"Neural Network MAE: {mae:.2f}")
# Prepare the latest data to predict tomorrow's price
latest_data = testing_data.tail(1).drop(columns=["Close"])
latest_data_scaled = scaler.transform(latest_data)
# Predict tomorrow's close price
tomorrow_pred = model.predict(latest_data_scaled)
print(f"Predicted Close Price for Tomorrow: {tomorrow_pred[0][0]:.2f}")

View File

@@ -1,47 +0,0 @@
absl-py==2.1.0
astunparse==1.6.3
certifi==2024.8.30
charset-normalizer==3.4.0
flatbuffers==24.3.25
gast==0.6.0
google-pasta==0.2.0
grpcio==1.67.1
h5py==3.12.1
ibapi==9.81.1.post1
idna==3.10
importlib_metadata==8.5.0
joblib==1.4.2
keras==3.6.0
libclang==18.1.1
Markdown==3.7
markdown-it-py==3.0.0
MarkupSafe==3.0.2
mdurl==0.1.2
ml-dtypes==0.4.1
namex==0.0.8
numpy==2.0.2
opt_einsum==3.4.0
optree==0.13.0
packaging==24.1
pandas==2.2.3
protobuf==5.28.3
Pygments==2.18.0
python-dateutil==2.9.0.post0
pytz==2024.2
requests==2.32.3
rich==13.9.4
scikit-learn==1.5.2
scipy==1.13.1
six==1.16.0
tensorboard==2.18.0
tensorboard-data-server==0.7.2
tensorflow==2.18.0
tensorflow-io-gcs-filesystem==0.37.1
termcolor==2.5.0
threadpoolctl==3.5.0
typing_extensions==4.12.2
tzdata==2024.2
urllib3==2.2.3
Werkzeug==3.1.1
wrapt==1.16.0
zipp==3.20.2

View File

@@ -1,129 +0,0 @@
import pandas as pd
import matplotlib.pyplot as plt
import json
import os
import datetime
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from indicators import add_indicators
from strategy import generate_signals
from backtester import backtest
from optimizer import parameter_search
from indicator_sets import indicator_sets
def load_config(config_path="config.json"):
with open(config_path, 'r') as f:
return json.load(f)
def load_data(data_path):
df = pd.read_csv(data_path, parse_dates=['Date'], index_col='Date')
df = df.sort_index()
required_cols = ['Open','High','Low','Close','Volume']
if not all(col in df.columns for col in required_cols):
raise ValueError("Data file must contain Date,Open,High,Low,Close,Volume columns.")
return df
def visualize_data_with_indicators(df):
fig, axes = plt.subplots(4, 1, figsize=(12, 10), sharex=True)
axes[0].plot(df.index, df['Close'], label='Close', color='black')
axes[0].plot(df.index, df['EMA'], label='EMA', color='blue', alpha=0.7)
axes[0].set_title('Price and EMA')
axes[0].legend()
axes[1].plot(df.index, df['RSI'], label='RSI', color='green')
axes[1].axhline(70, color='red', linestyle='--')
axes[1].axhline(30, color='green', linestyle='--')
axes[1].set_title('RSI')
axes[2].plot(df.index, df['MACD'], label='MACD', color='purple')
axes[2].axhline(0, color='red', linestyle='--')
axes[2].set_title('MACD')
axes[3].plot(df.index, df['ADX'], label='ADX', color='brown')
axes[3].axhline(20, color='grey', linestyle='--')
axes[3].axhline(25, color='grey', linestyle='--')
axes[3].set_title('ADX')
plt.tight_layout()
plt.show()
def log_results(message, log_file="indicator_test_results.log"):
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(log_file, "a") as f:
f.write(f"{timestamp} - {message}\n")
def main():
config = load_config("config.json")
data_path = os.path.join("data", "SPY_5min_preprocessed.csv")
df = load_data(data_path)
# Add core indicators and visualize
df = add_indicators(df, config)
visualize_data_with_indicators(df)
# Generate signals and backtest
df = generate_signals(df, config)
results = backtest(df, config)
print("Backtest Results:")
print(results)
# Parameter optimization example
param_grid = {
"rsi_threshold_bearish": [65, 70, 75],
"rsi_threshold_bullish": [25, 30, 35]
}
best_params, best_performance = parameter_search(df, config, param_grid)
print("Best Parameters Found:", best_params)
print("Best Performance (Final Equity):", best_performance)
# Now test multiple indicator sets for classification accuracy
log_file = "indicator_test_results.log"
with open(log_file, "w") as f:
f.write("Indicator Test Results Log\n")
# Create prediction target: next candle up or down
df['Future_Close'] = df['Close'].shift(-1)
df['Up_Indicator'] = (df['Future_Close'] > df['Close']).astype(int)
df = df.dropna(subset=['Future_Close'])
train_size = int(len(df)*0.7)
df_train = df.iloc[:train_size].copy()
df_test = df.iloc[train_size:].copy()
for set_name, func in indicator_sets.items():
# Apply the indicator set to train/test
train = df_train.copy()
test = df_test.copy()
train = func(train)
test = func(test)
# Ensure columns align
test = test.reindex(columns=train.columns)
test = test.dropna()
if len(test) == 0 or len(train) == 0:
log_results(f"{set_name}: Not enough data after adding indicators.", log_file)
continue
base_cols = ['Open','High','Low','Close','Volume','Future_Close','Up_Indicator']
feature_cols = [c for c in train.columns if c not in base_cols]
X_train = train[feature_cols]
y_train = train['Up_Indicator']
X_test = test[feature_cols]
y_test = test['Up_Indicator']
# Train a simple logistic regression model
model = LogisticRegression(max_iter=1000)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
acc = accuracy_score(y_test, y_pred)
result_message = f"{set_name}: Accuracy = {acc:.4f}"
print(result_message)
log_results(result_message, log_file)
if __name__ == "__main__":
main()

View File

@@ -1,32 +0,0 @@
{
"data": {
"input_csv": "data/SPY_5min_preprocessed.csv",
"date_column": "Date",
"price_column": "Close",
"high_column": "High",
"low_column": "Low",
"volume_column": "Volume"
},
"indicators": {
"trend": ["SMA", "EMA", "ADX"],
"momentum": ["RSI", "MACD"],
"volatility": ["BollingerBands"],
"volume": ["OBV"],
"mean_reversion": ["MeanReversionSignal"]
},
"parameters": {
"SMA": {"window": 20},
"EMA": {"window": 20},
"ADX": {"window": 14},
"RSI": {"window": 14, "overbought": 70, "oversold": 30},
"MACD": {"fastperiod": 12, "slowperiod": 26, "signalperiod": 9},
"BollingerBands": {"window": 20, "std_dev": 2},
"OBV": {},
"MeanReversionSignal": {"window": 10}
},
"evaluation": {
"prediction_horizon": 1,
"log_file": "logs/results.log"
}
}

View File

@@ -1,32 +0,0 @@
{
"data": {
"input_csv": "data/price_data.csv",
"date_column": "Date",
"price_column": "Close",
"high_column": "High",
"low_column": "Low",
"volume_column": "Volume"
},
"indicators": {
"trend": ["SMA", "EMA", "ADX"],
"momentum": ["RSI", "MACD"],
"volatility": ["BollingerBands"],
"volume": ["OBV"],
"mean_reversion": ["MeanReversionSignal"]
},
"parameters": {
"SMA": {"window": 20},
"EMA": {"window": 20},
"ADX": {"window": 14},
"RSI": {"window": 14, "overbought": 70, "oversold": 30},
"MACD": {"fastperiod": 12, "slowperiod": 26, "signalperiod": 9},
"BollingerBands": {"window": 20, "std_dev": 2},
"OBV": {},
"MeanReversionSignal": {"window": 10}
},
"evaluation": {
"prediction_horizon": 1,
"log_file": "logs/results.log"
}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,32 +0,0 @@
import numpy as np
def evaluate_indicator_accuracy(df, price_col="Close", horizon=1):
"""
Evaluate how often indicator signals predict the correct next-day price direction.
Logic:
- If signal[i] = 1 (bullish), correct if price[i+horizon] > price[i].
- If signal[i] = -1 (bearish), correct if price[i+horizon] < price[i].
- If signal[i] = 0, skip.
"""
correct = 0
total = 0
for i in range(len(df) - horizon):
sig = df['signal'].iloc[i]
if sig == 0:
continue
future_price = df[price_col].iloc[i + horizon]
current_price = df[price_col].iloc[i]
if sig == 1 and future_price > current_price:
correct += 1
elif sig == -1 and future_price < current_price:
correct += 1
if sig != 0:
total += 1
if total == 0:
return np.nan # No signals generated
return correct / total

View File

@@ -1,96 +0,0 @@
import pandas as pd
import numpy as np
import ta
def calculate_indicator_signals(df, indicator_name, params, price_col="Close", high_col="High", low_col="Low", volume_col="Volume"):
"""
Calculates indicator values and generates signals:
Signal Convention: 1 = Bullish Prediction, -1 = Bearish Prediction, 0 = Neutral
"""
if price_col not in df.columns:
raise ValueError(f"{price_col} column not found in the dataframe.")
if indicator_name == "SMA":
# Trend: price > SMA => bullish, else bearish
window = params.get("window", 20)
df['SMA'] = df[price_col].rolling(window).mean()
df['signal'] = np.where(df[price_col] > df['SMA'], 1, -1)
elif indicator_name == "EMA":
# Trend: price > EMA => bullish, else bearish
window = params.get("window", 20)
df['EMA'] = df[price_col].ewm(span=window, adjust=False).mean()
df['signal'] = np.where(df[price_col] > df['EMA'], 1, -1)
elif indicator_name == "ADX":
# Trend: use ADXIndicator
if high_col not in df.columns or low_col not in df.columns:
raise ValueError("ADX calculation requires 'High' and 'Low' columns.")
window = params.get("window", 14)
adx_indicator = ta.trend.ADXIndicator(high=df[high_col], low=df[low_col], close=df[price_col], window=window)
df['ADX'] = adx_indicator.adx()
df['DIP'] = adx_indicator.adx_pos() # +DI
df['DIN'] = adx_indicator.adx_neg() # -DI
# If ADX > 25 and DI+ > DI- => bullish
# If ADX > 25 and DI- > DI+ => bearish
# Otherwise => no strong signal
df['signal'] = 0
trending_up = (df['DIP'] > df['DIN']) & (df['ADX'] > 25)
trending_down = (df['DIN'] > df['DIP']) & (df['ADX'] > 25)
df.loc[trending_up, 'signal'] = 1
df.loc[trending_down, 'signal'] = -1
elif indicator_name == "RSI":
# Momentum: RSI > overbought => bearish, RSI < oversold => bullish
window = params.get("window", 14)
overbought = params.get("overbought", 70)
oversold = params.get("oversold", 30)
df['RSI'] = ta.momentum.rsi(df[price_col], window=window)
conditions = [
(df['RSI'] > overbought),
(df['RSI'] < oversold)
]
values = [-1, 1]
df['signal'] = np.select(conditions, values, default=0)
elif indicator_name == "MACD":
# Momentum: MACD line > Signal line => bullish, else bearish
fastperiod = params.get("fastperiod", 12)
slowperiod = params.get("slowperiod", 26)
signalperiod = params.get("signalperiod", 9)
macd = ta.trend.MACD(df[price_col], window_slow=slowperiod, window_fast=fastperiod, window_sign=signalperiod)
df['MACD'] = macd.macd()
df['MACD_Signal'] = macd.macd_signal()
df['signal'] = np.where(df['MACD'] > df['MACD_Signal'], 1, -1)
elif indicator_name == "BollingerBands":
# Volatility: price near upper band => bearish, near lower band => bullish
window = params.get("window", 20)
std_dev = params.get("std_dev", 2)
bb = ta.volatility.BollingerBands(df[price_col], window=window, window_dev=std_dev)
df['BB_High'] = bb.bollinger_hband()
df['BB_Low'] = bb.bollinger_lband()
df['signal'] = np.where(df[price_col] >= df['BB_High'], -1,
np.where(df[price_col] <= df['BB_Low'], 1, 0))
elif indicator_name == "OBV":
# Volume: Rising OBV => bullish, falling OBV => bearish
if volume_col not in df.columns:
raise ValueError(f"OBV calculation requires '{volume_col}' column.")
df['OBV'] = ta.volume.on_balance_volume(df[price_col], df[volume_col])
df['OBV_Change'] = df['OBV'].diff()
df['signal'] = np.where(df['OBV_Change'] > 0, 1, np.where(df['OBV_Change'] < 0, -1, 0))
elif indicator_name == "MeanReversionSignal":
# Mean Reversion: price > mean => bearish, price < mean => bullish
window = params.get("window", 10)
df['mean'] = df[price_col].rolling(window).mean()
df['signal'] = np.where(df[price_col] > df['mean'], -1,
np.where(df[price_col] < df['mean'], 1, 0))
else:
raise ValueError(f"Unknown indicator: {indicator_name}")
return df

View File

@@ -1,24 +0,0 @@
2024-12-13 22:07:39,152 - INFO - Category: trend, Indicator: SMA, Accuracy: 0.3166
2024-12-13 22:07:40,855 - INFO - Category: trend, Indicator: EMA, Accuracy: 0.3160
2024-12-13 22:10:04,274 - INFO - Category: trend, Indicator: SMA, Accuracy: 0.3166
2024-12-13 22:10:05,997 - INFO - Category: trend, Indicator: EMA, Accuracy: 0.3160
2024-12-13 22:10:07,745 - INFO - Category: trend, Indicator: ADX, Accuracy: 0.2696
2024-12-13 22:10:08,484 - INFO - Category: momentum, Indicator: RSI, Accuracy: 0.2495
2024-12-13 22:10:09,096 - INFO - Category: volatility, Indicator: BollingerBands, Accuracy: 0.3114
2024-12-13 22:10:11,937 - INFO - Category: volume, Indicator: OBV, Accuracy: 0.3167
2024-12-13 22:10:15,386 - INFO - Category: mean_reversion, Indicator: MeanReversionSignal, Accuracy: 0.3330
2024-12-13 22:12:44,520 - INFO - Category: trend, Indicator: SMA, Accuracy: 0.3166
2024-12-13 22:12:45,874 - INFO - Category: trend, Indicator: EMA, Accuracy: 0.3160
2024-12-13 22:12:47,913 - INFO - Category: trend, Indicator: ADX, Accuracy: 0.2696
2024-12-13 22:12:48,530 - INFO - Category: momentum, Indicator: RSI, Accuracy: 0.2495
2024-12-13 22:12:49,173 - INFO - Category: volatility, Indicator: BollingerBands, Accuracy: 0.3114
2024-12-13 22:12:51,230 - INFO - Category: volume, Indicator: OBV, Accuracy: 0.3167
2024-12-13 22:12:54,504 - INFO - Category: mean_reversion, Indicator: MeanReversionSignal, Accuracy: 0.3330
2024-12-13 22:23:17,293 - INFO - Category: trend, Indicator: SMA, Accuracy: 0.3166
2024-12-13 22:23:18,087 - INFO - Category: trend, Indicator: EMA, Accuracy: 0.3160
2024-12-13 22:23:19,409 - INFO - Category: trend, Indicator: ADX, Accuracy: 0.2696
2024-12-13 22:23:19,797 - INFO - Category: momentum, Indicator: RSI, Accuracy: 0.2495
2024-12-13 22:23:20,669 - INFO - Category: momentum, Indicator: MACD, Accuracy: 0.3184
2024-12-13 22:23:20,993 - INFO - Category: volatility, Indicator: BollingerBands, Accuracy: 0.3114
2024-12-13 22:23:21,786 - INFO - Category: volume, Indicator: OBV, Accuracy: 0.3167
2024-12-13 22:23:22,678 - INFO - Category: mean_reversion, Indicator: MeanReversionSignal, Accuracy: 0.3330

View File

@@ -1,8 +0,0 @@
2024-12-13 22:23:17,293 - INFO - Category: trend, Indicator: SMA, Accuracy: 0.3166
2024-12-13 22:23:18,087 - INFO - Category: trend, Indicator: EMA, Accuracy: 0.3160
2024-12-13 22:23:19,409 - INFO - Category: trend, Indicator: ADX, Accuracy: 0.2696
2024-12-13 22:23:19,797 - INFO - Category: momentum, Indicator: RSI, Accuracy: 0.2495
2024-12-13 22:23:20,669 - INFO - Category: momentum, Indicator: MACD, Accuracy: 0.3184
2024-12-13 22:23:20,993 - INFO - Category: volatility, Indicator: BollingerBands, Accuracy: 0.3114
2024-12-13 22:23:21,786 - INFO - Category: volume, Indicator: OBV, Accuracy: 0.3167
2024-12-13 22:23:22,678 - INFO - Category: mean_reversion, Indicator: MeanReversionSignal, Accuracy: 0.3330

View File

@@ -1,65 +0,0 @@
import json
import logging
import pandas as pd
import os
from indicators import calculate_indicator_signals
from evaluation import evaluate_indicator_accuracy
def setup_logging(log_path):
os.makedirs(os.path.dirname(log_path), exist_ok=True)
logging.basicConfig(
filename=log_path,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def load_config(config_path="config.json"):
with open(config_path, 'r') as f:
config = json.load(f)
return config
def load_data(csv_path, date_col, price_col):
df = pd.read_csv(csv_path)
df[date_col] = pd.to_datetime(df[date_col])
df = df.sort_values(date_col).reset_index(drop=True)
df = df.dropna(subset=[date_col, price_col])
return df
if __name__ == "__main__":
config = load_config("config.json")
setup_logging(config["evaluation"]["log_file"])
# Load data
df = load_data(config["data"]["input_csv"],
config["data"]["date_column"],
config["data"]["price_column"])
# Calculate indicators and signals, evaluate accuracy
all_results = []
for category, indicators in config["indicators"].items():
for ind_name in indicators:
params = config["parameters"].get(ind_name, {})
signals_df = calculate_indicator_signals(
df.copy(),
indicator_name=ind_name,
params=params,
price_col=config["data"]["price_column"],
high_col=config["data"]["high_column"],
low_col=config["data"]["low_column"],
volume_col=config["data"]["volume_column"]
)
accuracy = evaluate_indicator_accuracy(
signals_df,
price_col=config["data"]["price_column"],
horizon=config["evaluation"]["prediction_horizon"]
)
logging.info(f"Category: {category}, Indicator: {ind_name}, Accuracy: {accuracy:.4f}")
all_results.append((category, ind_name, accuracy))
# Print results to console as well
for category, ind_name, acc in all_results:
print(f"Category: {category}, Indicator: {ind_name}, Accuracy: {acc:.4f}")

View File

@@ -1,233 +0,0 @@
import signal
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import threading
import time
import pandas as pd
from datetime import datetime, timedelta, timezone
from tqdm import tqdm # For progress bar
import os
class IBApi(EWrapper, EClient):
def __init__(self):
EClient.__init__(self, self)
self.data = []
self.df = pd.DataFrame()
self.data_retrieved = False
def historicalData(self, reqId, bar):
# Debug: Print each received bar
print(f"Received bar: Date={bar.date}, Open={bar.open}, High={bar.high}, Low={bar.low}, Close={bar.close}, Volume={bar.volume}")
self.data.append({
"Date": bar.date,
"Open": bar.open,
"High": bar.high,
"Low": bar.low,
"Close": bar.close,
"Volume": bar.volume
})
def historicalDataEnd(self, reqId, start, end):
# Debug: Indicate end of data reception
print(f"HistoricalDataEnd received. Start: {start}, End: {end}. Number of bars fetched: {len(self.data)}")
chunk_df = pd.DataFrame(self.data)
if not chunk_df.empty:
self.df = pd.concat([self.df, chunk_df], ignore_index=True)
else:
print("No data received in this request.")
self.data_retrieved = True
self.data = [] # Reset data list for next request
class IBApp:
def __init__(self):
self.app = IBApi()
def connect(self):
# Connect to IB API (ensure IB Gateway or TWS is running)
print("Connecting to IB API...")
self.app.connect("127.0.0.1", 4002, clientId=1)
# Start the API thread
thread = threading.Thread(target=self.run_app, daemon=True)
thread.start()
time.sleep(1) # Allow time for connection
print("Connected to IB API.")
def run_app(self):
self.app.run()
def request_data(self, contract, end_date, duration, bar_size):
# Request historical data
print(f"Requesting data: endDateTime={end_date}, durationStr={duration}, barSizeSetting={bar_size}")
self.app.reqHistoricalData(
reqId=1,
contract=contract,
endDateTime=end_date,
durationStr=duration,
barSizeSetting=bar_size,
whatToShow="TRADES",
useRTH=1, # Use regular trading hours
formatDate=1,
keepUpToDate=False,
chartOptions=[]
)
# Wait until data is retrieved
while not self.app.data_retrieved:
time.sleep(0.1)
self.app.data_retrieved = False # Reset flag for next request
def fetch_historical_data_yearly(self, symbol, sec_type, exchange, currency, start_date, end_date, bar_size="1 day"):
"""
Fetch historical data in yearly chunks to cover 3 years.
"""
try:
contract = Contract()
contract.symbol = symbol
contract.secType = sec_type
contract.exchange = exchange
contract.currency = currency
delta = timedelta(days=365)
current_end_date = end_date
total_years = 3 # Fetch 3 years of data
with tqdm(total=total_years, desc="Fetching Data", unit="year") as pbar:
for _ in range(total_years):
current_start_date = current_end_date - delta
end_date_str = current_end_date.strftime("%Y%m%d %H:%M:%S UTC")
self.request_data(contract, end_date_str, "1 Y", bar_size)
pbar.update(1)
current_end_date = current_start_date
time.sleep(1) # Respect IB API pacing
except Exception as e:
print(f"Error fetching data: {e}")
def fetch_historical_data(self, symbol, sec_type, exchange, currency, existing_df=None):
"""
Fetch historical data for the given symbol.
If existing_df is provided, fetch data after the last date in existing_df.
Otherwise, fetch the entire 3 years of data.
"""
try:
contract = Contract()
contract.symbol = symbol
contract.secType = sec_type
contract.exchange = exchange
contract.currency = currency
bar_size = "1 day" # Set bar size to 1 day for daily data
duration = "1 Y" # Fetch 1 year at a time
if existing_df is not None and not existing_df.empty:
# Get the last date from existing data
last_date_str = existing_df['Date'].iloc[-1]
# Clean up the date string to have single space
last_date_str = last_date_str.strip().replace(' ', ' ')
# Parse the last date as timezone-aware datetime (assuming UTC)
try:
# Try parsing in 'YYYYMMDD HH:MM:SS' format
last_date = datetime.strptime(last_date_str, "%Y%m%d %H:%M:%S").replace(tzinfo=timezone.utc)
except ValueError:
try:
# If that fails, try 'YYYY-MM-DD HH:MM:SS' format
last_date = datetime.strptime(last_date_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
except ValueError:
print(f"Error parsing last_date_str: {last_date_str}")
return
# Remove any future dates if present
current_time = datetime.now(timezone.utc)
existing_df = existing_df[existing_df['Date'] <= current_time]
print(f"Last valid date after cleaning: {last_date.strftime('%Y-%m-%d %H:%M:%S')}")
# Fetch new data in yearly chunks
# Since we need 3 years of data, and assuming existing_df has some, adjust accordingly
# For simplicity, fetch the entire 3 years again
# Alternatively, fetch data from last_date forward
# Here, we'll fetch 3 years of data up to current_date
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=365 * 3)
self.fetch_historical_data_yearly(symbol, sec_type, exchange, currency, start_date, end_date, bar_size)
else:
# No existing data, fetch all 3 years
end_date = datetime.now(timezone.utc)
self.fetch_historical_data_yearly(symbol, sec_type, exchange, currency, end_date - timedelta(days=365*3), end_date, bar_size)
except Exception as e:
print(f"Error fetching data: {e}")
def disconnect(self):
self.app.disconnect()
print("Disconnected from IB API.")
def get_user_input():
print("Provide the stock details for historical data retrieval.")
try:
symbol = input("Enter the stock symbol (e.g., 'AAPL'): ").strip().upper()
sec_type = "STK" # Automatically set to Stock
exchange = "SMART" # Automatically set to SMART routing
currency = "USD" # Automatically set to USD
if not symbol:
raise ValueError("Stock symbol is required. Please try again.")
return symbol, sec_type, exchange, currency
except Exception as e:
print(f"Input Error: {e}")
return None
def graceful_exit(signal_received, frame):
print("\nTerminating program...")
app.disconnect()
exit(0)
# Handle graceful exit on Ctrl+C
signal.signal(signal.SIGINT, graceful_exit)
# Initialize and connect the IBApp
app = IBApp()
app.connect()
try:
user_input = get_user_input()
if user_input:
symbol, sec_type, exchange, currency = user_input
# Define the filename (save directly in current directory)
filename = f"{symbol}_3yr_daily_data.csv"
# Fetch historical data
app.fetch_historical_data(symbol, sec_type, exchange, currency)
# Retrieve fetched data
data = app.app.df
if not data.empty:
print(f"Number of data points fetched: {len(data)}")
# Clean and parse the 'Date' column
# Attempt multiple formats
data['Date'] = pd.to_datetime(data['Date'], errors='coerce')
# Check if timezone is present; if not, localize to UTC
if data['Date'].dt.tz is None:
data['Date'] = data['Date'].dt.tz_localize(timezone.utc, ambiguous='NaT', nonexistent='NaT')
# Remove any rows with NaT in 'Date'
data.dropna(subset=['Date'], inplace=True)
# Sort by 'Date' ascending
data.sort_values(by='Date', inplace=True)
# Reset index
data.reset_index(drop=True, inplace=True)
# Save to CSV
data.to_csv(filename, index=False)
print(f"Data saved to {filename}.")
print(data.head())
else:
print("No data retrieved.")
except Exception as e:
print(f"Error: {e}")
finally:
app.disconnect()

View File

@@ -1,144 +0,0 @@
import signal
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import threading
import time
import pandas as pd
from datetime import datetime, timedelta, timezone
from tqdm import tqdm # For progress bar
class IBApi(EWrapper, EClient):
def __init__(self):
EClient.__init__(self, self)
self.data = []
self.df = pd.DataFrame()
self.data_retrieved = False
def historicalData(self, reqId, bar):
self.data.append({
"Date": bar.date,
"Open": bar.open,
"High": bar.high,
"Low": bar.low,
"Close": bar.close,
"Volume": bar.volume
})
def historicalDataEnd(self, reqId, start, end):
chunk_df = pd.DataFrame(self.data)
self.df = pd.concat([self.df, chunk_df], ignore_index=True)
self.data_retrieved = True
self.data = []
class IBApp:
def __init__(self):
self.app = IBApi()
def connect(self):
self.app.connect("127.0.0.1", 4002, clientId=1)
thread = threading.Thread(target=self.run_app, daemon=True)
thread.start()
time.sleep(1)
def run_app(self):
self.app.run()
def request_data(self, contract, end_date, duration, bar_size):
self.app.reqHistoricalData(
reqId=1,
contract=contract,
endDateTime=end_date,
durationStr=duration,
barSizeSetting=bar_size,
whatToShow="TRADES",
useRTH=0,
formatDate=1,
keepUpToDate=False,
chartOptions=[]
)
# Ensure pacing between API calls
while not self.app.data_retrieved:
time.sleep(0.1)
def fetch_historical_data(self, symbol, sec_type, exchange, currency):
try:
contract = Contract()
contract.symbol = symbol
contract.secType = sec_type
contract.exchange = exchange
contract.currency = currency
# Set duration and bar size
duration = "1 D" # 1 day chunks
bar_size = "5 mins" # 1-minute intervals
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=365) #Can multiply for more years
total_days = (end_date - start_date).days
with tqdm(total=total_days, desc="Fetching Data", unit="day") as pbar:
current_date = end_date
while current_date > start_date:
end_date_str = current_date.strftime("%Y%m%d %H:%M:%S UTC")
try:
self.request_data(contract, end_date_str, duration, bar_size)
pbar.update(1)
time.sleep(5) # Sleep to avoid pacing violations
except Exception as e:
print(f"Error fetching data for {end_date_str}: {e}")
current_date -= timedelta(days=1)
except Exception as e:
print(f"Error fetching data: {e}")
def disconnect(self):
self.app.disconnect()
def get_user_input():
print("Provide the stock details for historical data retrieval.")
try:
symbol = input("Enter the stock symbol (e.g., 'AAPL'): ").strip().upper()
sec_type = "STK" # Automatically set to Stock
exchange = "SMART" # Automatically set to SMART routing
currency = "USD" # Automatically set to USD
if not symbol:
raise ValueError("Stock symbol is required. Please try again.")
return symbol, sec_type, exchange, currency
except Exception as e:
print(f"Input Error: {e}")
return None
def graceful_exit(signal_received, frame):
print("\nTerminating program...")
app.disconnect()
exit(0)
signal.signal(signal.SIGINT, graceful_exit)
app = IBApp()
app.connect()
try:
user_input = get_user_input()
if user_input:
symbol, sec_type, exchange, currency = user_input
app.fetch_historical_data(symbol, sec_type, exchange, currency)
data = app.app.df
if not data.empty:
filename = f"{symbol}_1yr_5min_data.csv"
data.to_csv(filename, index=False)
print(f"Data saved to {filename}.")
print(data.head())
else:
print("No data retrieved.")
except Exception as e:
print(f"Error: {e}")
finally:
app.disconnect()

View File

@@ -1,24 +0,0 @@
import pandas as pd
# Define the path to your CSV file
csv_file_path = 'C:/Users/gwitt/MidasTechnologies/API/SPY_3yr_5min_data.csv' # Replace with your actual file path
df = pd.read_csv(csv_file_path)
# Step 2: Preprocess the data
# Parse the 'Date' column to datetime and set as index
df['Date'] = pd.to_datetime(df['Date'].str.strip(), format='%Y%m%d %H:%M:%S')
df.set_index('Date', inplace=True)
# Sort data in chronological order
df.sort_index(inplace=True)
# Handle missing data by forward filling
df.ffill(inplace=True)
# Step 3: Save preprocessed data to a new CSV file
preprocessed_file_path = 'SPY_5min_preprocessed.csv' # Replace with your desired path
df.to_csv(preprocessed_file_path)
print(f"Preprocessed data saved to {preprocessed_file_path}")

View File

@@ -1,145 +0,0 @@
import signal
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import threading
import time
import pandas as pd
from datetime import datetime, timezone
from tqdm import tqdm # For progress bar
class IBApi(EWrapper, EClient):
def __init__(self):
EClient.__init__(self, self)
self.data = []
self.df = pd.DataFrame()
self.data_retrieved = False
def historicalData(self, reqId, bar):
self.data.append({
"Date": bar.date,
"Open": bar.open,
"High": bar.high,
"Low": bar.low,
"Close": bar.close,
"Volume": bar.volume
})
def historicalDataEnd(self, reqId, start, end):
chunk_df = pd.DataFrame(self.data)
self.df = pd.concat([self.df, chunk_df], ignore_index=True)
self.data_retrieved = True
self.data = []
class IBApp:
def __init__(self):
self.app = IBApi()
def connect(self):
self.app.connect("127.0.0.1", 4002, clientId=1)
thread = threading.Thread(target=self.run_app, daemon=True)
thread.start()
time.sleep(1)
def run_app(self):
self.app.run()
def request_data(self, contract, end_date, duration, bar_size):
self.app.reqHistoricalData(
reqId=1,
contract=contract,
endDateTime=end_date,
durationStr=duration,
barSizeSetting=bar_size,
whatToShow="TRADES",
useRTH=0,
formatDate=1,
keepUpToDate=False,
chartOptions=[]
)
# Ensure pacing between API calls
while not self.app.data_retrieved:
time.sleep(0.1)
def fetch_options_data(self, symbol, exchange, currency, right, strike, expiry):
try:
contract = Contract()
contract.symbol = symbol
contract.secType = "OPT" # Set security type to options
contract.exchange = exchange
contract.currency = currency
contract.right = right # 'C' for Call, 'P' for Put
contract.strike = float(strike) # Strike price
contract.lastTradeDateOrContractMonth = expiry # Expiry date in YYYYMMDD format
# Set duration and bar size for options data
duration = "1 D" # 1 day chunks
bar_size = "1 min" # 1-minute intervals
end_date = datetime.now(timezone.utc)
# Since options data typically spans less than a year, we fetch for the expiry
with tqdm(total=1, desc=f"Fetching {right} {strike} {expiry} data", unit="contract") as pbar:
end_date_str = end_date.strftime("%Y%m%d %H:%M:%S UTC")
try:
self.request_data(contract, end_date_str, duration, bar_size)
pbar.update(1)
time.sleep(15) # Sleep to avoid pacing violations
except Exception as e:
print(f"Error fetching data for contract {contract.symbol}: {e}")
except Exception as e:
print(f"Error fetching data: {e}")
def disconnect(self):
self.app.disconnect()
def get_user_input():
print("Provide the options contract details for data retrieval.")
try:
symbol = input("Enter the stock symbol (e.g., 'AAPL'): ").strip().upper()
exchange = "SMART" # Automatically set to SMART routing
currency = "USD" # Automatically set to USD
right = input("Enter the option type ('C' for Call, 'P' for Put): ").strip().upper()
strike = input("Enter the strike price (e.g., '150'): ").strip()
expiry = input("Enter the expiry date (YYYYMMDD): ").strip()
if not all([symbol, right, strike, expiry]):
raise ValueError("All fields are required. Please try again.")
return symbol, exchange, currency, right, strike, expiry
except Exception as e:
print(f"Input Error: {e}")
return None
def graceful_exit(signal_received, frame):
print("\nTerminating program...")
app.disconnect()
exit(0)
signal.signal(signal.SIGINT, graceful_exit)
app = IBApp()
app.connect()
try:
user_input = get_user_input()
if user_input:
symbol, exchange, currency, right, strike, expiry = user_input
app.fetch_options_data(symbol, exchange, currency, right, strike, expiry)
data = app.app.df
if not data.empty:
filename = f"{symbol}_{strike}_{right}_{expiry}_options_data.csv"
data.to_csv(filename, index=False)
print(f"Options data saved to {filename}.")
print(data.head())
else:
print("No options data retrieved.")
except Exception as e:
print(f"Error: {e}")
finally:
app.disconnect()

View File

@@ -1,170 +0,0 @@
import signal
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import threading
import time
import pandas as pd
from datetime import datetime, timedelta, timezone
import os
class IBApi(EWrapper, EClient):
def __init__(self):
EClient.__init__(self, self)
self.data = []
self.df = pd.DataFrame()
self.data_retrieved = False
def historicalData(self, reqId, bar):
# Debug: Print each received bar
print(f"Received bar: Date={bar.date}, Open={bar.open}, High={bar.high}, Low={bar.low}, Close={bar.close}, Volume={bar.volume}")
self.data.append({
"Date": bar.date,
"Open": bar.open,
"High": bar.high,
"Low": bar.low,
"Close": bar.close,
"Volume": bar.volume
})
def historicalDataEnd(self, reqId, start, end):
# Debug: Indicate end of data reception
print(f"HistoricalDataEnd received. Start: {start}, End: {end}. Number of bars fetched: {len(self.data)}")
chunk_df = pd.DataFrame(self.data)
if not chunk_df.empty:
self.df = pd.concat([self.df, chunk_df], ignore_index=True)
else:
print("No data received in this request.")
self.data_retrieved = True
self.data = [] # Reset data list for next request
class IBApp:
def __init__(self):
self.app = IBApi()
def connect(self):
# Connect to IB API (ensure IB Gateway or TWS is running)
print("Connecting to IB API...")
self.app.connect("127.0.0.1", 4002, clientId=1)
# Start the API thread
thread = threading.Thread(target=self.run_app, daemon=True)
thread.start()
time.sleep(1) # Allow time for connection
print("Connected to IB API.")
def run_app(self):
self.app.run()
def request_data(self, contract, end_date, duration, bar_size):
# Request historical data
print(f"Requesting data: endDateTime={end_date}, durationStr={duration}, barSizeSetting={bar_size}")
self.app.reqHistoricalData(
reqId=1,
contract=contract,
endDateTime=end_date,
durationStr=duration,
barSizeSetting=bar_size,
whatToShow="TRADES",
useRTH=1, # Use regular trading hours
formatDate=1,
keepUpToDate=False,
chartOptions=[]
)
# Wait until data is retrieved
while not self.app.data_retrieved:
time.sleep(0.1)
self.app.data_retrieved = False # Reset flag for next request
def fetch_recent_data(self, symbol, sec_type, exchange, currency):
try:
# Define the contract
contract = Contract()
contract.symbol = symbol
contract.secType = sec_type
contract.exchange = exchange
contract.currency = currency
# Set duration and bar size for last 2 days
duration = "2 D" # 2 days
bar_size = "1 min" # 1-minute intervals
# Set end_date to now in UTC
end_date = datetime.now(timezone.utc)
end_date_str = end_date.strftime("%Y%m%d %H:%M:%S UTC")
print(f"Fetching data up to {end_date_str} for the last {duration} with bar size {bar_size}")
self.request_data(contract, end_date_str, duration, bar_size)
except Exception as e:
print(f"Error fetching data: {e}")
def disconnect(self):
self.app.disconnect()
print("Disconnected from IB API.")
def get_user_input():
print("Provide the stock details for historical data retrieval.")
try:
symbol = input("Enter the stock symbol (e.g., 'AAPL'): ").strip().upper()
sec_type = "STK" # Automatically set to Stock
exchange = "SMART" # Automatically set to SMART routing
currency = "USD" # Automatically set to USD
if not symbol:
raise ValueError("Stock symbol is required. Please try again.")
return symbol, sec_type, exchange, currency
except Exception as e:
print(f"Input Error: {e}")
return None
def graceful_exit(signal_received, frame):
print("\nTerminating program...")
app.disconnect()
exit(0)
# Handle graceful exit on Ctrl+C
signal.signal(signal.SIGINT, graceful_exit)
# Initialize and connect the IBApp
app = IBApp()
app.connect()
try:
user_input = get_user_input()
if user_input:
symbol, sec_type, exchange, currency = user_input
# Define the filename (save directly in current directory)
filename = f"{symbol}_recent_data.csv"
# Fetch recent data (last 2 days)
app.fetch_recent_data(symbol, sec_type, exchange, currency)
# Retrieve fetched data
data = app.app.df
if not data.empty:
print(f"Number of data points fetched: {len(data)}")
# Clean and parse the 'Date' column
# Attempt multiple formats
data['Date'] = pd.to_datetime(data['Date'], errors='coerce')
# Check if timezone is present; if not, localize to UTC
if data['Date'].dt.tz is None:
data['Date'] = data['Date'].dt.tz_localize(timezone.utc, ambiguous='NaT', nonexistent='NaT')
# Remove any rows with NaT in 'Date'
data.dropna(subset=['Date'], inplace=True)
# Sort by 'Date' ascending
data.sort_values(by='Date', inplace=True)
# Save to CSV
data.to_csv(filename, index=False)
print(f"Data saved to {filename}.")
print(data.tail())
else:
print("No new data fetched.")
except Exception as e:
print(f"Error: {e}")
finally:
app.disconnect()

View File

@@ -1,119 +0,0 @@
import ta
import pandas as pd
preprocessed_file_path = 'C:/Users/gwitt/MidasTechnologies/API/spy_1min_preprocessed.csv' # Replace with your file path
df = pd.read_csv(preprocessed_file_path, index_col='Date', parse_dates=True)
# **Trend Indicators**
# Simple Moving Averages
df['SMA_20'] = ta.trend.sma_indicator(close=df['Close'], window=20)
df['SMA_50'] = ta.trend.sma_indicator(close=df['Close'], window=50)
df['SMA_200'] = ta.trend.sma_indicator(close=df['Close'], window=200)
# Exponential Moving Averages
df['EMA_20'] = ta.trend.ema_indicator(close=df['Close'], window=20)
df['EMA_50'] = ta.trend.ema_indicator(close=df['Close'], window=50)
# MACD
macd = ta.trend.MACD(close=df['Close'], window_slow=26, window_fast=12, window_sign=9)
df['MACD'] = macd.macd()
df['MACD_Signal'] = macd.macd_signal()
df['MACD_Hist'] = macd.macd_diff()
# ADX
df['ADX_14'] = ta.trend.adx(high=df['High'], low=df['Low'], close=df['Close'], window=14)
# **Momentum Indicators**
# RSI
df['RSI_14'] = ta.momentum.rsi(close=df['Close'], window=14)
# Stochastic Oscillator
stoch = ta.momentum.StochasticOscillator(high=df['High'], low=df['Low'], close=df['Close'], window=14, smooth_window=3)
df['Stoch_%K'] = stoch.stoch()
df['Stoch_%D'] = stoch.stoch_signal()
# Rate of Change
df['ROC_10'] = ta.momentum.roc(close=df['Close'], window=10)
# **Volatility Indicators**
# Bollinger Bands
bollinger = ta.volatility.BollingerBands(close=df['Close'], window=20, window_dev=2)
df['Bollinger_High'] = bollinger.bollinger_hband()
df['Bollinger_Low'] = bollinger.bollinger_lband()
df['Bollinger_Middle'] = bollinger.bollinger_mavg()
# Average True Range
df['ATR_14'] = ta.volatility.average_true_range(high=df['High'], low=df['Low'], close=df['Close'], window=14)
# **Volume Indicators**
# On-Balance Volume
df['OBV'] = ta.volume.on_balance_volume(close=df['Close'], volume=df['Volume'])
# Volume Weighted Average Price
df['VWAP'] = ta.volume.volume_weighted_average_price(high=df['High'], low=df['Low'], close=df['Close'], volume=df['Volume'])
# Chaikin Money Flow
df['CMF_20'] = ta.volume.chaikin_money_flow(high=df['High'], low=df['Low'], close=df['Close'], volume=df['Volume'], window=20)
# **Composite Indicators**
# # Ichimoku Cloud
# ichimoku = ta.trend.IchimokuIndicator(high=df['High'], low=df['Low'], close=df['Close'], window1=9, window2=26, window3=52)
# df['Ichimoku_A'] = ichimoku.ichimoku_a()
# df['Ichimoku_B'] = ichimoku.ichimoku_b()
# df['Ichimoku_Base_Line'] = ichimoku.ichimoku_base_line()
# df['Ichimoku_Conversion_Line'] = ichimoku.ichimoku_conversion_line()
# Parabolic SAR
df['PSAR'] = ta.trend.psar_up(close=df['Close'], high=df['High'], low=df['Low'], step=0.02, max_step=0.2)
# **Classification Target:** 1 if next minute's close > current close, else 0
df['Target_Class'] = (df['Close'].shift(-1) > df['Close']).astype(int)
# **Regression Target:** Percentage change in close price
df['Target_Change'] = ((df['Close'].shift(-1) - df['Close']) / df['Close']) * 100
# Display targets
print("\nTarget Variables:")
print(df[['Close', 'Target_Class', 'Target_Change']].head())
# Define lag periods
lag_periods = [1, 2, 3]
# Create lagged features for key indicators
key_indicators = ['RSI_14', 'MACD', 'ADX_14', 'ATR_14', 'OBV', 'CMF_20']
for indicator in key_indicators:
for lag in lag_periods:
df[f'{indicator}_lag{lag}'] = df[indicator].shift(lag)
# Display lagged features
print("\nLagged Features:")
print(df[[f'RSI_14_lag{lag}' for lag in lag_periods]].head())
# Rolling mean of RSI over past 5 minutes
df['RSI_14_roll_mean_5'] = df['RSI_14'].rolling(window=5).mean()
# Rolling standard deviation of ATR over past 10 minutes
df['ATR_14_roll_std_10'] = df['ATR_14'].rolling(window=10).std()
# Display rolling features
print("\nRolling Features:")
print(df[['RSI_14_roll_mean_5', 'ATR_14_roll_std_10']].head())
# Interaction between MACD and RSI
df['MACD_RSI'] = df['MACD'] * df['RSI_14']
# Interaction between ATR and ADX
df['ATR_ADX'] = df['ATR_14'] * df['ADX_14']
# Display interaction features
print("\nInteraction Features:")
print(df[['MACD_RSI', 'ATR_ADX']].head())
# Save dataset with technical indicators
indicators_file_path = 'C:/Users/gwitt/MidasTechnologies/API/spy_1min_with_indicators.csv' # Replace with your desired path
df.to_csv(indicators_file_path)
print(f"Data with technical indicators saved to {indicators_file_path}")