Scrapy

从基础到高级,涵盖爬虫编写、Item、Pipeline、中间件、选择器、反爬应对、分布式爬取、数据存储等核心知识点。以 Scrapy 2.11.x 为基准。


目录


一、基础篇

1.1 安装与项目结构

pip install scrapy
pip install scrapy-redis          # distributed crawling
pip install scrapy-splash         # JS rendering via Splash
pip install scrapy-playwright     # JS rendering via Playwright (the Scrapy integration used in section 6.5)
playwright install chromium       # download the browser binary that Playwright drives
pip install itemadapter           # uniform item access wrapper

# Create a project
scrapy startproject myspider
cd myspider

# Create spiders
scrapy genspider quotes quotes.toscrape.com
scrapy genspider -t crawl news_spider news.example.com   # CrawlSpider template

项目结构

myspider/
├── scrapy.cfg                  # 部署配置
└── myspider/
    ├── __init__.py
    ├── settings.py             # 全局配置
    ├── items.py                # Item 定义
    ├── middlewares.py          # 中间件
    ├── pipelines.py            # Pipeline
    └── spiders/
        ├── __init__.py
        ├── quotes.py
        └── news_spider.py

1.2 第一个爬虫

# spiders/quotes.py
import scrapy

class QuotesSpider(scrapy.Spider):
    """Scrape quotes from quotes.toscrape.com and follow the pagination links."""

    name = 'quotes'                                    # spider name (must be unique)
    allowed_domains = ['quotes.toscrape.com']          # domains the spider may crawl
    start_urls = ['https://quotes.toscrape.com/']      # initial URLs

    # Spider-level settings; they override the project-wide values.
    custom_settings = {
        'DOWNLOAD_DELAY': 1,
        'CONCURRENT_REQUESTS': 4,
    }

    def parse(self, response):
        """Default callback for ``start_urls``: yield quotes, then the next page."""
        for block in response.css('div.quote'):
            text = block.css('span.text::text').get()
            author = block.css('small.author::text').get()
            tags = block.css('div.tags a.tag::text').getall()
            yield {'text': text, 'author': author, 'tags': tags}

        # Pagination: stop when there is no "next" link.
        next_href = response.css('li.next a::attr(href)').get()
        if not next_href:
            return
        yield response.follow(next_href, callback=self.parse)

1.3 命令行工具

# Run a spider
scrapy crawl quotes
scrapy crawl quotes -o output.json         # export to JSON
scrapy crawl quotes -o output.csv          # export to CSV
scrapy crawl quotes -o output.jsonl        # JSON Lines (recommended for large outputs)
scrapy crawl quotes -s CLOSESPIDER_ITEMCOUNT=100   # stop after 100 items
scrapy crawl quotes -s LOG_LEVEL=DEBUG     # debug-level logging

# Debugging tools
scrapy shell "https://quotes.toscrape.com/"     # interactive shell
scrapy fetch "https://quotes.toscrape.com/"     # download the page
scrapy view "https://quotes.toscrape.com/"      # open the downloaded page in a browser

# List all spiders
scrapy list

# Check a spider
scrapy check quotes

# Show the value of a setting
scrapy settings --get DOWNLOAD_DELAY

二、选择器篇

2.1 CSS 选择器

# Try these interactively in the Scrapy shell:
# scrapy shell "https://quotes.toscrape.com/"

# Selecting elements
response.css('div.quote')                      # every match (SelectorList)
response.css('div.quote')[0]                   # first match

# Extracting text
response.css('span.text::text').get()          # first result (None when nothing matches)
response.css('span.text::text').getall()       # all results (list)
response.css('span.text::text').get(default='') # with a default value

# Extracting attributes
response.css('a::attr(href)').get()
response.css('img::attr(src)').getall()

# Combinators and attribute filters
response.css('li.next a::attr(href)').get()    # descendant combinator
response.css('div > p::text').get()            # direct child
# In a selector list the ::text pseudo-element binds only to the selector it
# is attached to, so it has to be repeated for every alternative.
response.css('h1::text, h2::text').getall()    # selector list (h1 or h2)
response.css('a[href*="page"]::attr(href)').getall()  # attribute contains

# Frequently used CSS tricks
response.css('[class="active"]')               # class attribute equals exactly "active"
response.css('[href^="https"]')                # href starts with https
response.css('[href$=".pdf"]')                 # href ends with .pdf
response.css('div:first-child')                # div that is the first child of its parent
response.css('tr:nth-child(2)')                # tr that is the second child of its parent

2.2 XPath 选择器

# XPath is more expressive, especially on complex structures
response.xpath('//div[@class="quote"]')                   # every div whose class is "quote"
response.xpath('//span[@class="text"]/text()').get()      # extract text
response.xpath('//a/@href').getall()                      # extract attributes
response.xpath('//img/@src').get()

# Common path expressions and predicates
response.xpath('//div/span/text()')            # text of child elements
response.xpath('//div//span/text()')           # text of descendant elements
response.xpath('//span[@class="text"]')        # filter by attribute
response.xpath('//a[contains(@href, "page")]') # attribute contains
response.xpath('//a[text()="Next"]/@href')     # match on text
response.xpath('//p[last()]')                  # last one
response.xpath('//tr[position()>1]')           # filter by position
response.xpath('//div[not(@class)]')           # div without a class attribute

# Parent and sibling axes
response.xpath('//span[@class="author"]/parent::div')
response.xpath('//span[@class="text"]/following-sibling::span')
response.xpath('//li[@class="active"]/preceding-sibling::li')

# Text handling
response.xpath('normalize-space(//title/text())').get()   # collapse redundant whitespace
response.xpath('string(//div[@class="content"])').get()   # text of all descendants

# Nested XPath (use .// when querying relative to a sub-selector)
for quote in response.xpath('//div[@class="quote"]'):
    text   = quote.xpath('.//span[@class="text"]/text()').get()
    author = quote.xpath('.//small[@class="author"]/text()').get()

2.3 正则提取

# re(): regular-expression extraction from the selected text
response.css('p::text').re(r'\d+')                  # extract every number
response.css('p::text').re_first(r'\d+')            # extract the first number

# Combined with XPath
response.xpath('//p/text()').re(r'Price: \$(\d+\.?\d*)')

# Typical regex use cases
response.css('script::text').re_first(r'"token":"([^"]+)"')   # pull a value out of inline JS
response.css('a::attr(href)').re(r'page=(\d+)')

2.4 嵌套与组合

def parse(self, response):
    """Yield one dict (name, price, img_url, rating) per ``div.product`` node."""
    for node in response.css('div.product'):
        # Selectors can be chained on a sub-node.
        product_name = node.css('h2.name::text').get('').strip()

        raw_price = node.css('span.price::text').re_first(r'[\d.]+')
        if raw_price:
            price_value = float(raw_price)
        else:
            price_value = None

        # Lazy-loading support: prefer data-src, fall back to src.
        image = node.css('img::attr(data-src)').get()
        if not image:
            image = node.css('img::attr(src)').get()

        # CSS and XPath can be mixed on the same node.
        score = node.xpath('.//div[@class="rating"]/@data-score').get()

        record = {}
        record['name'] = product_name
        record['price'] = price_value
        record['img_url'] = image
        record['rating'] = score
        yield record

三、Spider 篇

3.1 Spider 基类

import scrapy
from scrapy import signals
from scrapy.http import Request, FormRequest

