BrowserLAM — LLM-Powered Browser Agent
An automation layer that drives Chrome from natural-language instructions, fine-tuned for better web browsing.
How BrowserLAM Works
BrowserLAM runs in a loop: it sets up Chrome, navigates to a URL, captures the page state, sends that state to an LLM, processes the response (a tool call or a plain message), and executes the resulting action. The loop repeats until the user quits.
Finetuning Details
BrowserLAM is fine-tuned with supervised learning (SL): a human plays the role of the LLM and makes each decision from exactly the same state the model would see. The human does not look at the Chrome window itself. This prevents hallucination in the training data, because the live window can sometimes reveal more about what is happening than the captured state does (for example, a native popup that is rendered as an overlay and therefore never shows up in the screenshots).
This code is used to gather training data for finetuning. It guides the user through opening Chrome with remote debugging, collects screenshots, interactable elements, and user/assistant actions, and saves them in a JSONL file for supervised learning:
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
import os
import time
import re
import json
from llm import available_functions, system_prompt
import random
import io
import base64
from PIL import Image, ImageDraw, ImageFont
def wait_for_ajax(driver, timeout=5, max_inflight=2, idle_delay=500):
    """
    Block until the page's network activity has settled or `timeout` elapses.

    The first time this runs in a document it injects a script that wraps
    XMLHttpRequest.prototype.open and window.fetch, so the page maintains
    window.__pendingRequests (in-flight request count) and
    window.__lastRequestTime (time of the most recent request activity).
    The function then polls those two values every 0.1 seconds and returns
    once no more than `max_inflight` requests are pending and nothing has
    happened for at least `idle_delay` milliseconds. A simulated user
    interaction is dispatched before and after the wait.

    Args:
        driver: Selenium WebDriver instance
        timeout: Maximum wait time in seconds (default: 5)
        max_inflight: Maximum allowed number of active connections (default: 2)
        idle_delay: Required idle time in milliseconds (default: 500)
    """
    simulate_early_human_interaction(driver)
    time.sleep(0.3)
    started_at = time.time()

    # Guarded by window.__networkIdleSetup so the wrappers are installed only
    # once per document.
    instrumentation_js = """
if (!window.__networkIdleSetup) {
window.__pendingRequests = 0;
window.__lastRequestTime = Date.now();
window.__networkIdleSetup = true;
(function(open) {
XMLHttpRequest.prototype.open = function() {
this.addEventListener("readystatechange", function(){
if (this.readyState === 1) {
window.__pendingRequests++;
window.__lastRequestTime = Date.now();
}
if (this.readyState === 4) {
window.__pendingRequests = Math.max(0, window.__pendingRequests - 1);
if (window.__pendingRequests > 0) {
window.__lastRequestTime = Date.now();
}
}
}, false);
open.apply(this, arguments);
};
})(XMLHttpRequest.prototype.open);
if (window.fetch) {
const originalFetch = window.fetch;
window.fetch = function() {
window.__pendingRequests++;
window.__lastRequestTime = Date.now();
return originalFetch.apply(this, arguments).then(function(response) {
window.__pendingRequests = Math.max(0, window.__pendingRequests - 1);
if (window.__pendingRequests > 0) {
window.__lastRequestTime = Date.now();
}
return response;
}).catch(function(error) {
window.__pendingRequests = Math.max(0, window.__pendingRequests - 1);
if (window.__pendingRequests > 0) {
window.__lastRequestTime = Date.now();
}
throw error;
});
};
}
}
"""
    driver.execute_script(instrumentation_js)

    while True:
        in_flight = driver.execute_script("return window.__pendingRequests")
        if in_flight is None:
            in_flight = 0

        # The idle interval is measured with this process's clock against the
        # browser's Date.now() stamp.
        now_ms = int(time.time() * 1000)
        last_seen = driver.execute_script("return window.__lastRequestTime")
        last_ms = now_ms if last_seen is None else int(last_seen)

        network_quiet = in_flight <= max_inflight and (now_ms - last_ms) >= idle_delay
        if network_quiet:
            break
        if time.time() - started_at > timeout:
            break
        time.sleep(0.1)

    quick_simulate_interaction(driver)
def simulate_early_human_interaction(driver):
    """
    Dispatch synthetic user-activity events in the page without targeting real elements.

    The injected script focuses the window, dispatches a mousemove and an
    off-viewport click on `document`, clicks a temporary off-screen <div> that
    is added and removed again, and finally dispatches `resize` and `scroll`
    events on `window`.

    Args:
        driver: Selenium WebDriver instance
    Returns:
        True if the script ran, False if executing it raised any exception
    """
    ghost_click_js = """
// APPROACH 1: Document-level events (safest, won't click any actual elements)
// Focus the window first (helps with activation)
window.focus();
// Dispatch events directly to document object, not to any element
document.dispatchEvent(new MouseEvent('mousemove', {
bubbles: true,
cancelable: true,
view: window,
clientX: 1,
clientY: 1
}));
document.dispatchEvent(new MouseEvent('click', {
bubbles: true,
cancelable: true,
view: window,
clientX: -10, // Negative coordinates ensure we're not clicking any elements
clientY: -10 // Completely outside viewport
}));
// APPROACH 2: Create a temporary invisible element to interact with
const ghostElement = document.createElement('div');
ghostElement.style.position = 'fixed';
ghostElement.style.top = '-100px'; // Off-screen
ghostElement.style.left = '-100px'; // Off-screen
ghostElement.style.width = '10px';
ghostElement.style.height = '10px';
ghostElement.style.pointerEvents = 'none'; // Can't be clicked by real mouse
// Append temporarily
document.body.appendChild(ghostElement);
// Focus and click the ghost element
ghostElement.focus();
ghostElement.dispatchEvent(new MouseEvent('click', {
bubbles: true,
cancelable: true,
view: window
}));
// Clean up - remove the element
document.body.removeChild(ghostElement);
// APPROACH 3: Trigger user-like activity events that aren't clicks
window.dispatchEvent(new Event('resize'));
window.dispatchEvent(new Event('scroll'));
return true;
"""
    try:
        driver.execute_script(ghost_click_js)
    except Exception:
        # Best effort: a failure here must not abort the caller's flow.
        return False
    return True
def quick_simulate_interaction(driver):
    """
    Focus the window and dispatch two synthetic mousemove events near the viewport centre.

    The first event is at the centre of the viewport, the second is offset by
    (+10, +5) pixels. Both are dispatched on `document`.

    Args:
        driver: Selenium WebDriver instance
    Returns:
        True if both scripts ran, False if executing either raised an exception
    """
    mouse_move_js = """
// Dispatch mousemove event at center of screen
const centerX = window.innerWidth / 2;
const centerY = window.innerHeight / 2;
// Create and dispatch event
const moveEvent = new MouseEvent('mousemove', {
view: window,
bubbles: true,
cancelable: true,
clientX: centerX,
clientY: centerY
});
document.dispatchEvent(moveEvent);
// Create and dispatch a second move event with small offset
const moveEvent2 = new MouseEvent('mousemove', {
view: window,
bubbles: true,
cancelable: true,
clientX: centerX + 10,
clientY: centerY + 5
});
document.dispatchEvent(moveEvent2);
"""
    try:
        driver.execute_script("window.focus();")
        driver.execute_script(mouse_move_js)
    except Exception:
        # Best effort: report failure instead of raising.
        return False
    return True
