Updating new attempt

This commit is contained in:
2025-04-12 20:49:03 +00:00
parent 9f63656268
commit 50a245056c
63 changed files with 3315 additions and 0 deletions

View File

@@ -104,3 +104,14 @@
2025-04-12 15:24:57,163 - INFO - Validation sequences shape: (9781, 15, 17)
2025-04-12 15:24:57,163 - INFO - Testing sequences shape: (9782, 15, 17)
2025-04-12 15:24:57,163 - INFO - Starting LSTM hyperparameter optimization with Optuna using 54 parallel trials...
2025-04-12 17:54:29,145 - INFO - Best LSTM Hyperparameters: {'num_lstm_layers': 1, 'lstm_units': 96, 'dropout_rate': 0.11559069858148932, 'learning_rate': 0.005924866096343409, 'optimizer': 'Nadam', 'decay': 1.4755604880325902e-06}
2025-04-12 17:54:29,389 - INFO - Training best LSTM model with optimized hyperparameters...
2025-04-12 18:44:39,440 - INFO - Evaluating final LSTM model...
2025-04-12 18:44:42,467 - INFO - Test MSE: 324.1369
2025-04-12 18:44:42,467 - INFO - Test RMSE: 18.0038
2025-04-12 18:44:42,468 - INFO - Test MAE: 12.3228
2025-04-12 18:44:42,468 - INFO - Test R2 Score: 0.9838
2025-04-12 18:44:42,468 - INFO - Directional Accuracy: 0.4449
2025-04-12 18:44:42,777 - WARNING - You are saving your model as an HDF5 file via `model.save()` or `keras.saving.save_model(model)`. This file format is considered legacy. We recommend using instead the native Keras format, e.g. `model.save('my_model.keras')` or `keras.saving.save_model(model, 'my_model.keras')`.
2025-04-12 18:44:42,811 - INFO - Saved best LSTM model and scaler objects.
2025-04-12 18:44:42,875 - INFO - Starting PPO training...

Binary file not shown.

Before

Width:  |  Height:  |  Size: 80 KiB

After

Width:  |  Height:  |  Size: 91 KiB

3
src/MidasAgent/todo Normal file
View File

@@ -0,0 +1,3 @@
We will want to store the AI's actions for stock shit in an SQL data base for query

View File

@@ -0,0 +1,7 @@
Metadata-Version: 2.1
Name: FuturesTradingAI
Version: 0.1.0
Summary: Hierarchical Learning for Live /MES Futures Trading
Author: Your Name
Author-email: your.email@example.com
Requires-Python: >=3.7

View File

@@ -0,0 +1,21 @@
setup.py
FuturesTradingAI.egg-info/PKG-INFO
FuturesTradingAI.egg-info/SOURCES.txt
FuturesTradingAI.egg-info/dependency_links.txt
FuturesTradingAI.egg-info/requires.txt
FuturesTradingAI.egg-info/top_level.txt
src/config.py
src/main.py
src/api/api_ai.py
src/c_modules/fast_indicators.c
src/data/loader.py
src/data/lstm_forecaster.py
src/data/preprocessing.py
src/data/technical_indicators.py
src/models/heirachical.py
src/models/ppo_trader.py
src/trading/env.py
src/trading/execution.py
src/trading/risk_manager.py
src/utils/logger.py
src/utils/resource_monitor.py

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,12 @@
tensorflow
gym
stable-baselines3
optuna
numpy
pandas
scikit-learn
joblib
flask
psutil
GPUtil
ibapi

View File

@@ -0,0 +1,2 @@
fast_indicators
src

View File

@@ -0,0 +1,85 @@
"""
src/api/api_ai.py
This module implements an API layer exposing endpoints for the trading AI.
Using Flask, it offers RESTful endpoints that receive market data and return trade signals.
"""
from flask import Flask, request, jsonify
import logging
from src.models.hierarchical import HierarchicalTrader
from src.models.lstm_forecaster import LSTMForecaster
from src.models.ppo_trader import PPOTrader
import pandas as pd
# Initialize Flask app
app = Flask(__name__)
logger = logging.getLogger(__name__)
# Global variable to hold the trading model
global_hierarchical_trader = None
@app.route('/initialize', methods=['POST'])
def initialize_models():
"""
Initialize trading models.
Expects JSON payload with parameters such as data path and model paths.
Example payload:
{
"data_path": "path/to/data.csv",
"lstm_model_path": "path/to/lstm_model.h5",
"ppo_model_path": "path/to/ppo_trader.zip"
}
"""
global global_hierarchical_trader
data = request.json
data_path = data.get('data_path')
lstm_model_path = data.get('lstm_model_path')
ppo_model_path = data.get('ppo_model_path')
try:
df = pd.read_csv(data_path, parse_dates=['time'])
except Exception as e:
logger.error(f"Error loading data: {e}")
return jsonify({"status": "error", "message": "Failed to load data"}), 400
# For illustration, create new models. In production load models from disk.
lstm_forecaster = LSTMForecaster(data=df)
ppo_trader = PPOTrader(data=df, lstm_model=lstm_forecaster.model)
ppo_trader.load(ppo_model_path)
global_hierarchical_trader = HierarchicalTrader(data=df, lstm_model=lstm_forecaster.model, ppo_model=ppo_trader.model)
logger.info("Models initialized successfully.")
return jsonify({"status": "success", "message": "Models initialized."}), 200
@app.route('/trade_signal', methods=['POST'])
def get_trade_signal():
"""
Endpoint to obtain a trade signal based on a provided observation.
Expected payload:
{
"observation": [ ... ]
}
Returns:
{
"trade_signal": <integer signal>
}
"""
global global_hierarchical_trader
if global_hierarchical_trader is None:
return jsonify({"status": "error", "message": "Models not initialized."}), 400
data = request.json
observation = data.get('observation')
if observation is None:
return jsonify({"status": "error", "message": "Observation not provided."}), 400
signal = global_hierarchical_trader.generate_trade_signal(observation)
return jsonify({"trade_signal": signal}), 200
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -0,0 +1,68 @@
"""
src/config.py
This module holds central configuration settings and hyperparameters for the FuturesTradingAI system.
It includes settings for data paths, model training, trading environment parameters, risk management, etc.
"""
import os
# Base directory configuration
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
# Data paths
DATA_DIR = os.path.join(BASE_DIR, 'data')
SAMPLE_DATA_PATH = os.path.join(DATA_DIR, 'sample_data.csv')
# Output directories
OUTPUT_DIR = os.path.join(BASE_DIR, 'output')
MODEL_DIR = os.path.join(OUTPUT_DIR, 'models')
LOG_DIR = os.path.join(OUTPUT_DIR, 'logs')
# LSTM model hyperparameters
LSTM_PARAMS = {
'num_layers': 2, # Number of LSTM layers
'units': 64, # Number of units per LSTM layer
'dropout_rate': 0.2, # Dropout rate to prevent overfitting
'learning_rate': 1e-3, # Initial learning rate
'l2_regularization': 1e-4 # L2 regularization factor
}
# PPO agent hyperparameters
PPO_PARAMS = {
'n_steps': 2048,
'batch_size': 64,
'gae_lambda': 0.95,
'gamma': 0.99,
'learning_rate': 3e-4,
'ent_coef': 0.0,
'verbose': 1
}
# Trading environment configuration
TRADING_ENV_CONFIG = {
'window_size': 15, # Lookback window size for LSTM forecasting
'transaction_cost': 0.001, # Transaction cost fee as a fraction of notional
'max_contracts': 1, # Initial max number of contracts for trading (scalable for multicontract trading)
'risk_reward_threshold': 2.0, # Minimum risk/reward ratio required to execute a trade (2:1)
'atr_period': 14, # Period for ATR calculation
'trailing_stop_multiplier': 1.5 # Multiplier for ATRbased trailing stop
}
# API configuration
API_CONFIG = {
'host': '127.0.0.1', # Host IP for API server
'port': 5000 # Port for API server
}
# Logging settings
LOGGING_CONFIG = {
'log_level': 'INFO', # Logging level: DEBUG, INFO, WARNING, ERROR, CRITICAL
'log_format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
}
# Misc configuration for resource monitoring, etc.
MONITOR_CONFIG = {
'interval': 60 # Monitor system resources every 60 seconds
}

View File

@@ -0,0 +1,59 @@
"""
src/data/loader.py
This module provides functions to load and clean CSV market data.
"""
import pandas as pd
import logging
def load_data(file_path):
"""
Load CSV data from the specified file path.
Parameters:
- file_path (str): Path to the CSV file.
Returns:
- pandas.DataFrame: Loaded and cleaned data.
"""
logger = logging.getLogger(__name__)
try:
# Attempt to read the CSV with proper parsing of date columns
df = pd.read_csv(file_path, parse_dates=['time'])
logger.info(f"Successfully loaded data from {file_path}")
except FileNotFoundError:
logger.error(f"Data file not found: {file_path}")
raise
except pd.errors.ParserError as e:
logger.error(f"Error parsing the CSV file: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error loading data: {e}")
raise
# Standardize column names (e.g., time, open, high, low, close, volume)
expected_cols = ['time', 'open', 'high', 'low', 'close', 'volume']
df.columns = [col.strip().lower() for col in df.columns]
if not all(col in df.columns for col in expected_cols):
logger.warning("Input data does not contain all expected columns. Attempting to map columns.")
# Rename columns if necessary (this can be extended based on the actual CSV structure)
rename_mapping = {
'time': 'time',
'open': 'open',
'high': 'high',
'low': 'low',
'close': 'close',
'volume': 'volume'
}
df = df.rename(columns=rename_mapping)
# Sort data chronologically and reset index
df.sort_values(by='time', inplace=True)
df.reset_index(drop=True, inplace=True)
# Handle missing values by forward filling
df.fillna(method='ffill', inplace=True)
return df

View File

