Scrapy notes: optimizing synchronous blocking scenarios in a spider with deferToThreadPool

Synchronous code inside a spider callback blocks Twisted's event loop and slows the whole crawl down. Such calls can be optimized with this technique, e.g. a synchronous script executed through os.system.
Updated: 2023-03-30 14:36:46
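For example, here is a minimal sketch of handing a blocking os.system call to the thread pool (the script path is a made-up placeholder, not from the original post):

import os
from twisted.internet.threads import deferToThread
...
    def parse(self, response):
        # os.system blocks until the shell command exits; deferToThread runs
        # it on the reactor's thread pool, keeping the event loop responsive.
        # './sync_job.sh' is a placeholder command.
        yield deferToThread(os.system, './sync_job.sh')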

Core code

  • In my scenario I yield the deferToThreadPool call instead of returning it as the core code below does
  • Benefit: a synchronous, blocking operation no longer blocks the event loop
from twisted.internet.threads import deferToThreadPool
from twisted.internet import reactor
...
    def parse(self, response):
        # blocking_call runs on a pool thread; the Deferred fires with its
        # result back on the reactor thread once the call completes
        return deferToThreadPool(
            reactor, reactor.getThreadPool(), self.blocking_call, response.body)
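Incidentally, deferToThread is Twisted's shorthand for exactly this pairing of the global reactor with its default thread pool, so the call above can also be written as:

from twisted.internet.threads import deferToThread
...
    def parse(self, response):
        # equivalent to deferToThreadPool(reactor, reactor.getThreadPool(), ...)
        return deferToThread(self.blocking_call, response.body)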

Original approach

import os
import time
import requests
import scrapy
from spider_ebi.models.rbd_pdb import RbdPdb


class RbdPdbSyncblockSpider(scrapy.Spider):
    name = 'rbd_pdb_syncblock'
    handle_httpstatus_list = [400]
    start_urls = ['https://www.baidu.com/']

    custom_settings = {
        'CONCURRENT_REQUESTS': 100,
    }

    @property
    def records(self):
        return RbdPdb.select().where(RbdPdb.id.is_null(False)).limit(100)

    @property
    def is_done(self):
        return len(self.records) == 0

    def parse(self, response, **kwargs):
        for record in self.records:
            pdb_url = record.pdb_url
            # synchronous HTTP call: the reactor is blocked until it returns
            res = requests.get(pdb_url)
            self.logger.info(f'get pdb_url: {pdb_url}')
            if res.status_code == 200:
                html = res.text
                # save html to file system
                file_path = os.path.join(
                    './downloads', 'test', f'{record.acc2}.cif')
                # simulated extra work; this sleep also stalls the event loop
                time.sleep(3)
                with open(file_path, 'w') as f:
                    f.write(html)
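Everything above runs on the reactor thread: requests.get blocks until the response arrives, and time.sleep(3) stalls the loop for another three seconds, so the records are processed strictly one after another and the CONCURRENT_REQUESTS = 100 setting gains nothing.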

Optimized version

import os
import time
import requests

import scrapy

from twisted.internet.threads import deferToThreadPool
from twisted.internet import reactor
from spider_ebi.models.rbd_pdb import RbdPdb


class RbdPdbSyncblockSpider(scrapy.Spider):
    name = 'rbd_pdb_syncblock'
    handle_httpstatus_list = [400]
    start_urls = ['https://www.baidu.com/']

    custom_settings = {
        'CONCURRENT_REQUESTS': 100,
    }

    @property
    def records(self):
        return RbdPdb.select().where(RbdPdb.id.is_null(False)).limit(100)

    @property
    def is_done(self):
        return len(self.records) == 0

    def parse(self, response, **kwargs):
        for record in self.records:
            # each record is handed to a pool thread; the event loop stays
            # free to schedule the next iteration immediately
            yield deferToThreadPool(
                reactor, reactor.getThreadPool(), self.blocking_call, record)

    def blocking_call(self, record):
        # runs on a pool thread, so the blocking calls below no longer
        # stall the reactor
        url = record.pdb_url
        res = requests.get(url)
        self.logger.info(f'get pdb_url: {url}')
        if res.status_code == 200:
            html = res.text
            # save html to file system
            file_path = os.path.join(
                './downloads', 'test', f'{record.acc2}.cif')
            time.sleep(3)
            with open(file_path, 'w') as f:
                f.write(html)
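One caveat worth adding: every blocking_call now occupies a thread from the reactor's pool, whose size Scrapy controls with the REACTOR_THREADPOOL_MAXSIZE setting (default 10). If more parallel downloads are wanted, the pool can be enlarged in custom_settings; the value 50 below is an assumed example, not from the original post:

    custom_settings = {
        'CONCURRENT_REQUESTS': 100,
        # assumed tuning: the reactor thread pool defaults to 10 threads,
        # which caps how many blocking_call invocations run concurrently
        'REACTOR_THREADPOOL_MAXSIZE': 50,
    }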
