ZyVOP Logo
Content That Connects
SeriesCategoriesTags
ZyVOP Logo
Content That Connects

Empowering developers and creators with cutting-edge insights, comprehensive tutorials, and innovative solutions for the digital future.

Content

  • Tags
  • Write Article
  • Newsletter

Company

  • About Us
  • Contact

Connect

  • Privacy Policy
  • Terms of Service
  • Cookie Policy
  • DMCA Policy
  • Code of Conduct

© 2026 ZyVOP. Crafted with care for the developer community.

Made with ❤️ by the ZyVOP team
All systems operational
HomeScrapingPython Scraping for Machine Learning: Building Clean Training Datasets (2026)
Scraping

Python Scraping for Machine Learning: Building Clean Training Datasets (2026)

A practical guide to building ML datasets from web data, including text classification, sentiment analysis, image collection, NER, and data quality pipelines.

#web scraping nlp dataset python#scrape image dataset python#text classification dataset scraping python#python data collection machine learning pipeline
Z
ZyVOP

Senior Developer

June 13, 2026
14 min read
1 views
Python Scraping for Machine Learning: Building Clean Training Datasets (2026)

The Data Problem Behind Every ML Model

Every machine learning model is only as good as the data it was trained on. This is not a cliché — it is the single most important constraint in applied ML in 2026. State-of-the-art architectures are abundant and largely commoditised. Clean, domain-specific, carefully labelled training data is still scarce, expensive, and enormously valuable.

The three most common ways to build ML training datasets are:

  1. Buy it — licensed datasets from providers like Scale AI, Hugging Face, or industry-specific vendors

  2. Crowdsource it — Amazon Mechanical Turk, Appen, or internal labelling teams

  3. Scrape it — collect raw data from the web, clean it, and transform it into labelled examples

Option 3 is almost always the cheapest, fastest, and most flexible. It's how most of the internet-scale models you use daily were built. Wikipedia, Common Crawl, Reddit, news archives, GitHub, Stack Overflow, product reviews — these are the raw materials that power modern language models.

This guide shows you how to build four types of ML datasets using Python web scraping:

  • Text classification — news articles labelled by topic/category

  • Sentiment analysis — product reviews with star-rating labels

  • Named entity recognition (NER) — structured text with entities for annotation

  • Image datasets — scraped and cleaned image collections with labels


Part 1: Building a Text Classification Dataset from News Sites

Text classification — assigning a category label to a piece of text — is one of the most common NLP tasks. To train a classifier, you need labelled examples: text + category.

News sites are perfect for this. They organise articles into sections (Technology, Business, Sports, Science) — those section names become your labels. Scraped automatically.

# dataset_builders/news_classifier_dataset.py
import asyncio
import httpx
import pandas as pd
import hashlib
import json
import re
from bs4 import BeautifulSoup
from datetime import datetime, timezone
from pathlib import Path

HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120",
    "Accept-Language": "en-US,en;q=0.9",
}

# News sources with their category URL patterns
# Structure: {category_label: [list_of_article_list_urls]}
NEWS_SOURCES = {
    "technology": [
        "https://techcrunch.com/category/artificial-intelligence/",
        "https://techcrunch.com/category/apps/",
        "https://www.theverge.com/tech",
    ],
    "science": [
        "https://www.sciencedaily.com/news/top/science/",
        "https://phys.org/news/",
    ],
    "business": [
        "https://techcrunch.com/category/startups/",
        "https://www.reuters.com/business/",
    ],
    "health": [
        "https://www.healthline.com/health-news",
        "https://www.medicalnewstoday.com/",
    ],
}

OUTPUT_DIR = Path("datasets/news_classification")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)


