跳转至

现代高性能并发编程:异步、多进程和多线程

本章介绍了现代高性能并发编程的基本概念和技术,涵盖了异步编程、多进程编程和多线程编程的原理和实践。通过学习本章内容,读者将能够理解并应用这些技术来提升程序的性能和响应能力。

我们刚学编程的时候,代码大多数是“把所有的东西一口气做完”,代码一行接一行,像火车过隧道,即使漆黑一片也得走完。后来,我们发现,CPU核心越来越多,但硬盘、网络等数据来源的速度根本跟不上。我们发现,试图一口气把所有东西搬进内存时,CPU在睡觉;在等用户输入内容的时候,CPU就阻塞住,啥也不做。而这显然和实际生活中的情况不符:工人砌砖的时候,难道要等所有水泥全都送到才开始砌墙吗?显然不是的。

那为了让CPU不睡觉,我们就有这两个类似的思路: - 异步:让一个工人一边等待新的水泥,一边用手头显然不够的水泥砌墙; - 多线程:让多个工人同时工作,有的人去搬运水泥,有的人去砌墙。

这两种思路各有优缺点。下文就将会逐一阐述。

由于Python的方便性,本文前半部分的代码示例均使用Python编写。

从一口气做完到边做边等

假设我们写了这样一个脚本,顺序抓取100个网页并处理:

for url in urls:
    html = request.urlopen(url).read()  # 网络往返 200 ms
    process(html)                      # CPU 计算 5 ms
我们发现,CPU利用率竟然高达$ 5 \div 205 \approx 2.4 \% $!有高达 97\%的时间被阻塞在 read 这一步:在等待网络数据返回的时候,CPU啥也不干。

那怎么处理这个问题?一拍脑袋,“进程是资源的调度单位”,那我们就开多个进程同时抓取网页吧!

for url in urls:
    pid = os.fork()          # 经典 Unix 套路
    if pid == 0:             # 子进程
        single_fetch(url)
        os._exit(0)
这就是朴素的多进程编程了。

问题貌似解决了,但我们发现:fork瞬间复制父进程内存,100个进程同时跑,内存瞬间爆炸;而且,操作系统调度这么多进程,开销也不小。更捉急的是:子进程把结果存在哪儿?父进程怎么拿到结果?这都没想好,代码就乱成一锅粥了。

于是我们被迫继续思考:要么让进程“轻一点”,要么让进程“少一点”,甚至干脆就不要阻塞了!

异步:一个进程来回跳

事件循环和协程

单线程内,我们不如换一种思路:遇到IO不睡觉,我们先去做别的;等数据好了,再回来继续处理。这样一来,CPU就不会闲着了。

这就是事件循环的核心:

loop = get_event_loop()
for url in urls:
    loop.create_task(fetch(url))   # 立刻返回,不阻塞
loop.run_forever()

fetch 内部在真正读写数据时会 await ,也就是“暂时等着”,让出控制权、挂起当前协程:“Nobody blocks, Everybody cooperates”。于是CPU的利用率立刻提升,但内存并无显著提升。

事件循环的核心是“协程”(coroutine)。协程是一种“可暂停的函数”,它可以在执行过程中被挂起,等到某个条件满足时再恢复执行。我们经常在现成代码中看到的 asyncawait ,就是用来定义和使用协程的关键字,但管它是C#还是Python,本质上都是状态机和回调函数的语法糖。

那状态机和回调函数是什么呢?简单来说,状态机就是一个有多个状态的机器,每个状态都有自己的行为和转换规则;回调函数则是当某个事件发生时被调用的函数。协程通过状态机来管理自己的执行状态,通过回调函数来处理异步事件,从而实现非阻塞的执行。

于是,编译器或解释器把协程切成上半段和下半段,遇到 await 时就保存当前状态,中间插个注册回调函数的操作,等事件完成时再调用回调函数恢复状态,继续执行下半段代码。这样一来,单线程内就能实现多任务并发执行了;而对开发者而言,代码看起来和同步代码差不多,易读性大大提升。

异步的边界

异步的“不阻塞”指的仅仅是IO。一旦上述 process(html) 真就成了大型纯计算,那事件循环就被霸占,所有协程集体卡死——异步失效。

于是新问题就诞生了:计算太重,异步扛不住;进程太多,内存又爆炸。怎么办呢?有没有介于两者之间的方案?

先卖关子,当然我倒是可以告诉你答案是“线程”。但还得把进程讲完,再请这尊大佛。

多进程编程:尤里复制人

fork之后

刚刚我创建进程的时候,用了 os.fork() 。学过ICS的肯定认识这个函数:它会复制当前进程的内存,创建一个一模一样的新进程。新进程从 fork() 返回,父进程得到新进程的PID,子进程得到0。这是Unix-Like系统惯用的套路。

但这个套路有个大问题:内存被完全复制了!假设父进程有1GB内存,fork后子进程也有1GB内存,系统总共就得分配2GB内存。要是我fork 100个进程呢?那不就得200GB内存了吗?