class BaseExampleSpider(scrapy.Spider):
    """Spider skeleton: dynamic start requests, chained callbacks, error handling."""

    name = 'base_example'
    allowed_domains = ['example.com']

    def start_requests(self):
        """Build the initial requests dynamically instead of using ``start_urls``."""
        # get_urls_from_db is not defined in this class; presumably a subclass or
        # mixin provides it (the original note says the URLs come from a database).
        for target in self.get_urls_from_db():
            request_meta = {
                'dont_redirect': False,
                'handle_httpstatus_list': [404, 403],
                'proxy': 'http://proxy:port',
            }
            yield Request(
                url=target,
                callback=self.parse,
                errback=self.handle_error,
                meta=request_meta,
                headers={'Referer': 'https://example.com'},
                priority=0,               # priority: higher values are handled first
                dont_filter=False,        # False keeps duplicate-URL filtering on
            )

    def parse(self, response):
        """Follow every detail link, handing extra data to the next callback."""
        detail_links = response.css('a.detail::attr(href)').getall()
        for href in detail_links:
            yield response.follow(
                href,
                callback=self.parse_detail,
                cb_kwargs={'category': 'tech'},   # extra keyword arguments for the callback
                meta={'page_url': response.url},
            )

    def parse_detail(self, response, category):
        """Emit one item per detail page; ``category`` arrives through cb_kwargs."""
        title = response.css('h1::text').get()
        origin = response.meta['page_url']
        yield {'title': title, 'category': category, 'source': origin}

    def handle_error(self, failure):
        """Errback: log the failed request and the failure itself."""
        log = self.logger
        log.error(f'Request failed: {failure.request.url}')
        log.error(repr(failure))

    def closed(self, reason):
        """Run when the spider closes: log the reason and the response count."""
        received = self.crawler.stats.get_value("response_received_count")
        self.logger.info(f'Spider closed: {reason}')
        self.logger.info(f'Total pages: {received}')

3.2 CrawlSpider(全站爬取)

from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor

class NewsCrawlSpider(CrawlSpider):
    """Site-wide crawl: walk category pages and parse the article pages they list."""

    name = 'news_crawl'
    allowed_domains = ['news.example.com']
    start_urls = ['https://news.example.com/']

    rules = (
        # Rule 1: category pages are followed for more links, not parsed.
        Rule(LinkExtractor(allow=r'/category/\w+/'), follow=True),
        # Rule 2: article pages are parsed and not followed further.
        Rule(
            LinkExtractor(
                allow=r'/article/\d+/',
                deny=r'/login|/register|/search',   # excluded URL patterns
                restrict_css='div.article-list',     # only extract links inside this region
            ),
            callback='parse_article',
            follow=False,
        ),
    )

    def parse_article(self, response):
        """Yield the article's title, body text, publish time, author and URL."""
        headline = response.css('h1.title::text').get('').strip()
        paragraphs = response.css('div.content p::text').getall()
        article = {
            'title':      headline,
            'content':    '\n'.join(paragraphs),
            'publish_at': response.css('time::attr(datetime)').get(),
            'author':     response.css('span.author::text').get(),
            'url':        response.url,
        }
        yield article

3.3 SitemapSpider

from scrapy.spiders import SitemapSpider

class SitemapExampleSpider(SitemapSpider):
    """Discover URLs through the sitemap and route them to callbacks by pattern."""

    name = 'sitemap_spider'
    sitemap_urls = ['https://example.com/sitemap.xml']
    # (URL pattern, callback name) pairs.
    sitemap_rules = [
        ('/blog/', 'parse_blog'),
        ('/product/', 'parse_product'),
    ]
    sitemap_follow = ['/sitemap_index']   # nested sitemaps to follow

    def parse_blog(self, response):
        """Yield the title and URL of a blog page."""
        heading = response.css('h1::text').get()
        yield {'title': heading, 'url': response.url}

    def parse_product(self, response):
        """Yield the name and URL of a product page."""
        product_name = response.css('h1.product-name::text').get()
        yield {'name': product_name, 'url': response.url}

3.4 翻页与深度爬取

# The three pagination approaches are alternatives. Each one lives in its own
# spider: defining several methods named ``parse`` in a single class keeps
# only the last definition and silently discards the others.

class PaginationSpider(scrapy.Spider):
    """Approach 1: follow the "next page" link extracted with CSS/XPath."""

    name = 'pagination'
    start_urls = ['https://example.com/list?page=1']

    def parse(self, response):
        """Yield the items on the page, then follow the next-page link if any."""
        for item in response.css('div.item'):
            yield {'title': item.css('h2::text').get()}

        next_url = response.css('a.next::attr(href)').get()
        if next_url:
            yield response.follow(next_url, self.parse)


class PageNumberPaginationSpider(scrapy.Spider):
    """Approach 2: build every page URL up front from the page number."""

    name = 'pagination_by_number'

    def start_requests(self):
        """Yield one request per page for pages 1 through 100."""
        for page in range(1, 101):
            yield scrapy.Request(
                f'https://example.com/list?page={page}',
                callback=self.parse
            )

    def parse(self, response):
        """Yield the items on the page; no link following is needed."""
        for item in response.css('div.item'):
            yield {'title': item.css('h2::text').get()}


class MetaPaginationSpider(scrapy.Spider):
    """Approach 3: carry the current page number in ``meta``."""

    name = 'pagination_by_meta'
    start_urls = ['https://example.com/list?page=1']

    def parse(self, response):
        """Yield the items, then request the next page until the total is reached."""
        for item in response.css('div.item'):
            yield {'title': item.css('h2::text').get()}

        # The start request carries no 'page' key, so it counts as page 1.
        current_page = response.meta.get('page', 1)
        # Take only the digits so surrounding text cannot break int(); a missing
        # element yields 0 and therefore stops the pagination.
        total_pages = int(response.css('span.total::text').re_first(r'\d+', default='0'))
        if current_page < total_pages:
            yield response.follow(
                f'?page={current_page + 1}',
                callback=self.parse,
                meta={'page': current_page + 1}
            )

3.5 POST 请求与表单

from scrapy.http import FormRequest, JsonRequest

class LoginSpider(scrapy.Spider):
    """Log in through an HTML form, then crawl a page that requires the session."""

    name = 'login_spider'
    start_urls = ['https://example.com/login']

    def parse(self, response):
        """Submit the login form found in *response*, including its CSRF token."""
        # Extract the CSRF token from the form.
        csrf_token = response.css('input[name="csrf_token"]::attr(value)').get()
        yield FormRequest.from_response(
            response,
            formdata={
                'username':   'myuser',
                'password':   'mypass',
                'csrf_token': csrf_token,
            },
            callback=self.after_login
        )

    def after_login(self, response):
        """Check the post-login page and continue to the private area on success."""
        if '欢迎' in response.text:
            self.logger.info('登录成功')
            yield scrapy.Request('https://example.com/private', callback=self.parse_private)
        else:
            self.logger.error('登录失败')

    def parse_private(self, response):
        """Handle the page that is only reachable after login.

        The callback must exist on the class: ``after_login`` references
        ``self.parse_private`` and would raise AttributeError otherwise.
        This placeholder yields the page URL and its first ``h1`` text.
        """
        yield {
            'url':   response.url,
            'title': response.css('h1::text').get(),
        }

    # JSON request
    def parse_api(self, response):
        """Send a JSON-bodied request to the API endpoint."""
        yield JsonRequest(
            url='https://api.example.com/data',
            data={'page': 1, 'size': 20},
            callback=self.parse_json_response
        )

    def parse_json_response(self, response):
        """Yield every entry of the ``list`` array in the JSON payload."""
        data = response.json()
        # A payload without a 'list' key yields nothing instead of raising KeyError.
        for item in data.get('list', []):
            yield item

