C++_Multithreading_Design - RicoJia/notes GitHub Wiki
========================================================================
========================================================================
-
Tricky deadlock
class Foo{ public: int data; std::mutex mtx_; }; void exchange_data(Foo& f1, Foo& f2){ // usually, locking two locks (for two separate things) following a specific order will work, but in this case it's prone to deadlock f1.mtx_.lock(); f2.mtx_.lock(); std::swap(f1.data, f2.data); f1.mtx_.unlock(); f2.mtx_.unlock(); } // here's the deadlock: it might be that f1 and f2 locking at the same time. and each is waiting for the other to release. Foo f1, f2; std::thread t1(exchange_data, f1, f2); std::thread t2(exchange_data, f2, f1);
- one problem with mutex is contention - threads trying to grab the same lock, to gain access to the same lock. First versions of Linux had one global kernel lock, and there was a lot of contention. Fine-grained locks are better.
-
std::lock
to avoid deadlocks-
std::lock
can emit exceptions.- why exception?
std::mutex
is locked before main() - will unlock when there's exception
- Recommended to use std::scoped_lock, RAII and doesn't need lock_guard
- example with
lock_guard and std::adopt_lock
void foo(Obj& o1, Obj& o2){ if(&o1 == &o2) return; std::lock(o1.m, o2.m); // Transfer ownership of mutexes to lock guard, to make sure they will be unlocked at the end of scope std::lock_guard<std::mutex>(o1.m, std::adopt_lock); std::lock_guard<std::mutex>(o2.m, std::adopt_lock); //Do stuff }
- why exception?
-
-
std::mutex::try_lock()
tries to acquire lock without blocking - if the mutex is being held by another thread, will return false.std::mutex mtx; int i = 0; void foo(){ if (mtx.try_lock()){ ++i; mtx.unlock(); } } int main(){ std::thread th1(foo); std::thread th2(foo); }
-
std::recursive_mutex
allows multiple locking by the same thread-
std::mutex
is locked by the same thread again, deadlock with undefined behaviour.
-
-
Avoiding deadlocks:
- avoid calling 2 mutexes at a time. If so, do this in
std::lock
However, if you write general-purpose code, you can't control it - use the same order to lock and unlock: If you can find an order, technically this order will make sure you'll lock things in a sequence. The cost is you can't traverse in the other direction.
list: node A -> B -> C thread 1 lock sequence: A -> B, thread 2 lock sequence: B -> C
- Hierarchical lock:
- When you have to use nested locks on a thread for different levels of tasks, hold the high level mutex first, then hold the lower level mutexes.
- So a thread only waits for lower level functions to finish on other threads
- If a thread tries to lock a mutex for higher level task, after locking a lower level task, then it's prohibited during run time.
- An Example of the locking hirarchy
int main(){ hierarchical_mutex high_level_mtx(10000); hierarchical_mutex low_level_mtx(500); auto high_level_func = [&high_level_mtx]{ std::lock_guard<hierarchical_mutex> lg1 (high_level_mtx); // this is good because we're not locking mtx for lower level //.... high level stuff; } std::mutex th_high_level(high_level_func); auto low_level_func = [&low_level_mtx]{ std::lock_guard<hierarchical_mutex> lg1 (low_level_mtx); std::lock_guard<hierarchical_mutex> lg2 (high_level_mtx); // will cause runtime error, as locking a lower level mutex is prohibited // high_level stuff, then low level stuff } std::mutex th_high_level(low_level_func); }
- Untested Implementation of the
hierarchical_mutex
- has
try_lock, lock, unlock
so it can be put intostd::lock_guard
- workflow:
- init on any thread with a hierarchy val,
- on another thread if you can lock it, thread_hierarchical value will change.
- If another lock fails to be acquired, that's cuz its hierarchical val is higher than the current one
class Hierarchical_Mutex{ private: static int this_thread_hierarchy_val_; int previous_hierarchy_; int hierarchy_candidate_; std::mutex mtx_; void check_for_hierarchy_violation(){ if (this_thread_hierarchy_val_ < hierarchy_candidate_) throw std::logic_error("Low level lock cannot be acquired before a high level task is completed!") } void update_hierarchy(){ previous_hierarchy_ = this_thread_hierarchy_val_; this_thread_hierarchy_val_ = hierarchy_candidate_; } public: explicit Hierarchical_Mutex(int hierarchy): hierarchy_candidate_(hierarchy){} void lock(){ check_for_hierarchy_violation(); mtx_.lock() update_hierarchy(); } bool try_lock(){ check_for_hierarchy_violation(); if (!mtx_.try_lock()) return false; update_hierarchy(); return true; } void unlock(){ this_thread_hierarchy_val_ = previous_hierarchy_; mtx_.unlock(); } }; int Hierarchical_Mutex::this_thread_hierarchy_val = INT_MAX; // initially any thread can lock this.
- has
- avoid calling 2 mutexes at a time. If so, do this in
-
adopt_lock
vsdefer_lock
-
std::lock_guard<std::mutex>(mtx, std::adopt_lock)
on construction, assuming it's been locked by the thread. (so you can't lock it later either) -
std::unique_lock<std::mutex>(mtx, std::defer_lock)
doesn't lock on contruction, assuming later it will be lockedstd::unique_lock<std::mutex> ul1(o1.mtx, defer_lock); std::unique_lock<std::mutex> ul2(o2.mtx, defer_lock); std::lock(ul1.mtx, ul2.mtx); // putting ul1, ul2 into lock, since std::unique_lock has lock(), try_lock(), etc.
-
std::unique_lock
is a bit slower thanstd::lock_guard
, but you can lock it at a later time.- Internally it has a flag indicating whether the mutex is being locked by itself:
try_lock
andunlock
may set this flag to false. - Flag important for the RAII-ness
-
owns_lock()
can check. - Because of the flag_check,
std::unique_lock
is a bit slower,
- Internally it has a flag indicating whether the mutex is being locked by itself:
- Flexiblity of std::unique_lock - ownership can be transferred: when you have one generic function to prepare some data, this function can transfer its unique_lock to a custom function
std::mutex mtx; std::unique_lock<std::mutex> prepare_data(){ std::unique_lock<std::mutex> ul(mtx); //do_stuff; return ul; } void custom_func(){ std::unique_lock<std::mutex> ul2(prepare_data()); // chaining the two funcs together; don't need std::move for NRVO // custom process }
-
std::unique_lock
can help reducing the time to hold two mutexes together.class Foo{ std::mutex mtx_; int i; public: int get_data(){ std::unique_lock<std::mutex> ul(mtx_); return i; // this may be a copy } }; bool operator ==(const Y& lhs, const Y& rhs){ if (&rhs == &lhs) return true; int i = lhs.get_data(); int j = rhs.get_data(); // technically, i and j may change right before this line, so that's a race condition return i==j; }
-
========================================================================
========================================================================
- Read-only data that needs initialization only once:
-
std::mutex
to check is CPU consuming -
std::atomic
may be used. -
std::once_flag may be the best choice, cuz it's designed just for initialization
- it has 3 states: execution-not-started, in-progress, started. Once it's finished, the flag is stored on L1 cache.
-
std::atomic
CAS is more expensive than this "load and acquire" - movable but not copyable
- lazy initialization: YAGNI - "You ain't gonna need it", will only initialize resource when we need it.
-
-
std::once_flag
andstd::call_once
#include <mutex> class Foo{ private: // std::mutex mtx_; //No need for mutex! std::once_flag flag_; std::vector<int> long_vec_; void init_long_vec(){long_vec_ = std::vector<int>(100000,666); }; public: void do_stuff_on_vec(){ std::call_once(flag_, &Foo::init_long_vec, this); } };
- initialization of
static variables
in function is already thread_safe!- whoever gets to initialize the static variable will "temporarily" block other threads.
void multi_threaded_func(){ static std::vector<int> vec(10000, 0); }
- whoever gets to initialize the static variable will "temporarily" block other threads.
- Multi-threaded singleton
- use
std::once_flag
if necessaryclass Foo{ ... static *Foo get_instance(){ std::call_once(flag_, []{instance_ = new Foo();}); return instance_; } private: static std::once_flag flag_; static Foo* instance_; };
- alternative: create a static instance inside the fucntion, instead of storing it as class member
class Foo{ public: static *Foo get_instance(){ static Foo f; // thread-safe lazy initialization in C++11 return *f; } Foo(const Foo&) = delete; Foo& operator=(const Foo&) = delete; private: Foo(){} };
- use
========================================================================
========================================================================
- A simple block implementation of threads:
#include <iostream>
#include <thread>
#include <vector>
#include <algorithm>
#include <functional>
#include <numeric>
using namespace std;
// parallel accumulation: a vector is segmented into blocks, block has n threads.
// n = 2 if returned is 0 or 1; or n = length(vec) if we don't have many elements.
// Or n = std::thread::hardware_concurrency() - 1 since you already have a thread.
// Too many threads = lots of context switching
// We store the result of each block, then we sum them up.
int main()
{
std::vector<int> test_vec {1,2,3,4,5,6,7,8,9,10};
unsigned int num_hardware_threads = std::thread::hardware_concurrency();
unsigned int num_threads = std::min((num_hardware_threads == 0 || num_hardware_threads == 1)? 2 : num_hardware_threads - 1, (unsigned int) test_vec.size());
std::vector<int> results(num_threads);
std::vector<std::thread> thread_pool;
for(unsigned int id = 0; id < num_threads; id++){
thread_pool.emplace_back(
std::thread(
[&results, &test_vec, id, num_threads]{
for(auto start = id; start < test_vec.size(); start += num_threads){
results.at(id) += test_vec.at(start);
}
}
)
);
}
std::for_each(thread_pool.begin(), thread_pool.end(), std::mem_fn(&std::thread::join));
auto sum = std::accumulate(results.begin(), results.end(), 0);
cout<<sum<<endl;
return 0;
}
========================================================================
========================================================================
- threadpool: fixed num of threads, saves on overhead for thread creation, (which might be expensive).
- Notes
- future: can I bind a class function? yes.
- class function can modify class members?
- Can I use this on decoder?
- Code
-
thread_pool
must need you to catch the future, and wait for things to finish. Also, the overhead of thread_pool might be high, compared to GPU// summing over a vector using the "block" method with the threadpool, assuming at least 1 thread is available. Public method for testing. // CAUTION: it can be slower than std::accumulate, Please do some speed profiling on your machine. double multi_threaded_sum(const std::vector<double>& vec); double Particle_Filter::multi_threaded_sum(const std::vector<double>& vec){ unsigned int num_threads = thread_pool_ -> get_size(); std::vector<double> results(num_threads); std::vector<std::future<void>> fut_vec; // why void? for(unsigned int id = 0; id < num_threads; id++){ auto func = [&results, &vec, id, num_threads]{ for(auto start = id; start < vec.size(); start += num_threads){ results.at(id) += vec.at(start); } }; fut_vec.emplace_back(thread_pool_ -> enqueue(func)); } for (auto& fut : fut_vec) fut.get(); auto sum = std::accumulate(results.begin(), results.end(), 0.0); return sum; }
-
About destruction with thread_pool
-
destruct the objects first, then the thread pool
-
when destructing objects, stop the loops first. - certain tricks are useful:
- have a stop variable
- if you have another loop internally, make sure it can get out too.
-
Cautions:
-
You need a while loop in the main function to make sure your objects don't get destructed
-
future.get() will wait till when the function gets updated. So if you have a while loop at the end, you might be screwed
-
Destruction could be tricky for threadpool - In a class, destruction order is the order in declaration - if your task has a while loop inside, you must know how to terminate it properly.
- throwing an exception if necessary - If your threadpool has tasks from two different objects, then you must terminate the two objects first
- If their destructions are dependent on each others', then don't use threadpool -tricks:
- use a std::unique_ptr, if you need to free raw pointers. That will ensure destruction sequence.
- you can explicitly call destructor on unique_ptr without worrying about double freeing
- If you haven't initialized unique_ptr, dtor won't be called
-
another way to push a task onto thread: use
std::thread::detach
andstd::future
template<typename F,typename A> std::future<std::result_of<F(A&&)>::type> spawn_task(F&& f,A&& a) { typedef std::result_of<F(A&&)>::type result_type; std::packaged_task<result_type(A&&)> task(std::move(f))); std::future<result_type> res(task.get_future()); std::thread t(std::move(task),std::move(a)); t.detach(); return res; }
========================================================================
========================================================================
- queue never pops (mutex not acquired) - wait condition
- why? conditional variable not waken up? - because if queue push happens before queue_pop (on a different thread), then push's notify_one has no effect on pop.
- This may cause hanging if we have complex destruction order, because only explicitly calling destructor can "unblock this"
- say we have object A that uses
threadsafe_queue
. A has to be destructed beforethreadsafe_queue
, because it has an infinite loop which has to be stopped first by A's own signal. Ifthreadsafe_queue
gets destructed, the infinite loop may keep running and hits a segfault. - in that case, we want to have an "stop()" function that the user can use to notify
threadsafe_queue::pop
to get out of the wait.
- say we have object A that uses
- Set up
- A singly linked list is good, with head and tail
- Use unique_ptr for head, and next, so objects can be properly destructed
- Design Choices
- two mutexes for head and tail - corner case: one node in list:
push
andpop
are pushing and popping to the same node. This means both functions need to lock both mutexes so we are not modifying the same node at the same time! - add dummy node to tail. When head == tail, we don't pop, but can only push.
- So when there's only the dummy node, we never modify it at the same time.
- When there's one valid node & the dummy node, we can push and pop as usual.
- So push doesn't need head - pop is the only one that accesses head. So push and pop just fights for one mutex.
- However 2 pops still need to fight for a another mutex.
- two mutexes for head and tail - corner case: one node in list:
- Race Condition Inherent to
std::stack
- e.g
#include <iostream> #include <stack> #include <vector> int main() { std::stack<std::vector<int>> s; s.push(std::vector<int>{1,2,3}); // race condition may happen here, since we're saving and modifying non-atomically auto vec = s.top(); s.pop(); return 0; }
- Rationale: the designers of
std::stack
deliberately broke this "top and pop" into two separate operations, cuz copying duringtop()
may fail, due to memory constraits. (std::bad_alloc
exception). So if there's an exception, at least data is still on stack. - But we need atomic top and pop to avoid race!
- e.g
- How to save this?
- Not gut: pass in reference: this requires assignment operation, which may not be supported
stack.pop(vec); //stores the result in vec.
- Not gut: requires no-throw copy ctor or move ctor. This directly requires the user to be responsible, but not ideal.
- run time detect if a type is no throw:
std::is_nothrow_copy_constructible
,std::is_nothrow_move_constructible
- run time detect if a type is no throw:
- Not gut: Return a pointer:
- memory management is tricky: lib will be in full control of the object.
- return a shared_ptr, so the user can decide whether to make it unique... Not gut cuz needs copy ctor.
- Not gut: pass in reference: this requires assignment operation, which may not be supported
- mutable mutex:
template <typename T> class ThreadSafeStack{ public: bool empty() const{ std::lock_guard<std::mutex> lg(mtx_); // need mutable cuz locking changes its internal state. return data_stack.empty(); } private: mutable std::mutex mtx_; std::stack<T> data_stack_; };
========================================================================
========================================================================
-
Hierarchy of Guarantees
- Non-blocking: Nothing is suspended by the OS (like fut.wait()), but it can be a spin lock (so no gurantee that more than one thread can make progress?)
std::atomic_flag fl; while (fl.test_and_set(std::memory_order_acquire)){}
- Lock free: There's always someone making progress
std::atomic_flag fl; while (!fl.test_and_set(std::memory_order_acquire)){ //redo certain things }
- Might have live locks, where all threads may have to start over again. Like two ppl in a narrow corridor, trying to be polite. Can be avoided by randomly choosing a thread, or by priority
- Main difference bw deadlock and livelock is live lock won't be blocked, but it will redo certain things. And the "certain things they do" might continue the live lock
- live lock example
// process 0 flags[0] = true; while (flags[1] == true){ flags[0] = false; // this "redone" step will continue the live lock } // Process 1 flags[1] = true; while (flags[0] == true){ flags[1] = false; }
- hard to get right! Performance might decrease too
- Wait free: avoid redoing stuff, and can finish in finite number of steps
- Non-blocking: Nothing is suspended by the OS (like fut.wait()), but it can be a spin lock (so no gurantee that more than one thread can make progress?)
-
std::atomic_flag
guaranteed not to have a lock -
wait-free
is one level above lock free, since lock free can still spin.- starvation is two threads accesses the data structure but does not acquire the lock, a thread spinning is starving.
- On the other hand you might have blocking threads.
-
Queue:
- one lock per element in the queue.
- single-producer, single consumer. Lock-free queue:
- disadvantage: in low contention, cpu usage might be high since it's spinning. So this might be even worse than the first one
-
ABA
thread 0) POP(): old_top = A, new_top = B. thread 1) POP(): old_top = A, new_top = B. thread 1) // complete pop A, and set head to B thread 1) PUSH(D): old_top = B, new_top = D thread 1) // complete push D, and set head to D thread 1) PUSH(A): old_top = D, new_top = A thread 1) // complete push A, and set head to A thread 0) // complete pop A and set new_top to B, head is now B ***
========================================================================
========================================================================
- A main function, with long-running sockets, threads:
-
pause()
will put the thread into sleep.#include <unistd.h>
-
while(1){}
will be interrupted, but after that will keep running. So use pause()
-
- destruction on exit (
ctrl-c
)- local variables don't get destructed, cuz stack unwinding doesn't do shit on it.
- static variable WILL be destructed
- global variables WILL BE destructed too
-
kill -15
orkill -9
are sending SIGTERM.ctrl-c
sends SIGINT
-