What This Workflow Does
A complete email scraping automation workflow has several distinct stages, each with a clear input and output, which makes the whole pipeline easy to debug, extend and maintain. In this tutorial we will build the full pipeline: fetch pages reliably, extract emails with regex and mailto: parsing, validate format and MX records, deduplicate, export to CSV, and finally schedule the whole thing with cron.
Legal notice: Only scrape websites that permit it in their Terms of Service. Always respect robots.txt. In many jurisdictions collecting personal email addresses without consent may violate GDPR, CAN-SPAM or similar laws. This tutorial is for educational purposes — scrape only publicly listed business contact emails where you have a legitimate interest.
Step 1 — Environment Setup
We will use Python 3.10+ with four main libraries: requests for HTTP, BeautifulSoup4 (with the lxml parser) for HTML parsing, re (built-in) for regex email extraction, and csv (built-in) for output. The validation step additionally needs dnspython for MX lookups, and fake-useragent supplies rotating User-Agent strings.
pip install requests beautifulsoup4 lxml dnspython fake-useragent
Create your project folder structure:
email-scraper/
├── scraper.py # Main scraping engine
├── validator.py # Email validation
├── pipeline.py # Full automation workflow
├── urls.txt # Input URL list
└── output/
└── emails.csv # Final output
Step 2 — Fetching Web Pages
The first task is reliably fetching HTML from target URLs. Websites block scrapers based on user-agent strings, request rate and IP address. We solve this with a rotating user-agent, randomized delays and optional proxy support.
import requests
import time
import random
from fake_useragent import UserAgent

ua = UserAgent()

HEADERS_BASE = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.5",
    "Accept-Encoding": "gzip, deflate, br",
    "Connection": "keep-alive",
}

def fetch_page(url: str, proxy: str | None = None, retries: int = 3) -> str | None:
    """Fetch a URL and return HTML content or None on failure."""
    headers = {**HEADERS_BASE, "User-Agent": ua.random}
    proxies = {"http": proxy, "https": proxy} if proxy else None
    for attempt in range(retries):
        try:
            # Randomized delay between 1–3 seconds to avoid rate limiting
            time.sleep(random.uniform(1.0, 3.0))
            response = requests.get(
                url,
                headers=headers,
                proxies=proxies,
                timeout=15,
                allow_redirects=True,
            )
            response.raise_for_status()
            return response.text
        except requests.exceptions.RequestException as e:
            print(f"Attempt {attempt + 1} failed for {url}: {e}")
            time.sleep(2 ** attempt)  # Exponential backoff
    return None  # All retries failed
Step 3 — Extracting Emails with Regex
Once we have the HTML, we extract emails using a regular expression. We also parse the page with BeautifulSoup to find mailto: links, which often contain emails that are obfuscated in the visible text but present in raw HTML attributes.
import re
from bs4 import BeautifulSoup
from urllib.parse import unquote

# Robust email regex pattern
EMAIL_REGEX = re.compile(
    r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}",
    re.IGNORECASE
)

# Common fake/placeholder emails to ignore
BLACKLIST_DOMAINS = {
    "example.com", "test.com", "domain.com",
    "email.com", "yourdomain.com", "sentry.io",
    "sentry-next.wixpress.com", "wixpress.com",
}

def extract_emails(html: str, source_url: str) -> set:
    """Extract all unique emails from HTML content.

    source_url is unused here, but kept in the signature so callers
    can pass it for logging or future per-source filtering.
    """
    emails = set()
    # Method 1: Regex on raw HTML (catches emails hidden in attributes,
    # scripts and comments that never render as visible text)
    raw_matches = EMAIL_REGEX.findall(html)
    emails.update(e.lower() for e in raw_matches)
    # Method 2: BeautifulSoup mailto: links
    soup = BeautifulSoup(html, "lxml")
    for tag in soup.find_all("a", href=True):
        href = unquote(tag["href"])
        if href.startswith("mailto:"):
            email = href.replace("mailto:", "").split("?")[0].strip().lower()
            if EMAIL_REGEX.match(email):
                emails.add(email)
    # Filter out blacklisted domains and image/asset false positives
    clean = set()
    for email in emails:
        domain = email.split("@")[1]
        if domain not in BLACKLIST_DOMAINS:
            # Skip matches that are really file names (e.g. icon@2x.png)
            if not any(ext in email for ext in (".png", ".jpg", ".svg", ".css")):
                clean.add(email)
    return clean
Pro tip: Many websites obfuscate emails by replacing @ with [at] or (at) to prevent scraping. Add a secondary regex: r"[a-zA-Z0-9._%+\-]+\s*[\[\(]at[\]\)]\s*[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}" and normalize these matches before adding them to your set.
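Building on that tip, here is one possible normalizer (the `deobfuscate_emails` helper name is my own) that captures the local part and domain around a bracketed `at` and rewrites them into standard form, ready to merge into the set returned by `extract_emails` before the blacklist filter runs:

```python
import re

# Matches "user [at] example.com" or "user(at)example.com" style obfuscation
OBFUSCATED_RE = re.compile(
    r"([a-zA-Z0-9._%+\-]+)\s*[\[\(]\s*at\s*[\]\)]\s*([a-zA-Z0-9.\-]+\.[a-zA-Z]{2,})",
    re.IGNORECASE,
)

def deobfuscate_emails(text: str) -> set:
    """Rewrite '[at]' / '(at)' obfuscation back into '@' and return the matches."""
    return {f"{local}@{domain}".lower() for local, domain in OBFUSCATED_RE.findall(text)}
```

Run this on the raw HTML as well, since the obfuscated form by definition will not match the main `EMAIL_REGEX`.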
Step 4 — Email Validation
Regex alone does not guarantee an email is real or deliverable. After extraction, we run a two-layer validation: format validation (is it a properly structured email?) and domain validation (does the domain have a valid MX record, meaning it can actually receive email?).
import re
import dns.resolver  # pip install dnspython

EMAIL_RE = re.compile(
    r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$"
)

# Cache MX results to avoid repeat DNS lookups
_mx_cache = {}

def has_mx_record(domain: str) -> bool:
    """Check if a domain has valid MX DNS records."""
    if domain in _mx_cache:
        return _mx_cache[domain]
    try:
        dns.resolver.resolve(domain, "MX")
        _mx_cache[domain] = True
        return True
    except Exception:
        _mx_cache[domain] = False
        return False

def validate_email(email: str, check_mx: bool = True) -> dict:
    """
    Validate an email address.
    Returns dict with 'valid' bool and 'reason' string.
    """
    email = email.strip().lower()
    # Layer 1: Format check
    if not EMAIL_RE.match(email):
        return {"valid": False, "reason": "invalid_format"}
    parts = email.split("@")
    if len(parts) != 2:
        return {"valid": False, "reason": "invalid_format"}
    local, domain = parts
    # Layer 2: Length sanity check (RFC limits: 64 chars local, 255 domain)
    if len(domain) > 255 or len(local) > 64:
        return {"valid": False, "reason": "too_long"}
    # Layer 3: MX record check (optional, slower)
    if check_mx and not has_mx_record(domain):
        return {"valid": False, "reason": "no_mx_record"}
    return {"valid": True, "reason": "ok"}
Step 5 — The Full Automation Pipeline
Now we wire everything together into a single pipeline.py script. It reads a list of URLs from a text file, crawls each one, extracts and validates emails, removes duplicates across all pages and writes the final clean list to a CSV file with source URL metadata.
import csv
import os
from datetime import datetime

from scraper import fetch_page, extract_emails
from validator import validate_email

# ── CONFIG ──────────────────────────────────────────
INPUT_FILE = "urls.txt"
OUTPUT_DIR = "output"
OUTPUT_FILE = f"{OUTPUT_DIR}/emails_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
PROXY = None      # Set to "http://user:pass@proxy:port" if needed
CHECK_MX = True   # Set False to skip DNS checks (faster)
# ────────────────────────────────────────────────────

def run_pipeline():
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    # Load URLs
    with open(INPUT_FILE) as f:
        urls = [line.strip() for line in f if line.strip() and line.startswith("http")]
    print(f"📋 Loaded {len(urls)} URLs")
    seen_emails = set()
    results = []
    for i, url in enumerate(urls, 1):
        print(f"[{i}/{len(urls)}] Crawling: {url}")
        html = fetch_page(url, proxy=PROXY)
        if not html:
            print("  ⚠️ Failed to fetch")
            continue
        emails = extract_emails(html, url)
        print(f"  🔍 Found {len(emails)} raw emails")
        for email in emails:
            if email in seen_emails:
                continue
            seen_emails.add(email)
            validation = validate_email(email, check_mx=CHECK_MX)
            if validation["valid"]:
                results.append({
                    "email": email,
                    "source_url": url,
                    "scraped_at": datetime.now().isoformat(),
                })
                print(f"  ✅ {email}")
            else:
                print(f"  ❌ {email} ({validation['reason']})")
    # Write CSV
    with open(OUTPUT_FILE, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=["email", "source_url", "scraped_at"])
        writer.writeheader()
        writer.writerows(results)
    print(f"\n✅ Done! {len(results)} valid emails saved to {OUTPUT_FILE}")

if __name__ == "__main__":
    run_pipeline()
Step 6 — Using Proxies to Avoid Blocks
When scraping at scale, websites will eventually block your IP. The solution is rotating proxies — each request goes through a different IP address. You can use a proxy service like Bright Data, Oxylabs, or Webshare, or build a simple proxy rotator using a list of free proxies (less reliable).
import random

PROXY_LIST = [
    "http://user:pass@proxy1.example.com:8080",
    "http://user:pass@proxy2.example.com:8080",
    "http://user:pass@proxy3.example.com:8080",
    # Add more proxies here
]

def get_random_proxy() -> str:
    return random.choice(PROXY_LIST)

# Use in your fetch call:
# html = fetch_page(url, proxy=get_random_proxy())
Which proxy service to use: Webshare offers a small free tier to start, with paid plans at low monthly cost. For high-volume work, residential proxies from providers like Bright Data are generally the most reliable — they route requests through real home IP addresses, which makes them much harder for target sites to detect and block than datacenter IPs.
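Free proxy lists in particular go stale quickly. A quick liveness filter can prune dead entries before a run — this sketch (the `check_proxy` and `filter_live` helper names are my own, and httpbin.org is just one convenient test endpoint) simply attempts a small request through each proxy:

```python
import requests

def check_proxy(proxy: str, timeout: float = 5.0) -> bool:
    """Return True if the proxy can complete a simple HTTPS request."""
    try:
        requests.get(
            "https://httpbin.org/ip",
            proxies={"http": proxy, "https": proxy},
            timeout=timeout,
        )
        return True
    except requests.exceptions.RequestException:
        return False

def filter_live(proxies: list) -> list:
    """Keep only proxies that respond within the timeout."""
    return [p for p in proxies if check_proxy(p)]
```

Run `PROXY_LIST = filter_live(PROXY_LIST)` once at startup rather than per request, since each check costs up to one timeout.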
Step 7 — Scheduling the Workflow Automatically
Once your pipeline works manually, automate it with a cron job on your Linux server. This way the scraper runs on a schedule — daily, weekly or hourly — without you having to run it manually each time.
# Edit crontab with: crontab -e
# Run email scraper every day at 2:00 AM
0 2 * * * /usr/bin/python3 /home/user/email-scraper/pipeline.py >> /home/user/email-scraper/logs/scraper.log 2>&1
# Run every Monday at 8:00 AM (weekly scrape)
0 8 * * 1 /usr/bin/python3 /home/user/email-scraper/pipeline.py
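One practical caveat with the entries above: if a scrape ever runs longer than the schedule interval, cron will happily start a second overlapping copy. On most Linux distributions `flock` (part of util-linux) guards against this by holding a lock file for the duration of the run — a variant of the daily entry, assuming the same paths as above:

```shell
# -n: give up immediately if the previous run still holds the lock
0 2 * * * cd /home/user/email-scraper && /usr/bin/flock -n /tmp/email-scraper.lock /usr/bin/python3 pipeline.py >> logs/scraper.log 2>&1
```

The `cd` also matters on its own: pipeline.py opens urls.txt and output/ relative to the working directory, which under cron defaults to your home directory, not the project folder.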
Advanced: Crawling Multiple Pages Per Site
The basic pipeline only scrapes the given URLs. For deeper crawling — following links from a homepage to the contact page, about page and team page — we need a recursive crawler. Here is a lightweight spider that starts from a homepage and follows internal links up to a configurable depth.
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse

from scraper import fetch_page, extract_emails

def crawl_site(start_url: str, max_depth: int = 2, max_pages: int = 20) -> set:
    """Crawl a site starting from start_url, following internal links."""
    base_domain = urlparse(start_url).netloc
    visited = set()
    to_visit = [(start_url, 0)]  # (url, depth)
    all_emails = set()
    while to_visit and len(visited) < max_pages:
        url, depth = to_visit.pop(0)
        if url in visited:
            continue
        visited.add(url)
        html = fetch_page(url)
        if not html:
            continue
        # Extract emails from this page
        emails = extract_emails(html, url)
        all_emails.update(emails)
        print(f"  📧 {url} → {len(emails)} emails")
        # Find internal links to follow
        if depth < max_depth:
            soup = BeautifulSoup(html, "lxml")
            for a in soup.find_all("a", href=True):
                # Resolve relative links; drop #fragment so anchors on the
                # same page don't count as new URLs
                href = urljoin(url, a["href"]).split("#")[0]
                parsed = urlparse(href)
                # Only follow same-domain, http/https links
                if (parsed.netloc == base_domain
                        and parsed.scheme in ("http", "https")
                        and href not in visited):
                    to_visit.append((href, depth + 1))
    print(f"  🕷️ Crawled {len(visited)} pages, found {len(all_emails)} emails")
    return all_emails
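A cheaper alternative to deep crawling is to guess high-value paths directly. This small sketch (the `seed_urls` helper and its path list are illustrative, not part of the pipeline above) builds candidate contact-page URLs from a homepage:

```python
from urllib.parse import urljoin

# Paths that commonly host contact emails; adjust for your targets
CONTACT_PATHS = ["/contact", "/contact-us", "/about", "/team", "/staff"]

def seed_urls(homepage: str) -> list:
    """Return the homepage plus likely contact-page URLs to try first."""
    return [homepage] + [urljoin(homepage, path) for path in CONTACT_PATHS]
```

Paths that don't exist simply 404, which makes `fetch_page` return None and the pipeline skip them — so over-guessing costs only a few wasted requests.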
Step 8 — Export to CSV and Enrich Data
The final step is exporting your validated, deduplicated email list with enrichment metadata. A good output CSV should include not just the email address but also the source URL, domain, company name (extracted from domain) and scrape date — making it immediately useful for outreach campaigns or CRM import.
import csv
from datetime import datetime

def domain_to_company(domain: str) -> str:
    """Best-effort company name from domain."""
    name = domain.replace("www.", "").split(".")[0]
    return name.replace("-", " ").title()

def export_csv(results: list, filepath: str):
    fieldnames = ["email", "domain", "company", "source_url", "scraped_at"]
    with open(filepath, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for r in results:
            email_domain = r["email"].split("@")[1]
            writer.writerow({
                "email": r["email"],
                "domain": email_domain,
                "company": domain_to_company(email_domain),
                "source_url": r["source_url"],
                "scraped_at": datetime.now().strftime("%Y-%m-%d %H:%M"),
            })
    print(f"💾 Exported {len(results)} rows to {filepath}")

# Output preview:
# email,domain,company,source_url,scraped_at
# contact@acme.com,acme.com,Acme,https://acme.com/contact,2026-03-24 02:00
Pro Tips to Maximize Results
- Target contact pages first — /contact, /about, /team and /staff pages have the highest email density. Add these paths to your URL list directly.
- Use dnspython MX caching — MX lookups are slow. Cache results per domain so you only look up each domain once, regardless of how many emails share it.
- Respect robots.txt — Use the urllib.robotparser module to check whether scraping is allowed before fetching a URL.
- Add jitter to delays — Never use a fixed delay. Randomize between 0.5 and 4 seconds to mimic human browsing patterns.
- Run in threads for speed — Use concurrent.futures.ThreadPoolExecutor with a maximum of 3–5 workers for parallel fetching without hammering servers.
- Store raw HTML locally — Cache fetched HTML to disk so you can re-run the extraction step without re-crawling when you update your regex.
- Validate with an API for bulk — For large lists (10k+ emails), use a dedicated verification API like ZeroBounce, NeverBounce or Hunter.io to check deliverability beyond MX records.