四、Item 篇

4.1 Item 定义

# items.py
import scrapy
from itemloaders.processors import TakeFirst, MapCompose, Join
from w3lib.html import remove_tags

class ArticleItem(scrapy.Item):
    """Fields collected for one article.

    ``price`` and ``crawled_at`` are declared because the loader example in
    this file populates them; assigning a field that a scrapy.Item does not
    declare raises KeyError.
    """

    title      = scrapy.Field()
    url        = scrapy.Field()
    author     = scrapy.Field()
    content    = scrapy.Field()
    tags       = scrapy.Field()
    publish_at = scrapy.Field()
    price      = scrapy.Field()    # filled through the loader's add_xpath('price', ...)
    crawled_at = scrapy.Field()    # crawl timestamp added through add_value
    image_urls = scrapy.Field()    # read by ImagesPipeline
    images     = scrapy.Field()    # download results written by ImagesPipeline


class ProductItem(scrapy.Item):
    """Fields collected for one product; every field is a plain scrapy.Field."""

    name      = scrapy.Field()
    price     = scrapy.Field()
    sku       = scrapy.Field()
    category  = scrapy.Field()
    images    = scrapy.Field()
    in_stock  = scrapy.Field()
    crawled_at = scrapy.Field()

4.2 ItemLoader

from scrapy.loader import ItemLoader
from itemloaders.processors import TakeFirst, MapCompose, Join
from w3lib.html import remove_tags, strip_html5_whitespace
import re

def clean_price(value):
    """Convert a scraped price into a float.

    Everything except digits and dots is stripped before conversion.

    Args:
        value: Raw price text such as ``'$1,234.50'``; numbers are accepted too.

    Returns:
        The price as a float, or None when *value* is falsy or what remains
        after stripping is not a valid number (for example ``'N/A'``).
    """
    if not value:
        return None
    digits = re.sub(r'[^\d.]', '', str(value))
    try:
        return float(digits)
    except ValueError:
        # Nothing numeric left ('') or a malformed number such as '1.2.3'.
        return None

def clean_text(value):
    """Return *value* without surrounding whitespace, or None when it is falsy."""
    if not value:
        return None
    return value.strip()

class ArticleLoader(ItemLoader):
    """ItemLoader for ArticleItem with per-field input and output processors."""

    default_item_class = ArticleItem

    # Input processors (``<field>_in``) transform the raw extracted values.
    # Output processors (``<field>_out``) decide how multiple values are merged.

    title_in       = MapCompose(remove_tags, str.strip)
    title_out      = TakeFirst()            # keep only the first value

    content_in     = MapCompose(remove_tags, str.strip)
    content_out    = Join('\n')             # join the paragraphs with newlines

    tags_out       = list                   # keep every value (list)
    author_out     = TakeFirst()

    price_in       = MapCompose(clean_price)
    price_out      = TakeFirst()

    # Defaults for fields without a dedicated processor
    default_input_processor  = MapCompose(str.strip)
    default_output_processor = TakeFirst()


# 在 Spider 中使用
def parse_article(self, response):
    """Populate an ArticleItem from *response* through ArticleLoader and yield it."""
    # Imported locally because the snippet's import block does not bring
    # ``datetime`` into scope.
    from datetime import datetime

    loader = ArticleLoader(item=ArticleItem(), response=response)

    loader.add_css('title',      'h1.title')
    loader.add_css('author',     'span.author::text')
    loader.add_css('content',    'div.content p')
    loader.add_css('tags',       'a.tag::text')
    loader.add_css('publish_at', 'time::attr(datetime)')
    loader.add_value('url',      response.url)

    # add_xpath works the same way as add_css.
    # NOTE(review): load_item() assigns every collected field to the item, so
    # 'price' and 'crawled_at' must be declared on the item class — confirm.
    loader.add_xpath('price', '//span[@class="price"]/text()')

    # Literal values are added with add_value.
    loader.add_value('crawled_at', datetime.now().isoformat())

    yield loader.load_item()

五、Pipeline 篇

5.1 数据清洗 Pipeline

# pipelines.py
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
import re

class CleanDataPipeline:
    """Validate and normalise items before they reach the storage pipelines."""

    # Text fields whose surrounding whitespace is stripped.
    TEXT_FIELDS = ('title', 'author', 'content')

    def process_item(self, item, spider):
        """Clean *item* in place and return it.

        Raises:
            DropItem: when the item has no (non-empty) ``title``.
        """
        adapter = ItemAdapter(item)

        # Required-field check
        if not adapter.get('title'):
            raise DropItem(f'缺少 title: {item}')

        # Strip text fields; non-string values (e.g. a list of paragraphs) are
        # left untouched instead of raising AttributeError.
        for field in self.TEXT_FIELDS:
            value = adapter.get(field)
            if isinstance(value, str):
                adapter[field] = value.strip()

        # Normalise the price; text that is not a valid number becomes None
        # instead of raising ValueError.
        if adapter.get('price'):
            digits = re.sub(r'[^\d.]', '', str(adapter['price']))
            try:
                adapter['price'] = float(digits)
            except ValueError:
                adapter['price'] = None

        # Default for missing tags. Item classes that do not declare a 'tags'
        # field reject the assignment with KeyError; those are left as they are.
        if not adapter.get('tags'):
            try:
                adapter['tags'] = []
            except KeyError:
                pass

        return item

5.2 去重 Pipeline

class DuplicateFilterPipeline:
    """Drop items whose URL was already seen (in-memory, per process)."""

    def __init__(self):
        self.seen_urls = set()

    def process_item(self, item, spider):
        """Return *item* the first time its URL appears; raise DropItem afterwards."""
        url = ItemAdapter(item).get('url')
        if url not in self.seen_urls:
            self.seen_urls.add(url)
            return item
        raise DropItem(f'重复 URL: {url}')


class RedisDuplicatePipeline:
    """Persistent URL de-duplication backed by a Redis set (distributed crawls)."""

    def __init__(self, redis_url):
        self.redis_url = redis_url

    @classmethod
    def from_crawler(cls, crawler):
        """Build the pipeline from the ``REDIS_URL`` setting."""
        return cls(redis_url=crawler.settings.get('REDIS_URL', 'redis://localhost:6379/0'))

    def open_spider(self, spider):
        """Connect to Redis and derive the per-spider set key."""
        import redis
        self.redis = redis.from_url(self.redis_url)
        self.key = f'scrapy:seen:{spider.name}'

    def close_spider(self, spider):
        """Release the Redis connection when the spider finishes."""
        self.redis.close()

    def process_item(self, item, spider):
        """Return *item* when its URL is new; raise DropItem when already recorded."""
        adapter = ItemAdapter(item)
        url = adapter.get('url', '')
        # SADD reports how many members were actually added, so one call both
        # checks and records the URL. A separate SISMEMBER followed by SADD
        # lets two workers pass the check for the same URL at the same time.
        if not self.redis.sadd(self.key, url):
            raise DropItem(f'重复: {url}')
        return item

5.3 存储到 MySQL

import pymysql
from twisted.enterprise import adbapi