实际上Unix-Like系统搞出这玩意之后,早就料到了“全量拷贝”还是太不“环保”了,于是引入了“写时复制”(Copy-On-Write, COW)技术:fork之后,父子进程共享同一块内存区域,读共享,谁写谁复制。这样一来,内存占用就大大减少了。

但这仍然挡不住“一页未写、逻辑却独立”的浪费。于是, execve 函数应运而生:它可以让子进程加载一个全新的程序镜像,彻底摆脱父进程的内存束缚,旧的地址空间直接丢掉,又省内存又解耦逻辑,干净利落。

于是新的经典模式诞生了:先 fork ,再 exec 。先复制父进程的环境,然后加载新程序。这样一来,子进程就有了自己的逻辑和内存空间,互不干扰。shell的启动流水线就是这么干的。

进程间的通信

但问题又来了:进程间如何通信?父子进程的内存空间独立了,数据如何传递?

经典的Unix哲学是“一切皆文件”,进程间通信(Inter-Process Communication, IPC)自然也离不开文件。于是,最常见的IPC方式无非匿名管道、命名管道(FIFO)、共享内存、消息队列和套接字等。

管道那可是Unix系的老朋友了,数据一头进一头出,简单高效;一般匿名管道只能父子进程之间用,命名管道则可以跨进程;共享内存比它们都要快,是页级别的共享,但是需要自己处理同步,不然数据乱套;消息队列则是更高级的IPC机制,支持优先级和异步通信;套接字则是网络编程的基础,可以实现跨机器通信。

换句话说,进程之间的通信本质上都是为了数据同步。即使我们确实是把一个任务拆成多个进程来做,但最终肯定还是要汇总结果的;在这个过程中,数据的传递和同步就显得尤为重要。但这些子任务肯定逻辑上也是有先后顺序的:比如说,先抓取网页,再处理网页内容。那我们就得确保抓取任务完成后,处理任务才能开始。要不然,那就乱套了。

进程池

既然进程开多了内存爆炸,开少了CPU利用率低,那我们不如折中一下,先生成一些进程让他们睡觉,任务来了就分配给它们去做,做完了接着睡。这个思路听起来不错,而这就是大名鼎鼎的“进程池”(Process Pool)。

所谓进程池,就是预先创建一组进程,任务来了就分配给空闲的进程去做,做完再回来等下一个任务。这样一来,既能提高CPU利用率,又能控制内存占用。

多进程编程

那有的人说我三纸无驴,讲了这么多才提到真正的多进程编程。我承认这种指摘,但是因为现代的多进程基本依附于进程池而存在,脱离进程池则多进程几乎无意义,所以只能把进程池讲完以后再提多进程编程了。

现代的多进程编程是典型的异步-多进程混搭:主进程做纯异步的IO调度,而子进程统统塞进进程池里做纯计算,通过UNIX Domain Socket或共享内存等等传递数据;主进程则仅负责事件循环和调度,绝不自我阻塞。这样一来,CPU利用率和内存占用都能得到很好的平衡。

现代编程中随处可见上述典型架构:Nginx+Flask/uWSGI、Chrome的GPU进程、VS Code的Extension Host,全都是这种异步主控、多进程打工的模式。学会这种模式,才能真正理解现代高性能编程的精髓。

来个简单示范:

import asyncio, multiprocessing as mp

def cpu_crunch(data):
    # 真正的计算,GIL 也挡不住的 CPU 密集
    return sum(i*i for i in data)

async def main():
    with mp.Pool() as pool:
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(pool, cpu_crunch, range(10_000_000))
        print("async 拿到结果:", result)

asyncio.run(main())

上述代码中,主进程运行一个异步事件循环,负责调度任务;而真正的计算任务则被分配给进程池中的子进程去执行,由 run_in_executor 方法把进程池包装成一个异步调用:异步和多进程永远不是二选一,而是组合拳。

那为什么还要多线程?

读到这儿,细心的你会问:“异步解决 I/O 等,进程池解决计算重,线程究竟图什么?”

那自然是有它的优势的: - 进程要切换内核态重跑,乃至页表刷新、TLB Flush,时间是微秒级别的;线程切换用户态就能完成,时间在百纳秒级别,开销小得多。 - 某些资源没法拷贝,比如大缓存、数据库连接,这些东西按着头也得共享。 - 部分系统调用一次返回的事件要分给多处消费,多线程监听一个资源比多进程方便得多。

如果你确实想弄明白,就继续往下看吧。但多线程的难度,那可是非常高的。下文我直接从Python切成C++来讲解,在硬件底层控制方面目前还没有比C和C++更好的语言了。(Rust是什么东西?先不提它。)

为了更好地理解多线程编程,我们需要先了解程序的执行原理。

程序执行原理

进程

我们知道,操作系统的本质是一个“资源管理器”,负责管理计算机的各种资源。而一个程序希望运行起来,就需要操作系统分配给它一些资源,比如CPU时间、内存空间等,因此操作系统需要“调度”程序的运行。而这个调度的基本单位就是进程(Process)。

