8. 并发和多线程编程

线程基础

1. 创建和管理线程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <thread>
#include <iostream>

// 线程函数
void threadFunction() {
    std::cout << "Thread running\n";
}

// 带参数的线程函数
void threadWithParams(int x, std::string str) {
    std::cout << "Thread with params: " << x << ", " << str << "\n";
}

int main() {
    // 创建线程
    std::thread t1(threadFunction);
    
    // 创建带参数的线程
    std::thread t2(threadWithParams, 42, "Hello");
    
    // 使用lambda表达式
    std::thread t3([]() {
        std::cout << "Lambda thread\n";
    });
    
    // 等待线程完成
    t1.join();
    t2.join();
    t3.join();
    
    // 分离线程
    std::thread t4(threadFunction);
    t4.detach();
    
    // 检查线程是否可以join
    if (t4.joinable()) {
        t4.join();
    }
    
    return 0;
}

2. 线程标识和硬件并发

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
#include <thread>

void printThreadInfo() {
    // 获取当前线程ID
    std::thread::id thisId = std::this_thread::get_id();
    
    // 获取硬件支持的并发线程数
    unsigned int numThreads = std::thread::hardware_concurrency();
    
    std::cout << "Thread ID: " << thisId << "\n";
    std::cout << "Hardware Concurrency: " << numThreads << "\n";
}

// 线程本地存储
thread_local int counter = 0;

void threadLocalDemo() {
    ++counter;  // 每个线程都有自己的counter副本
    std::cout << "Thread " << std::this_thread::get_id() 
              << " counter: " << counter << "\n";
}

3. 线程生命周期管理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class ThreadGuard {
    std::thread& t;
public:
    explicit ThreadGuard(std::thread& t_) : t(t_) {}
    
    ~ThreadGuard() {
        if (t.joinable()) {
            t.join();
        }
    }
    
    // 禁止拷贝和赋值
    ThreadGuard(const ThreadGuard&) = delete;
    ThreadGuard& operator=(const ThreadGuard&) = delete;
};

void threadLifecycle() {
    std::thread t([]() {
        std::cout << "Working...\n";
    });
    
    ThreadGuard guard(t);  // 确保线程在作用域结束时被join
    
    // 其他代码...
}  // guard析构,自动join线程

同步机制

1. 互斥量

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <mutex>

class Counter {
private:
    mutable std::mutex mutex;
    int value = 0;
    
public:
    // 基本互斥
    void increment() {
        std::lock_guard<std::mutex> lock(mutex);
        ++value;
    }
    
    // 使用unique_lock(更灵活)
    void decrement() {
        std::unique_lock<std::mutex> lock(mutex);
        --value;
        // lock.unlock();  // 可以提前解锁
        // lock.lock();    // 可以重新加锁
    }
    
    // const成员函数中的互斥
    int get() const {
        std::lock_guard<std::mutex> lock(mutex);
        return value;
    }
};

// 死锁避免
class BankAccount {
    std::mutex mutex;
    double balance;
public:
    void transfer(BankAccount& other, double amount) {
        // 使用std::lock同时锁定两个互斥量
        std::lock(mutex, other.mutex);
        std::lock_guard<std::mutex> lock1(mutex, std::adopt_lock);
        std::lock_guard<std::mutex> lock2(other.mutex, std::adopt_lock);
        
        balance -= amount;
        other.balance += amount;
    }
};

2. 条件变量

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <condition_variable>

class ThreadSafeQueue {
private:
    std::mutex mutex;
    std::condition_variable cv;
    std::queue<int> queue;
    
public:
    void push(int value) {
        std::lock_guard<std::mutex> lock(mutex);
        queue.push(value);
        cv.notify_one();  // 通知一个等待的线程
    }
    
    int pop() {
        std::unique_lock<std::mutex> lock(mutex);
        cv.wait(lock, [this]() { return !queue.empty(); });
        
        int value = queue.front();
        queue.pop();
        return value;
    }
    
    // 带超时的等待
    bool try_pop(int& value, const std::chrono::milliseconds& timeout) {
        std::unique_lock<std::mutex> lock(mutex);
        if (!cv.wait_for(lock, timeout, [this]() { return !queue.empty(); })) {
            return false;  // 超时
        }
        
        value = queue.front();
        queue.pop();
        return true;
    }
};

3. 读写锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <shared_mutex>

class ThreadSafeMap {
private:
    mutable std::shared_mutex mutex;
    std::map<std::string, int> data;
    
public:
    // 写操作(独占锁)
    void write(const std::string& key, int value) {
        std::unique_lock<std::shared_mutex> lock(mutex);
        data[key] = value;
    }
    
    // 读操作(共享锁)
    bool read(const std::string& key, int& value) const {
        std::shared_lock<std::shared_mutex> lock(mutex);
        auto it = data.find(key);
        if (it == data.end()) {
            return false;
        }
        value = it->second;
        return true;
    }
};

异步编程

1. Future和Promise

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <future>

// 使用async
std::future<int> calculateAsync() {
    return std::async(std::launch::async, []() {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        return 42;
    });
}

