scrapy学习:利用 scrapy 来完成文件批量入库操作

有时候我们有处理大量文件的场景,利用 scrapy 也可以完成
更新于: 2022-02-27 00:45:32

背景

有大量的文件100G左右,每个文件里有很多条数据 ,可以被处理成一条条的数据

所以,基本思路是把这些文件当成资源来请求

建立文件 fasta.py

我这个文件是 fasta 的序列

# 建议 fasta 的 task
scrapy genspider fasta www.baidu.com

重写 start_request 方法

重点在这里:

  1. 以id为key,构造不同的URL: http://www.baidu.com?id={url_id}
  2. 这里可能用 127.0.0.1 为请求可能速度更快,但需要起一个本地服务
  3. 另外:这个请求会产生400结果 ,所以,还有一个设置很关键: handle_httpstatus_list = [400]
def start_requests(self):
    self.start_urls = glob.glob(f'./downloads/{self.quality}/*.fasta')
    total = len(self.start_urls)
    for index, file in enumerate(self.start_urls):
        url_id = os.path.basename(file).split('.')[0]
        self.logger.info(f'Current progress: {index} / {total}')
        meta = {'file': file, "url_id": url_id}
        yield scrapy.Request(f'http://www.baidu.com?id={url_id}', callback=self.parse, meta=meta)

完整代码如下

import scrapy
import os
import glob
from gbins_spider.items import FastaItem
from gbins_spider.helpers.fast2list import fasta2list
import jsw_nx as nx


class FastaSpider(scrapy.Spider):
    name = 'fasta'
    # allowed_domains = ['localhost']
    # start_urls = ['https://www.baidu.com/']
    quality = 'low'
    handle_httpstatus_list = [400]

    def start_requests(self):
        self.start_urls = glob.glob(f'./downloads/{self.quality}/*.fasta')
        total = len(self.start_urls)
        for index, file in enumerate(self.start_urls):
            url_id = os.path.basename(file).split('.')[0]
            self.logger.info(f'Current progress: {index} / {total}')
            meta = {'file': file, "url_id": url_id}
            yield scrapy.Request(f'http://www.baidu.com?id={url_id}', callback=self.parse, meta=meta)

    def parse(self, response):
        file = response.meta['file']
        url_id = response.meta['url_id']
        seqs = fasta2list(file)
        self.logger.info(f'Current seqs.length :  {len(seqs)}')
        self.logger.info(f'Current filename -> url_id : {url_id} -> {file}')
        for seq in seqs:
            item = FastaItem()
            item["fasta_id"] = seq["header"]
            item["fasta_content"] = seq["sequence"]
            item["url_id"] = url_id
            yield item
        os.rename(file, f'./downloads/{self.quality}/{url_id}.done')

以命令方式运行

# 1. 以下方式测试
scrapy crawl fasta
# 2. 以 background 方式运行
nohup scrapy crawl fasta > /dev/null 2>&1 &

需要改进的点

  1. 现在处理完会用改名为 .done 的方式,最好以单独的数据库来记录
  2. 由于文件数据量很大,最好能把  quality = 'low' 做成不同的启动参数,可以启多个 scrapy 程序,带参数的方式

更合理的做法

scrapy 是可以直接读取p这种 file:///Users/aric.zheng/aric-juzi/kksc/xx.html 格式的文件

class FileToDbSpider(scrapy.Spider):
    name = 'file_to_db'
    # allowed_domains = ['www.baidu.com']
    # handle_httpstatus_list = [400]

    def start_requests(self):
        cwd = CUSTOMIZE_SETTINGS['cwd']
        os.chdir(cwd)
        file_glob = '**/*.html'
        files = glob.glob(file_glob, recursive=True)

        for index, file in enumerate(files):
            # file:///Users/aric.zheng/aric-juzi/kksc/xx.html
            file_url = f'file:///Users/aric.zheng/aric-juzi/kksc/{file}'
            yield scrapy.Request(
                file_url,
                callback=self.parse,
                meta={'file': file}
            )

 

参考