@@ -0,0 +1,240 @@
"""
src/models/lstm_forecaster.py
This module implements an LSTMbased forecasting model for predicting shortterm future prices.
The model uses a bidirectional LSTM with dropout and L2 regularization.
It supports hyperparameter tuning with Optuna and provides training, evaluation, and saving functionality.
"""
import os
import numpy as np
import pandas as pd
import logging
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam, Nadam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.preprocessing import MinMaxScaler
import optuna
class LSTMForecaster:
"""
LSTMForecaster builds, trains, and evaluates an LSTM model for price forecasting.
"""
def __init__(self, data, window_size=15, model_save_path=None, hyperparams=None):
"""
Initialize the LSTMForecaster.
Parameters:
- data (pandas.DataFrame): Preprocessed market data with technical indicators.
- window_size (int): The number of time steps in the input sequence.
- model_save_path (str): Directory path where the model will be saved.
- hyperparams (dict): Dictionary of hyperparameters (optional).
"""
self.logger = logging.getLogger(__name__)
self.data = data.copy()
self.window_size = window_size
self.model_save_path = model_save_path if model_save_path else os.getcwd()
self.hyperparams = hyperparams
# Define target column and feature columns. Here we assume 'close' is the target.
self.target_column = 'close'
self.feature_columns = [col for col in data.columns if col not in [self.target_column, 'time']]
# Initialize scalers for features and target
self.scaler_features = MinMaxScaler()
self.scaler_target = MinMaxScaler()
# Prepare training sequences
self.X, self.y = self._create_sequences()
# Build model based on hyperparameters or default settings
if self.hyperparams is None:
self.hyperparams = {
'num_layers': 2,
'units': 64,
'dropout_rate': 0.2,
'learning_rate': 1e-3,
'optimizer': 'Adam',
'l2_reg': 1e-4
}
self.model = self._build_model()
def _create_sequences(self):
"""
Create sequences from the data for LSTM training.
Returns:
- X (numpy.array): 3D array for model input.
- y (numpy.array): 1D array for target variable.
"""
features = self.data[self.feature_columns].values
target = self.data[[self.target_column]].values
features_scaled = self.scaler_features.fit_transform(features)
target_scaled = self.scaler_target.fit_transform(target)
X_seq, y_seq = [], []
for i in range(len(features_scaled) - self.window_size):
X_seq.append(features_scaled[i:i+self.window_size])
y_seq.append(target_scaled[i+self.window_size])
X_seq = np.array(X_seq)
y_seq = np.array(y_seq).flatten()
self.logger.info(f"Created sequences: X shape = {X_seq.shape}, y shape = {y_seq.shape}")
return X_seq, y_seq
def _build_model(self):
"""
Build the LSTM model architecture.
Returns:
- model (tf.keras.Model): Compiled LSTM model.
"""
self.logger.info("Building LSTM model with hyperparameters: {}".format(self.hyperparams))
model = Sequential()
num_layers = self.hyperparams['num_layers']
units = self.hyperparams['units']
dropout_rate = self.hyperparams['dropout_rate']
l2_reg = self.hyperparams['l2_reg']
# Add multiple Bidirectional LSTM layers with dropout
for i in range(num_layers):
return_sequences = True if i < num_layers - 1 else False
if i == 0:
model.add(Bidirectional(LSTM(units, return_sequences=return_sequences,
kernel_regularizer=l2(l2_reg)),
input_shape=(self.X.shape[1], self.X.shape[2])))
else:
model.add(Bidirectional(LSTM(units, return_sequences=return_sequences,
kernel_regularizer=l2(l2_reg))))
model.add(Dropout(dropout_rate))
# Final Dense layer for regression output
model.add(Dense(1, activation='linear'))
# Select optimizer
optimizer_name = self.hyperparams.get('optimizer', 'Adam')
learning_rate = self.hyperparams.get('learning_rate', 1e-3)
if optimizer_name == 'Adam':
optimizer = Adam(learning_rate=learning_rate)
elif optimizer_name == 'Nadam':
optimizer = Nadam(learning_rate=learning_rate)
else:
optimizer = Adam(learning_rate=learning_rate)
model.compile(loss='mse', optimizer=optimizer, metrics=['mae'])
self.logger.info("LSTM model built and compiled successfully.")
return model
def train(self, epochs=100, batch_size=16, validation_split=0.2):
"""
Train the LSTM model.
Parameters:
- epochs (int): Number of training epochs.
- batch_size (int): Batch size for training.
- validation_split (float): Fraction of data to use for validation.
"""
callbacks = [
EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)
]
self.logger.info("Starting LSTM training...")
history = self.model.fit(
self.X, self.y,
epochs=epochs,
batch_size=batch_size,
validation_split=validation_split,
callbacks=callbacks,
verbose=1
)
self.logger.info("LSTM training completed.")
return history
def predict(self, X):
"""
Generate predictions using the trained model.
Parameters:
- X (numpy.array): Input data sequences.
Returns:
- numpy.array: Predicted values (inverse transformed).
"""
pred_scaled = self.model.predict(X)
predictions = self.scaler_target.inverse_transform(pred_scaled)
return predictions.flatten()
def save(self, save_dir):
"""
Save the trained model and scalers.
Parameters:
- save_dir (str): Directory where the model and scalers will be saved.
"""
if not os.path.exists(save_dir):
os.makedirs(save_dir)
model_path = os.path.join(save_dir, 'lstm_forecaster.h5')
self.model.save(model_path)
import joblib
joblib.dump(self.scaler_features, os.path.join(save_dir, 'scaler_features.pkl'))
joblib.dump(self.scaler_target, os.path.join(save_dir, 'scaler_target.pkl'))
self.logger.info(f"Model and scalers saved to {save_dir}")
@staticmethod
def objective(trial, X_train, y_train, X_val, y_val, input_shape):
"""
Objective function for hyperparameter tuning with Optuna.
Parameters:
- trial: Optuna trial object.
- X_train, y_train: Training data.
- X_val, y_val: Validation data.
- input_shape: Shape of the input data sequence.
Returns:
- float: Validation loss (MAE) to minimize.
"""
num_layers = trial.suggest_int('num_layers', 1, 3)
units = trial.suggest_categorical('units', [32, 64, 128])
dropout_rate = trial.suggest_float('dropout_rate', 0.1, 0.5)
learning_rate = trial.suggest_float('learning_rate', 1e-4, 1e-2, log=True)
optimizer_choice = trial.suggest_categorical('optimizer', ['Adam', 'Nadam'])
l2_reg = trial.suggest_float('l2_reg', 1e-5, 1e-3, log=True)
model = Sequential()
for i in range(num_layers):
return_sequences = True if i < num_layers - 1 else False
if i == 0:
model.add(Bidirectional(LSTM(units, return_sequences=return_sequences,
kernel_regularizer=l2(l2_reg)),
input_shape=input_shape))
else:
model.add(Bidirectional(LSTM(units, return_sequences=return_sequences,
kernel_regularizer=l2(l2_reg))))
model.add(Dropout(dropout_rate))
model.add(Dense(1, activation='linear'))
if optimizer_choice == 'Adam':
optimizer = Adam(learning_rate=learning_rate)
else:
optimizer = Nadam(learning_rate=learning_rate)
model.compile(loss='mse', optimizer=optimizer, metrics=['mae'])
callbacks = [
EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True),
ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=1e-6)
]
history = model.fit(
X_train, y_train,
epochs=50,
batch_size=16,
validation_data=(X_val, y_val),
callbacks=callbacks,
verbose=0
)
val_mae = min(history.history['val_mae'])
return val_mae

View File

@@ -0,0 +1,64 @@
"""
src/data/preprocessing.py
This module provides advanced data preprocessing routines, including parallel feature engineering,
normalization, and any additional cleanup required for the trading system.
"""
import pandas as pd
import numpy as np
import logging
from multiprocessing import Pool, cpu_count
def parallel_feature_engineering(row):
"""
Perform feature engineering on a single row of data.
This can include custom transformations, creation of new features, etc.
Currently, this is an example that adds a feature: the ratio of high to low prices.
Parameters:
- row (pandas.Series): A row from the DataFrame.
Returns:
- dict: A dictionary with engineered features.
"""
try:
engineered = row.to_dict()
engineered['hl_ratio'] = row['high'] / (row['low'] + 1e-9)
return engineered
except Exception as e:
logging.error(f"Error processing row for feature engineering: {e}")
return row.to_dict()
def preprocess_data(df):
"""
Perform preprocessing on the entire DataFrame, including parallel feature engineering
and normalization routines.
Parameters:
- df (pandas.DataFrame): Input market data.
Returns:
- pandas.DataFrame: Preprocessed data ready for modeling.
"""
logger = logging.getLogger(__name__)
logger.info("Starting parallel feature engineering...")
num_workers = max(1, cpu_count() - 2)
with Pool(processes=num_workers) as pool:
processed_data = pool.map(parallel_feature_engineering, [row for _, row in df.iterrows()])
df_processed = pd.DataFrame(processed_data)
# Normalization: Scale pricerelated columns (open, high, low, close) using minmax scaling.
price_cols = ['open', 'high', 'low', 'close']
for col in price_cols:
if col in df_processed.columns:
min_val = df_processed[col].min()
max_val = df_processed[col].max()
df_processed[col] = (df_processed[col] - min_val) / (max_val - min_val + 1e-9)
logger.info("Data preprocessing and feature engineering completed.")
return df_processed

View File

@@ -0,0 +1,214 @@
"""
src/data/technical_indicators.py
This module implements a comprehensive set of technical indicators used in the trading system.
Indicators include:
- RSI (Relative Strength Index)
- MACD (Moving Average Convergence Divergence)
- OBV (On-Balance Volume)
- ADX (Average Directional Index)
- Bollinger Bands
- MFI (Money Flow Index)
- ATR (Average True Range) for trailing stop calculations
"""
import pandas as pd
import numpy as np
def compute_rsi(series, window=14):
"""
Compute Relative Strength Index (RSI).
Parameters:
- series (pandas.Series): Series of prices.
- window (int): Number of periods to use for calculation.
Returns:
- pandas.Series: RSI values.
"""
delta = series.diff()
gain = delta.clip(lower=0)
loss = -delta.clip(upper=0)
avg_gain = gain.rolling(window=window, min_periods=window).mean()
avg_loss = loss.rolling(window=window, min_periods=window).mean()
rs = avg_gain / (avg_loss + 1e-9)
rsi = 100 - (100 / (1 + rs))
return rsi
def compute_macd(series, span_short=12, span_long=26, span_signal=9):
"""
Compute MACD (Moving Average Convergence Divergence) indicator.
Parameters:
- series (pandas.Series): Series of prices.
- span_short (int): Shortterm EMA span.
- span_long (int): Longterm EMA span.
- span_signal (int): Signal line EMA span.
Returns:
- pandas.Series: MACD histogram values.
"""
ema_short = series.ewm(span=span_short, adjust=False).mean()
ema_long = series.ewm(span=span_long, adjust=False).mean()
macd_line = ema_short - ema_long
signal_line = macd_line.ewm(span=span_signal, adjust=False).mean()
macd_hist = macd_line - signal_line
return macd_hist
def compute_obv(df):
"""
Compute OnBalance Volume (OBV).
Parameters:
- df (pandas.DataFrame): DataFrame containing 'close' and 'volume'.
Returns:
- pandas.Series: OBV values.
"""
direction = np.sign(df['close'].diff()).fillna(0)
obv = (direction * df['volume']).cumsum()
return obv
def compute_adx(df, window=14):
"""
Compute Average Directional Index (ADX).
Parameters:
- df (pandas.DataFrame): DataFrame containing 'high', 'low', and 'close'.
- window (int): Window size for calculation.
Returns:
- pandas.Series: ADX values.
"""
high = df['high']
low = df['low']
close = df['close']
tr1 = high - low
tr2 = (high - close.shift()).abs()
tr3 = (low - close.shift()).abs()
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
atr = tr.rolling(window=window, min_periods=window).mean()
up_move = high.diff()
down_move = low.diff().abs()
plus_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0)
minus_dm = np.where((down_move > up_move) & (down_move > 0), down_move, 0)
plus_di = 100 * (pd.Series(plus_dm).rolling(window=window, min_periods=window).sum() / (atr + 1e-9))
minus_di = 100 * (pd.Series(minus_dm).rolling(window=window, min_periods=window).sum() / (atr + 1e-9))
dx = (abs(plus_di - minus_di) / (plus_di + minus_di + 1e-9)) * 100
adx = dx.rolling(window=window, min_periods=window).mean()
return adx
def compute_bollinger_bands(series, window=20, num_std=2):
"""
Compute Bollinger Bands.
Parameters:
- series (pandas.Series): Series of prices.
- window (int): Window size for moving average.
- num_std (int): Number of standard deviations for the bands.
Returns:
- tuple: Upper band, lower band, and bandwidth as pandas.Series objects.
"""
sma = series.rolling(window=window, min_periods=window).mean()
std = series.rolling(window=window, min_periods=window).std()
upper = sma + num_std * std
lower = sma - num_std * std
bandwidth = (upper - lower) / (sma + 1e-9)
return upper, lower, bandwidth
def compute_mfi(df, window=14):
"""
Compute Money Flow Index (MFI).
Parameters:
- df (pandas.DataFrame): DataFrame containing 'high', 'low', 'close', and 'volume'.
- window (int): Window size for calculation.
Returns:
- pandas.Series: MFI values.
"""
typical_price = (df['high'] + df['low'] + df['close']) / 3
money_flow = typical_price * df['volume']
positive_flow = []
negative_flow = []
for i in range(1, len(typical_price)):
if typical_price.iloc[i] > typical_price.iloc[i-1]:
positive_flow.append(money_flow.iloc[i])
negative_flow.append(0)
elif typical_price.iloc[i] < typical_price.iloc[i-1]:
positive_flow.append(0)
negative_flow.append(money_flow.iloc[i])
else:
positive_flow.append(0)
negative_flow.append(0)
positive_mf = pd.Series(positive_flow).rolling(window=window, min_periods=window).sum()
negative_mf = pd.Series(negative_flow).rolling(window=window, min_periods=window).sum()
mfi = 100 - (100 / (1 + (positive_mf / (negative_mf + 1e-9))))
mfi.index = df.index[1:]
mfi = mfi.reindex(df.index)
return mfi
def compute_atr(df, period=14):
"""
Compute Average True Range (ATR).
Parameters:
- df (pandas.DataFrame): DataFrame containing 'high', 'low', and 'close'.
- period (int): Number of periods to use for the ATR calculation.
Returns:
- pandas.Series: ATR values.
"""
high = df['high']
low = df['low']
close = df['close']
tr1 = high - low
tr2 = (high - close.shift()).abs()
tr3 = (low - close.shift()).abs()
true_range = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
atr = true_range.rolling(window=period, min_periods=period).mean()
return atr
def calculate_all_indicators(df):
"""
Calculate and append all technical indicators to the DataFrame.
Parameters:
- df (pandas.DataFrame): Input market data.
Returns:
- pandas.DataFrame: DataFrame enriched with technical indicator columns.
"""
df['RSI'] = compute_rsi(df['close'])
df['MACD'] = compute_macd(df['close'])
df['OBV'] = compute_obv(df)
df['ADX'] = compute_adx(df)
bollinger_upper, bollinger_lower, bollinger_bandwidth = compute_bollinger_bands(df['close'])
df['BB_Upper'] = bollinger_upper
df['BB_Lower'] = bollinger_lower
df['BB_Bandwidth'] = bollinger_bandwidth
df['MFI'] = compute_mfi(df)
df['ATR'] = compute_atr(df)
# Additional moving averages as sample features
df['SMA_5'] = df['close'].rolling(window=5, min_periods=5).mean()
df['SMA_10'] = df['close'].rolling(window=10, min_periods=10).mean()
# Drop rows with NaN values resulting from indicator calculations
df.dropna(inplace=True)
return df

