Scrapy¶
从基础到高级,涵盖爬虫编写、Item、Pipeline、中间件、选择器、反爬应对、分布式爬取、数据存储等核心知识点。以 Scrapy 2.11.x 为基准。
目录¶
一、基础篇¶
1.1 安装与项目结构¶
pip install scrapy
pip install scrapy-redis # 分布式
pip install scrapy-splash # JS 渲染
pip install scrapy-playwright # 现代 JS 渲染(Scrapy 集成包;安装后还需执行 playwright install chromium)
pip install itemadapter # Item 适配器
# 创建项目
scrapy startproject myspider
cd myspider
# 创建爬虫
scrapy genspider quotes quotes.toscrape.com
scrapy genspider -t crawl news_spider news.example.com # CrawlSpider 模板
项目结构
myspider/
├── scrapy.cfg # 部署配置
└── myspider/
├── __init__.py
├── settings.py # 全局配置
├── items.py # Item 定义
├── middlewares.py # 中间件
├── pipelines.py # Pipeline
└── spiders/
├── __init__.py
├── quotes.py
└── news_spider.py
1.2 第一个爬虫¶
# spiders/quotes.py
import scrapy
class QuotesSpider(scrapy.Spider):
    """Scrape quotes from quotes.toscrape.com, following the pagination links."""

    name = 'quotes'  # unique spider name
    allowed_domains = ['quotes.toscrape.com']  # domains the spider may visit
    start_urls = ['https://quotes.toscrape.com/']  # where the crawl begins
    # Spider-level settings; they override the project-wide values.
    custom_settings = {
        'DOWNLOAD_DELAY': 1,
        'CONCURRENT_REQUESTS': 4,
    }

    def parse(self, response):
        """Default callback: yield one dict per quote, then follow the next page."""
        for block in response.css('div.quote'):
            text = block.css('span.text::text').get()
            author = block.css('small.author::text').get()
            tags = block.css('div.tags a.tag::text').getall()
            yield {'text': text, 'author': author, 'tags': tags}

        # Pagination: the "next" link is absent on the last page.
        next_href = response.css('li.next a::attr(href)').get()
        if next_href:
            yield response.follow(next_href, callback=self.parse)
1.3 命令行工具¶
# 运行爬虫
scrapy crawl quotes
scrapy crawl quotes -o output.json # 输出到 JSON
scrapy crawl quotes -o output.csv # 输出到 CSV
scrapy crawl quotes -o output.jsonl # JSON Lines(推荐大数据量)
scrapy crawl quotes -s CLOSESPIDER_ITEMCOUNT=100 # 爬取100条后停止
scrapy crawl quotes -s LOG_LEVEL=DEBUG # 调试日志
# 调试工具
scrapy shell "https://quotes.toscrape.com/" # 交互式 Shell
scrapy fetch "https://quotes.toscrape.com/" # 下载页面
scrapy view "https://quotes.toscrape.com/" # 用浏览器打开下载结果
# 列出所有爬虫
scrapy list
# 检查爬虫
scrapy check quotes
# 查看配置
scrapy settings --get DOWNLOAD_DELAY
二、选择器篇¶
2.1 CSS 选择器¶
# 在 scrapy shell 中调试
# scrapy shell "https://quotes.toscrape.com/"
# 选取元素
response.css('div.quote') # 所有匹配元素(SelectorList)
response.css('div.quote')[0] # 第一个元素
# 获取文本
response.css('span.text::text').get() # 第一个结果(None if 不存在)
response.css('span.text::text').getall() # 所有结果(list)
response.css('span.text::text').get(default='') # 带默认值
# 获取属性
response.css('a::attr(href)').get()
response.css('img::attr(src)').getall()
# 伪类与组合
response.css('li.next a::attr(href)').get() # 后代选择器
response.css('div > p::text').get() # 直接子元素
response.css('h1::text, h2::text').getall() # selector group: each member needs its own ::text, otherwise whole <h1> elements are returned
response.css('a[href*="page"]::attr(href)').getall() # 属性包含
# 常用 CSS 选择技巧
response.css('[class="active"]') # 精确 class 匹配
response.css('[href^="https"]') # href 以 https 开头
response.css('[href$=".pdf"]') # href 以 .pdf 结尾
response.css('div:first-child') # 第一个子元素
response.css('tr:nth-child(2)') # 第二个 tr
2.2 XPath 选择器¶
# XPath 更强大,尤其在复杂结构中
response.xpath('//div[@class="quote"]') # 所有 class=quote 的 div
response.xpath('//span[@class="text"]/text()').get() # 获取文本
response.xpath('//a/@href').getall() # 获取属性
response.xpath('//img/@src').get()
# 常用 XPath 路径与谓词
response.xpath('//div/span/text()') # 子元素文本
response.xpath('//div//span/text()') # 后代元素文本
response.xpath('//span[@class="text"]') # 属性过滤
response.xpath('//a[contains(@href, "page")]') # 属性包含
response.xpath('//a[text()="Next"]/@href') # 文本匹配
response.xpath('//p[last()]') # 最后一个
response.xpath('//tr[position()>1]') # 位置过滤
response.xpath('//div[not(@class)]') # 没有 class 的 div
# 父级、兄弟级
response.xpath('//span[@class="author"]/parent::div')
response.xpath('//span[@class="text"]/following-sibling::span')
response.xpath('//li[@class="active"]/preceding-sibling::li')
# 文本处理
response.xpath('normalize-space(//title/text())').get() # 去除多余空格
response.xpath('string(//div[@class="content"])').get() # 获取所有后代文本
# 嵌套 XPath(在子选择器中使用 .//)
for quote in response.xpath('//div[@class="quote"]'):
text = quote.xpath('.//span[@class="text"]/text()').get()
author = quote.xpath('.//small[@class="author"]/text()').get()
2.3 正则提取¶
# re() 正则提取(从文本中)
response.css('p::text').re(r'\d+') # 提取所有数字
response.css('p::text').re_first(r'\d+') # 提取第一个数字
# 结合 XPath
response.xpath('//p/text()').re(r'Price: \$(\d+\.?\d*)')
# 常用正则场景
response.css('script::text').re_first(r'"token":"([^"]+)"') # 从 JS 中提取
response.css('a::attr(href)').re(r'page=(\d+)')
2.4 嵌套与组合¶
def parse(self, response):
    """Yield one dict per ``div.product`` card, mixing CSS and XPath selectors."""
    for card in response.css('div.product'):
        # Selecting on a sub-element keeps every query scoped to this card.
        raw_price = card.css('span.price::text').re_first(r'[\d.]+')

        # Lazy-loaded images keep the real URL in data-src; fall back to src.
        image = card.css('img::attr(data-src)').get()
        if not image:
            image = card.css('img::attr(src)').get()

        yield {
            'name': card.css('h2.name::text').get('').strip(),
            'price': float(raw_price) if raw_price else None,
            'img_url': image,
            # XPath can be used on the same selector object as CSS.
            'rating': card.xpath('.//div[@class="rating"]/@data-score').get(),
        }
