你的Python程序实际上可以并行使用多少个 CPU 内核?

在运行 CPU 密集型并行程序时,通常需要根据机器上 CPU 内核的数量来确定线程或进程池的大小。线程数量少了,就无法充分利用所有内核;线程数量多了,程序运行速度就会开始变慢,因为多个线程会争夺同一个内核。反正理论上是这样。

那么,如何检查计算机有多少个内核?这个建议真的好吗?

事实证明,要确定运行多少个线程非常棘手:

  • Python 标准库提供了多个 API 来获取这些信息,但都不够充分。
  • 更糟的是,由于指令级并行和同步线程(又称 Intel CPU 上的超线程)等 CPU 特性,您能有效使用的内核数量取决于您编写的代码!

让我们来看看为什么计算程序可以使用多少个 CPU 内核如此困难,然后再考虑可能的解决方案。

用 Python 获取 CPU 内核数

如果阅读 Python 标准库文档,它有一个 os.cpu_count() 函数,可以返回 “系统中逻辑 CPU 的数量”。逻辑是什么意思?我们稍后再讨论。

文档还告诉你,”len(os.sched_getaffinity(0)) 可以获取当前进程的调用线程被限制使用的逻辑 CPU 数量”。调度器亲和性是一种限制进程使用特定内核的方法。

遗憾的是,这种应用程序接口也不够充分。例如,在 Linux 上,用于实现 Docker 和其他容器系统的 cgroups API 有多种限制 CPU 使用的方法。在这里,我们将 CPU 限制为 2.25 个内核:

$ docker run -i -t --cpus=2.25 python:3.12-slim
Python 3.12.1 (main, Dec  9 2023, 00:21:37) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>> os.cpu_count()
20
>>> len(os.sched_getaffinity(0))
20

实际上,我们只能使用 2.25 个 CPU,但两个应用程序接口都不知道这一点。

什么是逻辑 CPU?

操作系统选项只是问题的开端,在举例说明之前,我们需要了解什么是物理 CPU 内核和逻辑 CPU 内核。我的电脑使用的是英特尔 i7-12700K 处理器

  • 12 个物理内核(8 个高性能内核和 4 个性能较弱的内核)。
  • 20 个逻辑内核。

现代 CPU 内核可以并行执行多条指令。但是,如果 CPU 在等待从 RAM 中加载某些数据时卡住了,会发生什么情况呢?在此之前,它可能无法执行任何工作。

为了利用这些可能被浪费的资源,CPU 物理内核会在操作系统面前假装成多个内核。在我的 CPU 上,8 个速度更快的内核可以假装成两个内核,总共有 16 个逻辑内核。成对的逻辑内核将共享同一个物理内核。如果一个逻辑内核没有充分利用所有内部算术逻辑单元,例如因为它在等待内存加载,那么通过配对逻辑内核运行的代码仍可使用这些闲置资源。

这种技术被称为同步多线程技术,英特尔公司称之为超线程技术。如果你有一台个人电脑,通常可以在 BIOS 中禁用它。

现在我们又有了一个新问题。抛开调度器亲和性等因素不谈,我们应该使用物理内核数还是逻辑内核数作为线程池大小?

一个令人尴尬的并行例子

让我们来看看用 Numba 编译成机器代码的两个函数。我们确保释放 GIL 以实现并行性。

这两个函数做同样的事情,但其中一个比另一个快得多。我们可以在多个线程上并行运行这些函数,理论上可以线性提高吞吐量,直到内核耗尽为止。

from numba import njit
import numpy as np

@njit(nogil=True)
def slow_threshold(img, noise_threshold):
    noise_threshold = img.dtype.type(noise_threshold)
    result = np.empty(img.shape, dtype=np.uint8)
    for i in range(result.shape[0]):
        for j in range(result.shape[1]):
            result[i, j] = img[i, j] // 256
    for i in range(result.shape[0]):
        for j in range(result.shape[1]):
            if result[i, j] < noise_threshold // 256:
                result[i, j] = 0
    return result