View File

@@ -0,0 +1,104 @@
"""
src/main.py
Main entry point for the FuturesTradingAI application.
This script parses commandline arguments, sets up logging, loads configurations,
initializes data processing, model training, and can start the live trading loop or backtesting.
"""
import argparse
import os
import logging
import threading
from src.config import SAMPLE_DATA_PATH, OUTPUT_DIR, MODEL_DIR, LOG_DIR, MONITOR_CONFIG
from src.utils.logger import setup_logging
from src.data.loader import load_data
from src.data.technical_indicators import calculate_all_indicators
from src.data.preprocessing import preprocess_data
from src.models.lstm_forecaster import LSTMForecaster
from src.models.ppo_trader import PPOTrader
from src.models.hierarchical import HierarchicalTrader
from src.trading.env import FuturesTradingEnv
from src.utils.resource_monitor import start_resource_monitor
def parse_args():
"""
Parse commandline arguments.
"""
parser = argparse.ArgumentParser(
description="FuturesTradingAI - Hierarchical Learning for Live /MES Futures Trading"
)
parser.add_argument("--data", type=str, default=SAMPLE_DATA_PATH, help="Path to input CSV data file")
parser.add_argument(
"--mode", type=str, choices=["train", "backtest", "live"], default="train",
help="Mode to run: train for model training, backtest for historical simulation, live for live trading"
)
return parser.parse_args()
def main():
# Parse commandline arguments
args = parse_args()
# Ensure necessary output directories exist
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(MODEL_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)
# Setup logging configuration
setup_logging(LOG_DIR)
logger = logging.getLogger(__name__)
logger.info("Starting FuturesTradingAI application...")
# Optionally start system resource monitoring in a separate thread
monitor_thread = threading.Thread(target=start_resource_monitor, args=(MONITOR_CONFIG['interval'],), daemon=True)
monitor_thread.start()
# Load historical market data
logger.info(f"Loading data from {args.data}...")
df = load_data(args.data)
# Calculate technical indicators (RSI, MACD, OBV, ADX, Bollinger Bands, MFI, ATR, etc.)
logger.info("Calculating technical indicators on dataset...")
df = calculate_all_indicators(df)
# Advanced data preprocessing and feature engineering
logger.info("Preprocessing data...")
df_processed = preprocess_data(df)
# Depending on mode, run training, backtesting or live trading
if args.mode == "train":
# Train LSTM forecaster
logger.info("Initializing LSTM forecaster...")
lstm_forecaster = LSTMForecaster(data=df_processed)
lstm_forecaster.train()
# Train PPO trading agent using custom environment
logger.info("Initializing PPO trader...")
ppo_trader = PPOTrader(data=df_processed, lstm_model=lstm_forecaster.model)
ppo_trader.train()
# Save models
lstm_forecaster.save(MODEL_DIR)
ppo_trader.save(MODEL_DIR)
elif args.mode == "backtest":
# Perform backtesting using hierarchical integration
logger.info("Running backtesting simulation...")
hierarchical_trader = HierarchicalTrader(data=df_processed)
results = hierarchical_trader.backtest()
logger.info("Backtest completed. Results:")
logger.info(results)
elif args.mode == "live":
# Initiate live trading simulation
logger.info("Starting live trading simulation...")
env = FuturesTradingEnv(data=df_processed)
hierarchical_trader = HierarchicalTrader(data=df_processed, env=env)
hierarchical_trader.live_trading()
logger.info("FuturesTradingAI application finished.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,145 @@
"""
src/models/hierarchical.py
This module integrates the LSTM forecaster and the PPO trading agent to form a hierarchical decisionmaking system.
It fuses shortterm price forecasts with trading signals and enforces risk management constraints before executing trades.
"""
import logging
import numpy as np
class HierarchicalTrader:
"""
HierarchicalTrader integrates the outputs from the LSTM forecaster and PPO agent,
applies risk management filters, and simulates trade execution.
"""
def __init__(self, data, lstm_model=None, ppo_model=None, env=None, risk_reward_threshold=2.0):
"""
Initialize HierarchicalTrader.
Parameters:
- data (pandas.DataFrame): Preprocessed market data.
- lstm_model: Pretrained LSTM model for forecasting.
- ppo_model: Trained PPO model for trading decisions.
- env: Trading environment.
- risk_reward_threshold (float): Minimum risk/reward ratio required to execute trades.
"""
self.logger = logging.getLogger(__name__)
self.data = data
self.lstm_model = lstm_model
self.ppo_model = ppo_model
self.env = env
self.risk_reward_threshold = risk_reward_threshold
def assess_risk_reward(self, current_price, forecast, atr, current_position):
"""
Assess risk/reward ratio based on current price, forecast, ATR, and position.
Parameters:
- current_price (float): Current market price.
- forecast (float): Forecasted price change (as a ratio).
- atr (float): Current ATR value.
- current_position (int): Number of contracts held.
Returns:
- risk_reward (float): Calculated risk/reward ratio.
"""
expected_price = current_price * (1 + forecast)
reward = abs(expected_price - current_price)
risk = atr
risk_reward = reward / (risk + 1e-9)
return risk_reward
def generate_trade_signal(self, observation):
"""
Generate a trade signal by combining PPO agent output and LSTM forecast.
Applies risk/reward filtering.
Parameters:
- observation: Current state from the trading environment.
Returns:
- signal (int): Trade signal (e.g., number of contracts to buy/sell), or 0 for no action.
"""
# Assume observation consists of technical indicators, normalized position, and forecast (last element)
current_forecast = observation[-1]
current_position = observation[-2]
if self.ppo_model is None:
self.logger.error("PPO model not provided for hierarchical trading.")
return 0
action, _ = self.ppo_model.predict(observation, deterministic=True)
trade_signal = int(action) if isinstance(action, (int, np.integer)) else int(round(float(action)))
# For simplicity, we simulate current price and ATR extraction; in practice, these come from the environment.
current_price = 1.0 # Placeholder
atr = 0.01 # Placeholder
rr_ratio = self.assess_risk_reward(current_price, current_forecast, atr, current_position)
self.logger.info(f"Calculated risk/reward ratio: {rr_ratio:.2f}")
if rr_ratio < self.risk_reward_threshold:
self.logger.info("Risk/Reward ratio below threshold. No trade executed.")
return 0
self.logger.info(f"Trade signal generated: {trade_signal}")
return trade_signal
def backtest(self):
"""
Perform backtesting simulation over historical data using hierarchical decision making.
Returns:
- dict: Backtesting results (e.g., total PnL, win rate).
"""
self.logger.info("Starting backtest simulation...")
total_reward = 0.0
num_trades = 0
for idx, row in self.data.iterrows():
observation = self._create_observation(row)
signal = self.generate_trade_signal(observation)
reward = signal * 0.001 # Simplified reward calculation
total_reward += reward
if signal != 0:
num_trades += 1
results = {
'total_reward': total_reward,
'num_trades': num_trades,
'average_reward': total_reward / (num_trades + 1e-9)
}
self.logger.info("Backtest simulation completed.")
return results
def _create_observation(self, row):
"""
Create a synthetic observation vector from a data row.
In a full implementation, this should mirror the environment's observation space.
Parameters:
- row (pandas.Series): A row from the historical dataset.
Returns:
- numpy.array: Observation vector.
"""
tech_indicators = row.drop(labels=['time', 'close']).values
normalized_position = np.array([0.0])
forecast = np.array([0.001]) # Placeholder forecast value
observation = np.concatenate([tech_indicators, normalized_position, forecast])
return observation
def live_trading(self):
"""
Simulate live trading by continuously generating trade signals.
This function is a placeholder for integrating with live market data and execution systems.
"""
self.logger.info("Starting live trading simulation...")
import time
while True:
observation = self._create_observation(self.data.iloc[-1])
signal = self.generate_trade_signal(observation)
self.logger.info(f"Live trade signal: {signal}")
# In production, signal triggers execution via the execution module.
time.sleep(5) # Simulate 5minute reevaluation in a live setting.

View File

@@ -0,0 +1,109 @@
"""
src/models/ppo_trader.py
This module implements a Proximal Policy Optimization (PPO) agent for trading decisions
within a custom Gym environment tailored for /MES futures trading.
The agent uses technical indicators and LSTM forecasts as inputs.
"""
import os
import logging
import gym
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from src.trading.env import FuturesTradingEnv
from src.config import TRADING_ENV_CONFIG
class PPOTrader:
"""
PPOTrader encapsulates the training and evaluation of a PPO agent in a custom futures trading environment.
"""
def __init__(self, data, lstm_model, env_config=TRADING_ENV_CONFIG, model_save_path=None):
"""
Initialize the PPOTrader.
Parameters:
- data (pandas.DataFrame): Preprocessed market data.
- lstm_model: Pretrained LSTM model used for forecasting.
- env_config (dict): Configuration for the trading environment.
- model_save_path (str): Directory to save the trained PPO model.
"""
self.logger = logging.getLogger(__name__)
self.data = data
self.lstm_model = lstm_model
self.env_config = env_config
self.model_save_path = model_save_path if model_save_path else os.getcwd()
# Initialize the custom trading environment with required parameters
self.env = FuturesTradingEnv(data=self.data, lstm_model=self.lstm_model, config=self.env_config)
# Wrap environment in DummyVecEnv for PPO training
self.vec_env = DummyVecEnv([lambda: self.env])
# Define PPO hyperparameters (can be customized)
self.ppo_params = {
'n_steps': self.env_config.get('n_steps', 2048),
'batch_size': 64,
'gae_lambda': 0.95,
'gamma': 0.99,
'learning_rate': 3e-4,
'ent_coef': 0.0,
'verbose': 1
}
self.model = None
def train(self, total_timesteps=100000):
"""
Train the PPO model.
Parameters:
- total_timesteps (int): Total number of timesteps for training.
"""
self.logger.info("Starting PPO training...")
self.model = PPO('MlpPolicy', self.vec_env, **self.ppo_params)
self.model.learn(total_timesteps=total_timesteps)
self.logger.info("PPO training completed.")
return self.model
def predict(self, obs):
"""
Predict the action for a given observation.
Parameters:
- obs: Current observation from the trading environment.
Returns:
- action: Predicted action.
"""
if self.model is None:
self.logger.error("Model is not trained yet!")
return None
action, _ = self.model.predict(obs, deterministic=True)
return action
def save(self, save_dir):
"""
Save the trained PPO model.
Parameters:
- save_dir (str): Directory where the model will be saved.
"""
if self.model is None:
self.logger.error("No model to save. Train the model first.")
return
if not os.path.exists(save_dir):
os.makedirs(save_dir)
model_path = os.path.join(save_dir, 'ppo_trader.zip')
self.model.save(model_path)
self.logger.info(f"PPO model saved to {model_path}")
def load(self, model_path):
"""
Load a pretrained PPO model.
Parameters:
- model_path (str): Path to the saved PPO model.
"""
self.model = PPO.load(model_path, env=self.vec_env)
self.logger.info(f"PPO model loaded from {model_path}")

View File

@@ -0,0 +1,163 @@
"""
src/trading/env.py
This module defines a custom OpenAI Gym environment for /MES futures trading on a 5minute timeframe.
The environment includes logic for:
- Tracking positions (long/short)
- Evaluating positions every 5 minutes
- Implementing ATRbased trailing stops for risk management
- Allowing a single contract initially with hooks for multicontract trading in the future
"""
import gym
from gym import spaces
import numpy as np
import pandas as pd
import logging
import threading
class FuturesTradingEnv(gym.Env):
"""
Custom Gym environment for futures trading.
"""
metadata = {'render.modes': ['human']}
def __init__(self, data, lstm_model=None, config=None):
"""
Initialize the trading environment.
Parameters:
- data (pandas.DataFrame): Preprocessed market data.
- lstm_model: Pretrained LSTM model for price forecasting (optional).
- config (dict): Configuration parameters for the environment.
"""
super(FuturesTradingEnv, self).__init__()
self.logger = logging.getLogger(__name__)
self.data = data.reset_index(drop=True)
self.lstm_model = lstm_model
self.config = config if config is not None else {}
# Define action space: discrete orders ranging from -max_contracts to +max_contracts.
self.max_contracts = self.config.get('max_contracts', 1)
self.action_space = spaces.Discrete(2 * self.max_contracts + 1)
# Define observation space: technical indicators plus current position and forecast.
self.tech_indicator_cols = [col for col in self.data.columns if col not in ['time', 'close']]
obs_dim = len(self.tech_indicator_cols) + 2 # +1 for normalized position, +1 for forecast.
self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(obs_dim,), dtype=np.float32)
# Trading variables
self.current_step = 0
self.contracts_held = 0
self.entry_price = None
# ATRbased trailing stop parameters
self.atr_period = self.config.get('atr_period', 14)
self.trailing_stop_multiplier = self.config.get('trailing_stop_multiplier', 1.5)
self.trailing_stop = None
# Lock for threadsafe LSTM predictions
self.lstm_lock = threading.Lock()
def _get_observation(self):
"""
Construct the observation vector from the current market data.
Returns:
- observation (numpy.array): Observation vector for the current step.
"""
row = self.data.iloc[self.current_step]
tech_indicators = row[self.tech_indicator_cols].values.astype(np.float32)
norm_position = np.array([self.contracts_held / self.max_contracts], dtype=np.float32)
if self.lstm_model and self.current_step >= self.config.get('window_size', 15):
# For proper forecasting, construct an input sequence; here we use a placeholder.
forecast = np.array([0.001], dtype=np.float32)
else:
forecast = np.array([0.0], dtype=np.float32)
observation = np.concatenate([tech_indicators, norm_position, forecast])
return observation
def reset(self):
"""
Reset the environment to an initial state.
Returns:
- observation (numpy.array): The initial observation.
"""
self.current_step = 0
self.contracts_held = 0
self.entry_price = None
self.trailing_stop = None
return self._get_observation()
def step(self, action):
"""
Execute one time step in the environment.
Parameters:
- action (int): Action taken by the agent (discrete order adjustment).
Returns:
- observation (numpy.array): Next state observation.
- reward (float): Reward obtained from the action.
- done (bool): Whether the episode has ended.
- info (dict): Additional environment info.
"""
trade_adjustment = action - self.max_contracts
current_price = self.data.iloc[self.current_step]['close']
transaction_cost = self.config.get('transaction_cost', 0.001) * abs(trade_adjustment) * current_price
reward = 0.0
if trade_adjustment != 0:
if self.contracts_held == 0:
self.contracts_held = trade_adjustment
self.entry_price = current_price
atr = self.data.iloc[self.current_step].get('ATR', 0.01)
self.trailing_stop = current_price - self.trailing_stop_multiplier * atr if trade_adjustment > 0 else current_price + self.trailing_stop_multiplier * atr
else:
prev_position = self.contracts_held
self.contracts_held += trade_adjustment
if self.contracts_held != 0:
self.entry_price = (self.entry_price * prev_position + current_price * trade_adjustment) / self.contracts_held
else:
self.entry_price = None
reward -= transaction_cost
else:
if self.contracts_held != 0 and self.entry_price is not None:
reward = (current_price - self.entry_price) * self.contracts_held
# ATRbased trailing stop logic:
if self.trailing_stop is not None:
if self.contracts_held > 0 and current_price < self.trailing_stop:
self.logger.info("ATR trailing stop triggered for long position.")
reward += (current_price - self.entry_price) * self.contracts_held
self.contracts_held = 0
self.entry_price = None
self.trailing_stop = None
elif self.contracts_held < 0 and current_price > self.trailing_stop:
self.logger.info("ATR trailing stop triggered for short position.")
reward += (self.entry_price - current_price) * abs(self.contracts_held)
self.contracts_held = 0
self.entry_price = None
self.trailing_stop = None
self.current_step += 1
done = self.current_step >= len(self.data) - 1
observation = self._get_observation()
info = {
'contracts_held': self.contracts_held,
'entry_price': self.entry_price
}
return observation, reward, done, info
def render(self, mode='human'):
"""
Render the current state.
"""
current_price = self.data.iloc[self.current_step]['close']
print(f"Step: {self.current_step}, Price: {current_price:.4f}, Contracts Held: {self.contracts_held}, Entry Price: {self.entry_price}")

