import json
import os
import re
import time

from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from tqdm import tqdm  # Progress bar

OIL_NEWS_URL = "https://oilprice.com/Latest-Energy-News/World-News/"
SCRAPER_DIR = os.path.dirname(os.path.dirname(__file__))  # One level up
DATA_DIR = os.path.join(SCRAPER_DIR, "data")
KEYWORD_FILE_PATH = os.path.join(SCRAPER_DIR, "assets", "oil_key_words.txt")

if not os.path.exists(DATA_DIR):
    os.makedirs(DATA_DIR)


def load_existing_data(file_path):
    if os.path.exists(file_path):
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    return []


def save_to_json(data, file_path):
    existing_data = load_existing_data(file_path)
    existing_links = {article['link'] for article in existing_data if 'link' in article}

    new_data = []
    for article in data:
        if 'link' not in article or article['link'] in existing_links:
            print(f"Skipping duplicate or missing link article: {article.get('headline', 'Unknown Headline')}")
            continue
        new_data.append(article)

    combined_data = existing_data + new_data
    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(combined_data, f, ensure_ascii=False, indent=4)
    print(f"Data saved to {file_path}")


def load_keyword_importance(file_path):
    keyword_importance = {}
    if os.path.exists(file_path):
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                parts = line.strip().split()
                if len(parts) == 2:
                    keyword, importance = parts
                    keyword_importance[keyword.lower()] = int(importance)
    else:
        print(f"Keyword file not found at {file_path}")
    return keyword_importance


keyword_importance = load_keyword_importance(KEYWORD_FILE_PATH)


def extract_keywords(text, keyword_importance):
    words = re.findall(r'\b\w+\b', text.lower())
    keywords = {word: keyword_importance[word] for word in words if word in keyword_importance}
    return sorted(keywords.items(), key=lambda x: x[1], reverse=True)[:10]
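
# Illustrative sketch (assumption, not shipped with the repo): load_keyword_importance()
# expects assets/oil_key_words.txt to hold one "<keyword> <importance>" pair per line,
# whitespace-separated, for example:
#
#     opec 10
#     crude 8
#     pipeline 5
#
# With such a file loaded, extract_keywords("OPEC weighs crude output cuts", keyword_importance)
# would return [('opec', 10), ('crude', 8)].
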
def filter_content(content):
    """Remove advertisements, irrelevant phrases, headers, and disclaimers from content."""
    patterns = [
        r'ADVERTISEMENT',
        r'Click Here for \d+\+ Global Oil Prices',
        r'Find us on:',
        r'Back to homepage',
        r'Join the discussion',
        r'More Top Reads From Oilprice.com',
        r'©OilPrice\.com.*?educational purposes',
        r'A Media Solutions.*?Oilprice.com',
        r'\"It\'s most important 8 minute read of my week…\"',
        r'^[\w\s]*?is a [\w\s]*? for Oilprice\.com.*?More Info',
        r'^.*?DNOW is a supplier.*?,',
    ]
    for pattern in patterns:
        content = re.sub(pattern, '', content, flags=re.IGNORECASE)
    content = re.sub(r'\s+', ' ', content).strip()
    return content


def extract_author_info(driver, article_soup, headline_pages=1):
    """Extract detailed author information from the 'read more' link if available."""
    author = "Unknown Author"
    author_bio = ""
    contributor_since = ""
    other_articles = []

    author_tag = article_soup.find('a', text=re.compile(r'More Info|Read More', re.IGNORECASE))
    if author_tag:
        retries = 3  # Set retry limit
        for attempt in range(retries):
            try:
                driver.get(author_tag['href'])
                WebDriverWait(driver, 15).until(
                    EC.presence_of_element_located((By.CLASS_NAME, "authorBio"))
                )
                bio_soup = BeautifulSoup(driver.page_source, "html.parser")

                # Extract author's name
                author_name_tag = bio_soup.find('h1')
                author = author_name_tag.get_text(strip=True) if author_name_tag else "Unknown Author"

                # Extract author's bio description
                author_bio_tag = bio_soup.find('p')
                author_bio = author_bio_tag.get_text(strip=True) if author_bio_tag else "No bio available"

                # Extract contributor since date
                contributor_since_tag = bio_soup.find(text=re.compile(r"Contributor since", re.IGNORECASE))
                if contributor_since_tag:
                    contributor_since = contributor_since_tag.parent.get_text(strip=True).replace("Contributor since: ", "")

                # Extract headlines of latest articles by the author, limited by `headline_pages`
                for page in range(1, headline_pages + 1):
                    driver.get(f"{author_tag['href']}Page-{page}.html")
                    WebDriverWait(driver, 10).until(
                        EC.presence_of_element_located((By.CLASS_NAME, "categoryArticle"))
                    )
                    page_soup = BeautifulSoup(driver.page_source, "html.parser")
                    article_tags = page_soup.find_all('h2', class_='categoryArticle__title')
                    for article in article_tags:
                        other_articles.append(article.get_text(strip=True))

                break  # Break retry loop if successful
            except Exception as e:
                print(f"Attempt {attempt + 1} failed for author bio page. Retrying...")
                time.sleep(2)  # Wait before retrying
                if attempt == retries - 1:
                    print(f"Author bio page failed to load or extract after {retries} attempts. Error: {e}")

    return {
        "name": author,
        "bio": author_bio,
        "contributor_since": contributor_since,
        "other_articles": other_articles
    }
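
# Shape of the dictionary returned by extract_author_info(); the field values below are
# illustrative placeholders, not scraped data:
#
#     {
#         "name": "Jane Doe",
#         "bio": "Jane Doe covers energy markets ...",
#         "contributor_since": "01 Jan 2020",
#         "other_articles": ["Headline one", "Headline two"]
#     }
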
def scrape_oil_news():
    print("Scraping oil news articles for sentiment analysis...")
    options = Options()
    options.add_argument("--headless")  # Options.headless was removed in recent Selenium releases
    driver = webdriver.Firefox(options=options)

    news_data = []
    page_number = 1
    max_pages = 1
    total_articles = 0

    # First pass: count the articles on each listing page so the progress bar has a total.
    while page_number <= max_pages:
        driver.get(f"{OIL_NEWS_URL}Page-{page_number}.html")
        try:
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.CLASS_NAME, "categoryArticle"))
            )
        except Exception:
            break
        soup = BeautifulSoup(driver.page_source, "html.parser")
        total_articles += len(soup.find_all('div', class_='categoryArticle'))
        page_number += 1

    # Second pass: revisit the listing pages and scrape each article.
    page_number = 1
    with tqdm(total=total_articles, desc="Scraping articles", unit="article") as pbar:
        while page_number <= max_pages:
            print(f"\nProcessing page {page_number}...")
            driver.get(f"{OIL_NEWS_URL}Page-{page_number}.html")
            soup = BeautifulSoup(driver.page_source, "html.parser")
            articles = soup.find_all('div', class_='categoryArticle')
            if not articles:
                break

            for article in articles:
                headline_tag = article.find('h2', class_='categoryArticle__title')
                headline = headline_tag.get_text(strip=True) if headline_tag else None
                link_tag = article.find('a', href=True)
                link = link_tag['href'] if link_tag else None
                date_meta = article.find('p', class_='categoryArticle__meta')
                date = date_meta.get_text(strip=True).split('|')[0].strip() if date_meta else None

                content = ""
                # Default author details in case the article page or author bio fails to load.
                author_info = {"name": "Unknown Author", "bio": "", "contributor_since": "", "other_articles": []}
                if link:
                    print(f"Fetching article: {link}")
                    driver.get(link)
                    try:
                        WebDriverWait(driver, 10).until(
                            EC.presence_of_element_located((By.CLASS_NAME, "singleArticle"))
                        )
                        article_soup = BeautifulSoup(driver.page_source, "html.parser")
                        raw_content = " ".join([p.get_text(strip=True) for p in article_soup.find_all('p')])
                        content = filter_content(raw_content)
                        author_info = extract_author_info(driver, article_soup, headline_pages=1)
                    except Exception:
                        print(f"Error: Content did not load for article {headline}.")

                extracted_keywords = extract_keywords(f"{headline} {content}", keyword_importance)

                if headline and link and date:
                    news_data.append({
                        'headline': headline,
                        'link': link,
                        'content': content,
                        'date': date,
                        'author': author_info['name'],
                        'author_bio': author_info['bio'],
                        'contributor_since': author_info['contributor_since'],
                        'other_articles': author_info['other_articles'],
                        'keywords': extracted_keywords,
                    })
                    pbar.set_postfix_str(f"Processing article: {headline[:40]}...")
                pbar.update(1)

            page_number += 1
            time.sleep(2)

    driver.quit()
    return news_data


def run_preprocessor():
    file_path = os.path.join(DATA_DIR, 'preprocessed_oil_news.json')
    news_data = scrape_oil_news()
    save_to_json(news_data, file_path)


if __name__ == "__main__":
    run_preprocessor()
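
# Minimal downstream usage sketch (assumption, not part of this module's pipeline): the saved
# records can be reloaded with load_existing_data() and passed to a sentiment model, e.g.
#
#     records = load_existing_data(os.path.join(DATA_DIR, 'preprocessed_oil_news.json'))
#     for record in records:
#         print(record['date'], record['headline'], record['keywords'])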