我们暂时先仅考虑一个程序。当我们运行一个程序时,操作系统会为该程序创建一个进程,并分配相应的资源。进程是程序在操作系统中的一个实例,它包含了程序的代码、数据以及运行时的状态信息。每个进程都有自己独立的内存空间和系统资源,且一般不能与其他进程直接共享资源。

一般而言,一个程序就是一个进程。当一个程序被运行,也就是一个进程被创建时,操作系统会为该进程分配一个唯一的“进程标识符”(Process ID, PID),并为其分配内存空间和其他资源。进程中的代码会被加载到内存中,并开始执行。这个过程的资源开销是比较大的。进程之间的通信等也比较麻烦,需要通过管道、共享内存等机制来实现。

而现代计算机显然不能一个时间内仅运行一个程序。操作系统通过“时间片轮转”(Time Slicing)等调度算法,让多个进程交替使用CPU时间,从而实现多任务处理。这样,用户可以同时运行多个程序,而操作系统会在它们之间切换,使得每个程序都能获得一定的CPU时间。而这样的切换过程是由操作系统内核负责管理的,其开销也是比较大的。

线程

而在一些情况下,我们希望一个程序能够同时执行多个任务。以著名大型即时战略游戏《群星》为例,游戏同时需要计算AI决策、船只轨迹、空间与地面战斗逻辑、资源收支、人口增减,乃至图形渲染、音频播放等。如果把这些东西都写成一个同步的程序,那么假设CPU计算上述任务的时间为\(t_i\),则游戏内每过一天(也就是游戏开发常说的“游戏刻”或“tick”)所需的时间就是\(\sum t_i\),这样就会导致游戏的响应速度非常慢,玩家体验极差。而如果把上述任务全都写成不同的进程,那么操作系统就需要频繁地在这些进程之间切换,导致大量的时间浪费在进程切换上,游戏的响应速度虽然会比单进程好一些,但仍然不够理想。

我们知道,现代CPU往往是多核的,每一个核心都能基本独立地执行一些任务。能不能把上述复杂的计算任务拆成多个子任务,并让它们同时运行在不同的核心上,最终返回其结果,从而提升整体的计算效率呢?答案是肯定的。

我们引入线程(Thread)这个概念。线程指的是CPU调度的最小单位,可以理解为一个相对独立的计算任务,但是许多线程可以共享同一个进程的资源。线程之间可以共享进程的内存空间和其他资源,因此线程之间的通信和数据共享相对容易,开销也较小。

这样,一个复杂的计算任务(进程)就可以被拆分成许多小的线程,并让它们同时运行在不同的CPU核心上,从而提升整体的计算效率。或者说,进程是“项目组”,而线程则是“员工”;从国家或公司领取资源是以“项目组”的名义领取的,而具体的工作则是由“员工”来完成的。通过合理地分配任务,不同的员工同时进行不同的工作,就能提高整个项目组的工作效率。

以上述游戏为例,我们可以把AI决策、船只轨迹、空间与地面战斗逻辑等任务都拆成不同的线程,并让它们同时运行在不同的CPU核心上。这样,游戏每过一天所需的时间就变成了\(\max(t_i)\),大大提升了游戏的响应速度。

多线程编程基本思想

理论存在,但实践是检验真理的唯一标准。

多线程编程出现时立刻面临两个极端严重的问题,这两个问题直接影响结果的正确性以及程序能否执行下去: 1. 内存只有一块,多个线程怎么分配?会不会把东西都写到一块,结果彻底乱套? 2. 线程A在等线程B的结果,而线程B又在等线程A的结果,结果两个线程就这么干瞪眼怎么办? 这些问题,本质上是“并发”带来的问题。多线程编程的核心思想,就是要解决这些并发问题,从而实现高效且正确的并行计算。

并发和并行

并发”可不是“并行”。并发是“一起出发”,逻辑上同时处理多件事,例如一个熟练的厨师能一边备菜一边起锅烧油;而并行是“同时进行”,物理上同时处理多件事,例如多个厨师同时在不同的灶台上做菜。在单核的CPU上只能并发,而在多核的CPU或多个CPU上可以并行。写代码的时候,必须先严格地按并发思维设计,然后再考虑并行能不能提升,这关系到数据访问的正确性。否则,一旦并发逻辑设计错了,再多的并行也没用,结果只会错上加错。

问题抽象

上述三个问题,实际上可以抽象成两个基本问题:

竞态条件

多个线程同时读写同一块内存,至少一个是写,结果取决于执行顺序,典型表现为同样输入多次,得到的结果不同。

例如,线程A和线程B同时对变量\(x\)进行操作,线程A执行\(x = x + 1\),线程B执行\(x = x * 2\)。如果线程A先执行,那么最终结果是\((x + 1) * 2\);如果线程B先执行,那么最终结果是\(x * 2 + 1\)。这种情况下,结果取决于哪个线程先执行,导致程序行为不可预测。

死锁

多个线程互相等待对方释放资源,结果这些线程都永久阻塞,无法继续执行。其四个必要条件为线程互斥、占有且等待、不可剥夺、循环等待。简单来说,就是多个线程各自占有一些资源,并且还在等待其他线程占有的资源,而这些资源又不能被强制剥夺,最终导致所有线程都在等待对方释放资源,形成一个循环等待的局面。只要缺失一个条件,就不会发生死锁。