三、Spider 篇¶
3.1 Spider 基类¶
import scrapy
from scrapy import signals
from scrapy.http import Request, FormRequest
class BaseExampleSpider(scrapy.Spider):
    """Example spider: dynamic start requests, extra callback arguments, error handling."""

    name = 'base_example'
    allowed_domains = ['example.com']

    def get_urls_from_db(self):
        """Return the URLs to start from; override this with a real database query."""
        return []

    def start_requests(self):
        """Build the initial requests dynamically instead of relying on ``start_urls``."""
        for url in self.get_urls_from_db():
            yield Request(
                url=url,
                callback=self.parse,
                errback=self.handle_error,
                meta={
                    'dont_redirect': False,
                    # Let 404/403 responses reach the callback instead of being filtered.
                    'handle_httpstatus_list': [404, 403],
                    # Placeholder: substitute a real proxy URL with a numeric port.
                    'proxy': 'http://proxy:port',
                },
                headers={'Referer': 'https://example.com'},
                priority=0,         # higher values are scheduled first
                dont_filter=False,  # keep duplicate-URL filtering enabled
            )

    def parse(self, response):
        """Follow every detail link, passing extra data to the detail callback."""
        # handle_httpstatus_list lets 403/404 through, so they must be rejected
        # here; otherwise error pages would be scraped as if they were listings.
        if response.status in (403, 404):
            self.logger.warning(f'Skipping {response.url}: HTTP {response.status}')
            return
        for link in response.css('a.detail::attr(href)').getall():
            yield response.follow(
                link,
                callback=self.parse_detail,
                cb_kwargs={'category': 'tech'},   # becomes a keyword argument of the callback
                meta={'page_url': response.url},  # travels with the request
            )

    def parse_detail(self, response, category):
        """Build the item from a detail page; ``category`` comes from ``cb_kwargs``."""
        yield {
            'title': response.css('h1::text').get(),
            'category': category,
            'source': response.meta['page_url'],
        }

    def handle_error(self, failure):
        """Errback: log the URL and the failure of a request that could not complete."""
        self.logger.error(f'Request failed: {failure.request.url}')
        self.logger.error(repr(failure))

    def closed(self, reason):
        """Called when the spider closes; logs the reason and the response count."""
        self.logger.info(f'Spider closed: {reason}')
        self.logger.info(f'Total pages: {self.crawler.stats.get_value("response_received_count")}')
3.2 CrawlSpider(全站爬取)¶
from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor
class NewsCrawlSpider(CrawlSpider):
    """Site-wide crawl: follow category pages and parse article pages."""

    name = 'news_crawl'
    allowed_domains = ['news.example.com']
    start_urls = ['https://news.example.com/']

    # Category listing pages: only followed, never parsed.
    _category_links = LinkExtractor(allow=r'/category/\w+/')
    # Article pages: extracted only inside the list area, with some URLs excluded.
    _article_links = LinkExtractor(
        allow=r'/article/\d+/',
        deny=r'/login|/register|/search',
        restrict_css='div.article-list',
    )

    rules = (
        Rule(_category_links, follow=True),
        Rule(_article_links, callback='parse_article', follow=False),
    )

    def parse_article(self, response):
        """Extract the article fields from one article page."""
        paragraphs = response.css('div.content p::text').getall()
        yield {
            'title': response.css('h1.title::text').get('').strip(),
            'content': '\n'.join(paragraphs),
            'publish_at': response.css('time::attr(datetime)').get(),
            'author': response.css('span.author::text').get(),
            'url': response.url,
        }
3.3 SitemapSpider¶
from scrapy.spiders import SitemapSpider
class SitemapExampleSpider(SitemapSpider):
    """Discover URLs through the site's sitemap and route them by URL pattern."""

    name = 'sitemap_spider'
    sitemap_urls = ['https://example.com/sitemap.xml']
    # (URL pattern, callback name) pairs.
    sitemap_rules = [
        ('/blog/', 'parse_blog'),
        ('/product/', 'parse_product'),
    ]
    # Patterns of nested sitemaps that should be followed.
    sitemap_follow = ['/sitemap_index']

    def parse_blog(self, response):
        """Yield the title and URL of a blog page."""
        title = response.css('h1::text').get()
        yield {'title': title, 'url': response.url}

    def parse_product(self, response):
        """Yield the name and URL of a product page."""
        product_name = response.css('h1.product-name::text').get()
        yield {'name': product_name, 'url': response.url}
3.4 翻页与深度爬取¶
class PaginationSpider(scrapy.Spider):
    """Three pagination techniques side by side.

    Each technique has its own method name. Defining ``parse`` twice in one
    class would silently discard the first definition. ``parse`` remains the
    page-number variant, which is what ``start_requests`` uses as its callback.
    """

    name = 'pagination'
    start_urls = ['https://example.com/list?page=1']

    # Approach 1: follow the "next page" link found with CSS/XPath.
    def parse_follow_next(self, response):
        """Yield the items of a page, then follow its ``a.next`` link if present."""
        for item in response.css('div.item'):
            yield {'title': item.css('h2::text').get()}
        next_url = response.css('a.next::attr(href)').get()
        if next_url:
            yield response.follow(next_url, self.parse_follow_next)

    # Approach 2: generate every page URL up front.
    def start_requests(self):
        """Request pages 1 to 100 directly."""
        for page in range(1, 101):
            yield scrapy.Request(
                f'https://example.com/list?page={page}',
                callback=self.parse,
            )

    # Approach 3: carry the current page number in request.meta.
    def parse(self, response):
        """Yield the items of a page and request the next one until the last page."""
        for item in response.css('div.item'):
            yield {'title': item.css('h2::text').get()}
        current_page = response.meta.get('page', 1)
        # Pull only the digits: the element may hold text around the number,
        # and a missing element falls back to 0 (no further pages).
        total_pages = int(response.css('span.total::text').re_first(r'\d+', default='0'))
        if current_page < total_pages:
            yield response.follow(
                f'?page={current_page + 1}',
                callback=self.parse,
                meta={'page': current_page + 1},
            )
3.5 POST 请求与表单¶
from scrapy.http import FormRequest, JsonRequest
class LoginSpider(scrapy.Spider):
    """Log in through an HTML form, then continue with authenticated requests."""

    name = 'login_spider'
    start_urls = ['https://example.com/login']

    def parse(self, response):
        """Submit the login form found on the page, including its CSRF token."""
        token = response.css('input[name="csrf_token"]::attr(value)').get()
        credentials = {
            'username': 'myuser',
            'password': 'mypass',
            'csrf_token': token,
        }
        yield FormRequest.from_response(
            response,
            formdata=credentials,
            callback=self.after_login,
        )

    def after_login(self, response):
        """Check the landing page for the welcome marker before going further."""
        if '欢迎' not in response.text:
            self.logger.error('登录失败')
            return
        self.logger.info('登录成功')
        yield scrapy.Request('https://example.com/private', callback=self.parse_private)

    def parse_api(self, response):
        """Send a JSON-bodied request to the API endpoint."""
        payload = {'page': 1, 'size': 20}
        yield JsonRequest(
            url='https://api.example.com/data',
            data=payload,
            callback=self.parse_json_response,
        )

    def parse_json_response(self, response):
        """Yield every entry of the ``list`` key in the JSON response."""
        yield from response.json()['list']