def clean_text(raw: str) -> str:
    """
    Clean scraped article text for ML use.
    Removes boilerplate, normalises whitespace, strips HTML artifacts.
    """
    if not raw:
        return ""

    # Remove common boilerplate phrases
    boilerplate = [
        r"Subscribe to.*?newsletter",
        r"Sign up for.*?alerts",
        r"Click here to.*?\.",
        r"Read more:.*?\.",
        r"Advertisement\s*",
        r"Share this article",
        r"Follow us on.*?\.",
        r"\[.*?\]",               # Remove [citation needed] etc.
        r"©.*?reserved\.",        # Remove copyright notices
    ]
    text = raw
    for pattern in boilerplate:
        text = re.sub(pattern, " ", text, flags=re.IGNORECASE)

    # Normalise whitespace
    text = re.sub(r"\s+", " ", text).strip()

    return text


def extract_article_links(html: str, base_url: str) -> list[str]:
    """Extract article URLs from a news category/listing page."""
    from urllib.parse import urljoin, urlparse
    soup   = BeautifulSoup(html, "lxml")
    domain = urlparse(base_url).netloc
    links  = set()

    # Target article links (typically <a> tags with /article/ or year in path)
    for a in soup.select("a[href]"):
        href = a.get("href", "")
        full = urljoin(base_url, href)
        parsed = urlparse(full)

        # Only same-domain links that look like articles
        if parsed.netloc == domain and len(parsed.path) > 10:
            path = parsed.path.lower()
            if any(seg in path for seg in ["/20", "/article", "/story", "/news"]):
                links.add(full)

    return list(links)[:25]   # Limit per listing page


def parse_article(html: str, url: str) -> dict | None:
    """Extract article text and metadata from an article page."""
    soup = BeautifulSoup(html, "lxml")

    # Remove non-content elements
    for tag in soup(["script", "style", "nav", "footer", "header",
                     "aside", "form", "button", ".ad", ".advertisement",
                     ".related-articles", ".comments"]):
        tag.decompose()

    # Title
    title = None
    for selector in ["h1.article-title", "h1.entry-title", "h1"]:
        el = soup.select_one(selector)
        if el:
            title = el.get_text(strip=True)
            break

    if not title:
        return None

    # Article body — try common content selectors
    body_text = ""
    for selector in [
        "article", ".article-body", ".entry-content",
        ".post-content", "main p", ".story-body"
    ]:
        elements = soup.select(selector)
        if elements:
            body_text = " ".join(el.get_text(separator=" ", strip=True) for el in elements)
            if len(body_text) > 200:
                break

    body_clean = clean_text(body_text)

    if len(body_clean.split()) < 100:   # Skip very short articles
        return None

    return {
        "title":      title,
        "text":       body_clean,
        "url":        url,
        "word_count": len(body_clean.split()),
        "scraped_at": datetime.now(timezone.utc).isoformat(),
        "text_hash":  hashlib.md5(body_clean.encode()).hexdigest(),
    }


async def scrape_category(
    label: str,
    listing_urls: list[str],
    target_per_category: int = 200
) -> list[dict]:
    """Scrape articles for one category label."""
    articles = []
    seen_hashes = set()

    async with httpx.AsyncClient(headers=HEADERS, follow_redirects=True) as client:
        for listing_url in listing_urls:
            if len(articles) >= target_per_category:
                break

            try:
                # Fetch listing page
                r = await client.get(listing_url, timeout=20)
                article_links = extract_article_links(r.text, listing_url)
                print(f"  [{label}] Found {len(article_links)} links on {listing_url}")

                # Fetch each article
                for article_url in article_links:
                    if len(articles) >= target_per_category:
                        break

                    try:
                        ar = await client.get(article_url, timeout=20)
                        parsed = parse_article(ar.text, article_url)

                        if parsed and parsed["text_hash"] not in seen_hashes:
                            seen_hashes.add(parsed["text_hash"])
                            parsed["label"] = label
                            parsed["label_id"] = list(NEWS_SOURCES.keys()).index(label)
                            articles.append(parsed)
                            print(f"    ✓ [{label}] {parsed['title'][:60]}...")

                        await asyncio.sleep(1.0)

                    except Exception as e:
                        print(f"    ✗ Article error: {e}")
                        continue

            except Exception as e:
                print(f"  Listing error on {listing_url}: {e}")
                continue

    print(f"  [{label}] Collected {len(articles)} articles")
    return articles


