scrapy学习:在程序中启动 scrapy 使用 CrawlerProcess、CrawlerRunner

用程序启动 scrapy,而不只是scrapy crawl <spider-name>,常用在 debug 或者启动多个 chunk 的场景
更新于: 2022-10-20 01:22:17

背景

目前為止我們都是用 scrapy crawl <spider-name> 指令來啟動爬蟲,但有時候可能需要在程式中來啟動爬蟲(例如提供一個 API 接口,外部發請求來告知要啟動哪一支爬蟲,由程式來啟動對應的爬蟲),今天會介紹幾種啟動爬蟲的方式。

方法1:利用 subprocess 来启动

  • 方便,很好懂
  • 问题:非常吃資源
import subprocess

subprocess.run('scrapy crawl ithome')

因為每次執行 scrapy crawl 都會產生一個新的 Scrapy Engine 實體,如果用這個方式啟動多個爬蟲會非常吃資源,所以其實不建議使用 (那幹嘛講)。

方法2:CrawlerProcess

我們可以利用 scrapy.crawler.CrawlerProcess 這個類別來啟動爬蟲,scrapy crawl 指令其實也是使用這個類別。

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

'''
get_project_settings() 方法會取得爬蟲專案中的 settings.py 檔案設定
啟動爬蟲前要提供這些設定給 Scrapy Engine
'''
process = CrawlerProcess(get_project_settings())

process.crawl('ithome')
process.start()

 必须等上一个运行完成,才会运行下一个

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from uniprot_spider.spiders.ncbi_protein_down_fasta import NcbiProteinDownFastaSpider

settings = get_project_settings()
process = CrawlerProcess(settings)
count = len(NcbiProteinDownFastaSpider.get_chunks())


for i in range(0, count):
    process.crawl('ncbi_protein_down_fasta', index=i)
    process.start()

一次可以启动多个,同时运行

  • 会消耗较大内存
  • 会报这个错误: reactor already installed 这个错误暂时未找到原因
  • 消耗内存会很大(在有其它资源占用的时候,会无法跑起来)
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from uniprot_spider.spiders.ncbi_protein_down_fasta import NcbiProteinDownFastaSpider

settings = get_project_settings()
process = CrawlerProcess(settings)
count = len(NcbiProteinDownFastaSpider.get_chunks())


for i in range(0, count):
    process.crawl('ncbi_protein_down_fasta', index=i)
process.start()

方法3:CrawlerRunner

如果原本的程式中已經有使用 Twisted 來執行一些非同步的任務,官方建議改用 scrapy.crawler.CrawlerRunner 來啟動爬蟲,如此可以跟原本的程式使用同一個 Twisted reactor

from twisted.internet import reactor
from scrapy.crawler import CrawlerRunner
from scrapy.utils.project import get_project_settings

'''
get_project_settings() 方法會取得爬蟲專案中的 settings.py 檔案設定
啟動爬蟲前要提供這些設定給 Scrapy Engine
'''
runner = CrawlerRunner(get_project_settings())

d = runner.crawl('myspider')
d.addBoth(lambda _: reactor.stop())
reactor.run()

看一个下载爬虫的例子

import scrapy
import jsw_nx as nx
from uniprot_spider.models.ncbi_protein import NcbiProtein
from uniprot_spider.items import NcbiProteinDownItem


class NcbiProteinDownFastaSpider(scrapy.Spider):
    name = 'ncbi_protein_down_fasta'
    allowed_domains = ['www.baidu.com', 'ncbi.nlm.nih.gov']

    handle_httpstatus_list = [400]

    custom_settings = {
        'CONCURRENT_REQUESTS': 100,
        'USER_AGENT': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.85 Safari/537.36'
    }

    @classmethod
    def get_chunks(cls):
        chunks = list(NcbiProtein.where({"is_crawled": False}).chunk(50000))
        return chunks

    @property
    def records(self):
        chunks = self.get_chunks()
        return chunks[int(self.index)]

    def start_requests(self):
        records = self.records
        total = len(records)
        for index, entity in enumerate(records):
            id = entity.id
            self.logger.info(f'Current progress: {index} / {total}')
            yield scrapy.Request(url=f'https://www.baidu.com?id={id}', callback=self.parse, meta={'entity': entity})

    def parse(self, response):
        try:
            entity = response.meta['entity']
            fasta_url = entity.fasta_url
            genpept_url = entity.genpept_url
            item = NcbiProteinDownItem()
            item['file_urls'] = [fasta_url, genpept_url]
            item['protein_id'] = entity.protein_id
            item['id'] = entity.id
            yield item
        except Exception as e:
            self.logger.error(e)
            raise scrapy.exceptions.CloseSpider('Crawl failed')

任务文件:tasks/ncbi_protein_down_fasta.py

from scrapy.utils.project import get_project_settings
from uniprot_spider.spiders.ncbi_protein_down_fasta import NcbiProteinDownFastaSpider
from scrapy.crawler import CrawlerRunner
from twisted.internet import reactor

settings = get_project_settings()
process = CrawlerProcess(settings)
count = len(NcbiProteinDownFastaSpider.get_chunks())


runner = CrawlerRunner(get_project_settings())
for i in range(0, count):
    d = runner.crawl('ncbi_protein_down_fasta', index=i)
    d.addBoth(lambda _: reactor.stop())

reactor.run()

进程守护:用 pm2 运行命令如下

  • ecosystem.config.js 配置文件在下面
pm2 start ecosystem.config.js --only "ncbi_down"
module.exports = {
  apps: [
    {
      name: 'ncbi_down',
      interpreter: '/root/.cache/pypoetry/virtualenvs/uniprot-spider-A_j0jmEh-py3.10/bin/python',
      namespace: 'uniprot',
      script: './tasks/ncbi_protein_down_fasta.py',
      ignore_watch: ['node_modules', 'logs', 'tmp', '*.pyc']
    }
  ]
};

多进程运行

import multiprocessing

import numpy as np
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

tasks = [{'start_url': 'https://www.theguardian.com/', 'selenium': False},
         {'start_url': 'https://www.nytimes.com/', 'selenium': True}]


def crawl(tasks):
    process = CrawlerProcess(get_project_settings())

    def run_spider(_, index=0):
        if index < len(tasks):
            deferred = process.crawl('test_spider', task=tasks[index])
            deferred.addCallback(run_spider, index + 1)
            return deferred

    run_spider(None)
    process.start()


def main():
    processes = 2
    with multiprocessing.Pool(processes) as pool:
        pool.map(crawl, np.array_split(tasks, processes))


if __name__ == '__main__':
    main()

参考