四、Item 篇¶
4.1 Item 定义¶
# items.py
import scrapy
from itemloaders.processors import TakeFirst, MapCompose, Join
from w3lib.html import remove_tags
class ArticleItem(scrapy.Item):
    """Fields scraped from an article page."""

    title = scrapy.Field()
    url = scrapy.Field()
    author = scrapy.Field()
    content = scrapy.Field()
    tags = scrapy.Field()
    publish_at = scrapy.Field()
    image_urls = scrapy.Field()  # read by ImagesPipeline
    images = scrapy.Field()      # filled in by ImagesPipeline
    # The ItemLoader example in this file also loads these two fields. A
    # scrapy.Item raises KeyError for any field that is not declared.
    price = scrapy.Field()
    crawled_at = scrapy.Field()
class ProductItem(scrapy.Item):
name = scrapy.Field()
price = scrapy.Field()
sku = scrapy.Field()
category = scrapy.Field()
images = scrapy.Field()
in_stock = scrapy.Field()
crawled_at = scrapy.Field()
4.2 ItemLoader¶
from scrapy.loader import ItemLoader
from itemloaders.processors import TakeFirst, MapCompose, Join
from w3lib.html import remove_tags, strip_html5_whitespace
import re
def clean_price(value):
    """Convert a raw price string such as ``'$1,234.50'`` to a float.

    Every character other than digits and ``.`` is removed before conversion.

    Returns:
        The parsed float, or ``None`` when *value* is empty or holds no
        parseable number (for example ``'N/A'`` or ``'1.2.3'``).
    """
    if not value:
        return None
    digits = re.sub(r'[^\d.]', '', value)
    try:
        return float(digits)
    except ValueError:
        # Nothing numeric survived the clean-up, or the remainder is malformed.
        return None
def clean_text(value):
    """Strip surrounding whitespace; any falsy input yields ``None``."""
    if not value:
        return None
    return value.strip()
class ArticleLoader(ItemLoader):
default_item_class = ArticleItem
# 输入处理器:处理原始数据
# 输出处理器:多个值如何合并
title_in = MapCompose(remove_tags, str.strip)
title_out = TakeFirst() # 只取第一个值
content_in = MapCompose(remove_tags, str.strip)
content_out = Join('\n') # 多段文本用换行合并
tags_out = list # 保留所有值(list)
author_out = TakeFirst()
price_in = MapCompose(clean_price)
price_out = TakeFirst()
# 默认处理器
default_input_processor = MapCompose(str.strip)
default_output_processor = TakeFirst()
# 在 Spider 中使用
def parse_article(self, response):
    """Populate an ArticleItem from an article page through ArticleLoader."""
    # Imported here because the import block of this snippet does not provide it.
    from datetime import datetime

    loader = ArticleLoader(item=ArticleItem(), response=response)
    loader.add_css('title', 'h1.title')
    loader.add_css('author', 'span.author::text')
    loader.add_css('content', 'div.content p')
    loader.add_css('tags', 'a.tag::text')
    loader.add_css('publish_at', 'time::attr(datetime)')
    loader.add_value('url', response.url)
    # XPath works as well as CSS.
    loader.add_xpath('price', '//span[@class="price"]/text()')
    # Literal values can be added directly.
    # NOTE: 'price' and 'crawled_at' must be declared as fields on the item
    # class, otherwise loading them raises KeyError.
    loader.add_value('crawled_at', datetime.now().isoformat())
    yield loader.load_item()
五、Pipeline 篇¶
5.1 数据清洗 Pipeline¶
# pipelines.py
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
import re
class CleanDataPipeline:
    """Validate and normalise scraped items."""

    TEXT_FIELDS = ('title', 'author', 'content')

    def process_item(self, item, spider):
        """Clean *item* in place and return it.

        Raises:
            DropItem: when the item has no title.
        """
        adapter = ItemAdapter(item)

        # Required-field check.
        if not adapter.get('title'):
            raise DropItem(f'缺少 title: {item}')

        # Trim text fields; non-string values (e.g. lists) are left untouched
        # instead of crashing on a missing .strip().
        for field in self.TEXT_FIELDS:
            value = adapter.get(field)
            if isinstance(value, str):
                adapter[field] = value.strip()

        # Normalise the price to a float, or None when nothing numeric remains.
        price = adapter.get('price')
        if price:
            digits = re.sub(r'[^\d.]', '', str(price))
            try:
                adapter['price'] = float(digits)
            except ValueError:
                adapter['price'] = None

        # Default for a missing or empty tag list.
        if not adapter.get('tags'):
            adapter['tags'] = []
        return item
5.2 去重 Pipeline¶
class DuplicateFilterPipeline:
    """In-memory de-duplication of items by their ``url`` field."""

    def __init__(self):
        self.seen_urls = set()

    def process_item(self, item, spider):
        """Return *item* the first time its URL is seen.

        Raises:
            DropItem: when an item with the same URL was already processed.
        """
        url = ItemAdapter(item).get('url')
        # Items without a URL cannot be compared; letting them through avoids
        # treating every URL-less item as a duplicate of the first one.
        if not url:
            return item
        if url in self.seen_urls:
            raise DropItem(f'重复 URL: {url}')
        self.seen_urls.add(url)
        return item
class RedisDuplicatePipeline:
    """Persistent, Redis-backed de-duplication for distributed crawls."""

    def __init__(self, redis_url):
        self.redis_url = redis_url

    @classmethod
    def from_crawler(cls, crawler):
        """Create the pipeline from the ``REDIS_URL`` setting."""
        return cls(redis_url=crawler.settings.get('REDIS_URL', 'redis://localhost:6379/0'))

    def open_spider(self, spider):
        """Connect to Redis and pick the per-spider set key."""
        import redis
        self.redis = redis.from_url(self.redis_url)
        self.key = f'scrapy:seen:{spider.name}'

    def close_spider(self, spider):
        """Release the Redis connection."""
        self.redis.close()

    def process_item(self, item, spider):
        """Return *item* unless its URL is already in the Redis set.

        Raises:
            DropItem: when the URL was already recorded.
        """
        url = ItemAdapter(item).get('url', '')
        if not url:
            return item  # nothing to de-duplicate on
        # SADD reports how many members were actually added, so 0 means the URL
        # was already present. A single atomic command avoids the race that a
        # separate SISMEMBER + SADD pair has between workers.
        if not self.redis.sadd(self.key, url):
            raise DropItem(f'重复: {url}')
        return item