def setup_chrome_with_remote_debugging(debugging_port=9222):
    """
    Attach Selenium to a Chrome instance through its remote-debugging address.

    The driver connects to 127.0.0.1:<debugging_port> via the `debuggerAddress`
    experimental option, then installs the new-tab prevention script in the
    current document.
    NOTE(review): Chrome presumably has to be running already with remote
    debugging enabled on that port (and with the user's profile) - confirm
    against the setup instructions shown to the user.

    Args:
        debugging_port: TCP port Chrome's remote debugger listens on (default: 9222)
    Returns:
        WebDriver instance connected to the Chrome browser
    """
    chrome_options = Options()
    chrome_options.add_experimental_option("debuggerAddress", f"127.0.0.1:{debugging_port}")
    driver = webdriver.Chrome(options=chrome_options)
    prevent_new_tabs(driver)
    return driver
def prevent_new_tabs(driver):
    """
    Inject a script that keeps navigation in the current tab.

    The script replaces window.open so a call with a URL navigates the current
    window, adds a capture-phase click listener that intercepts links whose
    `target` is `_blank` (or whose `rel` is exactly `noopener` / `noreferrer`)
    and navigates the current window to their href instead, and copies the
    window.open override onto every iframe it can access.
    NOTE(review): the script is injected into the currently loaded document
    only; presumably it must be re-injected after a navigation - confirm.

    Args:
        driver: Selenium WebDriver instance
    Returns:
        True if the script was injected, False (after printing a warning) otherwise
    """
    same_tab_js = """
// Store the original window.open function
const originalWindowOpen = window.open;
// Override window.open to redirect to the same tab
window.open = function(url, name, features) {
if (url) {
// Instead of opening a new tab, navigate the current window
window.location.href = url;
return window;
}
// If no URL provided, fallback to original but return the current window
return originalWindowOpen(url, name, features) || window;
};
// Add event listener for links with target="_blank"
document.addEventListener('click', function(e) {
// Find closest anchor element
let target = e.target;
while (target && target.tagName !== 'A') {
target = target.parentElement;
}
// If this is a link that would open in a new tab/window
if (target &&
(target.getAttribute('target') === '_blank' ||
target.getAttribute('rel') === 'noopener' ||
target.getAttribute('rel') === 'noreferrer')) {
// Prevent the default action
e.preventDefault();
// Get the href and navigate current window instead
const href = target.getAttribute('href');
if (href && !href.startsWith('javascript:')) {
window.location.href = href;
}
}
}, true);
// Override any window.open calls from within iframes
try {
const frames = document.querySelectorAll('iframe');
for (let i = 0; i < frames.length; i++) {
if (frames[i].contentWindow) {
frames[i].contentWindow.open = window.open;
}
}
} catch (e) {
// Ignore cross-origin errors
}
"""
    try:
        driver.execute_script(same_tab_js)
    except Exception as e:
        print(f"Warning: Failed to set up new tab prevention: {e}")
        return False
    return True
def get_interactable_elements_fast(driver, max_elements=100):
    """
    Collect visible, enabled, potentially interactable elements from the current page.

    A single injected script selects links, form controls, ARIA-role widgets,
    elements with `onclick`/`tabindex` and a few Google-result class names,
    keeps those that are rendered and lie within 500px of the viewport, and
    describes each one as a dict with `tagName`, `attributes` (a fixed set of
    attributes plus trimmed text under `text`) and `xpath`.

    Args:
        driver: Selenium WebDriver instance
        max_elements: Maximum number of elements to return
    Returns:
        List of (info_dict, index) tuples in document order; an empty list if
        anything goes wrong (the error is printed)
    """
    collector_js = """
// Find all potentially interactable elements with a more comprehensive selector
const selector = "a, button, input, textarea, select, [role='button'], [role='link'], [role='tab'], " +
"[role='menuitem'], [role='checkbox'], [role='radio'], [role='combobox'], " +
"[onclick], [tabindex], .g .yuRUbf > a, .g .LC20lb, .tF2Cxc, .yuRUbf, " +
".rc, .r, .jtfYYd, .DKV0Md, .vvjwJb, .dbsr"; // Added Google-specific selectors
const allElements = Array.from(document.querySelectorAll(selector));
// Improved visibility detection
function isVisibleElement(el) {
// Skip elements that are definitely not visible
if (el.offsetWidth === 0 && el.offsetHeight === 0 && el.tagName !== 'LINK') return false;
// Get computed style
const style = window.getComputedStyle(el);
if (style.display === 'none' || style.visibility === 'hidden') return false;
// Check if element is within viewport
const rect = el.getBoundingClientRect();
if (rect.width === 0 || rect.height === 0) return false;
// Check if element is within the viewport or just outside it
const viewHeight = Math.max(document.documentElement.clientHeight, window.innerHeight);
const viewWidth = Math.max(document.documentElement.clientWidth, window.innerWidth);
// Allow elements that are slightly outside viewport but might be scrolled to
const vertInView = (rect.top > -500 && rect.top < viewHeight + 500);
const horizInView = (rect.left > -500 && rect.left < viewWidth + 500);
return vertInView && horizInView;
}
// Filter and process elements
const results = [];
for (const el of allElements) {
// Skip duplicates (elements we've already processed)
if (results.some(r => r.element === el)) continue;
// Check if element is visible and enabled
if (isVisibleElement(el) && !el.disabled) {
// Get element info
const info = {
tagName: el.tagName.toLowerCase(),
attributes: {},
xpath: getXPath(el)
};
// Get important attributes
['id', 'name', 'class', 'type', 'value', 'placeholder', 'href', 'role', 'aria-label'].forEach(attr => {
if (el.hasAttribute(attr)) {
info.attributes[attr] = el.getAttribute(attr);
}
});
// Get text content
const text = el.textContent.trim();
if (text) {
info.attributes.text = text;
}
results.push({element: el, info: info});
// Limit results
if (results.length >= arguments[0]) break;
}
}
return results;
// Function to get XPath for an element
function getXPath(element) {
if (element.id !== '') return `//*[@id="${element.id}"]`;
if (element === document.body) return '/html/body';
let ix = 0;
const siblings = element.parentNode.childNodes;
for (let i = 0; i < siblings.length; i++) {
const sibling = siblings[i];
if (sibling === element) {
const path = getXPath(element.parentNode);
const tag = element.tagName.toLowerCase();
return `${path}/${tag}[${ix+1}]`;
}
if (sibling.nodeType === 1 && sibling.tagName.toLowerCase() === element.tagName.toLowerCase()) {
ix++;
}
}
}
"""
    try:
        found = driver.execute_script(collector_js, max_elements)
        # Only the serialisable description is kept; the position in the
        # result list becomes the element's display index.
        return [(entry['info'], position) for position, entry in enumerate(found)]
    except Exception as e:
        print(f"Error retrieving elements: {e}")
        return []
