""" ๐Ÿค– Fagun Browser Automation Testing Agent - Browser Use Agent ============================================================= Advanced browser automation agent for comprehensive website testing. Author: Mejbaur Bahar Fagun Role: Software Engineer in Test LinkedIn: https://www.linkedin.com/in/mejbaur/ """ from __future__ import annotations import asyncio import logging import os # Disable telemetry os.environ["BROWSER_USE_TELEMETRY"] = "false" os.environ["BROWSER_USE_DISABLE_TELEMETRY"] = "true" # from lmnr.sdk.decorators import observe from browser_use.agent.gif import create_history_gif from browser_use.agent.service import Agent, AgentHookFunc from browser_use.agent.views import ( ActionResult, AgentHistory, AgentHistoryList, AgentStepInfo, ToolCallingMethod, ) from browser_use.browser.views import BrowserStateHistory from browser_use.utils import time_execution_async from dotenv import load_dotenv from browser_use.agent.message_manager.utils import is_model_without_tool_support from src.utils.screenshot_capture import screenshot_capture from src.utils.advanced_testing import advanced_testing_engine from src.utils.enhanced_ai_testing import enhanced_ai_testing_engine from src.utils.error_monitor import error_monitor from src.utils.intelligent_form_testing import IntelligentFormTester from src.utils.ai_thinking_engine import AIThinkingEngine from src.utils.credential_manager import CredentialManager from datetime import datetime import json load_dotenv() logger = logging.getLogger(__name__) SKIP_LLM_API_KEY_VERIFICATION = ( os.environ.get("SKIP_LLM_API_KEY_VERIFICATION", "false").lower()[0] in "ty1" ) class BrowserUseAgent(Agent): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Initialize error monitor self.error_monitor = error_monitor # Initialize enhanced AI testing engine self.enhanced_ai_testing_engine = enhanced_ai_testing_engine # Initialize intelligent testing components self.intelligent_form_tester = None self.ai_thinking_engine = None self.credential_manager = None self.intelligent_testing_enabled = True def _set_tool_calling_method(self) -> ToolCallingMethod | None: tool_calling_method = self.settings.tool_calling_method if tool_calling_method == 'auto': if is_model_without_tool_support(self.model_name): return 'raw' elif self.chat_model_library == 'ChatGoogleGenerativeAI': return None elif self.chat_model_library == 'ChatOpenAI': return 'function_calling' elif self.chat_model_library == 'AzureChatOpenAI': return 'function_calling' else: return None else: return tool_calling_method @time_execution_async("--run (agent)") async def run( self, max_steps: int = 100, on_step_start: AgentHookFunc | None = None, on_step_end: AgentHookFunc | None = None ) -> AgentHistoryList: """Execute the task with maximum number of steps""" loop = asyncio.get_event_loop() # Set up the Ctrl+C signal handler with callbacks specific to this agent from browser_use.utils import SignalHandler signal_handler = SignalHandler( loop=loop, pause_callback=self.pause, resume_callback=self.resume, custom_exit_callback=None, # No special cleanup needed on forced exit exit_on_second_int=True, ) signal_handler.register() try: self._log_agent_run() # Start error monitoring if hasattr(self, 'browser_context') and self.browser_context: try: # Use the correct method to get a page from browser context if hasattr(self.browser_context, 'new_page'): page = await self.browser_context.new_page() elif hasattr(self.browser_context, 'pages') and self.browser_context.pages: page = self.browser_context.pages[0] else: # Try to get page from browser state if hasattr(self.state, 'browser_state') and self.state.browser_state: page = self.state.browser_state.current_page else: page = None if page: await self.error_monitor.start_monitoring(page) logger.info("๐Ÿ” Error monitoring started") else: logger.warning("Could not get page for error monitoring") except Exception as e: logger.warning(f"Failed to start error monitoring: {e}") # Execute initial actions if provided if self.initial_actions: result = await self.multi_act(self.initial_actions, check_for_new_elements=False) self.state.last_result = result for step in range(max_steps): # Check if waiting for user input after Ctrl+C if self.state.paused: signal_handler.wait_for_resume() signal_handler.reset() # Check if we should stop due to too many failures if self.state.consecutive_failures >= self.settings.max_failures: logger.error(f'โŒ Stopping due to {self.settings.max_failures} consecutive failures') break # Check control flags before each step if self.state.stopped: logger.info('Agent stopped') break while self.state.paused: await asyncio.sleep(0.2) # Small delay to prevent CPU spinning if self.state.stopped: # Allow stopping while paused break if on_step_start is not None: await on_step_start(self) step_info = AgentStepInfo(step_number=step, max_steps=max_steps) await self.step(step_info) if on_step_end is not None: await on_step_end(self) if self.state.history.is_done(): if self.settings.validate_output and step < max_steps - 1: if not await self._validate_output(): continue await self.log_completion() break else: error_message = 'Failed to complete task in maximum steps' self.state.history.history.append( AgentHistory( model_output=None, result=[ActionResult(error=error_message, include_in_memory=True)], state=BrowserStateHistory( url='', title='', tabs=[], interacted_element=[], screenshot=None, ), metadata=None, ) ) logger.info(f'โŒ {error_message}') return self.state.history except KeyboardInterrupt: # Already handled by our signal handler, but catch any direct KeyboardInterrupt as well logger.info('Got KeyboardInterrupt during execution, returning current history') return self.state.history finally: # Unregister signal handlers before cleanup signal_handler.unregister() if self.settings.save_playwright_script_path: logger.info( f'Agent run finished. Attempting to save Playwright script to: {self.settings.save_playwright_script_path}' ) try: # Extract sensitive data keys if sensitive_data is provided keys = list(self.sensitive_data.keys()) if self.sensitive_data else None # Pass browser and context config to the saving method self.state.history.save_as_playwright_script( self.settings.save_playwright_script_path, sensitive_data_keys=keys, browser_config=self.browser.config, context_config=self.browser_context.config, ) except Exception as script_gen_err: # Log any error during script generation/saving logger.error(f'Failed to save Playwright script: {script_gen_err}', exc_info=True) await self.close() if self.settings.generate_gif: output_path: str = 'agent_history.gif' if isinstance(self.settings.generate_gif, str): output_path = self.settings.generate_gif create_history_gif(task=self.task, history=self.state.history, output_path=output_path) # PDF report generation disabled per project settings await self._generate_automated_pdf_report() # Stop error monitoring try: if hasattr(self, 'error_monitor') and self.error_monitor: await self.error_monitor.stop_monitoring() logger.info("๐Ÿ›‘ Error monitoring stopped") except Exception as e: logger.warning(f"Failed to stop error monitoring: {e}") async def initialize_intelligent_testing(self): """Initialize intelligent testing components.""" try: if not self.intelligent_testing_enabled: return logger.info("๐Ÿง  Initializing intelligent testing components...") # Initialize AI thinking engine self.ai_thinking_engine = AIThinkingEngine( llm=self.chat_model, page=self.browser_context.page ) # Initialize intelligent form tester self.intelligent_form_tester = IntelligentFormTester( llm=self.chat_model, page=self.browser_context.page ) # Initialize credential manager self.credential_manager = CredentialManager( page=self.browser_context.page ) logger.info("โœ… Intelligent testing components initialized") except Exception as e: logger.error(f"โŒ Error initializing intelligent testing: {e}") self.intelligent_testing_enabled = False async def run_intelligent_form_testing(self) -> Dict[str, Any]: """Run intelligent form testing with comprehensive scenarios.""" try: if not self.intelligent_testing_enabled or not self.intelligent_form_tester: logger.warning("โš ๏ธ Intelligent form testing not available") return {"error": "Intelligent form testing not available"} logger.info("๐Ÿš€ Starting intelligent form testing...") # Discover form fields form_fields = await self.intelligent_form_tester.discover_form_fields() if not form_fields: logger.warning("โš ๏ธ No form fields found on the page") return {"error": "No form fields found"} # Generate test scenarios test_scenarios = await self.intelligent_form_tester.generate_test_scenarios() # Execute test scenarios test_results = await self.intelligent_form_tester.execute_test_scenarios(test_scenarios) # Generate comprehensive report report = await self.intelligent_form_tester.generate_comprehensive_report() # Add detailed error report error_report = self.intelligent_form_tester.get_detailed_error_report() report["detailed_error_analysis"] = error_report logger.info(f"โœ… Intelligent form testing complete: {len(test_results)} tests executed") return report except Exception as e: logger.error(f"โŒ Error in intelligent form testing: {e}") return {"error": str(e)} async def run_intelligent_credential_testing(self) -> Dict[str, Any]: """Run intelligent credential testing with various scenarios.""" try: if not self.intelligent_testing_enabled or not self.credential_manager: logger.warning("โš ๏ธ Intelligent credential testing not available") return {"error": "Intelligent credential testing not available"} logger.info("๐Ÿ” Starting intelligent credential testing...") # Run comprehensive credential testing report = await self.credential_manager.run_comprehensive_credential_testing() logger.info("โœ… Intelligent credential testing complete") return report except Exception as e: logger.error(f"โŒ Error in intelligent credential testing: {e}") return {"error": str(e)} async def run_ai_thinking_analysis(self) -> Dict[str, Any]: """Run AI thinking analysis of the current page.""" try: if not self.intelligent_testing_enabled or not self.ai_thinking_engine: logger.warning("โš ๏ธ AI thinking analysis not available") return {"error": "AI thinking analysis not available"} logger.info("๐Ÿค” Starting AI thinking analysis...") # Analyze the page intelligently page_analysis = await self.ai_thinking_engine.analyze_page_intelligence() # Generate testing strategy testing_strategy = await self.ai_thinking_engine.generate_testing_strategy(page_analysis) # Get thinking summary thinking_summary = self.ai_thinking_engine.get_thinking_summary() analysis_result = { "page_analysis": page_analysis, "testing_strategy": { "approach": testing_strategy.approach, "priority_order": testing_strategy.priority_order, "focus_areas": testing_strategy.focus_areas, "risk_assessment": testing_strategy.risk_assessment, "estimated_duration": testing_strategy.estimated_duration, "reasoning": testing_strategy.reasoning }, "thinking_summary": thinking_summary } logger.info("โœ… AI thinking analysis complete") return analysis_result except Exception as e: logger.error(f"โŒ Error in AI thinking analysis: {e}") return {"error": str(e)} async def run_comprehensive_intelligent_testing(self) -> Dict[str, Any]: """Run comprehensive intelligent testing combining all features.""" try: if not self.intelligent_testing_enabled: logger.warning("โš ๏ธ Intelligent testing not enabled") return {"error": "Intelligent testing not enabled"} logger.info("๐ŸŽฏ Starting comprehensive intelligent testing...") # Initialize intelligent testing components await self.initialize_intelligent_testing() # Run AI thinking analysis ai_analysis = await self.run_ai_thinking_analysis() # Run intelligent form testing form_testing = await self.run_intelligent_form_testing() # Run intelligent credential testing credential_testing = await self.run_intelligent_credential_testing() # Combine all results comprehensive_result = { "ai_analysis": ai_analysis, "form_testing": form_testing, "credential_testing": credential_testing, "timestamp": datetime.now().isoformat(), "summary": { "ai_analysis_success": "error" not in ai_analysis, "form_testing_success": "error" not in form_testing, "credential_testing_success": "error" not in credential_testing } } logger.info("โœ… Comprehensive intelligent testing complete") return comprehensive_result except Exception as e: logger.error(f"โŒ Error in comprehensive intelligent testing: {e}") return {"error": str(e)} async def _generate_automated_pdf_report(self): """PDF report generation disabled.""" logger.info("๐Ÿ“Š PDF report generation is disabled. Skipping.") async def _prepare_test_data_for_report(self): """Prepare test data from execution history for PDF report.""" try: logger.info("๐Ÿ” Starting test data preparation...") # Debug: Log the history structure logger.info(f"History type: {type(self.state.history)}") # Check if state exists if not hasattr(self, 'state') or not self.state: logger.error("โŒ No state found in agent") raise Exception("No state found in agent") # Check if history exists if not hasattr(self.state, 'history') or not self.state.history: logger.error("โŒ No history found in state") raise Exception("No history found in state") # Access steps correctly from AgentHistoryList # AgentHistoryList is iterable, so we can convert it to a list steps = list(self.state.history) logger.info(f"Steps type: {type(steps)}") logger.info(f"Number of steps: {len(steps)}") # Debug: Log first few steps to understand structure if steps: logger.info(f"First step type: {type(steps[0])}") logger.info(f"First step attributes: {dir(steps[0])}") if hasattr(steps[0], 'action'): logger.info(f"First step action: {steps[0].action}") if hasattr(steps[0], 'result'): logger.info(f"First step result: {steps[0].result}") # Log all steps for debugging for i, step in enumerate(steps[:5]): # Log first 5 steps logger.info(f"Step {i+1}: {type(step)} - {getattr(step, 'action', 'No action')}") # Get execution statistics total_steps = len(steps) successful_steps = 0 failed_steps = 0 # Count successful and failed steps for step in steps: if hasattr(step, 'result') and step.result: if step.result.success: successful_steps += 1 else: failed_steps += 1 else: # If no result or success field, consider it successful successful_steps += 1 # Calculate success rate success_rate = (successful_steps / total_steps * 100) if total_steps > 0 else 0 logger.info(f"Total steps: {total_steps}, Successful: {successful_steps}, Failed: {failed_steps}, Success rate: {success_rate:.1f}%") # Get screenshots screenshots = screenshot_capture.get_screenshots() # Run advanced testing logger.info("๐Ÿงช Starting advanced testing...") advanced_test_results = [] enhanced_bugs = [] try: if hasattr(self, 'browser_context') and self.browser_context: logger.info("โœ… Browser context found") # Use the correct method to get a page from browser context page = None if hasattr(self.browser_context, 'new_page'): logger.info("๐Ÿ” Trying to create new page...") page = await self.browser_context.new_page() elif hasattr(self.browser_context, 'pages') and self.browser_context.pages: logger.info("๐Ÿ” Using existing page...") page = self.browser_context.pages[0] else: # Try to get page from browser state if hasattr(self.state, 'browser_state') and self.state.browser_state: logger.info("๐Ÿ” Using page from browser state...") page = self.state.browser_state.current_page if page: logger.info("โœ… Page found, running advanced testing...") # Run both advanced testing and enhanced AI testing advanced_test_results = await advanced_testing_engine.run_comprehensive_testing(page) logger.info(f"โœ… Advanced testing completed: {len(advanced_test_results)} results") enhanced_bugs = await self.enhanced_ai_testing_engine.run_comprehensive_testing(page) logger.info(f"โœ… Enhanced AI testing completed: {len(enhanced_bugs)} bugs found") # Get errors from injected script injected_errors = await self.error_monitor.get_injected_errors(page) self.error_monitor.errors.extend(injected_errors) logger.info(f"โœ… Error monitoring completed: {len(injected_errors)} errors") # Close page if we created it if hasattr(self.browser_context, 'new_page'): await page.close() else: logger.warning("โŒ Could not get page for advanced testing") else: logger.warning("โŒ No browser context found") except Exception as e: logger.warning(f"โŒ Advanced testing failed: {e}") import traceback logger.warning(f"Advanced testing traceback: {traceback.format_exc()}") # Get error monitoring data try: error_summary = self.error_monitor.get_error_summary() all_errors = self.error_monitor.get_all_errors() except Exception as e: logger.warning(f"Error getting error monitoring data: {e}") error_summary = { 'total_errors': 0, 'errors_by_type': {}, 'errors_by_severity': {}, 'console_errors': 0, 'js_errors': 0, 'network_errors': 0, 'dom_errors': 0, 'performance_issues': 0 } all_errors = [] # Prepare test cases from execution steps test_cases = [] for i, step in enumerate(steps, 1): # Extract action type safely action_type = "Unknown" if hasattr(step, 'action') and step.action: if hasattr(step.action, 'action_type'): action_type = step.action.action_type elif hasattr(step.action, '__class__'): action_type = step.action.__class__.__name__ # Extract result information safely result_text = "Completed" error_message = None is_success = True if hasattr(step, 'result') and step.result: is_success = getattr(step.result, 'success', True) if hasattr(step.result, 'result') and step.result.result: result_text = str(step.result.result) if hasattr(step.result, 'error_message') and step.result.error_message: error_message = str(step.result.error_message) result_text = f"Failed: {error_message}" # Extract duration safely duration = "N/A" if hasattr(step, 'duration') and step.duration: duration = f"{step.duration:.2f} seconds" test_case = { "name": f"Step {i}: {action_type}", "status": "PASSED" if is_success else "FAILED", "duration": duration, "description": f"Action: {action_type}", "expected_result": "Action should complete successfully", "actual_result": result_text, "error_message": error_message } test_cases.append(test_case) # Prepare bugs from failed steps and advanced testing bugs = [] # Add bugs from failed steps for step in steps: if hasattr(step, 'result') and step.result and not getattr(step.result, 'success', True): action_type = "Unknown" if hasattr(step, 'action') and step.action: if hasattr(step.action, 'action_type'): action_type = step.action.action_type elif hasattr(step.action, '__class__'): action_type = step.action.__class__.__name__ error_message = "Unknown error" if hasattr(step.result, 'error_message') and step.result.error_message: error_message = str(step.result.error_message) bug = { "title": f"Test Failure: {action_type}", "severity": "High" if "error" in error_message.lower() else "Medium", "status": "Open", "description": error_message, "steps_to_reproduce": f"1. Execute action: {action_type}\n2. Check for errors", "expected_behavior": "Action should complete successfully", "actual_behavior": f"Action failed with error: {error_message}" } bugs.append(bug) # Add bugs from advanced testing for test_result in advanced_test_results: if test_result.status in ["FAILED", "WARNING"]: bug = { "title": test_result.test_name, "severity": "High" if test_result.status == "FAILED" else "Medium", "status": "Open", "description": test_result.description, "steps_to_reproduce": f"1. Navigate to the page\n2. {test_result.description}", "expected_behavior": "No security vulnerabilities or issues should be present", "actual_behavior": test_result.description, "recommendations": test_result.recommendations } bugs.append(bug) # Add bugs from enhanced AI testing for bug_report in enhanced_bugs: bug = { "title": bug_report.title, "severity": bug_report.severity.value.title(), "status": "Open", "description": bug_report.description, "steps_to_reproduce": bug_report.steps_to_reproduce, "expected_behavior": bug_report.expected_behavior, "actual_behavior": bug_report.actual_behavior, "recommendations": bug_report.recommendations, "url": bug_report.url, "element_info": bug_report.element_info } bugs.append(bug) # Add bugs from error monitoring for error in all_errors: if error.severity in ["high", "critical"]: bug = { "title": f"Error Detected: {error.error_type}", "severity": "Critical" if error.severity == "critical" else "High", "status": "Open", "description": error.error_message, "steps_to_reproduce": f"1. Navigate to {error.url}\n2. Monitor console/network for errors", "expected_behavior": "No errors should occur during normal operation", "actual_behavior": error.error_message, "recommendations": [ "Check browser console for detailed error information", "Verify network connectivity and server status", "Review JavaScript code for potential issues", "Implement proper error handling" ], "error_context": error.context } bugs.append(bug) # Prepare performance metrics durations = [] for step in steps: if hasattr(step, 'duration') and step.duration: try: durations.append(float(step.duration)) except (ValueError, TypeError): pass avg_duration = sum(durations) / len(durations) if durations else 0 max_duration = max(durations) if durations else 0 min_duration = min(durations) if durations else 0 # Get task information task_description = "Unknown task" if hasattr(self, 'task') and self.task: task_description = str(self.task) # Create comprehensive test data test_data = { "duration": f"{total_steps * 2} minutes", # Estimated "total_tests": total_steps, "passed_tests": successful_steps, "failed_tests": failed_steps, "error_tests": 0, "success_rate": success_rate, "passed_percentage": (successful_steps / total_steps * 100) if total_steps > 0 else 0, "failed_percentage": (failed_steps / total_steps * 100) if total_steps > 0 else 0, "error_percentage": 0, "browser": "Chrome", # Default, could be extracted from browser config "browser_version": "119.0.6045.105", # Default "os": "Windows 10", # Default "framework": "Fagun Browser Automation Testing Agent", "execution_time": datetime.now().isoformat(), "test_data_source": "Automated Test Execution", "task_description": task_description, "performance_metrics": { "avg_response_time": f"{avg_duration:.2f} seconds", "max_response_time": f"{max_duration:.2f} seconds", "min_response_time": f"{min_duration:.2f} seconds" }, "test_cases": test_cases, "screenshots": screenshots, "bugs": bugs, "error_monitoring": { "total_errors": error_summary['total_errors'], "errors_by_type": error_summary['errors_by_type'], "errors_by_severity": error_summary['errors_by_severity'], "console_errors": error_summary['console_errors'], "js_errors": error_summary['js_errors'], "network_errors": error_summary['network_errors'], "dom_errors": error_summary['dom_errors'], "performance_issues": error_summary['performance_issues'], "detailed_errors": [ { "type": error.error_type, "message": error.error_message, "severity": error.severity, "timestamp": error.timestamp.isoformat(), "url": error.url, "source": error.source, "context": error.context } for error in all_errors ] }, "key_findings": [ f"Executed {total_steps} test steps successfully", f"Achieved {success_rate:.1f}% success rate", f"Captured {len(screenshots)} screenshots during testing", f"Identified {len(bugs)} issues requiring attention", f"Task completed: {task_description[:100]}{'...' if len(task_description) > 100 else ''}", f"Advanced testing: {len(advanced_test_results)} comprehensive tests performed", f"Enhanced AI testing: {len(enhanced_bugs)} bugs found by AI agents", f"Security tests: {len([r for r in advanced_test_results if r.test_type.value == 'security'])}", f"Broken URL checks: {len([r for r in advanced_test_results if r.test_type.value == 'broken_url'])}", f"Grammar checks: {len([r for r in advanced_test_results if r.test_type.value == 'grammar'])}", f"Form tests: {len([r for r in advanced_test_results if r.test_type.value == 'form_testing'])}", f"Error monitoring: {error_summary['total_errors']} errors detected", f"Console errors: {error_summary['console_errors']}", f"JavaScript errors: {error_summary['js_errors']}", f"Network errors: {error_summary['network_errors']}", f"Performance issues: {error_summary['performance_issues']}" ], "recommendations": [ "Review failed test steps for potential improvements", "Consider adding more error handling", "Implement retry mechanisms for flaky tests", "Regular monitoring of test execution performance" ], "notes": [ "This report was generated automatically by the Fagun Browser Automation Testing Agent.", "All test results are based on actual execution data.", "For questions or clarifications, please contact the test automation team." ] } logger.info(f"โœ… Test data preparation completed successfully!") logger.info(f"๐Ÿ“Š Final test data summary:") logger.info(f" โ€ข Total tests: {total_steps}") logger.info(f" โ€ข Passed tests: {successful_steps}") logger.info(f" โ€ข Failed tests: {failed_steps}") logger.info(f" โ€ข Success rate: {success_rate:.1f}%") logger.info(f" โ€ข Bugs found: {len(bugs)}") logger.info(f" โ€ข Test cases: {len(test_cases)}") logger.info(f" โ€ข Enhanced AI bugs: {len(enhanced_bugs)}") logger.info(f" โ€ข Advanced test results: {len(advanced_test_results)}") return test_data except Exception as e: logger.error(f"Error preparing test data for report: {str(e)}") import traceback logger.error(f"Full traceback: {traceback.format_exc()}") # Return minimal test data if preparation fails return { "duration": "Unknown", "total_tests": 0, "passed_tests": 0, "failed_tests": 0, "error_tests": 0, "success_rate": 0, "browser": "Unknown", "test_cases": [], "screenshots": [], "bugs": [], "key_findings": [f"Error preparing test data: {str(e)}"], "recommendations": ["Check test execution logs"], "notes": [f"Report generation encountered an error: {str(e)}"] }