5.3 存储到 MySQL¶
import pymysql
from twisted.enterprise import adbapi
class MySQLPipeline:
    """Synchronous MySQL pipeline: one upsert and one commit per item."""

    UPSERT_SQL = """
    INSERT INTO articles (title, url, author, content, publish_at)
    VALUES (%s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
    title = VALUES(title),
    content = VALUES(content)
    """

    def __init__(self, db_config):
        self.db_config = db_config

    @classmethod
    def from_crawler(cls, crawler):
        """Build the connection parameters from the MYSQL_* settings."""
        settings = crawler.settings
        return cls(db_config={
            'host': settings.get('MYSQL_HOST', 'localhost'),
            # getint: values given on the command line (-s) arrive as strings.
            'port': settings.getint('MYSQL_PORT', 3306),
            'user': settings.get('MYSQL_USER'),
            'password': settings.get('MYSQL_PASSWORD'),
            'database': settings.get('MYSQL_DATABASE'),
            'charset': 'utf8mb4',
        })

    def open_spider(self, spider):
        """Open the connection and a cursor that is reused for every item."""
        self.conn = pymysql.connect(**self.db_config)
        self.cursor = self.conn.cursor()

    def close_spider(self, spider):
        """Commit pending work, then close the cursor and the connection."""
        try:
            self.conn.commit()
        finally:
            self.cursor.close()
            self.conn.close()

    def process_item(self, item, spider):
        """Upsert *item*; failures are rolled back and logged, and the item is still returned."""
        adapter = ItemAdapter(item)
        row = (
            adapter.get('title'),
            adapter.get('url'),
            adapter.get('author'),
            adapter.get('content'),
            adapter.get('publish_at'),
        )
        try:
            self.cursor.execute(self.UPSERT_SQL, row)
            self.conn.commit()
        except Exception as e:
            self.conn.rollback()
            spider.logger.error(f'MySQL 写入失败: {e}')
        return item
class AsyncMySQLPipeline:
    """Non-blocking MySQL pipeline backed by a Twisted ``adbapi`` connection pool."""

    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_crawler(cls, crawler):
        """Create the connection pool from the MYSQL_* settings."""
        settings = crawler.settings
        dbpool = adbapi.ConnectionPool(
            'pymysql',
            host=settings.get('MYSQL_HOST'),
            # getint: values given on the command line (-s) arrive as strings.
            port=settings.getint('MYSQL_PORT', 3306),
            user=settings.get('MYSQL_USER'),
            password=settings.get('MYSQL_PASSWORD'),
            db=settings.get('MYSQL_DATABASE'),
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
            use_unicode=True,
        )
        return cls(dbpool=dbpool)

    def close_spider(self, spider):
        """Shut the pool down so its worker threads and connections are released."""
        self.dbpool.close()

    def process_item(self, item, spider):
        """Schedule the insert and return a Deferred that resolves to *item*.

        The Deferred from ``runInteraction`` resolves to the return value of
        ``_insert``. Without the final callback it would resolve to ``None``
        and every later pipeline would receive ``None`` instead of the item.
        """
        deferred = self.dbpool.runInteraction(self._insert, item)
        deferred.addErrback(self._log_failure, spider)
        deferred.addCallback(lambda _result: item)
        return deferred

    def _insert(self, tx, item):
        """Run the INSERT inside the transaction provided by the pool."""
        adapter = ItemAdapter(item)
        tx.execute(
            'INSERT IGNORE INTO articles (title, url, author) VALUES (%s, %s, %s)',
            (adapter.get('title'), adapter.get('url'), adapter.get('author'))
        )

    def _log_failure(self, failure, spider):
        """Log a failed insert; the item still continues down the pipeline."""
        spider.logger.error(f'MySQL 写入失败: {failure.getErrorMessage()}')
5.4 存储到 MongoDB¶
import pymongo
class MongoDBPipeline:
    """Upsert every item into a MongoDB collection, keyed by its URL."""

    collection = 'articles'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        """Read the connection URI and database name from the settings."""
        settings = crawler.settings
        uri = settings.get('MONGO_URI', 'mongodb://localhost:27017')
        database = settings.get('MONGO_DATABASE', 'scrapy')
        return cls(mongo_uri=uri, mongo_db=database)

    def open_spider(self, spider):
        """Connect when the spider starts."""
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        """Disconnect when the spider finishes."""
        self.client.close()

    def process_item(self, item, spider):
        """Insert the item, or update the stored document with the same URL."""
        adapter = ItemAdapter(item)
        selector = {'url': adapter.get('url')}  # the URL is the de-duplication key
        changes = {'$set': dict(adapter)}
        # upsert=True inserts the document when no match exists.
        self.db[self.collection].update_one(selector, changes, upsert=True)
        return item
5.5 存储到 Redis¶
import redis
import json
class RedisPipeline:
    """Push every item as JSON onto a Redis list for a downstream consumer."""

    def __init__(self, redis_url):
        self.redis_url = redis_url

    @classmethod
    def from_crawler(cls, crawler):
        """Create the pipeline from the ``REDIS_URL`` setting."""
        url = crawler.settings.get('REDIS_URL')
        return cls(redis_url=url)

    def open_spider(self, spider):
        """Connect to Redis and pick the per-spider list key."""
        self.key = f'scrapy:items:{spider.name}'
        self.redis = redis.from_url(self.redis_url)

    def process_item(self, item, spider):
        """Serialise the item to JSON and LPUSH it onto the list."""
        payload = json.dumps(dict(ItemAdapter(item)))
        self.redis.lpush(self.key, payload)
        return item
5.6 图片/文件下载¶
# settings.py
# Enable exactly one images pipeline. CustomImagesPipeline subclasses
# ImagesPipeline, so registering both would download every image twice.
# Use 'scrapy.pipelines.images.ImagesPipeline' instead when no customisation is needed.
ITEM_PIPELINES = {
    'myspider.pipelines.CustomImagesPipeline': 200,
}
IMAGES_STORE = './images'
IMAGES_THUMBS = {
'small': (50, 50),
'medium': (200, 200),
}
IMAGES_MIN_HEIGHT = 100
IMAGES_MIN_WIDTH = 100
# items.py
class ProductItem(scrapy.Item):
    """Item whose images are downloaded by an images pipeline."""

    image_urls = scrapy.Field()   # input: this exact field name is required
    images = scrapy.Field()       # output: download results are stored here
    # Used by the custom pipeline example in this file: it reads `category`
    # to build the storage path and writes the saved paths to `image_paths`.
    # A scrapy.Item raises KeyError for undeclared fields.
    category = scrapy.Field()
    image_paths = scrapy.Field()
