import os
import sys
import argparse  # For command-line argument parsing

# Suppress TensorFlow C++ logging; this must be set BEFORE TensorFlow is imported
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # Suppress INFO and WARNING messages

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import logging
import joblib
from tabulate import tabulate
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import TimeSeriesSplit
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional
from tensorflow.keras.optimizers import Adam, Nadam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.losses import Huber
import xgboost as xgb
import optuna
from optuna.integration import TFKerasPruningCallback  # Pruning callback for tf.keras models

# For Reinforcement Learning
import gym
from gym import spaces
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


# 1. Data Loading and Preprocessing
def load_data(file_path):
    logging.info(f"Loading data from: {file_path}")
    try:
        # Parse the 'time' column as dates
        data = pd.read_csv(file_path, parse_dates=['time'])
    except FileNotFoundError:
        logging.error(f"File not found: {file_path}")
        sys.exit(1)
    except pd.errors.ParserError as e:
        logging.error(f"Error parsing CSV file: {e}")
        sys.exit(1)
    except Exception as e:
        logging.error(f"Unexpected error: {e}")
        sys.exit(1)

    # Rename columns to match the names used throughout the script
    rename_mapping = {
        'time': 'Date',
        'open': 'Open',
        'high': 'High',
        'low': 'Low',
        'close': 'Close'
    }
    data.rename(columns=rename_mapping, inplace=True)
    logging.info(f"Data columns after renaming: {data.columns.tolist()}")

    # Sort chronologically and reset the index
    data.sort_values('Date', inplace=True)
    data.reset_index(drop=True, inplace=True)
    logging.info("Data loaded and sorted successfully.")
    return data


def compute_rsi(series, window=14):
    delta = series.diff()
    gain = delta.where(delta > 0, 0).rolling(window=window).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
    RS = gain / loss
    return 100 - (100 / (1 + RS))


def compute_macd(series, span_short=12, span_long=26, span_signal=9):
    ema_short = series.ewm(span=span_short, adjust=False).mean()
    ema_long = series.ewm(span=span_long, adjust=False).mean()
    MACD = ema_short - ema_long
    signal = MACD.ewm(span=span_signal, adjust=False).mean()
    # Return the MACD histogram (MACD line minus signal line)
    return MACD - signal


def compute_adx(df, window=14):
    # Simplistic placeholder for ADX: rolling standard deviation of Close.
    # This is NOT a true ADX; see the Wilder-style sketch below.
    return df['Close'].rolling(window=window).std()


def compute_obv(df):
    # On-Balance Volume: cumulative volume signed by the direction of the close
    OBV = (np.sign(df['Close'].diff()) * df['Volume']).fillna(0).cumsum()
    return OBV


def calculate_technical_indicators(df):
    logging.info("Calculating technical indicators...")
    df['SMA_5'] = df['Close'].rolling(window=5).mean()
    df['SMA_10'] = df['Close'].rolling(window=10).mean()
    df['EMA_5'] = df['Close'].ewm(span=5, adjust=False).mean()
    df['EMA_10'] = df['Close'].ewm(span=10, adjust=False).mean()
    df['STDDEV_5'] = df['Close'].rolling(window=5).std()
    df['RSI'] = compute_rsi(df['Close'], window=14)
    df['MACD'] = compute_macd(df['Close'])
    df['ADX'] = compute_adx(df)
    df['OBV'] = compute_obv(df)
    df.dropna(inplace=True)  # Drop rows left NaN by the rolling windows
    logging.info("Technical indicators calculated successfully.")
    return df
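
# ---------------------------------------------------------------------------
# Hedged sketch: a Wilder-style ADX, offered as a drop-in replacement for the
# rolling-std placeholder above. Note the smoothing here is a plain rolling
# mean rather than Wilder's recursive smoothing, which is a simplification;
# the function name and window default are illustrative assumptions.
def compute_adx_wilder(df, window=14):
    up_move = df['High'].diff()
    down_move = -df['Low'].diff()
    # Directional movement: only the dominant, positive move counts
    plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)
    # True range: the largest of the three candidate ranges
    prev_close = df['Close'].shift()
    tr = pd.concat([
        df['High'] - df['Low'],
        (df['High'] - prev_close).abs(),
        (df['Low'] - prev_close).abs()
    ], axis=1).max(axis=1)
    atr = tr.rolling(window=window).mean()
    plus_di = 100 * plus_dm.rolling(window=window).mean() / atr
    minus_di = 100 * minus_dm.rolling(window=window).mean() / atr
    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di)
    return dx.rolling(window=window).mean()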
calculated successfully.") return df # Argument Parsing def parse_arguments(): parser = argparse.ArgumentParser(description='Train LSTM and DQN models for stock trading.') parser.add_argument('csv_path', type=str, help='Path to the CSV data file.') return parser.parse_args() def main(): # Parse command-line arguments args = parse_arguments() csv_path = args.csv_path # Load and preprocess data data = load_data(csv_path) data = calculate_technical_indicators(data) # Feature selection feature_columns = ['SMA_5', 'SMA_10', 'EMA_5', 'EMA_10', 'STDDEV_5', 'RSI', 'MACD', 'ADX', 'OBV', 'Volume', 'Open', 'High', 'Low'] target_column = 'Close' data = data[['Date'] + feature_columns + [target_column]] data.dropna(inplace=True) # Scaling scaler_features = MinMaxScaler() scaler_target = MinMaxScaler() scaled_features = scaler_features.fit_transform(data[feature_columns]) scaled_target = scaler_target.fit_transform(data[[target_column]]).flatten() # Create sequences for LSTM def create_sequences(features, target, window_size=15): X, y = [], [] for i in range(len(features) - window_size): X.append(features[i:i+window_size]) y.append(target[i+window_size]) return np.array(X), np.array(y) window_size = 15 X, y = create_sequences(scaled_features, scaled_target, window_size) # Split data into training, validation, and testing sets train_size = int(len(X) * 0.7) val_size = int(len(X) * 0.15) test_size = len(X) - train_size - val_size X_train, X_val, X_test = X[:train_size], X[train_size:train_size+val_size], X[train_size+val_size:] y_train, y_val, y_test = y[:train_size], y[train_size:train_size+val_size], y[train_size+val_size:] logging.info(f"Scaled training features shape: {X_train.shape}") logging.info(f"Scaled validation features shape: {X_val.shape}") logging.info(f"Scaled testing features shape: {X_test.shape}") logging.info(f"Scaled training target shape: {y_train.shape}") logging.info(f"Scaled validation target shape: {y_val.shape}") logging.info(f"Scaled testing target shape: {y_test.shape}") # 2. Device Configuration def configure_device(): gpus = tf.config.list_physical_devices('GPU') if gpus: try: for gpu in gpus: tf.config.experimental.set_memory_growth(gpu, True) logging.info(f"{len(gpus)} GPU(s) detected and configured.") except RuntimeError as e: logging.error(e) else: logging.info("No GPU detected, using CPU.") configure_device() # 3. 

    # 3. Model Building
    def build_advanced_lstm(input_shape, hyperparams):
        model = Sequential()
        # Fix: declare the input shape on the first layer (the original never
        # used the input_shape argument)
        model.add(tf.keras.layers.Input(shape=input_shape))
        for i in range(hyperparams['num_lstm_layers']):
            # All but the last LSTM layer must return full sequences
            return_sequences = i < hyperparams['num_lstm_layers'] - 1
            model.add(Bidirectional(LSTM(
                hyperparams['lstm_units'],
                return_sequences=return_sequences,
                kernel_regularizer=tf.keras.regularizers.l2(0.001)
            )))
            model.add(Dropout(hyperparams['dropout_rate']))
        model.add(Dense(1, activation='linear'))

        if hyperparams['optimizer'] == 'Adam':
            # Note: the `decay` argument is only accepted by Keras 2.x-style
            # optimizers; newer Keras versions use learning-rate schedules instead
            optimizer = Adam(learning_rate=hyperparams['learning_rate'], decay=hyperparams['decay'])
        elif hyperparams['optimizer'] == 'Nadam':
            optimizer = Nadam(learning_rate=hyperparams['learning_rate'])
        else:
            optimizer = Adam(learning_rate=hyperparams['learning_rate'])

        model.compile(optimizer=optimizer, loss=Huber(), metrics=['mae'])
        return model

    def build_xgboost_model(X_train, y_train, hyperparams):
        model = xgb.XGBRegressor(
            objective='reg:squarederror',
            n_estimators=hyperparams['n_estimators'],
            max_depth=hyperparams['max_depth'],
            learning_rate=hyperparams['learning_rate'],
            subsample=hyperparams['subsample'],
            colsample_bytree=hyperparams['colsample_bytree'],
            random_state=42,
            n_jobs=-1
        )
        # Flatten each (window_size, n_features) sequence into a single row
        model.fit(X_train.reshape(X_train.shape[0], -1), y_train)
        return model

    # 4. Hyperparameter Tuning with Optuna
    def objective(trial):
        # Hyperparameter search space
        num_lstm_layers = trial.suggest_int('num_lstm_layers', 1, 3)
        lstm_units = trial.suggest_categorical('lstm_units', [32, 64, 96, 128])
        dropout_rate = trial.suggest_float('dropout_rate', 0.1, 0.5)
        # suggest_float(..., log=True) replaces the deprecated suggest_loguniform
        learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
        optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'Nadam'])
        decay = trial.suggest_float('decay', 0.0, 1e-4)

        hyperparams = {
            'num_lstm_layers': num_lstm_layers,
            'lstm_units': lstm_units,
            'dropout_rate': dropout_rate,
            'learning_rate': learning_rate,
            'optimizer': optimizer_name,
            'decay': decay
        }

        model = build_advanced_lstm((X_train.shape[1], X_train.shape[2]), hyperparams)
        early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
        lr_reduce = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)
        history = model.fit(
            X_train, y_train,
            epochs=100,
            batch_size=16,
            validation_data=(X_val, y_val),
            callbacks=[early_stop, lr_reduce, TFKerasPruningCallback(trial, 'val_loss')],
            verbose=0
        )
        # The trial's objective value is the best validation MAE over all epochs
        return min(history.history['val_mae'])

    # Optuna study
    logging.info("Starting hyperparameter optimization with Optuna...")
    study = optuna.create_study(direction='minimize')
    study.optimize(objective, n_trials=50)
    best_params = study.best_params
    logging.info(f"Best Hyperparameters from Optuna: {best_params}")

    # 5. Train the Best LSTM Model
    best_model = build_advanced_lstm((X_train.shape[1], X_train.shape[2]), best_params)
    early_stop = EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True)
    lr_reduce = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)
    logging.info("Training the best LSTM model with optimized hyperparameters...")
    history = best_model.fit(
        X_train, y_train,
        epochs=300,
        batch_size=16,
        validation_data=(X_val, y_val),
        callbacks=[early_stop, lr_reduce],
        verbose=1
    )
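
    # -----------------------------------------------------------------------
    # Hedged sketch (defined but not called by default): build_xgboost_model
    # above is never invoked by this script; one plausible use is as a
    # flat-feature baseline for the LSTM. The hyperparameter values below are
    # illustrative assumptions, not tuned.
    def fit_xgb_baseline():
        xgb_params = {
            'n_estimators': 300,
            'max_depth': 6,
            'learning_rate': 0.05,
            'subsample': 0.8,
            'colsample_bytree': 0.8
        }
        xgb_model = build_xgboost_model(X_train, y_train, xgb_params)
        # Evaluate on the same flattened windows, in scaled units
        xgb_pred = xgb_model.predict(X_test.reshape(X_test.shape[0], -1))
        logging.info(f"XGBoost baseline MAE (scaled): {mean_absolute_error(y_test, xgb_pred):.5f}")
        return xgb_model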

    # 6. Evaluate the Model
    def evaluate_model(model, X_test, y_test):
        logging.info("Evaluating model...")
        y_pred_scaled = model.predict(X_test).flatten()
        y_pred_scaled = np.clip(y_pred_scaled, 0, 1)  # Keep predictions inside the scaler's [0, 1] range
        y_pred = scaler_target.inverse_transform(y_pred_scaled.reshape(-1, 1)).flatten()
        y_test_actual = scaler_target.inverse_transform(y_test.reshape(-1, 1)).flatten()

        mse = mean_squared_error(y_test_actual, y_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y_test_actual, y_pred)
        r2 = r2_score(y_test_actual, y_pred)

        # Directional accuracy: fraction of steps where the predicted and
        # actual price moves share the same sign
        direction_actual = np.sign(np.diff(y_test_actual))
        direction_pred = np.sign(np.diff(y_pred))
        directional_accuracy = np.mean(direction_actual == direction_pred)

        logging.info(f"Test MSE: {mse}")
        logging.info(f"Test RMSE: {rmse}")
        logging.info(f"Test MAE: {mae}")
        logging.info(f"Test R2 Score: {r2}")
        logging.info(f"Directional Accuracy: {directional_accuracy}")

        # Plot Actual vs Predicted
        plt.figure(figsize=(14, 7))
        plt.plot(y_test_actual, label='Actual Price')
        plt.plot(y_pred, label='Predicted Price')
        plt.title('Actual vs Predicted Prices')
        plt.xlabel('Time Step')
        plt.ylabel('Price')
        plt.legend()
        plt.grid(True)
        plt.savefig('actual_vs_predicted.png')  # Save rather than show, so headless runs work
        plt.close()
        logging.info("Actual vs Predicted plot saved as 'actual_vs_predicted.png'")

        # Tabulate the first 40 predictions
        table = [[i, round(actual, 2), round(pred, 2)]
                 for i, (actual, pred) in enumerate(zip(y_test_actual[:40], y_pred[:40]))]
        headers = ["Index", "Actual Price", "Predicted Price"]
        print(tabulate(table, headers=headers, tablefmt="pretty"))

        return mse, rmse, mae, r2, directional_accuracy

    mse, rmse, mae, r2, directional_accuracy = evaluate_model(best_model, X_test, y_test)

    # 7. Save the Model and Scalers
    best_model.save('optimized_lstm_model.h5')
    joblib.dump(scaler_features, 'scaler_features.save')
    joblib.dump(scaler_target, 'scaler_target.save')
    logging.info("Model and scalers saved as 'optimized_lstm_model.h5', "
                 "'scaler_features.save', and 'scaler_target.save'.")
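
    # -----------------------------------------------------------------------
    # Hedged sketch (defined but not called by default): reloading the saved
    # artifacts for inference would look roughly like this. compile=False
    # avoids needing the Huber loss object at load time; the function name is
    # an illustrative assumption.
    def load_for_inference():
        reloaded = tf.keras.models.load_model('optimized_lstm_model.h5', compile=False)
        feat_scaler = joblib.load('scaler_features.save')
        tgt_scaler = joblib.load('scaler_target.save')
        # Predict the next close from the most recent window of scaled features;
        # the window must match the training window_size and feature count
        last_window = feat_scaler.transform(data[feature_columns].values[-window_size:])
        pred_scaled = reloaded.predict(last_window[np.newaxis, ...], verbose=0)
        return tgt_scaler.inverse_transform(pred_scaled).flatten()[0]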

    # 8. Reinforcement Learning: Deep Q-Learning for Trading Actions
    class StockTradingEnv(gym.Env):
        """A simple stock trading environment for OpenAI gym."""
        metadata = {'render.modes': ['human']}

        def __init__(self, df, initial_balance=10000):
            super(StockTradingEnv, self).__init__()
            self.df = df.reset_index()
            self.initial_balance = initial_balance
            self.balance = initial_balance
            self.net_worth = initial_balance
            self.max_steps = len(df)
            self.current_step = 0
            self.shares_held = 0
            self.cost_basis = 0
            # Actions: 0 = Sell, 1 = Hold, 2 = Buy
            self.action_space = spaces.Discrete(3)
            # Observations: normalized features plus balance, shares held, and cost basis
            self.observation_space = spaces.Box(
                low=0, high=1, shape=(len(feature_columns) + 3,), dtype=np.float32
            )

        def reset(self):
            self.balance = self.initial_balance
            self.net_worth = self.initial_balance
            self.current_step = 0
            self.shares_held = 0
            self.cost_basis = 0
            return self._next_observation()

        def _next_observation(self):
            obs = self.df.loc[self.current_step, feature_columns].values.astype(np.float32)
            # Crude normalization: divide the row by its own maximum so it
            # stays in [0, 1] (large-magnitude features like Volume dominate)
            obs = obs / np.max(obs)
            additional = np.array([
                self.balance / self.initial_balance,
                self.shares_held / 100,  # Assumes at most 100 shares for normalization
                self.cost_basis / self.initial_balance
            ], dtype=np.float32)
            return np.concatenate([obs, additional])

        def step(self, action):
            current_price = self.df.loc[self.current_step, 'Close']

            if action == 2:  # Buy as many shares as the balance allows
                shares_bought = int(self.balance // current_price)
                if shares_bought > 0:
                    self.balance -= shares_bought * current_price
                    self.shares_held += shares_bought
                    # Update the average cost per share held
                    self.cost_basis = (
                        self.cost_basis * (self.shares_held - shares_bought)
                        + shares_bought * current_price
                    ) / self.shares_held
            elif action == 0:  # Sell the entire position
                if self.shares_held > 0:
                    self.balance += self.shares_held * current_price
                    self.shares_held = 0
                    self.cost_basis = 0
            # action == 1 (Hold) does nothing

            self.net_worth = self.balance + self.shares_held * current_price
            self.current_step += 1
            done = self.current_step >= self.max_steps - 1
            # Reward: cumulative profit relative to the initial balance
            reward = self.net_worth - self.initial_balance
            obs = self._next_observation()
            return obs, reward, done, {}

        def render(self, mode='human', close=False):
            profit = self.net_worth - self.initial_balance
            print(f'Step: {self.current_step}')
            print(f'Balance: {self.balance}')
            print(f'Shares held: {self.shares_held} (Cost Basis: {self.cost_basis})')
            print(f'Net worth: {self.net_worth}')
            print(f'Profit: {profit}')

    def train_dqn_agent(env):
        logging.info("Training DQN Agent...")
        try:
            model = DQN(
                'MlpPolicy',
                env,
                verbose=1,
                learning_rate=1e-3,
                buffer_size=10000,
                learning_starts=1000,
                batch_size=64,
                tau=1.0,
                gamma=0.99,
                train_freq=4,
                target_update_interval=1000,
                exploration_fraction=0.1,
                exploration_final_eps=0.02,
                tensorboard_log="./dqn_stock_tensorboard/"
            )
            model.learn(total_timesteps=100000)
            model.save("dqn_stock_trading")
            logging.info("DQN Agent trained and saved as 'dqn_stock_trading.zip'.")
            return model
        except Exception as e:
            logging.error(f"Error training DQN Agent: {e}")
            sys.exit(1)

    # Initialize the trading environment, wrapped for stable-baselines3.
    # Constructing the env inside the lambda avoids the original version's
    # fragile self-referencing closure over a reassigned variable.
    trading_env = DummyVecEnv([lambda: StockTradingEnv(data)])

    # Train the DQN agent
    dqn_model = train_dqn_agent(trading_env)


if __name__ == "__main__":
    main()
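

# ---------------------------------------------------------------------------
# Hedged sketch (illustrative helper, not invoked above): rolling the trained
# agent through the environment once to inspect its behavior. Assumes the
# gym-style stable-baselines3 VecEnv API used in this script; the function
# name and max_steps default are illustrative assumptions.
def run_dqn_rollout(model, vec_env, max_steps=1000):
    obs = vec_env.reset()
    total_reward = 0.0
    for _ in range(max_steps):
        # deterministic=True disables exploration for evaluation
        action, _states = model.predict(obs, deterministic=True)
        obs, rewards, dones, infos = vec_env.step(action)
        total_reward += float(rewards[0])
        if dones[0]:
            break
    return total_reward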