在多线程编程中,当多个线程需要访问和修改共享数据时,如果没有任何同步机制,就可能发生数据竞争(Data Race),导致程序行为不可预测、数据损坏甚至崩溃。C++标准库通过<mutex>头文件提供了一系列互斥量(Mutex)类型,它们是实现线程同步、保护共享资源免遭并发访问破坏的基础工具。
1. <mutex>库简介
互斥量是一种同步原语,它允许在任意时刻只有一个线程可以锁定(获取)该互斥量。当一个线程成功锁定互斥量后,其他试图锁定该互斥量的线程将被阻塞,直到持有锁的线程释放它。这种机制确保了被互斥量保护的代码段(称为临界区,Critical Section)在同一时间只能被一个线程执行,从而保证了共享数据的操作的原子性和一致性。
<mutex>库主要提供了以下几种互斥量类型:
- std::mutex: 基本的互斥量,不可递归锁定,不带超时尝试锁定。
- std::recursive_mutex: 递归互斥量,允许同一线程多次锁定,但必须对应多次解锁。
- std::timed_mutex: 带超时的互斥量,允许尝试锁定一段时间,如果超时仍未获得锁则返回。
- std::recursive_timed_mutex: 带超时的递归互斥量。
此外,<mutex>库还提供了锁管理工具,如std::lock_guard、std::unique_lock和std::scoped_lock (C++17),它们通过RAII(Resource Acquisition Is Initialization)原则简化了互斥量的锁定和解锁管理,有助于避免死锁和资源泄漏。
2. <mutex>库的特点
- 排他性访问:确保临界区代码在同一时间只被一个线程执行。
- 阻塞机制:当锁被占用时,其他尝试获取锁的线程会阻塞等待。
- 多种类型:提供不同特性的互斥量以适应不同场景(如递归、超时)。
- RAII锁管理:通过lock_guard等工具类,实现锁的自动获取和释放,提高代码健壮性。
- 标准化与跨平台:作为C++标准库的一部分,保证了在不同平台上的行为一致性。
3. 互斥量类型详解
3.1. std::mutex
std::mutex是最基础的互斥量类型。
成员函数:
- mutex() noexcept;: 默认构造函数。
- ~mutex();: 析构函数。如果互斥量在析构时仍被任何线程锁定,行为是未定义的。
- lock();: void 锁定互斥量。如果互斥量已被其他线程锁定,则当前线程阻塞,直到获得锁。 如果当前线程已持有该(非递归)互斥量的锁,再次调用lock()会导致死锁(行为未定义)。
- try_lock();: bool 尝试锁定互斥量。如果互斥量未被锁定,则当前线程获得锁并返回true。如果互斥量已被锁定,则立即返回false,不会阻塞。
- unlock();: void 解锁互斥量。必须由当前持有锁的线程调用。如果对一个未被锁定的互斥量或不被当前线程持有的互斥量调用unlock(),行为是未定义的。
- native_handle();: native_handle_type 返回底层实现的互斥量句柄(平台相关)。
std::mutex不可复制,不可移动。
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
std::mutex mtx; // 全局互斥量
int shared_counter = 0;
void increment_counter(int iterations) {
for (int i = 0; i < iterations; ++i) {
mtx.lock(); // 获取锁
// --- 临界区开始 ---
shared_counter++;
// --- 临界区结束 ---
mtx.unlock(); // 释放锁
}
}
void safe_increment_with_try_lock(int iterations) {
for (int i = 0; i < iterations; ++i) {
while (!mtx.try_lock()) {
// 锁被占用,可以做点别的事情或稍后重试
std::cout << "Thread " << std::this_thread::get_id() << " failed to acquire lock, yielding." << std::endl;
std::this_thread::yield(); // 让出CPU
}
// 获取锁成功
shared_counter++;
mtx.unlock();
}
}
int main() {
std::vector<std::thread> threads;
int num_threads = 5;
int iterations_per_thread = 100000;
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(increment_counter, iterations_per_thread);
// threads.emplace_back(safe_increment_with_try_lock, iterations_per_thread); // 或者使用 try_lock 版本
}
for (auto& t : threads) {
t.join();
}
std::cout << "Expected counter value: " << num_threads * iterations_per_thread << std::endl;
std::cout << "Actual shared_counter value: " << shared_counter << std::endl;
return 0;
}
注意:直接使用lock()和unlock()需要非常小心,如果在锁定后、解锁前发生异常,锁可能不会被释放,导致死锁。因此,强烈推荐使用RAII包装类如std::lock_guard。
3.2. std::recursive_mutex
std::recursive_mutex允许同一个线程多次锁定同一个互斥量。线程必须对每一次lock()调用执行一次unlock()才能完全释放锁。其他线程只有在该互斥量的锁定计数降为0时才能获取锁。
这在某些递归函数需要保护共享资源,且递归调用中可能再次需要获取同一把锁的场景下有用。但过度使用递归锁可能表明设计上存在问题,应优先考虑非递归方案。
成员函数:与std::mutex类似 (lock, try_lock, unlock, native_handle)。
#include <iostream>
#include <thread>
#include <mutex> // For std::recursive_mutex
std::recursive_mutex rec_mtx;
int recursion_depth_counter = 0;
void recursive_function(int depth) {
rec_mtx.lock();
std::cout << "Thread " << std::this_thread::get_id() << " entered recursive_function at depth " << depth << ". Lock count likely > 1." << std::endl;
recursion_depth_counter++;
if (depth > 0) {
recursive_function(depth - 1);
}
std::cout << "Thread " << std::this_thread::get_id() << " exiting recursive_function at depth " << depth << std::endl;
rec_mtx.unlock();
}
int main() {
std::thread t1(recursive_function, 3);
std::thread t2(recursive_function, 2);
t1.join();
t2.join();
std::cout << "Final recursion_depth_counter: " << recursion_depth_counter << std::endl;
return 0;
}
3.3. std::timed_mutex
std::timed_mutex扩展了std::mutex的功能,增加了两个带超时的尝试锁定操作:try_lock_for()和try_lock_until()。
新增成员函数:
- try_lock_for(const std::chrono::duration<Rep, Period>& timeout_duration);: bool 尝试在timeout_duration指定的时间段内锁定互斥量。如果在超时前获得锁,返回true。如果超时,返回false。
- try_lock_until(const std::chrono::time_point<Clock, Duration>& timeout_time);: bool 尝试在到达timeout_time指定的时间点前锁定互斥量。如果在超时前获得锁,返回true。如果超时,返回false。
其他成员函数与std::mutex相同。
#include <iostream>
#include <thread>
#include <mutex> // For std::timed_mutex
#include <chrono>
std::timed_mutex tmtx;
void worker_with_timeout() {
std::cout << "Thread " << std::this_thread::get_id() << " attempting to lock with timeout." << std::endl;
// 尝试在 100 毫秒内获取锁
if (tmtx.try_lock_for(std::chrono::milliseconds(100))) {
std::cout << "Thread " << std::this_thread::get_id() << " acquired lock." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 持有锁一段时间
tmtx.unlock();
std::cout << "Thread " << std::this_thread::get_id() << " released lock." << std::endl;
} else {
std::cout << "Thread " << std::this_thread::get_id() << " failed to acquire lock within timeout (try_lock_for)." << std::endl;
}
auto now = std::chrono::steady_clock::now();
auto deadline = now + std::chrono::milliseconds(200);
// 尝试在某个时间点前获取锁
if (tmtx.try_lock_until(deadline)) {
std::cout << "Thread " << std::this_thread::get_id() << " acquired lock (try_lock_until)." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
tmtx.unlock();
std::cout << "Thread " << std::this_thread::get_id() << " released lock (try_lock_until)." << std::endl;
} else {
std::cout << "Thread " << std::this_thread::get_id() << " failed to acquire lock by deadline (try_lock_until)." << std::endl;
}
}
int main() {
std::thread t1(worker_with_timeout);
std::thread t2(worker_with_timeout);
// 主线程先获取锁,让子线程超时
tmtx.lock();
std::cout << "Main thread acquired lock, worker threads will likely timeout initially." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
tmtx.unlock();
std::cout << "Main thread released lock." << std::endl;
t1.join();
t2.join();
return 0;
}
3.4. std::recursive_timed_mutex
结合了std::recursive_mutex和std::timed_mutex的特性,既支持递归锁定,也支持带超时的尝试锁定操作。
4. RAII 锁管理类
手动调用lock()和unlock()容易出错,尤其是在有多个返回路径或可能抛出异常的代码中。RAII(Resource Acquisition Is Initialization,资源获取即初始化)是一种C++编程技法,通过类对象的构造获取资源,在析构时释放资源,从而保证资源管理的自动化和异常安全。
<mutex>库提供了几种RAII包装器来管理互斥锁。
4.1. std::lock_guard<Mutex>
std::lock_guard是最简单的锁包装器。在其构造函数中,它会调用传入的互斥量对象的lock()方法。在其析构函数中(即lock_guard对象离开作用域时),它会自动调用互斥量的unlock()方法。
- explicit lock_guard(Mutex& m);: 构造时锁定m。
- lock_guard(Mutex& m, std::adopt_lock_t);: 构造,假定m已经被当前线程锁定(不再次调用lock())。std::adopt_lock是一个空标记类型struct adopt_lock_t {};。
std::lock_guard不可复制,不可移动。一旦构造,它就严格持有着锁,直到析构。
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
std::mutex counter_mtx;
long long counter_val = 0;
void increment_with_lock_guard(int iterations) {
for (int i = 0; i < iterations; ++i) {
std::lock_guard<std::mutex> guard(counter_mtx); // 构造时自动 lock
// --- 临界区 ---
counter_val++;
// guard 离开作用域时自动 unlock (即使发生异常)
}
}
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 4; ++i) {
threads.emplace_back(increment_with_lock_guard, 100000);
}
for (auto& t : threads) {
t.join();
}
std::cout << "Counter value: " << counter_val << std::endl;
return 0;
}
4.2. std::unique_lock<Mutex>
std::unique_lock是一个更灵活的锁包装器。它也遵循RAII原则,但在功能上比std::lock_guard更强大:
- 支持延迟锁定:可以在构造时不立即锁定,之后再手动调用lock()、try_lock()、try_lock_for()或try_lock_until()。
- 可移动:std::unique_lock对象可以被移动,从而转移锁的所有权。
- 条件变量配合:std::unique_lock是与std::condition_variable一起使用的标准方式,因为条件变量的wait操作需要能够原子地释放锁并重新获取锁。
- 显式解锁和重锁:可以调用unlock()提前释放锁,之后如果需要还可以再次调用lock()等方法重新获取锁。
- 所有权检查:owns_lock()成员函数可以检查unique_lock当前是否持有锁。
构造函数 (部分):
- unique_lock() noexcept; (默认构造,不关联互斥量)
- explicit unique_lock(Mutex& m); (构造并锁定m)
- unique_lock(Mutex& m, std::defer_lock_t) noexcept; (构造,关联m但不锁定。std::defer_lock是空标记类型)
- unique_lock(Mutex& m, std::try_to_lock_t); (构造,尝试锁定m,不阻塞。std::try_to_lock是空标记类型)
- unique_lock(Mutex& m, std::adopt_lock_t); (构造,假定m已被锁定)
- unique_lock(Mutex& m, const std::chrono::duration<Rep, Period>& timeout_duration); (构造,尝试在超时内锁定m)
- unique_lock(Mutex& m, const std::chrono::time_point<Clock, Duration>& timeout_time); (构造,尝试在超时前锁定m)
- unique_lock(unique_lock&& other) noexcept; (移动构造)
主要成员函数:
- lock(), try_lock(), try_lock_for(), try_lock_until(), unlock()
- owns_lock() const noexcept;: bool,检查是否持有锁。
- operator bool() const noexcept;: 等价于owns_lock()。
- release() noexcept;: Mutex*,断开与互斥量的关联并返回指向它的指针,但不解锁互斥量。调用后unique_lock不再拥有锁。
- mutex() const noexcept;: Mutex*,返回关联的互斥量指针。
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <chrono>
std::timed_mutex flexible_mtx;
int resource_value = 0;
void use_resource_flexible(int id) {
std::unique_lock<std::timed_mutex> lock(flexible_mtx, std::defer_lock); // 延迟锁定
std::cout << "Thread " << id << " created unique_lock, initially not locked." << std::endl;
if (lock.try_lock_for(std::chrono::milliseconds(150))) {
std::cout << "Thread " << id << " acquired lock." << std::endl;
resource_value++;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// lock.unlock(); // 可以提前解锁
// std::cout << "Thread " << id << " explicitly unlocked." << std::endl;
// ... 做一些不需要锁的操作 ...
// lock.lock(); // 重新锁定 (如果之前解锁了)
// std::cout << "Thread " << id << " re-acquired lock." << std::endl;
} else {
std::cout << "Thread " << id << " failed to acquire lock." << std::endl;
}
// lock 在析构时自动解锁 (如果持有锁)
if (lock.owns_lock()) {
std::cout << "Thread " << id << " still owns lock before destruction." << std::endl;
}
}
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 3; ++i) {
threads.emplace_back(use_resource_flexible, i);
}
// 主线程持有锁一段时间,让其他线程的 try_lock_for 可能失败或等待
{
std::lock_guard<std::timed_mutex> main_lg(flexible_mtx);
std::cout << "Main thread holds lock for 200ms." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
std::cout << "Main thread released lock." << std::endl;
for (auto& t : threads) {
t.join();
}
std::cout << "Final resource_value: " << resource_value << std::endl;
return 0;
}
4.3. std::scoped_lock<MutexTypes...>(C++17)
std::scoped_lock是一个可变参数模板类,用于同时锁定一个或多个互斥量,并保证采用避免死锁的算法(通常是按固定顺序或尝试锁定并回退的策略)。它在构造时锁定所有传入的互斥量,在析构时按相反顺序解锁它们。
这是处理多个锁时推荐的方式,因为它内置了死锁避免机制。
- explicit scoped_lock(MutexTypes&... m);
- scoped_lock(std::adopt_lock_t, MutexTypes&... m);
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
std::mutex mtx1, mtx2;
int val1 = 0, val2 = 0;
void transfer_money_safe(int amount) {
// 使用 scoped_lock 同时锁定 mtx1 和 mtx2,避免死锁
std::scoped_lock lock(mtx1, mtx2); // C++17
// 或者在 C++11/14 中使用 std::lock 和 std::lock_guard:
// std::lock(mtx1, mtx2); // 死锁避免算法
// std::lock_guard<std::mutex> guard1(mtx1, std::adopt_lock);
// std::lock_guard<std::mutex> guard2(mtx2, std::adopt_lock);
std::cout << "Thread " << std::this_thread::get_id() << " acquired both locks." << std::endl;
// 模拟转账
val1 -= amount;
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 模拟操作耗时
val2 += amount;
std::cout << "Thread " << std::this_thread::get_id() << " finished transfer." << std::endl;
// lock 析构时自动按相反顺序解锁 mtx2, mtx1
}
int main() {
val1 = 1000;
val2 = 1000;
std::vector<std::thread> threads;
threads.emplace_back(transfer_money_safe, 100);
// 如果另一个线程尝试以相反顺序获取锁,没有 std::scoped_lock 或 std::lock 很容易死锁
// 例如,另一个操作可能需要先锁 mtx2 再锁 mtx1
threads.emplace_back([](){
std::scoped_lock lock(mtx2, mtx1); // 顺序不同,但 scoped_lock 会处理
std::cout << "Thread " << std::this_thread::get_id() << " (reverse order attempt) acquired both locks." << std::endl;
val2 -= 50;
val1 += 50;
std::cout << "Thread " << std::this_thread::get_id() << " finished reverse transfer." << std::endl;
});
for (auto& t : threads) {
t.join();
}
std::cout << "Final val1: " << val1 << ", val2: " << val2 << std::endl;
// 预期: val1 = 1000 - 100 + 50 = 950, val2 = 1000 + 100 - 50 = 1050
return 0;
}
5. 其他相关函数
- std::lock(Lockable1& lock1, Lockable2& lock2, LockableN&... lockN);: (自由函数) 这是一个可变参数模板函数,用于同时锁定多个可锁定对象(如std::mutex及其变体)。它使用死锁避免算法来确保所有锁都被获取,或者在无法获取所有锁时(例如,如果一个try_lock失败),它会释放已经获取的锁并可能抛出异常(如果某个lock操作抛出)。 通常与std::lock_guard的adopt_lock版本配合使用。
- std::try_lock(Lockable1& lock1, Lockable2& lock2, LockableN&... lockN);: (自由函数) 尝试锁定所有给定的可锁定对象。它会按顺序对每个对象调用try_lock()。如果所有try_lock()都成功,则返回-1。如果任何一个try_lock()失败,则它会释放所有已经成功锁定的对象,并返回失败的那个锁的索引(0-based)。
6. 应用场景
互斥量是并发编程中不可或缺的工具,用于:
- 保护共享数据:防止多个线程同时修改同一数据导致不一致。
- 实现临界区:确保一段代码在任意时刻只有一个线程执行。
- 构建线程安全的数据结构:如线程安全的队列、映射等。
- 同步线程操作:例如,一个线程等待另一个线程完成某个特定阶段的任务。
- 控制对有限资源的访问:如打印机、文件句柄等。
7. 注意事项与最佳实践
- RAII优先:始终优先使用std::lock_guard、std::unique_lock或std::scoped_lock来管理锁的生命周期,避免手动lock/unlock带来的风险。
- 减小临界区范围:锁定的代码段(临界区)应尽可能短小。只保护真正需要保护的共享数据访问,尽快释放锁,以减少线程阻塞时间,提高并发性。
- 避免死锁:
- 当需要获取多个锁时,确保所有线程都以相同的顺序获取它们。
- 使用std::lock或std::scoped_lock (C++17) 来原子地获取多个锁。
- 避免在持有锁的情况下调用可能导致阻塞或回调用户代码的函数。
- 不要在持有锁时进行耗时操作:如文件I/O、网络通信或长时间计算,这会严重影响其他等待该锁的线程的性能。
- 选择合适的互斥量类型:根据是否需要递归、超时等特性选择。
- std::unique_lock的灵活性与责任:虽然unique_lock灵活,但也更容易误用。例如,手动unlock后忘记在需要时重新lock。
- 警惕锁的粒度:锁的粒度过大(一个大锁保护过多数据)会降低并发性;粒度过小(过多小锁)则可能增加管理复杂度和死锁风险,并可能因频繁加解锁而产生性能开销。
8. 总结
std::mutex及其相关工具是C++中实现线程同步、保护共享资源的核心机制。通过正确使用互斥量和RAII锁管理器,开发者可以有效地防止数据竞争,构建健壮且高效的并发应用程序。
理解不同互斥量类型的特性、RAII锁管理类的用法以及避免死锁的策略,是进行C++多线程编程的基础。虽然互斥锁是强大的工具,但也应注意其潜在的性能影响和复杂性,并在设计时仔细考虑锁的范围、粒度和获取顺序。在某些对性能要求极高且竞争不激烈的场景,也可以考虑使用原子操作(<atomic>)作为替代或补充。