Build a Price Monitoring System with Python: Amazon, Shopify & eBay (2026)
Learn how to track product prices across Amazon, Shopify stores, and eBay using Python. Build automated alerts, historical price storage, and scheduled monitoring from scratch.
Senior Developer

Why Price Monitoring Is the Most Practical Scraping Project
Amazon remains the largest e-commerce platform in the world, and extracting product data from it — prices, reviews, seller information — is one of the most common web scraping use cases. Whether you're building a deal-finder for personal use, doing competitive analysis for a business, or building a side product that alerts users when prices drop, a price monitoring system is arguably the most immediately useful scraper you can build.
Here's what makes it interesting technically in 2026:
Modern e-commerce sites — Shopify stores, Amazon, direct-to-consumer brands — render prices with JavaScript, hide them behind A/B tests, and protect their pages with sophisticated bot detection. A naive scraper that worked in 2022 breaks instantly today.
Amazon reprices millions of products every day, and according to industry monitoring data, 37 percent of active Amazon monitors check at least hourly and 12 percent run at five-minute intervals. Electronics and trending categories can move dozens of times per day.
This guide builds a complete, production-ready price monitoring system step by step — from scraping the price off a single page all the way to a scheduled multi-platform tracker with email alerts.
What We'll Build
By the end of this guide you'll have:
A multi-platform scraper that monitors prices on Amazon, Shopify stores, and eBay
A SQLite price history database that tracks every price change over time
A price-drop alert system that emails you when a product hits your target price
A scheduler that reruns the monitor every six hours without manual intervention
A trend report showing price movements over time per product
Understanding the Platforms
Each platform has different scraping characteristics:
| Platform | Rendering | Anti-Bot | Price Location | Difficulty |
|---|---|---|---|---|
| Amazon | Mostly server-side | Very aggressive | .a-price-whole / .a-offscreen | Hard |
| Shopify | Server + JS hybrid | Low–Medium | /products/[handle].json | Easy |
| eBay | Server-side | Moderate | .x-price-primary | Medium |
| Generic sites | Varies | Low | Custom per site | Varies |
Shopify is the easiest: most Shopify stores expose a JSON endpoint at /products/[handle].json that returns the full product, including every variant's price. Amazon is the hardest, because it reprices constantly and blocks aggressively.
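Since each platform needs its own scraper, it can help to guess the platform from the URL before registering a product. The sketch below is one possible approach, not part of the original system: detect_platform is a hypothetical helper, and it only recognises Shopify stores on a *.myshopify.com domain. Stores on custom domains come back as "generic" and would need to be labelled by hand.

```python
from urllib.parse import urlparse


def detect_platform(url: str) -> str:
    """Hypothetical helper: map a product URL to a platform label.

    Returns "amazon", "ebay", "shopify", or "generic". Shopify stores on
    custom domains cannot be identified from the hostname alone.
    """
    host = (urlparse(url).hostname or "").lower()
    labels = host.split(".")
    if "amazon" in labels:      # www.amazon.com, amazon.co.uk, ...
        return "amazon"
    if "ebay" in labels:        # www.ebay.com, ebay.de, ...
        return "ebay"
    if host.endswith(".myshopify.com"):
        return "shopify"
    return "generic"


print(detect_platform("https://www.amazon.com/dp/B09XS7JWHH"))
print(detect_platform("https://some-shopify-store.com/products/product-handle"))
```

The returned label matches the platform strings used later when products are registered, so it could be passed straight through as the platform argument.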
Part 1: The Database Layer
Build the storage foundation first — everything else writes into it.
# db.py
import sqlite3

DB_PATH = "price_monitor.db"


# Used below as `with get_conn() as conn:`, which commits on success and
# rolls back on error (it does not close the connection).
def get_conn():
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row  # Access columns by name
    return conn


def init_db():
    """Create tables if they don't exist."""
    with get_conn() as conn:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                url TEXT NOT NULL UNIQUE,
                platform TEXT NOT NULL,
                target_price REAL,
                alert_email TEXT,
                active INTEGER DEFAULT 1,
                added_at TEXT DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS price_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                product_id INTEGER NOT NULL REFERENCES products(id),
                price REAL,
                currency TEXT DEFAULT 'USD',
                in_stock INTEGER,
                scraped_at TEXT DEFAULT (datetime('now'))
            );
            CREATE INDEX IF NOT EXISTS idx_history_product
                ON price_history(product_id, scraped_at DESC);
        """)
    print("Database initialised.")


def add_product(name, url, platform, target_price=None, alert_email=None):
    """Register a new product to monitor."""
    with get_conn() as conn:
        try:
            conn.execute("""
                INSERT INTO products (name, url, platform, target_price, alert_email)
                VALUES (?, ?, ?, ?, ?)
            """, (name, url, platform, target_price, alert_email))
            print(f"Added: {name}")
        except sqlite3.IntegrityError:
            # The UNIQUE constraint on url rejects duplicates
            print(f"Already exists: {url}")


def save_price(product_id, price, currency="USD", in_stock=True):
    """Record a price observation."""
    with get_conn() as conn:
        conn.execute("""
            INSERT INTO price_history (product_id, price, currency, in_stock)
            VALUES (?, ?, ?, ?)
        """, (product_id, price, currency, int(in_stock)))


def get_active_products():
    """Return all active monitored products."""
    with get_conn() as conn:
        return conn.execute(
            "SELECT * FROM products WHERE active = 1"
        ).fetchall()


def get_price_history(product_id, days=30):
    """Return recent price history for a product."""
    with get_conn() as conn:
        return conn.execute("""
            SELECT price, currency, in_stock, scraped_at
            FROM price_history
            WHERE product_id = ?
              AND scraped_at >= datetime('now', ?)
            ORDER BY scraped_at ASC
        """, (product_id, f"-{days} days")).fetchall()


def get_lowest_price(product_id):
    """Return the all-time lowest recorded price (None if no history)."""
    with get_conn() as conn:
        result = conn.execute(
            "SELECT MIN(price) FROM price_history WHERE product_id = ?",
            (product_id,)
        ).fetchone()
        return result[0]
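To see how the price_history table supports change detection, here is a small self-contained sketch that runs against an in-memory SQLite database instead of price_monitor.db. The helper last_two_prices and the sample rows are made up for illustration, and the table is a trimmed version of the schema above.

```python
import sqlite3

# In-memory database with a cut-down price_history table (illustrative only)
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript("""
    CREATE TABLE price_history (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        product_id INTEGER NOT NULL,
        price REAL,
        scraped_at TEXT
    );
""")
conn.executemany(
    "INSERT INTO price_history (product_id, price, scraped_at) VALUES (?, ?, ?)",
    [
        (1, 349.99, "2026-01-01 08:00:00"),
        (1, 329.00, "2026-01-01 14:00:00"),
        (1, 279.99, "2026-01-01 20:00:00"),
    ],
)


def last_two_prices(conn, product_id):
    """Hypothetical helper: return (latest, previous) prices, None if missing."""
    rows = conn.execute(
        """
        SELECT price FROM price_history
        WHERE product_id = ? AND price IS NOT NULL
        ORDER BY scraped_at DESC, id DESC
        LIMIT 2
        """,
        (product_id,),
    ).fetchall()
    prices = [row["price"] for row in rows]
    latest = prices[0] if prices else None
    previous = prices[1] if len(prices) > 1 else None
    return latest, previous


latest, previous = last_two_prices(conn, 1)
print(f"latest={latest}, previous={previous}")
```

Ordering by scraped_at and then id keeps the result stable even when two observations share the same timestamp.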
Part 2: Scraping Amazon
An Amazon product page can carry up to five related prices: the current Buy Box price, the "was" (list) price, any Subscribe & Save price, per-variant prices, and offers from other sellers. The parser below extracts the first two, along with the title, stock status, currency, and ASIN.
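Amazon product URLs often carry slugs and tracking parameters, so the same product can appear under many different URLs. One way to keep the products table free of duplicates is to reduce each URL to its /dp/ASIN form before storing it. This is a sketch under two assumptions: that ASINs are ten uppercase alphanumeric characters, and that they appear after /dp/ or /gp/product/ in the path. Both helper names are hypothetical.

```python
import re
from typing import Optional
from urllib.parse import urlparse

# Assumption: an ASIN is 10 uppercase letters/digits following /dp/ or /gp/product/
ASIN_RE = re.compile(r"/(?:dp|gp/product)/([A-Z0-9]{10})(?:/|$)")


def extract_asin(url: str) -> Optional[str]:
    """Return the ASIN found in the URL path, or None if there is none."""
    match = ASIN_RE.search(urlparse(url).path)
    return match.group(1) if match else None


def canonical_amazon_url(url: str) -> Optional[str]:
    """Rebuild the URL as scheme://host/dp/ASIN, dropping slugs and query strings."""
    asin = extract_asin(url)
    if asin is None:
        return None
    parsed = urlparse(url)
    return f"{parsed.scheme}://{parsed.netloc}/dp/{asin}"


long_url = "https://www.amazon.com/Sony-WH-1000XM5/dp/B09XS7JWHH/ref=sr_1_1?keywords=x"
print(canonical_amazon_url(long_url))
```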
# scrapers/amazon.py
import re
import random
import asyncio
from curl_cffi.requests import AsyncSession
from bs4 import BeautifulSoup

HEADERS = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept-Encoding": "gzip, deflate, br",
    "sec-ch-ua": '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
    "Upgrade-Insecure-Requests": "1",
}


def parse_amazon_price(html: str) -> dict:
    """
    Extract price data from an Amazon product page.
    Targets the most stable selectors as of June 2026.
    """
    soup = BeautifulSoup(html, "lxml")
    result = {
        "price": None,
        "was_price": None,
        "currency": "USD",
        "in_stock": False,
        "title": None,
        "asin": None,
    }

    # ── Title ──
    title_el = soup.select_one("#productTitle")
    if title_el:
        result["title"] = title_el.get_text(strip=True)

    # ── Current Price (Buy Box) ──
    # Method 1: standard Buy Box price, the most common location
    whole = soup.select_one(".a-price-whole")
    frac = soup.select_one(".a-price-fraction")
    if whole:
        # "1,299." -> "1299"
        price_str = whole.get_text(strip=True).replace(",", "").rstrip(".")
        if frac:
            price_str += "." + frac.get_text(strip=True)
        try:
            result["price"] = float(price_str)
        except ValueError:
            pass

    # Method 2: fallback to the hidden offscreen price (more reliable on some pages)
    if not result["price"]:
        offscreen = soup.select_one(".a-offscreen")
        if offscreen:
            price_text = offscreen.get_text(strip=True)
            numbers = re.findall(r"[\d,]+\.?\d*", price_text.replace(",", ""))
            if numbers:
                try:
                    result["price"] = float(numbers[0])
                except ValueError:
                    pass

    # ── Was / List Price ──
    was_el = soup.select_one(".basisPrice .a-offscreen")
    if was_el:
        was_text = was_el.get_text(strip=True)
        numbers = re.findall(r"[\d,]+\.?\d*", was_text.replace(",", ""))
        if numbers:
            try:
                result["was_price"] = float(numbers[0])
            except ValueError:
                pass

    # ── Stock Status ──
    avail_el = soup.select_one("#availability span")
    if avail_el:
        avail_text = avail_el.get_text(strip=True).lower()
        result["in_stock"] = "in stock" in avail_text

    # ── Currency (detected from the symbol) ──
    price_symbol = soup.select_one(".a-price-symbol")
    if price_symbol:
        symbol = price_symbol.get_text(strip=True)
        symbol_map = {"$": "USD", "£": "GBP", "€": "EUR", "₹": "INR"}
        result["currency"] = symbol_map.get(symbol, "USD")

    # ── ASIN (from the hidden #ASIN input on the page) ──
    asin_input = soup.select_one("#ASIN")
    if asin_input:
        result["asin"] = asin_input.get("value")

    return result


async def scrape_amazon(url: str, proxy: str | None = None) -> dict:
    """Fetch and parse a single Amazon product page."""
    proxies = {"https": proxy} if proxy else None
    async with AsyncSession(impersonate="chrome120") as session:
        # Visit the homepage first for a more natural session pattern
        await session.get(
            "https://www.amazon.com",
            headers=HEADERS,
            proxies=proxies,
            timeout=15,
        )
        await asyncio.sleep(random.uniform(1.5, 3.0))
        response = await session.get(
            url,
            headers=HEADERS,
            proxies=proxies,
            timeout=20,
        )
        if response.status_code != 200:
            raise RuntimeError(f"Amazon returned {response.status_code}")
        # Detect the CAPTCHA block page
        if ("Type the characters you see" in response.text
                or "Enter the characters you see below" in response.text):
            raise RuntimeError("CAPTCHA encountered: rotate proxy and retry")
        return parse_amazon_price(response.text)

Part 3: Scraping Shopify (The Easy Way)
Most Shopify stores offer a shortcut: the /products/[handle].json endpoint returns clean JSON with the full product data, including all variant prices, so no HTML parsing is needed.
# scrapers/shopify.py
import httpx
import re
from urllib.parse import urlparse


async def scrape_shopify(url: str) -> dict:
    """
    Extract price from a Shopify store using the built-in JSON endpoint.
    Works wherever the store leaves the endpoint enabled; no CSS selectors needed.
    Supported URL formats:
    - https://store.com/products/my-product
    - https://store.myshopify.com/products/handle
    """
    parsed = urlparse(url)
    base = f"{parsed.scheme}://{parsed.netloc}"

    # Extract the product handle from the URL path
    # URL pattern: /products/product-handle or /collections/x/products/handle
    path_match = re.search(r"/products/([^/?#]+)", parsed.path)
    if not path_match:
        raise ValueError(f"Cannot extract product handle from: {url}")
    handle = path_match.group(1)
    json_url = f"{base}/products/{handle}.json"

    headers = {
        "User-Agent": "Mozilla/5.0 (compatible; PriceBot/1.0)",
        "Accept": "application/json",
    }
    async with httpx.AsyncClient(headers=headers, follow_redirects=True) as client:
        r = await client.get(json_url, timeout=15)
        r.raise_for_status()
        data = r.json()

    product = data["product"]
    variants = product.get("variants", [])
    if not variants:
        return {"price": None, "in_stock": False, "title": product.get("title")}

    # Collect prices of available variants and report the lowest one
    # (variants with no "available" field are treated as available)
    prices = []
    for v in variants:
        if v.get("available", True):
            try:
                prices.append(float(v["price"]))
            except (ValueError, KeyError):
                pass
    lowest_price = min(prices) if prices else None

    return {
        "title": product.get("title"),
        "price": lowest_price,
        "was_price": float(variants[0].get("compare_at_price") or 0) or None,
        "currency": "USD",  # Shopify JSON doesn't include currency; check the storefront
        "in_stock": any(v.get("available", False) for v in variants),
        "variants_count": len(variants),
        "vendor": product.get("vendor"),
        "product_type": product.get("product_type"),
    }

Part 4: Scraping eBay
# scrapers/ebay.py
import httpx
import re
from bs4 import BeautifulSoup

HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept": "text/html,application/xhtml+xml,*/*;q=0.8",
}


async def scrape_ebay(url: str) -> dict:
    """Extract price from an eBay listing page."""
    async with httpx.AsyncClient(headers=HEADERS, follow_redirects=True) as client:
        r = await client.get(url, timeout=20)
        r.raise_for_status()

    soup = BeautifulSoup(r.text, "lxml")
    result = {"price": None, "was_price": None, "in_stock": True,
              "currency": "USD", "title": None}

    # Title
    title_el = soup.select_one("h1.x-item-title__mainTitle span")
    if title_el:
        result["title"] = title_el.get_text(strip=True)

    # Current price
    price_el = soup.select_one(".x-price-primary span.ux-textspans")
    if price_el:
        price_text = price_el.get_text(strip=True)
        nums = re.findall(r"[\d,]+\.?\d*", price_text.replace(",", ""))
        if nums:
            try:
                result["price"] = float(nums[0])
            except ValueError:
                pass

    # Was price (strikethrough)
    was_el = soup.select_one(".x-additional-info__original-price span")
    if was_el:
        was_text = was_el.get_text(strip=True)
        nums = re.findall(r"[\d,]+\.?\d*", was_text.replace(",", ""))
        if nums:
            try:
                result["was_price"] = float(nums[0])
            except ValueError:
                pass

    # Stock status
    qty_el = soup.select_one(".d-quantity__availability")
    if qty_el:
        qty_text = qty_el.get_text(strip=True).lower()
        result["in_stock"] = "sold out" not in qty_text and "unavailable" not in qty_text

    return result

Part 5: The Unified Monitor
# monitor.py
import asyncio
import random
from db import get_active_products, save_price
from scrapers.amazon import scrape_amazon
from scrapers.shopify import scrape_shopify
from scrapers.ebay import scrape_ebay

# Maps the platform column in the products table to its scraper coroutine
SCRAPER_MAP = {
    "amazon": scrape_amazon,
    "shopify": scrape_shopify,
    "ebay": scrape_ebay,
}


async def check_product(product) -> dict | None:
    """Scrape price for one product and save to database."""
    scraper = SCRAPER_MAP.get(product["platform"])
    if not scraper:
        print(f"  No scraper for platform: {product['platform']}")
        return None
    try:
        data = await scraper(product["url"])
        if data.get("price") is None:
            print(f"  [{product['name']}] No price found")
            return None
        save_price(
            product_id=product["id"],
            price=data["price"],
            currency=data.get("currency", "USD"),
            in_stock=data.get("in_stock", True),
        )
        print(f"  [{product['name']}] ${data['price']:.2f}"
              f"{' ⚠ OUT OF STOCK' if not data.get('in_stock') else ''}")
        return data
    except Exception as e:
        # One failing product should not stop the rest of the cycle
        print(f"  [{product['name']}] Error: {e}")
        return None


async def run_monitor() -> dict:
    """Run one monitoring cycle across all active products."""
    products = get_active_products()
    if not products:
        print("No products to monitor. Add some with add_product().")
        return {}
    print(f"\nMonitoring {len(products)} products...\n")
    results = {}
    for product in products:
        data = await check_product(product)
        if data:
            results[product["id"]] = data
        # Polite delay between requests
        await asyncio.sleep(random.uniform(2.0, 5.0))
    return results

Part 6: Price-Drop Email Alerts
# alerts.py
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# Configure your email credentials
SMTP_HOST = "smtp.gmail.com"
SMTP_PORT = 587
SENDER_EMAIL = "your_monitor@gmail.com"
SENDER_PASS = "your_app_password"  # Use a Gmail App Password, not your real password


def send_alert(to_email: str, product_name: str, current_price: float,
               target_price: float, product_url: str, currency: str = "USD"):
    """Send a price-drop email alert."""
    symbol = {"USD": "$", "GBP": "£", "EUR": "€", "INR": "₹"}.get(currency, "$")
    subject = f"🔔 Price Drop Alert: {product_name}"
    body = f"""
Great news! The price for a product you're monitoring has dropped.
Product : {product_name}
Current price : {symbol}{current_price:.2f}
Your target : {symbol}{target_price:.2f}
Savings : {symbol}{target_price - current_price:.2f}
👉 Buy now: {product_url}
--
Python Price Monitor
"""
    msg = MIMEMultipart()
    msg["From"] = SENDER_EMAIL
    msg["To"] = to_email
    msg["Subject"] = subject
    msg.attach(MIMEText(body, "plain"))
    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
            server.starttls()
            server.login(SENDER_EMAIL, SENDER_PASS)
            server.send_message(msg)
        print(f"  📧 Alert sent to {to_email} for {product_name}")
    except Exception as e:
        print(f"  Alert send failed: {e}")


def check_and_alert(product, current_price: float, currency: str = "USD"):
    """Check if current price triggers an alert and send email if so."""
    target = product["target_price"]
    email = product["alert_email"]
    if not target or not email:
        return  # No alert configured for this product
    if current_price <= target:
        print(f"  🎯 Target hit! {product['name']}: ${current_price:.2f} ≤ ${target:.2f}")
        send_alert(
            to_email=email,
            product_name=product["name"],
            current_price=current_price,
            target_price=target,
            product_url=product["url"],
            currency=currency,
        )
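As written, check_and_alert sends an email on every cycle for as long as the price stays at or below the target. If that gets noisy, one option is to remember the price you last alerted on and only send again when the price drops further. The sketch below assumes a last_alerted_price value stored per product, which is not part of the schema in Part 1, and should_send_alert is a hypothetical helper.

```python
from typing import Optional


def should_send_alert(current_price: float,
                      target_price: Optional[float],
                      last_alerted_price: Optional[float]) -> bool:
    """Hypothetical dedupe rule: alert once, then only on further drops."""
    # No target configured, or the price is still above it: nothing to send
    if not target_price or current_price > target_price:
        return False
    # First time the target is hit
    if last_alerted_price is None:
        return True
    # Already alerted: only repeat if the price fell below the alerted price
    return current_price < last_alerted_price


print(should_send_alert(279.99, 280.00, None))    # first hit
print(should_send_alert(279.99, 280.00, 279.99))  # unchanged since last alert
print(should_send_alert(269.00, 280.00, 279.99))  # dropped further
```

You would also want to clear the stored value once the price climbs back above the target, so that the next dip triggers a fresh alert.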
Part 7: Price Trend Reporting
# report.py
import pandas as pd
from db import get_active_products, get_price_history


def generate_price_report(days: int = 30) -> pd.DataFrame:
    """Generate a full price trend report for all monitored products."""
    products = get_active_products()
    rows = []
    for product in products:
        history = get_price_history(product["id"], days=days)
        if not history:
            continue
        prices = [row["price"] for row in history if row["price"]]
        if not prices:
            continue
        latest = prices[-1]
        lowest = min(prices)
        highest = max(prices)
        first = prices[0]
        # Percentage change from the first to the latest observation in the window
        change = ((latest - first) / first * 100) if first else 0
        rows.append({
            "product": product["name"],
            "platform": product["platform"],
            "current_price": round(latest, 2),
            "lowest_price": round(lowest, 2),
            "highest_price": round(highest, 2),
            "price_change_%": round(change, 1),
            "observations": len(prices),
            "target_price": product["target_price"],
            "below_target": latest <= product["target_price"] if product["target_price"] else None,
        })
    df = pd.DataFrame(rows)
    return df


def print_report():
    df = generate_price_report(days=30)
    if df.empty:
        print("No price history yet. Run the monitor first.")
        return
    print("\n══ PRICE MONITORING REPORT (Last 30 days) ══\n")
    print(df.to_string(index=False))
    below = df[df["below_target"] == True]
    if not below.empty:
        print(f"\n🎯 {len(below)} product(s) currently at or below target price:")
        for _, row in below.iterrows():
            print(f"  • {row['product']}: ${row['current_price']:.2f} "
                  f"(target: ${row['target_price']:.2f})")

Part 8: Complete Setup and Scheduler
# main.py: run this file to start monitoring
import asyncio
import random
import time

import schedule

from alerts import check_and_alert
from db import add_product, get_active_products, init_db
from monitor import check_product
from report import print_report


def setup():
    """Initialise database and add products to monitor."""
    init_db()
    # Amazon products: use full product URLs
    add_product(
        name="Sony WH-1000XM5 Headphones",
        url="https://www.amazon.com/dp/B09XS7JWHH",
        platform="amazon",
        target_price=280.00,
        alert_email="you@example.com",
    )
    # Shopify store products
    add_product(
        name="Example Shopify Product",
        url="https://some-shopify-store.com/products/product-handle",
        platform="shopify",
        target_price=50.00,
        alert_email="you@example.com",
    )
    # eBay listings
    add_product(
        name="Vintage Camera on eBay",
        url="https://www.ebay.com/itm/123456789",
        platform="ebay",
        target_price=120.00,
        alert_email="you@example.com",
    )


async def monitoring_cycle():
    """One full monitoring pass with alert checking."""
    for product in get_active_products():
        data = await check_product(product)
        if data and data.get("price"):
            check_and_alert(product, data["price"])
        # Polite delay between requests, as in run_monitor()
        await asyncio.sleep(random.uniform(2.0, 5.0))
    print_report()


def run_scheduled():
    """Run monitoring on a schedule."""
    asyncio.run(monitoring_cycle())
    print("Next run in 6 hours.")


if __name__ == "__main__":
    setup()
    # Run immediately on start
    asyncio.run(monitoring_cycle())
    # Then every 6 hours
    schedule.every(6).hours.do(run_scheduled)
    print("\nScheduler running. Press Ctrl+C to stop.")
    while True:
        schedule.run_pending()
        time.sleep(60)

Pro Tips for Reliable Price Monitoring
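Tip 1: Store raw HTML alongside prices
If your parser breaks after a site redesign, you can re-parse from stored HTML without re-scraping:
# Call next to save_price(); raw_html is the page body the scraper fetched,
# and the html_cache/ directory must already exist
from datetime import datetime, timezone
timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
with open(f"html_cache/{product_id}_{timestamp}.html", "w", encoding="utf-8") as f:
    f.write(raw_html)

Tip 2: Track percentage drops, not just absolute prices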
def is_significant_drop(old_price, new_price, threshold_pct=5.0) -> bool:
    """Return True if price dropped by at least threshold_pct percent."""
    if not old_price or not new_price:
        return False
    drop_pct = (old_price - new_price) / old_price * 100
    return drop_pct >= threshold_pct

Tip 3: Handle dynamic pricing variations
Amazon can show different prices to different users based on their location, login state, browsing history, and time of day. To get consistent readings, always scrape with the same headers and the same proxy location, and without cookies from previous sessions.
Tip 4: Monitor variant-level prices on Shopify
Shopify products often have multiple variants (sizes, colours) at different prices. Monitor all variants to catch deals on specific options:
# product is the "product" object from the /products/[handle].json response
all_variant_prices = {
    v["title"]: float(v["price"])
    for v in product["variants"]
    if v.get("available")
}

FAQ
Q: Is scraping Amazon prices legal? It depends on your jurisdiction and on what you do with the data. Amazon's Conditions of Use restrict automated data collection, so scraping can breach those terms even when the prices are publicly visible. For a low-volume personal monitor, stay away from login-protected pages and personal information, and don't republish or resell the scraped data.
Q: How do I avoid getting blocked by Amazon? Use curl_cffi with impersonate="chrome120", rotate residential proxies, add 2–5 second delays between requests, and never scrape more than a few hundred products per day per IP.
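When a request does get blocked, retrying immediately tends to make things worse. A common pattern is exponential backoff with jitter: wait a random time whose upper bound doubles on each attempt. Below is a minimal sketch of the delay schedule only. The function name and default values are assumptions chosen for illustration, not tuned figures for Amazon.

```python
import random


def backoff_delays(max_retries=4, base=2.0, cap=60.0, rng=None):
    """Return one randomised wait time (seconds) per retry attempt.

    Attempt n waits a uniform random time in [0, min(cap, base * 2**n)].
    Pass a seeded random.Random as rng for reproducible schedules.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        ceiling = min(cap, base * (2 ** attempt))
        delays.append(rng.uniform(0, ceiling))
    return delays


for attempt, delay in enumerate(backoff_delays(), start=1):
    print(f"retry {attempt}: wait up to {delay:.1f}s before trying again")
```

In the monitor you could sleep for each delay in turn with asyncio.sleep() and call the scraper again, ideally through a different proxy, giving up once the list is exhausted.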
Q: What if the price selector breaks? Amazon changes its HTML structure regularly. When that happens: open the product page in Chrome DevTools, find the price element, copy its updated selector, and update parse_amazon_price(). Use data- attributes when available — they're more stable than class names.
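A broken selector does not always return nothing. Sometimes it matches the wrong element and yields a wildly wrong number. One cheap safeguard is a plausibility check before saving, so a bad parse neither pollutes the history nor triggers a false alert. The helper below is a hypothetical sketch, and the 80 percent threshold is an arbitrary assumption you would tune per category.

```python
from typing import Optional


def is_plausible_price(new_price: Optional[float],
                       last_price: Optional[float],
                       max_change: float = 0.8) -> bool:
    """Hypothetical check: reject missing, non-positive, or wildly shifted prices."""
    if new_price is None or new_price <= 0:
        return False
    if not last_price:
        return True  # Nothing to compare against yet
    # Relative change versus the last recorded price
    change = abs(new_price - last_price) / last_price
    return change <= max_change


print(is_plausible_price(279.99, 299.99))  # ordinary discount
print(is_plausible_price(2.79, 299.99))    # likely a parsing error
```

Readings that fail the check are worth logging along with the raw HTML, since they are often the first sign that a selector needs updating.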
Q: Can I monitor prices on Indian e-commerce sites like Flipkart or Meesho? Yes, both are moderately protected. Flipkart is server-rendered, so curl_cffi with browser impersonation is usually enough. Meesho is React-rendered and needs Playwright.
Summary
| Component | Tool | What it does |
|---|---|---|
| Amazon scraper | curl_cffi + BeautifulSoup | TLS impersonation + price extraction |
| Shopify scraper | httpx + /products/[handle].json | Clean structured data, no CSS selectors |
| eBay scraper | httpx + BeautifulSoup | Server-rendered price extraction |
| Storage | SQLite | Price history with full trend data |
| Alerts | smtplib | Email when target price is hit |
| Reporting | pandas | Price trend summary across products |
| Scheduling | schedule | Runs every N hours automatically |