async def build_news_classification_dataset(
    articles_per_category: int = 200
) -> pd.DataFrame:
    """Build a complete multi-category news classification dataset."""
    all_articles = []

    for label, urls in NEWS_SOURCES.items():
        print(f"\nScraping category: {label.upper()}")
        articles = await scrape_category(label, urls, articles_per_category)
        all_articles.extend(articles)

    df = pd.DataFrame(all_articles)

    # Deduplicate by hash
    df = df.drop_duplicates(subset=["text_hash"])

    # Shuffle for good measure
    df = df.sample(frac=1, random_state=42).reset_index(drop=True)

    # Split into train/val/test (80/10/10)
    n         = len(df)
    train_end = int(n * 0.8)
    val_end   = int(n * 0.9)

    df["split"] = "train"
    df.loc[train_end:val_end, "split"] = "val"
    df.loc[val_end:,          "split"] = "test"

    # Save
    df.to_csv(OUTPUT_DIR / "news_dataset.csv", index=False)

    # Also save in HuggingFace datasets format
    for split in ["train", "val", "test"]:
        split_df = df[df["split"] == split][["title", "text", "label", "label_id"]]
        split_df.to_json(
            OUTPUT_DIR / f"{split}.jsonl",
            orient="records",
            lines=True,
            force_ascii=False
        )

    print(f"\n── Dataset Summary ──")
    print(f"Total articles: {len(df):,}")
    print(f"Train: {len(df[df['split']=='train']):,}")
    print(f"Val:   {len(df[df['split']=='val']):,}")
    print(f"Test:  {len(df[df['split']=='test']):,}")
    print(df.groupby("label")["text"].count().to_string())

    return df


# Run
df = asyncio.run(build_news_classification_dataset(articles_per_category=150))

Part 2: Sentiment Analysis Dataset from Product Reviews

Product reviews with star ratings are ideal for sentiment analysis — the rating is a built-in label that requires no manual annotation.

# dataset_builders/sentiment_dataset.py
import asyncio
import httpx
import pandas as pd
import re
from bs4 import BeautifulSoup
from pathlib import Path
from playwright.async_api import async_playwright
from playwright_stealth import stealth_async

OUTPUT_DIR = Path("datasets/sentiment")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Map star rating to sentiment label
def rating_to_sentiment(rating: int) -> str:
    if rating >= 4:
        return "positive"
    elif rating == 3:
        return "neutral"
    else:
        return "negative"

def rating_to_label_id(rating: int) -> int:
    return {1: 0, 2: 0, 3: 1, 4: 2, 5: 2}.get(rating, 1)


async def scrape_trustpilot_reviews(
    company_slug: str,
    max_pages: int = 20
) -> list[dict]:
    """
    Scrape product/company reviews from Trustpilot.
    Trustpilot is server-rendered and accessible without login.

    Args:
        company_slug: The company identifier from Trustpilot URL
                      e.g. "www.amazon.com" for trustpilot.com/review/www.amazon.com
        max_pages: Number of review pages to scrape
    """
    base_url = f"https://www.trustpilot.com/review/{company_slug}"
    headers  = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120",
        "Accept-Language": "en-US,en;q=0.9",
    }
    reviews = []

    async with httpx.AsyncClient(headers=headers, follow_redirects=True) as client:
        for page in range(1, max_pages + 1):
            url = f"{base_url}?page={page}&languages=en"
            print(f"  Scraping Trustpilot page {page}: {url}")

            try:
                r = await client.get(url, timeout=20)
                r.raise_for_status()
                soup = BeautifulSoup(r.text, "lxml")

                # Each review card
                for card in soup.select("[data-service-review-card-paper]"):
                    # Rating (1-5 stars)
                    rating_el = card.select_one("[data-service-review-rating]")
                    rating    = int(rating_el.get("data-service-review-rating", 0)) if rating_el else None

                    # Title
                    title_el = card.select_one("[data-service-review-title-typography]")
                    title    = title_el.get_text(strip=True) if title_el else ""

                    # Body text
                    body_el = card.select_one("[data-service-review-text-typography]")
                    body    = body_el.get_text(strip=True) if body_el else ""

                    # Date
                    date_el = card.select_one("time[datetime]")
                    date    = date_el.get("datetime", "") if date_el else ""

                    # Verified purchase
                    verified = bool(card.select_one("[data-review-label]"))

                    if rating and body and len(body) > 20:
                        sentiment = rating_to_sentiment(rating)
                        reviews.append({
                            "text":      f"{title}. {body}".strip(". "),
                            "title":     title,
                            "body":      body,
                            "rating":    rating,
                            "sentiment": sentiment,
                            "label_id":  rating_to_label_id(rating),
                            "verified":  verified,
                            "date":      date,
                            "source":    f"trustpilot/{company_slug}",
                        })

                await asyncio.sleep(1.5)

            except Exception as e:
                print(f"  Error on page {page}: {e}")
                break

    print(f"  Scraped {len(reviews)} reviews for {company_slug}")
    return reviews