View File

@@ -0,0 +1,53 @@
"""
src/trading/execution.py
This module abstracts the interaction with broker APIs for executing market orders.
It provides placeholder functions for live order execution.
In a production setting, integrate these functions with the appropriate broker API.
"""
import logging
def execute_order(order_type, quantity, price=None):
"""
Execute a market order.
Parameters:
- order_type (str): 'buy' or 'sell'.
- quantity (int): Number of contracts.
- price (float): Execution price (None for market orders).
Returns:
- dict: Order execution confirmation details.
"""
logger = logging.getLogger(__name__)
logger.info(f"Executing {order_type.upper()} order for {quantity} contract(s) at {price if price else 'Market Price'}.")
# Placeholder: In production, execute order via broker API.
order_confirmation = {
'order_type': order_type,
'quantity': quantity,
'price': price,
'status': 'executed'
}
return order_confirmation
def get_live_market_data():
"""
Retrieve live market data from a data feed.
Returns:
- dict: Latest market data (e.g., price and volume).
"""
logger = logging.getLogger(__name__)
live_data = {
'time': None,
'open': None,
'high': None,
'low': None,
'close': 1.0, # Dummy value for testing
'volume': None
}
logger.info("Retrieved live market data (placeholder).")
return live_data

View File

@@ -0,0 +1,49 @@
"""
src/trading/risk_manager.py
This module provides advanced risk management functionalities.
It calculates the current risk/reward ratio and manages live positions,
including enforcing a minimum 2:1 risk/reward ratio and ATRbased trailing stops.
"""
import logging
def calculate_risk_reward(current_price, entry_price, atr):
"""
Calculate the risk/reward ratio for a position.
Parameters:
- current_price (float): Current market price.
- entry_price (float): Position entry price.
- atr (float): Average True Range (volatility measure).
Returns:
- float: Risk/reward ratio.
"""
logger = logging.getLogger(__name__)
risk = atr
reward = abs(current_price - entry_price)
if risk == 0:
logger.warning("ATR is zero; risk/reward ratio undefined. Defaulting to 0.")
return 0.0
risk_reward = reward / risk
return risk_reward
def is_trade_acceptable(current_price, entry_price, atr, minimum_ratio=2.0):
"""
Check if the trade meets the required minimum risk/reward ratio.
Parameters:
- current_price (float): Current price.
- entry_price (float): Entry price.
- atr (float): Average True Range.
- minimum_ratio (float): Minimum required ratio.
Returns:
- bool: True if acceptable, False otherwise.
"""
rr_ratio = calculate_risk_reward(current_price, entry_price, atr)
return rr_ratio >= minimum_ratio

View File

@@ -0,0 +1,32 @@
"""
src/utils/logger.py
This module sets up centralized logging for the FuturesTradingAI system.
Logs are output to both console and file.
"""
import logging
import os
from src.config import LOGGING_CONFIG
def setup_logging(log_dir):
"""
Setup logging to a file and the console.
Parameters:
- log_dir (str): Directory where log files will be stored.
"""
if not os.path.exists(log_dir):
os.makedirs(log_dir)
log_file = os.path.join(log_dir, 'futures_trading_ai.log')
logging.basicConfig(
level=LOGGING_CONFIG.get('log_level', 'INFO'),
format=LOGGING_CONFIG.get('log_format'),
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)

View File

@@ -0,0 +1,42 @@
"""
src/utils/resource_monitor.py
This module provides functions to monitor system resources (CPU and GPU).
It logs resource utilization at specified intervals.
"""
import logging
import time
import psutil
import GPUtil
def monitor_resources(interval=60):
"""
Monitor CPU and GPU resources at regular intervals.
Parameters:
- interval (int): Seconds between checks.
"""
logger = logging.getLogger(__name__)
while True:
cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
logger.info(f"CPU usage per core: {cpu_percent}")
gpus = GPUtil.getGPUs()
for gpu in gpus:
logger.info(
f"GPU {gpu.id} ({gpu.name}): Load: {gpu.load*100:.1f}%, Memory: "
f"{gpu.memoryUsed}/{gpu.memoryTotal} MB, Temperature: {gpu.temperature} °C"
)
time.sleep(interval)
def start_resource_monitor(interval=60):
"""
Start monitoring system resources.
Parameters:
- interval (int): Interval in seconds.
"""
monitor_resources(interval)

Binary file not shown.

View File

