Learning Scrapy: An Implementation of a Distributed Spider

A simple distributed spider I implemented by combining Scrapy with MySQL
Updated: 2022-10-30 08:55:42

Design approach

  • Split all URLs to be crawled into chunks; the chunk size is controlled by the CHUNK_SIZE setting
  • Each chunk has one of three states: INIT/RUNNING/DONE
    • INIT: the initial state
    • RUNNING: set as soon as the chunk starts being crawled
    • DONE: the chunk has been fully crawled
Schematic diagram
Simple database UML
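
The UML image is not reproduced here. Besides the chunks table (the Chunk model shown further down), the database only needs the target-URL table KninfoDetail. The following is a minimal sketch of that model, assuming just the fields the spiders below actually touch (ncbi_sview_url, is_crawled); the real model and its table name may differ.

models/kninfo_detail.py (sketch)

from peewee import Model, CharField, BooleanField
from ..db import db


class KninfoDetail(Model):
    # detail-page URL to be crawled
    ncbi_sview_url = CharField()
    # flipped to True once the detail page has been crawled
    is_crawled = BooleanField(default=False)

    class Meta:
        database = db
        table_name = 'kninfo_details'  # assumed table name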

scrapy settings

CHUNK_SIZE = 30

spider_chunk.py

KninfoDetail is the table that stores the target URLs to be crawled.

import scrapy
import jsw_nx as nx

from spider_knlib.models.kninfo_detail import KninfoDetail
from spider_knlib.settings import CHUNK_SIZE


class ChunkSpider(scrapy.Spider):
    name = 'chunk'
    handle_httpstatus_list = [400]
    # dummy start URL; the response body is not used, it only triggers parse()
    start_urls = ['https://www.baidu.com/']

    custom_settings = {
        'CONCURRENT_REQUESTS': 100,
    }

    @property
    def total(self):
        return KninfoDetail.select().count()

    def parse(self, response, **kwargs):
        total = self.total
        # ceiling division, so an exact multiple of CHUNK_SIZE does not
        # produce an extra empty page
        pages = (total + CHUNK_SIZE - 1) // CHUNK_SIZE
        for page in range(1, pages + 1):
            # the last page may hold fewer records than CHUNK_SIZE
            page_size = CHUNK_SIZE if page < pages else total - (pages - 1) * CHUNK_SIZE
            # each yielded dict describes one chunk; a pipeline persists it
            yield {
                'page': page,
                'page_size': page_size,
                'current': 0,
                'progress': 0,
                'status': 'INIT'
            }
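
The dicts yielded by parse() still have to be written into the chunks table; a Scrapy item pipeline is the natural place for that. The snippet below is only a sketch: the class name ChunkPipeline is hypothetical and it assumes the pipeline is enabled via ITEM_PIPELINES in the project settings.

pipelines.py (sketch)

from spider_knlib.models.chunk import Chunk


class ChunkPipeline:
    # hypothetical pipeline: persist each chunk dict produced by the
    # chunk spider as one row of the chunks table
    def process_item(self, item, spider):
        if spider.name == 'chunk':
            Chunk.create(**item)
        return item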

A simple implementation

  • models/chunk
  • spider

models/chunk.py

import datetime
from peewee import *
from ..db import db


class Chunk(Model):
    page = IntegerField()
    page_size = IntegerField()
    current = IntegerField()
    progress = IntegerField()
    status = CharField()
    # pass the callable, not its result, so the timestamp is taken per row
    created_at = DateTimeField(default=datetime.datetime.now)
    updated_at = DateTimeField(default=datetime.datetime.now)

    @classmethod
    def update_status(cls, instance, status):
        instance.status = status
        # keep updated_at fresh; the patrol job below relies on it
        instance.updated_at = datetime.datetime.now()
        instance.save()

    class Meta:
        database = db
        table_name = 'chunks'

The kninfo_detail_chunk spider:

import scrapy
import jsw_nx as nx

from spider_knlib.models.kninfo_detail import KninfoDetail
from spider_knlib.models.chunk import Chunk
from spider_knlib.loaders import SpiderKnlibItemLoader
from spider_knlib.items import SpiderKnlibItem
from spider_knlib.settings import CHUNK_SIZE


class KninfoDetailChunkSpider(scrapy.Spider):
    name = 'kninfo_detail_chunk'
    handle_httpstatus_list = [400]
    # start_urls is unused here because start_requests() is overridden below
    start_urls = ['https://www.baidu.com/']
    latest_chunk = None
    records = None

    custom_settings = {
        'CONCURRENT_REQUESTS': 20
    }

    @property
    def is_finished(self):
        crawled_count = KninfoDetail.select().where(
            KninfoDetail.is_crawled == False).count()
        return crawled_count == 0

    @staticmethod
    def close(spider, reason):
        # call the base implementation explicitly; zero-argument super() is
        # unreliable inside a staticmethod
        res = scrapy.Spider.close(spider, reason)
        if reason == 'finished' and spider.latest_chunk:
            # the whole chunk was crawled successfully, mark it DONE
            Chunk.update_status(spider.latest_chunk, 'DONE')
            spider.latest_chunk = None

        return res

    def init_records(self):
        self.latest_chunk = Chunk.select().where(Chunk.status == 'INIT').first()
        if not self.latest_chunk:
            # no chunk is left in the INIT state, nothing to crawl this run
            self.records = []
            return
        page = self.latest_chunk.page
        self.logger.info(
            f'Current chunk is: {self.latest_chunk.page}/{self.latest_chunk.status}')
        self.records = KninfoDetail.select().paginate(page, CHUNK_SIZE)
        Chunk.update_status(self.latest_chunk, 'RUNNING')

    def start_requests(self):
        # bail out early when every record is already crawled
        if self.is_finished:
            nx.notify(
                body=f'All records are finished - from spider: {self.name}')
            self.logger.info('All records are crawled')
            return

        self.init_records()
        records = self.records

        self.logger.info(
            f'Current records/start_records count is: {len(records)}')

        for record in records:
            url = record.ncbi_sview_url
            self.logger.info(f'Current url is: {url}')
            yield scrapy.Request(url, callback=self.parse, meta={'record': record})

    def parse(self, response, **kwargs):
        # the KninfoDetail record travels with the request via response.meta
        record = response.meta['record']
        item_l = SpiderKnlibItemLoader(
            item=SpiderKnlibItem(), response=response)
        yield item_l.load_item()
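
Neither spider shows where a KninfoDetail row gets flagged as crawled or where the chunk's current/progress counters are advanced, even though is_finished depends on that flag. One simple place to do it is right after a detail page has been parsed. The helper below is only a sketch: mark_crawled is a hypothetical name and the percentage formula is just one possible choice.

# sketch: keep the bookkeeping in sync after each successfully parsed page
def mark_crawled(spider, record):
    # is_finished checks this flag on KninfoDetail
    record.is_crawled = True
    record.save()

    chunk = spider.latest_chunk
    if chunk:
        chunk.current += 1
        chunk.progress = int(chunk.current / chunk.page_size * 100)
        chunk.save()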

Possible further improvements

  • If the spider on one machine stops abnormally, some chunks may be left stuck in the RUNNING state the next time it starts.
    • A workable fix is a "patrol" job that runs periodically and resets chunks that are RUNNING but whose updated_at is far in the past back to INIT (see the sketch after this list)
  • It should be paired with a process supervisor such as pm2
  • Combine it with docker to simplify deployment
    • One machine running several docker containers?
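
A minimal sketch of such a patrol job, assuming the Chunk model above; the 30-minute threshold and the script name patrol.py are arbitrary choices, not part of the original project. It can be run periodically, for example via cron or a pm2 restart schedule.

patrol.py (sketch)

import datetime

from spider_knlib.models.chunk import Chunk

# chunks stuck in RUNNING for longer than this are assumed to come from a
# crashed spider run (threshold is an arbitrary choice)
STALE_AFTER = datetime.timedelta(minutes=30)


def reset_stale_chunks():
    deadline = datetime.datetime.now() - STALE_AFTER
    query = (Chunk
             .update(status='INIT', updated_at=datetime.datetime.now())
             .where((Chunk.status == 'RUNNING') & (Chunk.updated_at < deadline)))
    # returns the number of chunks that were put back to INIT
    return query.execute()


if __name__ == '__main__':
    reset_stale_chunks()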

Test runs of the spider

Each record is fairly expensive for this spider to fetch, so the crawl speed is rather slow before any optimization.

Screenshot of the chunk table during crawling
Crawled-record counts over one minute (before -> after, increase in parentheses):
# 1 machine / 1 process / 10 coroutines: 30 -> 58 (28)
# 2 machines / 1 process / 10 coroutines: 73 -> 114 (41)
# 2 machines / 8-process Pool / 10 coroutines: 215 -> 394 (179)
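
Reading the numbers above: moving from one machine with a single process (about 28 records per minute) to two machines each running an 8-process Pool (about 179 records per minute) gives roughly a 6x increase in throughput.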