C++ Multithreading Tools
========================================================================
Condition Variables
========================================================================
0. NOT RECOMMENDED: you might get notified before the listener thread starts waiting, and the notification is lost! Use std::promise&lt;void&gt; instead (sketched below).
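A minimal sketch of the std::promise&lt;void&gt; alternative (an assumed example, not from the original repo). Unlike a condition variable, the notification is latched in the shared state, so it can never be lost:

```cpp
#include <future>
#include <thread>
#include <iostream>

int main() {
    std::promise<void> ready;
    std::future<void> fut = ready.get_future();
    std::thread listener([&fut] {
        fut.wait();          // blocks until set_value(), even if set_value() already happened
        std::cout << "notified\n";
    });
    ready.set_value();       // no lost wakeup: the "signal" is stored in the shared state
    listener.join();
}
```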
- Say you have two functions, where one depends on the other
    - You need a mutex, one lock_guard, at least one unique_lock, and one condition variable
        - unique_lock is required so that the condition variable can call ul.unlock()
    - You check the predicate because of spurious wakeups; this is not just a C++ problem!
    - notify_one or notify_all:
        - notify_all wakes up all waiting threads, which then fight for the lock. Contention!
- Basic example (not recommended, for the reasons above):

```cpp
#include <mutex>
#include <condition_variable>
#include <thread>

std::condition_variable cv;
std::mutex mtx;
int balance = 0;

void Foo(){
    std::lock_guard<std::mutex> lg(mtx);
    balance += 30;
    cv.notify_one();
}

void Bar(){
    // have a lock
    std::unique_lock<std::mutex> ul(mtx);
    // unlock, sleep, wake up on cv.notify_one(), lock, evaluate predicate
    cv.wait(ul, [](){ return balance >= 30; });  // the lambda has access to the global variable
    // careful: with a predicate like balance > 60, Bar would sleep forever, because Foo runs only once
    // the lock is still held here
    // process with mtx being locked
}

int main(){
    std::thread t1(Bar);
    std::thread t2(Foo);
    t1.join();
    t2.join();
}
```
- Drawbacks:
    - you need a mutex
    - there are spurious wakeups
    - Foo might finish (and notify) before cv.wait is even executed, so the notification is lost
    - boilerplate: a mutex, a lock_guard, at least one unique_lock, and a condition variable
- Solutions to the drawbacks of a pure condition variable
    - std::atomic is better than the above:

```cpp
std::atomic<bool> flag(false);
void Foo(){ /* ... */ flag = true; }
void Bar(){ while (!flag); /* polling */ }
```
- Drawback: your polling will ACTUALLY occupy a hardware thread. With a condition variable, a blocked thread can be descheduled and put to sleep.
- A "mestizo solution"
std::condition_variable cv; std::mutex mtx; bool flag(false); // you just need a bool void Foo(){ ... lock_guard<mutex> lg(mtx); flag = true; cv.notify_one(); } void Bar(){ ... std::unique_lock<mutex> ul(mtx); cv.wait(ul, []{return flag;}); }
    - no polling!
    - Drawbacks:
        - Bar still has to acquire the lock
        - you set a bool, but also do the notifying; not clean
- std::condition_variable_any can work with any mutex (while std::condition_variable works only with std::mutex); it's larger, and may be slower as well
- Recommended way to wait on a condition variable with a time limit, if you're not passing in a predicate:

```cpp
auto const timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(60);
std::unique_lock<std::mutex> ul(m);
while (!done){
    // it's okay not to pass in a predicate, but then check for timeout yourself
    if (cv.wait_until(ul, timeout) == std::cv_status::timeout)
        break;
    // if you use wait_for instead, a spurious wakeup makes you wait the full duration all over again!
    // if (cv.wait_for(ul, std::chrono::milliseconds(60)) == std::cv_status::timeout) break;
}
return done;
```
    - the other status is std::cv_status::no_timeout
========================================================================
std::promise and std::future
========================================================================
- Basic example, see code (and the sketch below)
    - A data channel: the promise sets a value, the future catches it. Two endpoints: write through std::promise, read from std::future
        - Cons: if an exception is thrown before the promise can set its value, the thread waiting on the future will hang
    - Uses:
        - A promise is meant to be used only once: a setter-getter model. So we cannot pass the same promise into multiple functions
        - std::promise aims to replace condition variables; it's good for handling a lot of small tasks that launch std::thread, e.g., TCP connections
        - std::future was designed for "one-off" events
        - std::thread doesn't provide a mechanism to directly transfer a result back
        - std::future&lt;void&gt; is the specialization for when there's no associated data
        - a single std::shared_future object accessed by multiple threads needs external synchronization (e.g., a mutex); better to give each thread its own copy
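Since the basic example lives in an external repo ("see code"), here is a minimal sketch of the data channel described above (the `producer` name is made up):

```cpp
#include <future>
#include <thread>
#include <iostream>

void producer(std::promise<int> p) {
    try {
        p.set_value(42);                            // write end of the channel
    } catch (...) {
        p.set_exception(std::current_exception());  // without this, the reader could hang
    }
}

int main() {
    std::promise<int> p;
    std::future<int> fut = p.get_future();          // read end of the channel
    std::thread t1(producer, std::move(p));         // a promise is move-only
    std::cout << fut.get() << std::endl;            // blocks until set_value / set_exception
    t1.join();
}
```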
- Each promise has a reference to a shared state, plus a result. The shared state can be:
    - make_ready: the result (or an exception) has been stored in the shared state; this unblocks the thread that's waiting on the future
    - release: the promise gives up its reference to the shared state; the shared state may be destroyed, if this was the last reference
    - abandon: the promise stores an error code as an exception. The state is first made make_ready, then released
- The shared state is stored on the heap, in a place shared by the std::promise and the last instance of std::shared_future
- 3 ways to get a std::future: std::async, std::packaged_task, std::promise. See code, and the sketch below
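A compact sketch of all 3 ways (an assumed example):

```cpp
#include <future>
#include <thread>

int work() { return 7; }

int main() {
    // 1. std::async
    std::future<int> f1 = std::async(work);

    // 2. std::packaged_task: get the future, then run the task somewhere
    std::packaged_task<int()> task(work);
    std::future<int> f2 = task.get_future();
    std::thread t(std::move(task));

    // 3. std::promise: set the value manually
    std::promise<int> p;
    std::future<int> f3 = p.get_future();
    p.set_value(work());

    f1.get(); f2.get(); f3.get();
    t.join();
}
```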
- Cautions of std::future, see code
- Destructor of futures:
    - if it's the last future for a shared state of a non-deferred task launched via std::async, the dtor will block until the task finishes
    - for all other futures (e.g., ones that don't have a shared state, or hold a deferred task, or were launched by std::packaged_task):
        - the dtor is basically calling "detach"
        - it just decrements the reference count
    - a future that's 1. moved, 2. a shared_future, 3. a copied shared_future might point to the same shared state
    - therefore, we basically don't know whether a given dtor will block, because it depends on the other threads
- Wait (std::future), see code

```cpp
// releases after 1s, or as soon as the result becomes available
status = future.wait_for(std::chrono::seconds(1));
if (fut.wait_for(std::chrono::milliseconds(35)) == std::future_status::ready) { /* ... */ }
```

    - there are 3 states: std::future_status::ready, std::future_status::timeout, std::future_status::deferred (if the task was deferred)
- If you have multiple threads, use std::shared_future, whose shared state is known to all shared_futures; it's like a better version of notify_all. See code, and the sketch below
    - Pros:
        - No spurious wakeups; only condition variables have those
        - Bar always knows when Foo finishes, even if Foo finishes before Bar starts (unlike with a condition_variable)
        - No mutex needed
        - No polling!
        - Just one variable for the "notification", very clean!
        - one application of shared_future is a spreadsheet, where multiple cells with formulas depend on one input cell; once that input becomes available, all of them should be notified
    - Cons:
        - a promise can be used only once, so this is for one-shot applications
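A sketch of that shared_future "broadcast" pattern (the spreadsheet-cell idea; names are made up):

```cpp
#include <future>
#include <thread>
#include <vector>
#include <iostream>

int main() {
    std::promise<int> cell_input;                        // the "input cell"
    std::shared_future<int> sf = cell_input.get_future().share();

    std::vector<std::thread> cells;
    for (int i = 0; i < 3; ++i)
        cells.emplace_back([sf, i] {                     // each thread gets its own copy of sf
            std::cout << "cell " << i << " got " << sf.get() << "\n";
        });

    cell_input.set_value(100);                           // "notify_all", with no lost wakeups
    for (auto& t : cells) t.join();
}
```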
========================================================================
std::packaged_task
========================================================================
- Basic usage: you create a future from the task, then hand the task to another thread
    - packaged_task can wrap around any callable, i.e., a lambda or a regular function
    - you must invoke it, typically on a thread
    - movable, not copyable, just like promise
    - std::packaged_task is a high-level abstraction over std::promise
    - std::packaged_task stores the result in an internal shared state, so std::future can read it
    - Canonical use of packaged_task with a regular function, so it can be pushed to a thread pool: see code
- Usage: have a queue of tasks, and let another thread process them
    - can be created with std::bind or a lambda, or run via std::thread(std::move(pt), 1, 2)
```cpp
#include <future>
#include <iostream>
#include <deque>
#include <functional>
#include <utility>
#include <thread>

int addNumbers(int a, int b) {
    return a + b;
}

std::deque<std::packaged_task<int()>> dq;

void print_num(){
    while (dq.size() > 0){
        std::packaged_task<int()> task = std::move(dq.at(0));
        dq.pop_front();
        task(); // doesn't return int here, it's just executing. You need the future to get the return value
    }
}

int main() {
    // std::packaged_task<int()> pt ([](){ return 1 + 1; });
    // std::packaged_task<int()> pt = std::bind(addNumbers, 1, 2); // doesn't work: the ctor is explicit, so copy-initialization is rejected
    // std::thread t1(std::move(pt), 1, 2);                        // this form needs std::packaged_task<int(int, int)>
    std::packaged_task<int()> pt(std::bind(addNumbers, 1, 2));
    dq.push_back(std::move(pt));
    std::future<int> fu = dq.front().get_future();
    std::thread t1(print_num);
    t1.join();
    std::cout << "sum: " << fu.get() << std::endl;
}
```
- Can std::packaged_task be converted to std::function&lt;void()&gt;? There's no implicit conversion, but there's a trick:

```cpp
#include <vector>
#include <functional>
#include <future>
#include <memory>

int main () {
    std::vector<std::function<void()>> vec;
    using task_t = std::packaged_task<int()>;
    task_t task( [] { return 100; } );
    auto future = task.get_future();   // remains valid even after the task is moved

    // vec.emplace_back(std::move(task));  // ERROR: you're passing packaged_task into std::function, which requires a copy-constructible callable

    // with a shared_ptr, this lambda is copy-constructible:
    vec.emplace_back([t = std::make_shared<task_t>(std::move(task))] {
        (*t)();  // call the task inside; this is the trick to make a std::function
    });
    vec.front()();                      // run it
    return future.get() == 100 ? 0 : 1;
}
```
- Of course, you may want to get the std::future of the packaged_task
    - even though we have moved the packaged_task, we can still see a valid result. See here
    - std::future can be moved into a shared_future too, so the data channel cannot store its result in either std::future or std::promise. Instead, it's stored in the shared state. That's the reason why we can still get a valid result back
    - do std::bind(std::forward&lt;F&gt;(f), std::forward&lt;Args&gt;(args)...) before making the packaged_task; this way we can have an empty argument list (sketched below)
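A sketch of that bind-then-wrap pattern, as a hypothetical `submit()` helper that a thread pool might expose (not from the original repo):

```cpp
#include <future>
#include <thread>
#include <functional>
#include <type_traits>
#include <iostream>

// bind the arguments first, so the stored task has an empty argument list
template <typename F, typename... Args>
auto submit(F&& f, Args&&... args) {
    using R = std::invoke_result_t<F, Args...>;
    std::packaged_task<R()> task(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<R> fut = task.get_future();
    std::thread(std::move(task)).detach();  // a real thread pool would enqueue the task instead
    return fut;
}

int add(int a, int b) { return a + b; }

int main() {
    auto fut = submit(add, 1, 2);
    std::cout << fut.get() << std::endl;    // 3
}
```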
========================================================================
std::async
========================================================================
- See above for basic usage of std::async, see code
    - std::async can take a move-constructed, copy-constructed, pointer, or reference argument
    - launch policy: std::launch::async (a new thread, immediately), std::launch::deferred (lazy: runs on the calling thread at get()/wait()); the default is async | deferred (sketched below)
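A quick sketch of the three launch-policy choices:

```cpp
#include <future>

int f() { return 1; }

int main() {
    auto a = std::async(std::launch::async, f);     // runs on a new thread immediately
    auto d = std::async(std::launch::deferred, f);  // runs lazily, on the thread that calls get()/wait()
    auto e = std::async(f);                         // default: async | deferred, the scheduler decides
    a.get(); d.get(); e.get();
}
```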
- Advantages of async, see code
    2. No over-subscription
        - when there are more software threads than hardware threads, the scheduler creates a "time-slice" for each thread; when one time-slice finishes, a "context switch" is performed
        - when a software thread is switched to another hardware thread, the CPU caches there contain little of its data & instructions, and refilling them pollutes the CPU caches of the other threads
        - std::async uses a cutting-edge scheduler that mitigates this
    3. Fewer load-balancing issues
        - by default there is no guarantee of std::launch::async, because that would run on a different thread immediately
        - so we're letting the scheduler defer some executions
    4. std::async internally uses an optimized thread pool. Tested: around the same performance as a threadpool
- Cons of async
    - does not work well with TLS (thread-local storage):

```cpp
#include <future>

thread_local int i = 0;
int f(){ return ++i; }

int main(){
    i = 1;
    auto fut = std::async(f);
    int new_i = fut.get(); // 1 if f ran on an independent thread (fresh TLS), or 2 if it ran deferred using main()'s i
}
```

    - the default launch mode cannot:
        - predict whether f runs concurrently
        - predict whether f runs at all (a workaround is sketched below)
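A common workaround, sketched here: probe the future for std::future_status::deferred before waiting on it:

```cpp
#include <future>
#include <chrono>

int f() { return 1; }

int main() {
    auto fut = std::async(f);  // default policy: may be deferred
    if (fut.wait_for(std::chrono::seconds(0)) == std::future_status::deferred) {
        fut.get();  // f only runs when we call get()/wait() ourselves
    } else {
        while (fut.wait_for(std::chrono::milliseconds(1)) != std::future_status::ready) {
            // do other work while f runs concurrently
        }
        fut.get();
    }
}
```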
- Motivation: imagine we want to compress a file in a separate thread, then return the file name and size once we're done
    - If we use std::thread, the traditional way, we must:
        - lock a mutex
        - keep shared variables for storing the new values
        - use a condition_variable to wake up the main thread
    - If we want to return 100 different values at different points in time, this method is not practical. std::future handles the mutex and condition_variable for you. Also, if you create too many std::threads, you'll get std::system_error
========================================================================
std::async vs std::packaged_task vs std::promise
========================================================================
- std::async vs packaged_task vs std::promise
    - packaged_task is like std::function, but it stores its result in a future, with a shared_state, so it can be run asynchronously
        - task-based programming using std::async assumes threads never access the same data; Haskell uses this model
        - Erlang uses "Communicating Sequential Processes", which creates channels for communication between threads
- std::async automatically launches on some thread, while a packaged_task can be launched on any thread:

```cpp
// async
auto as = std::async(std::launch::deferred, sleep);
cout << "hehe" << endl;  // even though as is deferred, we can still see this instantly
as.get();                // this will block

// packaged_task
std::packaged_task<int ()> task(sleep);
auto fut = task.get_future();
task();                  // explicit invocation
cout << "heee" << endl;  // this has to wait, since we launched in the main thread
cout << fut.get();       // this prints the final value
```
    - std::async doesn't need to be invoked; std::packaged_task does. Also, invoke the task before calling fut.get()!
std::future
fromstd::async
destruction will block the main thread. that fromstd::packaged_task
won'tbool get_pizza(){ std::this_thread::sleep_for(10s); return true; } std::future<bool> get_pizza_as = std::async(get_pizza); if (need to go) return; // block even tho there's no block!
    - Reason:
        - C++ requires that the last reference to the shared state of a std::promise - std::future pair from std::async must finish executing before destruction
        - std::packaged_task needs explicit invocation, like a function, so the waiting is explicit
        - std::promise just sets a value; std::future::get() on a future from a std::promise will block, but that's quite explicit. If std::future::get() is never called, there's no blocking:

```cpp
std::promise<int> pr;
auto fut = pr.get_future();
// if we exit here, no blocking
fut.get();
```

        - but future = std::async() starts immediately or a bit later, so the blocking is hidden: destruction blocks even without future.get()
- As a summary, std::packaged_task is a wrapper that calls a function and "sets" the result in a future (through its internal std::promise); std::async runs a std::packaged_task-like wrapper automatically on a different thread
========================================================================
std::atomic
========================================================================
0. Motivation
    - std::atomic is much smaller than std::mutex, because a mutex is operating-system level and generates a lot of code
    - use std::atomic&lt;T&gt;::is_lock_free, i.e. a.is_lock_free(), to check whether a type is lock-free
- Basic usage: std::atomic

```cpp
#include <atomic>
std::atomic<bool> if_write_data_(true);
if_write_data_ = false;
```
- std::atomic restrictions
    - It's for concurrency; it's not designed for special registers. Special registers should use volatile:

```cpp
volatile int x;
auto i = x;  // auto deduces int, because it drops cv-qualifiers (including volatile)
i = 1;
i = 2;
```

        - volatile means no optimization: every read and write must actually be performed
        - volatile is reserved for memory-mapped I/O (sensors, printers, network ports), rather than normal memory
        - imagine replacing volatile above with std::atomic:
            - the back-to-back load (read) and stores (writes) would be "redundant"
            - so they can be optimized away
        - If you should use std::atomic but instead you use volatile:
            - values might be changed externally at any time
            - you can get some random value while reading and writing the same variable from multiple threads
            - this is called a "data race"
        - But you can have volatile std::atomic, which means: the value can be changed externally at any time, but each thread has to change it atomically
- std::atomic restrictions:
    - cannot be copied (ctor, =) or moved (ctor, =)
        - because on hardware it's very difficult to read a value and write it to another register atomically
    - needs special load and store operations because of the above restriction. Also, since = means copy construction by default, initialization needs direct-initialization () or list-initialization {}:

```cpp
std::atomic<int> x(1);
std::atomic<int> y(x.load());
y.store(x.load());
```
    - std::atomic cannot be moved, so classes with std::atomic members CANNOT be moved either, unless you write the move operations yourself (sketched below)
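A sketch of hand-written moves via load()/store() (the `Counter` class is made up):

```cpp
#include <atomic>

// a class holding a std::atomic gets no compiler-generated moves,
// so we write them ourselves with load()/store()
struct Counter {
    std::atomic<int> n{0};
    Counter() = default;
    Counter(Counter&& other) noexcept : n(other.n.load()) {}
    Counter& operator=(Counter&& other) noexcept {
        n.store(other.n.load());
        return *this;
    }
};
```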
- Other implementation details
    - std::atomic usually does not use a mutex internally (check is_lock_free to be sure)
    - std::atomic uses RMW (read-modify-write) hardware operations
    - std::atomic ensures sequential consistency by default
- Atomic containers
    - std::atomic&lt;std::string&gt; doesn't exist
        - std::string is not trivially copyable
        - trivially copyable roughly means contiguous storage, like arrays
    - std::unique_ptr cannot be atomic because it's not copyable
    - std::shared_ptr can be used atomically
        - there's a set of atomic operations shared_ptr supports, see here
        - std::shared_ptr's control block is atomic, but not the managed resource
        - value = std::atomic_load(&ptr) atomically loads the pointer (sketched below)
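A minimal sketch of the free-function atomic API for shared_ptr (only the pointer itself is atomic, not the pointee; C++20 also adds std::atomic&lt;std::shared_ptr&lt;T&gt;&gt;):

```cpp
#include <memory>
#include <atomic>

std::shared_ptr<int> global_ptr = std::make_shared<int>(1);

void reader() {
    // atomically loads the pointer; the pointee is NOT protected
    std::shared_ptr<int> local = std::atomic_load(&global_ptr);
}

void writer() {
    std::atomic_store(&global_ptr, std::make_shared<int>(2));
}

int main() {
    writer();
    reader();
}
```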
- Theory: a closer look at std::atomic
    - everything is built on top of std::atomic_bool, which can be used to build a lock
        - it's very basic, don't use it directly
    - std::atomic is a template, with load(), store(), and assignment
        - assignment returns a copy of the value, not a reference (otherwise a race condition would be easy)
        - can be used for implicit type conversions
        - store() is essentially the same as assignment
    - std::atomic may fall back to locks internally (TODO)
- compare_exchange(T& expected, T desired):
    - this is compare-and-swap (CAS) in C++: if the current value equals expected, store desired and return true. Else, store the actual value into expected and return false. (Note: expected is a T&)
    - compare_exchange_weak() can return false spuriously, even when the actual value equals the expected one; in a while loop that's fine. compare_exchange_strong() doesn't fail spuriously
- A simple std::mutex-style spinlock implementation:

```cpp
#include <iostream>
#include <atomic>
#include <thread>
#include <chrono>
using namespace std;
using namespace std::chrono_literals;

int main() {
    std::atomic<bool> rico_mtx(false); // note: std::atomic<bool> rico_mtx = false; is copy-init, deleted before C++17
    int sum = 0;
    auto func = [&rico_mtx, &sum]{     // note: NOT rico_mtx&! pay attention to the position of &
        bool expected = false;         // you must have a separate variable to receive the actual value
        // you can use compare_exchange_strong as well
        while(!rico_mtx.compare_exchange_weak(expected, true)){
            expected = false;          // a failed CAS wrote the actual value (true) into expected; reset it
        }
        for (int i = 0; i < 10; ++i){
            ++sum;
            std::this_thread::sleep_for(10ms);
        }
        rico_mtx = false;              // unlock
    };
    std::thread th1(func);
    std::thread th2(func);
    th1.join();
    th2.join();
    cout << sum << endl;               // always 20
}
```
========================================================================
Memory Ordering
========================================================================
0. IMPORTANT: std::atomic with std::memory_order_seq_cst does ensure ordering, even though a change may not become immediately visible. Great answer
- Motivation:
    - before C++11, primarily everything targeted single-threaded apps
    - the pthreads lib assumes no data races, and it's faster because it has more relaxed rules. However, pthreads is not guaranteed to work the same way on every compiler; therefore, std::thread had to be implemented as a non-third-party lib
        - C++11 added the memory model, threading, atomics, and mutexes; before that, it could only be done with POSIX, Boost, or the Windows Thread API
    - reordering of operations can happen during the compilation stage. Also, because writing to shared memory takes longer than reading from it, there's a store buffer that batches the writes to memory. This is all due to optimization
    - Memory ordering rule: "Thou shalt not modify the behavior of a single-threaded program." As long as the behavior of a single-threaded program is not broken, the compiler achieves its goal
- Memory ordering by the compiler, source
    - cpp code:

```cpp
int A, B;
void foo() {
    A = B + 1;
    B = 0;
}
```

    - without optimization:

```
$ gcc -S -masm=intel foo.c
$ cat foo.s
...
mov eax, DWORD PTR _B   ; (redo this at home...)
add eax, 1
mov DWORD PTR _A, eax
mov DWORD PTR _B, 0
...
```

    - with optimization:

```
$ gcc -O2 -S -masm=intel foo.c
$ cat foo.s
...
mov eax, DWORD PTR B
mov DWORD PTR B, 0      ; B is written before A!
add eax, 1
mov DWORD PTR A, eax
...
```

    - therefore, it's possible that B (which may be atomic) is ready before A (which may or may not be atomic)
    - to prevent this, std::atomic operations (and certain functions) automatically act as compiler fences
- Ensuring sequential consistency across multiple threads is costly (or is it just within one thread?); it restricts:
    - code optimization by the compiler
    - instruction reordering
    - write buffers in processors
- We need a guarantee that a write to one variable by a thread becomes visible to the others
    - atomic variables are roughly what volatile is in Java
    - memory_order_seq_cst:

```cpp
std::atomic<int> a;
// memory order sequentially consistent: the easiest, the default, and it ensures "sequential consistency"
a.store(12, std::memory_order_seq_cst);
int b = a.load(std::memory_order_seq_cst);
```
- 6 orders. Note that all these memory_orders are just for operations on THE SAME object!
    - memory_order_relaxed
    - memory_order_acquire
    - memory_order_release
    - memory_order_consume
    - memory_order_acq_rel
    - memory_order_seq_cst
    - seq_cst is the default, and the strongest
- What is a memory fence? Aka memory barrier: an instruction that guarantees that a CPU executes one instruction before another
- CAS: compare and swap

```cpp
// pseudocode: returns 0 on success, 1 on failure
CAS(*val, expected, newval){
    if (*val == expected){
        *val = newval;
        return 0;
    } else {
        return 1;
    }
}
```

    - implement a mutex:

```cpp
Lock(*mutex){
    while(CAS(mutex, 1, 0));  // if another Lock got here first, this will spin-wait
}
Unlock(*mutex){
    *mutex = 1;  // set the value back to 1 (unlocked)
}
```
- Locks for synchronization are slow; atomics are a lot faster. The memory_order enum forbids reordering of memory accesses
    - the default is memory_order_seq_cst, which generates a memory fence (barrier) around every single load/store
    - relaxed: no ordering restrictions. I.e., atomic stores and loads are still atomic actions, but they can actually happen in any arbitrary order on a single thread; they don't have to obey the order in the source code at all:

```cpp
// Thread 1:
r1 = y.load(std::memory_order_relaxed); // A
x.store(r1, std::memory_order_relaxed); // B
// Thread 2:
r2 = x.load(std::memory_order_relaxed); // C
y.store(42, std::memory_order_relaxed); // D
```

        - You may still see r1 == r2 == 42, since the order D > A > B > C is possible
    - E.g., relaxed ordering can be used for implementing a counter, where we only care whether the total count has passed a value. That requires adding atomically, but no ordering or synchronization (sketched after this list)
    - The other models will generate fences on load/store operations on the same thread
    - I tested with my compiler; it seems that memory_order_seq_cst was always enforced...
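A sketch of the relaxed counter use case from the list above:

```cpp
#include <atomic>
#include <thread>
#include <vector>
#include <iostream>

std::atomic<long> hits{0};

int main() {
    std::vector<std::thread> ts;
    for (int i = 0; i < 4; ++i)
        ts.emplace_back([] {
            for (int j = 0; j < 100000; ++j)
                hits.fetch_add(1, std::memory_order_relaxed);  // atomic increment, no ordering needed
        });
    for (auto& t : ts) t.join();
    std::cout << hits.load() << std::endl;  // always 400000
}
```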
========================================================================
std::atomic vs std::mutex in const Member Functions
========================================================================
- There're scenarios where a const member function modifies mutable members, and this function is called from different threads
    - so you need to declare those members (and the synchronization primitive protecting them) mutable, and pick the right primitive:
- Use a std::atomic if there's only one memory location requiring synchronization, as std::atomic is less expensive than acquiring and releasing a lock
    - note: a std::atomic is neither movable nor copyable
- Use a lock if there is more than one memory location requiring synchronization
    - lock() and unlock() are non-const member functions, so the mutex has to be declared mutable
    - note: a std::mutex is neither movable nor copyable
- Good example: a lock managing more than one synchronized variable:

```cpp
class Widget{
public:
    // declared const: we're telling the user we're only reading the value.
    // But actually, we also want to cache the value if it's not there yet.
    int readValue() const{
        std::lock_guard<std::mutex> guard(mtx);  // a lock, because there's more than one variable to worry about!
        if (cacheValid) return val;
        else{
            val = expensive_calculation();
            cacheValid = true;
            return val;
        }
    }
private:
    mutable std::mutex mtx;          // mutable because lock() and unlock() are NOT const member functions
    mutable int val;
    mutable bool cacheValid = false; // this value might be flipped multiple times
};
```
- Bad example: using std::atomic on two synchronized variables:

```cpp
// both threads can pass the cacheValid check before either sets it,
// so expensive_calculation() may run more than once: wasted CPU
int readValue() const{
    if (cacheValid) return val;
    else{
        val = expensive_calculation();
        cacheValid = true;  // if threads 1 and 2 are both here, there'd be more CPU usage!
        return val;
    }
}
mutable std::atomic<bool> cacheValid{false};
mutable std::atomic<int> val;
```
========================================================================
Misc
========================================================================
- std::future::valid() checks whether the future has a shared state
========================================================================
Memory Pools
========================================================================
- C++17 synchronized pool: each pool has chunks of contiguous arrays; when you request memory, it returns a pointer to the smallest chunk that fits. see here
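Assuming this refers to std::pmr::synchronized_pool_resource, a minimal sketch:

```cpp
#include <memory_resource>
#include <vector>

int main() {
    std::pmr::synchronized_pool_resource pool;  // thread-safe pool of chunked arrays
    std::pmr::vector<int> v(&pool);             // this vector allocates from the pool
    for (int i = 0; i < 100; ++i) v.push_back(i);
}  // the pool releases its chunks upstream on destruction
```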
========================================================================
POSIX Threads (pthreads)
========================================================================
- access to POSIX threads (pthreads) through std::thread::native_handle(), e.g. pthread_getschedparam(t1.native_handle(), &policy, &sch); (a fuller sketch below)
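A fuller sketch, close to the cppreference native_handle example (note: SCHED_FIFO typically requires elevated privileges):

```cpp
#include <thread>
#include <pthread.h>  // POSIX only
#include <iostream>

int main() {
    std::thread t1([]{ /* work */ });

    sched_param sch{};
    int policy = 0;
    pthread_getschedparam(t1.native_handle(), &policy, &sch);  // read the current policy/priority
    std::cout << "priority: " << sch.sched_priority << std::endl;

    sch.sched_priority = 20;
    if (pthread_setschedparam(t1.native_handle(), SCHED_FIFO, &sch) != 0)
        std::cout << "failed to set realtime scheduling" << std::endl;

    t1.join();
}
```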