# spider 中
yield ProductItem(image_urls=['https://example.com/img1.jpg'])
# 自定义图片路径
from scrapy.pipelines.images import ImagesPipeline
class CustomImagesPipeline(ImagesPipeline):
    """ImagesPipeline variant that stores files under ``<category>/<filename>``."""

    def file_path(self, request, response=None, info=None, item=None):
        """Return the storage path, relative to IMAGES_STORE, for one image."""
        from urllib.parse import urlparse

        # `item` may be None, and `category` may be unset or empty.
        category = None
        if item is not None:
            category = ItemAdapter(item).get('category')
        category = category or 'misc'
        # Use only the URL path so query strings never end up in file names.
        filename = urlparse(request.url).path.rsplit('/', 1)[-1] or 'image'
        return f'{category}/{filename}'

    def get_media_requests(self, item, info):
        """Yield one download request per valid image URL, skipping empty or non-HTTP ones."""
        urls = ItemAdapter(item).get('image_urls') or []
        for url in urls:
            if url and url.startswith('http'):
                yield scrapy.Request(url)

    def item_completed(self, results, item, info):
        """Store the paths of the downloaded images on the item.

        Raises:
            DropItem: when no image could be downloaded.
        """
        image_paths = [r['path'] for ok, r in results if ok]
        if not image_paths:
            raise DropItem('图片下载失败')
        # The item class must declare an `image_paths` field; a scrapy.Item
        # raises KeyError for undeclared fields.
        ItemAdapter(item)['image_paths'] = image_paths
        return item
六、中间件篇¶
6.1 下载中间件结构¶
class MyDownloaderMiddleware:
    """Skeleton of a downloader middleware with its three hooks."""

    def __init__(self, settings):
        self.settings = settings

    @classmethod
    def from_crawler(cls, crawler):
        """Build the middleware from the crawler's settings."""
        settings = crawler.settings
        return cls(settings)

    def process_request(self, request, spider):
        """Called for each request before it is sent.

        Return None      -> continue with the next middleware
        Return Response  -> use it directly, the request is not sent
        Return Request   -> the returned request is rescheduled
        Raise IgnoreRequest -> the request is discarded
        """
        return None

    def process_response(self, request, response, spider):
        """Called for each response after it is received.

        Return Response  -> continue processing
        Return Request   -> reschedule
        Raise IgnoreRequest -> discard
        """
        return response

    def process_exception(self, request, exception, spider):
        """Called when downloading raised an exception.

        Return None      -> keep propagating the exception
        Return Response  -> treat as a normal response
        Return Request   -> reschedule
        """
        return None
6.2 随机 User-Agent 中间件¶
import random
class RandomUserAgentMiddleware:
    """Assign a randomly chosen User-Agent header to every outgoing request."""

    USER_AGENTS = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 14_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
    ]

    def process_request(self, request, spider):
        """Overwrite the request's User-Agent with one picked from USER_AGENTS."""
        chosen = random.choice(self.USER_AGENTS)
        request.headers['User-Agent'] = chosen
6.3 代理 IP 中间件¶
import random
import requests
class ProxyMiddleware:
    """Pick a proxy at random from a static list for every request."""

    def __init__(self, proxies):
        self.proxies = proxies

    @classmethod
    def from_crawler(cls, crawler):
        """Read the proxy list from the ``PROXY_LIST`` setting."""
        return cls(crawler.settings.getlist('PROXY_LIST', []))

    def process_request(self, request, spider):
        """Attach a random proxy to the request; do nothing when the list is empty."""
        if not self.proxies:
            return
        chosen = random.choice(self.proxies)
        request.meta['proxy'] = chosen
        spider.logger.debug(f'使用代理: {chosen}')
class DynamicProxyMiddleware:
    """Fetch a proxy from a proxy API for every request and rotate it on failure.

    NOTE: ``get_proxy`` performs a blocking HTTP call, which stalls the crawl
    for up to its timeout. It is acceptable for a demo; cache the result or
    refresh it in the background for production use.
    """

    MAX_PROXY_RETRIES = 3  # give up after this many proxy switches per request

    def __init__(self, proxy_api):
        self.proxy_api = proxy_api
        self.proxies = []

    @classmethod
    def from_crawler(cls, crawler):
        """Read the API endpoint from the ``PROXY_API_URL`` setting."""
        return cls(proxy_api=crawler.settings.get('PROXY_API_URL'))

    def get_proxy(self):
        """Return a ``host:port`` string from the API, or None when unavailable."""
        try:
            resp = requests.get(self.proxy_api, timeout=5)
            resp.raise_for_status()  # never use an error page body as a proxy
        except requests.RequestException:
            return None
        return resp.text.strip() or None

    def process_request(self, request, spider):
        """Attach a freshly fetched proxy to the request, if one is available."""
        proxy = self.get_proxy()
        if proxy:
            request.meta['proxy'] = f'http://{proxy}'

    def process_exception(self, request, exception, spider):
        """Retry a failed request through a different proxy.

        Returns a copy of the request flagged ``dont_filter`` so the duplicate
        filter does not discard it. Returns None, which lets the exception
        propagate, once the retry budget is spent or no proxy is available.
        """
        attempts = request.meta.get('proxy_retry_times', 0)
        if attempts >= self.MAX_PROXY_RETRIES:
            return None
        proxy = self.get_proxy()
        if not proxy:
            return None
        retry = request.copy()
        retry.dont_filter = True
        retry.meta['proxy'] = f'http://{proxy}'
        retry.meta['proxy_retry_times'] = attempts + 1
        return retry
6.4 Selenium 集成¶
# pip install selenium
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from scrapy.http import HtmlResponse
class SeleniumMiddleware:
    """Render requests flagged with ``meta['use_selenium']`` in headless Chrome.

    NOTE: every Selenium call here blocks, so flagged requests are rendered
    one at a time and other downloads wait meanwhile.
    """

    CHROME_FLAGS = (
        '--headless',
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--disable-blink-features=AutomationControlled',
    )

    def __init__(self):
        options = Options()
        for flag in self.CHROME_FLAGS:
            options.add_argument(flag)
        options.add_experimental_option('excludeSwitches', ['enable-automation'])
        self.driver = webdriver.Chrome(options=options)

    @classmethod
    def from_crawler(cls, crawler):
        """Create the middleware and make sure the browser is closed with the spider."""
        from scrapy import signals

        middleware = cls()
        # Without this connection spider_closed is never called and the Chrome
        # process outlives the crawl.
        crawler.signals.connect(middleware.spider_closed, signal=signals.spider_closed)
        return middleware

    def process_request(self, request, spider):
        """Return a rendered HtmlResponse for flagged requests, None for all others."""
        if not request.meta.get('use_selenium'):
            return None  # not a Selenium request: use the normal downloader

        import time
        from selenium.common.exceptions import TimeoutException

        self.driver.get(request.url)
        # Wait for the main content; on timeout continue with whatever loaded.
        try:
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, 'div.content'))
            )
        except TimeoutException:
            spider.logger.warning(f'Timed out waiting for div.content: {request.url}')
        # Scroll to the bottom so lazy-loaded content is requested, then give it time.
        self.driver.execute_script('window.scrollTo(0, document.body.scrollHeight)')
        time.sleep(2)
        return HtmlResponse(
            url=self.driver.current_url,
            body=self.driver.page_source.encode('utf-8'),
            encoding='utf-8',
            request=request,
        )

    def spider_closed(self, spider):
        """Quit the browser when the spider closes."""
        self.driver.quit()