async def scrape_app_store_reviews(
    app_id: str,
    country: str = "us",
    max_pages: int = 10
) -> list[dict]:
    """
    Scrape App Store reviews via iTunes API.
    Returns up to max_pages * 50 reviews per app.
    """
    reviews = []
    base = f"https://itunes.apple.com/{country}/rss/customerreviews"

    async with httpx.AsyncClient() as client:
        for page in range(1, max_pages + 1):
            url    = f"{base}/page={page}/id={app_id}/sortby=mostrecent/json"
            try:
                r    = await client.get(url, timeout=15)
                data = r.json()
                entries = data.get("feed", {}).get("entry", [])

                if not entries:
                    break

                for entry in entries:
                    rating_str = entry.get("im:rating", {}).get("label", "3")
                    try:
                        rating = int(rating_str)
                    except ValueError:
                        continue

                    title = entry.get("title", {}).get("label", "")
                    body  = entry.get("content", {}).get("label", "")

                    if body and len(body) > 15:
                        reviews.append({
                            "text":      f"{title}. {body}".strip(". "),
                            "title":     title,
                            "body":      body,
                            "rating":    rating,
                            "sentiment": rating_to_sentiment(rating),
                            "label_id":  rating_to_label_id(rating),
                            "source":    f"appstore/{app_id}",
                        })

                await asyncio.sleep(0.5)

            except Exception as e:
                print(f"  App Store error page {page}: {e}")
                break

    print(f"  App Store: {len(reviews)} reviews for app {app_id}")
    return reviews


async def build_sentiment_dataset() -> pd.DataFrame:
    """
    Combine reviews from multiple sources into a balanced sentiment dataset.
    """
    all_reviews = []

    # Trustpilot reviews for multiple companies/products
    companies = ["amazon.com", "booking.com", "airbnb.com", "netflix.com"]
    for company in companies:
        reviews = await scrape_trustpilot_reviews(company, max_pages=10)
        all_reviews.extend(reviews)
        await asyncio.sleep(2)

    # App Store reviews for popular apps
    app_ids = {
        "284882215": "facebook",
        "310633997": "whatsapp",
        "389801252": "instagram",
        "474141758": "youtube",
    }
    for app_id, name in app_ids.items():
        reviews = await scrape_app_store_reviews(app_id, max_pages=5)
        for r in reviews:
            r["source"] = f"appstore/{name}"
        all_reviews.extend(reviews)

    df = pd.DataFrame(all_reviews).drop_duplicates(subset=["text"])

    # Balance classes — equal samples per sentiment
    min_class = df["sentiment"].value_counts().min()
    balanced  = pd.concat([
        df[df["sentiment"] == cls].sample(min(min_class, len(df[df["sentiment"] == cls])),
                                           random_state=42)
        for cls in ["positive", "neutral", "negative"]
    ]).sample(frac=1, random_state=42).reset_index(drop=True)

    # Splits
    n         = len(balanced)
    balanced["split"] = "train"
    balanced.loc[int(n*0.8):int(n*0.9), "split"] = "val"
    balanced.loc[int(n*0.9):,           "split"] = "test"

    balanced.to_csv(OUTPUT_DIR / "sentiment_dataset.csv", index=False)

    for split in ["train", "val", "test"]:
        (balanced[balanced["split"] == split][["text", "sentiment", "label_id"]]
         .to_json(OUTPUT_DIR / f"{split}.jsonl", orient="records",
                  lines=True, force_ascii=False))

    print(f"\n── Sentiment Dataset ──")
    print(f"Total: {len(balanced):,}")
    print(balanced["sentiment"].value_counts().to_string())

    return balanced

