94 lines
3.7 KiB
Python
94 lines
3.7 KiB
Python
|
||
import pandas as pd
|
||
import re
|
||
|
||
def analyze_leads():
|
||
input_csv = 'leads/leads.csv'
|
||
output_report = 'leads/analysis/red_flags.md'
|
||
|
||
df = pd.read_csv(input_csv)
|
||
|
||
# 1. Apply Specific Fixes
|
||
# Dachdeckerinnung Unterfranken -> info@dachdecker-unterfranken.de
|
||
mask = df['Firm/Innung'].str.contains('Dachdeckerinnung Unterfranken', case=False, na=False)
|
||
if mask.any():
|
||
print("Fixing Dachdeckerinnung Unterfranken email...")
|
||
df.loc[mask, 'Email'] = 'info@dachdecker-unterfranken.de'
|
||
|
||
# Save the fixes back to CSV
|
||
df.to_csv(input_csv, index=False)
|
||
|
||
# 2. Red Flag Analysis
|
||
red_flags = []
|
||
|
||
# Patterns
|
||
freemail_domains = ['t-online.de', 'web.de', 'gmx.de', 'gmail.com', 'hotmail.com', 'yahoo.de', 'aol.com', 'freenet.de', 'arcor.de']
|
||
kh_patterns = ['kh-', 'handwerk-', 'kreishandwerkerschaft', '-kh']
|
||
|
||
# Check for Duplicates
|
||
email_counts = df['Email'].value_counts()
|
||
duplicate_emails = email_counts[email_counts > 1]
|
||
|
||
if not duplicate_emails.empty:
|
||
red_flags.append("## 🚩 Duplicate Emails (Potential Central Administration)")
|
||
red_flags.append("| Email | Count | Innungen |")
|
||
red_flags.append("|---|---|---|")
|
||
for email, count in duplicate_emails.items():
|
||
innungen = df[df['Email'] == email]['Firm/Innung'].tolist()
|
||
innungen_str = "<br>".join(innungen[:3]) + ("..." if len(innungen) > 3 else "")
|
||
red_flags.append(f"| `{email}` | {count} | {innungen_str} |")
|
||
red_flags.append("")
|
||
|
||
# Check for Freemail Addresses
|
||
red_flags.append("## 🟡 Freemail Addresses (Check Professionality)")
|
||
red_flags.append("| Innung | Contact | Email |")
|
||
red_flags.append("|---|---|---|")
|
||
|
||
found_freemail = False
|
||
for idx, row in df.iterrows():
|
||
email = str(row['Email']).lower()
|
||
domain = email.split('@')[-1] if '@' in email else ''
|
||
|
||
if domain in freemail_domains:
|
||
red_flags.append(f"| {row['Firm/Innung']} | {row['Contact Person']} | `{email}` |")
|
||
found_freemail = True
|
||
|
||
if not found_freemail:
|
||
red_flags.append("No freemail addresses found.")
|
||
red_flags.append("")
|
||
|
||
# Check for Generic KH Domains vs Specific Innung Name
|
||
# Heuristic: If email has 'kh-' or 'handwerk' but Innung name is specific (like "Bäckerinnung")
|
||
# This might indicate the email goes to the KH office, not the Obermeister directly.
|
||
red_flags.append("## ℹ️ Kreishandwerkerschaft (KH) Generic Contacts")
|
||
red_flags.append("These emails likely reach the administrative office, not necessarily the specific trade representative directly.")
|
||
red_flags.append("| Innung | Email | Note |")
|
||
red_flags.append("|---|---|---|")
|
||
|
||
for idx, row in df.iterrows():
|
||
email = str(row['Email']).lower()
|
||
innung = str(row['Firm/Innung'])
|
||
|
||
is_kh_email = any(p in email for p in kh_patterns)
|
||
|
||
# If it's a specific guild but uses a generic KH email
|
||
if is_kh_email:
|
||
red_flags.append(f"| {innung} | `{email}` | Generic KH Domain |")
|
||
|
||
# Domain Mismatch (Simple)
|
||
# Check if the domain is totally unrelated to the Innung name
|
||
# Difficult to do reliably without extensive lists, but we can look for "shop", "portal", etc.
|
||
|
||
# Save Report
|
||
import os
|
||
os.makedirs(os.path.dirname(output_report), exist_ok=True)
|
||
|
||
with open(output_report, 'w', encoding='utf-8') as f:
|
||
f.write("# Lead Quality Audit & Red Flags\n\n")
|
||
f.write("\n".join(red_flags))
|
||
|
||
print(f"Report generated at {output_report}")
|
||
|
||
if __name__ == "__main__":
|
||
analyze_leads()
|