# Spider 中使用
yield scrapy.Request(url, meta={'use_selenium': True}, callback=self.parse)
6.5 Playwright 集成(推荐替代 Selenium)¶
# pip install scrapy-playwright
# playwright install chromium
# settings.py
DOWNLOAD_HANDLERS = {
'http': 'scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler',
'https': 'scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler',
}
TWISTED_REACTOR = 'twisted.internet.asyncioreactor.AsyncioSelectorReactor'
# spider 中使用
from scrapy_playwright.page import PageMethod
yield scrapy.Request(
url,
meta={
'playwright': True,
'playwright_include_page': True,
'playwright_page_methods': [
PageMethod('wait_for_selector', 'div.content'),
PageMethod('evaluate', 'window.scrollTo(0, document.body.scrollHeight)'),
PageMethod('wait_for_timeout', 2000),
],
},
callback=self.parse_js
)
async def parse_js(self, response):
    """Interact with the live Playwright page, then read its final HTML."""
    page = response.meta['playwright_page']
    try:
        # The page can still be driven after the initial load.
        await page.click('button.load-more')
        await page.wait_for_timeout(1000)
        content = await page.content()
    finally:
        # With playwright_include_page the callback owns the page. Close it
        # even when a step above fails, otherwise pages pile up in the browser.
        await page.close()
    # `content` can now be parsed with Scrapy selectors.
七、Settings 配置篇¶
# settings.py
# ---- 基础配置 ----
BOT_NAME = 'myspider'
USER_AGENT = 'Mozilla/5.0 (compatible; MySpider/1.0)'
# ---- 并发配置 ----
CONCURRENT_REQUESTS = 16 # 全局并发请求数(默认16)
CONCURRENT_REQUESTS_PER_DOMAIN = 8 # 每个域名并发数
CONCURRENT_REQUESTS_PER_IP = 0 # 每个IP并发数(0=不限)
# ---- 延迟配置 ----
DOWNLOAD_DELAY = 0.5 # 请求间隔(秒)
RANDOMIZE_DOWNLOAD_DELAY = True # 随机化延迟(0.5~1.5倍)
# ---- 自动限速 ----
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 10
AUTOTHROTTLE_TARGET_CONCURRENCY = 4.0
AUTOTHROTTLE_DEBUG = False
# ---- 重试配置 ----
RETRY_ENABLED = True
RETRY_TIMES = 3
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 429]
# Scrapy has no built-in retry back-off settings: RETRY_BACKOFF_* names are not
# recognised and would be ignored. Use AutoThrottle or a custom middleware for back-off.
RETRY_PRIORITY_ADJUST = -1  # retried requests get a lower priority (the default)
# ---- 超时配置 ----
DOWNLOAD_TIMEOUT = 30 # 下载超时(秒)
# ---- 去重配置 ----
DUPEFILTER_CLASS = 'scrapy.dupefilters.RFPDupeFilter' # 默认基于内存
# DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter' # Redis 持久化
# ---- 缓存配置 ----
HTTPCACHE_ENABLED = True # 开发时启用缓存,避免重复请求
HTTPCACHE_EXPIRATION_SECS = 86400
HTTPCACHE_DIR = '.scrapy/httpcache'
HTTPCACHE_IGNORE_HTTP_CODES = [500, 503]
# ---- 日志配置 ----
LOG_LEVEL = 'INFO'
LOG_FILE = 'scrapy.log'
LOG_FORMAT = '%(asctime)s [%(name)s] %(levelname)s: %(message)s'
# ---- Pipeline 优先级(数字越小优先级越高)----
ITEM_PIPELINES = {
'myspider.pipelines.CleanDataPipeline': 100,
'myspider.pipelines.DuplicateFilterPipeline': 200,
'myspider.pipelines.MySQLPipeline': 300,
'myspider.pipelines.MongoDBPipeline': 400,
}
# ---- 中间件优先级 ----
DOWNLOADER_MIDDLEWARES = {
'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None, # 禁用默认
'myspider.middlewares.RandomUserAgentMiddleware': 400,
'myspider.middlewares.ProxyMiddleware': 500,
'myspider.middlewares.SeleniumMiddleware': 800,
}
SPIDER_MIDDLEWARES = {
'scrapy.spidermiddlewares.httperror.HttpErrorMiddleware': 50,
'myspider.middlewares.MySpiderMiddleware': 543,
}
# ---- HTTP 配置 ----
DEFAULT_REQUEST_HEADERS = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
}
COOKIES_ENABLED = True
COOKIES_DEBUG = False
# ---- 深度配置 ----
DEPTH_LIMIT = 0 # 0=不限深度
DEPTH_PRIORITY = 1 # 按深度调整请求优先级:正值偏向广度优先(BFO),负值偏向深度优先(DFO),默认 0;完整的广度优先还需把调度队列改为 FIFO
# ---- 扩展配置 ----
EXTENSIONS = {
'scrapy.extensions.telnet.TelnetConsole': None, # 禁用 Telnet
'scrapy.extensions.corestats.CoreStats': 500,
'scrapy.extensions.memusage.MemoryUsage': 500,
}
MEMUSAGE_ENABLED = True
MEMUSAGE_LIMIT_MB = 512 # 超过内存限制关闭爬虫
# ---- 关闭条件 ----
CLOSESPIDER_ITEMCOUNT = 0 # 爬取N条后停止(0=不限)
CLOSESPIDER_PAGECOUNT = 0 # 爬取N页后停止
CLOSESPIDER_ERRORCOUNT = 10 # 错误超过N次停止
CLOSESPIDER_TIMEOUT = 0 # 运行时间超过N秒停止(0=不限)
# ---- 数据库配置 ----
MYSQL_HOST = 'localhost'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASSWORD = 'password'
MYSQL_DATABASE = 'scrapy_db'
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DATABASE = 'scrapy_db'
REDIS_URL = 'redis://localhost:6379/0'
八、反爬应对篇¶
8.1 Headers 伪装¶
# 完整浏览器 Headers
DEFAULT_REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    # Advertise only encodings that can be decoded. Add 'br' only when the
    # brotli package is installed, otherwise response bodies arrive unreadable.
    'Accept-Encoding': 'gzip, deflate',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1',
    'Sec-Fetch-Dest': 'document',
    'Sec-Fetch-Mode': 'navigate',
    'Sec-Fetch-Site': 'none',
}
# 动态设置 Referer
class RefererMiddleware:
    """Give every request that lacks a Referer header a default one."""

    def process_request(self, request, spider):
        """Set ``Referer`` unless the request already carries a ``referer`` header."""
        headers = request.headers
        if 'referer' in headers:
            return
        headers['Referer'] = 'https://www.google.com'