df = asyncio.run(build_sentiment_dataset())

Part 3: Image Dataset Collection

# dataset_builders/image_dataset.py
import asyncio
import httpx
import hashlib
from pathlib import Path
from PIL import Image, ImageFilter
import io
import json

OUTPUT_DIR = Path("datasets/images")


async def scrape_unsplash_images(
    query: str,
    label: str,
    target_count: int = 200,
    min_size: tuple = (224, 224)
) -> list[dict]:
    """
    Download images from Unsplash for a given query.
    Unsplash images are licensed for free use via their API.
    Get a free API key at unsplash.com/developers
    """
    API_KEY  = "YOUR_UNSPLASH_ACCESS_KEY"
    BASE_URL = "https://api.unsplash.com/search/photos"
    headers  = {"Authorization": f"Client-ID {API_KEY}"}

    label_dir = OUTPUT_DIR / label
    label_dir.mkdir(parents=True, exist_ok=True)

    collected = []
    page      = 1

    async with httpx.AsyncClient(headers=headers) as client:
        while len(collected) < target_count:
            params = {"query": query, "page": page, "per_page": 30,
                      "orientation": "squarish"}
            r = await client.get(BASE_URL, params=params, timeout=15)
            data = r.json()
            results = data.get("results", [])

            if not results:
                break

            for photo in results:
                if len(collected) >= target_count:
                    break

                # Download the "small" size (400px wide) — good for ML
                img_url  = photo["urls"]["small"]
                img_id   = photo["id"]
                filepath = label_dir / f"{img_id}.jpg"

                if filepath.exists():
                    continue

                try:
                    img_r = await client.get(img_url, timeout=20)
                    img   = Image.open(io.BytesIO(img_r.content)).convert("RGB")

                    # Quality filter: skip images smaller than min_size
                    if img.size[0] < min_size[0] or img.size[1] < min_size[1]:
                        continue

                    # Save resized to 224x224 (standard for CNN input)
                    img_resized = img.resize((224, 224), Image.LANCZOS)
                    img_resized.save(filepath, "JPEG", quality=90)

                    collected.append({
                        "filepath":    str(filepath),
                        "label":       label,
                        "image_id":    img_id,
                        "source_url":  photo["links"]["html"],
                        "width":       img.size[0],
                        "height":      img.size[1],
                        "description": photo.get("alt_description", ""),
                        "photographer":photo["user"]["name"],
                        "license":     "Unsplash License",
                    })
                    print(f"  [{label}] Downloaded {len(collected)}/{target_count}: {img_id}")

                except Exception as e:
                    print(f"  Image error ({img_url}): {e}")
                    continue

                await asyncio.sleep(0.3)

            page += 1

    return collected