@@ -0,0 +1,38 @@
"""
examples/run_backtest.py
Script to run backtesting simulation using historical market data.
"""
import argparse
import pandas as pd
from src.data.loader import load_data
from src.data.technical_indicators import calculate_all_indicators
from src.data.preprocessing import preprocess_data
from src.models.hierarchical import HierarchicalTrader
import logging
from src.utils.logger import setup_logging
import os
def main():
parser = argparse.ArgumentParser(description="Run backtesting simulation for FuturesTradingAI")
parser.add_argument("--data", type=str, required=True, help="Path to CSV data file for backtesting")
args = parser.parse_args()
log_dir = os.path.join(os.getcwd(), "logs")
setup_logging(log_dir)
logger = logging.getLogger(__name__)
df = load_data(args.data)
df = calculate_all_indicators(df)
df_processed = preprocess_data(df)
hierarchical_trader = HierarchicalTrader(data=df_processed)
results = hierarchical_trader.backtest()
logger.info("Backtesting Results:")
logger.info(results)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,42 @@
"""
examples/run_live.py
Script to initiate live trading simulation with simulated live data.
"""
import argparse
import pandas as pd
from src.data.loader import load_data
from src.data.technical_indicators import calculate_all_indicators
from src.data.preprocessing import preprocess_data
from src.models.hierarchical import HierarchicalTrader
from src.trading.env import FuturesTradingEnv
import logging
from src.utils.logger import setup_logging
import os
def main():
parser = argparse.ArgumentParser(description="Run live trading simulation for FuturesTradingAI")
parser.add_argument("--data", type=str, required=True, help="Path to CSV data file for simulation")
args = parser.parse_args()
log_dir = os.path.join(os.getcwd(), "logs")
setup_logging(log_dir)
logger = logging.getLogger(__name__)
df = load_data(args.data)
df = calculate_all_indicators(df)
df_processed = preprocess_data(df)
env = FuturesTradingEnv(data=df_processed)
hierarchical_trader = HierarchicalTrader(data=df_processed, env=env)
logger.info("Starting live trading simulation. Press Ctrl+C to exit.")
try:
hierarchical_trader.live_trading()
except KeyboardInterrupt:
logger.info("Live trading simulation terminated by user.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,3 @@
2025-04-12 20:44:28,103 - __main__ - INFO - Starting FuturesTradingAI application...
2025-04-12 20:44:28,104 - __main__ - INFO - Loading data from /home/midas/codeWS/Projects/MidasTechnologiesINC/MidasEngine/src/MidasHL/data/sample_data.csv...
2025-04-12 20:44:28,107 - src.data.loader - ERROR - Data file not found: /home/midas/codeWS/Projects/MidasTechnologiesINC/MidasEngine/src/MidasHL/data/sample_data.csv

View File

@@ -0,0 +1,12 @@
tensorflow
gym
stable-baselines3
optuna
numpy
pandas
scikit-learn
joblib
flask
psutil
GPUtil

58
src/MidasHL/setup.py Normal file
View File

@@ -0,0 +1,58 @@
"""
setup.py for FuturesTradingAI
This script sets up the package installation, including compilation of performancecritical C modules.
It also ensures that NumPy headers are available and adds the IBKR API package (ibapi) to the dependencies.
"""
import os
from setuptools import setup, Extension
from setuptools.command.build_ext import build_ext
import numpy
# Custom build_ext command (can be extended for platform-specific flags)
class BuildFastIndicators(build_ext):
def build_extension(self, ext):
build_ext.build_extension(self, ext)
# Define the C extension module for fast technical indicator computations.
fast_indicators_module = Extension(
'fast_indicators', # Name of the generated module
sources=['src/c_modules/fast_indicators.c'], # Path to the C source file
extra_compile_args=[], # Extra compiler flags, if needed
include_dirs=[numpy.get_include()] # Add NumPy include directory
)
setup(
name='FuturesTradingAI',
version='0.1.0',
description='Hierarchical Learning for Live /MES Futures Trading',
author='Your Name',
author_email='your.email@example.com',
packages=[
'src',
'src.data',
'src.models',
'src.trading',
'src.api',
'src.utils'
],
ext_modules=[fast_indicators_module],
cmdclass={'build_ext': BuildFastIndicators},
install_requires=[
'tensorflow',
'gym',
'stable-baselines3',
'optuna',
'numpy',
'pandas',
'scikit-learn',
'joblib',
'flask',
'psutil',
'GPUtil',
'ibapi' # or "ib_insync" if you prefer an easier-to-use interface for IBKR.
],
python_requires='>=3.7'
)

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,85 @@
"""
src/api/api_ai.py
This module implements an API layer exposing endpoints for the trading AI.
Using Flask, it offers RESTful endpoints that receive market data and return trade signals.
"""
from flask import Flask, request, jsonify
import logging
from src.models.hierarchical import HierarchicalTrader
from src.models.lstm_forecaster import LSTMForecaster
from src.models.ppo_trader import PPOTrader
import pandas as pd
# Initialize Flask app
app = Flask(__name__)
logger = logging.getLogger(__name__)
# Global variable to hold the trading model
global_hierarchical_trader = None
@app.route('/initialize', methods=['POST'])
def initialize_models():
"""
Initialize trading models.
Expects JSON payload with parameters such as data path and model paths.
Example payload:
{
"data_path": "path/to/data.csv",
"lstm_model_path": "path/to/lstm_model.h5",
"ppo_model_path": "path/to/ppo_trader.zip"
}
"""
global global_hierarchical_trader
data = request.json
data_path = data.get('data_path')
lstm_model_path = data.get('lstm_model_path')
ppo_model_path = data.get('ppo_model_path')
try:
df = pd.read_csv(data_path, parse_dates=['time'])
except Exception as e:
logger.error(f"Error loading data: {e}")
return jsonify({"status": "error", "message": "Failed to load data"}), 400
# For illustration, create new models. In production load models from disk.
lstm_forecaster = LSTMForecaster(data=df)
ppo_trader = PPOTrader(data=df, lstm_model=lstm_forecaster.model)
ppo_trader.load(ppo_model_path)
global_hierarchical_trader = HierarchicalTrader(data=df, lstm_model=lstm_forecaster.model, ppo_model=ppo_trader.model)
logger.info("Models initialized successfully.")
return jsonify({"status": "success", "message": "Models initialized."}), 200
@app.route('/trade_signal', methods=['POST'])
def get_trade_signal():
"""
Endpoint to obtain a trade signal based on a provided observation.
Expected payload:
{
"observation": [ ... ]
}
Returns:
{
"trade_signal": <integer signal>
}
"""
global global_hierarchical_trader
if global_hierarchical_trader is None:
return jsonify({"status": "error", "message": "Models not initialized."}), 400
data = request.json
observation = data.get('observation')
if observation is None:
return jsonify({"status": "error", "message": "Observation not provided."}), 400
signal = global_hierarchical_trader.generate_trade_signal(observation)
return jsonify({"trade_signal": signal}), 200
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -0,0 +1,93 @@
/*
src/c_modules/fast_indicators.c
This C module implements performancecritical technical indicator calculations.
Here we implement a fast Average True Range (ATR) computation.
Compile this module as a Python extension.
*/
#include <Python.h>
#include <numpy/arrayobject.h>
#include <math.h>
// Function to compute ATR given high, low, and close arrays and a period.
static PyObject* compute_atr(PyObject* self, PyObject* args) {
PyArrayObject *high_array, *low_array, *close_array;
int period;
if (!PyArg_ParseTuple(args, "O!O!O!i",
&PyArray_Type, &high_array,
&PyArray_Type, &low_array,
&PyArray_Type, &close_array,
&period))
{
return NULL;
}
int n = (int)PyArray_SIZE(high_array);
if (n < period) {
PyErr_SetString(PyExc_ValueError, "Array length is smaller than period");
return NULL;
}
// Create output array for ATR values.
npy_intp dims[1] = { n };
PyObject* atr_obj = PyArray_SimpleNew(1, dims, NPY_DOUBLE);
double* atr = (double*)PyArray_DATA((PyArrayObject*)atr_obj);
double tr, tr_sum = 0.0;
atr[0] = 0.0;
// Calculate True Range for each element from index 1.
for (int i = 1; i < n; i++) {
double high = *(double*)PyArray_GETPTR1(high_array, i);
double low = *(double*)PyArray_GETPTR1(low_array, i);
double prev_close = *(double*)PyArray_GETPTR1(close_array, i - 1);
double diff1 = high - low;
double diff2 = fabs(high - prev_close);
double diff3 = fabs(low - prev_close);
tr = diff1;
if (diff2 > tr)
tr = diff2;
if (diff3 > tr)
tr = diff3;
atr[i] = tr;
}
// For indices less than 'period', set ATR to 0.
for (int i = 0; i < period; i++) {
atr[i] = 0.0;
}
// Compute ATR as the simple moving average of TR over the period.
for (int i = period; i < n; i++) {
tr_sum = 0.0;
for (int j = i - period + 1; j <= i; j++) {
tr_sum += atr[j];
}
atr[i] = tr_sum / period;
}
return atr_obj;
}
// Module's method table.
static PyMethodDef FastIndicatorsMethods[] = {
{"compute_atr", compute_atr, METH_VARARGS, "Compute ATR using high, low, and close prices."},
{NULL, NULL, 0, NULL}
};
// Module definition structure.
static struct PyModuleDef fastindicatorsmodule = {
PyModuleDef_HEAD_INIT,
"fast_indicators", // Module name
"Module for fast technical indicator calculations.",
-1,
FastIndicatorsMethods
};
// Module initialization function.
PyMODINIT_FUNC PyInit_fast_indicators(void) {
import_array(); // Initialize NumPy API.
return PyModule_Create(&fastindicatorsmodule);
}

View File

@@ -0,0 +1,23 @@
"""
src/c_modules/setup_fast_indicators.py
This script can be used to compile the fast_indicators C extension module independently.
Run this script to build the C module.
"""
from setuptools import setup, Extension
import numpy
fast_indicators_module = Extension(
'fast_indicators',
sources=['fast_indicators.c'],
include_dirs=[numpy.get_include()]
)
setup(
name='fast_indicators',
version='0.1',
description='Fast technical indicator computations in C',
ext_modules=[fast_indicators_module]
)

68
src/MidasHL/src/config.py Normal file
View File

@@ -0,0 +1,68 @@
"""
src/config.py
This module holds central configuration settings and hyperparameters for the FuturesTradingAI system.
It includes settings for data paths, model training, trading environment parameters, risk management, etc.
"""
import os
# Base directory configuration
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
# Data paths
DATA_DIR = os.path.join(BASE_DIR, 'data')
SAMPLE_DATA_PATH = os.path.join(DATA_DIR, 'sample_data.csv')
# Output directories
OUTPUT_DIR = os.path.join(BASE_DIR, 'output')
MODEL_DIR = os.path.join(OUTPUT_DIR, 'models')
LOG_DIR = os.path.join(OUTPUT_DIR, 'logs')
# LSTM model hyperparameters
LSTM_PARAMS = {
'num_layers': 2, # Number of LSTM layers
'units': 64, # Number of units per LSTM layer
'dropout_rate': 0.2, # Dropout rate to prevent overfitting
'learning_rate': 1e-3, # Initial learning rate
'l2_regularization': 1e-4 # L2 regularization factor
}
# PPO agent hyperparameters
PPO_PARAMS = {
'n_steps': 2048,
'batch_size': 64,
'gae_lambda': 0.95,
'gamma': 0.99,
'learning_rate': 3e-4,
'ent_coef': 0.0,
'verbose': 1
}
# Trading environment configuration
TRADING_ENV_CONFIG = {
'window_size': 15, # Lookback window size for LSTM forecasting
'transaction_cost': 0.001, # Transaction cost fee as a fraction of notional
'max_contracts': 1, # Initial max number of contracts for trading (scalable for multicontract trading)
'risk_reward_threshold': 2.0, # Minimum risk/reward ratio required to execute a trade (2:1)
'atr_period': 14, # Period for ATR calculation
'trailing_stop_multiplier': 1.5 # Multiplier for ATRbased trailing stop
}
# API configuration
API_CONFIG = {
'host': '127.0.0.1', # Host IP for API server
'port': 5000 # Port for API server
}
# Logging settings
LOGGING_CONFIG = {
'log_level': 'INFO', # Logging level: DEBUG, INFO, WARNING, ERROR, CRITICAL
'log_format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
}
# Misc configuration for resource monitoring, etc.
MONITOR_CONFIG = {
'interval': 60 # Monitor system resources every 60 seconds
}

View File

@@ -0,0 +1,59 @@
"""
src/data/loader.py
This module provides functions to load and clean CSV market data.
"""
import pandas as pd
import logging
def load_data(file_path):
"""
Load CSV data from the specified file path.
Parameters:
- file_path (str): Path to the CSV file.
Returns:
- pandas.DataFrame: Loaded and cleaned data.
"""
logger = logging.getLogger(__name__)
try:
# Attempt to read the CSV with proper parsing of date columns
df = pd.read_csv(file_path, parse_dates=['time'])
logger.info(f"Successfully loaded data from {file_path}")
except FileNotFoundError:
logger.error(f"Data file not found: {file_path}")
raise
except pd.errors.ParserError as e:
logger.error(f"Error parsing the CSV file: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error loading data: {e}")
raise
# Standardize column names (e.g., time, open, high, low, close, volume)
expected_cols = ['time', 'open', 'high', 'low', 'close', 'volume']
df.columns = [col.strip().lower() for col in df.columns]
if not all(col in df.columns for col in expected_cols):
logger.warning("Input data does not contain all expected columns. Attempting to map columns.")
# Rename columns if necessary (this can be extended based on the actual CSV structure)
rename_mapping = {
'time': 'time',
'open': 'open',
'high': 'high',
'low': 'low',
'close': 'close',
'volume': 'volume'
}
df = df.rename(columns=rename_mapping)
# Sort data chronologically and reset index
df.sort_values(by='time', inplace=True)
df.reset_index(drop=True, inplace=True)
# Handle missing values by forward filling
df.fillna(method='ffill', inplace=True)
return df

View File

@@ -0,0 +1,64 @@
"""
src/data/preprocessing.py
This module provides advanced data preprocessing routines, including parallel feature engineering,
normalization, and any additional cleanup required for the trading system.
"""
import pandas as pd
import numpy as np
import logging
from multiprocessing import Pool, cpu_count
def parallel_feature_engineering(row):
"""
Perform feature engineering on a single row of data.
This can include custom transformations, creation of new features, etc.
Currently, this is an example that adds a feature: the ratio of high to low prices.
Parameters:
- row (pandas.Series): A row from the DataFrame.
Returns:
- dict: A dictionary with engineered features.
"""
try:
engineered = row.to_dict()
engineered['hl_ratio'] = row['high'] / (row['low'] + 1e-9)
return engineered
except Exception as e:
logging.error(f"Error processing row for feature engineering: {e}")
return row.to_dict()
def preprocess_data(df):
"""
Perform preprocessing on the entire DataFrame, including parallel feature engineering
and normalization routines.
Parameters:
- df (pandas.DataFrame): Input market data.
Returns:
- pandas.DataFrame: Preprocessed data ready for modeling.
"""
logger = logging.getLogger(__name__)
logger.info("Starting parallel feature engineering...")
num_workers = max(1, cpu_count() - 2)
with Pool(processes=num_workers) as pool:
processed_data = pool.map(parallel_feature_engineering, [row for _, row in df.iterrows()])
df_processed = pd.DataFrame(processed_data)
# Normalization: Scale pricerelated columns (open, high, low, close) using minmax scaling.
price_cols = ['open', 'high', 'low', 'close']
for col in price_cols:
if col in df_processed.columns:
min_val = df_processed[col].min()
max_val = df_processed[col].max()
df_processed[col] = (df_processed[col] - min_val) / (max_val - min_val + 1e-9)
logger.info("Data preprocessing and feature engineering completed.")
return df_processed

View File

@@ -0,0 +1,214 @@
"""
src/data/technical_indicators.py
This module implements a comprehensive set of technical indicators used in the trading system.
Indicators include:
- RSI (Relative Strength Index)
- MACD (Moving Average Convergence Divergence)
- OBV (On-Balance Volume)
- ADX (Average Directional Index)
- Bollinger Bands
- MFI (Money Flow Index)
- ATR (Average True Range) for trailing stop calculations
"""
import pandas as pd
import numpy as np
def compute_rsi(series, window=14):
"""
Compute Relative Strength Index (RSI).
Parameters:
- series (pandas.Series): Series of prices.
- window (int): Number of periods to use for calculation.
Returns:
- pandas.Series: RSI values.
"""
delta = series.diff()
gain = delta.clip(lower=0)
loss = -delta.clip(upper=0)
avg_gain = gain.rolling(window=window, min_periods=window).mean()
avg_loss = loss.rolling(window=window, min_periods=window).mean()
rs = avg_gain / (avg_loss + 1e-9)
rsi = 100 - (100 / (1 + rs))
return rsi
def compute_macd(series, span_short=12, span_long=26, span_signal=9):
"""
Compute MACD (Moving Average Convergence Divergence) indicator.
Parameters:
- series (pandas.Series): Series of prices.
- span_short (int): Shortterm EMA span.
- span_long (int): Longterm EMA span.
- span_signal (int): Signal line EMA span.
Returns:
- pandas.Series: MACD histogram values.
"""
ema_short = series.ewm(span=span_short, adjust=False).mean()
ema_long = series.ewm(span=span_long, adjust=False).mean()
macd_line = ema_short - ema_long
signal_line = macd_line.ewm(span=span_signal, adjust=False).mean()
macd_hist = macd_line - signal_line
return macd_hist
def compute_obv(df):
"""
Compute OnBalance Volume (OBV).
Parameters:
- df (pandas.DataFrame): DataFrame containing 'close' and 'volume'.
Returns:
- pandas.Series: OBV values.
"""
direction = np.sign(df['close'].diff()).fillna(0)
obv = (direction * df['volume']).cumsum()
return obv
def compute_adx(df, window=14):
"""
Compute Average Directional Index (ADX).
Parameters:
- df (pandas.DataFrame): DataFrame containing 'high', 'low', and 'close'.
- window (int): Window size for calculation.
Returns:
- pandas.Series: ADX values.
"""
high = df['high']
low = df['low']
close = df['close']
tr1 = high - low
tr2 = (high - close.shift()).abs()
tr3 = (low - close.shift()).abs()
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
atr = tr.rolling(window=window, min_periods=window).mean()
up_move = high.diff()
down_move = low.diff().abs()
plus_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0)
minus_dm = np.where((down_move > up_move) & (down_move > 0), down_move, 0)
plus_di = 100 * (pd.Series(plus_dm).rolling(window=window, min_periods=window).sum() / (atr + 1e-9))
minus_di = 100 * (pd.Series(minus_dm).rolling(window=window, min_periods=window).sum() / (atr + 1e-9))
dx = (abs(plus_di - minus_di) / (plus_di + minus_di + 1e-9)) * 100
adx = dx.rolling(window=window, min_periods=window).mean()
return adx
def compute_bollinger_bands(series, window=20, num_std=2):
"""
Compute Bollinger Bands.
Parameters:
- series (pandas.Series): Series of prices.
- window (int): Window size for moving average.
- num_std (int): Number of standard deviations for the bands.
Returns:
- tuple: Upper band, lower band, and bandwidth as pandas.Series objects.
"""
sma = series.rolling(window=window, min_periods=window).mean()
std = series.rolling(window=window, min_periods=window).std()
upper = sma + num_std * std
lower = sma - num_std * std
bandwidth = (upper - lower) / (sma + 1e-9)
return upper, lower, bandwidth
def compute_mfi(df, window=14):
"""
Compute Money Flow Index (MFI).
Parameters:
- df (pandas.DataFrame): DataFrame containing 'high', 'low', 'close', and 'volume'.
- window (int): Window size for calculation.
Returns:
- pandas.Series: MFI values.
"""
typical_price = (df['high'] + df['low'] + df['close']) / 3
money_flow = typical_price * df['volume']
positive_flow = []
negative_flow = []
for i in range(1, len(typical_price)):
if typical_price.iloc[i] > typical_price.iloc[i-1]:
positive_flow.append(money_flow.iloc[i])
negative_flow.append(0)
elif typical_price.iloc[i] < typical_price.iloc[i-1]:
positive_flow.append(0)
negative_flow.append(money_flow.iloc[i])
else:
positive_flow.append(0)
negative_flow.append(0)
positive_mf = pd.Series(positive_flow).rolling(window=window, min_periods=window).sum()
negative_mf = pd.Series(negative_flow).rolling(window=window, min_periods=window).sum()
mfi = 100 - (100 / (1 + (positive_mf / (negative_mf + 1e-9))))
mfi.index = df.index[1:]
mfi = mfi.reindex(df.index)
return mfi
def compute_atr(df, period=14):
"""
Compute Average True Range (ATR).
Parameters:
- df (pandas.DataFrame): DataFrame containing 'high', 'low', and 'close'.
- period (int): Number of periods to use for the ATR calculation.
Returns:
- pandas.Series: ATR values.
"""
high = df['high']
low = df['low']
close = df['close']
tr1 = high - low
tr2 = (high - close.shift()).abs()
tr3 = (low - close.shift()).abs()
true_range = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
atr = true_range.rolling(window=period, min_periods=period).mean()
return atr
def calculate_all_indicators(df):
"""
Calculate and append all technical indicators to the DataFrame.
Parameters:
- df (pandas.DataFrame): Input market data.
Returns:
- pandas.DataFrame: DataFrame enriched with technical indicator columns.
"""
df['RSI'] = compute_rsi(df['close'])
df['MACD'] = compute_macd(df['close'])
df['OBV'] = compute_obv(df)
df['ADX'] = compute_adx(df)
bollinger_upper, bollinger_lower, bollinger_bandwidth = compute_bollinger_bands(df['close'])
df['BB_Upper'] = bollinger_upper
df['BB_Lower'] = bollinger_lower
df['BB_Bandwidth'] = bollinger_bandwidth
df['MFI'] = compute_mfi(df)
df['ATR'] = compute_atr(df)
# Additional moving averages as sample features
df['SMA_5'] = df['close'].rolling(window=5, min_periods=5).mean()
df['SMA_10'] = df['close'].rolling(window=10, min_periods=10).mean()
# Drop rows with NaN values resulting from indicator calculations
df.dropna(inplace=True)
return df