而这两个基本问题最终都可以找到起源:无序访问。也就是说,多个线程对共享资源的访问没有一个明确的顺序,导致了竞态条件和死锁的发生。因此,解决多线程编程中的并发问题,关键在于如何确保对共享资源的有序访问。

那有的同学就会说了,“只要访问有序不就得了?”没错,但问题是“有序”怎么保证?这就需要一些机制来实现了。

解决问题

为了解决上述并发问题,前人发明了多种手段来保证对共享资源的有序访问。

互斥量

这是最简单粗暴的手段。互斥量指的是:当一个线程试图访问某个共享资源时,必须先获得该资源的“”(Lock)。如果该资源已经被其他线程锁定,那么该线程就必须等待,直到资源被释放为止。这样,就能确保同一时间只有一个线程能够访问该资源,从而避免竞态条件的发生。

打个比方,一个人进门要先拿钥匙把门锁打开,进去了之后要把门锁上,别人才能进来。这样就保证了同一时间只有一个人能进屋子。

    std::mutex mtx; // 创建一个互斥量
    int shared_resource = 0; // 共享资源
    void thread_function() {
        mtx.lock(); // 获取锁
        // 访问共享资源
        shared_resource++;
        mtx.unlock(); // 释放锁
    }
当然上述代码比较简单粗暴,实践中一般用下面这个,构造函数获取锁,析构函数释放锁,避免忘记释放锁的问题:
    std::mutex mtx; // 创建一个互斥量
    int shared_resource = 0; // 共享资源
    void thread_function() {
        std::lock_guard<std::mutex> lock(mtx); // 自动获取和释放锁
        // 访问共享资源
        shared_resource++;
    }

条件变量

有时候,线程需要等待某个条件成立才能继续执行。条件变量(Condition Variable)就是用来实现这种等待机制的。线程可以在条件变量上等待,直到另一个线程通知它条件已经成立。这样就可以避免死锁的发生。

std::mutex mtx;
std::condition_variable cv;
bool ready = false;
void thread_function() {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, []{ return ready; }); // 等待条件成立
    // 继续执行
}
void signal_function() {
    {
        std::lock_guard<std::mutex> lock(mtx);
        ready = true; // 设置条件
    }
    cv.notify_all(); // 通知等待的线程
}

通过使用互斥量和条件变量等机制,我们可以有效地解决多线程编程中的并发问题,从而实现高效且正确的并行计算。在实际编程中,还需要根据具体的应用场景选择合适的同步机制,以确保程序的正确性和性能。

除了这两个家伙外,还有更激进的手段:

原子操作

原子操作(Atomic Operation)指的是一种不可分割的操作,即在执行过程中不会被中断。这样,多个线程同时执行原子操作时,从源头上就避免了竞态条件的发生。

std::atomic<int> shared_resource{0}; // 共享资源
shared_resource.fetch_add(1); // 原子加1,不会被中断
这个 fetch_add 函数会将共享资源加1,并返回加1前的值。由于这是一个原子操作,多个线程同时执行这个操作时,不会发生竞态条件。

无锁数据结构

无锁数据结构(Lock-Free Data Structure)是一种特殊的数据结构,设计时就考虑到了多线程环境下的并发访问。它们通过使用原子操作和内存屏障等技术,实现了在不使用锁的情况下,多个线程可以安全地访问和修改数据结构。

这样就能彻底摒弃互斥量,能把并行性能推向极致。但是其复杂度极高,我个人非常建议同学们先学会走路再学飞。这里就不展开讲解了。

新的问题与内存模型

但是,即使加了锁,仍然可能看到奇奇怪怪的数值。这是因为现代CPU和编译器为了提升性能,往往会对指令进行重排序(Instruction Reordering);每个核心也有自己的缓存行,写入也未必立即刷回主存(内存)。这可真是福祸相依:一方面,重排序能提升指令执行效率,但另一方面,也可能导致多线程程序中的数据访问顺序与预期不符,从而导致加了锁也看到了奇怪的数值。

那这就很麻烦了。对此,人们又发明了“内存模型”(Memory Model)这个概念。内存模型定义了多线程程序中不同线程对共享内存的访问顺序和可见性规则。通过使用内存屏障(Memory Barrier)等机制,可以确保某些操作在特定的顺序下执行,从而避免由于重排序和缓存引起的问题。该操作与原子操作配套,用 happens-before 关系来描述操作之间的顺序,从而解决上述问题。

初学阶段,用 std::memory_order_seq_cst 就行了,保证所有操作按程序顺序执行。等到性能瓶颈的时候,再考虑逐步降级到更松的内存顺序模型,如 acquirerelease ,以提升性能。

实践之前的碎碎念

多线程代码的调试成本是我们常规单线程的指数倍。因为多线程程序的行为往往是非确定性的,同样的输入可能会导致不同的执行路径和结果,这使得调试变得非常复杂。此外,多线程程序中可能存在竞态条件、死锁等并发问题,这些问题往往难以重现和定位。所以,在写多线程代码之前,请务必三思:先保证对,再考虑快