class MySQLPipeline:
    """Synchronous MySQL pipeline: one upsert and one commit per item."""

    def __init__(self, db_config):
        self.db_config = db_config

    @classmethod
    def from_crawler(cls, crawler):
        """Build the connection parameters from the ``MYSQL_*`` settings."""
        return cls(db_config={
            'host':     crawler.settings.get('MYSQL_HOST', 'localhost'),
            # getint: a value passed as "-s MYSQL_PORT=3306" arrives as a string.
            'port':     crawler.settings.getint('MYSQL_PORT', 3306),
            'user':     crawler.settings.get('MYSQL_USER'),
            'password': crawler.settings.get('MYSQL_PASSWORD'),
            'database': crawler.settings.get('MYSQL_DATABASE'),
            'charset':  'utf8mb4',
        })

    def open_spider(self, spider):
        """Open the connection and the cursor reused for every item."""
        self.conn = pymysql.connect(**self.db_config)
        self.cursor = self.conn.cursor()

    def close_spider(self, spider):
        """Commit pending work, then always release the cursor and connection."""
        try:
            self.conn.commit()
        finally:
            self.cursor.close()
            self.conn.close()

    def process_item(self, item, spider):
        """Upsert *item* into ``articles``; failures are logged and rolled back.

        The item is returned in both cases so later pipelines still receive it.
        """
        adapter = ItemAdapter(item)
        try:
            self.cursor.execute(
                """
                INSERT INTO articles (title, url, author, content, publish_at)
                VALUES (%s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    title = VALUES(title),
                    content = VALUES(content)
                """,
                (
                    adapter.get('title'),
                    adapter.get('url'),
                    adapter.get('author'),
                    adapter.get('content'),
                    adapter.get('publish_at'),
                )
            )
            self.conn.commit()
        except Exception as e:
            # Deliberately best-effort: one failed row must not stop the crawl.
            self.conn.rollback()
            spider.logger.error(f'MySQL 写入失败: {e}')
        return item


class AsyncMySQLPipeline:
    """Asynchronous MySQL pipeline built on Twisted's adbapi connection pool."""

    @classmethod
    def from_crawler(cls, crawler):
        """Create the connection pool from the ``MYSQL_*`` settings."""
        return cls(
            dbpool=adbapi.ConnectionPool(
                'pymysql',
                host=crawler.settings.get('MYSQL_HOST', 'localhost'),
                # getint: a value passed on the command line arrives as a string.
                port=crawler.settings.getint('MYSQL_PORT', 3306),
                user=crawler.settings.get('MYSQL_USER'),
                password=crawler.settings.get('MYSQL_PASSWORD'),
                db=crawler.settings.get('MYSQL_DATABASE'),
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor,
                use_unicode=True,
            )
        )

    def __init__(self, dbpool):
        self.dbpool = dbpool

    def close_spider(self, spider):
        """Shut the connection pool down when the spider finishes."""
        self.dbpool.close()

    def process_item(self, item, spider):
        """Schedule the insert and return a Deferred that fires with *item*.

        The Deferred from ``runInteraction`` fires with the return value of
        ``_insert``, which is None. Without the final callback every later
        pipeline would receive None instead of the item.
        """
        deferred = self.dbpool.runInteraction(self._insert, item)
        # Log failures first so that, as in the synchronous pipeline, a failed
        # write does not drop the item.
        deferred.addErrback(self._handle_error, spider)
        deferred.addCallback(lambda _result: item)
        return deferred

    def _insert(self, tx, item):
        """Run the INSERT inside the pool-managed transaction *tx*."""
        adapter = ItemAdapter(item)
        tx.execute(
            'INSERT IGNORE INTO articles (title, url, author) VALUES (%s, %s, %s)',
            (adapter.get('title'), adapter.get('url'), adapter.get('author'))
        )

    def _handle_error(self, failure, spider):
        """Log a failed interaction and swallow the failure."""
        spider.logger.error(f'MySQL 写入失败: {failure.getErrorMessage()}')

5.4 存储到 MongoDB

import pymongo

class MongoDBPipeline:
    """Upsert every item into a MongoDB collection, keyed by its URL."""

    collection = 'articles'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri, self.mongo_db = mongo_uri, mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        """Read the connection URI and database name from the settings."""
        settings = crawler.settings
        uri = settings.get('MONGO_URI', 'mongodb://localhost:27017')
        database = settings.get('MONGO_DATABASE', 'scrapy')
        return cls(mongo_uri=uri, mongo_db=database)

    def open_spider(self, spider):
        """Open the client and select the database."""
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        """Close the client."""
        self.client.close()

    def process_item(self, item, spider):
        """Insert the item, or update the stored document with the same URL."""
        adapter = ItemAdapter(item)
        selector = {'url': adapter.get('url')}        # de-duplicate on url
        changes = {'$set': dict(adapter)}
        target = self.db[self.collection]
        target.update_one(selector, changes, upsert=True)   # insert when absent
        return item

5.5 存储到 Redis

import redis
import json

class RedisPipeline:
    """Push every item onto a Redis list for downstream consumers."""

    @classmethod
    def from_crawler(cls, crawler):
        """Build the pipeline from the ``REDIS_URL`` setting.

        Falls back to the same local default as RedisDuplicatePipeline, so a
        missing setting does not pass None to ``redis.from_url``.
        """
        return cls(redis_url=crawler.settings.get('REDIS_URL', 'redis://localhost:6379/0'))

    def __init__(self, redis_url):
        self.redis_url = redis_url

    def open_spider(self, spider):
        """Connect to Redis and derive the per-spider list key."""
        self.redis = redis.from_url(self.redis_url)
        self.key = f'scrapy:items:{spider.name}'

    def close_spider(self, spider):
        """Release the Redis connection when the spider finishes."""
        self.redis.close()

    def process_item(self, item, spider):
        """Serialise *item* to JSON, LPUSH it onto the list and return the item."""
        self.redis.lpush(self.key, json.dumps(dict(ItemAdapter(item))))
        return item

5.6 图片/文件下载

# settings.py
# Enable only the custom subclass. Registering the stock ImagesPipeline next
# to its own subclass makes both of them download every image.
ITEM_PIPELINES = {
    'myspider.pipelines.CustomImagesPipeline': 200,
}
IMAGES_STORE = './images'
IMAGES_THUMBS = {
    'small':  (50, 50),
    'medium': (200, 200),
}
IMAGES_MIN_HEIGHT = 100
IMAGES_MIN_WIDTH  = 100

# items.py
class ProductItem(scrapy.Item):
    """Item used with the image pipelines.

    ``category`` and ``image_paths`` are declared because CustomImagesPipeline
    reads the former and assigns the latter; assigning a field that a
    scrapy.Item does not declare raises KeyError.
    """

    image_urls  = scrapy.Field()   # field name expected by ImagesPipeline
    images      = scrapy.Field()   # download results are written here
    category    = scrapy.Field()   # sub-directory used by CustomImagesPipeline.file_path
    image_paths = scrapy.Field()   # set by CustomImagesPipeline.item_completed

# Inside a spider callback
yield ProductItem(image_urls=['https://example.com/img1.jpg'])


# 自定义图片路径
from scrapy.pipelines.images import ImagesPipeline

