线程和原子操作

线程和原子操作

线程thread基本使用

动或按值复制线程函数的参数。如果需要传递引用参数给线程函数,那么必须包装它(例如用 std::refstd::cref)。

忽略来自函数的任何返回值。如果函数抛出异常,那么就会调用 std::terminate需要将返回值或异常传递回调用方线程时可以使用 std::promisestd::async

传值和传引用:

1
2
3
4
5
6
7
8
9
10
11
void func1(int a , int b) {
cout << a +b <<endl;
}
void func2(int & a) {
a += 10;
}
int main() {
std::thread t1(func1 , 10 , 20);
int c = 10
std::thread t2(func2 , std::ref(c));
}

std::ref 用于包装按引用传递的值。
std::cref 用于包装按const引用传递的值。

创建一个可运行(创建时传入线程函数)的线程对象后,必须对该线程对象进行处理,要么调用join(),要么调用detach(),否则线程对象析构时程序将直接退出。

std::thread::~thread销毁 thread 对象。若 *this 拥有关联线程( joinable() == true ),则调用std::terminate。

1
2
3
4
5
6
7
void f(int a) { cout << a << endl;}
int main() {
{
thread t1(f,1);
}
this_thread::sleep_for(std::chrono::seconds(2));
}

如上代码将收到Abort信号,因为,线程对象t1在作用域中没有调用join或detach,当t1出作用域时,将会抛出异常导致程序退出

线程遇到重载函数

1
2
3
4
5
6
7
8
9
10
void f(int a) { std:: cout << a << std::endl; }
void f(std::string a) { std::cout <<a << std::endl;}

int main() {
std::thread t1((void(*)(int))f , 1);
std::thread t2((void(*)(std::string))f , "aaa");
t1.join();
t2.join();
std::this_thread::sleep_for(std::chrono::seconds(1));
}

原子变量

每个 std::atomic 模板的实例化和全特化定义一个原子类型。如果一个线程写入原子对象,同时另一线程从它读取,那么行为良好定义,std::atomic 既不可复制也不可移动。

std::atomic模板可用于任何满足可复制构造,可复制赋值,可平凡复制类型T的特化,不支持复制初始化

1
2
3
4
5
std::is_trivially_copyable<T>::value
std::is_copy_constructible<T>::value
std::is_move_constructible<T>::value
std::is_copy_assignable<T>::value
std::is_move_assignable<T>::value

上述值为false非良构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
std::atomic<int> foo(0);  //直接初始化
void set_foo(int x) {
foo.store(x , std::memory_order_relaxed);
}
void print_foo() {
int x;
do {
x = foo.load(std::memory_order_relaxed);
} while(x == 0);
std::cout <<"foo :" <<x << '\n';
}

int main() {
std::thread first(print_foo);
std::thread second(set_foo , 10);
first.join();
second.join();
std::cout << "main finish \n";
return 0;
}
  • std::atomic::store

void store(T desired , std::memort_order order = std::memory_order_seq_cst) noexcept以 desired 原子地替换当前值。按照 order 的值影响内存。order必须是 std::memory_order_relaxed . std::memory_order_release , std::memory_order_seq_cst

  • std::atomic::load

void load(std::memort_order order = std::memory_order_seq_cst) const noexcept,原子地加载并返回原子变量的当前值。按照 order 的值影响内存。返回原子变量的当前值。

  • std::atomic::operator=

T operator=( T desired ) noexcept;将 desired 原子地赋给原子变量。等价于 store(desired)。返回 desired

std::call_once std::once_flag

定义:

1
2
template< class Callable, class... Args >
void call_once(std::once_flag& flag , Callable&& f , Args&&... args);
  • 如果在调用 std::call_once 的时刻,flag 指示 f 已经调用过,那么 std::call_once 会立即返回(称这种对 std::call_once 的调用为消极)。

  • 否则,std::call_once 会调用 INVOKE(std::forward(f), std::forward(args)…)。与 std::thread 的构造函数或 std::async 不同,不会移动或复制参数,因为不需要转移它们到另一执行线程(称这种对 std::call_once 的调用为积极)。

    • 如果该调用抛出了异常,那么将异常传播给 std::call_once 的调用方,并且不翻转 flag,这样还可以尝试后续调用(称这种对 std::call_once 的调用为异常)。
    • 如果该调用正常返回(称这种对 std::call_once 的调用为返回),那么翻转 flag,并保证以同一 flag 对 std::call_once 的其他调用为消极
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include <iostream>
#include <mutex>
#include <thread>

std::once_flag flag1, flag2;

void simple_do_once()
{
std::call_once(flag1, [](){ std::cout << "简单样例:调用一次\n"; });
}

void may_throw_function(bool do_throw)
{
if (do_throw)
{
std::cout << "抛出:call_once 会重试\n"; // 这会出现不止一次
throw std::exception();
}
std::cout << "没有抛出,call_once 不会再重试\n"; // 保证一次
}

void do_once(bool do_throw)
{
try
{
std::call_once(flag2, may_throw_function, do_throw);
}
catch (...) {}
}

