1、 下载scrapy-redis组件
scrapy-redis是在redis-py基础上开发的、用于支持Scrpy框架使用Redis服务的组件,该组件可以支持Scrapy框架使用Redis服务,构建分布式爬取程序。
在使用scrapy-redis之前,需要先了解一下scrapy-redis组件的工作原理及代码构成。scrapy-redis是开源组件,其源代码可以从github 网站下载。下图是scrapy-redis模块的源代码。

2、 调度引擎
为实现分布式爬取,scrapy-redis将所有请求都放置在Redis数据库,所有爬虫的请求队列都来自于Redis数据库。scrapy-redis负责分配不重复的请求队列给爬虫,它还负责将多个爬虫爬取的数据汇总到一处(如:数据库、JSON、EXECL等)。
对于一个分布式爬取框架,需要解决以下几个问题:
(1) 拦截爬虫的所有请求,并将请求存储到Redis数据库;
(2) 为每个爬虫分配请求;
(3) 汇总所有爬虫的数据到一处(如:数据库、JSON文件等)。
要解决上面的(1)和(2)问题,需要一个调度引擎,scheduler.py就是scrapy-redis组件的调度引擎,核心代码如下:
import importlib
import six
from scrapy.utils.misc import load_object
from . import connection, defaults
# TODO: add SCRAPY_JOB support.
class Scheduler(object):
def __init__(self, server,
persist=False,
flush_on_start=False,
queue_key=defaults.SCHEDULER_QUEUE_KEY,
queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
idle_before_close=0,
serializer=None):
if idle_before_close < 0:
raise TypeError("idle_before_close cannot be negative")
self.server = server
self.persist = persist
self.flush_on_start = flush_on_start
self.queue_key = queue_key
self.queue_cls = queue_cls
self.dupefilter_cls = dupefilter_cls
self.dupefilter_key = dupefilter_key
self.idle_before_close = idle_before_close
self.serializer = serializer
self.stats = None
def __len__(self):
return len(self.queue)
@classmethod
def from_settings(cls, settings):
kwargs = {
'persist': settings.getbool('SCHEDULER_PERSIST'),
'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
}
# If these values are missing, it means we want to use the defaults.
optional = {
# TODO: Use custom prefixes for this settings to note that are
# specific to scrapy-redis.
'queue_key': 'SCHEDULER_QUEUE_KEY',
'queue_cls': 'SCHEDULER_QUEUE_CLASS',
'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
# We use the default setting name to keep compatibility.
'dupefilter_cls': 'DUPEFILTER_CLASS',
'serializer': 'SCHEDULER_SERIALIZER',
}
for name, setting_name in optional.items():
val = settings.get(setting_name)
if val:
kwargs[name] = val
# Support serializer as a path to a module.
if isinstance(kwargs.get('serializer'), six.string_types):
kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
server = connection.from_settings(settings)
# Ensure the connection is working.
server.ping()
return cls(server=server, **kwargs)
@classmethod
def from_crawler(cls, crawler):
instance = cls.from_settings(crawler.settings)
# FIXME: for now, stats are only supported from this constructor
instance.stats = crawler.stats
return instance
def open(self, spider):
self.spider = spider
try:
self.queue = load_object(self.queue_cls)(
server=self.server,
spider=spider,
key=self.queue_key % {'spider': spider.name},
serializer=self.serializer,
)
except TypeError as e:
raise ValueError("Failed to instantiate queue class '%s': %s",
self.queue_cls, e)
self.df = load_object(self.dupefilter_cls).from_spider(spider)
if self.flush_on_start:
self.flush()
# notice if there are requests already in the queue to resume the crawl
if len(self.queue):
spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
def close(self, reason):
if not self.persist:
self.flush()
def flush(self):
self.df.clear()
self.queue.clear()
def enqueue_request(self, request):
if not request.dont_filter and self.df.request_seen(request):
self.df.log(request, self.spider)
return False
if self.stats:
self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
self.queue.push(request)
return True
def next_request(self):
block_pop_timeout = self.idle_before_close
request = self.queue.pop(block_pop_timeout)
if request and self.stats:
self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
return request
def has_pending_requests(self):
return len(self) > 0在Scrapy框架中,要启用该引擎,需要修改爬虫项目的settings.py配置文件,添加SCHEDULER配置项,该配置项用于配置爬虫的调度引擎,以替换默认的调度引擎。
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
代码分析:
Scheduler实现了和 Redis 的交互,并进行任务管理调度。使得多机运行的爬虫在 Redis 上协调存取工作(与 redis 建立连接,创建任务队列,request 的入队、出对和过滤等操作)。
from_settings(cls, settings)方法通过settings配置项初始化Scheduler调度引擎实例,从settings中读取Redis配置项并连接Redis服务,参数cls是Scheduler调度引擎实例。
open(self, spider)方法主要实例化了任务队列 queue 和过滤器 dupefilter。
enqueue_request(self, request)方法对应请求的入队操作,Spider 提交的Request 对象最终由Scrapy 引擎调用enqueue_request 添加到请求队列中。
next_request(self)方法对应请求的出队操作,Scrapy 引擎调用next_request 从请求队列中取出请求,送给下载器下载。
Scheduler类有两个重要的属性queue和df:queue是请求队列,用于存储所有来自爬虫的请求;df是去重过滤器,用于请求的去重操作。
在enqueue_request 方法中,使用去重过滤器的request_seen 方法判断request 是否重复,即request是否已经爬取过,若已经爬取且用户没有设置dont_filter,就抛弃该request,否则调用self.queue 的push 方法将request 入队。
在next_request 方法中,调用self.queue的pop方法出队一个request 并返回。
3、 请求队列
scrapy-redis组件的请求队列由queue.py模块实现,阅读queue.py代码可以看出,请求队列的基类是Base类,基类定义了push和pop等空方法,这些空方法由子类实现。
Base类有三个子类,分别是FifoQueue类、PriorityQueue类和LifoQueue类。FifoQueue类是先进先出队列,PriorityQueue类是优先级队列,LifoQueue类是后进先出队列。
Base类的代码如下:
class Base(object):
def __init__(self, server, spider, key, serializer=None):
if serializer is None:
# Backward compatibility.
# TODO: deprecate pickle.
serializer = picklecompat
if not hasattr(serializer, 'loads'):
raise TypeError("serializer does not implement 'loads' function: %r"
% serializer)
if not hasattr(serializer, 'dumps'):
raise TypeError("serializer '%s' does not implement 'dumps' function: %r"
% serializer)
self.server = server
self.spider = spider
self.key = key % {'spider': spider.name}
self.serializer = serializer
"""Per-spider base queue class"""
def _encode_request(self, request):
"""Encode a request object"""
obj = request_to_dict(request, self.spider)
return self.serializer.dumps(obj)
def _decode_request(self, encoded_request):
"""Decode an request previously encoded"""
obj = self.serializer.loads(encoded_request)
return request_from_dict(obj, self.spider)
def __len__(self):
"""Return the length of the queue"""
raise NotImplementedError
def push(self, request):
"""Push a request"""
raise NotImplementedError
def pop(self, timeout=0):
"""Pop a request"""
raise NotImplementedError
def clear(self):
"""Clear queue/stack"""
self.server.delete(self.key)Base类_init_()方法用于初始化队列,参数server为Redis服务连接实例,参数spider为爬虫实例,参数key为Redis数据库的键,队列类通过key进行入队和出队操作。
Base类push()和pop()方法没有做任何工作,只是抛出NotImplementedError异常错误,方法的实现交给子类,实现不同的请求队列。
Base类的_encode_request()和_decode_request()方法分别对request进行编码和解码。并进行序列化。
FifoQueue类(先进先出队列)
FifoQueue类是先进先出队列,即谁先进入队列,谁先出去,爬取过程为广度优先。FifoQueue类push()和pop()方法代码如下:
def push(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request)) def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.brpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.rpop(self.key) if data: return self._decode_request(data)
push()方法调用Redis命令lpush,在key中压入一个编码后的request。pop()方法调用Redis 的rpop 或brpop 命令从数据库中列表的最右端弹出一个经过编码的request,再调用基类的_decode_request 方法对其进行解码,然后返回。
PriorityQueue类(优先级队列)
PriorityQueue类是优先级队列,也是scrapy-redis默认采用的队列。PriorityQueue类使用Redis的有序集合来对Request进行排序,优先级高的在有序集合的顶层,然后从上往下依次获取Request即可。PriorityQueue类push()和pop()方法代码如下:
def push(self, request):
"""Push a request"""
data = self._encode_request(request)
score = -request.priority
self.server.execute_command('ZADD', self.key, score, data)
def pop(self, timeout=0):
pipe = self.server.pipeline()
pipe.multi()
pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
results, count = pipe.execute()
if results:
return self._decode_request(results[0])push()方法首先获取request的优先级priority,然后使用Redis的execute_command()方法执行ZADD命令,在有序集合按优先级排序插入request。
pop()方法首先调用Redis的pipeline()方法,获取pepeline实例对象,pipeline能够执行一组命令,提高命令执行效率。然后pipeline对象的zrange()方法添加Redis的zrange命令,该命令返回有序集中指定区间内的成员。最后调用pipeline对象的execute()方法执行命令。
LifoQueue类(后进先出队列)
LifoQueue类是后进先出队列,即最后入队的先出队,爬取过程为深度优先。LifoQueue类push()和pop()方法代码如下:
def push(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request)) def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.blpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.lpop(self.key) if data: return self._decode_request(data)
LifoQueue类的push()方法同FifoQueue类的相同,都是调用Redis的lpush命令将request压入key。
LifoQueue类的pop()方法调用Redis的blpop或lpop命令弹出最后入队的元素。
4、 去重过滤器
去重过滤器就是过滤最近已经爬取过的URL,防止重复爬取。Scrapy会自动对request做去重过滤,若不希望Scrapy过滤重复的request,可以在生成request对象时设置dont_filter为True。例如:
scrapy.Request(url, callback=self.get_response, dont_filter=True)
scrapy-redis重写了scrapy的去重队列,要启用scrapy-redis的去重队列,需要在settings.py配置文件添加下面的配置项:
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
scrapy-redis的去重过滤器的实现代码为dupefilter.py,类名是RFPDupeFilter,该类的核心代码为:
def request_seen(self, request): fp = self.request_fingerprint(request) # This returns the number of values added, zero if already exists. added = self.server.sadd(self.key, fp) return added == 0 def request_fingerprint(self, request): return request_fingerprint(request)
RFPDupeFilte使用Redis 中的一个去重集合对请求进行去重,该集合在数据库中的键为self.key 的值。
request_seen()方法用来判断一个request请求是否重复,首先调用request_fingerprint()方法计算request指纹,然后调用Redis的sadd命令将request指纹加入到去重集合,根据sadd 返回值判断请求是否重复,若加入失败说明request已在去重集合中,该request是重复的,返回True。
request_fingerprint()方法用于计算request的指纹。
5、 爬取数据汇总管道类
在分布式爬取框架中,所有爬虫爬取的数据都要汇总到一处(如:数据库、JSON文件等)。
scrapy-redis提供了RedisPipeline,将所有爬虫爬取的数据统一存储到Redis数据库。若希望存储到其它数据库或文件,可以重写RedisPipeline类,RedisPipeline类的模块文件是pipelines.py,代码如下:
from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread
from . import connection, defaults
default_serialize = ScrapyJSONEncoder().encode
class RedisPipeline(object):
"""Pushes serialized item into a redis list/queue
Settings
--------
REDIS_ITEMS_KEY : str
Redis key where to store items.
REDIS_ITEMS_SERIALIZER : str
Object path to serializer function.
"""
def __init__(self, server,
key=defaults.PIPELINE_KEY,
serialize_func=default_serialize):
"""Initialize pipeline.
Parameters
----------
server : StrictRedis
Redis client instance.
key : str
Redis key where to store items.
serialize_func : callable
Items serializer function.
"""
self.server = server
self.key = key
self.serialize = serialize_func
@classmethod
def from_settings(cls, settings):
params = {
'server': connection.from_settings(settings),
}
if settings.get('REDIS_ITEMS_KEY'):
params['key'] = settings['REDIS_ITEMS_KEY']
if settings.get('REDIS_ITEMS_SERIALIZER'):
params['serialize_func'] = load_object(
settings['REDIS_ITEMS_SERIALIZER']
)
return cls(**params)
@classmethod
def from_crawler(cls, crawler):
return cls.from_settings(crawler.settings)
def process_item(self, item, spider):
return deferToThread(self._process_item, item, spider)
def _process_item(self, item, spider):
key = self.item_key(item, spider)
data = self.serialize(item)
self.server.rpush(key, data)
return item
def item_key(self, item, spider):
"""Returns redis key based on given spider.
Override this function to use a different key depending on the item
and/or spider.
"""
return self.key % {'spider': spider.name}_init_()方法传入三个参数:参数server是Redis服务连接实例对对象;key是在settings配置文件中配置的键值,item_key()方法使用该键值与字典{'spider': spider.name}取模运算后的结果,作为Redis数据库的一个键值;serialize_func()是回调方法,将item数据进行串行化。
process_item()方法将item数据存储到Redis数据库,该方法会调用deferToThread()方法启动一个线程,在线程中执行_process_item()方法。
_process_item()首先调用item_key()获取Redis数据库的键值key,然后调用serialize()方法对item数据串行化,最后调用Redis服务的rpush方法将串行化的数据压入key。