Logo

郎哥编程

详解scrapy-redis组件

2020-12-25 223

1、 下载scrapy-redis组件

scrapy-redis是在redis-py基础上开发的、用于支持Scrpy框架使用Redis服务的组件,该组件可以支持Scrapy框架使用Redis服务,构建分布式爬取程序。

在使用scrapy-redis之前,需要先了解一下scrapy-redis组件的工作原理及代码构成。scrapy-redis是开源组件,其源代码可以从github 网站下载。下图是scrapy-redis模块的源代码。

68.png

2、 调度引擎

为实现分布式爬取,scrapy-redis将所有请求都放置在Redis数据库,所有爬虫的请求队列都来自于Redis数据库。scrapy-redis负责分配不重复的请求队列给爬虫,它还负责将多个爬虫爬取的数据汇总到一处(如:数据库、JSON、EXECL等)。

对于一个分布式爬取框架,需要解决以下几个问题:

(1)   拦截爬虫的所有请求,并将请求存储到Redis数据库;

(2)   为每个爬虫分配请求;

(3)   汇总所有爬虫的数据到一处(如:数据库、JSON文件等)。

要解决上面的(1)和(2)问题,需要一个调度引擎,scheduler.py就是scrapy-redis组件的调度引擎,核心代码如下:

import importlib
import six
 
from scrapy.utils.misc import load_object
from . import connection, defaults
# TODO: add SCRAPY_JOB support.
class Scheduler(object):
      def __init__(self, server,
                 persist=False,
                 flush_on_start=False,
                 queue_key=defaults.SCHEDULER_QUEUE_KEY,
                 queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                 dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
                 dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
                 idle_before_close=0,
                 serializer=None):
     
        if idle_before_close < 0:
            raise TypeError("idle_before_close cannot be negative")
 
        self.server = server
        self.persist = persist
        self.flush_on_start = flush_on_start
        self.queue_key = queue_key
        self.queue_cls = queue_cls
        self.dupefilter_cls = dupefilter_cls
        self.dupefilter_key = dupefilter_key
        self.idle_before_close = idle_before_close
        self.serializer = serializer
        self.stats = None
 
    def __len__(self):
        return len(self.queue)
 
    @classmethod
    def from_settings(cls, settings):
        kwargs = {
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }
 
        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }
        for name, setting_name in optional.items():
            val = settings.get(setting_name)
            if val:
                kwargs[name] = val
 
        # Support serializer as a path to a module.
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
 
        server = connection.from_settings(settings)
        # Ensure the connection is working.
        server.ping()
 
        return cls(server=server, **kwargs)
 
    @classmethod
    def from_crawler(cls, crawler):
        instance = cls.from_settings(crawler.settings)
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance
 
    def open(self, spider):
        self.spider = spider
 
        try:
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                key=self.queue_key % {'spider': spider.name},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s",
                             self.queue_cls, e)
 
        self.df = load_object(self.dupefilter_cls).from_spider(spider)
 
        if self.flush_on_start:
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
 
    def close(self, reason):
        if not self.persist:
            self.flush()
 
    def flush(self):
        self.df.clear()
        self.queue.clear()
 
    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True
 
    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request
 
    def has_pending_requests(self):
        return len(self) > 0

在Scrapy框架中,要启用该引擎,需要修改爬虫项目的settings.py配置文件,添加SCHEDULER配置项,该配置项用于配置爬虫的调度引擎,以替换默认的调度引擎。

SCHEDULER = "scrapy_redis.scheduler.Scheduler"

代码分析:

Scheduler实现了和 Redis 的交互,并进行任务管理调度。使得多机运行的爬虫在 Redis 上协调存取工作(与 redis 建立连接,创建任务队列,request 的入队、出对和过滤等操作)。

from_settings(cls, settings)方法通过settings配置项初始化Scheduler调度引擎实例,从settings中读取Redis配置项并连接Redis服务,参数cls是Scheduler调度引擎实例。

open(self, spider)方法主要实例化了任务队列 queue 和过滤器 dupefilter。

enqueue_request(self, request)方法对应请求的入队操作,Spider 提交的Request 对象最终由Scrapy 引擎调用enqueue_request 添加到请求队列中。

next_request(self)方法对应请求的出队操作,Scrapy 引擎调用next_request 从请求队列中取出请求,送给下载器下载。