8.3 限速与随机延迟策略¶
import random
import time
from scrapy.downloadermiddlewares.retry import RetryMiddleware
class SmartDelayMiddleware:
    """Downloader middleware that backs off on HTTP 429 and switches proxy on HTTP 403.

    It never sleeps: a blocking sleep would freeze every concurrent download.
    Instead the delay of the affected download slot is raised for the
    requested period. Retries go through Scrapy's ``get_retry_request``, which
    honours the retry limit and marks the copy ``dont_filter`` so the
    duplicate filter does not discard it.
    """

    DEFAULT_RETRY_AFTER = 60  # seconds, used when Retry-After is absent or unparsable

    def get_new_proxy(self):
        """Return a replacement proxy URL, or None when none is available.

        Override this in a subclass that has access to a proxy pool.
        """
        return None

    def process_response(self, request, response, spider):
        """Retry throttled or blocked responses; pass every other response through."""
        if response.status == 429:
            # Rate limited: slow the slot down, then retry.
            retry_after = self._retry_after_seconds(response)
            spider.logger.warning(f'被限流,等待 {retry_after} 秒')
            self._throttle_slot(request, spider, retry_after)
            return self._retry(request, spider, 'HTTP 429') or response
        if response.status == 403:
            # Blocked: retrying only makes sense through a different proxy.
            spider.logger.warning('IP 被封,切换代理')
            proxy = self.get_new_proxy()
            if not proxy:
                return response
            request.meta['proxy'] = proxy
            return self._retry(request, spider, 'HTTP 403') or response
        return response

    def _retry_after_seconds(self, response):
        """Parse Retry-After, given as seconds or as an HTTP date, into whole seconds."""
        raw = response.headers.get('Retry-After')
        if raw is None:
            return self.DEFAULT_RETRY_AFTER
        if isinstance(raw, bytes):
            raw = raw.decode('latin-1')
        try:
            return max(0, int(raw))
        except (TypeError, ValueError):
            pass
        from email.utils import parsedate_to_datetime
        try:
            when = parsedate_to_datetime(raw)
        except (TypeError, ValueError):
            return self.DEFAULT_RETRY_AFTER
        return max(0, int(when.timestamp() - time.time()))

    def _throttle_slot(self, request, spider, seconds):
        """Raise the delay of the request's download slot for ``seconds`` seconds."""
        from twisted.internet import reactor

        slot = spider.crawler.engine.downloader.slots.get(request.meta.get('download_slot'))
        if slot is None or seconds <= slot.delay:
            return
        previous = slot.delay
        slot.delay = seconds
        # Restore the previous delay once the back-off period is over.
        reactor.callLater(seconds, setattr, slot, 'delay', previous)

    def _retry(self, request, spider, reason):
        """Return a retry copy of the request, or None when retries are exhausted."""
        from scrapy.downloadermiddlewares.retry import get_retry_request

        return get_retry_request(request, spider=spider, reason=reason)
九、异步与性能篇¶
9.1 并发控制¶
# 按域名限制并发
CONCURRENT_REQUESTS_PER_DOMAIN = 4
# 下载中间件中动态控制
class RequestQueueMiddleware:
    """Raise the priority of high-value (``/detail/``) requests.

    Priority only matters while a request waits in the scheduler. By the time
    a downloader middleware's ``process_request`` runs, the request has
    already been dequeued. The effective hook is ``process_spider_output``:
    register this class in SPIDER_MIDDLEWARES so requests are boosted before
    they are scheduled. ``process_request`` is kept for existing
    DOWNLOADER_MIDDLEWARES registrations.
    """

    HIGH_PRIORITY = 10

    def _boost(self, request):
        """Give detail-page requests the high priority."""
        if '/detail/' in request.url:
            request.priority = self.HIGH_PRIORITY

    def process_spider_output(self, response, result, spider):
        """Boost matching requests yielded by the spider, before they are scheduled."""
        from scrapy import Request

        for entry in result:
            if isinstance(entry, Request):
                self._boost(entry)
            yield entry

    def process_request(self, request, spider):
        """Downloader hook kept for compatibility; too late to affect scheduling order."""
        self._boost(request)
        return None
9.2 自动限速(AutoThrottle)¶
# 根据服务器响应时间自动调整请求速度
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1 # 初始延迟
AUTOTHROTTLE_MAX_DELAY = 60 # 最大延迟
AUTOTHROTTLE_TARGET_CONCURRENCY = 2.0 # 目标并发数
AUTOTHROTTLE_DEBUG = True # 在日志中显示限速信息
9.3 HTTP 缓存(开发调试用)¶
HTTPCACHE_ENABLED = True
HTTPCACHE_EXPIRATION_SECS = 3600  # cached responses expire after one hour
HTTPCACHE_DIR = '.scrapy/httpcache'
HTTPCACHE_STORAGE = 'scrapy.extensions.httpcache.FilesystemCacheStorage'
HTTPCACHE_IGNORE_HTTP_CODES = [500, 503, 404]
# DummyPolicy caches every response regardless of HTTP cache headers, which is
# what development and debugging need. RFC2616Policy follows Cache-Control
# directives and does not consult HTTPCACHE_IGNORE_HTTP_CODES.
HTTPCACHE_POLICY = 'scrapy.extensions.httpcache.DummyPolicy'
十、分布式篇¶
10.1 scrapy-redis¶
pip install scrapy-redis
# settings.py
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
REDIS_URL = 'redis://localhost:6379'
SCHEDULER_PERSIST = True # 爬虫关闭后保留请求队列
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderPriorityQueue' # 优先级队列
# 也可以用 FIFO 或 LIFO
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderQueue' # FIFO
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderStack' # LIFO(深度优先)
ITEM_PIPELINES = {
'scrapy_redis.pipelines.RedisPipeline': 300, # 将 item 推入 Redis
}
# spiders/distributed_spider.py
from scrapy_redis.spiders import RedisSpider
class DistributedSpider(RedisSpider):
    """Worker spider whose start URLs are read from a shared Redis list."""

    name = 'distributed'
    redis_key = 'distributed:start_urls'  # Redis key that holds the start URLs

    def parse(self, response):
        """Yield the items of a page, then follow its ``a.next`` link if present."""
        page_url = response.url
        for entry in response.css('div.item'):
            title = entry.css('h2::text').get()
            yield {'title': title, 'url': page_url}

        # Regular pagination: the new request goes into the shared Redis queue.
        next_href = response.css('a.next::attr(href)').get()
        if next_href:
            yield response.follow(next_href, self.parse)
# 启动多个 worker(不同机器)
scrapy crawl distributed
# 向 Redis 推入起始 URL
redis-cli lpush distributed:start_urls "https://example.com/page/1"
redis-cli lpush distributed:start_urls "https://example.com/page/2"
10.2 分布式架构¶
Master(调度机)
├── Redis(URL 队列 + 去重集合 + Item 队列)
│
Worker 1 → scrapy crawl distributed
Worker 2 → scrapy crawl distributed
Worker 3 → scrapy crawl distributed
│
Consumer(数据消费)→ 从 Redis 读取 Item → 写入 MySQL / MongoDB
# 消费者脚本(独立运行)
import redis
import json
import pymysql
r = redis.from_url('redis://localhost:6379')
conn = pymysql.connect(...)  # placeholder: fill in the real connection arguments
try:
    while True:
        # Block for up to 30 seconds waiting for the next item.
        result = r.blpop('distributed:items', timeout=30)
        if not result:
            continue
        _, data = result
        item = json.loads(data)
        # Write to the database. The context manager closes the cursor on
        # every iteration instead of leaking one per item.
        with conn.cursor() as cursor:
            cursor.execute('INSERT INTO ...')  # placeholder statement
        conn.commit()
