""" Test Contact Finder - Verify real person discovery works """ import asyncio import logging from services.enhanced_contact_finder import get_enhanced_contact_finder from services.prospect_discovery import get_prospect_discovery_service # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) async def test_enhanced_finder(): """Test the enhanced contact finder with real companies""" print("\n" + "="*80) print("TESTING ENHANCED CONTACT FINDER") print("="*80 + "\n") # Test companies test_cases = [ { "name": "Shopify", "domain": "shopify.com", "titles": ["CEO", "Chief Customer Officer", "VP Customer Experience"] }, { "name": "Stripe", "domain": "stripe.com", "titles": ["CEO", "Head of Customer Success", "VP Support"] }, { "name": "Airbnb", "domain": "airbnb.com", "titles": ["CEO", "Chief Customer Officer", "VP Community"] } ] finder = get_enhanced_contact_finder() for test in test_cases: print(f"\n{'-'*80}") print(f"Testing: {test['name']} ({test['domain']})") print(f"{'-'*80}\n") try: contacts = await finder.find_real_contacts( company_name=test['name'], domain=test['domain'], target_titles=test['titles'], max_contacts=3 ) if contacts: print(f"[OK] Found {len(contacts)} REAL contacts:\n") for i, contact in enumerate(contacts, 1): print(f" {i}. {contact.name}") print(f" Title: {contact.title}") print(f" Email: {contact.email}") print() else: print(f"[FAIL] No contacts found (will use fallback)\n") except Exception as e: print(f"[ERROR] {str(e)}\n") logger.error(f"Error testing {test['name']}: {e}") async def test_prospect_discovery(): """Test the full prospect discovery service""" print("\n" + "="*80) print("TESTING PROSPECT DISCOVERY SERVICE") print("="*80 + "\n") test_cases = [ { "name": "Zapier", "domain": "zapier.com", "size": 500 }, { "name": "Notion", "domain": "notion.so", "size": 200 } ] discovery = get_prospect_discovery_service() for test in test_cases: print(f"\n{'-'*80}") print(f"Testing: {test['name']} ({test['domain']}) - {test['size']} employees") print(f"{'-'*80}\n") try: contacts = await discovery.discover_contacts( company_name=test['name'], domain=test['domain'], company_size=test['size'], max_contacts=3, skip_search=False # Use real search ) if contacts: print(f"[OK] Found {len(contacts)} contacts:\n") for i, contact in enumerate(contacts, 1): print(f" {i}. {contact.name}") print(f" Title: {contact.title}") print(f" Email: {contact.email}") print() else: print(f"[FAIL] No contacts found\n") except Exception as e: print(f"[ERROR] {str(e)}\n") logger.error(f"Error testing {test['name']}: {e}") async def test_email_generation_with_contacts(): """Test that emails use contact names""" print("\n" + "="*80) print("TESTING EMAIL GENERATION WITH CONTACTS") print("="*80 + "\n") from app.schema import Prospect, Company, Contact from agents.writer import Writer from mcp.registry import MCPRegistry import uuid # Create a test prospect with real-looking contact prospect = Prospect( id=str(uuid.uuid4()), company=Company( id=str(uuid.uuid4()), name="Test E-commerce Co", domain="testecommerce.com", industry="E-commerce", size=150, pains=["High customer churn", "Poor support response times"], notes=["Growing fast, needs to scale support"] ), contacts=[ Contact( id=str(uuid.uuid4()), name="Sarah Johnson", email="sarah.johnson@testecommerce.com", title="VP Customer Experience", prospect_id="" ), Contact( id=str(uuid.uuid4()), name="Michael Chen", email="michael.chen@testecommerce.com", title="Director of Customer Success", prospect_id="" ) ] ) print(f"Company: {prospect.company.name}") print(f"Contacts:") for contact in prospect.contacts: print(f" - {contact.name} ({contact.title}) - {contact.email}") print() # Generate email registry = MCPRegistry() writer = Writer(registry) print("Generating personalized email...\n") try: result = await writer.run(prospect) if result.email_draft: print(f"[OK] Email Generated:\n") print(f"Subject: {result.email_draft.get('subject', 'N/A')}") print(f"\nBody:\n{result.email_draft.get('body', 'N/A')}") print() # Check if contact name is used body = result.email_draft.get('body', '') first_name = prospect.contacts[0].name.split()[0] if first_name in body: print(f"[OK] Contact first name '{first_name}' found in email!") else: print(f"[FAIL] Contact first name '{first_name}' NOT found in email!") print(f" This means personalization failed!") else: print(f"[FAIL] No email draft generated") except Exception as e: print(f"[ERROR] {str(e)}") logger.error(f"Error generating email: {e}") async def main(): """Run all tests""" print("\n[TEST] CONTACT FINDER TEST SUITE") print("=" * 80) # Test 1: Enhanced Contact Finder print("\n\n[TEST 1] Enhanced Contact Finder") await test_enhanced_finder() # Test 2: Prospect Discovery Service print("\n\n[TEST 2] Prospect Discovery Service") await test_prospect_discovery() # Test 3: Email Generation with Contacts print("\n\n[TEST 3] Email Generation with Contacts") await test_email_generation_with_contacts() print("\n\n" + "="*80) print("[DONE] ALL TESTS COMPLETE") print("="*80 + "\n") if __name__ == "__main__": asyncio.run(main())