class CustomImagesPipeline(ImagesPipeline):
    """ImagesPipeline variant that stores files as ``<category>/<filename>``."""

    def file_path(self, request, response=None, info=None, item=None):
        """Return the storage path for the image behind *request*.

        The directory is the item's ``category`` (``'misc'`` when the item or
        the category is missing). The filename is the last segment of the URL
        path; the query string is ignored so that ``a.jpg?v=1`` is stored as
        ``a.jpg``.
        """
        import os
        from urllib.parse import urlparse

        # ``item`` defaults to None in the signature, so guard before wrapping it.
        category = None
        if item is not None:
            category = ItemAdapter(item).get('category')
        category = category or 'misc'

        filename = urlparse(request.url).path.rsplit('/', 1)[-1]
        if not filename:
            # URL path ends with '/': reuse the name the base class would pick.
            filename = os.path.basename(
                super().file_path(request, response=response, info=info, item=item)
            )
        return f'{category}/{filename}'

    def get_media_requests(self, item, info):
        """Yield a download request for each usable URL in ``image_urls``."""
        adapter = ItemAdapter(item)
        # ``or []`` also covers an image_urls field explicitly set to None.
        for url in adapter.get('image_urls') or []:
            if url and url.startswith('http'):
                yield scrapy.Request(url)

    def item_completed(self, results, item, info):
        """Store the downloaded paths on the item.

        Raises:
            DropItem: when none of the downloads succeeded.
        """
        image_paths = [r['path'] for ok, r in results if ok]
        if not image_paths:
            raise DropItem('图片下载失败')
        adapter = ItemAdapter(item)
        # NOTE(review): scrapy.Item subclasses must declare an 'image_paths'
        # field for this assignment to succeed — confirm on the item class.
        adapter['image_paths'] = image_paths
        return item

六、中间件篇

6.1 下载中间件结构

class MyDownloaderMiddleware:
    """Skeleton downloader middleware showing the three hook methods."""

    def __init__(self, settings):
        self.settings = settings

    @classmethod
    def from_crawler(cls, crawler):
        """Create the middleware from the crawler's settings."""
        middleware = cls(crawler.settings)
        return middleware

    def process_request(self, request, spider):
        """Called before the request is sent.

        Return None          -> continue to the next middleware
        Return a Response    -> use it directly; the request is not sent
        Return a Request     -> schedule that request instead
        Raise IgnoreRequest  -> discard the request
        """
        return None

    def process_response(self, request, response, spider):
        """Called after a response is received.

        Return a Response    -> continue processing
        Return a Request     -> schedule that request
        Raise IgnoreRequest  -> discard
        """
        return response

    def process_exception(self, request, exception, spider):
        """Called when downloading raised an exception.

        Return None          -> keep propagating the exception
        Return a Response    -> treat it as a normal response
        Return a Request     -> schedule that request
        """
        return None

6.2 随机 User-Agent 中间件

import random

class RandomUserAgentMiddleware:
    """Set a randomly chosen User-Agent header on every outgoing request."""

    # Pool of User-Agent strings to pick from.
    USER_AGENTS = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 14_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
    ]

    def process_request(self, request, spider):
        """Overwrite the request's User-Agent with one entry from USER_AGENTS."""
        picked = random.choice(self.USER_AGENTS)
        request.headers['User-Agent'] = picked

6.3 代理 IP 中间件

import random
import requests

class ProxyMiddleware:
    """Static proxy pool: route each request through a randomly chosen proxy."""

    def __init__(self, proxies):
        self.proxies = proxies

    @classmethod
    def from_crawler(cls, crawler):
        """Read the proxy list from the ``PROXY_LIST`` setting."""
        return cls(crawler.settings.getlist('PROXY_LIST', []))

    def process_request(self, request, spider):
        """Attach a random proxy to the request; do nothing when the pool is empty."""
        if not self.proxies:
            return
        chosen = random.choice(self.proxies)
        request.meta['proxy'] = chosen
        spider.logger.debug(f'使用代理: {chosen}')


class DynamicProxyMiddleware:
    """Dynamic proxy pool: fetch a proxy from an HTTP API for every request."""

    def __init__(self, proxy_api, max_proxy_retries=3):
        """
        Args:
            proxy_api: URL of the endpoint that returns one ``host:port`` proxy.
            max_proxy_retries: How many times a failing request is re-sent
                through a different proxy before the failure is left to the
                remaining middlewares.
        """
        self.proxy_api = proxy_api
        self.proxies   = []
        self.max_proxy_retries = max_proxy_retries

    @classmethod
    def from_crawler(cls, crawler):
        """Read the API endpoint from the ``PROXY_API_URL`` setting."""
        return cls(proxy_api=crawler.settings.get('PROXY_API_URL'))

    def get_proxy(self):
        """Return a ``host:port`` string from the proxy API, or None on failure.

        NOTE(review): ``requests.get`` is a synchronous call, so the caller
        waits up to the 5-second timeout — confirm that is acceptable here.
        """
        try:
            resp = requests.get(self.proxy_api, timeout=5)
            # Do not mistake the body of an HTTP error page for a proxy address.
            resp.raise_for_status()
        except requests.RequestException:
            return None
        return resp.text.strip() or None

    def process_request(self, request, spider):
        """Route the request through a freshly fetched proxy when one is available."""
        proxy = self.get_proxy()
        if proxy:
            request.meta['proxy'] = f'http://{proxy}'

    def process_exception(self, request, exception, spider):
        """Retry a failed download through another proxy, a bounded number of times.

        Returns a copy of the request flagged ``dont_filter=True`` so the
        duplicate filter does not discard it as already seen. Returns None,
        letting the exception propagate, once ``max_proxy_retries`` is
        reached or no proxy can be fetched.
        """
        attempts = request.meta.get('proxy_retry_times', 0)
        if attempts >= self.max_proxy_retries:
            return None
        proxy = self.get_proxy()
        if not proxy:
            return None
        retry_request = request.replace(dont_filter=True)
        retry_request.meta['proxy'] = f'http://{proxy}'
        retry_request.meta['proxy_retry_times'] = attempts + 1
        return retry_request    # re-send

6.4 Selenium 集成

# pip install selenium
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from scrapy.http import HtmlResponse

class SeleniumMiddleware:
    """Render requests flagged with ``meta['use_selenium']`` in headless Chrome."""

    def __init__(self):
        options = Options()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--disable-blink-features=AutomationControlled')
        options.add_experimental_option('excludeSwitches', ['enable-automation'])
        self.driver = webdriver.Chrome(options=options)

    @classmethod
    def from_crawler(cls, crawler):
        """Create the middleware and register its shutdown handler.

        ``spider_closed`` only runs if it is connected to the signal; without
        the connection the browser process is never quit.
        """
        from scrapy import signals

        middleware = cls()
        crawler.signals.connect(middleware.spider_closed, signal=signals.spider_closed)
        return middleware

    def process_request(self, request, spider):
        """Load the page in the browser and return its rendered HTML as a response.

        Requests without ``meta['use_selenium']`` return None and are
        downloaded normally.
        """
        import time
        from selenium.common.exceptions import TimeoutException

        if not request.meta.get('use_selenium'):
            return None    # not a Selenium request, skip

        self.driver.get(request.url)

        # Wait for the content element; on timeout continue with whatever has
        # rendered, but leave a trace in the log instead of failing silently.
        try:
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, 'div.content'))
            )
        except TimeoutException:
            spider.logger.warning(f'等待页面元素超时: {request.url}')

        # Scroll to the bottom so lazy-loaded content gets a chance to load.
        self.driver.execute_script('window.scrollTo(0, document.body.scrollHeight)')
        time.sleep(2)

        return HtmlResponse(
            url=self.driver.current_url,
            body=self.driver.page_source.encode('utf-8'),
            encoding='utf-8',
            request=request
        )

    def spider_closed(self, spider):
        """Quit the browser when the spider closes."""
        self.driver.quit()