@njit(nogil=True)
def fast_threshold(img, noise_threshold):
    noise_threshold = np.uint8(noise_threshold // 256)
    result = np.empty(img.shape, dtype=np.uint8)
    for i in range(result.shape[0]):
        for j in range(result.shape[1]):
            value = img[i, j] >> 8
            value = (
                0 if value < noise_threshold else value
            )
            result[i, j] = value
    return result

我们将运行该函数处理图像,并测量其运行所需的时间:

rng = np.random.default_rng(12345)

def make_image(size=256):
    noise = rng.integers(0, high=1000, size=(size, size), dtype=np.uint16)
    signal = rng.integers(0, high=5000, size=(size, size), dtype=np.uint16)
    # A noisy, hard to predict image:
    return noise | signal

NOISY_IMAGE = make_image()
assert np.array_equal(
    slow_threshold(NOISY_IMAGE, 1000),
    fast_threshold(NOISY_IMAGE, 1000)
)

下面是在单核心上运行每个功能所需的时间:

%timeit slow_threshold(NOISY_IMAGE, 1000)
90.6 µs ± 77.7 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

%timeit fast_threshold(NOISY_IMAGE, 1000)
24.6 µs ± 10.8 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

扩展到多个线程

现在我们有了几个函数,我们将设置一种方法,利用线程池处理给定的图像列表:

from multiprocessing.dummy import Pool as ThreadPool

def apply_in_thread_pool(
    num_threads, function, images
):
    with ThreadPool(num_threads) as pool:
        for image in images:
            result = pool.map(
                lambda img: function(img, 1000),
                images
            )
            assert len(result) == len(images)

接下来,我们将使用 benchit 库(也可以使用 perfplot,但要注意它是 GPL 许可的)绘制不同线程数运行不同函数所需的时间图:

import benchit
benchit.setparams(rep=1)

# 400 images to run through the pool:
IMAGES = [make_image() for _ in range(400)]

def slow_threshold_in_pool(num_threads):
    apply_in_thread_pool(num_threads, slow_threshold, IMAGES)

def fast_threshold_in_pool(num_threads):
    apply_in_thread_pool(num_threads, fast_threshold, IMAGES)

# Measure the two functions with 1 to 24 threads:
timings = benchit.timings(
    [slow_threshold_in_pool, fast_threshold_in_pool],
    range(1, 25),
    input_name="Number of threads"
)
timings.plot(logy=True, logx=False)

请注意运行时间是如何随着线程数的增加而缩短的……直到一个点。之后运行时间又开始变短。到目前为止,这正是我们所期望的。但也有出乎意料的地方:每个函数的最佳线程数都不同。

timings.to_dataframe().idxmin(axis="rows")
函数 最佳线程数
slow_threshold 19
fast_threshold 8

并行性的最佳效果也取决于你的代码

我们的慢速函数基本上可以利用所有逻辑内核。可能它没有充分利用特定物理内核的所有可用处理能力,因此逻辑内核允许更多并行性。

相比之下,我们速度更快的函数只能利用不超过 8 个内核;超过 8 个内核后,速度开始减慢。也许它开始遇到计算以外的瓶颈,比如内存带宽。

对于这两个函数来说,线程池的大小都不是最佳的。

另辟蹊径:实证测量

在获得最佳线程数方面,我们遇到了很多问题:

  1. 考虑到操作系统限制 CPU 使用的所有不同方式,很难获得准确的内核数量。
  2. 最佳并行程度(如线程数)取决于工作量。更优化的代码可能无法利用额外的逻辑内核。
  3. 内核数量并不是唯一的瓶颈。
  4. 额外的问题:如果你在云中运行,你使用的是 “vCPU”,不管这意味着什么。不同的实例可能有不同的 CPU 型号。

因此,这里有另一种方法:在运行时根据经验发现最佳线程数。在上面的例子中,我们测量了特定代码的最佳线程数。如果你有一个长期运行的数据处理任务,需要在多个线程中运行相同的代码一段时间,你也可以这样做。也就是说,你可以在开始时花一点时间,根据经验测算出最佳线程数,或许还可以使用一些启发式方法来补偿工作量。

就运行时而言,如果您使用的是经验测量法,那么您就不需要关心为什么特定数量的线程是最佳的。无论硬件、操作系统配置或云环境如何,您都将使用最佳并行效果。

本文文字及图片出自 How many CPU cores can you actually use in parallel?

你也许感兴趣的:

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注