Scheduler类有两个重要的属性queue和df:queue是请求队列,用于存储所有来自爬虫的请求;df是去重过滤器,用于请求的去重操作。

在enqueue_request 方法中,使用去重过滤器的request_seen 方法判断request 是否重复,即request是否已经爬取过,若已经爬取且用户没有设置dont_filter,就抛弃该request,否则调用self.queue 的push 方法将request 入队。

在next_request 方法中,调用self.queue的pop方法出队一个request 并返回。

3、 请求队列

scrapy-redis组件的请求队列由queue.py模块实现,阅读queue.py代码可以看出,请求队列的基类是Base类,基类定义了push和pop等空方法,这些空方法由子类实现。

Base类有三个子类,分别是FifoQueue类、PriorityQueue类和LifoQueue类。FifoQueue类是先进先出队列,PriorityQueue类是优先级队列,LifoQueue类是后进先出队列。

Base类的代码如下:

class Base(object):
def __init__(self, server, spider, key, serializer=None):
        if serializer is None:
            # Backward compatibility.
            # TODO: deprecate pickle.
            serializer = picklecompat
        if not hasattr(serializer, 'loads'):
            raise TypeError("serializer does not implement 'loads' function: %r"
                            % serializer)
        if not hasattr(serializer, 'dumps'):
            raise TypeError("serializer '%s' does not implement 'dumps' function: %r"
                            % serializer)
 
        self.server = server
        self.spider = spider
        self.key = key % {'spider': spider.name}
        self.serializer = serializer
    """Per-spider base queue class"""
   def _encode_request(self, request):
        """Encode a request object"""
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)
   def _decode_request(self, encoded_request):
        """Decode an request previously encoded"""
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)
   def __len__(self):
        """Return the length of the queue"""
        raise NotImplementedError
 
    def push(self, request):
        """Push a request"""
        raise NotImplementedError
  def pop(self, timeout=0):
        """Pop a request"""
        raise NotImplementedError
   def clear(self):
        """Clear queue/stack"""
        self.server.delete(self.key)

Base类_init_()方法用于初始化队列,参数server为Redis服务连接实例,参数spider为爬虫实例,参数key为Redis数据库的键,队列类通过key进行入队和出队操作。

Base类push()和pop()方法没有做任何工作,只是抛出NotImplementedError异常错误,方法的实现交给子类,实现不同的请求队列。

Base类的_encode_request()和_decode_request()方法分别对request进行编码和解码。并进行序列化。

FifoQueue类(先进先出队列)

FifoQueue类是先进先出队列,即谁先进入队列,谁先出去,爬取过程为广度优先。FifoQueue类push()和pop()方法代码如下:

def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))
def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.brpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.rpop(self.key)
        if data:
            return self._decode_request(data)

push()方法调用Redis命令lpush,在key中压入一个编码后的request。pop()方法调用Redis 的rpop 或brpop 命令从数据库中列表的最右端弹出一个经过编码的request,再调用基类的_decode_request 方法对其进行解码,然后返回。

PriorityQueue类(优先级队列)

PriorityQueue类是优先级队列,也是scrapy-redis默认采用的队列。PriorityQueue类使用Redis的有序集合来对Request进行排序,优先级高的在有序集合的顶层,然后从上往下依次获取Request即可。PriorityQueue类push()和pop()方法代码如下:

def push(self, request):
        """Push a request"""
        data = self._encode_request(request)
        score = -request.priority
        self.server.execute_command('ZADD', self.key, score, data)
def pop(self, timeout=0):
        pipe = self.server.pipeline()
        pipe.multi()
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
        results, count = pipe.execute()
        if results:
            return self._decode_request(results[0])

push()方法首先获取request的优先级priority,然后使用Redis的execute_command()方法执行ZADD命令,在有序集合按优先级排序插入request。

pop()方法首先调用Redis的pipeline()方法,获取pepeline实例对象,pipeline能够执行一组命令,提高命令执行效率。然后pipeline对象的zrange()方法添加Redis的zrange命令,该命令返回有序集中指定区间内的成员。最后调用pipeline对象的execute()方法执行命令。

LifoQueue类(后进先出队列)

LifoQueue类是后进先出队列,即最后入队的先出队,爬取过程为深度优先。LifoQueue类push()和pop()方法代码如下:

def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))
def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.blpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.lpop(self.key)
 
        if data:
            return self._decode_request(data)

