python学习:python并发编程 - 06/07 线程池

一套不错的 python 并发编程教程
更新于: 2022-10-29 08:10:52

好用的线程池

  • 线程的新建/终止都会带来资源和时间上的开销
  • 好处: 提升了性能
  • 场景:适合处理突发性的大量请求,或需要大量线程完成任务,但每个任务实际处理时间较短的场景
  • 防御机制:能有效避免系统因短时间内创建过多线程,导致资源占用过多的情况
线程池原理
线程池基本用法
import concurrent.futures
import requests
from bs4 import BeautifulSoup

# Crawl targets: one URL per page number, covering pages 1 through 4.
urls = [f'https://www.cnblogs.com/#p{page}' for page in range(1, 5)]

def crawl(url, timeout=10):
  """Download a page and return its body as text.

  Args:
    url: Address to fetch with an HTTP GET.
    timeout: Seconds to wait for the server before giving up. requests
      applies no timeout by default, so without one a stalled server
      would block the calling worker thread indefinitely.

  Returns:
    The response body decoded to ``str`` by requests.

  Raises:
    requests.exceptions.RequestException: If the request fails or
      times out.
  """
  html = requests.get(url, timeout=timeout).text
  print(f'current urls is :{url}')
  return html

def parse(html):
  """Extract the text of the ``<title>`` element from an HTML document.

  Args:
    html: HTML markup as a string.

  Returns:
    The title text, or an empty string when the document has no
    ``<title>`` element.
  """
  soup = BeautifulSoup(html, 'html.parser')
  # soup.title is None for a document without a <title>; guard so that a
  # single title-less page does not raise AttributeError in a worker.
  title = soup.title
  return title.text if title is not None else ''

def main():
  """Crawl every URL in one thread pool, then parse the pages in another.

  Prints each URL with its parsed result as the parse tasks finish, so
  the output order is not guaranteed to follow the order of ``urls``.
  """
  # Stage 1: download. Leaving the with-block waits for the workers.
  with concurrent.futures.ThreadPoolExecutor() as crawl_pool:
    htmls = crawl_pool.map(crawl, urls)
  print('crawl done')

  # Stage 2: parse. Map each future to its URL so the URL can be looked
  # up again once that future completes.
  with concurrent.futures.ThreadPoolExecutor() as parse_pool:
    futures = {
      parse_pool.submit(parse, html): url
      for url, html in zip(urls, htmls)
    }

    # Unordered output: handle each future as soon as it finishes. For
    # output in submission order, iterate futures.items() instead.
    for future in concurrent.futures.as_completed(futures):
      url = futures[future]
      print(f'{url}: {future.result()}')

  print('parse done')

# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
  main()

web 服务中使用线程池

web服务的特点

web 中使用线程池的好处

  • 方便的将磁盘文件,数据库,远程API的IO调用并发执行
  • 线程池中的线程数目有上限,不会无限创建线程(导致系统挂掉),具有防御功能

代码实战

  • 不使用线程池的情况下,三个 IO 调用依次执行,模拟约 600ms(100ms + 200ms + 300ms)的延时
  • 使用线程池(thread pool)的情况下,三个 IO 调用并发执行,进行优化(不过,本机优化的效果不明显)
import flask
import time
import json

# Flask application object; the route below is registered on it.
app = flask.Flask(__name__)


def result_file():
  """Stand-in for a file read: block for 0.1 s, then return a fixed value.

  Returns:
    The string ``'file'``.
  """
  simulated_latency = 0.1
  time.sleep(simulated_latency)
  return 'file'


def result_db():
  """Stand-in for a database query: block for 0.2 s, then return a fixed value.

  Returns:
    The string ``'db'``.
  """
  simulated_latency = 0.2
  time.sleep(simulated_latency)
  return 'db'


def result_api():
  """Stand-in for a remote API call: block for 0.3 s, then return a fixed value.

  Returns:
    The string ``'api'``.
  """
  simulated_latency = 0.3
  time.sleep(simulated_latency)
  return 'api'


@app.route('/')
def index():
  """Serve the root route: run the three lookups in sequence.

  Returns:
    A JSON-encoded string with the keys ``file``, ``db`` and ``api``.
  """
  # Each call blocks until it returns, so the three delays add up.
  payload = {}
  payload['file'] = result_file()
  payload['db'] = result_db()
  payload['api'] = result_api()
  return json.dumps(payload)


# Start the Flask server only when this file is run directly as a script.
if __name__ == '__main__':
  app.run()
$ time curl http://127.0.0.1:5000/
{"file": "file", "db": "db", "api": "api"}
real	0m0.757s
user	0m0.016s
sys	0m0.077s

import flask
import time
import json
from concurrent.futures import ThreadPoolExecutor

# Flask application object; the route below is registered on it.
app = flask.Flask(__name__)
# One module-level thread pool, created once and used by the request handler.
pool = ThreadPoolExecutor()


def result_file():
  """Stand-in for a file read: block for 0.1 s, then return a fixed value.

  Returns:
    The string ``'file'``.
  """
  pause_seconds = 0.1
  time.sleep(pause_seconds)
  return 'file'


def result_db():
  """Stand-in for a database query: block for 0.2 s, then return a fixed value.

  Returns:
    The string ``'db'``.
  """
  pause_seconds = 0.2
  time.sleep(pause_seconds)
  return 'db'


def result_api():
  """Stand-in for a remote API call: block for 0.3 s, then return a fixed value.

  Returns:
    The string ``'api'``.
  """
  pause_seconds = 0.3
  time.sleep(pause_seconds)
  return 'api'


@app.route('/')
def index():
  """Serve the root route: run the three lookups on the thread pool.

  Returns:
    A JSON-encoded string with the keys ``file``, ``db`` and ``api``.
  """
  # Submit every task before waiting on any, so their delays can overlap.
  pending = {
    'file': pool.submit(result_file),
    'db': pool.submit(result_db),
    'api': pool.submit(result_api),
  }
  # Collect in the same key order; result() blocks until that task is done.
  return json.dumps({key: task.result() for key, task in pending.items()})


# Start the Flask server only when this file is run directly as a script.
if __name__ == '__main__':
  app.run()
$ time curl http://127.0.0.1:5000/
{"file": "file", "db": "db", "api": "api"}
real	0m0.667s
user	0m0.017s
sys	0m0.016s

参考