其行业共识是: - 能不用共享内存就不用。优先用消息队列、生产者-消费者模型、future-promise模型等值传递的方式来设计多线程程序,避免共享内存带来的复杂性,这样把实际上的并发问题转化为串行状态机的问题,简单且高效。 - 把可变状态局限起来。如果必须用共享内存,尽量把可变状态局限在某个特定的模块或类,所有访问走统一接口,内部用锁或无锁数据结构来保证有序访问。外部调用的人不需要关心并发细节,只需要调用接口即可。这样把并发可能带来的问题局限到最小范围内,便于调试和维护。 - 尽量用高级抽象。优先使用语言或库提供的高级并发抽象,比如线程池、并发容器、任务调度器等,避免直接操作线程和锁。这样能减少低级并发编程的复杂性,提高代码的可读性和可维护性。

因此,在实际编程中,建议先从简单的并发模型入手,逐步深入理解多线程编程的原理和技巧。只有在真正需要高性能并行计算时,才考虑使用复杂的多线程技术。切记:先保证对,再考虑快

相关模型解释

我们用点菜来解释两种常见的多线程编程模型。

我们去饭店吃饭,点了一份菜。下单时,厨房会给我们一张小票,然后厨房就在后面炒菜。等菜做好了就把菜端出来,放到传菜口。我们可以随时凭票去取餐,而不需要一直盯着厨房。

在上述实例中: - 厨房是生产者:负责制作菜品,也就是处理任务,产生结果。 - 我们是消费者:负责取餐和享用菜品,也就是后续的处理任务。 - 小票是Promise:指的是生产者对任务结果的承诺,在这里是用小票暂时地代替菜品。同时,我们拿到小票之后,它就变成了一个Future,是消费者对任务结果的期望,在这里表示未来可以凭它来取餐。 - 菜是结果:生产者完成的任务结果,在这里是菜品本身。 - 传菜口是消息队列:负责在生产者和消费者之间传递任务结果。

一般情况下,生产者持有Promise对象,消费者持有Future对象。生产者在完成任务后,通过Promise对象将结果传递给Future对象,而消费者则通过Future对象获取结果。而这就是多线程模型中的五个重要角色,它们构成了三个最重要的模型:生产者-消费者模型和Future-Promise模型,以及消息队列模型。

那么这些模型有什么关系?

首先,生产者-消费者模型其实是多线程乃至多进程中最常见的模型,它实际上是描述并发任务的根本模式,不论是消息队列还是承诺未来,实际上都是在生产者-消费者模式下的具体实现方式。

模型都涉及到异步处理,也就是消费者不需要一直等着生产者完成它需要的人物,而是可以先去做其他事情,等到需要结果时再去取。而生产者、消费者之间则通过某种中间机制(例如消息队列或Future-Promise)来传递结果。这两种方式都是仅传递结果,而不传递状态,防止直接的共享内存访问,从而避免了竞态条件的问题。

那么不同点在哪里呢?主要区别为以下五点: - 通信方式:消息队列模型通过消息队列进行通信,而Future-Promise模型通过Future和Promise对象进行通信。 - 数据传递:消息队列模型传递的是消息,而Future-Promise模型传递的是任务结果。 - 消费者行为:在消息队列模型中,消费者通常是主动从队列中获取消息,而在Future-Promise模型中,消费者通常是被动等待Future对象的结果。 - 结果获取方式:在消息队列模型中,消费者需要主动从队列中获取消息(如轮询等),而在Future-Promise模型中,消费者可以通过Future对象的接口来获取结果。 - 适用场景:消息队列模型适用于更大的任务,例如分布式系统、解耦服务乃至微服务架构等;而Future-Promise模型更适用于较小的任务,例如单个进程或线程内部的异步任务、并发编程等。

两者都有效地避免了直接的共享内存访问,降低了并发编程的复杂性,提高了系统的可靠性和可扩展性。

多线程的实践

实例:问题描述

用C++完成以下任务:

现在有一个文本文件,是UTF-8编码的纯英文文本,大小巨大(GB级别)。统计其中每个单词(连续的字母、数字、下划线)的出现频次,输出频次最高的十个单词以及其出现次数。你必须保证结果的正确性,并使得内存占用尽量可控,不一次性把整个文件读入内存。你自然是可以使用CMake等工具来管理你的项目的。大小写可以不敏感。

测试数据:可以用

wget https://git.savannah.gnu.org/cgit/grep.git/plain/README -O sample.txt
这个是GNU Grep的README文件,内容比较丰富。

原文档只有2.3KB,我们可以用下面的命令把它重复100次,然后把这个头尾相接,生成一个大文件:

for i in {1..100}; do cat sample.txt >> bigfile.txt; done
当然你可以用其他更大的文本文件来测试,也可以调大重复次数,以生成更大的文件。

实例:设计思路