finally:
    # Always release the connection, including on Ctrl+C.
    conn.close()
十一、数据存储篇¶
11.1 导出到文件¶
# 内置格式
scrapy crawl quotes -o output.json # JSON(整个文件是一个数组,无法流式解析,也不能安全追加,不适合大数据量)
scrapy crawl quotes -o output.jsonl # JSON Lines(推荐,逐行写入)
scrapy crawl quotes -o output.csv
scrapy crawl quotes -o output.xml
# 追加与覆盖(Scrapy 2.4+):-o 追加到已有文件,-O(大写)覆盖已有文件
scrapy crawl quotes -O output.jsonl
# 自定义导出
FEEDS = {
    'output/%(name)s_%(time)s.jsonl': {
        'format': 'jsonlines',
        'encoding': 'utf-8',
        'store_empty': False,
        'item_classes': ['myspider.items.ArticleItem'],  # export only these item classes
        'fields': ['title', 'url', 'author'],            # export only these fields
        'overwrite': True,
    },
    # Export to S3 as well. Credentials are not a feed option: provide them
    # through the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY settings.
    's3://bucket/%(name)s.jsonl': {
        'format': 'jsonlines',
    },
}
十二、调试与监控篇¶
12.1 Scrapy Shell¶
# 启动 Shell
scrapy shell "https://quotes.toscrape.com/"
# Shell 中常用操作
response.url
response.status
response.headers
response.text[:500]
response.css('div.quote')
response.css('span.text::text').get()
response.xpath('//span[@class="text"]/text()').get()
# 发送请求
fetch('https://quotes.toscrape.com/page/2/')
# 查看 Item
from myspider.items import ArticleItem
item = ArticleItem()
item['title'] = response.css('h1::text').get()
# 查看 Spider
spider
spider.name
12.2 Stats 统计¶
# 在 Spider 中访问统计信息
class MySpider(scrapy.Spider):
    """Spider that logs a summary of the crawl statistics when it closes."""

    name = 'myspider'  # every spider needs a name

    def closed(self, reason):
        """Log the main counters from the stats collector."""
        stats = self.crawler.stats.get_stats()
        self.logger.info(f"""
爬取完成:
- 总请求数: {stats.get('downloader/request_count', 0)}
- 成功响应: {stats.get('downloader/response_count', 0)}
- 错误数: {stats.get('downloader/exception_count', 0)}
- Item 数量: {stats.get('item_scraped_count', 0)}
- 丢弃 Item: {stats.get('item_dropped_count', 0)}
- 重试次数: {stats.get('retry/count', 0)}
- 运行时间: {stats.get('elapsed_time_seconds', 0):.1f}s
""")


class StatsPipeline:
    """Item pipeline that counts every processed item under a custom stats key."""

    def process_item(self, item, spider):
        """Increment ``custom/items_written`` and pass the item on."""
        spider.crawler.stats.inc_value('custom/items_written')
        # A pipeline must return the item (or raise DropItem). Returning None
        # would hand None to every later pipeline.
        return item
12.3 日志配置¶
import logging
class MySpider(scrapy.Spider):
name = 'myspider'
def parse(self, response):
self.logger.debug('调试信息')
self.logger.info(f'正在处理: {response.url}')
self.logger.warning('警告信息')
self.logger.error('错误信息')
# settings.py 日志配置
LOG_LEVEL = 'INFO'
LOG_FILE = 'logs/scrapy.log'
LOG_FORMAT = '%(asctime)s [%(name)s] %(levelname)s: %(message)s'
LOG_DATEFORMAT = '%Y-%m-%d %H:%M:%S'
LOG_STDOUT = False # 为 True 时,进程的标准输出(如 print)会被重定向到日志
十三、部署篇¶
13.1 Scrapyd¶
# pip install scrapyd scrapyd-client
# 启动 Scrapyd 服务
scrapyd
# 部署爬虫
scrapyd-deploy default -p myspider
# API 操作
# 启动爬虫
curl http://localhost:6800/schedule.json -d project=myspider -d spider=quotes
# 查看任务
curl http://localhost:6800/listjobs.json?project=myspider
# 取消任务
curl http://localhost:6800/cancel.json -d project=myspider -d job=<job_id>
13.2 Docker 部署¶
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
RUN apt-get update && apt-get install -y \
gcc libffi-dev libssl-dev \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["scrapy", "crawl", "quotes", "-o", "/data/output.jsonl"]
# docker-compose.yml
version: "3.8"
services:
spider:
build: .
volumes:
- ./data:/data
environment:
- MYSQL_HOST=db
- REDIS_URL=redis://redis:6379/0
depends_on:
- db
- redis
command: scrapy crawl quotes
# 分布式 worker(可扩展多个)
worker1:
build: .
environment:
- REDIS_URL=redis://redis:6379/0
depends_on:
- redis
command: scrapy crawl distributed
worker2:
build: .
environment:
- REDIS_URL=redis://redis:6379/0
depends_on:
- redis
command: scrapy crawl distributed
db:
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: rootpass
MYSQL_DATABASE: scrapy_db
volumes:
- mysql_data:/var/lib/mysql
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
volumes:
mysql_data:
redis_data:
常用命令速查¶
# 项目管理
scrapy startproject <name> # 创建项目
scrapy genspider <name> <domain> # 创建爬虫
scrapy genspider -t crawl <name> <domain> # 创建 CrawlSpider
# 运行
scrapy crawl <spider> # 运行爬虫
scrapy crawl <spider> -o out.jsonl # 输出文件
scrapy crawl <spider> -s LOG_LEVEL=DEBUG # 调试模式
scrapy crawl <spider> -s CLOSESPIDER_ITEMCOUNT=100 # 限制数量
# 调试
scrapy shell <url> # 交互式调试
scrapy fetch <url> # 下载页面
scrapy view <url> # 浏览器查看
# 检查
scrapy list # 列出所有爬虫
scrapy check <spider> # 检查爬虫契约
scrapy settings --get <key> # 查看配置值
常用扩展汇总¶
库 |
功能 |
|---|---|
scrapy-redis |
分布式爬取 |
scrapy-playwright |
Playwright JS 渲染 |
scrapy-splash |
Splash JS 渲染 |
scrapy-fake-useragent |
随机 UA |
scrapy-rotating-proxies |
自动轮换代理 |
itemadapter |
Item 统一适配器 |
w3lib |
HTML/URL 工具 |
parsel |
独立选择器库(Scrapy 内部使用) |
scrapyd |
爬虫部署服务 |
spidermon |
爬虫监控 |
参考资源¶
Scrapy 官方文档:https://docs.scrapy.org/
scrapy-redis:https://github.com/rmax/scrapy-redis
scrapy-playwright:https://github.com/scrapy-plugins/scrapy-playwright
Parsel 文档:https://parsel.readthedocs.io/
SpiderMon 监控:https://spidermon.readthedocs.io/