# file: agents/writer.py
import json
import re
import logging
from typing import AsyncGenerator
from app.schema import Prospect
from app.config import MODEL_NAME, HF_API_TOKEN, MODEL_NAME_FALLBACK
from app.logging_utils import log_event
from vector.retriever import Retriever
from huggingface_hub import AsyncInferenceClient

logger = logging.getLogger(__name__)

# Matches bracketed template placeholders such as "[Team Name]" that the model
# sometimes leaves in its output.
_PLACEHOLDER_RE = re.compile(r"\[[^\]]+\]")


class Writer:
    """Generates outreach content with HuggingFace Inference API streaming.

    For one prospect the writer produces a bullet-point company summary and a
    personalized outreach email, emitting an event (built with ``log_event``)
    for every streamed token. If either generation fails, a templated
    fallback text is used so a draft is always stored.
    """

    def __init__(self, mcp_registry):
        """Wire up the store client, the fact retriever and the HF client.

        Args:
            mcp_registry: Registry object providing ``get_store_client()``.
        """
        self.mcp = mcp_registry
        self.store = mcp_registry.get_store_client()
        self.retriever = Retriever()
        # An empty/unset token is passed to the client as None.
        self.hf_client = AsyncInferenceClient(token=HF_API_TOKEN or None)

    @staticmethod
    def _first_name(contact) -> str:
        """Return the first word of ``contact.name``, or "" if there is none."""
        # split() on an empty or whitespace-only name yields [], so never
        # index the result without checking it first.
        words = (contact.name or "").split()
        return words[0] if words else ""

    @classmethod
    def _greeting(cls, prospect: Prospect) -> str:
        """Return "Hi <first name>," for the first contact, else "Hi there,"."""
        first_name = cls._first_name(prospect.contacts[0]) if prospect.contacts else ""
        return f"Hi {first_name or 'there'},"

    @staticmethod
    def _build_context(prospect: Prospect, relevant_facts) -> str:
        """Render the company profile, pains, notes and retrieved facts as prompt text."""
        company = prospect.company
        challenges = "\n".join(f"• {pain}" for pain in (company.pains or []))
        if company.notes:
            notes = "\n".join(f"• {note}" for note in company.notes)
        else:
            notes = "• No additional notes"
        if relevant_facts:
            # Only the three highest-ranked facts are put into the prompt.
            insights = "\n".join(
                f"• {fact['text']} (confidence: {fact.get('score', 0.7):.2f})"
                for fact in relevant_facts[:3]
            )
        else:
            insights = (
                "• Industry best practices suggest focusing on customer "
                "experience improvements"
            )
        return f"""
COMPANY PROFILE:
Name: {company.name}
Industry: {company.industry}
Size: {company.size} employees
Domain: {company.domain}

KEY CHALLENGES:
{challenges}

BUSINESS CONTEXT:
{notes}

RELEVANT INSIGHTS:
{insights}
"""

    @staticmethod
    def _token_event(prospect: Prospect, kind: str, token: str) -> dict:
        """Build the ``llm_token`` event for one streamed token of ``kind``."""
        return log_event(
            "writer",
            token,
            "llm_token",
            {
                "type": kind,
                "token": token,
                "prospect_id": prospect.id,
                "company_id": prospect.company.id,
                "company_name": prospect.company.name,
            },
        )

    async def _stream_tokens(
        self, prompt: str, max_new_tokens: int
    ) -> AsyncGenerator[str, None]:
        """Yield generated tokens for ``prompt`` from the primary model."""
        stream = await self.hf_client.text_generation(
            prompt,
            model=MODEL_NAME,
            max_new_tokens=max_new_tokens,
            temperature=0.7,
            stream=True,
        )
        async for token in stream:
            yield token

    @classmethod
    def _parse_email(cls, email_text: str, prospect: Prospect) -> dict:
        """Split raw model output into ``{"subject": ..., "body": ...}``.

        Falls back to a templated subject (and body, when ``email_text`` is
        empty) if the "Subject:" / "Body:" markers are missing, then replaces
        bracketed placeholders with the first contact's name.
        """
        company = prospect.company
        if "Subject:" in email_text and "Body:" in email_text:
            # Split on the FIRST "Body:" only, so a later occurrence inside
            # the email text does not truncate the body.
            head, _, body = email_text.partition("Body:")
            email_parts = {
                "subject": head.replace("Subject:", "").strip(),
                "body": body.strip(),
            }
        else:
            default_body = f"""{cls._greeting(prospect)}

As a leading {company.industry} company with {company.size} employees, we know you're focused on delivering exceptional customer experiences.

We'd like to discuss how our AI-powered platform can help address your specific challenges and improve your customer satisfaction metrics.

Best regards,
The CX Team"""
            email_parts = {
                "subject": f"Transform {company.name}'s Customer Experience",
                "body": email_text or default_body,
            }

        # Replace any placeholder tokens like [Team Name] with the contact name.
        if prospect.contacts:
            contact_name = prospect.contacts[0].name or ""

            def _use_contact_name(_match) -> str:
                # A callable replacement is inserted literally; a plain string
                # would be parsed as a template and break on backslashes.
                return contact_name

            for key in ("subject", "body"):
                email_parts[key] = _PLACEHOLDER_RE.sub(
                    _use_contact_name, email_parts[key]
                )
        return email_parts

    async def run_streaming(self, prospect: Prospect) -> AsyncGenerator[dict, None]:
        """Generate content with streaming tokens.

        Yields, in order: a ``company_start`` event, one ``llm_token`` event
        per summary token (or an ``llm_error`` event if summary generation
        fails), an ``email_start`` event, one ``llm_token`` event per email
        token (or ``llm_error``), and finally an ``llm_done`` event whose
        payload carries the updated prospect.

        Side effects: sets ``prospect.summary``, ``prospect.email_draft`` and
        ``prospect.status`` ("drafted"), then saves the prospect via the store.

        Args:
            prospect: The prospect to write content for.
        """
        company = prospect.company
        contacts = prospect.contacts

        # IMPORTANT: Log contact information for debugging
        if contacts:
            for contact in contacts:
                log_event(
                    "writer",
                    f"Using contact: {contact.name} ({contact.title}) - {contact.email}",
                    "agent_log",
                )
                logger.info(
                    "Writer: Using contact: %s (%s) - %s",
                    contact.name,
                    contact.title,
                    contact.email,
                )
        else:
            log_event("writer", "WARNING: No contacts found for this prospect!", "agent_log")
            logger.warning("Writer: No contacts found for prospect %s", company.name)

        # Get relevant facts from vector store. Retrieval is best-effort, but
        # only ordinary errors are swallowed (not cancellation/interrupts),
        # and the failure is logged instead of vanishing silently.
        try:
            relevant_facts = self.retriever.retrieve(company.id, k=5)
        except Exception:
            logger.exception(
                "Writer: fact retrieval failed for company %s; continuing without facts",
                company.id,
            )
            relevant_facts = []

        context = self._build_context(prospect, relevant_facts)
        pains = company.pains or []

        # Generate comprehensive summary first
        summary_prompt = f"""{context}

Generate a comprehensive bullet-point summary for {company.name} that includes:
1. Company overview (industry, size)
2. Main challenges they face
3. Specific opportunities for improvement
4. Recommended actions

Format: Use 5-7 bullets, each starting with "•". Be specific and actionable.
Include the industry and size context in your summary."""

        # Emit company header first
        yield log_event(
            "writer",
            f"Generating content for {company.name}",
            "company_start",
            {"company": company.name, "industry": company.industry, "size": company.size},
        )

        # Summary generation with HF Inference API
        summary_tokens = []
        try:
            async for token in self._stream_tokens(summary_prompt, 500):
                summary_tokens.append(token)
                yield self._token_event(prospect, "summary", token)
            summary_text = "".join(summary_tokens)
        except Exception as e:
            # Fallback summary if generation fails (partial output is discarded)
            summary_text = f"""• {company.name} is a {company.industry} company with {company.size} employees
• Main challenge: {pains[0] if pains else 'Customer experience improvement'}
• Opportunity: Implement modern CX solutions to improve customer satisfaction
• Recommended action: Schedule a consultation to discuss specific needs"""
            yield log_event(
                "writer", f"Summary generation failed, using default: {e}", "llm_error"
            )

        # Generate personalized email.
        # If we have a contact, instruct the greeting explicitly with name and title.
        greeting_hint = ""
        contact_context = ""
        first_name = ""
        full_name = None
        title = None
        if contacts:
            contact = contacts[0]
            first_name = self._first_name(contact)
            full_name = contact.name
            title = contact.title
            if first_name:
                greeting_hint = (
                    "IMPORTANT: Start the email EXACTLY with this greeting: "
                    f"'Hi {first_name},'\n"
                )
            contact_context = (
                f"\nTARGET RECIPIENT:\nName: {full_name}\nTitle: {title}\n"
                f"Email: {contact.email}\n"
            )

        email_prompt = f"""{context}
{contact_context}
Company Summary:
{summary_text}

Write a highly personalized outreach email from a CX AI platform provider to {full_name or 'leaders'} at {company.name}.
{greeting_hint}
Requirements:
- Subject line that mentions their company name and industry
- Body: 150-180 words, professional and friendly
- Reference their specific industry ({company.industry}) and size ({company.size} employees)
- Address them by their first name in the greeting (e.g., "Hi {first_name or 'there'},")
- Acknowledge their role as {title or 'a leader'} in the organization
- Clearly connect their challenges to AI-powered customer experience solutions
- One clear call-to-action to schedule a short conversation or demo next week
- Do not write as if the email is from the company to us
- No exaggerated claims
- Sign off as: "The CX Team"

Format response exactly as:
Subject: [subject line]
Body: [email body]
"""

        # Emit email generation start
        yield log_event(
            "writer",
            f"Generating email for {company.name}",
            "email_start",
            {"company": company.name},
        )

        # Email generation with HF Inference API
        email_tokens = []
        try:
            async for token in self._stream_tokens(email_prompt, 400):
                email_tokens.append(token)
                yield self._token_event(prospect, "email", token)
            email_text = "".join(email_tokens)
        except Exception as e:
            # Fallback email if generation fails - use contact name if available
            email_text = f"""Subject: Improve {company.name}'s Customer Experience
Body:
{self._greeting(prospect)}

As a {company.industry} company with {company.size} employees, you face unique customer experience challenges. We understand that {pains[0] if pains else 'improving customer satisfaction'} is a priority for your organization.

Our AI-powered platform has helped similar companies in the {company.industry} industry improve their customer experience metrics significantly. We'd love to discuss how we can help {company.name} achieve similar results.

Would you be available for a brief call next week to explore how we can address your specific needs?

Best regards,
The CX Team"""
            yield log_event(
                "writer", f"Email generation failed, using default: {e}", "llm_error"
            )

        # Parse email
        email_parts = self._parse_email(email_text, prospect)

        # Update prospect
        prospect.summary = (
            f"**{company.name} ({company.industry}, {company.size} employees)**"
            f"\n\n{summary_text}"
        )
        prospect.email_draft = email_parts
        prospect.status = "drafted"
        await self.store.save_prospect(prospect)

        # Emit completion event with company info
        yield log_event(
            "writer",
            f"Generation complete for {company.name}",
            "llm_done",
            {
                "prospect": prospect,
                "summary": prospect.summary,
                "email": email_parts,
                "company_name": company.name,
                "prospect_id": prospect.id,
                "company_id": company.id,
            },
        )

    async def run(self, prospect: Prospect) -> Prospect:
        """Non-streaming version for compatibility.

        Drains ``run_streaming`` and returns the prospect carried by the
        ``llm_done`` event; if no such event is seen, returns the input
        prospect.
        """
        async for event in self.run_streaming(prospect):
            if event["type"] == "llm_done":
                return event["payload"]["prospect"]
        return prospect