思路是显然的,使用分块读取-独立统计-合并结果的Map-Reduce风格。考虑三大行业共识: - 不用共享内存:每个线程持有一个map,保证线程之间零共享数据,彻底消除竞态条件; - 局限可变状态:让唯一的可变状态为最终汇总的TotalCounts。这东西在所有线程结束之后,由主线程串行地合并,尽量缩短临界区。 - 使用高级抽象:用 jthreadfutureasync 分别进行线程管理、异步结果和并行任务拆分。

关键点:怎么把任务拆分到多个线程上?思路是把文件分成多个块,每个线程负责处理其中的一块。这个“分治”思想相当常见,应该在大二就学过了。为了避免线程数过少导致的负载不均衡问题,我们可以把文件分成更多的块,然后让线程池中的线程去处理这些块。这样就能充分利用多核CPU的计算能力。

而分治的合并往往又是一个难题:假如一个单词跨块了怎么办?解决办法是让每个块的处理函数返回一个“前缀”字符串,表示该块最后一个单词的前半部分。下一个块在处理时,先把这个前缀加到自己的开头,然后再进行统计。这样就能确保每个单词都被完整地统计到。但是!这种做法依然有一个问题:现在把一个文件分成若干块(A、B、C、……),B线程可能没拿到A的前缀就开跑了,结果导致A的最后一个单词和B的第一个单词依然被拆开了。为了解决这个问题,我们可以让每个块在处理时,先检查自己的前一个块是否已经处理完毕,并获取其前缀。如果前一个块还没有处理完毕,那么当前块就等待,直到前一个块处理完毕并返回前缀。这样就能确保每个块在处理时都能获取到正确的前缀,从而避免单词被拆开的情况。

这实在是太让人晕头转向了。不如考虑一个更简单的办法:在分割的时候就不要跨单词分割,也就是宁可多读一点,也不要把单词拆开。这样就能彻底避免上述问题。具体做法是:在分割文件时,先按固定大小分割成若干块,然后检查每个块的结尾是否为单词边界。如果不是,就继续向后读取,直到遇到下一个非单词字符为止。这样就能确保每个块都是完整的单词,从而避免单词被拆开的情况。毕竟还是那句话:先保证对,再考虑快

当然,如果你想挑战更高难度,也可以把上述问题中的“纯英文”改成“任意UTF-8编码的文本”,这样就需要考虑多字节字符的情况了,例如变音字符、汉字等。这个就留给同学们自行挑战了。

实例:实际工作

Note

这个是笔者写的一个示例实现,供同学们参考学习。本人并没有打过诸如HPC Game之类的比赛,这个实现经测试是正确的,但没有经过严格的性能调优,仅供学习多线程编程之用。实际上肯定会有很多可以改进的地方,欢迎同学们自行优化。

定义项目文件结构,使用CMake管理项目:

demo/
    CMakeLists.txt
    include/
        wc.hpp
    src/
        wc.cpp
        main.cpp

wc.hpp

#pragma once
#include <string>
#include <unordered_map>

using Freq = std::unordered_map<std::string, std::size_t>;

/* 统计文件,返回全局频次表 */
Freq count_file(const std::string& path);          // 可能抛 std::system_error

源文件wc.cpp

#include "wc.hpp"
#include <algorithm>
#include <cctype>
#include <cstring>
#include <filesystem>
#include <future>
#include <iterator>
#include <stdexcept>
#include <system_error>
#include <vector>
#include <thread>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

// ---------- 低层 OS 封装 ----------
namespace os {

// RAII 包装 mmap
class mapped_region {
public:
    mapped_region(const std::filesystem::path& p)
        : size_(std::filesystem::file_size(p)),
          data_(do_map(p.c_str(), size_)) {}

    ~mapped_region() { ::munmap(data_, size_); }

    mapped_region(const mapped_region&)            = delete;
    mapped_region& operator=(const mapped_region&) = delete;

    const char* data() const noexcept { return static_cast<const char*>(data_); }
    std::size_t size() const noexcept { return size_; }

private:
    static void* do_map(const char* path, std::size_t sz) {
        int fd = ::open(path, O_RDONLY);
        if (fd < 0) throw std::system_error(errno, std::system_category(),
                                           "open: " + std::string(path));
        void* p = ::mmap(nullptr, sz, PROT_READ, MAP_PRIVATE, fd, 0);
        ::close(fd);
        if (p == MAP_FAILED)
            throw std::system_error(errno, std::system_category(), "mmap");
        return p;
    }
    std::size_t size_;
    void*       data_;
};

} // namespace os

// ---------- 单词扫描 ----------
static bool isword(char c) noexcept {
    return std::isalnum(static_cast<unsigned char>(c)) || c == '_';
}

// 扫描一段连续内存,返回局部表
static Freq scan_block(const char* first, const char* last) {
    Freq  local;
    std::string w;
    w.reserve(64);
    for (auto it = first; it != last; ++it) {
        char c = *it;
        if (isword(c)) {
            w.push_back(static_cast<char>(std::tolower(
                            static_cast<unsigned char>(c))));
        } else if (!w.empty()) {
            ++local[w];
            w.clear();
        }
    }
    if (!w.empty()) ++local[w];
    return local;
}

