Exploring Email Scraping with Python

Introduction: 

In today's digital age, email remains a pivotal communication tool for both personal and professional purposes. The ability to gather email addresses from various sources can be immensely valuable for marketing campaigns, networking, or research purposes. Python, with its rich ecosystem of libraries, offers powerful tools for web scraping, making it an ideal choice for extracting email addresses from web pages.

Scope: 

In this blog post, we'll delve into the process of scraping email addresses from web pages using Python. We'll walk through each step of the process, from fetching web pages to extracting email addresses, using libraries such as requests, BeautifulSoup (bs4), colorama, and the built-in re module. Additionally, we'll use threading to parallelize the scraping process for better efficiency.

Requirements:

To follow along with the examples in this blog post, you'll need to have the following libraries installed in your Python environment. You can install them using pip and the provided requirements.txt file:

requirements.txt file:

requests
bs4
colorama

Use python3 -m pip install -r requirements.txt to install the dependencies.
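
To quickly confirm the environment is ready, this one-line import check (a simple sanity test, not part of the tutorial code) should run without errors:

python3 -c "import requests, bs4, colorama"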


Explanation of Source Code:

The provided source code comprises several components:

  1. Crawler Class: Responsible for fetching web pages and extracting links recursively.
  2. EmailSpider Class: Extracts email addresses from web pages.
  3. Helper Functions: Includes functions to validate email addresses and URLs.
  4. Main Script: Combines the above components to initiate the crawling and email extraction process.

Step-by-Step Guide:

  1. Initialization: Import necessary libraries and initialize settings.
  2. Crawling Web Pages: The Crawler class fetches web pages and extracts links.
  3. Email Extraction: The EmailSpider class extracts email addresses from the fetched pages.
  4. Threading: Utilize threading to parallelize the crawling and email extraction process for improved efficiency.
  5. Output: Save extracted email addresses to a file.
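
Before walking through the listing line by line, here is a minimal sketch of how these pieces fit together. It is a simplified outline, not the full program: the class names and arguments match the code below, while the module name email_spider.py is just an assumed filename for the script.

import time
from email_spider import Crawler, EmailSpider  # assumes the listing below is saved as email_spider.py

crawler = Crawler("https://example.com", delay=0.1, max_crawl_urls=30)
crawler.start()  # crawls in a background thread, filling the shared URL queue
time.sleep(5)  # give the crawler a moment to queue some URLs
spider = EmailSpider(crawler, n_threads=10, output_file="extracted-emails.txt")
spider.run()  # worker threads drain the queue and write emails to the output file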

Let's go through each line of the provided source code and explain its functionality:


import re  # Importing the regular expression module for pattern matching
import argparse  # Importing the argparse module for parsing command-line arguments
import threading  # Importing the threading module for parallel execution
from urllib.parse import urlparse, urljoin  # Importing functions for URL manipulation
from queue import Queue  # Importing the Queue class for managing tasks
import time  # Importing time module for delays and tracking execution time
import warnings  # Importing warnings module to suppress unnecessary warnings
warnings.filterwarnings("ignore")  # Ignoring warnings for cleaner output
import requests  # Importing the requests module for making HTTP requests
from bs4 import BeautifulSoup  # Importing BeautifulSoup for HTML parsing
import colorama  # Importing colorama for colored terminal output

colorama.init()  # Initializing colorama for colored terminal output
GREEN = colorama.Fore.GREEN  # Defining green color for terminal output
GRAY = colorama.Fore.LIGHTBLACK_EX  # Defining gray color for terminal output
RESET = colorama.Fore.RESET  # Defining reset color for terminal output
YELLOW = colorama.Fore.YELLOW  # Defining yellow color for terminal output
RED = colorama.Fore.RED  # Defining red color for terminal output

EMAIL_REGEX = r"""(?:[a-z0-9!#$%&'*+=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f]){2,12})\])"""
# Regular expression for matching email addresses
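# Quick sanity check of EMAIL_REGEX (illustrative only, not part of the spider):
#   >>> [m.group() for m in re.finditer(EMAIL_REGEX, "Write to info@example.com or sales@example.org")]
#   ['info@example.com', 'sales@example.org']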

FORBIDDEN_TLDS = ["js", "css", "jpg", "png", "svg", "webp", "gz", "zip", "webm", "mp3",
                  "wav", "mp4", "gif", "tar", "gz", "rar", "gzip", "tgz"]
# List of forbidden top-level domains

FORBIDDEN_EXTENSIONS = ["js", "css", "jpg", "png", "svg", "webp", "gz", "zip", "webm", "mp3",
                        "wav", "mp4", "gif", "tar", "gz", "rar", "gzip", "tgz"]
# List of forbidden file extensions

print_lock = threading.Lock()  # Lock for thread-safe printing to console
file_lock = threading.Lock()  # Lock for thread-safe file operations


def is_valid_email_address(email):
    """Verify whether `email` is a valid email address"""
    for forbidden_tld in FORBIDDEN_TLDS:
        if email.endswith(forbidden_tld):
            return False  # Return False if email ends with forbidden TLDs
    if re.search(r"\..{1}$", email):
        return False  # Return False if TLD has a length of 1
    elif re.search(r"\..*\d+.*$", email):
        return False  # Return False if TLD contains numbers
    return True  # Return True otherwise
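# Example usage (illustrative): is_valid_email_address("john@example.com") returns True,
# while is_valid_email_address("icon@sprite.png") returns False because "png" is a forbidden TLD.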


def is_valid_url(url):
    """Checks whether `url` is a valid URL."""
    parsed = urlparse(url)
    return bool(parsed.netloc) and bool(parsed.scheme)  # Return True if URL has both scheme and netloc


def is_text_url(url):
    """Returns False if the URL has forbidden extensions."""
    for extension in FORBIDDEN_EXTENSIONS:
        if url.endswith(extension):
            return False  # Return False if URL ends with forbidden extension
    return True  # Return True otherwise
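# Example usage (illustrative): is_text_url("https://site.com/logo.png") returns False,
# while is_text_url("https://site.com/contact") returns True.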


class Crawler(threading.Thread):
    def __init__(self, first_url, delay, crawl_external_urls=False, max_crawl_urls=30):
        super().__init__()
        self.first_url = first_url  # Starting URL for crawling
        self.delay = delay  # Delay between requests
        self.crawl_external_urls = crawl_external_urls  # Flag to crawl external URLs
        self.max_crawl_urls = max_crawl_urls  # Maximum number of URLs to crawl
        self.visited_urls = {}  # Dictionary to store visited URLs and their HTML content
        self.domain_name = urlparse(self.first_url).netloc  # Domain name of the base URL
        self.internal_urls = set()  # Set to store internal URLs
        self.external_urls = set()  # Set to store external URLs
        self.urls_queue = Queue()  # Queue to manage URLs to crawl
        self.urls_queue.put(self.first_url)  # Add the starting URL to the queue
        self.total_urls_visited = 0  # Counter for total URLs visited

    def get_all_website_links(self, url):
        """Returns all URLs found on `url` belonging to the same website"""
        urls = set()
        try:
            res = requests.get(url, verify=False, timeout=10)  # Make HTTP request
        except Exception:
            return urls  # Return an empty set if the request fails
        soup = BeautifulSoup(res.text, "html.parser")  # Parse HTML content
        self.visited_urls[url] = res.text  # Store visited URL and its HTML content
        for a_tag in soup.find_all("a"):  # Find all <a> tags in HTML
            href = a_tag.attrs.get("href")  # Get href attribute of <a> tag
            if href == "" or href is None:
                continue  # Skip if href is empty or None
            href = urljoin(url, href)  # Join URL if it's relative
            parsed_href = urlparse(href)  # Parse joined URL
            href = parsed_href.scheme + "://" + parsed_href.netloc + parsed_href.path  # Remove URL parameters
            if not is_valid_url(href):
                continue  # Skip if URL is not valid
            if href in self.internal_urls:
                continue  # Skip if URL is already in internal URLs set
            if self.domain_name not in href:
                if href not in self.external_urls:
                    self.external_urls.add(href)  # Add external URL to set
                if self.crawl_external_urls:
                    self.urls_queue.put(href)  # Add external URL to queue if crawling external URLs is enabled
                continue
            urls.add(href)  # Add internal URL to set
            self.urls_queue.put(href)  # Add internal URL to queue
            self.internal_urls.add(href)  # Add internal URL to internal URLs set
        return urls  # Return set of URLs
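    # Example (illustrative): get_all_website_links("https://example.com") returns a set such as
    # {"https://example.com/about", "https://example.com/contact"}, while also queueing those URLs
    # for further crawling and recording them in self.internal_urls.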

    def crawl(self, url):
        """Crawls a web page and extracts all links."""
        if not is_text_url(url):
            return  # Skip if URL is not a text file
        self.total_urls_visited += 1  # Increment total URLs visited counter
        with print_lock:
            print(f"{YELLOW}[*] Crawling: {url}{RESET}")  # Print crawling message
        links = self.get_all_website_links(url)  # Get all links from the URL
        for link in links:
            if self.total_urls_visited > self.max_crawl_urls:
                break  # Break if maximum URLs limit is reached
            self.crawl(link)  # Recursively crawl each link
        time.sleep(self.delay)  # Add delay between requests

    def run(self):
        self.crawl(self.first_url)  # Start crawling from the first URL


class EmailSpider:
    def __init__(self, crawler: Crawler, n_threads=20, output_file="extracted-emails.txt"):
        self.crawler = crawler  # Initialize Crawler instance
        self.extracted_emails = set()  # Set to store extracted email addresses
        self.n_threads = n_threads  # Number of threads for email extraction
        self.output_file = output_file  # Output file to store extracted email addresses

    def get_emails_from_url(self, url):
        """Extracts email addresses from the given URL."""
        if not is_text_url(url):
            return set()  # Return empty set if URL is not a text file
        if url not in self.crawler.visited_urls:
            try:
                r = requests.get(url, verify=False, timeout=10)  # Make HTTP request if URL is not visited
            except Exception as e:
                return set()  # Return empty set if an exception occurs
            else:
                text = r.text
        else:
            text = self.crawler.visited_urls[url]  # Use cached HTML content if URL is visited
        emails = set()  # Set to store extracted email addresses
        try:
            for re_match in re.finditer(EMAIL_REGEX, text):  # Find all email addresses using regex
                email = re_match.group()
                if is_valid_email_address(email):  # Check if email address is valid
                    emails.add(email)  # Add valid email address to set
        except Exception as e:
            return set()  # Return empty set if an exception occurs
        return emails  # Return set of email addresses

    def scan_urls(self):
        while True:
            url = self.crawler.urls_queue.get()  # Get URL from the queue
            emails = self.get_emails_from_url(url)  # Extract email addresses from the URL
            for email in emails:
                with print_lock:
                    print("[+] Got email:", email, "from url:", url)  # Print extracted email address
                if email not in self.extracted_emails:
                    with file_lock:
                        with open(self.output_file, "a") as f:
                            print(email, file=f)  # Write extracted email address to file
                    self.extracted_emails.add(email)  # Add email address to set
            self.crawler.urls_queue.task_done()  # Mark URL task as done

    def run(self):
        for _ in range(self.n_threads):
            t = threading.Thread(target=self.scan_urls)  # Create thread for email extraction
            t.daemon = True  # Set thread as daemon
            t.start()  # Start thread
        self.crawler.urls_queue.join()  # Wait for all URL tasks to be done


def track_stats(crawler: Crawler):
    """Prints statistics about the crawler and active threads."""
    while is_running:
        with print_lock:
            print(f"{RED}[+] Queue size: {crawler.urls_queue.qsize()}{RESET}")  # Print queue size
            print(f"{GRAY}[+] Total Extracted External links: {len(crawler.external_urls)}{RESET}")  # Print total external links
            print(f"{GREEN}[+] Total Extracted Internal links: {len(crawler.internal_urls)}{RESET}")  # Print total internal links
            print(f"[*] Total threads running: {threading.active_count()}")  # Print total active threads
        time.sleep(5)  # Add delay for stats update
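# Example console output from the tracker (illustrative values, color codes omitted):
#   [+] Queue size: 42
#   [+] Total Extracted External links: 7
#   [+] Total Extracted Internal links: 58
#   [*] Total threads running: 13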


def start_stats_tracker(crawler: Crawler):
    """Starts the statistics tracker in a separate thread."""
    t = threading.Thread(target=track_stats, args=(crawler,))
    t.daemon = True
    t.start()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Advanced Email Spider")
    parser.add_argument("url", help="URL to start crawling from & extracting email addresses")
    parser.add_argument("-m", "--max-crawl-urls", help="The maximum number of URLs to crawl, default is 30.",
                        type=int, default=30)
    parser.add_argument("-t", "--num-threads", help="The number of threads that runs extracting emails"
                                                    "from individual pages. Default is 10",
                        type=int, default=10)
    parser.add_argument("--crawl-external-urls", help="Whether to crawl external URLs that the domain specified",
                        action="store_true")
    parser.add_argument("--crawl-delay", help="The crawl delay in seconds, useful for not overloading web servers",
                        type=float, default=0.01)
    args = parser.parse_args()  # Parse command-line arguments

    url = args.url
    is_running = True  # Flag to indicate whether the program is still running

    crawler = Crawler(url, max_crawl_urls=args.max_crawl_urls, delay=args.crawl_delay,
                      crawl_external_urls=args.crawl_external_urls)  # Initialize crawler
    crawler.start()  # Start crawling

    time.sleep(5)  # Wait for queue to fill up

    start_stats_tracker(crawler)  # Start statistics tracker

    email_spider = EmailSpider(crawler, n_threads=args.num_threads)  # Initialize email spider
    email_spider.run()  # Run email spider

    is_running = False  # Update flag to indicate program is no longer running
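
Assuming the script above is saved as email_spider.py (the filename itself is an arbitrary choice), a typical run might look like the following; the target URL and option values are placeholders:

python3 email_spider.py https://example.com -m 50 -t 10 --crawl-delay 0.5

Extracted addresses are appended to extracted-emails.txt, and crawl statistics are printed every five seconds while the spider runs.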



Conclusion:

Scraping email addresses from web pages using Python can be a powerful technique for various purposes. However, it's essential to respect website policies and adhere to legal and ethical guidelines while scraping. By following the steps outlined in this blog post and understanding the provided source code, you can efficiently gather email addresses from web pages using Python.


Hope you find this helpful! Feel free to leave a comment.

Ram Krishna, April 28, 2024