Python study: Python concurrent programming, lessons 01-06

Notes on a well-taught course on concurrent programming.
Updated: 2022-10-29 07:20:30

Course contents

  • 01 - Introduction to concurrent programming
  • 02 - How to choose: multithreading / multiprocessing / coroutines
  • 03 - The main culprit behind Python's slowness: the global interpreter lock (GIL)
  • 04 - Speeding up a Python crawler 10x with multithreading
  • 05 - A producer/consumer crawler in Python
  • 06 - Thread-safety problems in Python and how to solve them
  • 07 - Python's handy thread pool: ThreadPoolExecutor
  • 08 - Using a thread pool to speed up a Python web service
  • 09 - Speeding up programs with the multiprocessing module
  • 10 - Using a process pool to speed up a Flask service
  • 11 - A concurrent crawler with async IO
  • 12 - Controlling crawler concurrency with a semaphore in async IO

Introduction to concurrent programming

Why concurrent programming?

  • To improve a program's overall running speed
  • It is an advanced skill worth having in your toolbox

What are the ways to speed up a program?

  • Single-threaded, serial execution
  • Multithreaded, concurrent execution
  • Parallel execution on multiple CPUs
  • Parallel execution on multiple machines

Python's support for concurrency

  • Multithreading: threading
  • Multiprocessing: multiprocessing
  • Async IO: asyncio
  • Lock, to guard shared resources against conflicting access
  • Queue, to pass data between threads/processes
  • Thread pools / process pools
  • subprocess, to start and drive external processes
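
The list above names the tools; as a quick orientation, here is a minimal sketch (mine, not from the course) of the entry points for the three execution models:

import threading
import multiprocessing
import asyncio


def work(name):
    print('hello from', name)


async def async_work(name):
    print('hello from', name)


if __name__ == '__main__':
    # thread: shares memory with the rest of the program
    t = threading.Thread(target=work, args=('thread',))
    t.start()
    t.join()

    # process: a separate interpreter, not limited by the GIL
    p = multiprocessing.Process(target=work, args=('process',))
    p.start()
    p.join()

    # coroutine: cooperative scheduling inside a single thread
    asyncio.run(async_work('coroutine'))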

How to choose

  • Multithreading: Thread
  • Multiprocessing: Process
  • Coroutines: Coroutine

CPU-bound computation

  • CPU bound (limited by the CPU)
  • The CPU does a large amount of computation; CPU usage is high
  • IO finishes quickly relative to the computation

IO-bound computation

  • IO bound (limited by IO)
  • Most of the time is spent reading and writing IO (disk/memory/network)
  • CPU usage is low (the sketch below contrasts the two)
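
To make the distinction concrete, a small illustrative sketch (the URL is just an example):

import requests


def cpu_task():
    # CPU-bound: a tight numeric loop keeps the CPU busy the whole time
    return sum(i * i for i in range(10_000_000))


def io_task():
    # IO-bound: almost all the time is spent waiting on the network
    return requests.get('https://cnblogs.com').status_code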

Multithreading vs. multiprocessing vs. coroutines

  • Multiprocessing: relatively expensive; starting processes and passing data between them costs more
  • Multithreading: limited by Python's GIL (global interpreter lock), so only one CPU core is used at a time, and switching between threads has overhead
  • Coroutines: a newer technique with low overhead and good performance, but fewer supporting libraries

The global interpreter lock (GIL)

  • At any moment only one thread executes Python bytecode: the GIL lets exactly one thread run at a time
  • Why the GIL exists: it was introduced early in Python's history to sidestep concurrency problems, and by now it is very hard to remove
  • The root cause: Python's memory management relies on reference counting (sketched below); the GIL prevents a thread switch from accidentally freeing memory that another thread is still using, which would cause unpredictable failures
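
A tiny sketch of reference counting in action, using sys.getrefcount:

import sys

a = []
print(sys.getrefcount(a))  # 2: the name 'a' plus the temporary argument reference
b = a                      # bind a second name to the same list
print(sys.getrefcount(a))  # 3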

Why Python is slow

  • Interpreted language: code is translated and executed as it runs
  • Dynamically typed: type checks happen over and over at runtime
  • GIL: a single process cannot spread Python bytecode across multiple CPU cores (demo below)
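
The GIL point is easy to demonstrate on a standard CPython build: two threads doing pure CPU work take about as long as doing the same work serially. A hedged sketch (timings vary by machine):

import threading
import time


def count_down(n):
    while n > 0:
        n -= 1


N = 10_000_000

start = time.time()
count_down(N)
count_down(N)
print('serial: ', time.time() - start)

start = time.time()
t1 = threading.Thread(target=count_down, args=(N,))
t2 = threading.Thread(target=count_down, args=(N,))
t1.start()
t2.start()
t1.join()
t2.join()
print('threads:', time.time() - start)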

How to create threads in Python
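
The basic pattern is threading.Thread(target=..., args=...), then start() and join(); a minimal sketch:

import threading


def task(arg):
    print('running with', arg)


t = threading.Thread(target=task, args=('hello',))
t.start()   # begin executing task in a new thread
t.join()    # wait for the thread to finish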

 

Using threading to speed up a crawler

import threading
import requests

# note: the '#p{page}' fragment is not sent in the HTTP request,
# so all of these fetch the same listing page
urls = [
    f'https://cnblogs.com/#p{page}'
    for page in range(1, 4)
]


def crawl(url):
    response = requests.get(url)
    print(url, len(response.text))


# serial baseline for comparison: crawl the URLs one by one
def single_thread():
    for url in urls:
        crawl(url)


# threaded version: start one thread per URL, then wait for all of them
def multiple_thread():
    threads = []
    for url in urls:
        thread = threading.Thread(
            target=crawl,
            args=(url,)
        )
        threads.append(thread)

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

# run the multithreaded version
multiple_thread()

Improved version, written with GitHub Copilot code suggestions

import requests
import time
import threading

urls = [
  f'https://www.cnblogs.com/#p{page}'
  for page in range(1, 50 + 1)
]


def single_spider(url):
  res = requests.get(url)
  print(url, len(res.text))


def multi_thread_spider(urls):
  threads = []
  for url in urls:
    t = threading.Thread(target=single_spider, args=(url,))
    t.start()
    threads.append(t)

  for t in threads:
    t.join()


if __name__ == '__main__':
  # measure how long the threaded crawl takes
  start = time.time()
  multi_thread_spider(urls)
  end = time.time()
  print('duration: ', end - start)

Implementing a producer/consumer crawler with Queue

Basic usage of Queue
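
queue.Queue is the standard library's thread-safe FIFO; the calls used in this section are put(), get(), and qsize(). A minimal sketch:

import queue

q = queue.Queue()
q.put('item')            # enqueue; safe to call from any thread
item = q.get()           # dequeue; blocks while the queue is empty
print(item, q.qsize())   # qsize() reports the (approximate) current size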

Step 1: Split the crawler into a producer and a consumer

import requests
from bs4 import BeautifulSoup

urls = [
    f'https://cnblogs.com/#p{page}'
    for page in range(1, 4)
]

# producer: fetch a page and return its HTML
def crawl(url):
    response = requests.get(url)
    return response.text


# consumer: parse article links out of the HTML
def parse(html):
    soup = BeautifulSoup(html, 'html.parser')
    # link css selector: .post-item-title
    links = soup.select('.post-item-title')
    results = []
    for link in links:
        href = link.get('href')
        title = link.get_text()
        results.append({
            'href': href,
            'title': title
        })
    return results


# smoke test: crawl the first page and parse it
html = crawl(urls[0])
res = parse(html)

print(res)

Step 2: A multithreaded producer/consumer pipeline

import threading
import requests
import queue
import time
import random
from bs4 import BeautifulSoup

urls = [
    f'https://cnblogs.com/#p{page}'
    for page in range(1, 11)
]

# producer: fetch a page and return its HTML
def crawl(url):
    response = requests.get(url)
    return response.text


# consumer: parse article links out of the HTML
def parse(html):
    soup = BeautifulSoup(html, 'html.parser')
    # link css selector: .post-item-title
    links = soup.select('.post-item-title')
    results = []
    for link in links:
        href = link.get('href')
        title = link.get_text()
        results.append({
            'href': href,
            'title': title
        })
    return results


# crawl worker: pull a URL from url_queue, fetch it, push the HTML to html_queue
def do_crawl(url_queue, html_queue):
    while True:
        url = url_queue.get()
        html = crawl(url)
        html_queue.put(html)
        print(
            threading.current_thread().name,
            'url_queue size: ', url_queue.qsize(),
        )
        time.sleep(random.randint(1, 2))


# parse worker: pull HTML from html_queue, parse it, write the results to a file
def do_parse(html_queue, file):
    while True:
        html = html_queue.get()
        results = parse(html)
        for result in results:
            file.write(str(result) + '\n')

        print(
            threading.current_thread().name,
            'html_queue size: ', html_queue.qsize(),
        )
        time.sleep(random.randint(1, 2))

# note: these workers loop forever on queue.get(), so they are never joined
# (see the shutdown sketch after this block)
if __name__ == '__main__':
    url_queue = queue.Queue()
    html_queue = queue.Queue()
    for url in urls:
        url_queue.put(url)

    file = open('results.txt', 'w')

    # create 5 threads for crawl
    for _ in range(5):
        t = threading.Thread(target=do_crawl, args=(
            url_queue, html_queue), name='crawl')
        t.start()

    # create 3 threads for parse
    for _ in range(3):
        t = threading.Thread(target=do_parse, args=(
            html_queue, file), name='parse')
        t.start()

    print('main thread end')
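
Because do_crawl and do_parse loop forever, the script above never exits on its own. A common standard-library pattern for a clean shutdown (not what the course does) is daemon threads plus Queue.task_done()/join(); a minimal, self-contained sketch:

import queue
import threading

q = queue.Queue()


def worker():
    while True:
        item = q.get()
        print('processed', item)
        q.task_done()  # signal that this item is fully handled


# daemon=True lets the process exit even though the worker loops forever
threading.Thread(target=worker, daemon=True).start()

for i in range(5):
    q.put(i)

q.join()  # blocks until task_done() has been called once per put()
print('all items processed')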

Thread safety and Lock

  • What is a thread-safety problem? When several threads read and modify shared state, their interleaved execution can leave that state inconsistent

Code demo: the withdrawal problem

  • First, without a lock
  • Then with a lock, to fix the thread-safety problem

import threading
import time


# account: holds a balance shared between threads
class Account:
  def __init__(self, balance):
    self.balance = balance


# withdraw: check the balance, then deduct; the sleep widens the race window
def withdraw(account, amount):
  if account.balance >= amount:
    time.sleep(0.01)
    account.balance -= amount
    print(threading.current_thread().name, "withdrawal succeeded, balance:", account.balance)
  else:
    print(threading.current_thread().name, "withdrawal failed, insufficient balance")


if __name__ == "__main__":
  account = Account(1000)
  t1 = threading.Thread(target=withdraw, args=(account, 800), name="t1")
  t2 = threading.Thread(target=withdraw, args=(account, 800), name="t2")
  t1.start()
  t2.start()
  t1.join()
  t2.join()

Without a lock, both threads can pass the balance check before either one deducts, so the shared balance is corrupted:

t2 withdrawal succeeded, balance: 200
t1 withdrawal succeeded, balance: -600

Fixing the race with lock = threading.Lock() plus with lock

import threading
import time

# a single module-level lock serializes access to the shared account
lock = threading.Lock()

# account: holds a balance shared between threads
class Account:
  def __init__(self, balance):
    self.balance = balance


# withdraw, now serialized by the lock
def withdraw(account, amount):
  # hold the lock for the whole check-then-deduct sequence
  with lock:
    if account.balance >= amount:
      time.sleep(0.01)
      account.balance -= amount
      print(threading.current_thread().name, "withdrawal succeeded, balance:", account.balance)
    else:
      print(threading.current_thread().name, "withdrawal failed, insufficient balance")


if __name__ == "__main__":
  account = Account(1000)
  t1 = threading.Thread(target=withdraw, args=(account, 800), name="t1")
  t2 = threading.Thread(target=withdraw, args=(account, 800), name="t2")
  t1.start()
  t2.start()
  t1.join()
  t2.join()
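
With the lock held around the entire check-then-deduct sequence, only one thread at a time can run it; on a typical run one withdrawal succeeds (balance 200) and the other is rejected for insufficient funds.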