// ---------- 并行框架 ----------
static Freq count_region(const char* first, const char* last,
                         unsigned threads) {
    if (threads == 0) threads = 1;
    const std::size_t chunk = (last - first) / threads;
    std::vector<std::future<Freq>> futures;
    futures.reserve(threads);

    for (unsigned i = 0; i < threads; ++i) {
        const char* beg = first + i * chunk;
        const char* end = (i + 1 == threads) ? last : beg + chunk;

        // 调整 beg 到单词边界
        if (i && beg < last && isword(beg[-1]) && isword(beg[0]))
            while (beg != end && isword(*beg)) ++beg;

        futures.push_back(std::async(std::launch::async,
                                    [beg, end] { return scan_block(beg, end); }));
    }

    // 归并所有局部表
    Freq global;
    for (auto& f : futures)
        for (auto& [w, c] : f.get())
            global[w] += c;
    return global;
}

// ---------- 对外接口 ----------
Freq count_file(const std::string& path) {
    os::mapped_region file(path);
    unsigned cores = std::thread::hardware_concurrency();
    return count_region(file.data(), file.data() + file.size(), cores);
}

主程序main.cpp

#include "wc.hpp"
#include <algorithm>
#include <iostream>
#include <iomanip>
#include <string>
#include <vector>

int main(int argc, char* argv[]) {
    if (argc != 2) {
        std::cerr << "Usage: " << argv[0] << " <utf8-en.txt>\n";
        return 1;
    }

    // 1. 统计
    const auto freq = count_file(argv[1]);

    // 2. 直接把 map 转成 vector<string_view> 避免拷贝 key
    using entry = std::pair<std::size_t, std::string_view>;
    std::vector<entry> top;
    top.reserve(freq.size());
    for (const auto& [w, c] : freq) top.emplace_back(c, w);

    // 3. 取 Top-10(nth_element + sort 前 10 更快)
    const std::size_t k = 10;
    if (top.size() > k) {
        std::nth_element(top.begin(), top.begin() + k, top.end(),
                         std::greater<>{});
        top.resize(k);
    }
    std::sort(top.begin(), top.end(), std::greater<>{});

    // 4. 输出
    for (const auto& [c, w] : top)
        std::cout << std::setw(10) << w << "  " << c << '\n';
}

CMake

cmake_minimum_required(VERSION 3.10)
project(wordcount LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)

add_executable(wordcount
    src/main.cpp
    src/wc.cpp
)
target_include_directories(wordcount PRIVATE include)
# 需要 pthread
find_package(Threads REQUIRED)
target_link_libraries(wordcount PRIVATE Threads::Threads)

编译运行:

mkdir build
cd build
cmake ..
cmake --build . -j --config Release # Release模式编译,开启优化
./word_count ../bigfile.txt

线程池

上述代码中用的是 std::async 来启动线程并行处理任务。这样做的好处是简单易用,但缺点是每次调用都会创建一个新的线程,频繁创建销毁的开销也不小。对此,人们提出了“线程池”(Thread Pool)这个概念。线程池指的是预先创建一组线程,并将它们放入一个池子中。当有任务需要处理时,就从线程池中取出一个空闲的线程来执行任务,任务完成后再将线程放回池子中。这样就能避免频繁创建销毁线程的开销,提高程序的性能。

实现线程池的方式有很多种,下面是一个简单的线程池实现示例:

#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
class ThreadPool {
public:
    ThreadPool(std::size_t n) : stop(false) {
        for (std::size_t i = 0; i < n; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->mtx);
                        this->cv.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                        if (this->stop && this->tasks.empty()) return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();
                }
            });
        }
    }
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop = true;
        }
        cv.notify_all();
        for (std::thread &worker : workers) worker.join();
    }
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(mtx);
            if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");
            tasks.emplace([task]() { (*task)(); });
        }
        cv.notify_one();
        return res;
    }
private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex mtx;
    std::condition_variable cv;
    bool stop;
};
使用这个线程池:
ThreadPool pool(hardware_threads());
std::vector<std::future<wc::CountMap>> futures;
for (const auto& r : ranges)
    futures.push_back(pool.enqueue(wc::count_range, file, r));

线程池虽然有效地提升了性能,但也增加了代码的复杂性。

怎样调试?

多线程程序的调试往往比单线程程序复杂得多。

首先,我们应确认是“多线程”本身的问题,还是其他的逻辑问题。例如上述代码中,如果结果不正确,首先应检查文件分割和单块统计的逻辑是否正确,而不是直接怀疑多线程部分。一个简单的检查方式就是多次运行程序,看看结果是否一致。如果结果不一致,那么很可能是多线程部分的问题;如果结果一致但不正确,那么问题可能出在其他地方。