LifoQueue类的push()方法同FifoQueue类的相同,都是调用Redis的lpush命令将request压入key。

LifoQueue类的pop()方法调用Redis的blpop或lpop命令弹出最后入队的元素。

4、 去重过滤器

去重过滤器就是过滤最近已经爬取过的URL,防止重复爬取。Scrapy会自动对request做去重过滤,若不希望Scrapy过滤重复的request,可以在生成request对象时设置dont_filter为True。例如:

scrapy.Request(url, callback=self.get_response, dont_filter=True)

scrapy-redis重写了scrapy的去重队列,要启用scrapy-redis的去重队列,需要在settings.py配置文件添加下面的配置项:

DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

scrapy-redis的去重过滤器的实现代码为dupefilter.py,类名是RFPDupeFilter,该类的核心代码为:

def request_seen(self, request):
    fp = self.request_fingerprint(request)
    # This returns the number of values added, zero if already exists.
    added = self.server.sadd(self.key, fp)
    return added == 0
def request_fingerprint(self, request):
    return request_fingerprint(request)

RFPDupeFilte使用Redis 中的一个去重集合对请求进行去重,该集合在数据库中的键为self.key 的值。

request_seen()方法用来判断一个request请求是否重复,首先调用request_fingerprint()方法计算request指纹,然后调用Redis的sadd命令将request指纹加入到去重集合,根据sadd 返回值判断请求是否重复,若加入失败说明request已在去重集合中,该request是重复的,返回True。

request_fingerprint()方法用于计算request的指纹。

5、 爬取数据汇总管道类

在分布式爬取框架中,所有爬虫爬取的数据都要汇总到一处(如:数据库、JSON文件等)。

scrapy-redis提供了RedisPipeline,将所有爬虫爬取的数据统一存储到Redis数据库。若希望存储到其它数据库或文件,可以重写RedisPipeline类,RedisPipeline类的模块文件是pipelines.py,代码如下:

from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread
 
from . import connection, defaults
 
 
default_serialize = ScrapyJSONEncoder().encode
class RedisPipeline(object):
    """Pushes serialized item into a redis list/queue
    Settings
    --------
    REDIS_ITEMS_KEY : str
        Redis key where to store items.
    REDIS_ITEMS_SERIALIZER : str
        Object path to serializer function.
 
    """
    def __init__(self, server,
                 key=defaults.PIPELINE_KEY,
                 serialize_func=default_serialize):
        """Initialize pipeline.
 
        Parameters
        ----------
        server : StrictRedis
            Redis client instance.
        key : str
            Redis key where to store items.
        serialize_func : callable
            Items serializer function.
        """
        self.server = server
        self.key = key
        self.serialize = serialize_func
 
    @classmethod
    def from_settings(cls, settings):
        params = {
            'server': connection.from_settings(settings),
        }
        if settings.get('REDIS_ITEMS_KEY'):
            params['key'] = settings['REDIS_ITEMS_KEY']
        if settings.get('REDIS_ITEMS_SERIALIZER'):
            params['serialize_func'] = load_object(
                settings['REDIS_ITEMS_SERIALIZER']
            )
        return cls(**params)
    @classmethod
    def from_crawler(cls, crawler):
        return cls.from_settings(crawler.settings)
    def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)
 
    def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item
    def item_key(self, item, spider):
        """Returns redis key based on given spider.
 
        Override this function to use a different key depending on the item
        and/or spider.
 
        """
        return self.key % {'spider': spider.name}

_init_()方法传入三个参数:参数server是Redis服务连接实例对对象;key是在settings配置文件中配置的键值,item_key()方法使用该键值与字典{'spider': spider.name}取模运算后的结果,作为Redis数据库的一个键值;serialize_func()是回调方法,将item数据进行串行化。

process_item()方法将item数据存储到Redis数据库,该方法会调用deferToThread()方法启动一个线程,在线程中执行_process_item()方法。

_process_item()首先调用item_key()获取Redis数据库的键值key,然后调用serialize()方法对item数据串行化,最后调用Redis服务的rpush方法将串行化的数据压入key。

代码在线纠错(通义千问 qwen-max)

支持粘贴多个代码文件,提交后由阿里云通义千问自动分析代码漏洞、语法错误、逻辑问题并给出修改建议。
您已解锁 AI 代码纠错功能,可正常使用!

评论区

登录 后发表评论
暂无评论