def format_element_info(element_info, index=None):
    """
    Render one (info_dict, index) tuple as a single display line.

    Attribute values longer than 20 characters are cut to 20 and suffixed
    with "...".

    Args:
        element_info: Tuple of (info_dict, index); info_dict has 'tagName' and 'attributes'
        index: Number to show; defaults to the index stored in the tuple
    Returns:
        String of the form "<index>. <tag attr='value' ...>"
    """
    if index is None:
        index = element_info[1]
    description = element_info[0]

    rendered = []
    for key, value in description['attributes'].items():
        shown = value[:20] + "..." if value and len(value) > 20 else value
        rendered.append(f"{key}='{shown}'")

    return f"{index}. <{description['tagName']} {' '.join(rendered)}>"
def get_active_form_element_info(driver):
    """
    Describe the focused element if it is a form control.

    Args:
        driver: Selenium WebDriver instance
    Returns:
        "ACTIVE FORM ELEMENT: <tag attr='value' ...>" when document.activeElement
        is an input, textarea or select; None otherwise or on any error
    """
    try:
        active_element = driver.execute_script("return document.activeElement;")
        form_tags = ['input', 'textarea', 'select']
        if active_element.tag_name.lower() not in form_tags:
            return None

        attrs = {}
        for attr in ['id', 'name', 'class', 'type', 'value', 'placeholder', 'maxlength', 'required']:
            try:
                value = active_element.get_attribute(attr)
            except Exception:
                # One unreadable attribute should not hide the others.
                continue
            if value:
                attrs[attr] = value

        try:
            form = driver.execute_script("return arguments[0].form;", active_element)
            if form:
                attrs['form_id'] = form.get_attribute('id')
                attrs['form_name'] = form.get_attribute('name')
                attrs['form_action'] = form.get_attribute('action')
        except Exception:
            # Form details are optional; keep whatever was collected so far.
            pass

        attr_str = ' '.join([f"{k}='{v}'" for k, v in attrs.items()])
        return f"ACTIVE FORM ELEMENT: <{active_element.tag_name} {attr_str}>"
    except Exception:
        # `except Exception` (not a bare except) so Ctrl-C and SystemExit
        # still propagate out of this best-effort probe.
        return None
def find_clickable_child(driver, element_info):
    """
    Find the most clickable descendant of a container element.

    The container is located through element_info['xpath']. The first <a>
    descendant wins, then the first <button>, then the first element matching
    `[onclick], [role="button"]`.

    Args:
        driver: Selenium WebDriver instance
        element_info: Info dict of the container (must provide 'xpath')
    Returns:
        (element, link_info) where link_info is a dict with 'tagName' and
        'attributes' for links and None for buttons/other clickables;
        (None, None) when nothing is found or any lookup fails
    """
    try:
        xpath = element_info.get('xpath', '')
        if not xpath:
            return None, None

        container = driver.find_element(By.XPATH, xpath)

        links = container.find_elements(By.TAG_NAME, 'a')
        if links:
            first_link = links[0]
            link_info = {'tagName': 'a', 'attributes': {}}
            for attr in ['href', 'id', 'class', 'text']:
                value = first_link.get_attribute(attr)
                if value:
                    link_info['attributes'][attr] = value
            return first_link, link_info

        buttons = container.find_elements(By.TAG_NAME, 'button')
        if buttons:
            return buttons[0], None

        clickables = container.find_elements(By.CSS_SELECTOR, '[onclick], [role="button"]')
        if clickables:
            return clickables[0], None
    except Exception:
        # Every failure mode maps to "no child found". `except Exception`
        # (not a bare except) lets Ctrl-C and SystemExit propagate.
        return None, None
    return None, None
def click_element_by_index(driver, index, elements_info):
    """
    Click the element at `index` in `elements_info` and wait for the page to settle.

    Generic containers (div, span, section, article) are first searched for a
    clickable child; otherwise the element is located through its stored
    XPath. The element is scrolled into view and clicked with an ActionChains
    sequence, falling back to a plain ActionChains click and finally to a
    JavaScript click.

    Args:
        driver: Selenium WebDriver instance
        index: Position of the element in elements_info
        elements_info: List of (info_dict, index) tuples
    Returns:
        True if a click was issued, False if the index is invalid, the element
        could not be found, or clicking raised
    """
    if not 0 <= index < len(elements_info):
        print(f"Invalid element number. Please enter a number between 0 and {len(elements_info)-1}")
        return False

    try:
        info = elements_info[index][0]
        print(f"Clicking element: {format_element_info(elements_info[index])}")

        element = None
        if info['tagName'] in ['div', 'span', 'section', 'article']:
            child_element, child_info = find_clickable_child(driver, info)
            if child_element:
                print(f" Found clickable child element: {child_info['tagName'] if child_info else 'unknown'}")
                element = child_element

        if not element:
            # No usable child (or not a container): locate the element itself.
            xpath = info.get('xpath', '')
            try:
                if xpath:
                    element = driver.find_element(By.XPATH, xpath)
            except Exception as e:
                print(f" Warning: Could not find element by XPath: {e}")
                return False

        if not element:
            print(" No element found to click")
            return False

        try:
            driver.execute_script("arguments[0].scrollIntoView({behavior: 'smooth', block: 'center'});", element)
            time.sleep(0.3)  # Short wait for scroll
            try:
                actions = ActionChains(driver)
                actions.move_to_element(element).pause(0.2)
                # Small random offset plus hold/release instead of an instant click.
                actions.move_by_offset(random.randint(-3, 3), random.randint(-3, 3)).pause(0.1)
                actions.click_and_hold().pause(0.1).release().perform()
                print(" Click method: Enhanced ActionChains click sequence")
            except Exception as e:
                print(f" ActionChains click failed: {e}")
                try:
                    ActionChains(driver).move_to_element(element).click().perform()
                    print(" Click method: Simple ActionChains click")
                except Exception as e:
                    print(f" Simple ActionChains click failed: {e}")
                    # Last resort only: issuing a JavaScript click after a
                    # successful ActionChains click would activate the
                    # element twice.
                    driver.execute_script("arguments[0].click();", element)
                    print(" Also using JavaScript click for reliability")
            time.sleep(0.5)
            try:
                wait_for_ajax(driver)
                print("Loading complete")
            except Exception as e:
                print(f"Note: Error during page load wait: {e}")
                print("Continuing anyway...")
            return True
        except Exception as e:
            print(f" Error clicking element: {e}")
            return False
    except Exception as e:
        print(f"Error clicking element: {e}")
        return False