async def build_image_dataset(
    categories: dict,   # {label: search_query}
    images_per_class: int = 500
) -> dict:
    """
    Build a multi-class image classification dataset.
    Returns metadata dict with train/val/test splits.
    """
    import random
    all_images = []

    for label, query in categories.items():
        print(f"\nDownloading '{label}' ({query})...")
        images = await scrape_unsplash_images(query, label, images_per_class)
        all_images.extend(images)
        await asyncio.sleep(1)

    # Shuffle and split
    random.shuffle(all_images)
    n         = len(all_images)
    train_end = int(n * 0.8)
    val_end   = int(n * 0.9)

    for i, img in enumerate(all_images):
        if i < train_end:
            img["split"] = "train"
        elif i < val_end:
            img["split"] = "val"
        else:
            img["split"] = "test"

    # Save metadata
    metadata_path = OUTPUT_DIR / "metadata.json"
    with open(metadata_path, "w") as f:
        json.dump(all_images, f, indent=2)

    # Create class mapping file
    class_map = {label: i for i, label in enumerate(categories.keys())}
    with open(OUTPUT_DIR / "classes.json", "w") as f:
        json.dump(class_map, f, indent=2)

    # Summary
    from collections import Counter
    label_counts = Counter(img["label"] for img in all_images)
    split_counts = Counter(img["split"] for img in all_images)

    print(f"\n── Image Dataset Summary ──")
    print(f"Total images: {n:,}")
    for label, count in label_counts.items():
        print(f"  {label}: {count}")
    print(f"Train: {split_counts['train']} | Val: {split_counts['val']} | Test: {split_counts['test']}")
    print(f"Metadata saved to {metadata_path}")

    return {"metadata": all_images, "class_map": class_map}


# Build a food classification dataset
categories = {
    "pizza":   "pizza food italian",
    "sushi":   "sushi japanese food",
    "burger":  "burger hamburger fast food",
    "salad":   "salad healthy food green",
    "pasta":   "pasta noodles italian food",
}

dataset = asyncio.run(build_image_dataset(categories, images_per_class=300))

Part 4: NER Dataset from Wikipedia

Named Entity Recognition (NER) datasets require text annotated with entity spans (persons, organisations, locations). Wikipedia's structured text and internal links are an excellent source — the anchor text of links is already a form of entity annotation.

# dataset_builders/ner_dataset.py
import asyncio
import httpx
import json
import re
from bs4 import BeautifulSoup
from pathlib import Path

OUTPUT_DIR = Path("datasets/ner")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

HEADERS = {"User-Agent": "NERDatasetBuilder/1.0 (research@example.com)"}

# Entity type mapping based on Wikipedia category patterns
ENTITY_PATTERNS = {
    "PERSON":   [r"/wiki/[A-Z][a-z]+_[A-Z][a-z]+",
                 r"/wiki/[A-Z][a-z]+_[A-Z][a-z]+_[A-Z][a-z]+"],
    "ORG":      [r"/wiki/.*(company|corporation|inc|ltd|org|group|foundation).*",
                 r"/wiki/.*(University|College|Institute|School).*"],
    "GPE":      [r"/wiki/.*(country|city|state|province|district).*",
                 r"/wiki/[A-Z][a-z]+,_"],
}


async def extract_wiki_ner_examples(
    article_title: str
) -> list[dict]:
    """
    Extract NER training examples from a Wikipedia article.
    Uses anchor text of internal links as entity annotations.
    """
    url = f"https://en.wikipedia.org/wiki/{article_title.replace(' ', '_')}"

    async with httpx.AsyncClient(headers=HEADERS) as client:
        r = await client.get(url, timeout=15)

    soup = BeautifulSoup(r.text, "lxml")
    content_div = soup.select_one("#mw-content-text")
    if not content_div:
        return []

    examples = []

    # Process each paragraph
    for para in content_div.select("p"):
        para_text = para.get_text(separator=" ", strip=True)

        # Skip very short paragraphs
        if len(para_text.split()) < 20:
            continue

        # Find entity spans by processing anchor tags
        entities = []
        for link in para.select("a[href^='/wiki/']"):
            entity_text = link.get_text(strip=True)
            href        = link.get("href", "")

            if not entity_text or len(entity_text) < 2:
                continue

            # Determine entity type from link target
            entity_type = "MISC"
            href_lower  = href.lower()

            if re.search(r"/wiki/[A-Z][a-z]+ [A-Z][a-z]+", href):
                entity_type = "PERSON"
            elif any(kw in href_lower for kw in ["_city", "_country", ",_", "_state"]):
                entity_type = "GPE"
            elif any(kw in href_lower for kw in ["company", "corporation", "university", "inc"]):
                entity_type = "ORG"

            # Find character position in paragraph text
            start = para_text.find(entity_text)
            if start >= 0:
                entities.append({
                    "text":  entity_text,
                    "start": start,
                    "end":   start + len(entity_text),
                    "label": entity_type,
                })

        if entities:
            examples.append({
                "text":       para_text,
                "entities":   entities,
                "source_url": url,
                "article":    article_title,
            })

    return examples