# Usage inside a spider: flag the request so the Selenium middleware handles it
yield scrapy.Request(url, meta={'use_selenium': True}, callback=self.parse)

6.5 Playwright 集成(推荐替代 Selenium)

# pip install scrapy-playwright
# playwright install chromium

# settings.py
# Route both http and https downloads through the Playwright handler.
DOWNLOAD_HANDLERS = {
    'http':  'scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler',
    'https': 'scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler',
}
TWISTED_REACTOR = 'twisted.internet.asyncioreactor.AsyncioSelectorReactor'

# Usage inside a spider
from scrapy_playwright.page import PageMethod

yield scrapy.Request(
    url,
    meta={
        'playwright': True,
        # Expose the page object to the callback as meta['playwright_page'].
        'playwright_include_page': True,
        # Page methods listed here: wait for the content, scroll down, pause 2000 ms.
        'playwright_page_methods': [
            PageMethod('wait_for_selector', 'div.content'),
            PageMethod('evaluate', 'window.scrollTo(0, document.body.scrollHeight)'),
            PageMethod('wait_for_timeout', 2000),
        ],
    },
    callback=self.parse_js
)

async def parse_js(self, response):
    """Keep interacting with the Playwright page, then always close it.

    The page comes from ``response.meta['playwright_page']``. It is closed in
    a ``finally`` block so that a failing click or wait does not leave it open.
    """
    page = response.meta['playwright_page']
    try:
        # The page can still be driven from the callback.
        await page.click('button.load-more')
        await page.wait_for_timeout(1000)
        content = await page.content()
    finally:
        await page.close()
    # Process ``content`` with Scrapy selectors from here on.
七、Settings 配置篇

# settings.py

# ---- Basics ----
BOT_NAME = 'myspider'
USER_AGENT = 'Mozilla/5.0 (compatible; MySpider/1.0)'

# ---- Concurrency ----
CONCURRENT_REQUESTS = 16                   # global number of concurrent requests (default 16)
CONCURRENT_REQUESTS_PER_DOMAIN = 8        # concurrent requests per domain
CONCURRENT_REQUESTS_PER_IP = 0            # concurrent requests per IP (0 = unlimited)

# ---- Delay ----
DOWNLOAD_DELAY = 0.5                       # interval between requests (seconds)
RANDOMIZE_DOWNLOAD_DELAY = True           # randomize the delay (0.5x to 1.5x)

# ---- AutoThrottle ----
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 10
AUTOTHROTTLE_TARGET_CONCURRENCY = 4.0
AUTOTHROTTLE_DEBUG = False

# ---- Retry ----
RETRY_ENABLED = True
RETRY_TIMES = 3
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 429]
# NOTE(review): the two RETRY_BACKOFF_* names below do not appear among the
# built-in retry settings in the Scrapy settings reference; presumably they
# only take effect if a custom retry middleware reads them — confirm.
RETRY_BACKOFF_ENABLED = True              # back-off between retries
RETRY_BACKOFF_MAX_SECONDS = 30

# ---- Timeout ----
DOWNLOAD_TIMEOUT = 30                      # download timeout (seconds)

# ---- Duplicate filtering ----
DUPEFILTER_CLASS = 'scrapy.dupefilters.RFPDupeFilter'   # default, in-memory
# DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'  # persisted in Redis

# ---- HTTP cache ----
HTTPCACHE_ENABLED = True                   # enable during development to avoid repeated requests
HTTPCACHE_EXPIRATION_SECS = 86400
HTTPCACHE_DIR = '.scrapy/httpcache'
HTTPCACHE_IGNORE_HTTP_CODES = [500, 503]

# ---- Logging ----
LOG_LEVEL = 'INFO'
LOG_FILE = 'scrapy.log'
LOG_FORMAT = '%(asctime)s [%(name)s] %(levelname)s: %(message)s'

# ---- Pipeline order (lower numbers run first) ----
ITEM_PIPELINES = {
    'myspider.pipelines.CleanDataPipeline':      100,
    'myspider.pipelines.DuplicateFilterPipeline': 200,
    'myspider.pipelines.MySQLPipeline':           300,
    'myspider.pipelines.MongoDBPipeline':         400,
}

# ---- Middleware order ----
DOWNLOADER_MIDDLEWARES = {
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,  # disable the built-in one
    'myspider.middlewares.RandomUserAgentMiddleware': 400,
    'myspider.middlewares.ProxyMiddleware':            500,
    'myspider.middlewares.SeleniumMiddleware':         800,
}

SPIDER_MIDDLEWARES = {
    'scrapy.spidermiddlewares.httperror.HttpErrorMiddleware': 50,
    'myspider.middlewares.MySpiderMiddleware': 543,
}

# ---- HTTP ----
DEFAULT_REQUEST_HEADERS = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
}
COOKIES_ENABLED = True
COOKIES_DEBUG = False

# ---- Depth ----
DEPTH_LIMIT = 0                           # 0 = no depth limit
# NOTE(review): the original comment labelled this "depth-first" while also
# saying "1=BFS, -1=DFS". Presumably a positive value favours breadth-first
# order and a negative one depth-first — confirm against the Scrapy settings
# reference, including whether the scheduler queues must be switched as well.
DEPTH_PRIORITY = 1                        # depth-based priority adjustment

# ---- Extensions ----
EXTENSIONS = {
    'scrapy.extensions.telnet.TelnetConsole': None,    # disable Telnet
    'scrapy.extensions.corestats.CoreStats':  500,
    'scrapy.extensions.memusage.MemoryUsage': 500,
}
MEMUSAGE_ENABLED = True
MEMUSAGE_LIMIT_MB = 512                   # close the spider when memory exceeds this limit

# ---- Close conditions ----
CLOSESPIDER_ITEMCOUNT  = 0    # stop after N items (0 = unlimited)
CLOSESPIDER_PAGECOUNT  = 0    # stop after N pages
CLOSESPIDER_ERRORCOUNT = 10   # stop after more than N errors
CLOSESPIDER_TIMEOUT    = 0    # stop after running N seconds (0 = unlimited)

# ---- Databases ----
# NOTE(review): credentials are hard-coded below; consider loading them from
# the environment before using this file outside a tutorial.
MYSQL_HOST     = 'localhost'
MYSQL_PORT     = 3306
MYSQL_USER     = 'root'
MYSQL_PASSWORD = 'password'
MYSQL_DATABASE = 'scrapy_db'
MONGO_URI      = 'mongodb://localhost:27017'
MONGO_DATABASE = 'scrapy_db'
REDIS_URL      = 'redis://localhost:6379/0'

八、反爬应对篇

8.1 Headers 伪装

# Browser-like default headers.
# Accept-Encoding is deliberately not set here: a hard-coded 'gzip, deflate, br'
# asks servers for Brotli even when no Brotli decoder is installed, which
# yields bodies that cannot be decoded. Leaving the header out lets Scrapy's
# HTTP compression middleware advertise only the encodings it can decode.
DEFAULT_REQUEST_HEADERS = {
    'User-Agent':       'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept':           'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
    'Accept-Language':  'zh-CN,zh;q=0.9,en;q=0.8',
    'Connection':       'keep-alive',
    'Upgrade-Insecure-Requests': '1',
    'Sec-Fetch-Dest':   'document',
    'Sec-Fetch-Mode':   'navigate',
    'Sec-Fetch-Site':   'none',
}