def build_selector_from_info(info):
    """
    Build a selector string from element info, using the first rule that applies.

    Order of preference: `#id`, `tag.class1.class2` (only if shorter than 100
    characters), `tag[name='...']`, `tag[role='...']:contains('...')` (only
    for text shorter than 20 characters), and finally the bare tag name.
    Empty `id`/`class` values are skipped and class names are split on any
    whitespace, so repeated or leading spaces no longer yield invalid
    selectors such as `div.a..b` or `div.`.

    Args:
        info: Dict with 'tagName' and 'attributes'
    Returns:
        Selector string
    """
    attributes = info['attributes']

    if attributes.get('id'):
        return f"#{attributes['id']}"

    if 'class' in attributes:
        class_names = attributes['class'].split()
        if class_names:
            class_selector = "." + ".".join(class_names)
            if len(class_selector) < 100:  # Avoid overly complex selectors
                return f"{info['tagName']}{class_selector}"

    if 'name' in attributes:
        return f"{info['tagName']}[name='{attributes['name']}']"

    if 'role' in attributes and 'text' in attributes:
        text = attributes['text']
        if len(text) < 20:  # Only use short text
            # NOTE(review): `:contains()` is a jQuery extension, not standard
            # CSS; kept for backward compatibility - confirm what consumes it.
            return f"{info['tagName']}[role='{attributes['role']}']:contains('{text}')"

    return info['tagName']
def get_cleaned_html(driver):
    """
    Return a simplified, indented outline of the visible part of the page body.

    The injected script walks the DOM from document.body, skips hidden
    elements and non-visual tags (script, style, link, meta, noscript,
    template), keeps only a few attributes (id, up to two classes, shortened
    href, input type, placeholder, role) and emits one line per element or
    text node. Elements with children are closed with a proper `</tag>` line.

    Args:
        driver: Selenium WebDriver instance
    Returns:
        Cleaned HTML as a string, or "Error retrieving HTML: <error>" on failure
    """
    try:
        html = driver.execute_script("""
// Create a function to clean the HTML
function cleanHTML(node, depth = 0) {
// Skip invisible elements
if (node.nodeType === 1) { // Element node
const style = window.getComputedStyle(node);
if (style.display === 'none' || style.visibility === 'hidden' ||
(style.height === '0px' && style.overflow === 'hidden')) {
return '';
}
}
// Text node - return text content
if (node.nodeType === 3) { // Text node
const text = node.textContent.trim();
if (text) {
return ' '.repeat(depth) + text + '\\n';
}
return '';
}
// Skip script, style, link, meta, and other non-visible elements
if (node.nodeType !== 1 ||
['SCRIPT', 'STYLE', 'LINK', 'META', 'NOSCRIPT', 'TEMPLATE'].includes(node.tagName)) {
return '';
}
// Start building the element representation
let result = '';
// Only include elements that might have visible content
const tagName = node.tagName.toLowerCase();
// Get important attributes
const attrs = [];
if (node.id) attrs.push(`id="${node.id}"`);
if (node.className && typeof node.className === 'string') {
// Simplify class names (take only first 2 classes if there are many)
const classes = node.className.split(' ').filter(c => c.trim());
if (classes.length > 2) {
attrs.push(`class="${classes.slice(0, 2).join(' ')}..."`);
} else if (classes.length > 0) {
attrs.push(`class="${classes.join(' ')}"`);
}
}
// Add href for links
if (tagName === 'a' && node.href) {
const href = node.href.replace(window.location.origin, '');
if (href.length > 30) {
attrs.push(`href="${href.substring(0, 30)}..."`);
} else {
attrs.push(`href="${href}"`);
}
}
// Add type for inputs
if (tagName === 'input' && node.type) {
attrs.push(`type="${node.type}"`);
}
// Add placeholder for inputs
if ((tagName === 'input' || tagName === 'textarea') && node.placeholder) {
attrs.push(`placeholder="${node.placeholder}"`);
}
// Add role if present
if (node.getAttribute('role')) {
attrs.push(`role="${node.getAttribute('role')}"`);
}
// Start the element
const indent = ' '.repeat(depth);
result += indent + '<' + tagName;
if (attrs.length > 0) {
result += ' ' + attrs.join(' ');
}
// Check if element has children
const childNodes = Array.from(node.childNodes).filter(child => {
// Filter out empty text nodes
return !(child.nodeType === 3 && child.textContent.trim() === '');
});
if (childNodes.length === 0) {
// Self-closing tag
result += ' />\\n';
} else {
result += '>\\n';
// Process children
for (const child of childNodes) {
result += cleanHTML(child, depth + 1);
}
// Close tag
result += indent + '</' + tagName + '>\\n';
}
return result;
}
// Start with the body element
return cleanHTML(document.body, 0);
""")
        return html
    except Exception as e:
        return f"Error retrieving HTML: {e}"
def save_training_data(messages):
    """
    Append one training example to training_data.jsonl.

    The example is a JSON object with the conversation under "messages" and,
    when the module-level `available_functions` is non-empty, the function
    definitions under "functions". One example is written per line.

    Args:
        messages: Conversation history (list of message dicts)
    """
    output_path = "training_data.jsonl"

    record = {"messages": messages}
    if available_functions:
        record["functions"] = available_functions

    with open(output_path, "a") as sink:
        sink.write(json.dumps(record) + "\n")

    function_count = len(available_functions) if available_functions else 0
    print(f"Added training example with {len(messages)} messages and {function_count} functions")
def scroll_page(driver, direction="down", amount=500):
    """
    Scroll the most relevant scrollable area by `amount` pixels.

    The injected script picks, in order: the focused element if it scrolls
    vertically, a visible modal/dialog-like container that scrolls, the
    nearest scrollable ancestor of the element at the viewport centre, and
    finally the window itself. Any direction other than "down" scrolls up.

    Args:
        driver: Selenium WebDriver instance
        direction: Either "up" or "down"
        amount: Number of pixels to scroll
    Returns:
        True if the scroll was successful, False otherwise
    """
    find_and_scroll_js = """
function findScrollableElement() {
// First, check if any element has focus and is scrollable
const activeElement = document.activeElement;
if (activeElement && activeElement !== document.body && activeElement !== document.documentElement) {
const style = window.getComputedStyle(activeElement);
const overflowY = style.getPropertyValue('overflow-y');
if (['scroll', 'auto'].includes(overflowY) &&
activeElement.scrollHeight > activeElement.clientHeight) {
return activeElement;
}
}
// Next, look for modals or common scrollable containers currently visible
const modalSelectors = [
'.modal.show', '.modal-body', '.modal-content',
'[role="dialog"]', '[aria-modal="true"]',
'.popup', '.overlay', '.drawer',
'.scroll-container', '.overflow-y-auto', '.overflow-y-scroll',
'.v-dialog', '.MuiModal-root', '.ReactModal__Content'
];
for (const selector of modalSelectors) {
const elements = Array.from(document.querySelectorAll(selector));
for (const el of elements) {
if (el.offsetParent !== null) { // Check if visible
const style = window.getComputedStyle(el);
const overflowY = style.getPropertyValue('overflow-y');
if (['scroll', 'auto'].includes(overflowY) &&
el.scrollHeight > el.clientHeight) {
return el;
}
}
}
}
// Check for scrollable elements under the cursor (middle of viewport)
const viewportMiddleX = window.innerWidth / 2;
const viewportMiddleY = window.innerHeight / 2;
let element = document.elementFromPoint(viewportMiddleX, viewportMiddleY);
while (element && element !== document.body && element !== document.documentElement) {
const style = window.getComputedStyle(element);
const overflowY = style.getPropertyValue('overflow-y');
if (['scroll', 'auto'].includes(overflowY) &&
element.scrollHeight > element.clientHeight) {
return element;
}
element = element.parentElement;
}
// Finally, default to scrolling the whole page
return window;
}
const scrollable = findScrollableElement();
const amount = arguments[0];
if (scrollable === window) {
window.scrollBy(0, amount);
} else {
scrollable.scrollTop += amount;
}
return {
element: scrollable === window ? 'window' : scrollable.tagName,
scrollAmount: amount,
totalHeight: scrollable === window ? document.documentElement.scrollHeight : scrollable.scrollHeight,
visibleHeight: scrollable === window ? window.innerHeight : scrollable.clientHeight,
scrollPosition: scrollable === window ? window.scrollY : scrollable.scrollTop
};
"""
    try:
        # Positive delta scrolls down, negative scrolls up.
        delta = -amount if direction != "down" else amount
        outcome = driver.execute_script(find_and_scroll_js, delta)
        max_position = outcome['totalHeight'] - outcome['visibleHeight']
        print(f"Scrolled {direction} by {abs(delta)} pixels")
        print(f"Scrollable element: {outcome['element']}")
        print(f"Current position: {outcome['scrollPosition']} / {max_position}")
        time.sleep(0.3)
        return True
    except Exception as e:
        print(f"Error scrolling {direction}: {e}")
        return False