// 使用promise
void produceValue(std::promise<int> promise) {
    try {
        // 计算值
        int result = someComputation();
        promise.set_value(result);
    } catch (...) {
        promise.set_exception(std::current_exception());
    }
}

// 使用packaged_task
int compute(int x) {
    return x * x;
}

void futureDemo() {
    // async示例
    auto future1 = calculateAsync();
    int result1 = future1.get();  // 等待结果
    
    // promise示例
    std::promise<int> promise;
    std::future<int> future2 = promise.get_future();
    std::thread t(produceValue, std::move(promise));
    int result2 = future2.get();
    t.join();
    
    // packaged_task示例
    std::packaged_task<int(int)> task(compute);
    std::future<int> future3 = task.get_future();
    task(4);
    int result3 = future3.get();
}

2. 异步任务组合

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// 串行组合
template<typename F, typename G>
auto then(std::future<F>&& f, G&& g) {
    return std::async(std::launch::async, [f = std::move(f), g = std::forward<G>(g)]() mutable {
        return g(f.get());
    });
}

// 并行组合
template<typename F, typename G>
auto both(std::future<F>&& f, std::future<G>&& g) {
    return std::async(std::launch::async, [f = std::move(f), g = std::move(g)]() mutable {
        auto result_f = f.get();
        auto result_g = g.get();
        return std::make_pair(result_f, result_g);
    });
}

3. 协程(C++20)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <coroutine>
#include <iostream>

// 简单的协程示例
struct Generator {
    struct promise_type {
        int current_value;
        
        Generator get_return_object() {
            return Generator{std::coroutine_handle<promise_type>::from_promise(*this)};
        }
        
        auto initial_suspend() { return std::suspend_never{}; }
        auto final_suspend() noexcept { return std::suspend_always{}; }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
        
        auto yield_value(int value) {
            current_value = value;
            return std::suspend_always{};
        }
    };
    
    std::coroutine_handle<promise_type> handle;
    
    Generator(std::coroutine_handle<promise_type> h) : handle(h) {}
    ~Generator() { if (handle) handle.destroy(); }
    
    int current_value() { return handle.promise().current_value; }
    bool move_next() { return handle.resume(); }
};

// 生成斐波那契数列的协程
Generator fibonacci() {
    int a = 0, b = 1;
    while (true) {
        co_yield a;
        auto tmp = a;
        a = b;
        b = tmp + b;
    }
}

并发数据结构

1. 无锁队列

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
template<typename T>
class LockFreeQueue {
private:
    struct Node {
        std::shared_ptr<T> data;
        std::atomic<Node*> next;
        Node() : next(nullptr) {}
    };
    
    std::atomic<Node*> head;
    std::atomic<Node*> tail;
    
public:
    LockFreeQueue() {
        Node* dummy = new Node;
        head.store(dummy);
        tail.store(dummy);
    }
    
    void push(T value) {
        Node* new_node = new Node;
        new_node->data = std::make_shared<T>(std::move(value));
        
        while (true) {
            Node* old_tail = tail.load();
            Node* next = old_tail->next.load();
            
            if (old_tail == tail.load()) {
                if (next == nullptr) {
                    if (old_tail->next.compare_exchange_weak(next, new_node)) {
                        tail.compare_exchange_weak(old_tail, new_node);
                        return;
                    }
                } else {
                    tail.compare_exchange_weak(old_tail, next);
                }
            }
        }
    }
    
    std::shared_ptr<T> pop() {
        while (true) {
            Node* old_head = head.load();
            Node* old_tail = tail.load();
            Node* next = old_head->next.load();
            
            if (old_head == head.load()) {
                if (old_head == old_tail) {
                    if (next == nullptr) {
                        return nullptr;
                    }
                    tail.compare_exchange_weak(old_tail, next);
                } else {
                    auto result = next->data;
                    if (head.compare_exchange_weak(old_head, next)) {
                        delete old_head;
                        return result;
                    }
                }
            }
        }
    }
};

2. 线程安全单例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class Singleton {
public:
    static Singleton& getInstance() {
        static Singleton instance;  // C++11保证线程安全的初始化
        return instance;
    }
    
    // 删除拷贝和移动操作
    Singleton(const Singleton&) = delete;
    Singleton& operator=(const Singleton&) = delete;
    
private:
    Singleton() = default;
};

// 双检查锁定模式(不推荐,仅作示例)
class SingletonDCL {
private:
    static std::atomic<SingletonDCL*> instance;
    static std::mutex mutex;
    
    SingletonDCL() = default;
    
public:
    static SingletonDCL* getInstance() {
        SingletonDCL* tmp = instance.load(std::memory_order_relaxed);
        std::atomic_thread_fence(std::memory_order_acquire);
        if (tmp == nullptr) {
            std::lock_guard<std::mutex> lock(mutex);
            tmp = instance.load(std::memory_order_relaxed);
            if (tmp == nullptr) {
                tmp = new SingletonDCL;
                std::atomic_thread_fence(std::memory_order_release);
                instance.store(tmp, std::memory_order_relaxed);
            }
        }
        return tmp;
    }
};

57.12k 字
43篇文章