# Mirror of https://github.com/Priyanshu-hawk/ChatGPT-unofficial-api-selenium.git
# (synced 2026-06-02 06:03:34 +02:00; 440 lines, 19 KiB, Python)
import logging
import os
import time

import undetected_chromedriver as uc
from selenium import webdriver
from selenium.common.exceptions import TimeoutException, WebDriverException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

import helper_fn

# Configure the root logger once, at import time: INFO level and
# timestamped "<time> - <level> - <message>" records (12-hour clock).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%m/%d/%Y %I:%M:%S %p',
)
class ChromeDriverManager:
    """Own one undetected-chromedriver session and recreate it on demand.

    Attributes:
        driver: The current ``uc.Chrome`` instance, or ``None`` when no
            session is open.
        max_retries: How many times ``initialize_driver`` tries to start
            Chrome before giving up.
        retry_delay: Seconds to sleep between failed start attempts.
        options: The ``uc.ChromeOptions`` built by ``setup_chrome_options``.
    """

    # Switches passed to Chrome, in order, after --user-data-dir.
    # The original list also contained a value-less --disable-blink-features
    # and a second --disable-blink-features=AutomationControlled; both were
    # redundant with the single entry kept here.
    # NOTE(review): --ignore-certificate-errors, --disable-web-security and
    # --allow-running-insecure-content weaken browser security. They are kept
    # because the original code sets them; confirm they are really required.
    _CHROME_ARGUMENTS = (
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu',
        '--disable-software-rasterizer',
        '--disable-extensions',
        '--disable-notifications',
        '--disable-popup-blocking',
        '--ignore-certificate-errors',
        '--start-maximized',
        '--disable-blink-features=AutomationControlled',
        '--disable-logging',
        '--log-level=3',
        '--silent',
        '--disable-automation',
        '--disable-infobars',
        '--disable-web-security',
        '--allow-running-insecure-content',
    )

    def __init__(self):
        self.driver = None
        self.max_retries = 3
        self.retry_delay = 5
        self.setup_chrome_options()

    def setup_chrome_options(self):
        """Build a fresh ``uc.ChromeOptions`` and store it on ``self.options``.

        Also creates the ``chrome_user_data`` profile directory (one level
        above this file's directory) if it does not exist yet.
        """
        self.options = uc.ChromeOptions()

        # abspath() keeps the location stable even when __file__ is a
        # relative path (dirname of a bare "script.py" would be "").
        project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        user_data_dir = os.path.join(project_root, 'chrome_user_data')
        os.makedirs(user_data_dir, exist_ok=True)

        self.options.add_argument(f'--user-data-dir={user_data_dir}')
        for argument in self._CHROME_ARGUMENTS:
            self.options.add_argument(argument)

    def initialize_driver(self):
        """Start Chrome, retrying up to ``max_retries`` times.

        On success the new driver is stored on ``self.driver`` and handed to
        ``helper_fn.set_driver``.

        Returns:
            bool: ``True`` once a driver has been started. ``False`` only if
            ``max_retries`` is not positive, so no attempt was made.

        Raises:
            Exception: Whatever the last attempt raised, after all retries
            have been used up.
        """
        for attempt in range(self.max_retries):
            try:
                # Close any previous session and forget the dead handle.
                self.quit()

                # undetected_chromedriver rejects a ChromeOptions object that
                # was already handed to an earlier uc.Chrome(...), so every
                # attempt (including reconnects) gets a fresh one.
                self.setup_chrome_options()

                self.driver = uc.Chrome(options=self.options)
                self.driver.set_page_load_timeout(30)
                helper_fn.set_driver(self.driver)  # Set the driver in helper_fn
                logging.info("Chrome driver initialized successfully")
                return True
            except Exception as e:
                logging.error(f"Attempt {attempt + 1} failed to initialize Chrome driver: {str(e)}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)
                else:
                    logging.error("Failed to initialize Chrome driver after all retries")
                    # Do not leave a half-initialised browser behind.
                    self.quit()
                    raise
        return False

    def ensure_driver_alive(self):
        """Check that the driver responds; reconnect if it does not.

        Returns:
            bool: ``True`` if the driver answered, otherwise the result of
            ``initialize_driver()``.

        Raises:
            Exception: Propagated from ``initialize_driver()`` when the
            reconnect fails.
        """
        try:
            # Reading window_handles is a cheap round trip to the browser;
            # it also fails (AttributeError) when no driver exists yet.
            _ = self.driver.window_handles
            return True
        except Exception:
            logging.warning("Chrome driver appears to be dead, attempting to reconnect...")
            return self.initialize_driver()

    def quit(self):
        """Quit the Chrome driver, ignoring errors, and clear ``self.driver``."""
        if self.driver:
            try:
                self.driver.quit()
            except Exception as e:
                # Best effort: the browser may already be gone.
                logging.debug(f"Ignoring error while quitting Chrome driver: {e}")
            finally:
                self.driver = None
# Module-wide singletons.
# Creating the manager runs setup_chrome_options(), so importing this module
# already builds the Chrome options and creates the profile directory.
driver_manager = ChromeDriverManager()
# Mirror of driver_manager.driver used by the module-level functions; it is
# refreshed by initialize_chrome() and ensure_chrome_alive().
driver = None
def initialize_chrome():
    """Start Chrome through ``driver_manager`` and open the ChatGPT page.

    Updates the module-level ``driver`` on success.

    Returns:
        bool: ``True`` if the driver started and the page load was issued,
        ``False`` if anything failed (the error is logged, not raised).
    """
    global driver
    try:
        if not driver_manager.initialize_driver():
            return False

        driver = driver_manager.driver
        driver.get("https://chat.openai.com/")
        logging.info("Successfully navigated to ChatGPT")
        return True
    except Exception as e:
        logging.error(f"Failed to initialize Chrome: {str(e)}")
        return False
def ensure_chrome_alive():
    """Make sure Chrome is responsive, reconnecting if necessary.

    Refreshes the module-level ``driver`` from ``driver_manager`` when the
    session is (or has been made) alive.

    Returns:
        bool: ``True`` if a live driver is available, ``False`` otherwise.
        A failed reconnect is logged and reported as ``False`` rather than
        raised, so callers can rely on the boolean result.
    """
    global driver
    try:
        alive = driver_manager.ensure_driver_alive()
    except Exception as e:
        # ensure_driver_alive() re-raises once all reconnect retries fail.
        logging.error(f"Failed to reconnect Chrome driver: {str(e)}")
        return False

    if alive:
        driver = driver_manager.driver
        return True
    return False
def is_response_complete():
    """Report whether the latest ChatGPT response has finished rendering.

    Two checks are used, in order:

    1. If no element carries the ``result-streaming`` class, the response is
       considered complete.
    2. Otherwise the text of the last response container is read twice,
       two seconds apart; it is complete if it is non-empty and unchanged.

    Returns:
        bool: ``True`` when complete, ``False`` when still in progress or
        when any error occurs while checking (the error is logged).
    """
    try:
        # Method 1: Check for the presence of typing indicators
        typing_indicators = driver.find_elements(
            By.XPATH, "//div[contains(@class, 'result-streaming')]"
        )
        if not typing_indicators:
            logging.info("No typing indicators found")
            return True

        # Method 2: Check if the response text has stopped changing
        # NOTE(review): this class string differs from the container XPath
        # used in make_gpt_request_and_copy (which includes
        # 'dark:prose-invert'); confirm which one matches the live page.
        response_container_xpath = "//div[contains(@class, 'markdown prose w-full break-words')]"

        def latest_text():
            """Return the text of the last response container, or None."""
            containers = driver.find_elements(By.XPATH, response_container_xpath)
            return containers[-1].text if containers else None

        current_response = latest_text()
        if current_response is None:
            # Nothing rendered yet; previously this surfaced as an IndexError.
            logging.info("Response still in progress...")
            return False

        time.sleep(2)  # Brief wait
        new_response = latest_text()
        if current_response != "" and current_response == new_response:
            logging.info("Response text has stabilized")
            return True

        logging.info("Response still in progress...")
        return False

    except Exception as e:
        logging.warning(f"Error checking response completion: {str(e)}")
        return False
# XPath of the rendered assistant message containers.
_RESPONSE_CONTAINER_XPATH = (
    "//div[contains(@class, "
    "'markdown prose dark:prose-invert w-full break-words dark')]"
)

# JavaScript run in the page to turn a response element into Markdown-like
# text. arguments[0] is the response element.
# NOTE(review): known limitations of this script, left unchanged here:
#   * text nodes are trim()-ed, so spaces next to inline code/bold/italic
#     can be lost;
#   * the final space-collapsing replace also runs over fenced code blocks;
#   * getElementsByTagName('li') also returns nested list items, so nested
#     lists are flattened and renumbered.
_EXTRACT_FORMATTED_TEXT_JS = """
    var element = arguments[0];
    var text = '';
    var lastWasNewline = false;
    var lastWasSpace = false;

    function extractText(node) {
        if (node.nodeType === Node.TEXT_NODE) {
            var content = node.textContent.trim();
            if (content) {
                if (lastWasNewline) {
                    text += '\\n';
                }
                text += content;
                lastWasNewline = false;
                lastWasSpace = false;
            }
        } else if (node.nodeName === 'PRE') {
            // Handle code blocks
            if (lastWasNewline) {
                text += '\\n';
            }
            text += '```\\n' + node.textContent + '\\n```\\n';
            lastWasNewline = true;
            lastWasSpace = false;
        } else if (node.nodeName === 'CODE') {
            // Handle inline code
            text += '`' + node.textContent + '`';
            lastWasNewline = false;
            lastWasSpace = false;
        } else if (node.nodeName === 'STRONG' || node.nodeName === 'B') {
            // Handle bold text
            if (!lastWasSpace) {
                text += ' ';
            }
            text += '**' + node.textContent + '**';
            lastWasNewline = false;
            lastWasSpace = true;
        } else if (node.nodeName === 'EM' || node.nodeName === 'I') {
            // Handle italic text
            if (!lastWasSpace) {
                text += ' ';
            }
            text += '*' + node.textContent + '*';
            lastWasNewline = false;
            lastWasSpace = true;
        } else if (node.nodeName === 'BLOCKQUOTE') {
            // Handle blockquotes
            if (lastWasNewline) {
                text += '\\n';
            }
            text += '> ' + node.textContent.trim().replace(/\\n/g, '\\n> ') + '\\n';
            lastWasNewline = true;
            lastWasSpace = false;
        } else if (node.nodeName === 'HR') {
            // Handle horizontal rules
            if (lastWasNewline) {
                text += '\\n';
            }
            text += '\\n---\\n\\n';
            lastWasNewline = true;
            lastWasSpace = false;
        } else if (node.nodeName === 'UL' || node.nodeName === 'OL') {
            // Handle lists
            if (lastWasNewline) {
                text += '\\n';
            }
            var items = node.getElementsByTagName('li');
            for (var i = 0; i < items.length; i++) {
                text += (node.nodeName === 'OL' ? (i + 1) + '. ' : '- ') + items[i].textContent.trim() + '\\n';
            }
            lastWasNewline = true;
            lastWasSpace = false;
        } else if (node.nodeName === 'P') {
            // Handle paragraphs
            if (lastWasNewline) {
                text += '\\n';
            }
            for (var child of node.childNodes) {
                extractText(child);
            }
            text += '\\n';
            lastWasNewline = true;
            lastWasSpace = false;
        } else if (node.nodeName === 'BR') {
            // Handle line breaks
            text += '\\n';
            lastWasNewline = true;
            lastWasSpace = false;
        } else {
            // Process child nodes
            for (var child of node.childNodes) {
                extractText(child);
            }
        }
    }

    extractText(element);

    // Clean up excessive newlines
    text = text.replace(/\\n{3,}/g, '\\n\\n');

    // Ensure proper spacing around horizontal rules
    text = text.replace(/\\n---\\n/g, '\\n\\n---\\n\\n');

    // Clean up any double spaces
    text = text.replace(/ +/g, ' ');

    return text.trim();
"""


def _enter_prompt(text):
    """Step 1: dismiss the 'Stay logged out' prompt if shown, then type *text*."""
    try:
        # 1. Check for "Stay logged out" button first
        logging.info("Step 1a: Checking for 'Stay logged out' button...")
        try:
            stay_logged_out_xpath = "//a[contains(text(), 'Stay logged out')]"
            if helper_fn.is_element_present(stay_logged_out_xpath, timeout=1):
                logging.info("'Stay logged out' button found, clicking it...")
                helper_fn.click_element(stay_logged_out_xpath)
                time.sleep(1)
                logging.info("Clicked 'Stay logged out'")
        except Exception as logout_error:
            # The prompt is optional; failing to handle it is not fatal.
            logging.warning(f"Stay logged out check encountered an error: {str(logout_error)}")

        # 1b. Text input logic
        logging.info("Step 1b: Finding text input area...")
        text_area_xpath = "//*[@id='prompt-textarea']"
        if not helper_fn.send_keys(text_area_xpath, text):
            raise Exception("Failed to input text")
        logging.info("Text input successful")
    except Exception as e:
        logging.error(f"Error in Step 1 (Text Input): {str(e)}")
        raise


def _click_send_button():
    """Step 2: click the send button, raising if the click is reported as failed."""
    try:
        logging.info("Step 2: Finding and clicking send button...")
        send_btn_xpath = "//*[@data-testid='send-button']"
        if not helper_fn.click_element(send_btn_xpath):
            raise Exception("Failed to click send button")
        logging.info("Send button clicked")
    except Exception as e:
        logging.error(f"Error in Step 2 (Send Button): {str(e)}")
        raise


def _leave_canvas_mode():
    """Step 3: click 'Answer in chat instead' when the canvas prompt appears."""
    logging.info("Step 3: Waiting for response container...")
    try:
        canvas_button_xpath = "//button[contains(., 'Answer in chat instead')]"
        if helper_fn.is_element_present(canvas_button_xpath, timeout=5):
            logging.info("Canvas mode detected, switching to chat mode...")
            helper_fn.click_element(canvas_button_xpath)
            time.sleep(2)
            logging.info("Switched to chat mode")
        else:
            logging.info("Already in chat mode")
    except Exception as canvas_error:
        # Best effort only; continue with whatever mode the page is in.
        logging.warning(f"Canvas check encountered an error: {str(canvas_error)}")


def _poll_until_response_stable(response_container_xpath, timeout=180):
    """Poll the last response until its length is stable, or time out.

    The response counts as finished after three consecutive polls in which
    the text length is unchanged and ``is_response_complete()`` agrees,
    followed by one final two-second re-check.

    Raises:
        TimeoutException: If no stable response is confirmed in *timeout* s.
        Exception: If the Chrome driver cannot be kept alive.
    """
    start_time = time.time()
    last_length = 0
    stable_count = 0

    while time.time() - start_time < timeout:
        if not ensure_chrome_alive():
            raise Exception("Chrome driver died during response wait")

        try:
            # Get current response
            responses = helper_fn.find_elements(response_container_xpath)
            if responses:
                current_length = len(responses[-1].text)
                logging.debug(f"Current response length: {current_length}, Last length: {last_length}")

                # If text length hasn't changed and response seems complete
                if current_length > 0 and current_length == last_length:
                    if is_response_complete():
                        stable_count += 1
                        logging.info(f"Response stable for {stable_count} checks")
                        if stable_count >= 3:  # Require 3 consecutive stable checks
                            time.sleep(2)  # Final verification wait
                            if current_length == len(responses[-1].text):
                                logging.info("Response completion confirmed")
                                # Return directly: a confirmation that lands
                                # just past the deadline must not be turned
                                # into a timeout by a later elapsed check.
                                return
                    else:
                        stable_count = 0  # Reset stable count if response is not complete
                else:
                    stable_count = 0  # Reset stable count if length changed

                last_length = current_length
        except Exception as e:
            logging.warning(f"Error while checking response: {str(e)}")
            stable_count = 0  # Reset stable count on error

        time.sleep(2)  # Increased wait time to reduce CPU usage

    logging.warning("Response timeout reached")
    raise TimeoutException("Response generation timed out")


def _wait_for_response(response_container_xpath):
    """Step 4: wait for the response container to appear and to finish."""
    logging.info("Step 4: Waiting for response...")
    try:
        # First, wait for response container to appear
        if not helper_fn.wait_for_element(response_container_xpath, timeout=30):
            raise Exception("Response container not found")
        logging.info("Initial response detected")

        # Wait for response completion with timeout (3 minutes)
        _poll_until_response_stable(response_container_xpath, timeout=180)
    except TimeoutException as te:
        logging.error(f"Timeout while waiting for response: {str(te)}")
        raise
    except Exception as e:
        logging.error(f"Error in Step 4 (Response Detection): {str(e)}")
        raise


def _extract_formatted_text(response_container_xpath):
    """Step 5: return the last response as text via in-page JavaScript."""
    logging.info("Step 5: Getting response text...")
    try:
        response_elements = helper_fn.find_elements(response_container_xpath)
        if not response_elements:
            raise Exception("No response elements found")

        # Use JavaScript to get formatted text
        formatted_text = driver.execute_script(
            _EXTRACT_FORMATTED_TEXT_JS, response_elements[-1]
        )
        logging.info("Successfully extracted response text")
        return formatted_text
    except Exception as e:
        logging.error(f"Error extracting response text: {str(e)}")
        raise


def make_gpt_request_and_copy(text):
    """Send *text* to the ChatGPT web UI and return the formatted reply.

    The reply is read from the page with JavaScript (no clipboard is
    involved, despite the function's name).

    Args:
        text: Query text to send to ChatGPT.

    Returns:
        str: The formatted response text, or ``"Error occurred: <message>"``
        if any step fails. Failures are logged and never raised.
    """
    try:
        if not ensure_chrome_alive():
            raise Exception("Failed to ensure Chrome is alive")

        logging.info("Starting GPT request process...")
        # Initial pause to ensure page is loaded
        time.sleep(1.5)

        _enter_prompt(text)
        _click_send_button()

        try:
            _leave_canvas_mode()
            _wait_for_response(_RESPONSE_CONTAINER_XPATH)
            # Short wait for UI to stabilize
            time.sleep(2)
        except Exception as e:
            logging.error(f"Error in Step 3 (Response Container): {str(e)}")
            raise

        return _extract_formatted_text(_RESPONSE_CONTAINER_XPATH)

    except Exception as e:
        logging.error(f"Process failed: {str(e)}")
        # The liveness check can itself fail; it must not mask the original
        # error or break the "always return a string" contract.
        try:
            chrome_alive = ensure_chrome_alive()
        except Exception as revive_error:
            logging.error(f"Chrome liveness check failed: {str(revive_error)}")
            chrome_alive = False
        if not chrome_alive:
            logging.error("Chrome driver died during process")
        return f"Error occurred: {str(e)}"
def cleanup():
    """Quit the browser and drop the module-level driver reference."""
    global driver
    try:
        driver_manager.quit()
    finally:
        # The module-level handle would otherwise keep pointing at the
        # session that was just closed.
        driver = None
if __name__ == "__main__":
    # Manual smoke test: start Chrome, send one prompt, print the reply.
    # Start-up sits inside the try so cleanup() runs even when it fails.
    try:
        if not initialize_chrome():
            # initialize_chrome() has already logged the reason.
            raise SystemExit(1)
        response = make_gpt_request_and_copy("Hello, how are you?")
        print(response)
    finally:
        cleanup()