def _utc_timestamp():
    """Return the current UTC time formatted as YYYY-MM-DDTHH:MM:SSZ."""
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())


def _tool_call_result(function_name, args, message=None, data=None):
    """
    Build the assistant tool-call entry and its matching tool response.

    Args:
        function_name: Name recorded for the called function
        args: Dict of call arguments (stored JSON-encoded in the entry)
        message: Success message for the tool response; when None the response
            text is left empty so it can be filled in later
        data: Extra fields for the response's "data" object; a "timestamp"
            field is always appended last
    Returns:
        Dict with "entry", "tool_response", "function" and "args"
    """
    # The id has one-second resolution: "call_<unix seconds>".
    tool_id = f"call_{int(time.time())}"
    assistant_entry = {
        "role": "assistant",
        "content": [],
        "tool_calls": [
            {
                "type": "function",
                "id": tool_id,
                "function": {
                    "name": function_name,
                    "arguments": json.dumps(args),
                },
            }
        ],
    }

    if message is None:
        result_text = ""
    else:
        payload_data = dict(data or {})
        payload_data["timestamp"] = _utc_timestamp()
        result_text = json.dumps(
            {"status": "success", "message": message, "data": payload_data},
            indent=2,
        )

    tool_response = {
        "role": "tool",
        "content": [{"type": "text", "text": result_text}],
        "tool_call_id": tool_id,
    }
    return {
        "entry": assistant_entry,
        "tool_response": tool_response,
        "function": function_name,
        "args": args,
    }


def get_human_assistant_response():
    """
    Prompt a human for the assistant's next move and format it like an API response.

    Recognised commands: `click N`, `type TEXT`, `press enter`, `navigate URL`,
    `scroll up|down [AMOUNT]` (default 500), `command_f_click TEXT`,
    `reload_page` and `quit`. Anything else is treated as a plain text message.

    Returns:
        {"quit": True} for `quit`;
        {"entry", "tool_response", "function", "args"} for a function call
        (the tool response text is empty for command_f_click and reload_page);
        {"entry", "message"} for a plain text message
    """
    for line in (
        "\nNow simulate the assistant's response.",
        "Options:",
        "1. Text message (just type your message)",
        "2. Function call: click (e.g., 'click 5')",
        "3. Function call: type (e.g., 'type hello world')",
        "4. Function call: press enter",
        "5. Function call: navigate (e.g., 'navigate https://google.com')",
        "6. Function call: scroll up [amount] (e.g., 'scroll up 300')",
        "7. Function call: scroll down [amount] (e.g., 'scroll down')",
        "8. Function call: command_f_click (e.g., 'command_f_click Hello World')",
        "9. Function call: reload_page",
        "Type 'quit' to end the session and save the training data",
    ):
        print(line)

    assistant_input = input("\nAssistant: ").strip()
    lowered = assistant_input.lower()

    if lowered == "quit":
        return {"quit": True}

    if re.match(r"^click\s+\d+", assistant_input, re.IGNORECASE):
        button_index = int(re.search(r"\d+", assistant_input).group())
        return _tool_call_result(
            "click_button",
            {"button_number": button_index},
            message=f"Button {button_index} was successfully clicked.",
            data={"buttonIndex": button_index},
        )

    if lowered.startswith("type "):
        # Everything after "type " is kept verbatim, including case.
        text = assistant_input[5:]
        return _tool_call_result(
            "type_text",
            {"text": text},
            message=f"Text '{text}' was successfully typed.",
            data={"text": text},
        )

    if lowered == "press enter":
        return _tool_call_result(
            "press_enter",
            {},
            message="Enter key was successfully pressed.",
            data={},
        )

    if lowered.startswith("navigate "):
        url = assistant_input[9:].strip()
        return _tool_call_result(
            "navigate_to",
            {"url": url},
            message=f"Successfully navigated to {url}.",
            data={"url": url},
        )

    if lowered.startswith(("scroll up", "scroll down")):
        direction = "up" if lowered.startswith("scroll up") else "down"
        function_name = "scroll_up" if direction == "up" else "scroll_down"
        amount = 500  # Default amount
        match = re.search(r"\d+", assistant_input)
        if match:
            amount = int(match.group())
        return _tool_call_result(
            function_name,
            {"amount": amount},
            message=f"Successfully scrolled {direction} by {amount} pixels.",
            data={"direction": direction, "amount": amount},
        )

    if lowered.startswith("command_f_click "):
        text = assistant_input[16:].strip()
        # No message: the response text stays empty here and is filled in later.
        return _tool_call_result("command_f_click", {"text": text})

    if lowered == "reload_page":
        # No message: the response text stays empty here and is filled in later.
        return _tool_call_result("reload_page", {})

    assistant_entry = {
        "role": "assistant",
        "content": [{"type": "text", "text": assistant_input}],
    }
    return {"entry": assistant_entry, "message": assistant_input}
def _pick_distinct_color(nearby_colors, max_attempts=50):
    """
    Pick a random light RGB colour that differs from every colour in nearby_colors.

    Two colours count as distinct when their summed per-channel difference is
    at least 150. After max_attempts unsuccessful tries the last candidate is
    returned, so the search always terminates.
    """
    candidate = (255, 255, 255)
    for _ in range(max_attempts):
        candidate = (
            random.randint(100, 255),
            random.randint(100, 255),
            random.randint(100, 255),
        )
        if all(
            sum(abs(a - b) for a, b in zip(candidate, other)) >= 150  # Color difference threshold
            for other in nearby_colors
        ):
            break
    return candidate


