klein panic
2025-01-30 00:57:13 -05:00
parent 030529e6c7
commit 6cf5a47005
2 changed files with 558 additions and 293 deletions


@@ -0,0 +1,42 @@
2025-01-30 00:40:03,878 - INFO - ===== Resource Statistics =====
2025-01-30 00:40:03,879 - INFO - Physical CPU Cores: 10
2025-01-30 00:40:03,879 - INFO - Logical CPU Cores: 12
2025-01-30 00:40:03,879 - INFO - CPU Usage per Core: [5.1, 2.0, 5.0, 2.0, 1.0, 2.0, 1.0, 1.0, 1.0, 2.0, 0.0, 1.0]%
2025-01-30 00:40:03,879 - INFO - No GPUs detected.
2025-01-30 00:40:03,879 - INFO - =================================
2025-01-30 00:40:03,880 - INFO - Configured TensorFlow to use CPU with optimized thread settings.
2025-01-30 00:40:03,880 - INFO - Loading data from: BAT.csv
2025-01-30 00:40:04,173 - INFO - Data columns after renaming: ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']
2025-01-30 00:40:04,179 - INFO - Data loaded and sorted successfully.
2025-01-30 00:40:04,179 - INFO - Calculating technical indicators...
2025-01-30 00:40:04,193 - INFO - Technical indicators calculated successfully.
2025-01-30 00:40:04,197 - INFO - Starting parallel feature engineering with 10 workers...
2025-01-30 00:40:06,772 - INFO - Parallel feature engineering completed.
2025-01-30 00:40:06,812 - INFO - Scaled training features shape: (14134, 15, 17)
2025-01-30 00:40:06,813 - INFO - Scaled validation features shape: (3028, 15, 17)
2025-01-30 00:40:06,813 - INFO - Scaled testing features shape: (3030, 15, 17)
2025-01-30 00:40:06,813 - INFO - Scaled training target shape: (14134,)
2025-01-30 00:40:06,813 - INFO - Scaled validation target shape: (3028,)
2025-01-30 00:40:06,813 - INFO - Scaled testing target shape: (3030,)
2025-01-30 00:40:06,813 - INFO - Starting LSTM hyperparameter optimization with Optuna using 10 parallel trials...
2025-01-30 00:56:53,436 - INFO - ===== Resource Statistics =====
2025-01-30 00:56:53,436 - INFO - Physical CPU Cores: 10
2025-01-30 00:56:53,436 - INFO - Logical CPU Cores: 12
2025-01-30 00:56:53,437 - INFO - CPU Usage per Core: [1.0, 1.0, 5.1, 1.0, 0.0, 0.0, 2.0, 1.0, 0.0, 0.0, 0.0, 0.0]%
2025-01-30 00:56:53,437 - INFO - No GPUs detected.
2025-01-30 00:56:53,437 - INFO - =================================
2025-01-30 00:56:53,437 - INFO - Configured TensorFlow to use CPU with optimized thread settings.
2025-01-30 00:56:53,437 - INFO - Loading data from: BAT.csv
2025-01-30 00:56:53,730 - INFO - Data columns after renaming: ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']
2025-01-30 00:56:53,736 - INFO - Data loaded and sorted successfully.
2025-01-30 00:56:53,736 - INFO - Calculating technical indicators...
2025-01-30 00:56:53,753 - INFO - Technical indicators calculated successfully.
2025-01-30 00:56:53,758 - INFO - Starting parallel feature engineering with 10 workers...
2025-01-30 00:56:56,241 - INFO - Parallel feature engineering completed.
2025-01-30 00:56:56,285 - INFO - Scaled training features shape: (14134, 15, 17)
2025-01-30 00:56:56,285 - INFO - Scaled validation features shape: (3028, 15, 17)
2025-01-30 00:56:56,285 - INFO - Scaled testing features shape: (3030, 15, 17)
2025-01-30 00:56:56,285 - INFO - Scaled training target shape: (14134,)
2025-01-30 00:56:56,285 - INFO - Scaled validation target shape: (3028,)
2025-01-30 00:56:56,285 - INFO - Scaled testing target shape: (3030,)
2025-01-30 00:56:56,285 - INFO - Starting LSTM hyperparameter optimization with Optuna using 10 parallel trials...


@@ -7,8 +7,8 @@ import logging
from tabulate import tabulate
import matplotlib.pyplot as plt
import seaborn as sns
import psutil
import GPUtil
# TensorFlow / Keras (suppress TF logs beyond errors; must be set before the import to take effect)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional
@@ -17,30 +17,120 @@ from tensorflow.keras.losses import Huber
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam, Nadam
# Sklearn
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import joblib
# Optuna
import optuna
from optuna.integration import KerasPruningCallback
# RL stuff
import gym
from gym import spaces
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.callbacks import BaseCallback
from multiprocessing import Pool, cpu_count
import threading
import time
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# ============================
# Resource Detection Functions
# ============================
def get_cpu_info():
"""
Retrieves CPU information including physical and logical cores and current usage per core.
Returns:
dict: Dictionary containing physical cores, logical cores, and CPU usage per core.
"""
cpu_count = psutil.cpu_count(logical=False) # Physical cores
cpu_count_logical = psutil.cpu_count(logical=True) # Logical cores
cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
return {
'physical_cores': cpu_count,
'logical_cores': cpu_count_logical,
'cpu_percent': cpu_percent
}
def get_gpu_info():
"""
Retrieves GPU information including load, memory usage, and temperature.
Returns:
list: List of dictionaries containing GPU stats.
"""
gpus = GPUtil.getGPUs()
gpu_info = []
for gpu in gpus:
gpu_info.append({
'id': gpu.id,
'name': gpu.name,
'load': gpu.load * 100, # Convert to percentage
'memory_total': gpu.memoryTotal,
'memory_used': gpu.memoryUsed,
'memory_free': gpu.memoryFree,
'temperature': gpu.temperature
})
return gpu_info
def configure_tensorflow(cpu_stats, gpu_stats):
"""
Configures TensorFlow to utilize available CPU and GPU resources efficiently.
Args:
cpu_stats (dict): Dictionary containing CPU statistics.
gpu_stats (list): List of dictionaries containing GPU statistics.
"""
logical_cores = cpu_stats['logical_cores']
os.environ["OMP_NUM_THREADS"] = str(logical_cores)
os.environ["TF_NUM_INTRAOP_THREADS"] = str(logical_cores)
os.environ["TF_NUM_INTEROP_THREADS"] = str(logical_cores)
if gpu_stats:
gpus = tf.config.list_physical_devices('GPU')
if gpus:
try:
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
logging.info(f"Enabled memory growth for {len(gpus)} GPU(s).")
except RuntimeError as e:
logging.error(f"TensorFlow GPU configuration error: {e}")
else:
tf.config.threading.set_intra_op_parallelism_threads(logical_cores)
tf.config.threading.set_inter_op_parallelism_threads(logical_cores)
logging.info("Configured TensorFlow to use CPU with optimized thread settings.")
# ============================
# Resource Monitoring Function (Optional)
# ============================
def monitor_resources(interval=60):
"""
Continuously monitors and logs CPU and GPU usage at specified intervals.
Args:
interval (int): Time in seconds between each monitoring snapshot.
"""
while True:
cpu = psutil.cpu_percent(interval=1, percpu=True)
gpu = get_gpu_info()
logging.info(f"CPU Usage per Core: {cpu}%")
if gpu:
for gpu_stat in gpu:
logging.info(f"GPU {gpu_stat['id']} - {gpu_stat['name']}: Load: {gpu_stat['load']}%, "
f"Memory Used: {gpu_stat['memory_used']}MB / {gpu_stat['memory_total']}MB, "
f"Temperature: {gpu_stat['temperature']}°C")
else:
logging.info("No GPUs detected.")
logging.info("-" * 50)
time.sleep(interval)
# ============================
# Data Loading & Technical Indicators
# ============================
def load_data(file_path):
logging.info(f"Loading data from: {file_path}")
try:
@@ -140,10 +230,9 @@ def calculate_technical_indicators(df):
logging.info("Technical indicators calculated successfully.")
return df
# ============================
# Argument Parsing
# ============================
def parse_arguments():
parser = argparse.ArgumentParser(description='All-in-One: LSTM + DQN (with LSTM predictions) + Tuning.')
parser.add_argument('csv_path', type=str,
@@ -158,12 +247,17 @@ def parse_arguments():
help='Number of Optuna trials for LSTM. Default=30.')
parser.add_argument('--n_trials_dqn', type=int, default=20,
help='Number of Optuna trials for DQN. Default=20.')
parser.add_argument('--max_parallel_trials', type=int, default=None,
help='Maximum number of parallel Optuna trials. Defaults to (logical cores - 2).')
parser.add_argument('--preprocess_workers', type=int, default=None,
help='Number of worker processes for data preprocessing. Defaults to (logical cores - 2).')
parser.add_argument('--monitor_resources', action='store_true',
help='Enable real-time resource monitoring.')
return parser.parse_args()
# ============================
# Custom DQN Callback: Log Actions + Rewards
# ============================
class ActionLoggingCallback(BaseCallback):
"""
Logs distribution of actions and average reward after each rollout.
@@ -171,7 +265,7 @@ class ActionLoggingCallback(BaseCallback):
but stable-baselines3 still calls `_on_rollout_end` periodically.
"""
def __init__(self, verbose=0):
        super().__init__(verbose)
self.action_buffer = []
self.reward_buffer = []
@@ -180,6 +274,7 @@ class ActionLoggingCallback(BaseCallback):
self.reward_buffer = []
def _on_step(self):
# For Stable Baselines3, access actions and rewards via self.locals
action = self.locals.get('action', None)
reward = self.locals.get('reward', None)
if action is not None:
@@ -204,9 +299,43 @@ class ActionLoggingCallback(BaseCallback):
self.action_buffer = []
self.reward_buffer = []
# ============================
# Data Preprocessing with Controlled Parallelization
# ============================
def parallel_feature_engineering(row):
"""
Placeholder function for feature engineering. Modify as needed.
Args:
row (pd.Series): A row from the DataFrame.
Returns:
pd.Series: Processed row.
"""
# Implement any additional feature engineering here if necessary
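    # Illustrative only -- one hypothetical per-row feature, assuming the
    # OHLCV columns seen in the log are present:
    #     row['HL_Range'] = row['High'] - row['Low']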
return row
def feature_engineering_parallel(df, num_workers):
"""
Applies feature engineering in parallel using multiprocessing.
Args:
df (pd.DataFrame): DataFrame to process.
num_workers (int): Number of worker processes.
Returns:
pd.DataFrame: Processed DataFrame.
"""
logging.info(f"Starting parallel feature engineering with {num_workers} workers...")
with Pool(processes=num_workers) as pool:
processed_rows = pool.map(parallel_feature_engineering, [row for _, row in df.iterrows()])
df_processed = pd.DataFrame(processed_rows)
logging.info("Parallel feature engineering completed.")
return df_processed
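# A chunked variant (a sketch, not part of this commit): mapping one task per
# row pickles every row separately; splitting the frame into one chunk per
# worker amortizes that overhead. Assumes parallel_feature_engineering stays
# row-wise.
def _apply_rowwise(chunk):
    """Apply the row-wise placeholder to one DataFrame chunk."""
    return chunk.apply(parallel_feature_engineering, axis=1)

def feature_engineering_chunked(df, num_workers):
    chunks = np.array_split(df, num_workers)
    with Pool(processes=num_workers) as pool:
        parts = pool.map(_apply_rowwise, chunks)
    return pd.concat(parts, ignore_index=True)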
# ============================
# Main Function with Enhanced Optimizations
# ============================
def main():
args = parse_arguments()
csv_path = args.csv_path
@@ -215,6 +344,53 @@ def main():
dqn_eval_episodes = args.dqn_eval_episodes
n_trials_lstm = args.n_trials_lstm
n_trials_dqn = args.n_trials_dqn
max_parallel_trials = args.max_parallel_trials
preprocess_workers = args.preprocess_workers
enable_resource_monitor = args.monitor_resources
# =============================
# Setup Logging
# =============================
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(levelname)s - %(message)s',
                        handlers=[
                            logging.FileHandler("LSTMDQN.log"),
                            logging.StreamHandler(sys.stdout)
                        ],
                        force=True)  # replace the module-level handler so the file handler attaches
# =============================
# Resource Detection & Logging
# =============================
cpu_stats = get_cpu_info()
gpu_stats = get_gpu_info()
logging.info("===== Resource Statistics =====")
logging.info(f"Physical CPU Cores: {cpu_stats['physical_cores']}")
logging.info(f"Logical CPU Cores: {cpu_stats['logical_cores']}")
logging.info(f"CPU Usage per Core: {cpu_stats['cpu_percent']}%")
if gpu_stats:
logging.info("GPU Statistics:")
for gpu in gpu_stats:
logging.info(f"GPU {gpu['id']} - {gpu['name']}: Load: {gpu['load']}%, "
f"Memory Used: {gpu['memory_used']}MB / {gpu['memory_total']}MB, "
f"Temperature: {gpu['temperature']}°C")
else:
logging.info("No GPUs detected.")
logging.info("=================================")
# =============================
# Configure TensorFlow
# =============================
configure_tensorflow(cpu_stats, gpu_stats)
# =============================
# Start Resource Monitoring (Optional)
# =============================
if enable_resource_monitor:
logging.info("Starting real-time resource monitoring...")
resource_monitor_thread = threading.Thread(target=monitor_resources, args=(60,), daemon=True)
resource_monitor_thread.start()
##########################################
# A) LSTM PART: LOAD, PREPROCESS, TUNE
@@ -231,7 +407,15 @@ def main():
target_column = 'Close'
df = df[['Date'] + feature_columns + [target_column]].dropna()
# 2) Controlled Parallel Data Preprocessing
if preprocess_workers is None:
# Default to logical cores minus 2 to prevent overloading
preprocess_workers = max(1, cpu_stats['logical_cores'] - 2)
else:
preprocess_workers = min(preprocess_workers, cpu_stats['logical_cores'])
df = feature_engineering_parallel(df, num_workers=preprocess_workers)
scaler_features = MinMaxScaler()
scaler_target = MinMaxScaler()
@@ -241,7 +425,7 @@ def main():
X_scaled = scaler_features.fit_transform(X_all)
y_scaled = scaler_target.fit_transform(y_all).flatten()
# 3) Create sequences
def create_sequences(features, target, window_size):
X_seq, y_seq = [], []
for i in range(len(features) - window_size):
@@ -251,7 +435,7 @@ def main():
X, y = create_sequences(X_scaled, y_scaled, lstm_window_size)
# 4) Split into train/val/test
train_size = int(len(X) * 0.7)
val_size = int(len(X) * 0.15)
test_size = len(X) - train_size - val_size
@@ -267,24 +451,8 @@ def main():
logging.info(f"Scaled validation target shape: {y_val.shape}")
logging.info(f"Scaled testing target shape: {y_test.shape}")
# 5) Build and compile LSTM model
def build_lstm(input_shape, hyperparams):
model = Sequential()
num_layers = hyperparams['num_lstm_layers']
units = hyperparams['lstm_units']
@@ -313,7 +481,6 @@ def main():
# 6) Optuna objective for LSTM
def lstm_objective(trial):
num_lstm_layers = trial.suggest_int('num_lstm_layers', 1, 3)
lstm_units = trial.suggest_categorical('lstm_units', [32, 64, 96, 128])
dropout_rate = trial.suggest_float('dropout_rate', 0.1, 0.5)
@@ -346,18 +513,25 @@ def main():
val_mae = min(history.history['val_mae'])
return val_mae
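    # KerasPruningCallback (imported at the top of the file) could be wired into
    # this objective so Optuna stops unpromising trials early -- a sketch,
    # assuming 'val_mae' is the monitored metric:
    #     pruning_cb = KerasPruningCallback(trial, 'val_mae')
    #     model.fit(..., callbacks=[early_stop, pruning_cb])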
logging.info("Starting LSTM hyperparam optimization with Optuna...")
study_lstm= optuna.create_study(direction='minimize')
study_lstm.optimize(lstm_objective, n_trials=n_trials_lstm)
best_lstm_params = study_lstm.best_params
logging.info(f"Best LSTM Hyperparams: {best_lstm_params}")
# 7) Hyperparameter Optimization with Optuna (Parallelized)
if max_parallel_trials is None:
# Default to logical cores minus 2 to prevent overloading
max_parallel_trials = max(1, cpu_stats['logical_cores'] - 2)
else:
max_parallel_trials = min(max_parallel_trials, cpu_stats['logical_cores'])
logging.info(f"Starting LSTM hyperparameter optimization with Optuna using {max_parallel_trials} parallel trials...")
study_lstm = optuna.create_study(direction='minimize')
study_lstm.optimize(lstm_objective, n_trials=n_trials_lstm, n_jobs=max_parallel_trials)
best_lstm_params = study_lstm.best_params
logging.info(f"Best LSTM Hyperparameters: {best_lstm_params}")
# 8) Train final LSTM
final_lstm = build_lstm((X_train.shape[1], X_train.shape[2]), best_lstm_params)
early_stop_final = EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True)
lr_reduce_final = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)
logging.info("Training best LSTM model with found hyperparams...")
logging.info("Training best LSTM model with optimized hyperparameters...")
hist = final_lstm.fit(
X_train, y_train,
epochs=300,
@@ -367,9 +541,9 @@ def main():
verbose=1
)
# 9) Evaluate LSTM
def evaluate_lstm(model, X_test, y_test):
logging.info("Evaluating final LSTM...")
logging.info("Evaluating final LSTM model...")
y_pred_scaled = model.predict(X_test).flatten()
y_pred_scaled = np.clip(y_pred_scaled, 0, 1)
y_pred = scaler_target.inverse_transform(y_pred_scaled.reshape(-1, 1)).flatten()
@@ -384,54 +558,53 @@ def main():
direction_pred = np.sign(np.diff(y_pred))
directional_accuracy = np.mean(direction_actual == direction_pred)
logging.info(f"Test MSE: {mse_}")
logging.info(f"Test RMSE: {rmse_}")
logging.info(f"Test MAE: {mae_}")
logging.info(f"Test R2 Score: {r2_}")
logging.info(f"Directional Accuracy: {directional_accuracy}")
logging.info(f"Test MSE: {mse_:.4f}")
logging.info(f"Test RMSE: {rmse_:.4f}")
logging.info(f"Test MAE: {mae_:.4f}")
logging.info(f"Test R2 Score: {r2_:.4f}")
logging.info(f"Directional Accuracy: {directional_accuracy:.4f}")
# Plot Actual vs Predicted
plt.figure(figsize=(14, 7))
plt.plot(y_test_actual, label='Actual Price')
plt.plot(y_pred, label='Predicted Price')
plt.title('LSTM: Actual vs Predicted Closing Prices')
plt.legend()
plt.grid(True)
plt.savefig('lstm_actual_vs_pred.png')
plt.close()
# Tabulate first 40 results
table = []
limit = min(40, len(y_test_actual))
for i in range(limit):
table.append([i, round(y_test_actual[i], 2), round(y_pred[i], 2)])
headers = ["Index", "Actual Price", "Predicted Price"]
print("\nFirst 40 Actual vs. Predicted Prices:")
print(tabulate(table, headers=headers, tablefmt="pretty"))
return r2_, directional_accuracy
_r2, _diracc = evaluate_lstm(final_lstm, X_test, y_test)
# 10) Save LSTM and Scalers
final_lstm.save('best_lstm_model.h5')
joblib.dump(scaler_features, 'scaler_features.pkl')
joblib.dump(scaler_target, 'scaler_target.pkl')
logging.info("Saved best LSTM model + scalers (best_lstm_model.h5, scaler_features.pkl, scaler_target.pkl).")
logging.info("Saved best LSTM model and scaler objects (best_lstm_model.h5, scaler_features.pkl, scaler_target.pkl).")
############################################################
# B) DQN PART: BUILD ENV THAT USES THE LSTM + FORECAST
############################################################
class StockTradingEnvWithLSTM(gym.Env):
"""
    A custom OpenAI Gym environment for stock trading that integrates LSTM model predictions.
    Observation = [technical indicators, balance, shares, cost_basis, predicted_next_close].
    Reward each step = net_worth - initial_balance.
"""
metadata = {'render.modes': ['human']}
def __init__(self, df, feature_columns, lstm_model, scaler_features, scaler_target,
window_size=15, initial_balance=10000, transaction_cost=0.001):
        super().__init__()
self.df = df.reset_index(drop=True)
self.feature_columns = feature_columns
self.lstm_model = lstm_model
@@ -449,13 +622,13 @@ def main():
self.shares_held = 0
self.cost_basis = 0
# Raw array of features
self.raw_features = df[feature_columns].values
# Action space: 0=Sell, 1=Hold, 2=Buy
self.action_space = spaces.Discrete(3)
        # Observation space: [technical indicators, balance, shares, cost_basis, predicted_next_close]
        # => len(feature_columns) + 3 + 1 = 17 + 3 + 1 = 21 dimensions
self.observation_space = spaces.Box(
low=0, high=1,
shape=(len(feature_columns) + 3 + 1,),
@@ -475,16 +648,16 @@ def main():
row_max = np.max(row) if np.max(row) != 0 else 1.0
row_norm = row / row_max
# Account info
additional = np.array([
self.balance / self.initial_balance,
self.shares_held / 100.0, # Assuming max 100 shares for normalization
self.cost_basis / (self.initial_balance + 1e-9)
], dtype=np.float32)
# LSTM prediction
if self.current_step < self.window_size:
# Not enough history => no forecast
predicted_close = 0.0
else:
seq = self.raw_features[self.current_step - self.window_size: self.current_step]
@@ -493,7 +666,7 @@ def main():
pred_scaled = self.lstm_model.predict(seq_scaled, verbose=0).flatten()[0]
pred_scaled = np.clip(pred_scaled, 0, 1)
unscaled = self.scaler_target.inverse_transform([[pred_scaled]])[0, 0]
# Normalize predicted close price (assuming a typical price range)
predicted_close = unscaled / 1000.0
obs = np.concatenate([row_norm, additional, [predicted_close]]).astype(np.float32)
@@ -540,17 +713,22 @@ def main():
f"Profit={profit:.2f}")
###################################
# C) DQN HYPERPARAMETER TUNING WITH LSTM
###################################
# We'll define a function that trains a DQN with trial hyperparams,
# then evaluates final net worth on one run.
from stable_baselines3.common.evaluation import evaluate_policy
def evaluate_dqn_networth(model, env, n_episodes=1):
"""
Evaluates the trained DQN model by simulating trading over a specified number of episodes.
Args:
model (stable_baselines3.DQN): Trained DQN model.
env (gym.Env): Trading environment instance.
n_episodes (int): Number of episodes to run for evaluation.
Returns:
float: Average final net worth across episodes.
"""
final_net_worths = []
for _ in range(n_episodes):
obs = env.reset()
@@ -561,31 +739,35 @@ def main():
final_net_worths.append(env.net_worth)
return np.mean(final_net_worths)
def dqn_objective(trial):
"""
Objective function for Optuna to optimize DQN hyperparameters.
Minimizes the negative of the final net worth achieved by the DQN agent.
Args:
trial (optuna.trial.Trial): Optuna trial object.
Returns:
float: Negative of the final net worth.
"""
        lr = trial.suggest_float("lr", 1e-5, 1e-2, log=True)  # suggest_loguniform is deprecated
gamma = trial.suggest_float("gamma", 0.8, 0.9999)
exploration_fraction = trial.suggest_float("exploration_fraction", 0.01, 0.3)
buffer_size = trial.suggest_categorical("buffer_size", [5000, 10000, 20000])
batch_size = trial.suggest_categorical("batch_size", [32, 64, 128])
# Initialize environment
env = StockTradingEnvWithLSTM(
df=df,
feature_columns=feature_columns,
lstm_model=final_lstm, # Use the trained LSTM model
scaler_features=scaler_features,
scaler_target=scaler_target,
window_size=lstm_window_size
)
vec_env = DummyVecEnv([lambda: env])
# Initialize DQN model
dqn_action_logger = ActionLoggingCallback(verbose=0)
model = DQN(
@@ -598,28 +780,34 @@ def main():
buffer_size=buffer_size,
batch_size=batch_size,
train_freq=4,
target_update_interval=1000
)
# Train DQN model
model.learn(total_timesteps=dqn_total_timesteps, callback=dqn_action_logger)
# Evaluate final net worth
final_net_worth = evaluate_dqn_networth(model, env, n_episodes=dqn_eval_episodes)
# Objective is to maximize net worth, so return negative
return -final_net_worth
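    # Design note: returning the negative lets a 'minimize' study maximize net
    # worth; an equivalent alternative (not what this commit does) would be
    # optuna.create_study(direction='maximize') with `return final_net_worth`.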
logging.info("Starting DQN hyperparam tuning with Optuna (using LSTM environment)...")
# 11) Hyperparameter Optimization with Optuna (Parallelized)
if max_parallel_trials is None:
# Default to logical cores minus 2 to prevent overloading
max_parallel_trials = max(1, cpu_stats['logical_cores'] - 2)
else:
max_parallel_trials = min(max_parallel_trials, cpu_stats['logical_cores'])
logging.info(f"Starting DQN hyperparameter tuning with Optuna using {max_parallel_trials} parallel trials...")
study_dqn = optuna.create_study(direction='minimize')
study_dqn.optimize(dqn_objective, n_trials=n_trials_dqn, n_jobs=max_parallel_trials)
best_dqn_params = study_dqn.best_params
logging.info(f"Best DQN hyperparams: {best_dqn_params}")
logging.info(f"Best DQN Hyperparameters: {best_dqn_params}")
###################################
# D) TRAIN FINAL DQN WITH BEST PARAMETERS
###################################
logging.info("Training final DQN with best hyperparams & LSTM environment...")
logging.info("Training final DQN model with best hyperparameters...")
env_final = StockTradingEnvWithLSTM(
df=df,
feature_columns=feature_columns,
@@ -630,8 +818,8 @@ def main():
)
vec_env_final = DummyVecEnv([lambda: env_final])
final_dqn_logger = ActionLoggingCallback(verbose=1) # Enable detailed logging
final_model = DQN(
'MlpPolicy',
vec_env_final,
@@ -643,15 +831,15 @@ def main():
batch_size=best_dqn_params['batch_size'],
train_freq=4,
target_update_interval=1000
)
final_model.learn(total_timesteps=dqn_total_timesteps, callback=final_dqn_logger)
final_model.save("best_dqn_model_lstm.zip")
logging.info("Final DQN model trained and saved as 'best_dqn_model_lstm.zip'.")
###################################
# E) FINAL INFERENCE & LOG RESULTS
###################################
logging.info("Running final inference with best DQN...")
logging.info("Running final inference with the trained DQN model...")
env_test = StockTradingEnvWithLSTM(
df=df,
@@ -711,9 +899,44 @@ def main():
print(f"\n== Last 15 Steps ==")
print(tabulate(rows, headers=headers, tablefmt="pretty"))
logging.info("All tasks complete. Exiting.")
logging.info("Final inference completed. Results logged and displayed.")
###################################
# F) OPTIONAL: RETRY LOOP IF NET WORTH < THRESHOLD
###################################
NET_WORTH_THRESHOLD = 10500.0 # example threshold
if final_net_worth < NET_WORTH_THRESHOLD:
logging.warning(f"Final net worth (${final_net_worth:.2f}) is below ${NET_WORTH_THRESHOLD:.2f}. Retraining the same DQN model to learn from mistakes...")
# We continue training the SAME final_model without resetting its replay buffer.
# By setting `reset_num_timesteps=False`, we keep the replay buffer and learned weights.
additional_timesteps = 50000
logging.info(f"Retraining the existing DQN model for an additional {additional_timesteps} timesteps (keeping old experiences).")
# If you want to see action distributions again, you can keep the same callback or define a new one:
final_model.learn(
total_timesteps=additional_timesteps,
reset_num_timesteps=False, # Keep replay buffer + internal step counter
callback=final_dqn_logger # Optional: to log actions again
)
# Evaluate again
obs = env_test.reset()
done = False
second_total_reward = 0.0
while not done:
action, _ = final_model.predict(obs, deterministic=True)
obs, reward, done, info = env_test.step(action)
second_total_reward += reward
second_net_worth = env_test.net_worth
second_profit = second_net_worth - env_test.initial_balance
logging.info(f"After additional training, new final net worth=${second_net_worth:.2f}, profit=${second_profit:.2f}")
if second_net_worth < NET_WORTH_THRESHOLD:
logging.warning("Even after continued training, net worth is still below threshold. Consider a deeper hyperparameter search or analyzing the environment settings.")
if __name__=="__main__":
main()