104
src/MidasHL/src/main.py Normal file
View File

@@ -0,0 +1,104 @@
"""
src/main.py
Main entry point for the FuturesTradingAI application.
This script parses commandline arguments, sets up logging, loads configurations,
initializes data processing, model training, and can start the live trading loop or backtesting.
"""
import argparse
import os
import logging
import threading
from src.config import SAMPLE_DATA_PATH, OUTPUT_DIR, MODEL_DIR, LOG_DIR, MONITOR_CONFIG
from src.utils.logger import setup_logging
from src.data.loader import load_data
from src.data.technical_indicators import calculate_all_indicators
from src.data.preprocessing import preprocess_data
from src.models.lstm_forecaster import LSTMForecaster
from src.models.ppo_trader import PPOTrader
from src.models.hierarchical import HierarchicalTrader
from src.trading.env import FuturesTradingEnv
from src.utils.resource_monitor import start_resource_monitor
def parse_args():
"""
Parse commandline arguments.
"""
parser = argparse.ArgumentParser(
description="FuturesTradingAI - Hierarchical Learning for Live /MES Futures Trading"
)
parser.add_argument("--data", type=str, default=SAMPLE_DATA_PATH, help="Path to input CSV data file")
parser.add_argument(
"--mode", type=str, choices=["train", "backtest", "live"], default="train",
help="Mode to run: train for model training, backtest for historical simulation, live for live trading"
)
return parser.parse_args()
def main():
# Parse commandline arguments
args = parse_args()
# Ensure necessary output directories exist
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(MODEL_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)
# Setup logging configuration
setup_logging(LOG_DIR)
logger = logging.getLogger(__name__)
logger.info("Starting FuturesTradingAI application...")
# Optionally start system resource monitoring in a separate thread
monitor_thread = threading.Thread(target=start_resource_monitor, args=(MONITOR_CONFIG['interval'],), daemon=True)
monitor_thread.start()
# Load historical market data
logger.info(f"Loading data from {args.data}...")
df = load_data(args.data)
# Calculate technical indicators (RSI, MACD, OBV, ADX, Bollinger Bands, MFI, ATR, etc.)
logger.info("Calculating technical indicators on dataset...")
df = calculate_all_indicators(df)
# Advanced data preprocessing and feature engineering
logger.info("Preprocessing data...")
df_processed = preprocess_data(df)
# Depending on mode, run training, backtesting or live trading
if args.mode == "train":
# Train LSTM forecaster
logger.info("Initializing LSTM forecaster...")
lstm_forecaster = LSTMForecaster(data=df_processed)
lstm_forecaster.train()
# Train PPO trading agent using custom environment
logger.info("Initializing PPO trader...")
ppo_trader = PPOTrader(data=df_processed, lstm_model=lstm_forecaster.model)
ppo_trader.train()
# Save models
lstm_forecaster.save(MODEL_DIR)
ppo_trader.save(MODEL_DIR)
elif args.mode == "backtest":
# Perform backtesting using hierarchical integration
logger.info("Running backtesting simulation...")
hierarchical_trader = HierarchicalTrader(data=df_processed)
results = hierarchical_trader.backtest()
logger.info("Backtest completed. Results:")
logger.info(results)
elif args.mode == "live":
# Initiate live trading simulation
logger.info("Starting live trading simulation...")
env = FuturesTradingEnv(data=df_processed)
hierarchical_trader = HierarchicalTrader(data=df_processed, env=env)
hierarchical_trader.live_trading()
logger.info("FuturesTradingAI application finished.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,145 @@
"""
src/models/hierarchical.py
This module integrates the LSTM forecaster and the PPO trading agent to form a hierarchical decisionmaking system.
It fuses shortterm price forecasts with trading signals and enforces risk management constraints before executing trades.
"""
import logging
import numpy as np
class HierarchicalTrader:
"""
HierarchicalTrader integrates the outputs from the LSTM forecaster and PPO agent,
applies risk management filters, and simulates trade execution.
"""
def __init__(self, data, lstm_model=None, ppo_model=None, env=None, risk_reward_threshold=2.0):
"""
Initialize HierarchicalTrader.
Parameters:
- data (pandas.DataFrame): Preprocessed market data.
- lstm_model: Pretrained LSTM model for forecasting.
- ppo_model: Trained PPO model for trading decisions.
- env: Trading environment.
- risk_reward_threshold (float): Minimum risk/reward ratio required to execute trades.
"""
self.logger = logging.getLogger(__name__)
self.data = data
self.lstm_model = lstm_model
self.ppo_model = ppo_model
self.env = env
self.risk_reward_threshold = risk_reward_threshold
def assess_risk_reward(self, current_price, forecast, atr, current_position):
"""
Assess risk/reward ratio based on current price, forecast, ATR, and position.
Parameters:
- current_price (float): Current market price.
- forecast (float): Forecasted price change (as a ratio).
- atr (float): Current ATR value.
- current_position (int): Number of contracts held.
Returns:
- risk_reward (float): Calculated risk/reward ratio.
"""
expected_price = current_price * (1 + forecast)
reward = abs(expected_price - current_price)
risk = atr
risk_reward = reward / (risk + 1e-9)
return risk_reward
def generate_trade_signal(self, observation):
"""
Generate a trade signal by combining PPO agent output and LSTM forecast.
Applies risk/reward filtering.
Parameters:
- observation: Current state from the trading environment.
Returns:
- signal (int): Trade signal (e.g., number of contracts to buy/sell), or 0 for no action.
"""
# Assume observation consists of technical indicators, normalized position, and forecast (last element)
current_forecast = observation[-1]
current_position = observation[-2]
if self.ppo_model is None:
self.logger.error("PPO model not provided for hierarchical trading.")
return 0
action, _ = self.ppo_model.predict(observation, deterministic=True)
trade_signal = int(action) if isinstance(action, (int, np.integer)) else int(round(float(action)))
# For simplicity, we simulate current price and ATR extraction; in practice, these come from the environment.
current_price = 1.0 # Placeholder
atr = 0.01 # Placeholder
rr_ratio = self.assess_risk_reward(current_price, current_forecast, atr, current_position)
self.logger.info(f"Calculated risk/reward ratio: {rr_ratio:.2f}")
if rr_ratio < self.risk_reward_threshold:
self.logger.info("Risk/Reward ratio below threshold. No trade executed.")
return 0
self.logger.info(f"Trade signal generated: {trade_signal}")
return trade_signal
def backtest(self):
"""
Perform backtesting simulation over historical data using hierarchical decision making.
Returns:
- dict: Backtesting results (e.g., total PnL, win rate).
"""
self.logger.info("Starting backtest simulation...")
total_reward = 0.0
num_trades = 0
for idx, row in self.data.iterrows():
observation = self._create_observation(row)
signal = self.generate_trade_signal(observation)
reward = signal * 0.001 # Simplified reward calculation
total_reward += reward
if signal != 0:
num_trades += 1
results = {
'total_reward': total_reward,
'num_trades': num_trades,
'average_reward': total_reward / (num_trades + 1e-9)
}
self.logger.info("Backtest simulation completed.")
return results
def _create_observation(self, row):
"""
Create a synthetic observation vector from a data row.
In a full implementation, this should mirror the environment's observation space.
Parameters:
- row (pandas.Series): A row from the historical dataset.
Returns:
- numpy.array: Observation vector.
"""
tech_indicators = row.drop(labels=['time', 'close']).values
normalized_position = np.array([0.0])
forecast = np.array([0.001]) # Placeholder forecast value
observation = np.concatenate([tech_indicators, normalized_position, forecast])
return observation
def live_trading(self):
"""
Simulate live trading by continuously generating trade signals.
This function is a placeholder for integrating with live market data and execution systems.
"""
self.logger.info("Starting live trading simulation...")
import time
while True:
observation = self._create_observation(self.data.iloc[-1])
signal = self.generate_trade_signal(observation)
self.logger.info(f"Live trade signal: {signal}")
# In production, signal triggers execution via the execution module.
time.sleep(5) # Simulate 5minute reevaluation in a live setting.

View File

@@ -0,0 +1,240 @@
"""
src/models/lstm_forecaster.py
This module implements an LSTMbased forecasting model for predicting shortterm future prices.
The model uses a bidirectional LSTM with dropout and L2 regularization.
It supports hyperparameter tuning with Optuna and provides training, evaluation, and saving functionality.
"""
import os
import numpy as np
import pandas as pd
import logging
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam, Nadam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.preprocessing import MinMaxScaler
import optuna
class LSTMForecaster:
"""
LSTMForecaster builds, trains, and evaluates an LSTM model for price forecasting.
"""
def __init__(self, data, window_size=15, model_save_path=None, hyperparams=None):
"""
Initialize the LSTMForecaster.
Parameters:
- data (pandas.DataFrame): Preprocessed market data with technical indicators.
- window_size (int): The number of time steps in the input sequence.
- model_save_path (str): Directory path where the model will be saved.
- hyperparams (dict): Dictionary of hyperparameters (optional).
"""
self.logger = logging.getLogger(__name__)
self.data = data.copy()
self.window_size = window_size
self.model_save_path = model_save_path if model_save_path else os.getcwd()
self.hyperparams = hyperparams
# Define target column and feature columns. Here we assume 'close' is the target.
self.target_column = 'close'
self.feature_columns = [col for col in data.columns if col not in [self.target_column, 'time']]
# Initialize scalers for features and target
self.scaler_features = MinMaxScaler()
self.scaler_target = MinMaxScaler()
# Prepare training sequences
self.X, self.y = self._create_sequences()
# Build model based on hyperparameters or default settings
if self.hyperparams is None:
self.hyperparams = {
'num_layers': 2,
'units': 64,
'dropout_rate': 0.2,
'learning_rate': 1e-3,
'optimizer': 'Adam',
'l2_reg': 1e-4
}
self.model = self._build_model()
def _create_sequences(self):
"""
Create sequences from the data for LSTM training.
Returns:
- X (numpy.array): 3D array for model input.
- y (numpy.array): 1D array for target variable.
"""
features = self.data[self.feature_columns].values
target = self.data[[self.target_column]].values
features_scaled = self.scaler_features.fit_transform(features)
target_scaled = self.scaler_target.fit_transform(target)
X_seq, y_seq = [], []
for i in range(len(features_scaled) - self.window_size):
X_seq.append(features_scaled[i:i+self.window_size])
y_seq.append(target_scaled[i+self.window_size])
X_seq = np.array(X_seq)
y_seq = np.array(y_seq).flatten()
self.logger.info(f"Created sequences: X shape = {X_seq.shape}, y shape = {y_seq.shape}")
return X_seq, y_seq
def _build_model(self):
"""
Build the LSTM model architecture.
Returns:
- model (tf.keras.Model): Compiled LSTM model.
"""
self.logger.info("Building LSTM model with hyperparameters: {}".format(self.hyperparams))
model = Sequential()
num_layers = self.hyperparams['num_layers']
units = self.hyperparams['units']
dropout_rate = self.hyperparams['dropout_rate']
l2_reg = self.hyperparams['l2_reg']
# Add multiple Bidirectional LSTM layers with dropout
for i in range(num_layers):
return_sequences = True if i < num_layers - 1 else False
if i == 0:
model.add(Bidirectional(LSTM(units, return_sequences=return_sequences,
kernel_regularizer=l2(l2_reg)),
input_shape=(self.X.shape[1], self.X.shape[2])))
else:
model.add(Bidirectional(LSTM(units, return_sequences=return_sequences,
kernel_regularizer=l2(l2_reg))))
model.add(Dropout(dropout_rate))
# Final Dense layer for regression output
model.add(Dense(1, activation='linear'))
# Select optimizer
optimizer_name = self.hyperparams.get('optimizer', 'Adam')
learning_rate = self.hyperparams.get('learning_rate', 1e-3)
if optimizer_name == 'Adam':
optimizer = Adam(learning_rate=learning_rate)
elif optimizer_name == 'Nadam':
optimizer = Nadam(learning_rate=learning_rate)
else:
optimizer = Adam(learning_rate=learning_rate)
model.compile(loss='mse', optimizer=optimizer, metrics=['mae'])
self.logger.info("LSTM model built and compiled successfully.")
return model
def train(self, epochs=100, batch_size=16, validation_split=0.2):
"""
Train the LSTM model.
Parameters:
- epochs (int): Number of training epochs.
- batch_size (int): Batch size for training.
- validation_split (float): Fraction of data to use for validation.
"""
callbacks = [
EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)
]
self.logger.info("Starting LSTM training...")
history = self.model.fit(
self.X, self.y,
epochs=epochs,
batch_size=batch_size,
validation_split=validation_split,
callbacks=callbacks,
verbose=1
)
self.logger.info("LSTM training completed.")
return history
def predict(self, X):
"""
Generate predictions using the trained model.
Parameters:
- X (numpy.array): Input data sequences.
Returns:
- numpy.array: Predicted values (inverse transformed).
"""
pred_scaled = self.model.predict(X)
predictions = self.scaler_target.inverse_transform(pred_scaled)
return predictions.flatten()
def save(self, save_dir):
"""
Save the trained model and scalers.
Parameters:
- save_dir (str): Directory where the model and scalers will be saved.
"""
if not os.path.exists(save_dir):
os.makedirs(save_dir)
model_path = os.path.join(save_dir, 'lstm_forecaster.h5')
self.model.save(model_path)
import joblib
joblib.dump(self.scaler_features, os.path.join(save_dir, 'scaler_features.pkl'))
joblib.dump(self.scaler_target, os.path.join(save_dir, 'scaler_target.pkl'))
self.logger.info(f"Model and scalers saved to {save_dir}")
@staticmethod
def objective(trial, X_train, y_train, X_val, y_val, input_shape):
"""
Objective function for hyperparameter tuning with Optuna.
Parameters:
- trial: Optuna trial object.
- X_train, y_train: Training data.
- X_val, y_val: Validation data.
- input_shape: Shape of the input data sequence.
Returns:
- float: Validation loss (MAE) to minimize.
"""
num_layers = trial.suggest_int('num_layers', 1, 3)
units = trial.suggest_categorical('units', [32, 64, 128])
dropout_rate = trial.suggest_float('dropout_rate', 0.1, 0.5)
learning_rate = trial.suggest_float('learning_rate', 1e-4, 1e-2, log=True)
optimizer_choice = trial.suggest_categorical('optimizer', ['Adam', 'Nadam'])
l2_reg = trial.suggest_float('l2_reg', 1e-5, 1e-3, log=True)
model = Sequential()
for i in range(num_layers):
return_sequences = True if i < num_layers - 1 else False
if i == 0:
model.add(Bidirectional(LSTM(units, return_sequences=return_sequences,
kernel_regularizer=l2(l2_reg)),
input_shape=input_shape))
else:
model.add(Bidirectional(LSTM(units, return_sequences=return_sequences,
kernel_regularizer=l2(l2_reg))))
model.add(Dropout(dropout_rate))
model.add(Dense(1, activation='linear'))
if optimizer_choice == 'Adam':
optimizer = Adam(learning_rate=learning_rate)
else:
optimizer = Nadam(learning_rate=learning_rate)
model.compile(loss='mse', optimizer=optimizer, metrics=['mae'])
callbacks = [
EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True),
ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=1e-6)
]
history = model.fit(
X_train, y_train,
epochs=50,
batch_size=16,
validation_data=(X_val, y_val),
callbacks=callbacks,
verbose=0
)
val_mae = min(history.history['val_mae'])
return val_mae