def _measure_label(draw, text, font, font_size):
    """Return (width, height) of `text`, coping with Pillow versions old and new."""
    if hasattr(draw, 'textsize'):
        return draw.textsize(text, font=font)
    if hasattr(draw, 'textbbox'):
        # Newer Pillow: textsize is gone, textbbox is the replacement.
        try:
            _, _, right, bottom = draw.textbbox((0, 0), text, font=font)
            return right, bottom
        except Exception:
            pass
    # Rough estimate when the font cannot be measured.
    return font_size * len(text), font_size


def draw_bounding_boxes(screenshot_bytes, elements_info, driver):
    """
    Draw colored, numbered bounding boxes around all interactable elements.

    Each element is re-located through its XPath and its bounding client rect
    is scaled by window.devicePixelRatio to screenshot pixels. Elements whose
    indices are less than 3 apart get visibly different colours. Elements that
    cannot be located or drawn are reported and skipped.

    Args:
        screenshot_bytes: Screenshot as bytes
        elements_info: List of tuples containing element info and index
        driver: Selenium WebDriver instance
    Returns:
        (png_bytes, base64_string) of the annotated screenshot; on failure the
        original bytes and their base64 encoding
    """
    try:
        image = Image.open(io.BytesIO(screenshot_bytes))
        draw = ImageDraw.Draw(image)
        device_pixel_ratio = driver.execute_script("return window.devicePixelRatio;") or 1

        # These depend only on the pixel ratio, so compute them once instead
        # of once per element.
        rect_thickness = max(1, int(2 * device_pixel_ratio))
        font_size = max(10, int(12 * device_pixel_ratio))
        try:
            font = ImageFont.truetype("Arial", font_size)
        except Exception:
            font = ImageFont.load_default()

        used_colors = {}
        for element_info, index in elements_info:
            xpath = element_info.get('xpath', '')
            if not xpath:
                continue
            try:
                element = driver.find_element(By.XPATH, xpath)
                rect = driver.execute_script("""
const rect = arguments[0].getBoundingClientRect();
return {
x: rect.left,
y: rect.top,
width: rect.width,
height: rect.height
};
""", element)
                x = int(rect['x'] * device_pixel_ratio)
                y = int(rect['y'] * device_pixel_ratio)
                width = int(rect['width'] * device_pixel_ratio)
                height = int(rect['height'] * device_pixel_ratio)

                adjacent_colors = [
                    other_color
                    for other_index, other_color in used_colors.items()
                    if abs(other_index - index) < 3
                ]
                color = _pick_distinct_color(adjacent_colors)
                used_colors[index] = color

                draw.rectangle([x, y, x + width, y + height], outline=color, width=rect_thickness)

                text = str(index)
                text_width, text_height = _measure_label(draw, text, font, font_size)
                text_x = x + 2
                text_y = y + 2
                # Filled background in the box colour keeps the number readable.
                draw.rectangle(
                    [text_x - 2, text_y - 2, text_x + text_width + 2, text_y + text_height + 2],
                    fill=color,
                )
                draw.text((text_x, text_y), text, fill=(0, 0, 0), font=font)
            except Exception as e:
                print(f"Could not draw box for element {index}: {e}")

        output_buffer = io.BytesIO()
        image.save(output_buffer, format='PNG')
        modified_screenshot_bytes = output_buffer.getvalue()
        base64_screenshot = base64.b64encode(modified_screenshot_bytes).decode('utf-8')
        return modified_screenshot_bytes, base64_screenshot
    except Exception as e:
        print(f"Error drawing bounding boxes: {e}")
        return screenshot_bytes, base64.b64encode(screenshot_bytes).decode('utf-8')
def command_f_click(driver, text):
    """
    Find text on the page and click on it, even if it's not identified as a clickable element.

    The search runs inside the browser. Visible text nodes are scanned first;
    only if none match are link/button-like elements checked via their
    aria-label, title and innerText. Candidates are then preferred in this
    order: exact match inside the viewport, partial match inside the viewport,
    exact match anywhere, partial match anywhere. The first candidate is
    scrolled into view and clicked, trying ActionChains, then
    WebElement.click(), then a JavaScript click.

    Args:
        driver: Selenium WebDriver instance
        text: The text to search for (compared case-insensitively)

    Returns:
        Dictionary with "success" and "debug_info". On success it also holds
        "clicked_text", "method" (the click strategy that actually succeeded)
        and "data"; on failure it holds "error".
    """
    debug_info = []
    try:
        debug_info.append(f"Searching for text: '{text}'")
        print(f"Searching for text: '{text}'")
        script = """
        function findElementsWithText(searchText) {
            const searchTextLower = searchText.toLowerCase();
            // First, try to find elements that contain exactly this text
            const exactMatches = [];
            // Then, find elements that contain this text as a substring
            const partialMatches = [];
            // Walk through the DOM and find text nodes
            const walker = document.createTreeWalker(
                document.body,
                NodeFilter.SHOW_TEXT,
                null,
                false
            );
            while (walker.nextNode()) {
                const node = walker.currentNode;
                const nodeText = node.textContent.trim();
                if (nodeText) {
                    const parent = node.parentElement;
                    // Skip hidden elements
                    if (!parent || !parent.offsetParent) continue;
                    const style = window.getComputedStyle(parent);
                    if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') {
                        continue;
                    }
                    // Check for exact matches (case insensitive)
                    if (nodeText.toLowerCase() === searchTextLower) {
                        exactMatches.push({
                            element: parent,
                            text: nodeText,
                            isExact: true,
                            rect: parent.getBoundingClientRect()
                        });
                    }
                    // Check for partial matches
                    else if (nodeText.toLowerCase().includes(searchTextLower)) {
                        partialMatches.push({
                            element: parent,
                            text: nodeText,
                            isExact: false,
                            rect: parent.getBoundingClientRect()
                        });
                    }
                }
            }
            // Combine matches, prioritizing exact matches
            const allMatches = [...exactMatches, ...partialMatches];
            // If no direct matches found, try to look for buttons, links, etc. with this text
            if (allMatches.length === 0) {
                // Look for elements with aria labels, titles, etc.
                const potentialElements = Array.from(document.querySelectorAll(
                    'a, button, [role="button"], [role="link"], [onclick], [aria-label], [title]'
                ));
                for (const el of potentialElements) {
                    // Check different attributes
                    const label = el.getAttribute('aria-label') || '';
                    const title = el.getAttribute('title') || '';
                    const innerText = el.innerText || '';
                    if (label.toLowerCase().includes(searchTextLower) ||
                        title.toLowerCase().includes(searchTextLower) ||
                        innerText.toLowerCase().includes(searchTextLower)) {
                        // Skip hidden elements
                        if (!el.offsetParent) continue;
                        const style = window.getComputedStyle(el);
                        if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') {
                            continue;
                        }
                        const isExact = label.toLowerCase() === searchTextLower ||
                            title.toLowerCase() === searchTextLower ||
                            innerText.toLowerCase() === searchTextLower;
                        allMatches.push({
                            element: el,
                            text: innerText || label || title,
                            isExact: isExact,
                            rect: el.getBoundingClientRect()
                        });
                    }
                }
            }
            // Filter visible elements (fully or partially in viewport)
            const visibleMatches = allMatches.filter(match => {
                const rect = match.rect;
                return rect.width > 0 && rect.height > 0 &&
                    rect.top < window.innerHeight &&
                    rect.bottom > 0 &&
                    rect.left < window.innerWidth &&
                    rect.right > 0;
            });
            // Return organized results
            return {
                visibleExactMatches: visibleMatches.filter(m => m.isExact),
                visiblePartialMatches: visibleMatches.filter(m => !m.isExact),
                allExactMatches: allMatches.filter(m => m.isExact),
                allPartialMatches: allMatches.filter(m => !m.isExact)
            };
        }
        return findElementsWithText(arguments[0]);
        """
        result = driver.execute_script(script, text)

        # Walk the buckets from most to least specific and keep the first non-empty one.
        matches = (
            result.get('visibleExactMatches')
            or result.get('visiblePartialMatches')
            or result.get('allExactMatches')
            or result.get('allPartialMatches')
            or []
        )
        if not matches:
            print(f"No elements found containing text '{text}'")
            return {
                "success": False,
                "debug_info": debug_info,
                "error": f"No elements found containing text '{text}'"
            }

        # execute_script already converts returned DOM nodes into WebElements,
        # so the best candidate can be read directly without extra round trips.
        best_match = matches[0]
        match = best_match["element"]
        match_text = best_match.get("text")

        driver.execute_script("arguments[0].scrollIntoView({behavior: 'smooth', block: 'center'});", match)
        time.sleep(0.5)  # Wait for scroll to complete
        print(f"Found element with text: '{match_text}'")
        print(f"Element tag: {match.tag_name}")

        # Try progressively more forceful click strategies and remember which
        # one worked so the reported "method" reflects what really happened.
        try:
            actions = ActionChains(driver)
            actions.move_to_element(match).pause(0.2).click().perform()
            click_method = "ActionChains"
            print("Clicked using ActionChains")
        except Exception as chain_error:
            print(f"ActionChains click failed: {chain_error}")
            try:
                match.click()
                click_method = "WebElement.click()"
                print("Clicked using WebElement.click()")
            except Exception as click_error:
                print(f"WebElement click failed: {click_error}")
                # Last resort; if this raises too, the outer handler reports failure.
                driver.execute_script("arguments[0].click();", match)
                click_method = "JavaScript"
                print("Clicked using JavaScript")

        time.sleep(0.5)
        try:
            wait_for_ajax(driver)
            print("Loading complete")
        except Exception as wait_error:
            # A failed network-idle wait does not undo the click, so it is not fatal.
            print(f"Note: Error during page load wait: {wait_error}")
            print("Continuing anyway...")

        return {
            "success": True,
            "debug_info": debug_info,
            "clicked_text": match_text,
            "method": click_method,
            "data": {
                "text": match_text,
                "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
            }
        }
    except Exception as e:
        error_msg = f"Error finding and clicking text '{text}': {str(e)}"
        debug_info.append(error_msg)
        print(error_msg)
        return {
            "success": False,
            "debug_info": debug_info,
            "error": error_msg
        }
