BrowserLAM — LLM-Powered Browser Agent
An automation layer that drives Chrome from natural-language instructions, backed by an LLM fine-tuned for web browsing.
How BrowserLAM Works

BrowserLAM runs in a loop: it sets up Chrome, navigates to a URL, captures the page state, sends that state to an LLM, processes the response (a tool call or a message), and executes the resulting action, repeating until the user quits.
Finetuning Details
BrowserLAM is fine-tuned with supervised learning (SL): a human simulates the LLM, making decisions based on the same state the model would see. The human does not see the Chrome window. This prevents hallucination, because looking at the live window can reveal more than the model is given. For example, a native popup is drawn as an overlay and therefore does not show up in the screenshots.
The following code gathers the training data for fine-tuning. It guides the user through opening Chrome with remote debugging, collects screenshots, interactable elements, and user/assistant actions, and saves them to a JSONL file for supervised learning:
from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.common.exceptions import TimeoutException from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.common.keys import Keys import os import time import re import json from llm import available_functions, system_prompt import random import io import base64 from PIL import Image, ImageDraw, ImageFont def wait_for_ajax(driver, timeout=5, max_inflight=2, idle_delay=500): """ Wait for network idle by monitoring active network requests. This function injects JavaScript to intercept XMLHttpRequest and fetch calls, tracking the number of active network requests. It then waits until the number of active requests is less than or equal to max_inflight and no new requests have been initiated for at least idle_delay milliseconds. Args: driver: Selenium WebDriver instance timeout: Maximum wait time in seconds (default: 15) max_inflight: Maximum allowed number of active connections (default: 2) idle_delay: Required idle time in milliseconds (default: 500) """ simulate_early_human_interaction(driver) time.sleep(0.3) start_time = time.time() driver.execute_script(""" if (!window.__networkIdleSetup) { window.__pendingRequests = 0; window.__lastRequestTime = Date.now(); window.__networkIdleSetup = true; (function(open) { XMLHttpRequest.prototype.open = function() { this.addEventListener("readystatechange", function(){ if (this.readyState === 1) { window.__pendingRequests++; window.__lastRequestTime = Date.now(); } if (this.readyState === 4) { window.__pendingRequests = Math.max(0, window.__pendingRequests - 1); if (window.__pendingRequests > 0) { window.__lastRequestTime = Date.now(); } } }, false); open.apply(this, arguments); }; })(XMLHttpRequest.prototype.open); if (window.fetch) { const originalFetch = window.fetch; window.fetch = function() { 
window.__pendingRequests++; window.__lastRequestTime = Date.now(); return originalFetch.apply(this, arguments).then(function(response) { window.__pendingRequests = Math.max(0, window.__pendingRequests - 1); if (window.__pendingRequests > 0) { window.__lastRequestTime = Date.now(); } return response; }).catch(function(error) { window.__pendingRequests = Math.max(0, window.__pendingRequests - 1); if (window.__pendingRequests > 0) { window.__lastRequestTime = Date.now(); } throw error; }); }; } } """) while True: pending = driver.execute_script("return window.__pendingRequests") if pending is None: pending = 0 now = int(time.time() * 1000) lt = driver.execute_script("return window.__lastRequestTime") last_time = int(lt) if lt is not None else now idle_time = now - last_time if pending <= max_inflight and idle_time >= idle_delay: break if time.time() - start_time > timeout: break time.sleep(0.1) quick_simulate_interaction(driver) def simulate_early_human_interaction(driver): """ Simulates human interaction to bypass anti-bot measures without actually clicking any elements. Uses a virtual "ghost click" approach that triggers JavaScript events but is guaranteed not to interact with page elements. 
""" try: driver.execute_script(""" // APPROACH 1: Document-level events (safest, won't click any actual elements) // Focus the window first (helps with activation) window.focus(); // Dispatch events directly to document object, not to any element document.dispatchEvent(new MouseEvent('mousemove', { bubbles: true, cancelable: true, view: window, clientX: 1, clientY: 1 })); document.dispatchEvent(new MouseEvent('click', { bubbles: true, cancelable: true, view: window, clientX: -10, // Negative coordinates ensure we're not clicking any elements clientY: -10 // Completely outside viewport })); // APPROACH 2: Create a temporary invisible element to interact with const ghostElement = document.createElement('div'); ghostElement.style.position = 'fixed'; ghostElement.style.top = '-100px'; // Off-screen ghostElement.style.left = '-100px'; // Off-screen ghostElement.style.width = '10px'; ghostElement.style.height = '10px'; ghostElement.style.pointerEvents = 'none'; // Can't be clicked by real mouse // Append temporarily document.body.appendChild(ghostElement); // Focus and click the ghost element ghostElement.focus(); ghostElement.dispatchEvent(new MouseEvent('click', { bubbles: true, cancelable: true, view: window })); // Clean up - remove the element document.body.removeChild(ghostElement); // APPROACH 3: Trigger user-like activity events that aren't clicks window.dispatchEvent(new Event('resize')); window.dispatchEvent(new Event('scroll')); return true; """) return True except Exception as e: return False def quick_simulate_interaction(driver): """ Quickly simulate minimal user interaction (focus and mouse movement) to trigger site behaviors that require human-like activity. Total execution time should be less than 100ms. 
Args: driver: Selenium WebDriver instance """ try: driver.execute_script("window.focus();") driver.execute_script(""" // Dispatch mousemove event at center of screen const centerX = window.innerWidth / 2; const centerY = window.innerHeight / 2; // Create and dispatch event const moveEvent = new MouseEvent('mousemove', { view: window, bubbles: true, cancelable: true, clientX: centerX, clientY: centerY }); document.dispatchEvent(moveEvent); // Create and dispatch a second move event with small offset const moveEvent2 = new MouseEvent('mousemove', { view: window, bubbles: true, cancelable: true, clientX: centerX + 10, clientY: centerY + 5 }); document.dispatchEvent(moveEvent2); """) return True except Exception as e: return False def setup_chrome_with_remote_debugging(): """ Set up Chrome with remote debugging to use the user's profile. Returns: WebDriver instance connected to the Chrome browser """ debugging_port = 9222 chrome_options = Options() chrome_options.add_experimental_option("debuggerAddress", f"127.0.0.1:{debugging_port}") driver = webdriver.Chrome(options=chrome_options) prevent_new_tabs(driver) return driver def prevent_new_tabs(driver): """ Inject JavaScript to prevent opening new tabs/windows, redirecting all such attempts to the current tab. 
Args: driver: Selenium WebDriver instance """ try: driver.execute_script(""" // Store the original window.open function const originalWindowOpen = window.open; // Override window.open to redirect to the same tab window.open = function(url, name, features) { if (url) { // Instead of opening a new tab, navigate the current window window.location.href = url; return window; } // If no URL provided, fallback to original but return the current window return originalWindowOpen(url, name, features) || window; }; // Add event listener for links with target="_blank" document.addEventListener('click', function(e) { // Find closest anchor element let target = e.target; while (target && target.tagName !== 'A') { target = target.parentElement; } // If this is a link that would open in a new tab/window if (target && (target.getAttribute('target') === '_blank' || target.getAttribute('rel') === 'noopener' || target.getAttribute('rel') === 'noreferrer')) { // Prevent the default action e.preventDefault(); // Get the href and navigate current window instead const href = target.getAttribute('href'); if (href && !href.startsWith('javascript:')) { window.location.href = href; } } }, true); // Override any window.open calls from within iframes try { const frames = document.querySelectorAll('iframe'); for (let i = 0; i < frames.length; i++) { if (frames[i].contentWindow) { frames[i].contentWindow.open = window.open; } } } catch (e) { // Ignore cross-origin errors } """) return True except Exception as e: print(f"Warning: Failed to set up new tab prevention: {e}") return False def get_interactable_elements_fast(driver, max_elements=100): """ Get interactable elements with improved detection for search results and other dynamic content. 
Args: driver: Selenium WebDriver instance max_elements: Maximum number of elements to return Returns: List of tuples containing (info_dict, index) for display """ try: result = driver.execute_script(""" // Find all potentially interactable elements with a more comprehensive selector const selector = "a, button, input, textarea, select, [role='button'], [role='link'], [role='tab'], " + "[role='menuitem'], [role='checkbox'], [role='radio'], [role='combobox'], " + "[onclick], [tabindex], .g .yuRUbf > a, .g .LC20lb, .tF2Cxc, .yuRUbf, " + ".rc, .r, .jtfYYd, .DKV0Md, .vvjwJb, .dbsr"; // Added Google-specific selectors const allElements = Array.from(document.querySelectorAll(selector)); // Improved visibility detection function isVisibleElement(el) { // Skip elements that are definitely not visible if (el.offsetWidth === 0 && el.offsetHeight === 0 && el.tagName !== 'LINK') return false; // Get computed style const style = window.getComputedStyle(el); if (style.display === 'none' || style.visibility === 'hidden') return false; // Check if element is within viewport const rect = el.getBoundingClientRect(); if (rect.width === 0 || rect.height === 0) return false; // Check if element is within the viewport or just outside it const viewHeight = Math.max(document.documentElement.clientHeight, window.innerHeight); const viewWidth = Math.max(document.documentElement.clientWidth, window.innerWidth); // Allow elements that are slightly outside viewport but might be scrolled to const vertInView = (rect.top > -500 && rect.top < viewHeight + 500); const horizInView = (rect.left > -500 && rect.left < viewWidth + 500); return vertInView && horizInView; } // Filter and process elements const results = []; for (const el of allElements) { // Skip duplicates (elements we've already processed) if (results.some(r => r.element === el)) continue; // Check if element is visible and enabled if (isVisibleElement(el) && !el.disabled) { // Get element info const info = { tagName: 
el.tagName.toLowerCase(), attributes: {}, xpath: getXPath(el) }; // Get important attributes ['id', 'name', 'class', 'type', 'value', 'placeholder', 'href', 'role', 'aria-label'].forEach(attr => { if (el.hasAttribute(attr)) { info.attributes[attr] = el.getAttribute(attr); } }); // Get text content const text = el.textContent.trim(); if (text) { info.attributes.text = text; } results.push({element: el, info: info}); // Limit results if (results.length >= arguments[0]) break; } } return results; // Function to get XPath for an element function getXPath(element) { if (element.id !== '') return `//*[@id="${element.id}"]`; if (element === document.body) return '/html/body'; let ix = 0; const siblings = element.parentNode.childNodes; for (let i = 0; i < siblings.length; i++) { const sibling = siblings[i]; if (sibling === element) { const path = getXPath(element.parentNode); const tag = element.tagName.toLowerCase(); return `${path}/${tag}[${ix+1}]`; } if (sibling.nodeType === 1 && sibling.tagName.toLowerCase() === element.tagName.toLowerCase()) { ix++; } } } """, max_elements) elements_with_info = [] for i, item in enumerate(result): elements_with_info.append((item['info'], i)) return elements_with_info except Exception as e: print(f"Error retrieving elements: {e}") return [] def format_element_info(element_info, index=None): """Format element info for display""" if index is None: index = element_info[1] # Use stored index if not provided info = element_info[0] # Get the info dict tag_name = info['tagName'] attrs = [] for k, v in info['attributes'].items(): if v and len(v) > 20: v = v[:20] + "..." attrs.append(f"{k}='{v}'") attr_str = " ".join(attrs) return f"{index}. 
<{tag_name} {attr_str}>" def get_active_form_element_info(driver): """Get information about the currently active form element if any""" try: active_element = driver.execute_script("return document.activeElement;") form_tags = ['input', 'textarea', 'select'] if active_element.tag_name.lower() in form_tags: attrs = {} for attr in ['id', 'name', 'class', 'type', 'value', 'placeholder', 'maxlength', 'required']: try: value = active_element.get_attribute(attr) if value: attrs[attr] = value except: pass try: form = driver.execute_script("return arguments[0].form;", active_element) if form: attrs['form_id'] = form.get_attribute('id') attrs['form_name'] = form.get_attribute('name') attrs['form_action'] = form.get_attribute('action') except: pass attr_str = ' '.join([f"{k}='{v}'" for k, v in attrs.items()]) return f"ACTIVE FORM ELEMENT: <{active_element.tag_name} {attr_str}>" return None except: return None def find_clickable_child(driver, element_info): """Find the most clickable child element of a container""" try: xpath = element_info.get('xpath', '') if not xpath: return None, None try: container = driver.find_element(By.XPATH, xpath) except: return None, None try: links = container.find_elements(By.TAG_NAME, 'a') if links: link_info = { 'tagName': 'a', 'attributes': {} } for attr in ['href', 'id', 'class', 'text']: value = links[0].get_attribute(attr) if value: link_info['attributes'][attr] = value return links[0], link_info buttons = container.find_elements(By.TAG_NAME, 'button') if buttons: return buttons[0], None clickables = container.find_elements(By.CSS_SELECTOR, '[onclick], [role="button"]') if clickables: return clickables[0], None except: pass return None, None except: return None, None def click_element_by_index(driver, index, elements_info): """Click element by index using a single reliable method""" if 0 <= index < len(elements_info): try: info = elements_info[index][0] print(f"Clicking element: {format_element_info(elements_info[index])}") if 
info['tagName'] in ['div', 'span', 'section', 'article']: child_element, child_info = find_clickable_child(driver, info) if child_element: print(f" Found clickable child element: {child_info['tagName'] if child_info else 'unknown'}") element = child_element else: xpath = info.get('xpath', '') element = None try: if xpath: element = driver.find_element(By.XPATH, xpath) except Exception as e: print(f" Warning: Could not find element by XPath: {e}") return False else: xpath = info.get('xpath', '') element = None try: if xpath: element = driver.find_element(By.XPATH, xpath) except Exception as e: print(f" Warning: Could not find element by XPath: {e}") return False if element: try: driver.execute_script("arguments[0].scrollIntoView({behavior: 'smooth', block: 'center'});", element) time.sleep(0.3) # Short wait for scroll try: actions = ActionChains(driver) actions.move_to_element(element).pause(0.2) actions.move_by_offset(random.randint(-3, 3), random.randint(-3, 3)).pause(0.1) actions.click_and_hold().pause(0.1).release().perform() print(" Click method: Enhanced ActionChains click sequence") except Exception as e: print(f" ActionChains click failed: {e}") try: ActionChains(driver).move_to_element(element).click().perform() print(" Click method: Simple ActionChains click") except Exception as e: print(f" Simple ActionChains click failed: {e}") driver.execute_script("arguments[0].click();", element) print(" Also using JavaScript click for reliability") time.sleep(0.5) try: wait_for_ajax(driver) print("Loading complete") except Exception as e: print(f"Note: Error during page load wait: {e}") print("Continuing anyway...") return True except Exception as e: print(f" Error clicking element: {e}") return False else: print(" No element found to click") return False except Exception as e: print(f"Error clicking element: {e}") return False else: print(f"Invalid element number. 
Please enter a number between 0 and {len(elements_info)-1}") return False def build_selector_from_info(info): """Build a CSS selector from element info""" selectors = [] if 'id' in info['attributes']: selectors.append(f"#{info['attributes']['id']}") if 'class' in info['attributes']: class_selector = "." + info['attributes']['class'].replace(' ', '.') if len(class_selector) < 100: # Avoid overly complex selectors selectors.append(f"{info['tagName']}{class_selector}") if 'name' in info['attributes']: selectors.append(f"{info['tagName']}[name='{info['attributes']['name']}']") if 'role' in info['attributes'] and 'text' in info['attributes']: text = info['attributes']['text'] if len(text) < 20: # Only use short text selectors.append(f"{info['tagName']}[role='{info['attributes']['role']}']:contains('{text}')") if selectors: return selectors[0] # Use the first/best selector return info['tagName'] def get_cleaned_html(driver): """ Get a cleaned version of the HTML that focuses on visible content and structure, removing clutter like excessive attributes, empty elements, and script/style tags. 
Args: driver: Selenium WebDriver instance Returns: Cleaned HTML as a string """ try: html = driver.execute_script(""" // Create a function to clean the HTML function cleanHTML(node, depth = 0) { // Skip invisible elements if (node.nodeType === 1) { // Element node const style = window.getComputedStyle(node); if (style.display === 'none' || style.visibility === 'hidden' || (style.height === '0px' && style.overflow === 'hidden')) { return ''; } } // Text node - return text content if (node.nodeType === 3) { // Text node const text = node.textContent.trim(); if (text) { return ' '.repeat(depth) + text + '\\n'; } return ''; } // Skip script, style, link, meta, and other non-visible elements if (node.nodeType !== 1 || ['SCRIPT', 'STYLE', 'LINK', 'META', 'NOSCRIPT', 'TEMPLATE'].includes(node.tagName)) { return ''; } // Start building the element representation let result = ''; // Only include elements that might have visible content const tagName = node.tagName.toLowerCase(); // Get important attributes const attrs = []; if (node.id) attrs.push(`id="${node.id}"`); if (node.className && typeof node.className === 'string') { // Simplify class names (take only first 2 classes if there are many) const classes = node.className.split(' ').filter(c => c.trim()); if (classes.length > 2) { attrs.push(`class="${classes.slice(0, 2).join(' ')}..."`); } else if (classes.length > 0) { attrs.push(`class="${classes.join(' ')}"`); } } // Add href for links if (tagName === 'a' && node.href) { const href = node.href.replace(window.location.origin, ''); if (href.length > 30) { attrs.push(`href="${href.substring(0, 30)}..."`); } else { attrs.push(`href="${href}"`); } } // Add type for inputs if (tagName === 'input' && node.type) { attrs.push(`type="${node.type}"`); } // Add placeholder for inputs if ((tagName === 'input' || tagName === 'textarea') && node.placeholder) { attrs.push(`placeholder="${node.placeholder}"`); } // Add role if present if (node.getAttribute('role')) { 
attrs.push(`role="${node.getAttribute('role')}"`); } // Start the element const indent = ' '.repeat(depth); result += indent + '<' + tagName; if (attrs.length > 0) { result += ' ' + attrs.join(' '); } // Check if element has children const childNodes = Array.from(node.childNodes).filter(child => { // Filter out empty text nodes return !(child.nodeType === 3 && child.textContent.trim() === ''); }); if (childNodes.length === 0) { // Self-closing tag result += ' />\\n'; } else { result += '>\\n'; // Process children for (const child of childNodes) { result += cleanHTML(child, depth + 1); } // Close tag result += indent + '' + tagName + '>\\n'; } return result; } // Start with the body element return cleanHTML(document.body, 0); """) return html except Exception as e: return f"Error retrieving HTML: {e}" def save_training_data(messages): """Save the conversation history to training_data.jsonl""" TRAINING_DATA_FILE = "training_data.jsonl" example = { "messages": messages } if available_functions: example["functions"] = available_functions with open(TRAINING_DATA_FILE, "a") as f: f.write(json.dumps(example) + "\n") print(f"Added training example with {len(messages)} messages and {len(available_functions) if available_functions else 0} functions") def scroll_page(driver, direction="down", amount=500): """ Scroll the page or a scrollable element within the viewport Args: driver: Selenium WebDriver instance direction: Either "up" or "down" amount: Number of pixels to scroll Returns: True if the scroll was successful, False otherwise """ try: scroll_amount = amount if direction == "down" else -amount scroll_script = """ function findScrollableElement() { // First, check if any element has focus and is scrollable const activeElement = document.activeElement; if (activeElement && activeElement !== document.body && activeElement !== document.documentElement) { const style = window.getComputedStyle(activeElement); const overflowY = style.getPropertyValue('overflow-y'); if 
(['scroll', 'auto'].includes(overflowY) && activeElement.scrollHeight > activeElement.clientHeight) { return activeElement; } } // Next, look for modals or common scrollable containers currently visible const modalSelectors = [ '.modal.show', '.modal-body', '.modal-content', '[role="dialog"]', '[aria-modal="true"]', '.popup', '.overlay', '.drawer', '.scroll-container', '.overflow-y-auto', '.overflow-y-scroll', '.v-dialog', '.MuiModal-root', '.ReactModal__Content' ]; for (const selector of modalSelectors) { const elements = Array.from(document.querySelectorAll(selector)); for (const el of elements) { if (el.offsetParent !== null) { // Check if visible const style = window.getComputedStyle(el); const overflowY = style.getPropertyValue('overflow-y'); if (['scroll', 'auto'].includes(overflowY) && el.scrollHeight > el.clientHeight) { return el; } } } } // Check for scrollable elements under the cursor (middle of viewport) const viewportMiddleX = window.innerWidth / 2; const viewportMiddleY = window.innerHeight / 2; let element = document.elementFromPoint(viewportMiddleX, viewportMiddleY); while (element && element !== document.body && element !== document.documentElement) { const style = window.getComputedStyle(element); const overflowY = style.getPropertyValue('overflow-y'); if (['scroll', 'auto'].includes(overflowY) && element.scrollHeight > element.clientHeight) { return element; } element = element.parentElement; } // Finally, default to scrolling the whole page return window; } const scrollable = findScrollableElement(); const amount = arguments[0]; if (scrollable === window) { window.scrollBy(0, amount); } else { scrollable.scrollTop += amount; } return { element: scrollable === window ? 'window' : scrollable.tagName, scrollAmount: amount, totalHeight: scrollable === window ? document.documentElement.scrollHeight : scrollable.scrollHeight, visibleHeight: scrollable === window ? window.innerHeight : scrollable.clientHeight, scrollPosition: scrollable === window ? 
window.scrollY : scrollable.scrollTop }; """ result = driver.execute_script(scroll_script, scroll_amount) print(f"Scrolled {direction} by {abs(scroll_amount)} pixels") print(f"Scrollable element: {result['element']}") print(f"Current position: {result['scrollPosition']} / {result['totalHeight'] - result['visibleHeight']}") time.sleep(0.3) return True except Exception as e: print(f"Error scrolling {direction}: {e}") return False def get_human_assistant_response(): """Get assistant response from human input, formatted exactly like the API response""" print("\nNow simulate the assistant's response.") print("Options:") print("1. Text message (just type your message)") print("2. Function call: click(e.g., 'click 5')") print("3. Function call: type (e.g., 'type hello world')") print("4. Function call: press enter") print("5. Function call: navigate (e.g., 'navigate https://google.com')") print("6. Function call: scroll up [amount] (e.g., 'scroll up 300')") print("7. Function call: scroll down [amount] (e.g., 'scroll down')") print("8. Function call: command_f_click (e.g., 'command_f_click Hello World')") print("9. 
Function call: reload_page") print("Type 'quit' to end the session and save the training data") assistant_input = input("\nAssistant: ").strip() if assistant_input.lower() == "quit": return {"quit": True} if re.match(r"^click\s+\d+", assistant_input, re.IGNORECASE): button_index = int(re.search(r"\d+", assistant_input).group()) tool_id = f"call_{int(time.time())}" assistant_entry = { "role": "assistant", "content": [], "tool_calls": [ { "type": "function", "id": tool_id, "function": { "name": "click_button", "arguments": json.dumps({"button_number": button_index}) } } ] } tool_response = { "role": "tool", "content": [ { "type": "text", "text": json.dumps({ "status": "success", "message": f"Button {button_index} was successfully clicked.", "data": { "buttonIndex": button_index, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) } }, indent=2) } ], "tool_call_id": tool_id } return { "entry": assistant_entry, "tool_response": tool_response, "function": "click_button", "args": {"button_number": button_index} } elif assistant_input.lower().startswith("type "): text = assistant_input[5:] tool_id = f"call_{int(time.time())}" assistant_entry = { "role": "assistant", "content": [], "tool_calls": [ { "type": "function", "id": tool_id, "function": { "name": "type_text", "arguments": json.dumps({"text": text}) } } ] } tool_response = { "role": "tool", "content": [ { "type": "text", "text": json.dumps({ "status": "success", "message": f"Text '{text}' was successfully typed.", "data": { "text": text, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) } }, indent=2) } ], "tool_call_id": tool_id } return { "entry": assistant_entry, "tool_response": tool_response, "function": "type_text", "args": {"text": text} } elif assistant_input.lower() == "press enter": tool_id = f"call_{int(time.time())}" assistant_entry = { "role": "assistant", "content": [], "tool_calls": [ { "type": "function", "id": tool_id, "function": { "name": "press_enter", "arguments": "{}" 
} } ] } tool_response = { "role": "tool", "content": [ { "type": "text", "text": json.dumps({ "status": "success", "message": "Enter key was successfully pressed.", "data": { "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) } }, indent=2) } ], "tool_call_id": tool_id } return { "entry": assistant_entry, "tool_response": tool_response, "function": "press_enter", "args": {} } elif assistant_input.lower().startswith("navigate "): url = assistant_input[9:].strip() tool_id = f"call_{int(time.time())}" assistant_entry = { "role": "assistant", "content": [], "tool_calls": [ { "type": "function", "id": tool_id, "function": { "name": "navigate_to", "arguments": json.dumps({"url": url}) } } ] } tool_response = { "role": "tool", "content": [ { "type": "text", "text": json.dumps({ "status": "success", "message": f"Successfully navigated to {url}.", "data": { "url": url, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) } }, indent=2) } ], "tool_call_id": tool_id } return { "entry": assistant_entry, "tool_response": tool_response, "function": "navigate_to", "args": {"url": url} } elif assistant_input.lower().startswith("scroll up") or assistant_input.lower().startswith("scroll down"): direction = "up" if assistant_input.lower().startswith("scroll up") else "down" function_name = "scroll_up" if direction == "up" else "scroll_down" amount = 500 # Default amount match = re.search(r"\d+", assistant_input) if match: amount = int(match.group()) tool_id = f"call_{int(time.time())}" assistant_entry = { "role": "assistant", "content": [], "tool_calls": [ { "type": "function", "id": tool_id, "function": { "name": function_name, "arguments": json.dumps({"amount": amount}) } } ] } tool_response = { "role": "tool", "content": [ { "type": "text", "text": json.dumps({ "status": "success", "message": f"Successfully scrolled {direction} by {amount} pixels.", "data": { "direction": direction, "amount": amount, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", 
time.gmtime()) } }, indent=2) } ], "tool_call_id": tool_id } return { "entry": assistant_entry, "tool_response": tool_response, "function": function_name, "args": {"amount": amount} } elif assistant_input.lower().startswith("command_f_click "): text = assistant_input[16:].strip() tool_id = f"call_{int(time.time())}" assistant_entry = { "role": "assistant", "content": [], "tool_calls": [ { "type": "function", "id": tool_id, "function": { "name": "command_f_click", "arguments": json.dumps({"text": text}) } } ] } tool_response = { "role": "tool", "content": [ { "type": "text", "text": "" # Will be filled below } ], "tool_call_id": tool_id } return { "entry": assistant_entry, "tool_response": tool_response, "function": "command_f_click", "args": {"text": text} } elif assistant_input.lower() == "reload_page": tool_id = f"call_{int(time.time())}" assistant_entry = { "role": "assistant", "content": [], "tool_calls": [ { "type": "function", "id": tool_id, "function": { "name": "reload_page", "arguments": "{}" } } ] } tool_response = { "role": "tool", "content": [ { "type": "text", "text": "" # Will be filled below } ], "tool_call_id": tool_id } return { "entry": assistant_entry, "tool_response": tool_response, "function": "reload_page", "args": {} } else: assistant_entry = { "role": "assistant", "content": [ { "type": "text", "text": assistant_input } ] } return {"entry": assistant_entry, "message": assistant_input} def draw_bounding_boxes(screenshot_bytes, elements_info, driver): """ Draw colored bounding boxes around all interactable elements. 
Args: screenshot_bytes: Screenshot as bytes elements_info: List of tuples containing element info and index driver: Selenium WebDriver instance Returns: Modified screenshot as bytes and base64 string """ try: image = Image.open(io.BytesIO(screenshot_bytes)) draw = ImageDraw.Draw(image) device_pixel_ratio = driver.execute_script("return window.devicePixelRatio;") or 1 used_colors = {} for element_info, index in elements_info: xpath = element_info.get('xpath', '') if not xpath: continue try: element = driver.find_element(By.XPATH, xpath) rect = driver.execute_script(""" const rect = arguments[0].getBoundingClientRect(); return { x: rect.left, y: rect.top, width: rect.width, height: rect.height }; """, element) x = int(rect['x'] * device_pixel_ratio) y = int(rect['y'] * device_pixel_ratio) width = int(rect['width'] * device_pixel_ratio) height = int(rect['height'] * device_pixel_ratio) element_id = str(index) adjacent_colors = [] for other_id, color in used_colors.items(): if abs(int(other_id) - index) < 3: adjacent_colors.append(color) while True: r = random.randint(100, 255) g = random.randint(100, 255) b = random.randint(100, 255) color = (r, g, b) is_distinct = True for adj_color in adjacent_colors: diff = sum(abs(color[i] - adj_color[i]) for i in range(3)) if diff < 150: # Color difference threshold is_distinct = False break if is_distinct or not adjacent_colors: break used_colors[element_id] = color rect_thickness = max(1, int(2 * device_pixel_ratio)) draw.rectangle([x, y, x + width, y + height], outline=color, width=rect_thickness) font_size = max(10, int(12 * device_pixel_ratio)) try: font = ImageFont.truetype("Arial", font_size) except: font = ImageFont.load_default() text = str(index) text_width, text_height = draw.textsize(text, font=font) if hasattr(draw, 'textsize') else (font_size * len(text), font_size) text_x = x + 2 text_y = y + 2 draw.rectangle([text_x - 2, text_y - 2, text_x + text_width + 2, text_y + text_height + 2], fill=color) draw.text((text_x, 
text_y), text, fill=(0, 0, 0), font=font) except Exception as e: print(f"Could not draw box for element {index}: {e}") output_buffer = io.BytesIO() image.save(output_buffer, format='PNG') modified_screenshot_bytes = output_buffer.getvalue() base64_screenshot = base64.b64encode(modified_screenshot_bytes).decode('utf-8') return modified_screenshot_bytes, base64_screenshot except Exception as e: print(f"Error drawing bounding boxes: {e}") return screenshot_bytes, base64.b64encode(screenshot_bytes).decode('utf-8') def command_f_click(driver, text): """ Find text on the page and click on it, even if it's not identified as a clickable element. Args: driver: Selenium WebDriver instance text: The text to search for Returns: Dictionary with status, debug_info, and error (if any) """ debug_info = [] try: debug_info.append(f"Searching for text: '{text}'") print(f"Searching for text: '{text}'") script = """ function findElementsWithText(searchText) { const searchTextLower = searchText.toLowerCase(); // First, try to find elements that contain exactly this text const exactMatches = []; // Then, find elements that contain this text as a substring const partialMatches = []; // Walk through the DOM and find text nodes const walker = document.createTreeWalker( document.body, NodeFilter.SHOW_TEXT, null, false ); while (walker.nextNode()) { const node = walker.currentNode; const nodeText = node.textContent.trim(); if (nodeText) { const parent = node.parentElement; // Skip hidden elements if (!parent || !parent.offsetParent) continue; const style = window.getComputedStyle(parent); if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') { continue; } // Check for exact matches (case insensitive) if (nodeText.toLowerCase() === searchTextLower) { exactMatches.push({ element: parent, text: nodeText, isExact: true, rect: parent.getBoundingClientRect() }); } // Check for partial matches else if (nodeText.toLowerCase().includes(searchTextLower)) { 
partialMatches.push({ element: parent, text: nodeText, isExact: false, rect: parent.getBoundingClientRect() }); } } } // Combine matches, prioritizing exact matches const allMatches = [...exactMatches, ...partialMatches]; // If no direct matches found, try to look for buttons, links, etc. with this text if (allMatches.length === 0) { // Look for elements with aria labels, titles, etc. const potentialElements = Array.from(document.querySelectorAll( 'a, button, [role="button"], [role="link"], [onclick], [aria-label], [title]' )); for (const el of potentialElements) { // Check different attributes const label = el.getAttribute('aria-label') || ''; const title = el.getAttribute('title') || ''; const innerText = el.innerText || ''; if (label.toLowerCase().includes(searchTextLower) || title.toLowerCase().includes(searchTextLower) || innerText.toLowerCase().includes(searchTextLower)) { // Skip hidden elements if (!el.offsetParent) continue; const style = window.getComputedStyle(el); if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') { continue; } const isExact = label.toLowerCase() === searchTextLower || title.toLowerCase() === searchTextLower || innerText.toLowerCase() === searchTextLower; allMatches.push({ element: el, text: innerText || label || title, isExact: isExact, rect: el.getBoundingClientRect() }); } } } // Filter visible elements (fully or partially in viewport) const visibleMatches = allMatches.filter(match => { const rect = match.rect; return rect.width > 0 && rect.height > 0 && rect.top < window.innerHeight && rect.bottom > 0 && rect.left < window.innerWidth && rect.right > 0; }); // Return organized results return { visibleExactMatches: visibleMatches.filter(m => m.isExact), visiblePartialMatches: visibleMatches.filter(m => !m.isExact), allExactMatches: allMatches.filter(m => m.isExact), allPartialMatches: allMatches.filter(m => !m.isExact) }; } return findElementsWithText(arguments[0]); """ result = 
driver.execute_script(script, text) matches = result.get('visibleExactMatches', []) if not matches: matches = result.get('visiblePartialMatches', []) if not matches: matches = result.get('allExactMatches', []) if not matches: matches = result.get('allPartialMatches', []) if not matches: print(f"No elements found containing text '{text}'") return { "success": False, "debug_info": debug_info, "error": f"No elements found containing text '{text}'" } match = driver.execute_script("return arguments[0].element;", matches[0]) match_text = driver.execute_script("return arguments[0].text;", matches[0]) driver.execute_script("arguments[0].scrollIntoView({behavior: 'smooth', block: 'center'});", match) time.sleep(0.5) # Wait for scroll to complete print(f"Found element with text: '{match_text}'") print(f"Element tag: {match.tag_name}") try: actions = ActionChains(driver) actions.move_to_element(match).pause(0.2).click().perform() print("Clicked using ActionChains") except Exception as e: print(f"ActionChains click failed: {e}") try: match.click() print("Clicked using WebElement.click()") except Exception as e: print(f"WebElement click failed: {e}") driver.execute_script("arguments[0].click();", match) print("Clicked using JavaScript") time.sleep(0.5) try: wait_for_ajax(driver) print("Loading complete") except Exception as e: print(f"Note: Error during page load wait: {e}") print("Continuing anyway...") return { "success": True, "debug_info": debug_info, "clicked_text": match_text, "method": "ActionChains" if isinstance(actions, ActionChains) else "WebElement.click()" if isinstance(match, webdriver.WebElement) else "JavaScript", "data": { "text": match_text, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) } } except Exception as e: error_msg = f"Error finding and clicking text '{text}': {str(e)}" debug_info.append(error_msg) print(error_msg) return { "success": False, "debug_info": debug_info, "error": error_msg } def navigate_to(driver, url): """ Navigate to a 
URL and ensure any new tab attempts are redirected. Args: driver: Selenium WebDriver instance url: URL to navigate to """ try: driver.get(url) prevent_new_tabs(driver) wait_for_ajax(driver) return True except Exception as e: print(f"Error navigating to {url}: {e}") return False def reload_page(driver): """ Reload the current page and wait for it to load. Args: driver: Selenium WebDriver instance Returns: Dictionary with status and message """ try: current_url = driver.current_url print(f"Reloading page: {current_url}") driver.refresh() try: wait_for_ajax(driver) print("Page reload complete") except Exception as e: print(f"Note: Error during page load wait after reload: {e}") print("Continuing anyway...") prevent_new_tabs(driver) return { "success": True, "message": f"Successfully reloaded the page: {current_url}", "data": { "url": current_url, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) } } except Exception as e: error_msg = f"Error reloading page: {str(e)}" print(error_msg) return { "success": False, "message": error_msg, "data": { "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) } } def main(): print("Please start Chrome with remote debugging using this command:") print("Windows: 'C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe' --remote-debugging-port=9222") print("Mac: '/Applications/Google Chrome.app/Contents/MacOS/Google Chrome' --remote-debugging-port=9222") print("Linux: 'google-chrome --remote-debugging-port=9222'") input("Press Enter once you've started Chrome with remote debugging...") if not os.path.exists("training_data.jsonl"): open("training_data.jsonl", "w").close() print("Created new training_data.jsonl file") if not os.path.exists("screenshots"): os.makedirs("screenshots") print("Created screenshots directory") print("\nWhat task would you like to perform? 
(This will be the initial user query)") initial_query = input("User query: ") if not initial_query.strip(): initial_query = "What's the weather in Tokyo?" # Default query driver = setup_chrome_with_remote_debugging() start_url = input("\nEnter starting URL (default: https://www.google.com): ") if not start_url: start_url = "https://www.google.com" navigate_to(driver, start_url) print("Initial page loaded.") messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": [{"type": "text", "text": initial_query}]} ] screenshot_count = 0 while True: try: elements_with_info = get_interactable_elements_fast(driver) screenshot_bytes = driver.get_screenshot_as_png() modified_screenshot_bytes, base64_screenshot = draw_bounding_boxes(screenshot_bytes, elements_with_info, driver) screenshot_path = "screenshots/latest.png" with open(screenshot_path, "wb") as f: f.write(modified_screenshot_bytes) print(f"Updated screenshot at {screenshot_path}") screenshot_content = [ {"type": "text", "text": f"[SYSTEM] Page screenshot {screenshot_count}:"}, {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_screenshot}", "detail": "low"}} ] messages.append({"role": "user", "content": screenshot_content}) screenshot_count += 1 output = f"\n--- CURRENT URL: {driver.current_url} ---" output += "\n\n--- INTERACTABLE ELEMENTS ---" for element_info in elements_with_info: output += "\n" + format_element_info(element_info) active_form_info = get_active_form_element_info(driver) if active_form_info: output += "\n\n" + active_form_info else: output += "\n\nNo active form element found" messages.append({"role": "system", "content": [{"type": "text", "text": output}]}) print(output) print(f"\nScreenshot {screenshot_count-1} added to conversation") assistant_response = get_human_assistant_response() if "quit" in assistant_response: break messages.append(assistant_response["entry"]) if "function" in assistant_response: function_name = 
assistant_response["function"] print(f"\nCalled function: {function_name}") if function_name == "click_button": index = assistant_response["args"]["button_number"] click_element_by_index(driver, index, elements_with_info) elif function_name == "type_text": text = assistant_response["args"]["text"] try: active_element = driver.switch_to.active_element active_element.send_keys(text) print(f"Typed: {text}") except Exception as e: print(f"Error typing text: {e}") elif function_name == "press_enter": try: active_element = driver.switch_to.active_element active_element.send_keys(Keys.ENTER) print("Pressed Enter key") wait_for_ajax(driver) print("Loading complete") except Exception as e: print(f"Error pressing Enter key: {e}") elif function_name == "navigate_to": url = assistant_response["args"]["url"] try: print(f"Navigating to: {url}") navigate_to(driver, url) print("Loading complete") except Exception as e: print(f"Error navigating to {url}: {e}") elif function_name == "scroll_up" or function_name == "scroll_down": amount = assistant_response["args"].get("amount", 500) direction = "up" if function_name == "scroll_up" else "down" scroll_page(driver, direction, amount) elif function_name == "command_f_click": text = assistant_response["args"]["text"] try: print(f"Finding and clicking text: {text}") result = command_f_click(driver, text) debug_str = "\n".join(result.get("debug_info", [])) if result.get("success", False): assistant_response["tool_response"]["content"][0]["text"] = json.dumps({ "status": "success", "message": f"Successfully found and clicked text '{text}'.", "clicked_text": result.get("clicked_text", text), "method": result.get("method", "unknown"), "debug_info": debug_str, "data": { "text": text, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) } }, indent=2) else: assistant_response["tool_response"]["content"][0]["text"] = json.dumps({ "status": "error", "message": f"Failed to find or click element with text '{text}'.", "error": 
result.get("error", "Unknown error"), "debug_info": debug_str, "data": { "text": text, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) } }, indent=2) except Exception as e: print(f"Error finding and clicking text '{text}': {e}") assistant_response["tool_response"]["content"][0]["text"] = json.dumps({ "status": "error", "message": f"Failed to find or click element with text '{text}'.", "error": str(e), "data": { "text": text, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) } }, indent=2) messages.append(assistant_response["tool_response"]) continue elif function_name == "reload_page": try: print("Reloading page") result = reload_page(driver) if result.get("success", False): assistant_response["tool_response"]["content"][0]["text"] = json.dumps({ "status": "success", "message": "Successfully reloaded the page.", "data": { "url": driver.current_url, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) } }, indent=2) else: assistant_response["tool_response"]["content"][0]["text"] = json.dumps({ "status": "error", "message": "Failed to reload the page.", "error": result.get("message", "Unknown error"), "data": { "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) } }, indent=2) except Exception as e: print(f"Error reloading page: {e}") assistant_response["tool_response"]["content"][0]["text"] = json.dumps({ "status": "error", "message": "Failed to reload the page.", "error": str(e), "data": { "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) } }, indent=2) messages.append(assistant_response["tool_response"]) continue if "message" in assistant_response: user_response = input("Enter additional user response (or press Enter to continue, 'quit' to end): ") if user_response.strip().lower() == "quit": print("User requested to quit. 
Ending session.") break elif user_response.strip(): messages.append({ "role": "user", "content": [{"type": "text", "text": user_response}] }) continue except Exception as e: print(f"An error occurred: {e}") if input("Continue? (y/n): ").lower() != 'y': break print("Session ended. Saving training data...") save_training_data(messages) print("You can continue using the Chrome browser.") if __name__ == "__main__": main()
Collecting data this way ensures the LLM is fine-tuned on realistic, high-quality browser-automation examples that match what it will see in the deployment environment.