async def build_ner_dataset(
    seed_articles: list[str],
    examples_per_article: int = 20
) -> list[dict]:
    """Build an NER dataset from Wikipedia articles."""
    all_examples = []

    for title in seed_articles:
        print(f"Processing: {title}")
        examples = await extract_wiki_ner_examples(title)
        all_examples.extend(examples[:examples_per_article])
        await asyncio.sleep(0.5)

    # Save in spaCy training format
    spacy_format = []
    for ex in all_examples:
        ents = [(e["start"], e["end"], e["label"]) for e in ex["entities"]]
        spacy_format.append((ex["text"], {"entities": ents}))

    with open(OUTPUT_DIR / "ner_training_data.json", "w") as f:
        json.dump(all_examples, f, indent=2, ensure_ascii=False)

    print(f"\nBuilt NER dataset: {len(all_examples)} examples")
    return all_examples


seed_topics = [
    "Artificial intelligence", "Google", "Apple Inc.",
    "Elon Musk", "India", "World War II",
    "Python (programming language)", "OpenAI",
]

ner_data = asyncio.run(build_ner_dataset(seed_topics, examples_per_article=15))

Part 5: Data Quality Pipeline

Raw scraped data is never ready for training without cleaning. Here's a reusable quality pipeline:

# quality/data_quality_pipeline.py
import pandas as pd
import re
import hashlib
from langdetect import detect
from collections import Counter

def run_quality_pipeline(df: pd.DataFrame, text_col: str = "text") -> pd.DataFrame:
    """
    Run a full data quality pipeline on a text dataset.
    Returns cleaned DataFrame with quality report.
    """
    initial_count = len(df)
    report = {}

    print(f"Starting quality pipeline: {initial_count:,} records")

    # ── Step 1: Remove exact duplicates ──────────────────────
    df = df.drop_duplicates(subset=[text_col])
    report["after_dedup"] = len(df)
    print(f"  After dedup: {len(df):,} (-{initial_count - len(df):,})")

    # ── Step 2: Near-duplicate detection (simhash) ───────────
    def simple_hash(text: str) -> str:
        """Cheap near-duplicate hash using first/last 100 chars."""
        signature = text[:100] + text[-100:]
        return hashlib.md5(signature.encode()).hexdigest()

    df["_near_hash"] = df[text_col].apply(simple_hash)
    df = df.drop_duplicates(subset=["_near_hash"]).drop(columns=["_near_hash"])
    report["after_near_dedup"] = len(df)
    print(f"  After near-dedup: {len(df):,}")

    # ── Step 3: Length filters ────────────────────────────────
    df["_word_count"] = df[text_col].str.split().str.len()
    df = df[(df["_word_count"] >= 20) & (df["_word_count"] <= 1000)]
    df = df.drop(columns=["_word_count"])
    report["after_length_filter"] = len(df)
    print(f"  After length filter: {len(df):,}")

    # ── Step 4: Language detection (English only) ────────────
    def is_english(text: str) -> bool:
        try:
            return detect(text[:200]) == "en"
        except Exception:
            return True   # Keep if detection fails

    df = df[df[text_col].apply(is_english)]
    report["after_lang_filter"] = len(df)
    print(f"  After English filter: {len(df):,}")

    # ── Step 5: Content quality filters ──────────────────────
    def is_quality_text(text: str) -> bool:
        text_lower = text.lower()
        # Reject texts that are mostly numbers
        digit_ratio = sum(c.isdigit() for c in text) / max(len(text), 1)
        if digit_ratio > 0.3:
            return False
        # Reject texts with too many special chars (likely tables/code)
        special_ratio = sum(not c.isalnum() and not c.isspace() for c in text) / max(len(text), 1)
        if special_ratio > 0.25:
            return False
        # Reject spam patterns
        spam_patterns = ["click here", "buy now", "limited offer",
                         "subscribe now", "free trial", "©"]
        if sum(p in text_lower for p in spam_patterns) >= 2:
            return False
        return True

    df = df[df[text_col].apply(is_quality_text)]
    report["after_quality_filter"] = len(df)
    print(f"  After quality filter: {len(df):,}")

    # ── Step 6: Text normalisation ────────────────────────────
    def normalise(text: str) -> str:
        text = re.sub(r"http\S+|www\S+", "", text)        # Remove URLs
        text = re.sub(r"@\w+",           "", text)         # Remove @mentions
        text = re.sub(r"#(\w+)",         r"\1", text)      # Hashtags → plain text
        text = re.sub(r"\s+",            " ",  text)       # Normalise whitespace
        return text.strip()

    df[text_col] = df[text_col].apply(normalise)

    # ── Final report ──────────────────────────────────────────
    final_count = len(df)
    retention   = final_count / initial_count * 100

    print(f"\n── Quality Pipeline Report ──")
    print(f"  Input:     {initial_count:,}")
    print(f"  Output:    {final_count:,}")
    print(f"  Retained:  {retention:.1f}%")
    print(f"  Removed:   {initial_count - final_count:,}")

    return df.reset_index(drop=True)