def navigate_to(driver, url):
    """
    Open a URL, call prevent_new_tabs on the loaded page, and wait for network activity to settle.

    Args:
        driver: Selenium WebDriver instance
        url: URL to navigate to

    Returns:
        True when loading the page, prevent_new_tabs and wait_for_ajax all
        completed without raising; False otherwise (the error is printed).
    """
    navigated = False
    try:
        driver.get(url)
        prevent_new_tabs(driver)
        wait_for_ajax(driver)
        navigated = True
    except Exception as nav_error:
        print(f"Error navigating to {url}: {nav_error}")
    return navigated
def reload_page(driver):
    """
    Refresh the page currently shown in the browser and wait for it to settle.

    A failure of the network-idle wait after the refresh is only printed; the
    reload is still reported as successful.

    Args:
        driver: Selenium WebDriver instance

    Returns:
        Dictionary with "success", "message" and "data". "data" always carries
        a UTC "timestamp" and, on success, the "url" read before refreshing.
    """
    def _now_utc():
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    try:
        url_before_reload = driver.current_url
        print(f"Reloading page: {url_before_reload}")
        driver.refresh()
        try:
            wait_for_ajax(driver)
            print("Page reload complete")
        except Exception as wait_error:
            print(f"Note: Error during page load wait after reload: {wait_error}")
            print("Continuing anyway...")
        prevent_new_tabs(driver)
        outcome = {"success": True}
        outcome["message"] = f"Successfully reloaded the page: {url_before_reload}"
        outcome["data"] = {"url": url_before_reload, "timestamp": _now_utc()}
        return outcome
    except Exception as reload_error:
        failure_text = f"Error reloading page: {str(reload_error)}"
        print(failure_text)
        outcome = {"success": False}
        outcome["message"] = failure_text
        outcome["data"] = {"timestamp": _now_utc()}
        return outcome
def _session_timestamp():
    """Return the current UTC time as a 'YYYY-MM-DDTHH:MM:SSZ' string."""
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())


def _set_tool_response_text(assistant_response, payload):
    """Store payload, serialized as indented JSON, as the text of the pending tool response."""
    assistant_response["tool_response"]["content"][0]["text"] = json.dumps(payload, indent=2)


def _record_screenshot_message(driver, elements_with_info, messages, screenshot_index):
    """Take an annotated screenshot, save it as screenshots/latest.png and append it to messages."""
    screenshot_bytes = driver.get_screenshot_as_png()
    modified_screenshot_bytes, base64_screenshot = draw_bounding_boxes(
        screenshot_bytes, elements_with_info, driver
    )
    screenshot_path = "screenshots/latest.png"
    with open(screenshot_path, "wb") as f:
        f.write(modified_screenshot_bytes)
    print(f"Updated screenshot at {screenshot_path}")
    messages.append({
        "role": "user",
        "content": [
            {"type": "text", "text": f"[SYSTEM] Page screenshot {screenshot_index}:"},
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_screenshot}", "detail": "low"}},
        ],
    })


def _record_page_description(driver, elements_with_info, messages):
    """Append (and print) the textual page state: URL, interactable elements, active form element."""
    parts = [
        f"\n--- CURRENT URL: {driver.current_url} ---",
        "\n\n--- INTERACTABLE ELEMENTS ---",
    ]
    parts.extend("\n" + format_element_info(element_info) for element_info in elements_with_info)
    active_form_info = get_active_form_element_info(driver)
    if active_form_info:
        parts.append("\n\n" + active_form_info)
    else:
        parts.append("\n\nNo active form element found")
    output = "".join(parts)
    messages.append({"role": "system", "content": [{"type": "text", "text": output}]})
    print(output)