int main()
{
std::thread st1(simple_do_once);
std::thread st2(simple_do_once);
std::thread st3(simple_do_once);
std::thread st4(simple_do_once);
st1.join();
st2.join();
st3.join();
st4.join();

std::thread t1(do_once, true);
std::thread t2(do_once, true);
std::thread t3(do_once, false);
std::thread t4(do_once, true);
t1.join();
t2.join();
t3.join();
t4.join();
}
/*
简单样例:调用一次
抛出:call_once 会重试
抛出:call_once 会重试
抛出:call_once 会重试
没有抛出,call_once 不会再重试
*/

异步操作

future

1
2
3
4
5
6
template< class T > class future;
template< class T > class future<T&>;
template<> class future<void>;
//asynchronous 异步
//future 未来
//promise 承诺

类模板 std::future 提供访问异步操作结果的机制

通过std::async std::packaged_task std::promise创建的异步操作能提供一个std::future对象给该异步操作的创建者,然后,异步操作的创建者能用各种方法查询 等待 或从std::future中提取值。若异步操作认为提供值,则这些方法可能阻塞。异步操作准备好发送结果给创建者时,它能通过修改链接到创建者的 std::future 的共享状态,std::future 所引用的共享状态不与另一异步返回对象共享

  • get():get 方法等待直至 future 拥有合法结果并(依赖于使用哪个模板)获取它。它等效地调用 wait() 等待结果,泛型模板和二个模板特化各含单个 get 版本。 get 的三个版本仅在返回类型有别。若调用此函数前 valid() 为 false 则行为未定义。

async

1
2
3
4
template< class Function, class... Args >
std::future<typename std::result_of<typename std::decay<Function>::type(
typename std::decay<Args>::type...)>::type>
async( Function&& f, Args&&... args );

函数模板 std::async 异步地运行函数 f(有可能在可能是线程池一部分的分离线程中),并返回最终将保有该函数调用结果的std::future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int find_result_to_add() {
std::this_thread::sleep_for(std::chrono::seconds(2)); // 用来测试异步延迟的影响
std::cout << "find_result_to_add" << std::endl;
return 1 + 1;
}
int main() {
std::future<int> result = std::async(find_result_to_add);
cout << "valid:"<< result.valid() << std::endl;
// std::future<decltype (find_result_to_add())> result = std::async(find_result_to_add);
// auto result = std::async(find_result_to_add); // 推荐的写法
do_other_things();
std::cout << "result: " << result.get() << std::endl; // 延迟是否有影响?
cout << "valid:"<< result.valid() << std::endl;
}

packaged_task

1
2
template< class R, class ...Args >
class packaged_task<R(Args...)>;

类模板 std::packaged_task 包装任何可调用 (Callable) 目标(函数、 lambda 表达式、 bind 表达式或其他函数对象),使得能异步调用它。其返回值或所抛异常被存储于能通过 std::future 对象访问的共享状态中

std::function std::packaged_task 是多态、具分配器的容器:可在堆上或以提供的分配器分配存储的可调用对象。

  • operator():如果以INVOKE<R>(f ,args...)调用存储的任务 f。任务返回值或任何抛出的异常被存储于共享状态。令共享状态就绪,并解除阻塞任何等待此操作的线程。

  • get_future():返回与 *this 共享同一共享状态的 futureget_future 只能对每个 packaged_task 调用一次。

  • reset():重置状态,抛弃先前执行的结果。构造共享状态。等价于 *this = packaged_task(std::move(f)) ,其中 f 是存储的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int add(int a, int b, int c) {
std::cout << "call add\n";
return a + b + c;
}
void do_other_things() {
std::cout << "do_other_things" << std::endl;
}
int main() {
std::packaged_task<int(int, int, int)> task(add); // 封装任务,不运行
do_other_things();
std::future<int> result = task.get_future(); //将result与task的future关联,不运行
task(1, 1, 2); //任务执行,否则在get()获取future的值时会一直阻塞
std::cout << "result:" << result.get() << std::endl;
}

promise

传统的线程返回值:传递一个指针给线程,表示该线程将会把返回值写入指针指向的内存空间。此时主线程将用条件变量等待值被写入,当线程把值写入指针指定的内存后,将唤醒(signal)条件变量,然后主线程将被唤醒,然后从指针指向的内存中获取返回值。

为了实现获取一个返回值的需求,使用传统的方法,我们需要条件变量(condition variable), 互斥量(mutex),和指针三个对象。

C++11的方法:使用std::futurestd::promise

1
template< class R > class promise;

类模板 std::promise 提供存储值或异常的设施,之后通过 std::promise 对象所创建的 std::future对象异步获得结果。注意 std::promise 只应当使用一次。

  • std::future get_future:返回与 *this 关联同一状态的 future 对象。若 *this 无共享状态,或已调用 get_future 则抛出异常

  • set_value(const R& value):原子地存储 value 到共享状态,并令状态就绪。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void print(std::promise<std::string>& p)
{
p.set_value("There is the result whitch you want."); //设置线程返回值
}
int main()
{
std::promise<std::string> promise;
std::future<std::string> result = promise.get_future(); //将promise中的future与result相关联
std::thread t(print, std::ref(promise));
//新建线程,并传入 promise 的 引用,promise 无法复制故要传入引用
do_some_other_things();
std::cout << result.get() << std::endl; //从result中获取结果
t.join();
return 0;
}