C++ Multithreading Design - RicoJia/notes GitHub Wiki
Tricky deadlock
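```cpp
#include <functional>  // std::ref
#include <mutex>
#include <thread>
#include <utility>     // std::swap

class Foo{
public:
    int data = 0;
    std::mutex mtx_;
};

void exchange_data(Foo& f1, Foo& f2){
    // Usually, locking two locks (for two separate things) in one fixed order works,
    // but here the order depends on the argument order, so this is prone to deadlock.
    f1.mtx_.lock();
    f2.mtx_.lock();
    std::swap(f1.data, f2.data);
    f2.mtx_.unlock();
    f1.mtx_.unlock();
}

int main(){
    Foo f1, f2;
    // Here's the deadlock: t1 may lock f1 while t2 locks f2 at the same time,
    // and then each waits forever for the other to release.
    // std::ref is needed: std::thread copies its arguments, and Foo holds a non-copyable mutex.
    std::thread t1(exchange_data, std::ref(f1), std::ref(f2));
    std::thread t2(exchange_data, std::ref(f2), std::ref(f1));
    t1.join();
    t2.join();
}
```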
- One problem with mutexes is contention: many threads trying to grab the same lock to access the same data. Early versions of Linux had one global kernel lock, and there was a lot of contention. Fine-grained locks are better.
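- `std::lock` to avoid deadlocks: it locks several mutexes at once using a deadlock-avoidance algorithm.
- It can emit exceptions. Why an exception? The underlying lock()/try_lock() calls may throw (e.g. std::system_error).
- If one mutex is already locked when another one throws, std::lock unlocks it before rethrowing.
- Recommended (C++17): use std::scoped_lock. It is RAII, uses the same deadlock-avoidance algorithm, and doesn't need a separate lock_guard.
- Example with std::lock, lock_guard and std::adopt_lock: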
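```cpp
#include <mutex>

struct Obj{          // hypothetical type holding a mutex m
    std::mutex m;
    int value = 0;
};

void foo(Obj& o1, Obj& o2){
    if (&o1 == &o2) return;  // locking the same mutex twice would be undefined behaviour
    std::lock(o1.m, o2.m);
    // Transfer ownership of the locked mutexes to lock guards, to make sure they are unlocked
    // at the end of scope. The guards must be named: an unnamed
    // std::lock_guard<std::mutex>(o1.m, std::adopt_lock); is a temporary that unlocks immediately.
    std::lock_guard<std::mutex> lg1(o1.m, std::adopt_lock);
    std::lock_guard<std::mutex> lg2(o2.m, std::adopt_lock);
    // Do stuff
}
```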
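A minimal sketch of how the deadlock in the first `exchange_data` example goes away with C++17 `std::scoped_lock`. `Foo` and `exchange_data` mirror that example; `run_opposite_swaps` is a hypothetical helper added only to show usage.

```cpp
#include <mutex>
#include <thread>
#include <utility>

class Foo{
public:
    int data = 0;
    std::mutex mtx_;
};

// Deadlock-free exchange_data: std::scoped_lock acquires both mutexes with the same
// deadlock-avoidance algorithm as std::lock and releases them at the end of scope.
void exchange_data(Foo& f1, Foo& f2){
    if (&f1 == &f2) return;  // locking the same std::mutex twice is undefined behaviour
    std::scoped_lock lk(f1.mtx_, f2.mtx_);
    std::swap(f1.data, f2.data);
}

// Usage: two threads pass the objects in opposite orders, which can deadlock
// with the naive version but not with this one.
int run_opposite_swaps(int iterations){
    Foo a, b;
    a.data = 1;
    b.data = 2;
    std::thread t1([&]{ for (int i = 0; i < iterations; ++i) exchange_data(a, b); });
    std::thread t2([&]{ for (int i = 0; i < iterations; ++i) exchange_data(b, a); });
    t1.join();
    t2.join();
    return a.data;  // 2 * iterations swaps is an even number, so a.data is back to 1
}
```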
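- `try_lock()` tries to acquire the lock without blocking: if the mutex is held by another thread, it returns false immediately.
```cpp
#include <mutex>
#include <thread>

std::mutex mtx;
int i = 0;

void foo(){
    if (mtx.try_lock()){   // never blocks; skips the increment if the mutex is busy
        ++i;
        mtx.unlock();
    }
}

int main(){
    std::thread th1(foo);
    std::thread th2(foo);
    th1.join();   // without join(), destroying a joinable std::thread calls std::terminate
    th2.join();
}
```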
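- `std::recursive_mutex` allows multiple locking by the same thread (it must be unlocked the same number of times).
- If a plain `std::mutex` is locked by the same thread again, the behaviour is undefined (typically a deadlock).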
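A small sketch of the recursive case, assuming a hypothetical recursive helper `descend` that re-locks the same mutex at every level:

```cpp
#include <mutex>

std::recursive_mutex rmtx;
int depth_reached = 0;

// Every level locks rmtx again on the same thread. A recursive_mutex allows this;
// each lock_guard unlocks once on scope exit, so after the outermost call returns
// the mutex is fully released. With a plain std::mutex the second lock would be UB.
void descend(int depth, int max_depth){
    std::lock_guard<std::recursive_mutex> lg(rmtx);
    depth_reached = depth;
    if (depth < max_depth) descend(depth + 1, max_depth);
}
```

Needing a recursive mutex is often a sign that the locking design could be simplified, but it is handy for recursive code paths like this one.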
Avoiding deadlocks:
- Avoid holding 2 mutexes at a time. If you must, lock them together with std::lock (or std::scoped_lock).
- However, in general-purpose code you can't always control this. Then use the same order to lock: if you can define an order over the mutexes, always acquiring them in that order prevents deadlock. The cost is that you can't traverse in the other direction.
- Example, a list: node A -> B -> C. Thread 1 locks A -> B, thread 2 locks B -> C. Both follow the list direction, so they can't deadlock; a thread locking C -> B would break the order.
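When there is no natural order like a list direction, one common way to get a consistent order is to compare mutex addresses. This is a technique chosen for illustration, not something from the notes above; `lock_in_address_order`, `unlock_both` and `run_opposite_orders` are hypothetical names.

```cpp
#include <functional>
#include <mutex>
#include <thread>

// Always lock the mutex with the lower address first. std::less gives a total order
// over pointers even where the built-in < is unspecified.
// Precondition: a and b are different mutexes.
void lock_in_address_order(std::mutex& a, std::mutex& b){
    if (std::less<std::mutex*>{}(&b, &a)){
        b.lock();
        a.lock();
    } else {
        a.lock();
        b.lock();
    }
}

// Unlock order doesn't matter for deadlock avoidance.
void unlock_both(std::mutex& a, std::mutex& b){
    a.unlock();
    b.unlock();
}

// Usage: two threads pass the mutexes in opposite orders, but both actually
// acquire them in the same (address) order, so they cannot deadlock.
int run_opposite_orders(int iterations){
    std::mutex m1, m2;
    int counter = 0;
    auto worker = [&](std::mutex& first, std::mutex& second){
        for (int i = 0; i < iterations; ++i){
            lock_in_address_order(first, second);
            ++counter;  // protected by both mutexes
            unlock_both(first, second);
        }
    };
    std::thread t1(worker, std::ref(m1), std::ref(m2));
    std::thread t2(worker, std::ref(m2), std::ref(m1));
    t1.join();
    t2.join();
    return counter;
}
```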
- Hierarchical lock:
- When a thread has to use nested locks for tasks at different levels, lock the high-level mutex first, then the lower-level mutexes.
- So a thread only waits for lower-level work to finish on other threads.
- If a thread that holds a lower-level mutex tries to lock a mutex for a higher-level task, the attempt is rejected at run time.
- An example of the locking hierarchy:
```cpp
#include <mutex>
#include <thread>
// Hierarchical_Mutex is the class implemented below.

int main(){
    Hierarchical_Mutex high_level_mtx(10000);
    Hierarchical_Mutex low_level_mtx(500);

    auto high_level_func = [&high_level_mtx]{
        // fine: this thread holds no lower-level mutex
        std::lock_guard<Hierarchical_Mutex> lg1(high_level_mtx);
        // .... high level stuff
    };
    std::thread th_high_level(high_level_func);
    th_high_level.join();

    auto low_level_func = [&low_level_mtx, &high_level_mtx]{
        std::lock_guard<Hierarchical_Mutex> lg1(low_level_mtx);
        // Runtime error (exception): locking a higher-level mutex while holding a lower-level one
        // is prohibited. An exception escaping a thread function calls std::terminate.
        std::lock_guard<Hierarchical_Mutex> lg2(high_level_mtx);
        // high level stuff, then low level stuff
    };
    std::thread th_low_level(low_level_func);
    th_low_level.join();
}
```
- Untested implementation of the hierarchical mutex:
- It has lock(), try_lock() and unlock(), so it can be put into std::lock_guard.
- Workflow:
- Construct it on any thread with a hierarchy value.
- When a thread locks it, that thread's current hierarchy value changes to the mutex's value.
- If locking another mutex then fails with an error, that's because its hierarchy value is higher than (or equal to) the thread's current one.
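```cpp
#include <climits>
#include <mutex>
#include <stdexcept>

class Hierarchical_Mutex{
private:
    // Hierarchy value of the mutex most recently locked by *this* thread.
    // It must be thread_local: each thread tracks its own current level.
    static thread_local int this_thread_hierarchy_val_;
    int previous_hierarchy_ = 0;      // this thread's level before it locked this mutex
    const int hierarchy_candidate_;   // level of this mutex
    std::mutex mtx_;

    void check_for_hierarchy_violation() const{
        // Only strictly lower levels may be locked while holding a higher-level mutex.
        if (this_thread_hierarchy_val_ <= hierarchy_candidate_)
            throw std::logic_error("Hierarchy violation: a higher-level mutex cannot be locked while holding a lower-level one!");
    }
    void update_hierarchy(){
        previous_hierarchy_ = this_thread_hierarchy_val_;
        this_thread_hierarchy_val_ = hierarchy_candidate_;
    }
public:
    explicit Hierarchical_Mutex(int hierarchy): hierarchy_candidate_(hierarchy){}
    void lock(){
        check_for_hierarchy_violation();
        mtx_.lock();
        update_hierarchy();
    }
    bool try_lock(){
        check_for_hierarchy_violation();
        if (!mtx_.try_lock()) return false;
        update_hierarchy();
        return true;
    }
    void unlock(){
        this_thread_hierarchy_val_ = previous_hierarchy_;
        mtx_.unlock();
    }
};
// Initially INT_MAX, so a thread that holds nothing can lock any Hierarchical_Mutex.
thread_local int Hierarchical_Mutex::this_thread_hierarchy_val_ = INT_MAX;
```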
- `std::lock_guard<std::mutex> lg(mtx, std::adopt_lock)` doesn't lock on construction; it assumes the mutex has already been locked by this thread and takes ownership of it. (lock_guard has no lock(), so you can't lock it later either.)
- `std::unique_lock<std::mutex> ul(mtx, std::defer_lock)` doesn't lock on construction, assuming it will be locked later:
```cpp
std::unique_lock<std::mutex> ul1(o1.mtx, std::defer_lock);
std::unique_lock<std::mutex> ul2(o2.mtx, std::defer_lock);
// Pass the unique_locks themselves to std::lock: std::unique_lock has lock(), try_lock(), etc.,
// and this way ul1 and ul2 record that they own the mutexes.
std::lock(ul1, ul2);
```
- `std::unique_lock` is a bit slower than `std::lock_guard`, but you can lock it at a later time.
- Internally it has a flag indicating whether it currently owns (has locked) the mutex:
- unlock() sets this flag to false. The flag is important for the RAII-ness: the destructor only unlocks if the flag is set.
- owns_lock() can check it.
- Because of the flag check, std::unique_lock is a bit slower.
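A minimal sketch that observes `std::unique_lock`'s ownership flag through `owns_lock()` while deferring the lock and releasing it early; `demo_unique_lock` and `shared_value` are hypothetical names used only for illustration.

```cpp
#include <mutex>

std::mutex mtx;
int shared_value = 0;

// Returns true if the ownership flag changed as expected at every step.
bool demo_unique_lock(){
    std::unique_lock<std::mutex> ul(mtx, std::defer_lock);
    bool owned_before = ul.owns_lock();  // false: defer_lock did not lock
    ul.lock();
    ++shared_value;
    bool owned_during = ul.owns_lock();  // true
    ul.unlock();                         // release early, e.g. before slow work that needs no lock
    bool owned_after = ul.owns_lock();   // false, so the destructor will not unlock a second time
    return !owned_before && owned_during && !owned_after;
}
```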
- Flexibility of std::unique_lock: ownership can be transferred (it is movable). When one generic function prepares some data under a lock, it can transfer its unique_lock to a custom function.
```cpp
#include <mutex>

std::mutex mtx;

std::unique_lock<std::mutex> prepare_data(){
    std::unique_lock<std::mutex> ul(mtx);
    // do_stuff;
    return ul;  // a returned local is moved implicitly, so no std::move is needed
}

void custom_func(){
    // chaining the two funcs together: the lock is still held here
    std::unique_lock<std::mutex> ul2(prepare_data());
    // custom process
}
```
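- Locking only one mutex at a time (each get_data() call holds its own lock briefly) can help reduce the time two mutexes are held together. The trade-off is that the comparison is no longer one atomic step:
```cpp
#include <mutex>

class Foo{
    mutable std::mutex mtx_;  // mutable so that const member functions can lock it
    int i = 0;
public:
    int get_data() const{
        std::unique_lock<std::mutex> ul(mtx_);
        return i;  // returns a copy taken under the lock
    }
};

bool operator==(const Foo& lhs, const Foo& rhs){
    if (&rhs == &lhs) return true;
    int i = lhs.get_data();
    int j = rhs.get_data();
    // technically, lhs may change right after its value was read, so the result can be stale: a race condition
    return i == j;
}
```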
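If the comparison must be atomic, the alternative is to hold both mutexes at once. A minimal sketch, assuming C++17 `std::scoped_lock`; the `set()` member and the constructor are hypothetical, added only for the usage check.

```cpp
#include <mutex>

class Foo{
public:
    explicit Foo(int i): i_(i){}
    void set(int i){
        std::lock_guard<std::mutex> lg(mtx_);
        i_ = i;
    }
    // Locks both objects together (deadlock-free), so both values come from one consistent moment.
    friend bool operator==(const Foo& lhs, const Foo& rhs){
        if (&lhs == &rhs) return true;  // avoid locking the same mutex twice
        std::scoped_lock lk(lhs.mtx_, rhs.mtx_);
        return lhs.i_ == rhs.i_;
    }
private:
    mutable std::mutex mtx_;
    int i_;
};
```

This holds two mutexes for the whole comparison, which is exactly the cost the one-lock-at-a-time version avoids.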
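- Read-only data that needs initialization only once:
- Locking a mutex on every access just to check whether it is initialized is CPU consuming.
- Other schemes may be used, but std::once_flag with std::call_once may be the best choice, since it's designed just for initialization.
- It has 3 states: execution not started, in progress, finished. Once it's finished, the flag is only read, so it can stay in each core's cache and checking it is cheap.
- That check is just a "load with acquire", which is cheaper than a CAS.
- std::once_flag is neither copyable nor movable.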
- Lazy initialization, in the spirit of YAGNI ("You ain't gonna need it"): only initialize a resource when it's actually needed.
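```cpp
#include <mutex>
#include <vector>

class Foo{
private:
    // std::mutex mtx_;  // No need for a mutex!
    std::once_flag flag_;
    std::vector<int> long_vec_;
    void init_long_vec(){ long_vec_ = std::vector<int>(100000, 666); }
public:
    void do_stuff_on_vec(){
        // The first caller runs init_long_vec; concurrent callers wait until it has finished.
        std::call_once(flag_, &Foo::init_long_vec, this);
        // ... use long_vec_
    }
};
```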
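To check that `std::call_once` really runs the initializer only once under concurrency, here is a small free-function variant of the class above. `init_calls` and `use_from_threads` are hypothetical names for illustration.

```cpp
#include <mutex>
#include <thread>
#include <vector>

int init_calls = 0;        // counts how often the initializer actually runs
std::once_flag init_flag;
std::vector<int> long_vec;

void init_long_vec(){
    ++init_calls;  // safe without a mutex: call_once runs this exactly once, other callers wait
    long_vec = std::vector<int>(100000, 666);
}

// Starts n threads that all try to initialize, then reports the number of initializations.
int use_from_threads(int n){
    std::vector<std::thread> threads;
    for (int k = 0; k < n; ++k)
        threads.emplace_back([]{ std::call_once(init_flag, init_long_vec); });
    for (auto& t : threads) t.join();
    return init_calls;
}
```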
- Initialization of static local variables in a function is already thread-safe (since C++11): whichever thread gets to initialize the static variable will "temporarily" block the other threads until initialization completes.
```cpp
#include <vector>

void multi_threaded_func(){
    static std::vector<int> vec(10000, 0);  // initialized exactly once, even with concurrent callers
}
```
- Multi-threaded singleton
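- Use std::call_once if necessary:
```cpp
#include <mutex>

class Foo{
public:
    // ...
    static Foo* get_instance(){
        std::call_once(flag_, []{ instance_ = new Foo(); });
        return instance_;
    }
private:
    static std::once_flag flag_;
    static Foo* instance_;
};

// Static data members need a definition outside the class (or `inline` in C++17).
std::once_flag Foo::flag_;
Foo* Foo::instance_ = nullptr;
```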
- Alternative: create a static instance inside the function, instead of storing it as a class member:
```cpp
class Foo{
public:
    static Foo& get_instance(){
        static Foo f;  // thread-safe lazy initialization since C++11
        return f;
    }
    Foo(const Foo&) = delete;
    Foo& operator=(const Foo&) = delete;
private:
    Foo(){}
};
```
- A simple block implementation of threads:
```cpp
#include <algorithm>
#include <functional>
#include <iostream>
#include <numeric>
#include <thread>
#include <vector>

// Parallel accumulation: the vector is split across n threads; thread `id` sums
// elements id, id + n, id + 2n, ...
// n = 2 if hardware_concurrency() returns 0 or 1; n = vec.size() if we don't have many elements;
// otherwise n = std::thread::hardware_concurrency() - 1, since main already runs on a thread.
// Too many threads = lots of context switching.
// We store the result of each thread, then we sum them up.
int main(){
    std::vector<int> test_vec {1,2,3,4,5,6,7,8,9,10};
    unsigned int num_hardware_threads = std::thread::hardware_concurrency();
    unsigned int num_threads = std::min(
        (num_hardware_threads == 0 || num_hardware_threads == 1) ? 2u : num_hardware_threads - 1,
        static_cast<unsigned int>(test_vec.size()));
    std::vector<int> results(num_threads);
    std::vector<std::thread> thread_pool;
    for (unsigned int id = 0; id < num_threads; id++){
        thread_pool.emplace_back([&results, &test_vec, id, num_threads]{
            // each thread writes only its own slot, so no lock is needed
            for (std::size_t start = id; start < test_vec.size(); start += num_threads){
                results.at(id) += test_vec.at(start);
            }
        });
    }
    std::for_each(thread_pool.begin(), thread_pool.end(), std::mem_fn(&std::thread::join));
    auto sum = std::accumulate(results.begin(), results.end(), 0);
    std::cout << sum << std::endl;  // 55
    return 0;
}
```
- Thread pool: a fixed number of threads, which saves on the overhead of thread creation (which might be expensive).
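A fixed-size thread pool can be sketched roughly as follows. `ThreadPool`, `enqueue()` and `get_size()` are hypothetical names, chosen to match how `multi_threaded_sum` below uses `thread_pool_`; the actual pool implementation isn't shown in these notes. Workers block on a condition variable with a predicate, and the destructor sets a stop flag, wakes everyone and joins, i.e. it stops the loops before anything is destroyed.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <type_traits>
#include <vector>

class ThreadPool{
public:
    explicit ThreadPool(std::size_t n){
        for (std::size_t i = 0; i < n; ++i)
            workers_.emplace_back([this]{ worker_loop(); });
    }
    ~ThreadPool(){
        {
            std::lock_guard<std::mutex> lg(mtx_);
            stop_ = true;
        }
        cv_.notify_all();
        for (auto& w : workers_) w.join();  // workers drain the remaining tasks, then exit
    }
    std::size_t get_size() const { return workers_.size(); }

    // Queues f and returns a future for its result.
    template<typename F>
    std::future<std::invoke_result_t<F>> enqueue(F f){
        using R = std::invoke_result_t<F>;
        // packaged_task is move-only but std::function needs a copyable callable,
        // so the task is shared through a shared_ptr.
        auto task = std::make_shared<std::packaged_task<R()>>(std::move(f));
        std::future<R> fut = task->get_future();
        {
            std::lock_guard<std::mutex> lg(mtx_);
            tasks_.emplace([task]{ (*task)(); });
        }
        cv_.notify_one();
        return fut;
    }
private:
    void worker_loop(){
        while (true){
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> ul(mtx_);
                cv_.wait(ul, [this]{ return stop_ || !tasks_.empty(); });
                if (stop_ && tasks_.empty()) return;
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // run outside the lock
        }
    }
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mtx_;
    std::condition_variable cv_;
    bool stop_ = false;
};
```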
- Notes
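- future: can I bind a class function? Yes.
- Can a class (member) function modify class members? Yes, when it is bound together with `this` (e.g. `std::bind(&Foo::f, this)` or a lambda capturing `this`), it runs on that object.
- Can I use this on the decoder?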
- Code
- Using the thread pool requires you to keep each future and wait for the tasks to finish. Also, the overhead of the thread pool might be high, compared to a GPU.
```cpp
// Summing over a vector using the "block" method with the thread pool, assuming at least 1 thread is available.
// Public method for testing.
// CAUTION: it can be slower than std::accumulate. Please do some speed profiling on your machine.
double multi_threaded_sum(const std::vector<double>& vec);  // declaration in the class

double Particle_Filter::multi_threaded_sum(const std::vector<double>& vec){
    unsigned int num_threads = thread_pool_ -> get_size();
    std::vector<double> results(num_threads);
    std::vector<std::future<void>> fut_vec;  // void: the lambda returns nothing, it writes into results
    for(unsigned int id = 0; id < num_threads; id++){
        auto func = [&results, &vec, id, num_threads]{
            for(std::size_t start = id; start < vec.size(); start += num_threads){
                results.at(id) += vec.at(start);  // each task writes only its own slot
            }
        };
        fut_vec.emplace_back(thread_pool_ -> enqueue(func));
    }
    for (auto& fut : fut_vec) fut.get();  // wait for all tasks (and rethrow their exceptions)
    auto sum = std::accumulate(results.begin(), results.end(), 0.0);
    return sum;
}
```
About destruction with thread_pool
- Destruct the objects first, then the thread pool.
- When destructing objects, stop their loops first. Certain tricks are useful:
- Have a stop variable.
- If you have another loop internally, make sure it can get out too.
- You need a while loop in the main function to make sure your objects don't get destructed while their tasks are still running.
- future.get() waits until the task has finished. So if the task ends in a while loop that never exits, get() never returns and you're stuck.
- Destruction can be tricky with a thread pool. In a class, members are destroyed in the reverse order of their declaration. If your task has a while loop inside, you must know how to terminate it properly.
- Throw an exception if necessary. If your thread pool has tasks from two different objects, you must terminate the two objects first.
- If their destructions depend on each other, don't use a thread pool. Tricks:
- Use a std::unique_ptr if you need to free raw pointers. That lets you control the destruction sequence.
- You can call reset() on the unique_ptr to destroy the object early, without worrying about double freeing.
- If the unique_ptr was never initialized (it holds nullptr), no destructor of a pointee is called.
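- Another way to push a task onto a thread: use std::packaged_task.
```cpp
#include <future>
#include <thread>
#include <type_traits>
#include <utility>

// Intended for rvalue arguments: std::thread passes its stored copy of `a` as an rvalue.
template<typename F, typename A>
std::future<std::invoke_result_t<F, A&&>> spawn_task(F&& f, A&& a){
    // std::invoke_result_t (C++17) replaces std::result_of, which is deprecated and removed in C++20
    using result_type = std::invoke_result_t<F, A&&>;
    std::packaged_task<result_type(A&&)> task(std::forward<F>(f));
    std::future<result_type> res(task.get_future());
    // the detached thread owns the task; the caller only keeps the future
    std::thread t(std::move(task), std::forward<A>(a));
    t.detach();
    return res;
}
```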
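- Queue never pops (mutex not acquired) - wait condition:
- Why? Is the condition variable not woken up? If the queue push happens before queue_pop (on a different thread), push's notify_one has no effect on pop: nobody is waiting yet, so the notification is simply lost. Waiting with a predicate, cv.wait(lock, pred), checks the queue before blocking, so an earlier push is still seen.
- This may cause hanging if we have a complex destruction order, because only explicitly calling the destructor can "unblock" this.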
- Say we have an object A that uses threadsafe_queue. A has to be destructed before threadsafe_queue, because A has an infinite loop which has to be stopped first by A's own signal. If threadsafe_queue gets destructed first, the infinite loop may keep running and hit a segfault.
- In that case, we want a stop() function that the user can call to notify the waiting threads to get out of the wait.
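A minimal sketch of such a queue, assuming a `std::queue` under a single mutex. `StoppableQueue`, `wait_and_pop` and `stop` are hypothetical names, not the `threadsafe_queue` these notes refer to. The predicate wait means a push that happened before the pop is never "missed", and `stop()` wakes every waiter so their loops can exit before the owner is destroyed.

```cpp
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>

template <typename T>
class StoppableQueue{
public:
    void push(T value){
        {
            std::lock_guard<std::mutex> lg(mtx_);
            q_.push(std::move(value));
        }
        cv_.notify_one();
    }
    // Blocks until an item is available or stop() was called.
    // Returns std::nullopt only after stop(), once the queue is empty.
    std::optional<T> wait_and_pop(){
        std::unique_lock<std::mutex> ul(mtx_);
        cv_.wait(ul, [this]{ return stopped_ || !q_.empty(); });
        if (q_.empty()) return std::nullopt;
        T value = std::move(q_.front());
        q_.pop();
        return value;
    }
    // Wakes all waiting threads so they can leave their loops.
    void stop(){
        {
            std::lock_guard<std::mutex> lg(mtx_);
            stopped_ = true;
        }
        cv_.notify_all();
    }
private:
    std::queue<T> q_;
    std::mutex mtx_;
    std::condition_variable cv_;
    bool stopped_ = false;
};
```

A consumer loop can then be written as `while (auto item = q.wait_and_pop()) { ... }`, which ends once `stop()` has been called and the queue is drained.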
- Set up
- A singly linked list is good, with head and tail pointers.
- Use unique_ptr for head and next, so nodes are properly destructed.
- Design choices
- Two mutexes, one for head and one for tail. Corner case, one node in the list: push and pop are pushing to and popping from the same node. This means both functions would need to lock both mutexes so they don't modify the same node at the same time!
- Fix: add a dummy node at the tail. When head == tail, the queue is empty: we don't pop, we can only push.
- So when there's only the dummy node, push and pop never modify the same node at the same time.
- When there's one valid node plus the dummy node, we can push and pop as usual.
- So push doesn't need head; pop is the only one that accesses head. Push and pop only fight over one mutex (tail, which pop reads briefly).
- However, 2 pops still need to fight over another mutex (head).
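The design above can be sketched like this, assuming `shared_ptr<T>` payloads so that `try_pop` can return "nothing" as a null pointer. `TwoLockQueue` is a hypothetical name; this is a sketch, not a tuned implementation.

```cpp
#include <memory>
#include <mutex>

template <typename T>
class TwoLockQueue{
    struct Node{
        std::shared_ptr<T> data;     // empty in the dummy node
        std::unique_ptr<Node> next;  // unique_ptr: nodes are freed automatically
    };
    std::unique_ptr<Node> head_;
    Node* tail_;                     // always points at the dummy node
    std::mutex head_mtx_;
    std::mutex tail_mtx_;

    Node* get_tail(){
        std::lock_guard<std::mutex> lg(tail_mtx_);
        return tail_;
    }
public:
    TwoLockQueue(): head_(std::make_unique<Node>()), tail_(head_.get()){}
    TwoLockQueue(const TwoLockQueue&) = delete;
    TwoLockQueue& operator=(const TwoLockQueue&) = delete;

    // push only touches the tail: fill the current dummy node, then append a new dummy.
    void push(T value){
        auto new_data = std::make_shared<T>(std::move(value));
        auto new_dummy = std::make_unique<Node>();
        Node* new_tail = new_dummy.get();
        std::lock_guard<std::mutex> lg(tail_mtx_);
        tail_->data = new_data;
        tail_->next = std::move(new_dummy);
        tail_ = new_tail;
    }

    // pop holds the head mutex and reads the tail only briefly, to detect head == tail (empty).
    std::shared_ptr<T> try_pop(){
        std::lock_guard<std::mutex> lg(head_mtx_);
        if (head_.get() == get_tail()) return nullptr;  // only the dummy node is left
        std::unique_ptr<Node> old_head = std::move(head_);
        head_ = std::move(old_head->next);
        return old_head->data;
    }
};
```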
- Race condition inherent to the std::stack interface
- e.g.
```cpp
#include <stack>
#include <vector>

int main(){
    std::stack<std::vector<int>> s;
    s.push(std::vector<int>{1,2,3});
    // If s is shared, a race condition may happen here: another thread can pop between
    // top() and pop(), since reading and removing are not one atomic step.
    auto vec = s.top();
    s.pop();
    return 0;
}
```
- Rationale: the designers of std::stack deliberately split this "top and pop" into two separate operations. If pop() removed the element and then returned it by value, the copy could fail due to memory constraints (a std::bad_alloc exception) after the element was already removed, and the data would be lost. With a separate top(), if copying throws, at least the data is still on the stack.
- But we need an atomic top-and-pop to avoid the race!
- How to save this?
- Not good: pass in a reference. This requires an assignment operation (and an already constructed object to assign into), which may not be supported:
```cpp
stack.pop(vec); // stores the result in vec
```
- Not good: require a no-throw copy ctor or move ctor. This directly makes the user responsible, which is not ideal.
- You can detect at compile time whether a type is no-throw, with std::is_nothrow_copy_constructible / std::is_nothrow_move_constructible.
- Not good: return a pointer:
- Memory management is tricky with a raw pointer: the lib will be in full control of the object.
- Return a shared_ptr, so memory is managed automatically... Not ideal either, since it still needs the copy (or move) ctor to create the pointed-to object.
- Mutable mutex:
```cpp
#include <mutex>
#include <stack>

template <typename T>
class ThreadSafeStack{
public:
    bool empty() const{
        // Locking changes the mutex's internal state, so mtx_ must be mutable to lock it in a const function.
        std::lock_guard<std::mutex> lg(mtx_);
        return data_stack_.empty();
    }
private:
    mutable std::mutex mtx_;
    std::stack<T> data_stack_;
};
```
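Putting the pieces together, a sketch of a stack whose top-and-pop happens under one lock, offering two of the options above (pass-in reference and returning a `shared_ptr`). Throwing `std::runtime_error` on an empty stack is an assumption made for this sketch.

```cpp
#include <memory>
#include <mutex>
#include <stack>
#include <stdexcept>

template <typename T>
class ThreadSafeStack{
public:
    void push(T value){
        std::lock_guard<std::mutex> lg(mtx_);
        data_stack_.push(std::move(value));
    }
    // Option: return a pointer. The copy into the shared_ptr happens before pop(),
    // so if it throws, the element is still on the stack.
    std::shared_ptr<T> pop(){
        std::lock_guard<std::mutex> lg(mtx_);
        if (data_stack_.empty()) throw std::runtime_error("empty stack");
        auto res = std::make_shared<T>(data_stack_.top());
        data_stack_.pop();
        return res;
    }
    // Option: pass in a reference (requires T to be assignable).
    void pop(T& value){
        std::lock_guard<std::mutex> lg(mtx_);
        if (data_stack_.empty()) throw std::runtime_error("empty stack");
        value = data_stack_.top();
        data_stack_.pop();
    }
    bool empty() const{
        std::lock_guard<std::mutex> lg(mtx_);
        return data_stack_.empty();
    }
private:
    mutable std::mutex mtx_;
    std::stack<T> data_stack_;
};
```

Because the emptiness check and the removal share one lock, two threads can no longer both see the same top element.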
Hierarchy of Guarantees
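- Non-blocking: nothing is suspended by the OS (as it would be in fut.wait()), but it can be a spin lock (so no guarantee that more than one thread can make progress?)
```cpp
#include <atomic>

std::atomic_flag fl = ATOMIC_FLAG_INIT;

void spin_locked_work(){
    // spin lock: busy-wait until this thread is the one that sets the flag
    while (fl.test_and_set(std::memory_order_acquire)){}
    // ... critical section ...
    fl.clear(std::memory_order_release);  // unlock
}
```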
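- Lock free: there's always some thread making progress.
```cpp
#include <atomic>

std::atomic<int> counter{0};

void lock_free_increment(){
    int expected = counter.load();
    // CAS loop: if another thread changed counter first, the CAS fails, expected is reloaded
    // with the current value, and we redo the work. Apart from spurious failures of the weak
    // form, a failed CAS means some other thread made progress.
    while (!counter.compare_exchange_weak(expected, expected + 1)){
        // redo certain things
    }
}
```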
- Might have livelocks, where all threads keep having to start over again. Like two people in a narrow corridor, both trying to be polite. Can be avoided by randomly choosing a thread, or by priority.
- The main difference between deadlock and livelock: a livelocked thread isn't blocked, but it keeps redoing certain things, and those redone steps can continue the livelock.
- Livelock example:
```cpp
#include <atomic>

std::atomic<bool> flags[2] = {false, false};

// Process 0
void process0(){
    flags[0] = true;
    while (flags[1] == true){
        flags[0] = false;  // back off to be polite...
        flags[0] = true;   // ...then try again: this "redone" step can continue the livelock
    }
    // critical section
    flags[0] = false;
}

// Process 1
void process1(){
    flags[1] = true;
    while (flags[0] == true){
        flags[1] = false;
        flags[1] = true;
    }
    // critical section
    flags[1] = false;
}
```
- Hard to get right! Performance might decrease too.
- Wait free: no redoing; every thread finishes its operation in a finite number of steps.
- Wait-free code is guaranteed not to have a lock, and is one level above lock free, since with lock free an individual thread can still spin (retry) forever.
- Starvation: a thread keeps accessing the data structure but never completes its operation (it doesn't need to be waiting on a lock to starve); a thread spinning like that is starving.
- On the other hand you might have blocking threads.
- one lock per element in the queue.
- Single-producer, single-consumer (SPSC) lock-free queue:
- Disadvantage: under low contention, CPU usage might be high since it's spinning. So this might be even worse than the first one.
- ABA problem, e.g. in a lock-free stack whose head is A -> B:
- thread 0) POP(): old_top = A, new_top = B.
- thread 1) POP(): old_top = A, new_top = B.
- thread 1) completes pop A, sets head to B.
- thread 1) PUSH(D): old_top = B, new_top = D.
- thread 1) completes push D, sets head to D.
- thread 1) PUSH(A): old_top = D, new_top = A.
- thread 1) completes push A, sets head to A.
- thread 0) its CAS still sees head == A, so it "completes" pop A and sets head to B. Head is now B, and D has silently been lost.
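A minimal SPSC ring-buffer sketch, assuming a fixed capacity known at compile time (`SpscQueue` is a hypothetical name). Each index has exactly one writer, so there is no compare-and-swap on a shared head and no ABA scenario like the one above. When the queue is full or empty, the caller retries, which is where the spinning comes from.

```cpp
#include <array>
#include <atomic>
#include <cstddef>
#include <optional>

// Only one thread may call push() and only one (other) thread may call pop().
// Capacity is N - 1: one slot stays empty to tell "full" from "empty".
template <typename T, std::size_t N>
class SpscQueue{
public:
    bool push(const T& value){
        std::size_t tail = tail_.load(std::memory_order_relaxed);  // only the producer writes tail_
        std::size_t next = (tail + 1) % N;
        if (next == head_.load(std::memory_order_acquire)) return false;  // full
        buf_[tail] = value;
        tail_.store(next, std::memory_order_release);  // publish the element to the consumer
        return true;
    }
    std::optional<T> pop(){
        std::size_t head = head_.load(std::memory_order_relaxed);  // only the consumer writes head_
        if (head == tail_.load(std::memory_order_acquire)) return std::nullopt;  // empty
        T value = buf_[head];
        head_.store((head + 1) % N, std::memory_order_release);  // hand the slot back to the producer
        return value;
    }
private:
    std::array<T, N> buf_{};
    std::atomic<std::size_t> head_{0};
    std::atomic<std::size_t> tail_{0};
};
```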
- A main function with long-running sockets and threads:
- `sleep()` (#include <unistd.h>) will put the thread to sleep. It will be interrupted by a signal, but after that the loop around it keeps running. So use `pause()`, which blocks until a signal handler has run.
- Destruction on exit (`std::exit()`):
- Local variables don't get destructed, because std::exit() doesn't unwind the stack.
- Static variables WILL be destructed.
- Global variables WILL be destructed too.
- `kill -15` sends SIGTERM, which the program can catch; `kill -9` sends SIGKILL, which can't be caught or ignored. Ctrl-C sends SIGINT.
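A minimal POSIX sketch (assuming Linux or another POSIX system) of a main loop that sleeps in `pause()` and shuts down cleanly on SIGINT or SIGTERM. The handler only sets a flag, and the loop returns normally instead of calling exit(), so locals in main still get destructed. `install_stop_handlers`, `wait_for_stop` and `stop_requested` are hypothetical names.

```cpp
#include <csignal>
#include <unistd.h>

// volatile std::sig_atomic_t is the type a signal handler may safely write.
volatile std::sig_atomic_t stop_requested = 0;

extern "C" void handle_stop(int){ stop_requested = 1; }

// SIGINT comes from ctrl-c, SIGTERM from kill -15.
// SIGKILL (kill -9) cannot be caught, so no cleanup can run on that path.
void install_stop_handlers(){
    std::signal(SIGINT, handle_stop);
    std::signal(SIGTERM, handle_stop);
}

// Sleep until a handler has set the flag. A signal that arrives between the check and
// pause() is only noticed at the next signal; sigsuspend() avoids that race.
void wait_for_stop(){
    while (!stop_requested) pause();
}
```

In `main`, call `install_stop_handlers()`, start the sockets and threads, call `wait_for_stop()`, then stop the workers and `return`, letting the destructors run.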