From 8d9ed2e7ea5ae8ec90d9977d22f36cdeba5d87a5 Mon Sep 17 00:00:00 2001 From: Jcrispy13 Date: Wed, 23 Oct 2024 23:27:52 -0400 Subject: [PATCH] Create GUSHTradingBotV1.0 First bot --- GUSHTradingBotV1.0 | 199 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 199 insertions(+) create mode 100644 GUSHTradingBotV1.0 diff --git a/GUSHTradingBotV1.0 b/GUSHTradingBotV1.0 new file mode 100644 index 0000000..d5d227f --- /dev/null +++ b/GUSHTradingBotV1.0 @@ -0,0 +1,199 @@ +import numpy as np +import pandas as pd +import yfinance as yf +from scipy.optimize import minimize + + +def ticker_info(): + ticker = "gush" + return ticker.upper() + +def fetch_expiration_dates(ticker): + print(f"Fetching available expiration dates for {ticker}...") + stock = yf.Ticker(ticker) + expiration_dates = stock.options + print(f"Available expiration dates: {expiration_dates}") + return expiration_dates + +def select_expiration_date(expiration_dates): + print("Selecting the first available expiration date...") + expiration_date = expiration_dates[0] + print(f"Selected expiration date: {expiration_date}") + return expiration_date + +def fetch_option_chain(ticker, expiration_date): + print(f"Fetching option chain for {ticker} with expiration date {expiration_date}...") + stock = yf.Ticker(ticker) + options_chain = stock.option_chain(expiration_date) + print("Option chain fetched successfully!") + return options_chain + +def get_price_data(ticker, start_date, end_date): + print(f"Fetching price data for {ticker} from {start_date} to {end_date}...") + data = yf.download(ticker, start=start_date, end=end_date) + print(f"Price data fetched successfully for {ticker}!") + return data +def moving_average_strategy(data, short_window=20, long_window=50): + data['Short_MA'] = data['Close'].rolling(window=short_window).mean() + data['Long_MA'] = data['Close'].rolling(window=long_window).mean() + data['Signal'] = np.where(data['Short_MA'] > data['Long_MA'], 1, -1) + return data['Signal'] + +def rsi_strategy(data, window=14, overbought=70, oversold=30): + delta = data['Close'].diff(1) + gain = np.where(delta > 0, delta, 0) + loss = np.where(delta < 0, abs(delta), 0) + avg_gain = pd.Series(gain).rolling(window=window).mean() + avg_loss = pd.Series(loss).rolling(window=window).mean() + rs = avg_gain / avg_loss + rsi = 100 - (100 / (1 + rs)) + signal = np.where(rsi < oversold, 1, np.where(rsi > overbought, -1, 0)) + return pd.Series(signal, index=data.index) + +def bollinger_bands_strategy(data, window=20, num_std=2): + data['Moving_Avg'] = data['Close'].rolling(window=window).mean() + data['Band_Upper'] = data['Moving_Avg'] + num_std * data['Close'].rolling(window).std() + data['Band_Lower'] = data['Moving_Avg'] - num_std * data['Close'].rolling(window).std() + signal = np.where(data['Close'] < data['Band_Lower'], 1, np.where(data['Close'] > data['Band_Upper'], -1, 0)) + return pd.Series(signal, index=data.index) +def generate_signals(data): + ma_signal = moving_average_strategy(data) + rsi_signal = rsi_strategy(data) + bollinger_signal = bollinger_bands_strategy(data) + return [ma_signal, rsi_signal, bollinger_signal] +def backtest_option_trades(option_chain, signals, stock_data): + """ + Backtest option trades based on the given signals and stock data. + """ + trades = [] + current_position = None + signals = pd.Series(signals) # Convert signals to pandas Series + + # Ensure both stock_data and option_chain indices are sorted in ascending order + stock_data = stock_data.sort_index() + + # Convert 'lastTradeDate' or any date-related columns to datetime in option_chain + if 'lastTradeDate' in option_chain.columns: + option_chain['lastTradeDate'] = pd.to_datetime(option_chain['lastTradeDate']) + option_chain = option_chain.set_index('lastTradeDate') + + # If option_chain index isn't datetime, convert it to datetime (ensuring compatibility) + option_chain.index = pd.to_datetime(option_chain.index) + + # Remove the timezone from option_chain index + option_chain.index = option_chain.index.tz_localize(None) + + # Now reindex the option chain to match the stock data index (forward fill missing option prices) + option_chain = option_chain.sort_index() + option_chain = option_chain.reindex(stock_data.index, method='ffill') + + for i in range(len(signals)): + if signals.iloc[i] == 1 and current_position is None: + # BUY signal + entry_price = option_chain['lastPrice'].iloc[i] + if pd.isna(entry_price): # If price is nan, log the error and continue + print(f"Missing entry price on {stock_data.index[i]}, skipping trade.") + continue + entry_date = stock_data.index[i] + current_position = { + 'entry_price': entry_price, + 'entry_date': entry_date + } + print(f"BUY signal on {entry_date}: Entry Price = {entry_price}") + + elif signals.iloc[i] == -1 and current_position is not None: + # SELL signal + exit_price = option_chain['lastPrice'].iloc[i] + if pd.isna(exit_price): # If price is nan, log the error and continue + print(f"Missing exit price on {stock_data.index[i]}, skipping trade.") + continue + exit_date = stock_data.index[i] + pnl = (exit_price - current_position['entry_price']) * 100 + print(f"SELL signal on {exit_date}: Exit Price = {exit_price}, P&L = {pnl}") + + trades.append({ + 'entry_date': current_position['entry_date'], + 'entry_price': current_position['entry_price'], + 'exit_date': exit_date, + 'exit_price': exit_price, + 'pnl': pnl + }) + current_position = None + + cumulative_pnl = sum(trade['pnl'] for trade in trades) + total_wins = sum(1 for trade in trades if trade['pnl'] > 0) + total_trades = len(trades) + win_rate = total_wins / total_trades if total_trades > 0 else 0 + + return cumulative_pnl, trades, win_rate + total_trades = len(trades) + cumulative_pnl, daily_pnls, win_rate, total_trades = backtest_option_trades(options_chain.calls, weighted_signals, test_data) + + return cumulative_pnl, trades, win_rate, total_trades +def objective_function_profit(weights, strategy_signals, data, option_chain): + weights = np.array(weights) + weights /= np.sum(weights) # Normalize weights + weighted_signals = np.sum([signal * weight for signal, weight in zip(strategy_signals, weights)], axis=0) + + # Since `backtest_option_trades` returns 3 values, we only unpack those + cumulative_pnl, _, _ = backtest_option_trades(option_chain, weighted_signals, data) + + # Return negative cumulative P&L to maximize profit + return -cumulative_pnl + +def optimize_weights(strategy_signals, data, option_chain): + initial_weights = [1/len(strategy_signals)] * len(strategy_signals) + constraints = ({'type': 'eq', 'fun': lambda weights: np.sum(weights) - 1}) + bounds = [(0, 1)] * len(strategy_signals) + + result = minimize(objective_function_profit, initial_weights, args=(strategy_signals, data, option_chain), + method='SLSQP', bounds=bounds, constraints=constraints) + return result.x # Optimal weights +def weighted_signal_combination(strategy_signals, weights): + weighted_signals = np.sum([signal * weight for signal, weight in zip(strategy_signals, weights)], axis=0) + return weighted_signals + +def main_decision(weighted_signals): + last_signal = weighted_signals[-1] # Latest signal + if last_signal > 0: + return "BUY" + elif last_signal < 0: + return "SELL" + else: + return "HOLD" +def run_backtest(): + ticker = ticker_info() + expiration_dates = fetch_expiration_dates(ticker) + expiration_date = select_expiration_date(expiration_dates) + options_chain = fetch_option_chain(ticker, expiration_date) + + # Fetch training data + train_data = get_price_data(ticker, '2010-01-01', '2022-01-01') + + # Generate signals + strategy_signals_train = generate_signals(train_data) + + # Optimize weights + optimal_weights = optimize_weights(strategy_signals_train, train_data, options_chain.calls) + + # Fetch test data + test_data = get_price_data(ticker, '2022-01-02', '2024-01-01') + + # Generate test signals + strategy_signals_test = generate_signals(test_data) + + # Combine signals and backtest + weighted_signals = weighted_signal_combination(strategy_signals_test, optimal_weights) + cumulative_pnl, daily_pnls, win_rate = backtest_option_trades(options_chain.calls, weighted_signals, test_data) + + # Make final decision + decision = main_decision(weighted_signals) + print(f"Final decision: {decision}") + + # Output results + print(f"Cumulative P&L: {cumulative_pnl}") + print(f"Win Rate: {win_rate * 100:.2f}%") + + +# Call the main function +run_backtest()