View File

@@ -0,0 +1,109 @@
"""
src/models/ppo_trader.py
This module implements a Proximal Policy Optimization (PPO) agent for trading decisions
within a custom Gym environment tailored for /MES futures trading.
The agent uses technical indicators and LSTM forecasts as inputs.
"""
import os
import logging
import gym
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from src.trading.env import FuturesTradingEnv
from src.config import TRADING_ENV_CONFIG
class PPOTrader:
"""
PPOTrader encapsulates the training and evaluation of a PPO agent in a custom futures trading environment.
"""
def __init__(self, data, lstm_model, env_config=TRADING_ENV_CONFIG, model_save_path=None):
"""
Initialize the PPOTrader.
Parameters:
- data (pandas.DataFrame): Preprocessed market data.
- lstm_model: Pretrained LSTM model used for forecasting.
- env_config (dict): Configuration for the trading environment.
- model_save_path (str): Directory to save the trained PPO model.
"""
self.logger = logging.getLogger(__name__)
self.data = data
self.lstm_model = lstm_model
self.env_config = env_config
self.model_save_path = model_save_path if model_save_path else os.getcwd()
# Initialize the custom trading environment with required parameters
self.env = FuturesTradingEnv(data=self.data, lstm_model=self.lstm_model, config=self.env_config)
# Wrap environment in DummyVecEnv for PPO training
self.vec_env = DummyVecEnv([lambda: self.env])
# Define PPO hyperparameters (can be customized)
self.ppo_params = {
'n_steps': self.env_config.get('n_steps', 2048),
'batch_size': 64,
'gae_lambda': 0.95,
'gamma': 0.99,
'learning_rate': 3e-4,
'ent_coef': 0.0,
'verbose': 1
}
self.model = None
def train(self, total_timesteps=100000):
"""
Train the PPO model.
Parameters:
- total_timesteps (int): Total number of timesteps for training.
"""
self.logger.info("Starting PPO training...")
self.model = PPO('MlpPolicy', self.vec_env, **self.ppo_params)
self.model.learn(total_timesteps=total_timesteps)
self.logger.info("PPO training completed.")
return self.model
def predict(self, obs):
"""
Predict the action for a given observation.
Parameters:
- obs: Current observation from the trading environment.
Returns:
- action: Predicted action.
"""
if self.model is None:
self.logger.error("Model is not trained yet!")
return None
action, _ = self.model.predict(obs, deterministic=True)
return action
def save(self, save_dir):
"""
Save the trained PPO model.
Parameters:
- save_dir (str): Directory where the model will be saved.
"""
if self.model is None:
self.logger.error("No model to save. Train the model first.")
return
if not os.path.exists(save_dir):
os.makedirs(save_dir)
model_path = os.path.join(save_dir, 'ppo_trader.zip')
self.model.save(model_path)
self.logger.info(f"PPO model saved to {model_path}")
def load(self, model_path):
"""
Load a pretrained PPO model.
Parameters:
- model_path (str): Path to the saved PPO model.
"""
self.model = PPO.load(model_path, env=self.vec_env)
self.logger.info(f"PPO model loaded from {model_path}")

View File