当定位问题确实在多线程部分时,可以尝试以下工具: - ThreadSanitizer:这是一个动态检测工具,可以帮助我们检测多线程程序中的数据竞争和死锁等问题。它通过在编译时插入额外的检查代码,在运行时监控线程之间的内存访问,从而发现潜在的并发问题。使用该工具需要在编译时添加相应的编译选项,例如对于GCC和Clang,可以使用 -fsanitize=thread 选项。 - Clang-Tidy:这是一个静态代码分析工具,可以帮助我们发现多线程程序中的潜在问题。它通过分析源代码,检查线程安全性、锁的使用等方面的问题,从而提高代码的质量和可靠性。使用该工具需要安装Clang,并运行相应的命令行工具。 - Valgrind-Helgrind:这是Valgrind工具集中的一个子工具,专门用于检测多线程程序中的数据竞争和死锁等问题。它通过在运行时监控线程之间的内存访问,发现潜在的并发问题。使用该工具需要安装Valgrind,并运行相应的命令行工具。

另,也可以用可视化方式调试,例如chrome:://tracing/,可以把程序运行时的线程调度信息导出来,然后用这个工具查看线程的执行情况,从而发现潜在的问题。或者Intel VTune、Windows Concurrency Visualizer等工具,也能帮助我们分析多线程程序的性能瓶颈和并发问题。实在不行,用 std::this_thread::get_id() 打印线程ID,结合日志分析也是一种可行的办法。

什么时候不能用多线程?

多线程确实是提升性能的有效手段,但并不是所有情况下都适合使用多线程。因为多线程的代码写起来实在是太复杂了,调试起来更是头疼。所以,在决定使用多线程之前,我们需要评估一下任务的性质和性能瓶颈,看看多线程是否真的能带来显著的性能提升。

为了衡量性能瓶颈,人们人为地把任务分成两类:CPU密集型I/O密集型。前者指的是任务主要消耗CPU资源,例如复杂的计算、数据处理等;后者指的是任务主要消耗I/O资源,例如文件读写、网络通信等。

要分析一个任务究竟是CPU密集型还是I/O密集型,可以这样做:首先,把任务拆成两段,也就是“计算”和“I/O”两部分。然后,分别测量这两部分的执行时间。如果计算部分的时间远大于I/O部分的时间,那么这个任务就是CPU密集型;反之,如果I/O部分的时间远大于计算部分的时间,那么这个任务就是I/O密集型。而在不同的机器上,这个比例可能会有所不同:机械硬盘、SATA协议的SSD、NVMe协议的SSD和内存盘,它们的带宽和延迟差别巨大,越靠后的设备,任务越倾向于CPU密集型。

对于CPU密集型任务,多线程通常能显著提升性能,因为它能充分利用多核CPU的计算能力,实现并行处理,从而缩短任务的执行时间。而对于I/O密集型任务,多线程的效果可能并不明显,甚至可能带来额外的开销。因为I/O操作通常是阻塞的,线程在等待I/O完成时会被挂起,这会导致线程切换的开销增加,从而降低整体性能。

上述任务中,如果我们确实能一口气把所有文件读入内存,那它就是一个典型的CPU密集型任务,多线程能显著提升性能;但如果使用的是机械硬盘,那还不如单线程。关于上述问题,机械硬盘选择单线程,SATA可以“轻量多线程”(也就是说线程数在2到4之间),NVMe和内存盘则可以使用“重度多线程”(线程数等于CPU物理核心数)。而现在已经2025年,NVMe硬盘随处可见,大家大可放心大胆地使用多线程。

另外,上述代码其实相当朴素,仅奔着“实现”去,没有经过严格的性能调优。实际上,在NVMe或内存场景下,用更高效的并行归并算法也能提升合并性能;用更高效的哈希表实现(例如Google的 dense_hash_map )也能提升性能;用线程池也能减少线程创建销毁的开销;用更低级的内存顺序模型也能提升原子操作的性能……这些都是可以考虑的优化方向。

需要注意的是CPU物理核心数和逻辑核心数是不一样的:物理核心数是CPU实际拥有的核心数量,而逻辑核心数则是通过超线程技术(Hyper-Threading)实现的、操作系统看到的核心数量。一般来说,逻辑核心数是物理核心数的两倍。例如,一个四核八线程的CPU,其物理核心数为4,逻辑核心数为8。

对内存带宽敏感的任务,超线程(线程数量大于物理核心数)反而会降低性能,因为多个线程争抢同一个核心的资源,导致缓存命中率下降和上下文切换增加,从而降低整体性能。因此,在选择线程数时,建议优先考虑线程数等于物理核心数,然后再根据实际情况进行微调。而当上述文件极大(TB级别)时,TLB Miss也会成为瓶颈,这时可以考虑NUMA架构下的内存亲和性(Memory Affinity)等高级优化手段。

扩展阅读

由于多线程编程的复杂性和多样性,建议同学们进一步阅读相关书籍和文档,以深入理解多线程编程的原理和实践。

我个人推荐The Art of Multiprocessor Programming一书,作者Maurice Herlihy和Nir Shavit是并发编程领域的权威专家。这本书深入探讨了多处理器编程的基本原理和技术,涵盖了锁、无锁数据结构、事务内存等主题。书中不仅介绍了理论知识,还提供了大量的实践案例和代码示例,帮助读者理解如何在实际应用中实现高效且正确的多线程程序。