import pdb import logging from dotenv import load_dotenv load_dotenv() import os import glob import asyncio import argparse # import os # Duplicate import removed logger = logging.getLogger(__name__) import gradio as gr from browser_use.agent.service import Agent # from playwright.async_api import async_playwright # Imported again later, keep one from browser_use.browser.browser import Browser, BrowserConfig from browser_use.browser.context import ( BrowserContextConfig as OrgBrowserContextConfig, # Renamed to avoid clash BrowserContextWindowSize, ) from langchain_ollama import ChatOllama from playwright.async_api import async_playwright # Keep this one from src.utils.agent_state import AgentState from src.utils import utils from src.agent.custom_agent import CustomAgent from src.browser.custom_browser import CustomBrowser from src.agent.custom_prompts import CustomSystemPrompt, CustomAgentMessagePrompt # Use the renamed original BrowserContextConfig for the custom context if it's different # If CustomBrowserContext expects the original, this is fine. # If it expects its own version, it should be named differently. # Assuming src.browser.custom_context.BrowserContextConfig is intentional and different. from src.browser.custom_context import BrowserContextConfig as CustomBrowserContextConfigInternal from src.browser.custom_context import CustomBrowserContext # Assuming this uses CustomBrowserContextConfigInternal from src.controller.custom_controller import CustomController from gradio.themes import Citrus, Default, Glass, Monochrome, Ocean, Origin, Soft, Base from src.utils.default_config_settings import default_config, load_config_from_file, save_config_to_file, save_current_config, update_ui_from_config from src.utils.utils import update_model_dropdown, get_latest_files, capture_screenshot # Import network diagnostic skills from network_diagnostic_skills import ( NetworkDiagnosticSkill, DNSLookupSkill, PortScannerSkill, NetworkInterfaceSkill, BandwidthTestSkill, PacketSnifferSkill, ARPScanSkill, TCPConnectionTestSkill, LatencyMonitorSkill, RouteTableSkill, ) # Global variables for persistence _global_browser = None _global_browser_context = None # Create the global agent state instance _global_agent_state = AgentState() # Instantiate network diagnostic skills network_diagnostic_tool = NetworkDiagnosticSkill() dns_lookup_tool = DNSLookupSkill() port_scanner_tool = PortScannerSkill() interface_info_tool = NetworkInterfaceSkill() bandwidth_test_tool = BandwidthTestSkill() packet_sniffer_tool = PacketSnifferSkill() arp_scan_tool = ARPScanSkill() tcp_test_tool = TCPConnectionTestSkill() latency_monitor_tool = LatencyMonitorSkill() route_table_tool = RouteTableSkill() # Define callback functions for network tools def ping_skill_webui(target_ip, packet_size, count, timeout): return network_diagnostic_tool.ping(target_ip, int(packet_size), int(count), int(timeout)) def traceroute_skill_webui(target_ip, max_hops, packet_size): return network_diagnostic_tool.traceroute(target_ip, int(max_hops), int(packet_size)) def dns_lookup_skill_webui(domain, record_type, dns_server): return dns_lookup_tool.lookup(domain, record_type, dns_server) def port_scan_skill_webui(target_ip, start_port, end_port): return port_scanner_tool.scan(target_ip, int(start_port), int(end_port)) def interface_info_skill_webui(): info = interface_info_tool.get_info() if isinstance(info, dict): output_str = "" for iface, details_list in info.items(): output_str += f"Interface: {iface}\n" for details in details_list: output_str += f" Family: {details.get('family', 'N/A')}\n" output_str += f" Address: {details.get('address', 'N/A')}\n" output_str += f" Netmask: {details.get('netmask', 'N/A')}\n" output_str += f" Broadcast: {details.get('broadcast', 'N/A')}\n" output_str += "\n" return output_str.strip() return str(info) def bandwidth_test_skill_webui(download_url, upload_url): return bandwidth_test_tool.test(download_url, upload_url) def packet_sniffer_skill_webui(filter_expr, count): return packet_sniffer_tool.sniff(filter_expr, int(count)) def arp_scan_skill_webui(ip_range): clients = arp_scan_tool.scan(ip_range) if isinstance(clients, list): if not clients: return "No devices found." output_str = "ARP Scan Results:\n" for client in clients: output_str += f" IP: {client.get('ip', 'N/A')}, MAC: {client.get('mac', 'N/A')}\n" return output_str.strip() return str(clients) def tcp_test_skill_webui(host, port, timeout): return tcp_test_tool.test(host, int(port), int(timeout)) def latency_monitor_skill_webui(target_ip, interval, duration): results = latency_monitor_tool.monitor(target_ip, int(interval), int(duration)) return "\n".join(results) def route_table_skill_webui(): routes = route_table_tool.get_routes() output = [] if routes and isinstance(routes[0], dict) and 'error' in routes[0]: return routes[0]['error'] if not routes or (isinstance(routes[0], dict) and routes[0].get('status')): return routes[0].get('status', "No routes found or failed to parse.") for route in routes: output.append(f"Destination: {route.get('destination', 'N/A')}") output.append(f" Netmask: {route.get('netmask', 'N/A')}") output.append(f" Gateway: {route.get('gateway', 'N/A')}") output.append(f" Interface: {route.get('interface', 'N/A')}") output.append("-" * 20) return "\n".join(output) async def stop_agent(): """Request the agent to stop and update UI with enhanced feedback""" global _global_agent_state try: _global_agent_state.request_stop() message = "Stop requested - the agent will halt at the next safe point" logger.info(f"🛑 {message}") return ( message, gr.update(value="Stopping...", interactive=False), gr.update(interactive=False), ) except Exception as e: error_msg = f"Error during stop: {str(e)}" logger.error(error_msg) return ( error_msg, gr.update(value="Stop", interactive=True), gr.update(interactive=True) ) async def stop_research_agent(): """Request the agent to stop and update UI with enhanced feedback for research agent""" global _global_agent_state try: _global_agent_state.request_stop() message = "Stop requested - the research agent will halt at the next safe point" logger.info(f"🛑 {message}") return ( gr.update(value="Stopping...", interactive=False), gr.update(interactive=False), ) except Exception as e: error_msg = f"Error during research stop: {str(e)}" logger.error(error_msg) return ( gr.update(value="Stop", interactive=True), gr.update(interactive=True) ) async def run_browser_agent( agent_type, # Can be "org", "custom", or "pen_test" llm_provider, llm_model_name, llm_temperature, llm_base_url, llm_api_key, use_own_browser, keep_browser_open, headless, disable_security, window_w, window_h, save_recording_path, save_agent_history_path, save_trace_path, enable_recording, task, # For "org" and "custom" agents add_infos, # For "custom" agent max_steps, use_vision, max_actions_per_step, tool_calling_method, # <<< BEGIN MODIFICATION: Add pen_test specific params >>> pen_test_target_url=None, pen_test_selected_tests=None # <<< END MODIFICATION >>> ): global _global_agent_state _global_agent_state.clear_stop() try: if not enable_recording and agent_type != "pen_test": # Pen test might not need recording by default save_recording_path = None if save_recording_path: os.makedirs(save_recording_path, exist_ok=True) if save_agent_history_path: os.makedirs(save_agent_history_path, exist_ok=True) if save_trace_path: os.makedirs(save_trace_path, exist_ok=True) existing_videos = set() if save_recording_path and agent_type != "pen_test": existing_videos = set( glob.glob(os.path.join(save_recording_path, "*.[mM][pP]4")) + glob.glob(os.path.join(save_recording_path, "*.[wW][eE][bB][mM]")) ) llm = utils.get_llm_model( provider=llm_provider, model_name=llm_model_name, temperature=llm_temperature, base_url=llm_base_url, api_key=llm_api_key, ) # <<< BEGIN MODIFICATION: Handle "pen_test" agent type >>> if agent_type == "pen_test": # For pen_test, headless is True by default, no recording/trace needed for the report # The 'task' for the LLM will be constructed within execute_web_pen_test final_result, errors, model_actions, model_thoughts, trace_file, history_file = await execute_web_pen_test( llm=llm, target_url=pen_test_target_url, selected_tests=pen_test_selected_tests, use_own_browser=use_own_browser, # Pass relevant browser settings keep_browser_open=keep_browser_open, disable_security=disable_security, window_w=window_w, window_h=window_h, agent_state=_global_agent_state # Pass agent state for potential stop requests ) latest_video = None # No video for pen test report # <<< END MODIFICATION >>> elif agent_type == "org": final_result, errors, model_actions, model_thoughts, trace_file, history_file = await run_org_agent( llm=llm, use_own_browser=use_own_browser, keep_browser_open=keep_browser_open, headless=headless, disable_security=disable_security, window_w=window_w, window_h=window_h, save_recording_path=save_recording_path, save_agent_history_path=save_agent_history_path, save_trace_path=save_trace_path, task=task, max_steps=max_steps, use_vision=use_vision, max_actions_per_step=max_actions_per_step, tool_calling_method=tool_calling_method ) elif agent_type == "custom": final_result, errors, model_actions, model_thoughts, trace_file, history_file = await run_custom_agent( llm=llm, use_own_browser=use_own_browser, keep_browser_open=keep_browser_open, headless=headless, disable_security=disable_security, window_w=window_w, window_h=window_h, save_recording_path=save_recording_path, save_agent_history_path=save_agent_history_path, save_trace_path=save_trace_path, task=task, add_infos=add_infos, max_steps=max_steps, use_vision=use_vision, max_actions_per_step=max_actions_per_step, tool_calling_method=tool_calling_method ) else: raise ValueError(f"Invalid agent type: {agent_type}") latest_video = None if save_recording_path and agent_type != "pen_test": all_videos = set( glob.glob(os.path.join(save_recording_path, "*.[mM][pP]4")) + glob.glob(os.path.join(save_recording_path, "*.[wW][eE][bB][mM]")) ) new_videos = all_videos - existing_videos if new_videos: latest_video = max(new_videos, key=os.path.getctime) return ( final_result, errors, model_actions, model_thoughts, latest_video, trace_file, history_file, gr.update(value="Stop", interactive=True), gr.update(interactive=True) ) except gr.Error: logger.error("A Gradio UI update error occurred.") raise except Exception as e: import traceback logger.error(f"Exception in run_browser_agent: {e}\n{traceback.format_exc()}") errors_msg = str(e) + "\n" + traceback.format_exc() return ( '', errors_msg, '', '', None, None, None, gr.update(value="Stop", interactive=True), gr.update(interactive=True) ) async def run_org_agent( llm, use_own_browser, keep_browser_open, headless, disable_security, window_w, window_h, save_recording_path, save_agent_history_path, save_trace_path, task, max_steps, use_vision, max_actions_per_step, tool_calling_method ): global _global_browser, _global_browser_context, _global_agent_state trace_file_path = None history_file_path = None try: _global_agent_state.clear_stop() extra_chromium_args = [f"--window-size={int(window_w)},{int(window_h)}"] if use_own_browser: chrome_path = os.getenv("CHROME_PATH") if chrome_path == "": chrome_path = None chrome_user_data = os.getenv("CHROME_USER_DATA") if chrome_user_data: extra_chromium_args.append(f"--user-data-dir={chrome_user_data}") else: chrome_path = None if _global_browser is None: _global_browser = Browser( config=BrowserConfig( headless=headless, disable_security=disable_security, chrome_instance_path=chrome_path, extra_chromium_args=extra_chromium_args, ) ) if _global_browser_context is None: _global_browser_context = await _global_browser.new_context( config=OrgBrowserContextConfig( trace_path=save_trace_path if save_trace_path else None, save_recording_path=save_recording_path if save_recording_path else None, no_viewport=False, browser_window_size=BrowserContextWindowSize( width=int(window_w), height=int(window_h) ), ) ) agent = Agent( task=task, llm=llm, use_vision=use_vision, browser=_global_browser, browser_context=_global_browser_context, max_actions_per_step=max_actions_per_step, tool_calling_method=tool_calling_method, agent_state=_global_agent_state ) history = await agent.run(max_steps=max_steps) if save_agent_history_path: os.makedirs(save_agent_history_path, exist_ok=True) history_file_path = os.path.join(save_agent_history_path, f"{agent.agent_id}.json") agent.save_history(history_file_path) final_result = history.final_result() errors = history.errors() model_actions = history.model_actions() model_thoughts = history.model_thoughts() trace_files_dict = get_latest_files(save_trace_path) trace_file_path = trace_files_dict.get('.zip') if trace_files_dict else None return final_result, errors, model_actions, model_thoughts, trace_file_path, history_file_path except Exception as e: import traceback logger.error(f"Exception in run_org_agent: {e}\n{traceback.format_exc()}") errors_msg = str(e) + "\n" + traceback.format_exc() return '', errors_msg, '', '', None, None finally: if not keep_browser_open: if _global_browser_context: await _global_browser_context.close() _global_browser_context = None if _global_browser: await _global_browser.close() _global_browser = None async def run_custom_agent( llm, use_own_browser, keep_browser_open, headless, disable_security, window_w, window_h, save_recording_path, save_agent_history_path, save_trace_path, task, add_infos, max_steps, use_vision, max_actions_per_step, tool_calling_method ): global _global_browser, _global_browser_context, _global_agent_state trace_file_path = None history_file_path = None try: _global_agent_state.clear_stop() extra_chromium_args = [f"--window-size={int(window_w)},{int(window_h)}"] if use_own_browser: chrome_path = os.getenv("CHROME_PATH") if chrome_path == "": chrome_path = None chrome_user_data = os.getenv("CHROME_USER_DATA") if chrome_user_data: extra_chromium_args.append(f"--user-data-dir={chrome_user_data}") else: chrome_path = None controller = CustomController() if _global_browser is None: _global_browser = CustomBrowser( config=BrowserConfig( headless=headless, disable_security=disable_security, chrome_instance_path=chrome_path, extra_chromium_args=extra_chromium_args, ) ) if _global_browser_context is None: _global_browser_context = await _global_browser.new_context( config=CustomBrowserContextConfigInternal( trace_path=save_trace_path if save_trace_path else None, save_recording_path=save_recording_path if save_recording_path else None, no_viewport=False, browser_window_size=BrowserContextWindowSize( width=int(window_w), height=int(window_h) ), ) ) agent = CustomAgent( task=task, add_infos=add_infos, use_vision=use_vision, llm=llm, browser=_global_browser, browser_context=_global_browser_context, controller=controller, system_prompt_class=CustomSystemPrompt, agent_prompt_class=CustomAgentMessagePrompt, max_actions_per_step=max_actions_per_step, agent_state=_global_agent_state, tool_calling_method=tool_calling_method ) history = await agent.run(max_steps=max_steps) if save_agent_history_path: os.makedirs(save_agent_history_path, exist_ok=True) history_file_path = os.path.join(save_agent_history_path, f"{agent.agent_id}.json") agent.save_history(history_file_path) final_result = history.final_result() errors = history.errors() model_actions = history.model_actions() model_thoughts = history.model_thoughts() trace_files_dict = get_latest_files(save_trace_path) trace_file_path = trace_files_dict.get('.zip') if trace_files_dict else None return final_result, errors, model_actions, model_thoughts, trace_file_path, history_file_path except Exception as e: import traceback logger.error(f"Exception in run_custom_agent: {e}\n{traceback.format_exc()}") errors_msg = str(e) + "\n" + traceback.format_exc() return '', errors_msg, '', '', None, None finally: if not keep_browser_open: if _global_browser_context: await _global_browser_context.close() _global_browser_context = None if _global_browser: await _global_browser.close() _global_browser = None # <<< BEGIN MODIFICATION: Web Penetration Testing Agent Logic >>> async def execute_web_pen_test( llm, target_url, selected_tests, use_own_browser, # To be consistent with other agents for browser setup keep_browser_open, disable_security, window_w, window_h, agent_state # For stop requests ): global _global_browser, _global_browser_context # This pen_test agent will run headlessly by default for analysis # It will manage its own browser instance or reuse global if keep_browser_open is True # and an existing compatible one is available. current_browser = None current_context = None is_new_browser_instance = False is_new_context_instance = False # Construct the detailed task for the LLM based on selected tests # This prompt needs to be carefully crafted for safety and effectiveness prompt_parts = [ f"You are a web penetration testing assistant. Your goal is to analyze the website at {target_url} for potential vulnerabilities based on the selected tests. ", "You will use the provided browser to navigate and inspect the website. Adhere strictly to the test descriptions. Do NOT attempt to exploit vulnerabilities or perform disruptive actions. Your analysis should be descriptive.", "Report your findings clearly for each selected test.\n" ] if not selected_tests: return "No tests selected. Please choose at least one penetration testing category.", "", "", "", None, None if "Information Gathering" in selected_tests: prompt_parts.append( "\n--- Information Gathering ---\n" "1. Navigate to the target URL.\n" "2. Inspect the HTML source for comments, developer information, or clues about the technology stack.\n" "3. Check for `robots.txt` and `sitemap.xml` and summarize their content if found.\n" "4. Identify server type and any prominent technologies used (e.g., CMS, JavaScript libraries) based on headers and page content.\n" "5. Report your findings." ) if "Security Headers" in selected_tests: prompt_parts.append( "\n--- Security Header Check ---\n" "1. Fetch the main page of the target URL.\n" "2. Inspect the HTTP response headers.\n" "3. Check for the presence and configuration of: Content-Security-Policy (CSP), Strict-Transport-Security (HSTS), X-Frame-Options, X-Content-Type-Options, Referrer-Policy, Permissions-Policy.\n" "4. Report on which headers are present, their values, any missing recommended headers, or potential misconfigurations." ) if "XSS (Analyze & Describe)" in selected_tests: prompt_parts.append( "\n--- XSS Analysis (Describe Test Methodology) ---\n" "1. Identify potential input vectors on the main page (e.g., URL parameters, form fields accessible without login).\n" "2. For each vector, describe how you would test for reflected and stored XSS using a benign, non-breaking test string (e.g., 'XssTestString123').\n" "3. Explain what you would look for in the HTTP response or page content to indicate a potential XSS vulnerability (e.g., the test string being rendered unescaped).\n" "4. Do NOT attempt to inject actual `