@@ -0,0 +1,163 @@
"""
src/trading/env.py
This module defines a custom OpenAI Gym environment for /MES futures trading on a 5minute timeframe.
The environment includes logic for:
- Tracking positions (long/short)
- Evaluating positions every 5 minutes
- Implementing ATRbased trailing stops for risk management
- Allowing a single contract initially with hooks for multicontract trading in the future
"""
import gym
from gym import spaces
import numpy as np
import pandas as pd
import logging
import threading
class FuturesTradingEnv(gym.Env):
"""
Custom Gym environment for futures trading.
"""
metadata = {'render.modes': ['human']}
def __init__(self, data, lstm_model=None, config=None):
"""
Initialize the trading environment.
Parameters:
- data (pandas.DataFrame): Preprocessed market data.
- lstm_model: Pretrained LSTM model for price forecasting (optional).
- config (dict): Configuration parameters for the environment.
"""
super(FuturesTradingEnv, self).__init__()
self.logger = logging.getLogger(__name__)
self.data = data.reset_index(drop=True)
self.lstm_model = lstm_model
self.config = config if config is not None else {}
# Define action space: discrete orders ranging from -max_contracts to +max_contracts.
self.max_contracts = self.config.get('max_contracts', 1)
self.action_space = spaces.Discrete(2 * self.max_contracts + 1)
# Define observation space: technical indicators plus current position and forecast.
self.tech_indicator_cols = [col for col in self.data.columns if col not in ['time', 'close']]
obs_dim = len(self.tech_indicator_cols) + 2 # +1 for normalized position, +1 for forecast.
self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(obs_dim,), dtype=np.float32)
# Trading variables
self.current_step = 0
self.contracts_held = 0
self.entry_price = None
# ATRbased trailing stop parameters
self.atr_period = self.config.get('atr_period', 14)
self.trailing_stop_multiplier = self.config.get('trailing_stop_multiplier', 1.5)
self.trailing_stop = None
# Lock for threadsafe LSTM predictions
self.lstm_lock = threading.Lock()
def _get_observation(self):
"""
Construct the observation vector from the current market data.
Returns:
- observation (numpy.array): Observation vector for the current step.
"""
row = self.data.iloc[self.current_step]
tech_indicators = row[self.tech_indicator_cols].values.astype(np.float32)
norm_position = np.array([self.contracts_held / self.max_contracts], dtype=np.float32)
if self.lstm_model and self.current_step >= self.config.get('window_size', 15):
# For proper forecasting, construct an input sequence; here we use a placeholder.
forecast = np.array([0.001], dtype=np.float32)
else:
forecast = np.array([0.0], dtype=np.float32)
observation = np.concatenate([tech_indicators, norm_position, forecast])
return observation
def reset(self):
"""
Reset the environment to an initial state.
Returns:
- observation (numpy.array): The initial observation.
"""
self.current_step = 0
self.contracts_held = 0
self.entry_price = None
self.trailing_stop = None
return self._get_observation()
def step(self, action):
"""
Execute one time step in the environment.
Parameters:
- action (int): Action taken by the agent (discrete order adjustment).
Returns:
- observation (numpy.array): Next state observation.
- reward (float): Reward obtained from the action.
- done (bool): Whether the episode has ended.
- info (dict): Additional environment info.
"""
trade_adjustment = action - self.max_contracts
current_price = self.data.iloc[self.current_step]['close']
transaction_cost = self.config.get('transaction_cost', 0.001) * abs(trade_adjustment) * current_price
reward = 0.0
if trade_adjustment != 0:
if self.contracts_held == 0:
self.contracts_held = trade_adjustment
self.entry_price = current_price
atr = self.data.iloc[self.current_step].get('ATR', 0.01)
self.trailing_stop = current_price - self.trailing_stop_multiplier * atr if trade_adjustment > 0 else current_price + self.trailing_stop_multiplier * atr
else:
prev_position = self.contracts_held
self.contracts_held += trade_adjustment
if self.contracts_held != 0:
self.entry_price = (self.entry_price * prev_position + current_price * trade_adjustment) / self.contracts_held
else:
self.entry_price = None
reward -= transaction_cost
else:
if self.contracts_held != 0 and self.entry_price is not None:
reward = (current_price - self.entry_price) * self.contracts_held
# ATRbased trailing stop logic:
if self.trailing_stop is not None:
if self.contracts_held > 0 and current_price < self.trailing_stop:
self.logger.info("ATR trailing stop triggered for long position.")
reward += (current_price - self.entry_price) * self.contracts_held
self.contracts_held = 0
self.entry_price = None
self.trailing_stop = None
elif self.contracts_held < 0 and current_price > self.trailing_stop:
self.logger.info("ATR trailing stop triggered for short position.")
reward += (self.entry_price - current_price) * abs(self.contracts_held)
self.contracts_held = 0
self.entry_price = None
self.trailing_stop = None
self.current_step += 1
done = self.current_step >= len(self.data) - 1
observation = self._get_observation()
info = {
'contracts_held': self.contracts_held,
'entry_price': self.entry_price
}
return observation, reward, done, info
def render(self, mode='human'):
"""
Render the current state.
"""
current_price = self.data.iloc[self.current_step]['close']
print(f"Step: {self.current_step}, Price: {current_price:.4f}, Contracts Held: {self.contracts_held}, Entry Price: {self.entry_price}")

View File

@@ -0,0 +1,53 @@
"""
src/trading/execution.py
This module abstracts the interaction with broker APIs for executing market orders.
It provides placeholder functions for live order execution.
In a production setting, integrate these functions with the appropriate broker API.
"""
import logging
def execute_order(order_type, quantity, price=None):
"""
Execute a market order.
Parameters:
- order_type (str): 'buy' or 'sell'.
- quantity (int): Number of contracts.
- price (float): Execution price (None for market orders).
Returns:
- dict: Order execution confirmation details.
"""
logger = logging.getLogger(__name__)
logger.info(f"Executing {order_type.upper()} order for {quantity} contract(s) at {price if price else 'Market Price'}.")
# Placeholder: In production, execute order via broker API.
order_confirmation = {
'order_type': order_type,
'quantity': quantity,
'price': price,
'status': 'executed'
}
return order_confirmation
def get_live_market_data():
"""
Retrieve live market data from a data feed.
Returns:
- dict: Latest market data (e.g., price and volume).
"""
logger = logging.getLogger(__name__)
live_data = {
'time': None,
'open': None,
'high': None,
'low': None,
'close': 1.0, # Dummy value for testing
'volume': None
}
logger.info("Retrieved live market data (placeholder).")
return live_data

View File

@@ -0,0 +1,49 @@
"""
src/trading/risk_manager.py
This module provides advanced risk management functionalities.
It calculates the current risk/reward ratio and manages live positions,
including enforcing a minimum 2:1 risk/reward ratio and ATRbased trailing stops.
"""
import logging
def calculate_risk_reward(current_price, entry_price, atr):
"""
Calculate the risk/reward ratio for a position.
Parameters:
- current_price (float): Current market price.
- entry_price (float): Position entry price.
- atr (float): Average True Range (volatility measure).
Returns:
- float: Risk/reward ratio.
"""
logger = logging.getLogger(__name__)
risk = atr
reward = abs(current_price - entry_price)
if risk == 0:
logger.warning("ATR is zero; risk/reward ratio undefined. Defaulting to 0.")
return 0.0
risk_reward = reward / risk
return risk_reward
def is_trade_acceptable(current_price, entry_price, atr, minimum_ratio=2.0):
"""
Check if the trade meets the required minimum risk/reward ratio.
Parameters:
- current_price (float): Current price.
- entry_price (float): Entry price.
- atr (float): Average True Range.
- minimum_ratio (float): Minimum required ratio.
Returns:
- bool: True if acceptable, False otherwise.
"""
rr_ratio = calculate_risk_reward(current_price, entry_price, atr)
return rr_ratio >= minimum_ratio

View File

@@ -0,0 +1,32 @@
"""
src/utils/logger.py
This module sets up centralized logging for the FuturesTradingAI system.
Logs are output to both console and file.
"""
import logging
import os
from src.config import LOGGING_CONFIG
def setup_logging(log_dir):
"""
Setup logging to a file and the console.
Parameters:
- log_dir (str): Directory where log files will be stored.
"""
if not os.path.exists(log_dir):
os.makedirs(log_dir)
log_file = os.path.join(log_dir, 'futures_trading_ai.log')
logging.basicConfig(
level=LOGGING_CONFIG.get('log_level', 'INFO'),
format=LOGGING_CONFIG.get('log_format'),
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)

View File

@@ -0,0 +1,42 @@
"""
src/utils/resource_monitor.py
This module provides functions to monitor system resources (CPU and GPU).
It logs resource utilization at specified intervals.
"""
import logging
import time
import psutil
import GPUtil
def monitor_resources(interval=60):
"""
Monitor CPU and GPU resources at regular intervals.
Parameters:
- interval (int): Seconds between checks.
"""
logger = logging.getLogger(__name__)
while True:
cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
logger.info(f"CPU usage per core: {cpu_percent}")
gpus = GPUtil.getGPUs()
for gpu in gpus:
logger.info(
f"GPU {gpu.id} ({gpu.name}): Load: {gpu.load*100:.1f}%, Memory: "
f"{gpu.memoryUsed}/{gpu.memoryTotal} MB, Temperature: {gpu.temperature} °C"
)
time.sleep(interval)
def start_resource_monitor(interval=60):
"""
Start monitoring system resources.
Parameters:
- interval (int): Interval in seconds.
"""
monitor_resources(interval)

View File

@@ -0,0 +1,45 @@
"""
tests/test_data.py
Unit tests for data loading and preprocessing modules.
"""
import unittest
import os
import pandas as pd
from src.data.loader import load_data
from src.data.preprocessing import preprocess_data
class TestDataLoader(unittest.TestCase):
def setUp(self):
# Create a simple sample CSV file for testing.
self.test_csv = "tests/sample_test_data.csv"
data = {
'time': ['2021-01-01 09:30', '2021-01-01 09:35', '2021-01-01 09:40'],
'open': [100, 101, 102],
'high': [101, 102, 103],
'low': [99, 100, 101],
'close': [100.5, 101.5, 102.5],
'volume': [1000, 1100, 1050]
}
df = pd.DataFrame(data)
df.to_csv(self.test_csv, index=False)
def tearDown(self):
if os.path.exists(self.test_csv):
os.remove(self.test_csv)
def test_load_data(self):
df = load_data(self.test_csv)
self.assertIsInstance(df, pd.DataFrame)
self.assertIn('time', df.columns)
def test_preprocess_data(self):
df = load_data(self.test_csv)
df_processed = preprocess_data(df)
self.assertIsInstance(df_processed, pd.DataFrame)
self.assertIn('hl_ratio', df_processed.columns)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,49 @@
"""
tests/test_models.py
Unit tests for the LSTMForecaster and PPOTrader models.
"""
import unittest
import pandas as pd
import numpy as np
from src.models.lstm_forecaster import LSTMForecaster
from src.models.ppo_trader import PPOTrader
class TestModels(unittest.TestCase):
def setUp(self):
# Create a simple DataFrame with dummy data.
data = {
'time': pd.date_range(start='2021-01-01', periods=50, freq='5T'),
'open': np.random.rand(50) * 100,
'high': np.random.rand(50) * 100,
'low': np.random.rand(50) * 100,
'close': np.random.rand(50) * 100,
'volume': np.random.randint(1000, 5000, 50),
'RSI': np.random.rand(50) * 100,
'MACD': np.random.rand(50) * 5,
'OBV': np.random.rand(50) * 10000,
'ADX': np.random.rand(50) * 50,
'BB_Upper': np.random.rand(50) * 110,
'BB_Lower': np.random.rand(50) * 90,
'BB_Bandwidth': np.random.rand(50),
'MFI': np.random.rand(50) * 100,
'ATR': np.random.rand(50)
}
self.df = pd.DataFrame(data)
def test_lstm_forecaster_training(self):
forecaster = LSTMForecaster(data=self.df, window_size=5)
history = forecaster.train(epochs=2, batch_size=2, validation_split=0.2)
self.assertIsNotNone(history)
def test_ppo_trader_training(self):
forecaster = LSTMForecaster(data=self.df, window_size=5)
ppo_trader = PPOTrader(data=self.df, lstm_model=forecaster.model)
obs = np.random.rand(ppo_trader.env.observation_space.shape[0]).astype(np.float32)
action = ppo_trader.predict(obs)
self.assertIsNotNone(action)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,41 @@
"""
tests/test_trading.py
Unit tests for the custom trading environment and risk management logic.
"""
import unittest
import pandas as pd
import numpy as np
from src.trading.env import FuturesTradingEnv
from src.trading.risk_manager import calculate_risk_reward, is_trade_acceptable
class TestTradingEnv(unittest.TestCase):
def setUp(self):
data = {
'time': pd.date_range(start='2021-01-01', periods=20, freq='5T'),
'open': np.linspace(100, 120, 20),
'high': np.linspace(101, 121, 20),
'low': np.linspace(99, 119, 20),
'close': np.linspace(100, 120, 20),
'volume': np.random.randint(1000, 5000, 20),
'ATR': np.full(20, 0.5)
}
self.df = pd.DataFrame(data)
self.env = FuturesTradingEnv(data=self.df, lstm_model=None, config={'max_contracts': 1, 'transaction_cost': 0.001, 'atr_period': 14})
def test_environment_step(self):
obs = self.env.reset()
self.assertIsNotNone(obs)
obs, reward, done, info = self.env.step(1)
self.assertIsInstance(obs, (list, np.ndarray))
def test_risk_reward(self):
rr = calculate_risk_reward(current_price=105, entry_price=100, atr=2)
self.assertGreaterEqual(rr, 0)
acceptable = is_trade_acceptable(current_price=105, entry_price=100, atr=2, minimum_ratio=2.0)
self.assertIsInstance(acceptable, bool)
if __name__ == '__main__':
unittest.main()