单线程 Python 应用程序在性能上受到限制:它们按顺序执行任务,并且不利用多核处理器。此外,此类程序无法同时处理许多作,尤其是在涉及 I/O 任务(例如网络请求或读取文件)时。
通过在代码中实施并行计算、并发或异步编程,可以显著提高性能。为此,Python 提供了多处理、线程和 asyncio 等工具。
多处理、线程和 Asyncio:有什么区别?
首先,让我们记住什么是线程和进程。
进程是单独的程序,每个程序都有自己的内存,可以在不同的处理器内核上并行运行。
线程是同一进程的一部分,它们同时执行任务并共享内存以执行此作。
-
threading 模块启动线程以在单个进程中并发工作。
-
asyncio 库异步管理任务,无需额外的线程或进程。
-
multiprocessing 模块创建用于在多个内核上并行执行的进程。
让我们来看看这些工具是如何工作的以及它们的使用地点。
多线程和并发任务执行
threading 模块允许您在单个进程中运行多个线程以并发执行任务。
并发是程序管理多个任务的能力,以便从用户的角度同时执行这些任务,即使它们在物理上是交错的。例如,当一个任务正在等待来自服务器的响应时,另一个任务可以处理数据。
线程处理中的线程以类似的方式工作 — 它们在任务之间切换并共享进程的公共内存和资源。
使用线程
该模块的主要工具是 Thread 类,它允许您在单独的线程上运行函数。
基本语法:
import threading
def task(name):
print(f"Задача {name} выполняется в потоке {threading.current_thread().name}")
t = threading.Thread(target=task, args=("A",))
t.start()
t.join()
在这里,start() 方法启动线程,join() 强制主代码等待它完成。
GIL 及其对多线程的影响
Python 的标准实现 (CPython) 具有限制多线程的全局解释器锁 (GIL)。GIL 在任何给定时间都只允许一个线程在解释器中执行,即使在多核处理器上也是如此。
这意味着线程中的线程不能并行使用多个内核进行计算。因此,对于速度取决于处理器的 CPU 密集型任务(例如,复杂的数学计算或处理大量数据),它们的效率低下。
但是在 I/O 绑定的任务中,程序会等待大量外部作,例如网络请求或读取文件,GIL 不会干扰。当一个作等待完成时,线程会相互切换。
线程同步:锁、信号量、事件
由于线程共享共享内存,因此可能会发生冲突,例如,如果两个线程同时更改同一变量。为了防止它们,使用了同步工具:
- Lock 会阻止对资源的访问,一次只允许一个线程处理该资源。这可以防止 “数据竞争”,即结果取决于线程的运行顺序。
例:
import threading
import time
shared_resource = 0
lock = threading.Lock()
def increment():
global shared_resource
thread_name = threading.current_thread().name
print(f"Поток {thread_name} пытается увеличить значение")
with lock:
current_value = shared_resource
# Имитация небольшой задержки
time.sleep(0.1)
shared_resource = current_value + 1
print(f"Поток {thread_name} увеличил значение до {shared_resource}")
threads = [threading.Thread(target=increment) for in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Итоговое значение: {sharedresource}")
在这里,Lock 确保 shared_resource 正好增加 10 倍,而不是由于同时更改而丢失值。
输出将显示不同的线程在等待前一个线程释放锁时如何轮流访问资源。如果不使用 Lock,则由于作重叠,结果可能小于 10。
- 信号量限制可以同时访问资源的线程数。当您想要授予对多个线程的访问权限,但不是一次授予所有线程的访问权限时,这很有用。
例:
import threading
import time
# Ставим ограничение на максимум 2 одновременных потока
semaphore = threading.Semaphore(2)
def task(name):
with semaphore:
print(f"Поток {name} начал работу")
time.sleep(1)
print(f"Поток {name} завершил работу")
threads = [threading.Thread(target=task, args=(i,)) for i in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
在这种情况下,只有两个线程同时工作,其余线程等待 “空间” 变为可用。输出将显示任务以两个为一组的批处理执行。
- 事件允许一个线程等待来自另一个线程的信号继续工作。这对于协调线程之间的作非常有用。
例:
import threading
import time
event = threading.Event()
def waiter():
print("Ожидаю сигнала...")
event.wait()
print("Сигнал получен, продолжаю работу")
def signaler():
time.sleep(2)
print("Отправляю сигнал")
event.set()
t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=signaler)
t1.start()
t2.start()
t1.join()
t2.join()
在这里,等待线程等待信号调用 event.set() 以继续执行。
多线程编程示例
threading 模块在任务花费大量时间等待外部资源(I/O 绑定任务)的情况下特别有用。也就是说,任务的执行速度不受 CPU 计算的限制,而是受 input/output作的限制,这通常与长时间延迟有关。
线程允许您在它们之间切换:当一个任务等待作完成时,另一个任务可以开始工作。
我们来看两个示例:在后台加载网页和处理任务。
竞争性 URL 加载
从 Internet 加载数据是 I/O 任务的一个典型示例,其中顺序执行效率可能较低。
import threading
import urllib.request
urls = ["https://python.org", "https://example.com"]
def fetch_url(url):
response = urllib.request.urlopen(url)
print(f"Загружен {url}, длина: {len(response.read())}")
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for t in threads:
t.start()
for t in threads:
t.join()
此代码为每个 URL 创建一个单独的线程,使用 start() 启动它们,并通过 join() 等待完成。因此,页面加载具有竞争力,这比顺序执行更快,尤其是在网络延迟很大的情况下。
在后台处理任务
线程可用于在主程序继续运行时执行日志记录或监视等任务。
后台日志记录示例
import threading
import time
def background_logger(logs):
while True:
if logs:
print(f"Лог: {logs.pop(0)}")
time.sleep(1)
def main_task(logs):
for i in range(5):
logs.append(f"Событие {i} в основном потоке")
print(f"Основной поток работает: шаг {i}")
time.sleep(0.5)
logs = []
logger_thread = threading.Thread(target=background_logger, args=(logs,), daemon=True)
logger_thread.start()
main_task(logs)
在此示例中,background_logger线程在后台运行,在主线程执行其任务时定期显示日志列表中的消息。daemon=True 属性意味着后台线程将与主程序一起终止。
使用 asyncio 进行异步编程
异步编程允许您在不阻塞主线程的情况下执行任务,这对于 I/O 密集型任务特别有用。与线程不同,线程用于并发,asyncio 通过事件循环管理单个线程上的任务。
这减少了创建线程的开销,适用于 I/O 密集型应用程序,例如 Web 爬虫或服务器。
同步代码和异步代码有什么区别?
异步代码与同步代码的不同之处在于,它不会在等待作时阻止程序的执行,而是通过事件循环在同一线程上的任务之间切换。
-
同步代码:带锁定的顺序执行,与队列进行比较。
-
异步代码:无需等待即可运行任务,在事件循环之间切换。
asyncio 基础知识:async 和 await
asyncio 库基于具有 Python 3.5 中引入的 async 和 await 关键字的协程。它们允许您编写在可读性方面类似于同步代码的异步代码。
-
async def 定义了一个协程,该协程可以暂停而不会阻止程序执行。
-
AWIT 指定协程应等待另一个异步作完成的位置,从而释放对其他任务的控制权。
例:
import asyncio
async def say_hello():
print("Начинаем...")
# Ждем 1 секунду, не блокируя
await asyncio.sleep(1)
print("Привет!")
async def main():
# Запускаем корутину
await say_hello()
# Запускаем программу
asyncio.run(main())
在这里,asyncio.sleep(1) 模拟异步作(例如对服务器的请求),而 await 允许程序继续运行,直到等待完成。asyncio.run() 函数是运行异步代码的标准方法。
事件循环和任务管理
asyncio 中的 Asynchrony 之所以有效,要归功于 Event Loop,这是一种机制,通过在协程等待作时在协程之间切换来管理协程的执行。
当一个协程等待来自网络的响应时,另一个协程可以处理其数据。
要计划多个任务,请使用 asyncio.create_task() :
async def task(name):
print(f"Задача {name} началась")
await asyncio.sleep(1)
print(f"Задача {name} завершилась")
async def main():
task1 = asyncio.create_task(task("A"))
task2 = asyncio.create_task(task("B"))
await task1 # Ждем первую задачу
await task2 # Ждем вторую
asyncio.run(main())
在此示例中,两个任务同时运行,事件循环在它们之间切换。create_task() 方法将协程转换为 asyncio 独立执行的任务。
异步 HTTP 请求
Asyncio 与其他库(如 aiohttp)配合使用,以实现快速、有竞争力的页面加载。
与使用 urllib.request 的同步页面加载不同,每个页面都是按顺序加载的,而 aiohttp 会同时向多个 URL 发送请求,而不会阻塞主线程。
例:
import asyncio
import aiohttp
async def fetch_url(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
content = await response.text()
print(f"Загружен {url}, длина: {len(content)}")
async def main():
urls = ["https://python.org", "https://example.com"]
tasks = [asyncio.create_task(fetch_url(url)) for url in urls]
await asyncio.wait(tasks)
asyncio.run(main())
因此,如果一个页面响应缓慢,另一个页面不会等待它。与同步代码相比,这大大减少了总时间,因为同步代码的延迟是相加的。
使用套接字
Asyncio 允许您创建异步服务器和客户端来处理连接。
考虑一个示例,服务器从客户端接收消息,并通过将文本转换为大写字母将其发回:如果客户端发送“hello”,则服务器使用“HELLO”进行响应。
import asyncio
async def handle_client(reader, writer):
data = await reader.read(100)
writer.write(data.upper())
await writer.drain()
writer.close()
async def main():
server = await asyncio.start_server(handle_client, "127.0.0.1", 8888)
async with server:
await server.serve_forever()
asyncio.run(main())
此服务器接受消息并以大写字母响应,同时处理多个客户端。
多进程和并行计算
并行计算是一种同时执行多个计算任务的方法,分布在可用资源(如处理器内核)中。此方法可让您加快程序的执行速度。
并行计算的一种形式是多进程,其中任务分布在多个独立的进程中。在 Python 中,它是使用 multiprocessing 模块实现的。
使用 CPU 密集型任务
Multi-process 使用自己的 Python 解释器实现,绕过 GIL。这允许处理器的所有内核并行使用,并提供真正的并行性,而不仅仅是像 threading 或 asyncio 那样的并发性。
因此,multiprocessing 模块是 CPU 密集型任务的最佳选择,而不是 I/O 密集型场景,在这种场景中,线程或异步效果更好。
multiprocessing 模块的工作原理
multiprocessing 模块创建单独的进程,每个进程都使用自己的 Python 解释器。
例:
from multiprocessing import Process
def square(number):
print(f"Квадрат числа {number}: {number * number}")
if name == "__main__":
processes = []
for i in range(5):
p = Process(target=square, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join()
在此示例中,创建了 5 个进程,每个进程都计算一个数字的平方。start() 方法启动进程,join() 等待它完成。
Multiprocessing 具有以下工具:
- Process 是一个用于创建和管理单个流程的类。
您需要指定目标函数 (target) 和参数 (args),然后使用 start() 方法启动进程。
例:
from multiprocessing import Process
def square(number):
result = number ** 2
print(f"Квадрат числа {number}: {result}")
if name == "__main__":
processes = []
for i in range(5):
p = Process(target=square, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join()
在这里,我们创建了 5 个进程,每个进程都计算数字的平方。检查 name 是否为 == “main” 是必要的,以避免在启动进程时递归导入。
这在 Windows 上尤其重要,因为 Windows 中的进程是通过 spawn 和重新导入模块创建的,但对于代码到其他平台(如 Linux 或 macOS)的可移植性也很重要,在这些平台上,行为取决于启动方法。
- Pool 是一个用于创建进程池的类,该进程池可自动在多个进程之间分配任务。map() 方法允许您将函数应用于数据列表,并在池中的进程之间划分工作。
例:
from multiprocessing import Pool
def cube(number):
return number ** 3
if name == "__main__":
# Определяем пул из 3 процессов
with Pool(3) as pool:
results = pool.map(cube, [1, 2, 3, 4, 5])
print(results)
在这种情况下,Pool 在三个进程之间分配数字立方体的计算。每个进程都占用列表 [1, 2, 3, 4, 5] 的一部分,执行任务,并返回一个结果,该结果被组合成一个最终列表。
方法比较
我们来看看这三种方法有何不同:并发、异步和并行。
多处理 | 线程 | asyncio |
---|---|---|
它使用进程,绕过 GIL,适用于 CPU 密集型任务(计算、数据处理)。 由于进程隔离,需要的资源比线程多。 | 它在单个进程中与线程一起工作,仅限于 GIL,并且对于等待优先于计算的 I/O 密集型任务(网络、文件)有效。 比流程更容易、更快速地创建。 | 通过事件循环在单个线程中的异步非常适合具有大量作(例如 Web 请求)的 I/O 绑定任务。 不适合 CPU 密集型任务,但可以最大限度地减少开销。 |
代码性能优化
要优化代码,您首先需要通过性能分析找到性能瓶颈。
性能分析是对代码的分析,用于识别瓶颈,即程序花费最多时间或资源的领域。
Python 有几个工具可以做到这一点:
- cProfile 是一个内置模块,用于测量每个函数的执行时间及其调用次数。
例:
import cProfile
def slow_function():
return sum(i * i for i in range(1000000))
cProfile.run("slow_function()")
输出将显示一个包含列的表:ncalls (调用次数), tottime (函数中的总时间), cumtime (考虑子函数的时间)。例如,如果一个周期需要 0.5 秒,您将看到它是瓶颈。
- 时间是测量总执行时间的简单工具,适用于比较方法(例如,顺序和并行)。
例:
import time
from multiprocessing import Pool
def square(n):
return n * n
start = time.time()
with Pool(4) as pool:
pool.map(square, range(1000))
print(f"Время: {time.time() - start:.2f} сек")
这使您可以快速检查 multiprocessing 是否加快了代码速度(例如,在单线程变体中为 0.1 秒而不是 0.4 秒)。
- 其他工具。line_profiler 模块显示每行代码的执行时间(需要通过 pip install line_profiler 安装),Py-Spy 是一个可视化分析器,可以实时构建程序的图表。
分析后,使用结果进行优化:
-
定义任务的类型。 如果瓶颈是计算 (CPU 受限),请切换到多处理。要等待 I/O 绑定,请使用 asyncio 或 threading。
-
并行化繁重的作。 如果 cProfile 显示函数耗时较长,请拆分数据并使用 Pool 跨核心分配。
-
最大限度地减少开销。不要为小型任务创建进程或线程 - 运行 Process 比从短作的并发性中受益更昂贵。
-
测试更改。在优化之前和之后测量时间,以确保多进程或异步实际上可以加快代码速度。
您可以在 MEPhI 和 Skillfactory 的软件工程硕士课程中从头开始学习成为一名程序员,并将您的技能提高到技术主管或软件架构师。在实际业务任务中磨练您的技能并获得经验。