def _apply_command_f_click(driver, assistant_response):
    """Run command_f_click and write its outcome into the pending tool response."""
    text = assistant_response["args"]["text"]
    try:
        print(f"Finding and clicking text: {text}")
        result = command_f_click(driver, text)
        debug_str = "\n".join(result.get("debug_info", []))
        if result.get("success", False):
            payload = {
                "status": "success",
                "message": f"Successfully found and clicked text '{text}'.",
                "clicked_text": result.get("clicked_text", text),
                "method": result.get("method", "unknown"),
                "debug_info": debug_str,
                "data": {"text": text, "timestamp": _session_timestamp()},
            }
        else:
            payload = {
                "status": "error",
                "message": f"Failed to find or click element with text '{text}'.",
                "error": result.get("error", "Unknown error"),
                "debug_info": debug_str,
                "data": {"text": text, "timestamp": _session_timestamp()},
            }
        _set_tool_response_text(assistant_response, payload)
    except Exception as e:
        print(f"Error finding and clicking text '{text}': {e}")
        _set_tool_response_text(assistant_response, {
            "status": "error",
            "message": f"Failed to find or click element with text '{text}'.",
            "error": str(e),
            "data": {"text": text, "timestamp": _session_timestamp()},
        })


def _apply_reload_page(driver, assistant_response):
    """Run reload_page and write its outcome into the pending tool response."""
    try:
        print("Reloading page")
        result = reload_page(driver)
        if result.get("success", False):
            payload = {
                "status": "success",
                "message": "Successfully reloaded the page.",
                "data": {"url": driver.current_url, "timestamp": _session_timestamp()},
            }
        else:
            payload = {
                "status": "error",
                "message": "Failed to reload the page.",
                "error": result.get("message", "Unknown error"),
                "data": {"timestamp": _session_timestamp()},
            }
        _set_tool_response_text(assistant_response, payload)
    except Exception as e:
        print(f"Error reloading page: {e}")
        _set_tool_response_text(assistant_response, {
            "status": "error",
            "message": "Failed to reload the page.",
            "error": str(e),
            "data": {"timestamp": _session_timestamp()},
        })


def _execute_tool_call(driver, assistant_response, elements_with_info, messages):
    """
    Perform the browser action named by assistant_response["function"].

    Returns:
        True when the tool response was appended to messages and the caller
        should go straight to the next page capture; False otherwise.
    """
    function_name = assistant_response["function"]
    print(f"\nCalled function: {function_name}")

    if function_name == "click_button":
        index = assistant_response["args"]["button_number"]
        click_element_by_index(driver, index, elements_with_info)
    elif function_name == "type_text":
        text = assistant_response["args"]["text"]
        try:
            driver.switch_to.active_element.send_keys(text)
            print(f"Typed: {text}")
        except Exception as e:
            print(f"Error typing text: {e}")
    elif function_name == "press_enter":
        try:
            driver.switch_to.active_element.send_keys(Keys.ENTER)
            print("Pressed Enter key")
            wait_for_ajax(driver)
            print("Loading complete")
        except Exception as e:
            print(f"Error pressing Enter key: {e}")
    elif function_name == "navigate_to":
        url = assistant_response["args"]["url"]
        try:
            print(f"Navigating to: {url}")
            # navigate_to prints its own error and returns False on failure,
            # so only announce completion when it actually succeeded.
            if navigate_to(driver, url):
                print("Loading complete")
        except Exception as e:
            print(f"Error navigating to {url}: {e}")
    elif function_name in ("scroll_up", "scroll_down"):
        amount = assistant_response["args"].get("amount", 500)
        direction = "up" if function_name == "scroll_up" else "down"
        scroll_page(driver, direction, amount)
    elif function_name == "command_f_click":
        _apply_command_f_click(driver, assistant_response)
        messages.append(assistant_response["tool_response"])
        return True
    elif function_name == "reload_page":
        _apply_reload_page(driver, assistant_response)
        messages.append(assistant_response["tool_response"])
        return True
    # NOTE(review): only command_f_click and reload_page append a tool response
    # here. The source's indentation was lost, so it is assumed the append
    # belonged to those two branches only - confirm against the training format.
    return False


def main():
    """
    Run an interactive session that collects finetuning data.

    The user is asked to start Chrome with remote debugging, then to enter a
    task and a starting URL. Each loop iteration captures the page (annotated
    screenshot plus a text listing of interactable elements), appends it to the
    conversation, asks the human acting as the assistant for a response, and
    executes any requested browser action. The loop ends when the response
    contains "quit", when the user types 'quit' as a follow-up, or when the
    user declines to continue after an error. The conversation is then passed
    to save_training_data.
    """
    print("Please start Chrome with remote debugging using this command:")
    print("Windows: 'C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe' --remote-debugging-port=9222")
    print("Mac: '/Applications/Google Chrome.app/Contents/MacOS/Google Chrome' --remote-debugging-port=9222")
    print("Linux: 'google-chrome --remote-debugging-port=9222'")
    input("Press Enter once you've started Chrome with remote debugging...")

    if not os.path.exists("training_data.jsonl"):
        with open("training_data.jsonl", "w"):
            pass
        print("Created new training_data.jsonl file")
    if not os.path.exists("screenshots"):
        os.makedirs("screenshots")
        print("Created screenshots directory")

    print("\nWhat task would you like to perform? (This will be the initial user query)")
    initial_query = input("User query: ")
    if not initial_query.strip():
        initial_query = "What's the weather in Tokyo?"  # Default query

    driver = setup_chrome_with_remote_debugging()

    # Strip the input so a whitespace-only answer falls back to the default,
    # matching how the query prompt above treats blank input.
    start_url = input("\nEnter starting URL (default: https://www.google.com): ").strip()
    if not start_url:
        start_url = "https://www.google.com"
    # navigate_to reports its own failure; do not claim success when it returns False.
    if navigate_to(driver, start_url):
        print("Initial page loaded.")

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": [{"type": "text", "text": initial_query}]}
    ]
    screenshot_count = 0

    while True:
        try:
            elements_with_info = get_interactable_elements_fast(driver)
            _record_screenshot_message(driver, elements_with_info, messages, screenshot_count)
            # Count the screenshot as soon as it is in the conversation so the
            # numbering stays unique even if describing the page fails below.
            screenshot_count += 1
            _record_page_description(driver, elements_with_info, messages)
            print(f"\nScreenshot {screenshot_count-1} added to conversation")

            assistant_response = get_human_assistant_response()
            if "quit" in assistant_response:
                break
            messages.append(assistant_response["entry"])

            if "function" in assistant_response and _execute_tool_call(
                driver, assistant_response, elements_with_info, messages
            ):
                continue

            if "message" in assistant_response:
                user_response = input("Enter additional user response (or press Enter to continue, 'quit' to end): ")
                if user_response.strip().lower() == "quit":
                    print("User requested to quit. Ending session.")
                    break
                elif user_response.strip():
                    messages.append({
                        "role": "user",
                        "content": [{"type": "text", "text": user_response}]
                    })
        except Exception as e:
            print(f"An error occurred: {e}")
            if input("Continue? (y/n): ").lower() != 'y':
                break

    print("Session ended. Saving training data...")
    save_training_data(messages)
    print("You can continue using the Chrome browser.")
# Start the interactive data-collection session only when run as a script.
if __name__ == "__main__":
    main()
Collecting data this way ensures the LLM is trained on realistic, high-quality examples of browser automation that match what it will see in the deployment environment.