# Apply to news dataset
df = pd.read_csv("datasets/news_classification/news_dataset.csv")
clean_df = run_quality_pipeline(df, text_col="text")
clean_df.to_csv("datasets/news_classification/news_dataset_clean.csv", index=False)

Part 6: Uploading to HuggingFace Hub

Once your dataset is clean and split, publish it to HuggingFace for easy reuse:

from datasets import Dataset, DatasetDict
import pandas as pd

def upload_to_huggingface(
    dataset_dir: str,
    repo_id: str,
    token: str
):
    """
    Upload a scraped dataset to HuggingFace Hub.

    Args:
        dataset_dir: Path containing train.jsonl, val.jsonl, test.jsonl
        repo_id: Your HF repo, e.g. "username/news-classification-2026"
        token: HuggingFace write token from huggingface.co/settings/tokens
    """
    splits = {}
    for split in ["train", "val", "test"]:
        path = f"{dataset_dir}/{split}.jsonl"
        df   = pd.read_json(path, lines=True)
        splits[split] = Dataset.from_pandas(df)

    dataset_dict = DatasetDict(splits)
    dataset_dict.push_to_hub(repo_id, token=token)
    print(f"Dataset uploaded to: https://huggingface.co/datasets/{repo_id}")


upload_to_huggingface(
    dataset_dir = "datasets/news_classification",
    repo_id     = "your-username/news-classification-2026",
    token       = "hf_your_write_token_here"
)

Dataset Checklist Before Training

Before using a scraped dataset for ML:

  • Deduplicated — both exact and near-duplicate removal

  • Language filtered — only target language retained

  • Length filtered — minimum and maximum token count enforced

  • Balanced — roughly equal class representation (or imbalance documented)

  • Splits fixed — train/val/test split done before any preprocessing

  • No data leakage — val/test URLs do not appear in train

  • Licence documented — source licence verified and recorded

  • PII removed — emails, phone numbers, names stripped if not needed

  • Quality spot-checked — human review of 50–100 random examples

  • Label distribution logged — class counts saved alongside data


Summary

Dataset type

Source

Label source

Output

Text classification

News sites

URL category path

CSV + JSONL

Sentiment analysis

Trustpilot / App Store

Star rating

CSV + JSONL

Image classification

Unsplash API

Search query

JPEGs + metadata.json

NER

Wikipedia links

Anchor text type

JSON (spaCy format)

Z

ZyVOP

Passionate developer sharing knowledge about modern web technologies and best practices.

Comments (0)

Login to post a comment.

Stay Updated

Get the latest articles delivered to your inbox.

We respect your privacy. Unsubscribe anytime.

Popular Tags

#.env.example Node.js#0x profiling#10x faster python scraper tutorial#12-factor#2026#AI#AI agents#AI code quality#AI code security#AI coding