# 动态设置 Referer
class RefererMiddleware:
    """Add a default Referer header to requests that do not already carry one."""

    def process_request(self, request, spider):
        """Set ``Referer`` to the Google homepage unless a referer is present."""
        if 'referer' in request.headers:
            return
        request.headers['Referer'] = 'https://www.google.com'


8.2 限速与随机延迟策略

import random
import time
from scrapy.downloadermiddlewares.retry import RetryMiddleware

class SmartDelayMiddleware:
    """Adaptive back-off: react to 429 and 403 responses without blocking.

    Compared with sleeping inside ``process_response``:

    * nothing here calls ``time.sleep``, which would stall every in-flight
      request of the crawler, not just the throttled one;
    * retried requests are copies flagged ``dont_filter=True`` so the
      duplicate filter does not discard them;
    * retries are capped by ``MAX_RETRIES`` to avoid an endless loop.
    """

    MAX_RETRIES = 3            # retries allowed per request
    DEFAULT_RETRY_AFTER = 60   # seconds used when Retry-After is missing or not an integer

    def process_response(self, request, response, spider):
        """Return *response*, or a retry request for 429/403 responses."""
        if response.status == 429:
            # Rate limited: slow the download slot down and try again.
            retry_after = self._parse_retry_after(response.headers.get('Retry-After'))
            spider.logger.warning(f'被限流,等待 {retry_after} 秒')
            self._slow_down(request, spider, retry_after)
            return self._build_retry(request) or response

        if response.status == 403:
            # Blocked: try again through a different proxy when one is available.
            spider.logger.warning('IP 被封,切换代理')
            retry_request = self._build_retry(request)
            if retry_request is None:
                return response
            proxy = self.get_new_proxy()
            if proxy:
                retry_request.meta['proxy'] = proxy
            return retry_request

        return response

    def get_new_proxy(self):
        """Hook returning a replacement proxy URL; override in a subclass.

        The base implementation returns None, which retries the request
        without changing its proxy.
        """
        return None

    def _parse_retry_after(self, raw):
        """Convert a Retry-After header value (bytes, str or None) to seconds."""
        try:
            seconds = int(raw)
        except (TypeError, ValueError):
            # Header missing, or given as an HTTP date rather than a number.
            return self.DEFAULT_RETRY_AFTER
        return max(seconds, 0)

    def _slow_down(self, request, spider, delay):
        """Raise the delay of the request's download slot to at least *delay*.

        NOTE(review): this reaches into ``crawler.engine.downloader.slots``,
        the structure Scrapy's AutoThrottle extension adjusts — confirm it
        matches the Scrapy version in use. Missing pieces are tolerated.
        """
        crawler = getattr(spider, 'crawler', None)
        engine = getattr(crawler, 'engine', None)
        downloader = getattr(engine, 'downloader', None)
        slots = getattr(downloader, 'slots', None) or {}
        slot = slots.get(request.meta.get('download_slot'))
        if slot is not None:
            slot.delay = max(slot.delay, delay)

    def _build_retry(self, request):
        """Return a retry copy of *request*, or None once MAX_RETRIES is reached."""
        attempts = request.meta.get('smart_delay_retries', 0)
        if attempts >= self.MAX_RETRIES:
            return None
        retry_request = request.replace(dont_filter=True)
        retry_request.meta['smart_delay_retries'] = attempts + 1
        return retry_request

九、异步与性能篇

9.1 并发控制

# Limit concurrency per domain
CONCURRENT_REQUESTS_PER_DOMAIN = 4

# 下载中间件中动态控制
class RequestQueueMiddleware:
    def process_request(self, request, spider):
        # 高价值 URL 提高优先级
        if '/detail/' in request.url:
            request.priority = 10
        return None

9.2 自动限速(AutoThrottle)

# Adjust the request rate automatically from the server's response time
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_DEBUG = True                # show throttling information in the log
AUTOTHROTTLE_START_DELAY = 1             # initial delay
AUTOTHROTTLE_MAX_DELAY = 60              # maximum delay
AUTOTHROTTLE_TARGET_CONCURRENCY = 2.0    # target concurrency

9.3 HTTP 缓存(开发调试用)

HTTPCACHE_ENABLED            = True
HTTPCACHE_EXPIRATION_SECS    = 3600    # cached responses expire after one hour
HTTPCACHE_DIR                = '.scrapy/httpcache'
HTTPCACHE_STORAGE            = 'scrapy.extensions.httpcache.FilesystemCacheStorage'
HTTPCACHE_IGNORE_HTTP_CODES  = [500, 503, 404]   # never cache these statuses
# DummyPolicy caches every response regardless of HTTP cache headers, which is
# what a development/debugging cache needs, and it is the policy that honours
# HTTPCACHE_IGNORE_HTTP_CODES. (RFC2616Policy follows Cache-Control headers
# and does not read that setting.)
HTTPCACHE_POLICY             = 'scrapy.extensions.httpcache.DummyPolicy'

十、分布式篇

10.1 scrapy-redis

pip install scrapy-redis
# settings.py
REDIS_URL = 'redis://localhost:6379'
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
# Keep the request queue after the spider closes
SCHEDULER_PERSIST = True
# Priority queue
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderPriorityQueue'

# FIFO or LIFO can be used instead
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderQueue'       # FIFO
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderStack'       # LIFO (depth-first)

ITEM_PIPELINES = {
    # Push scraped items into Redis
    'scrapy_redis.pipelines.RedisPipeline': 300,
}
# spiders/distributed_spider.py
from scrapy_redis.spiders import RedisSpider

class DistributedSpider(RedisSpider):
    """Spider whose start URLs come from the ``distributed:start_urls`` Redis key."""

    name = 'distributed'
    # Start URLs are read from this Redis key
    redis_key = 'distributed:start_urls'

    def parse(self, response):
        """Yield one dict per ``div.item`` node, then follow the ``a.next`` link if present."""
        for node in response.css('div.item'):
            title = node.css('h2::text').get()
            yield {'title': title, 'url': response.url}

        # Ordinary pagination: the follow-up request is handed back to the
        # scheduler like any other request.
        next_href = response.css('a.next::attr(href)').get()
        if not next_href:
            return
        yield response.follow(next_href, self.parse)
# 启动多个 worker(不同机器)
scrapy crawl distributed

# 向 Redis 推入起始 URL
redis-cli lpush distributed:start_urls "https://example.com/page/1"
redis-cli lpush distributed:start_urls "https://example.com/page/2"

10.2 分布式架构

Master(调度机)
  ├── Redis(URL 队列 + 去重集合 + Item 队列)
  │
Worker 1 → scrapy crawl distributed
Worker 2 → scrapy crawl distributed
Worker 3 → scrapy crawl distributed
  │
Consumer(数据消费)→ 从 Redis 读取 Item → 写入 MySQL / MongoDB
# Consumer script (runs on its own, outside Scrapy)
import redis
import json
import pymysql

r = redis.from_url('redis://localhost:6379')
conn = pymysql.connect(...)

