Python Learning: Python Concurrent Programming 01-06
Notes from a well-taught course on concurrent programming.
Course Outline
- 01 - Introduction to concurrent programming
- 02 - How to choose: multithreading / multiprocessing / coroutines
- 03 - The culprit behind Python's slowness: the Global Interpreter Lock (GIL)
- 04 - Speeding up a Python crawler 10x with multithreading
- 05 - Implementing a producer/consumer crawler in Python
- 06 - Thread-safety problems in Python and how to solve them
- 07 - ThreadPoolExecutor, Python's handy thread pool
- 08 - Speeding up a web service with a thread pool
- 09 - Speeding up programs with the multiprocessing module
- 10 - Speeding up a Flask service with a process pool
- 11 - A concurrent crawler with async IO
- 12 - Limiting crawler concurrency with a semaphore in async IO
Introduction to Concurrent Programming
Why concurrent programming?
- It speeds up the overall execution of a program.
- It is an advanced, must-have skill.
What are the ways to speed up a program?
- Single-threaded, serial execution
- Multithreaded concurrency
- Multi-CPU parallelism
- Multi-machine parallelism
Python's support for concurrency (a sketch of the three entry points follows this list):
- Multithreading: threading
- Multiprocessing: multiprocessing
- Async IO: asyncio
- Lock: lock a shared resource to prevent conflicting access
- Queue: pass data between threads/processes
- Thread pools / process pools
- subprocess: launch and manage external processes
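For orientation, here is a minimal sketch of the three entry points side by side; the `work`/`async_work` functions are placeholders I made up, not course code:

```python
import threading
import multiprocessing
import asyncio

def work(n):
    print('working on', n)

async def async_work(n):
    print('working on', n)

if __name__ == '__main__':
    # threading: run work() in a separate thread
    t = threading.Thread(target=work, args=(1,))
    t.start()
    t.join()

    # multiprocessing: run work() in a separate process
    p = multiprocessing.Process(target=work, args=(2,))
    p.start()
    p.join()

    # asyncio: run a coroutine on the event loop
    asyncio.run(async_work(3))
```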
How to choose?
- Multithreading: Thread
- Multiprocessing: Process
- Coroutines: Coroutine
CPU-bound computation (both cases are sketched after this list)
- CPU bound (limited by the CPU)
- The CPU does heavy computation, so CPU usage is high
- IO is fast relative to the computation
IO-bound computation
- IO bound (limited by IO)
- The program spends most of its time on IO (disk/memory/network) reads and writes
- CPU usage is low
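A small sketch contrasting the two (my illustration; the sum size is arbitrary, and the URL is borrowed from the course's crawler examples): the sum keeps the CPU busy, while the HTTP request mostly waits on the network:

```python
import time
import requests

# CPU-bound: the time goes into pure computation
def cpu_bound_task():
    return sum(i * i for i in range(10_000_000))

# IO-bound: the time goes into waiting for the network
def io_bound_task():
    return requests.get('https://cnblogs.com').text

start = time.time()
cpu_bound_task()
print('cpu-bound:', time.time() - start)

start = time.time()
io_bound_task()
print('io-bound:', time.time() - start)
```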
Multithreading / multiprocessing / coroutines
- Multiprocessing: relatively heavy, with higher overhead per process
- Multithreading: limited by Python's GIL (Global Interpreter Lock), so it can only use one CPU core, and switching between threads has overhead
- Coroutines: a newer technique with tiny overhead and good performance, but fewer supporting libraries
The Global Interpreter Lock (GIL)
- At any given moment, only one thread executes Python bytecode; the GIL lets exactly one thread run at a time
- Why the GIL exists: it was introduced early on to sidestep concurrency problems, and it is now very hard to remove
- The root cause: Python manages memory with reference counting; the GIL prevents a thread switch from racing on those counts and accidentally freeing memory another thread is still using (see the reference-count sketch below)
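To see the reference counting that the GIL protects, the standard `sys.getrefcount` exposes an object's count (a minimal illustration, not from the course):

```python
import sys

# Every CPython object carries a reference count.
a = []
print(sys.getrefcount(a))  # 2: the name `a` plus the temporary argument

b = a                      # another reference to the same list
print(sys.getrefcount(a))  # 3

del b                      # dropping a reference decrements the count
print(sys.getrefcount(a))  # back to 2
```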
Why Python is slow
- Interpreted language: code is interpreted as it executes
- Dynamically typed: type checks happen everywhere at runtime
- GIL: a single process cannot use multiple CPU cores (demonstrated below)
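The GIL bullet above is easy to demonstrate: splitting a CPU-bound countdown across two threads gives no speedup, because only one thread holds the GIL at a time (a sketch; exact timings depend on your machine):

```python
import time
import threading

def count_down(n):
    while n > 0:
        n -= 1

N = 50_000_000

# Single thread
start = time.time()
count_down(N)
print('single thread:', time.time() - start)

# Two threads splitting the same work: no speedup, because the GIL
# lets only one thread execute Python bytecode at a time
start = time.time()
t1 = threading.Thread(target=count_down, args=(N // 2,))
t2 = threading.Thread(target=count_down, args=(N // 2,))
t1.start(); t2.start()
t1.join(); t2.join()
print('two threads:', time.time() - start)
```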
Creating Threads in Python
Using threading to speed up the crawler
import threading
import requests

urls = [
    f'https://cnblogs.com/#p{page}'
    for page in range(1, 4)
]

def crawl(url):
    response = requests.get(url)
    print(url, len(response.text))

# Baseline: crawl the URLs one after another
def single_thread():
    for url in urls:
        crawl(url)

# Speedup: one thread per URL
def multiple_thread():
    threads = []
    for url in urls:
        thread = threading.Thread(
            target=crawl,
            args=(url,)
        )
        threads.append(thread)
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

# Run the multithreaded version
multiple_thread()
Copilot code suggestion: an improved version
import requests
import time
import threading

urls = [
    f'https://www.cnblogs.com/#p{page}'
    for page in range(1, 50 + 1)
]

def single_spider(url):
    res = requests.get(url)
    print(url, len(res.text))

def multi_thread_spider(urls):
    threads = []
    for url in urls:
        t = threading.Thread(target=single_spider, args=(url,))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()

if __name__ == '__main__':
    # Measure the duration
    start = time.time()
    multi_thread_spider(urls)
    end = time.time()
    print('duration: ', end - start)
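Spawning one thread per URL works for 50 pages but does not scale; a thread pool (previewed here from lesson 07) caps the number of workers. A minimal sketch reusing `single_spider` and `urls` from above, with an assumed cap of 10 workers:

```python
from concurrent.futures import ThreadPoolExecutor

# Reuse single_spider from above; the pool caps concurrency at 10 threads
# and waits for all tasks to finish when the with-block exits.
with ThreadPoolExecutor(max_workers=10) as pool:
    pool.map(single_spider, urls)
```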
A Producer/Consumer Crawler with Queue
Step 1: restructure the crawler into producer and consumer roles
import requests
from bs4 import BeautifulSoup

urls = [
    f'https://cnblogs.com/#p{page}'
    for page in range(1, 4)
]

# Producer: download a page's HTML
def crawl(url):
    response = requests.get(url)
    return response.text

# Consumer: parse the HTML into (href, title) records
def parse(html):
    soup = BeautifulSoup(html, 'html.parser')
    # link CSS selector: .post-item-title
    links = soup.select('.post-item-title')
    results = []
    for link in links:
        href = link.get('href')
        title = link.get_text()
        results.append({
            'href': href,
            'title': title
        })
    return results

html = crawl(urls[0])
res = parse(html)
print(res)
Step 2: the multithreaded producer/consumer version
import threading
import requests
import queue
import time
import random
from bs4 import BeautifulSoup

urls = [
    f'https://cnblogs.com/#p{page}'
    for page in range(1, 11)
]

# Producer: download a page's HTML
def crawl(url):
    response = requests.get(url)
    return response.text

# Consumer: parse the HTML into (href, title) records
def parse(html):
    soup = BeautifulSoup(html, 'html.parser')
    # link CSS selector: .post-item-title
    links = soup.select('.post-item-title')
    results = []
    for link in links:
        href = link.get('href')
        title = link.get_text()
        results.append({
            'href': href,
            'title': title
        })
    return results

# Producer loop: take URLs from url_queue, put HTML into html_queue
def do_crawl(url_queue, html_queue):
    while True:
        url = url_queue.get()
        html = crawl(url)
        html_queue.put(html)
        print(
            threading.current_thread().name,
            'url_queue size: ', url_queue.qsize(),
        )
        time.sleep(random.randint(1, 2))

# Consumer loop: take HTML from html_queue, write parsed results to a file
def do_parse(html_queue, file):
    while True:
        html = html_queue.get()
        results = parse(html)
        for result in results:
            file.write(str(result) + '\n')
        print(
            threading.current_thread().name,
            'html_queue size: ', html_queue.qsize(),
        )
        time.sleep(random.randint(1, 2))

# Note: the threads below are started without join(); their loops never
# exit, so the process keeps running after the main thread finishes
if __name__ == '__main__':
    url_queue = queue.Queue()
    html_queue = queue.Queue()
    for url in urls:
        url_queue.put(url)
    file = open('results.txt', 'w')
    # create 5 threads for crawl
    for _ in range(5):
        t = threading.Thread(target=do_crawl, args=(
            url_queue, html_queue), name='crawl')
        t.start()
    # create 3 threads for parse
    for _ in range(3):
        t = threading.Thread(target=do_parse, args=(
            html_queue, file), name='parse')
        t.start()
    print('main thread end')
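Because the worker loops never break, this program never exits on its own (hence the missing join() noted above). A common fix, sketched here as my own variant rather than course code, is to push one sentinel per worker and break when it arrives:

```python
import queue
import threading

SENTINEL = None  # pushed once per worker to signal shutdown

def worker(q):
    while True:
        item = q.get()
        if item is SENTINEL:
            break  # exit the loop instead of blocking forever
        print(threading.current_thread().name, 'handled', item)

q = queue.Queue()
for i in range(10):
    q.put(i)

threads = [threading.Thread(target=worker, args=(q,), name=f'w{i}')
           for i in range(3)]
for t in threads:
    t.start()
for _ in threads:
    q.put(SENTINEL)  # one sentinel per worker
for t in threads:
    t.join()         # now join() returns and the program can exit
print('all workers done')
```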
Thread Safety and Lock
- What is a thread-safety problem?
A bank-withdrawal demo, shown two ways:
- without a lock
- with a lock, which fixes the thread-safety problem
import threading
import time

# Account with a balance
class Account:
    def __init__(self, balance):
        self.balance = balance

# Withdraw: check the balance, then deduct
def withdraw(account, amount):
    if account.balance >= amount:
        # the sleep forces a thread switch between check and deduction
        time.sleep(0.01)
        account.balance -= amount
        print(threading.current_thread().name, "withdrawal succeeded, balance:", account.balance)
    else:
        print(threading.current_thread().name, "withdrawal failed, insufficient balance")

if __name__ == "__main__":
    account = Account(1000)
    t1 = threading.Thread(target=withdraw, args=(account, 800), name="t1")
    t2 = threading.Thread(target=withdraw, args=(account, 800), name="t2")
    t1.start()
    t2.start()
    t1.join()
    t2.join()
Without a lock, multithreading corrupts the balance: both threads pass the balance check before either deducts, because the sleep yields control between the check and the deduction.
t2 withdrawal succeeded, balance: 200
t1 withdrawal succeeded, balance: -600
Fix: create lock = threading.Lock() and wrap the critical section in with lock.
import threading
import time

# Use a lock to fix the concurrency problem
lock = threading.Lock()

# Account with a balance
class Account:
    def __init__(self, balance):
        self.balance = balance

# Withdraw: the whole check-then-deduct sequence runs under the lock
def withdraw(account, amount):
    # acquire the lock; it is released automatically when the block exits
    with lock:
        if account.balance >= amount:
            time.sleep(0.01)
            account.balance -= amount
            print(threading.current_thread().name, "withdrawal succeeded, balance:", account.balance)
        else:
            print(threading.current_thread().name, "withdrawal failed, insufficient balance")

if __name__ == "__main__":
    account = Account(1000)
    t1 = threading.Thread(target=withdraw, args=(account, 800), name="t1")
    t2 = threading.Thread(target=withdraw, args=(account, 800), name="t2")
    t1.start()
    t2.start()
    t1.join()
    t2.join()
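With the lock in place, the check and the deduction run as one atomic step, so only one of the two 800 withdrawals can succeed against the 1000 balance. The output should look like this (which thread wins the lock first may vary):
t1 withdrawal succeeded, balance: 200
t2 withdrawal failed, insufficient balance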