try:
    while True:
        # Block until an item is available; an empty result means the 30 s
        # timeout elapsed, so just poll again.
        result = r.blpop('distributed:items', timeout=30)
        if not result:
            continue
        _, data = result
        item = json.loads(data)
        # Write to the database. The context manager closes the cursor even
        # when execute() raises, so cursors do not pile up over a long run.
        with conn.cursor() as cursor:
            cursor.execute('INSERT INTO ...')
        conn.commit()
finally:
    # Release the MySQL connection when the loop is interrupted or fails.
    conn.close()

十一、数据存储篇

11.1 导出到文件

# Built-in formats
scrapy crawl quotes -o output.json       # JSON (not suited to large files: has to be loaded into memory as a whole)
scrapy crawl quotes -o output.jsonl      # JSON Lines (recommended, written line by line)
scrapy crawl quotes -o output.csv
scrapy crawl quotes -o output.xml

# -o appends to an existing file; use -O (capital letter) to overwrite it instead
scrapy crawl quotes -O output.jsonl

# Custom export
FEEDS = {
    'output/%(name)s_%(time)s.jsonl': {
        'format': 'jsonlines',
        'encoding': 'utf-8',
        'store_empty': False,
        'item_classes': ['myspider.items.ArticleItem'],   # export only these Item classes
        'fields': ['title', 'url', 'author'],             # export only these fields
        'overwrite': True,
    },
    # Export to S3 at the same time. Feeds have no per-feed credentials
    # option: S3 credentials are taken from the AWS_ACCESS_KEY_ID and
    # AWS_SECRET_ACCESS_KEY settings.
    's3://bucket/%(name)s.jsonl': {
        'format': 'jsonlines',
    },
}

十二、调试与监控篇

12.1 Scrapy Shell

# 启动 Shell
scrapy shell "https://quotes.toscrape.com/"

# Shell 中常用操作
response.url
response.status
response.headers
response.text[:500]

response.css('div.quote')
response.css('span.text::text').get()
response.xpath('//span[@class="text"]/text()').get()

# 发送请求
fetch('https://quotes.toscrape.com/page/2/')

# 查看 Item
from myspider.items import ArticleItem
item = ArticleItem()
item['title'] = response.css('h1::text').get()

# 查看 Spider
spider
spider.name

12.2 Stats 统计

# Accessing the crawl statistics from a Spider
class MySpider(scrapy.Spider):
    def closed(self, reason):
        """Log a summary of the collected stats when the spider closes.

        Args:
            reason: why the spider was closed (not used here).
        """
        stats = self.crawler.stats.get_stats()
        self.logger.info(f"""
        爬取完成:
        - 总请求数:    {stats.get('downloader/request_count', 0)}
        - 成功响应:    {stats.get('downloader/response_count', 0)}
        - 错误数:      {stats.get('downloader/exception_count', 0)}
        - Item 数量:   {stats.get('item_scraped_count', 0)}
        - 丢弃 Item:   {stats.get('item_dropped_count', 0)}
        - 重试次数:    {stats.get('retry/count', 0)}
        - 运行时间:    {stats.get('elapsed_time_seconds', 0):.1f}s
        """)

    def process_item(self, item, spider):
        """Increment a custom stats counter for every processed item.

        NOTE: ``process_item`` is the item-pipeline hook, so in a real project
        this method belongs in a pipeline class listed in ITEM_PIPELINES.

        Returns:
            The same ``item``, so that later pipelines still receive it.
        """
        # Add a custom statistic from within a pipeline
        spider.crawler.stats.inc_value('custom/items_written')
        # Hand the item on; returning None would pass None to the next pipeline.
        return item

12.3 日志配置

import logging

class MySpider(scrapy.Spider):
    """Minimal spider showing the per-spider logger at each common level."""
    name = 'myspider'

    def parse(self, response):
        """Emit one debug, info (with the response URL), warning and error record."""
        self.logger.debug('调试信息')
        self.logger.info(f'正在处理: {response.url}')
        self.logger.warning('警告信息')
        self.logger.error('错误信息')

# Logging configuration in settings.py
LOG_LEVEL   = 'INFO'
LOG_FILE    = 'logs/scrapy.log'
LOG_FORMAT  = '%(asctime)s [%(name)s] %(levelname)s: %(message)s'
LOG_DATEFORMAT = '%Y-%m-%d %H:%M:%S'
LOG_STDOUT  = False    # do not redirect the process's stdout/stderr (e.g. print output) into the log

十三、部署篇

13.1 Scrapyd

# pip install scrapyd scrapyd-client

# 启动 Scrapyd 服务
scrapyd

# 部署爬虫
scrapyd-deploy default -p myspider

# API operations
# Schedule a spider run
curl http://localhost:6800/schedule.json -d project=myspider -d spider=quotes

# List jobs (URL quoted so the shell does not treat '?' as a glob character)
curl "http://localhost:6800/listjobs.json?project=myspider"

# Cancel a job (replace <job_id> with the real job id; quoted so the shell
# does not parse '<' and '>' as redirections)
curl http://localhost:6800/cancel.json -d project=myspider -d "job=<job_id>"

13.2 Docker 部署

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

RUN apt-get update && apt-get install -y \
    gcc libffi-dev libssl-dev \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["scrapy", "crawl", "quotes", "-o", "/data/output.jsonl"]
# docker-compose.yml
version: "3.8"

services:
  spider:
    build: .
    volumes:
      - ./data:/data
    environment:
      - MYSQL_HOST=db
      # Credentials must match the db service (root / rootpass, scrapy_db)
      - MYSQL_USER=root
      - MYSQL_PASSWORD=rootpass
      - MYSQL_DATABASE=scrapy_db
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    command: scrapy crawl quotes

  # 分布式 worker(可扩展多个)
  worker1:
    build: .
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
    command: scrapy crawl distributed

  worker2:
    build: .
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
    command: scrapy crawl distributed

  db:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: rootpass
      MYSQL_DATABASE: scrapy_db
    volumes:
      - mysql_data:/var/lib/mysql

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

volumes:
  mysql_data:
  redis_data:

常用命令速查

# 项目管理
scrapy startproject <name>              # 创建项目
scrapy genspider <name> <domain>        # 创建爬虫
scrapy genspider -t crawl <name> <domain>  # 创建 CrawlSpider

# 运行
scrapy crawl <spider>                   # 运行爬虫
scrapy crawl <spider> -o out.jsonl      # 输出文件
scrapy crawl <spider> -s LOG_LEVEL=DEBUG  # 调试模式
scrapy crawl <spider> -s CLOSESPIDER_ITEMCOUNT=100  # 限制数量

# 调试
scrapy shell <url>                      # 交互式调试
scrapy fetch <url>                      # 下载页面
scrapy view <url>                       # 浏览器查看

# 检查
scrapy list                             # 列出所有爬虫
scrapy check <spider>                   # 检查爬虫契约
scrapy settings --get <key>             # 查看配置值

常用扩展汇总

| 扩展 | 功能 |
| --- | --- |
| scrapy-redis | 分布式爬取 |
| scrapy-playwright | Playwright JS 渲染 |
| scrapy-splash | Splash JS 渲染 |
| scrapy-fake-useragent | 随机 UA |
| scrapy-rotating-proxies | 自动轮换代理 |
| itemadapter | Item 统一适配器 |
| w3lib | HTML/URL 工具 |
| parsel | 独立选择器库(Scrapy 内部使用) |
| scrapyd | 爬虫部署服务 |
| spidermon | 爬虫监控 |


参考资源