C++ Multithreading Tools - RicoJia/notes GitHub Wiki


========================================================================

Condition Variable

========================================================================

  0. NOT RECOMMENDED: the notifier may call notify before the listener thread even starts waiting, and that notification is lost. Prefer std::promise<void> for one-shot signalling.

  1. Say you have two functions, where one depends on the other

    • You need a mutex, one lock_guard, at least one unique_lock, and one condition variable
      • unique_lock, so the condition_variable can call ul.unlock() internally
      • You must re-check the condition because of spurious wakeups (not just a C++ problem!)
    • notify_one or notify_all:
      • notify_all wakes up all waiting threads, which then fight for the lock. Contention!
    • Basic Eg (not recommended for the above reasons)
    #include <mutex>
    #include <condition_variable>
    #include <thread>
    std::condition_variable cv; 
    std::mutex mtx; 
    int balance = 0; 
    void Foo(){
      std::lock_guard<std::mutex> lg(mtx); 
      balance += 30; 
      cv.notify_one(); 
    }
    
    void Bar(){
      // acquire the lock
      std::unique_lock<std::mutex> ul(mtx); 
      // unlock, sleep, wake up on cv.notify_one(), re-lock, evaluate the predicate
      cv.wait(ul, [](){return balance >= 30; }); // lambda has access to the global variable
      // careful: with a predicate like balance > 60, Foo only runs once, so Bar would sleep forever
      // the lock is still held here, so we can process shared data with mtx locked
    }
    
    int main(){
      std::thread t1(Bar); 
      std::thread t2(Foo);
      t1.join(); 
      t2.join(); 
    }
    • Drawbacks:
      1. you need a mutex
      2. there are spurious wakeups
      3. Foo might finish before cv.wait even gets executed (the predicate form tolerates this, but a bare cv.wait would block forever)
  2. Solution to the drawbacks of pure cond var

    1. std::atomic polling is better than the pure condition variable above
    std::atomic<bool> flag{false};    // note: copy-init `= false` is ill-formed before C++17
    void Foo(){
        ...
        flag = true; 
    }
    void Bar(){
        while (!flag);    // Polling    
    }
    • Drawback: your polling will ACTUALLY occupy a hardware thread. With a condition variable, a blocked thread can be shut off and put to sleep instead.
    1. A "mestizo solution"
    std::condition_variable cv; 
    std::mutex mtx; 
    bool flag(false);     // you just need a bool
    void Foo(){
        ... 
        std::lock_guard<std::mutex> lg(mtx); 
        flag = true; 
        cv.notify_one(); 
    }
    void Bar(){
        ...
        std::unique_lock<std::mutex> ul(mtx); 
        cv.wait(ul, []{return flag;}); 
    }
    • no polling!
    • Drawbacks:
      1. Bar may block trying to acquire the mutex (contention with Foo)
      2. you set a bool, but also do the notifying. Not clean!
  3. std::condition_variable_any can work with any lock type (while std::condition_variable works only with std::unique_lock<std::mutex>); it's larger, and may be slower as well. See the sketch at the end of this section.

  4. Recommended way to wait on a condition variable with a time limit, if you're not passing in a predicate:

    auto const timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(60);
    std::unique_lock<std::mutex> ul(m);
    while (!done){
        // it's okay if you don't pass in the predicate
        if(cv.wait_until(ul, timeout) == std::cv_status::timeout)
          break;
    }
    return done;
    // Avoid wait_for in this loop: after a spurious wakeup you'd restart the full duration!
    // i.e., don't do: if(cv.wait_for(ul, dur) == std::cv_status::timeout) break;
    • the other std::cv_status value is std::cv_status::no_timeout
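    • A minimal sketch of std::condition_variable_any from item 3, assuming std::recursive_mutex just to demonstrate a non-std::mutex lock type:
    #include <condition_variable>
    #include <mutex>
    std::condition_variable_any cva;
    std::recursive_mutex rm;      // any BasicLockable works, not just std::mutex
    bool ready = false;           // a real program would set this under the lock, then notify
    void wait_sketch(){
        std::unique_lock<std::recursive_mutex> ul(rm);
        cva.wait(ul, []{ return ready; });   // same predicate-based API as std::condition_variable
    }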

========================================================================

Condition Variable Alternative: Promise & Future

========================================================================

std::promise

  1. Basic example. see code (a minimal sketch is also shown after this list)

    • A data channel: the promise sets a value, the future catches it. Two end points: pass in the std::promise, read from the std::future
    • Cons: in ..., p might not be able to set the value if there's an exception. Then, t1 will hang
    • Uses:
      1. A promise is meant to be used only once: a setter-getter model. So we cannot pass the same promise into multiple functions
      2. std::promise aims to replace condition variables; good for handling a lot of small tasks that launch std::thread, e.g. TCP connections
      3. std::future was designed for "one-off" events.
        • std::thread doesn't provide a mechanism to directly transfer a result back.
        • std::future<void> is the specialization for when there's no associated data.
        • std::shared_future: each thread should access the shared state through its own copy of the shared_future; accessing the same std::shared_future object from multiple threads needs external synchronization
  2. Each promise has a reference to a shared state, and a result. The shared state can be:

    • make_ready: the result (or an exception) has been stored in the shared state. This unblocks any thread waiting on the future.
    • release: the promise gives up its reference to the shared state. The shared state is destroyed if this was the last reference.
    • abandon: the promise stores an error code (std::future_errc::broken_promise) as an exception. The state is first made ready, then released.
  3. The shared state is stored on the heap, in a place shared by the std::promise and its futures; it lives until the last reference (e.g., the last std::shared_future instance) releases it.
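    • A minimal sketch of the data channel from item 1, passing one int; note it works even if the setter finishes before the getter starts waiting:
    #include <future>
    #include <iostream>
    #include <thread>
    int main(){
      std::promise<int> p;                      // setter end of the channel
      std::future<int> f = p.get_future();      // getter end
      std::thread t([&p]{ p.set_value(42); });  // no lost notification, unlike a condition variable
      std::cout << f.get() << std::endl;        // blocks until the value is set; prints 42
      t.join();
    }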

std::future

  1. 3 ways to get a std::future: std::async, std::packaged_task, std::promise. see code (and the sketch after this list)

  2. Cautions of std::future, see code

  3. Destructor of futures:

    • if it's the last future for the shared state of a non-deferred task launched via std::async, the dtor will block until the task finishes
    • For all other futures (e.g., ones without a shared state, deferred tasks, or tasks launched via std::packaged_task):
      • the dtor is basically a "detach",
      • decrementing the reference count of the shared state
    • A future that's 1. moved, 2. a shared_future, or 3. a copied shared_future might point to the same shared state as others.
    • Therefore, we basically don't know whether a given dtor will block, because it depends on the other threads.
  4. Wait (std::future), see code

    status = fut.wait_for(std::chrono::seconds(1));    // returns after 1s, or when the result becomes available
    if (fut.wait_for(std::chrono::milliseconds(35)) == std::future_status::ready){}
    • there are 3 states: std::future_status::ready, std::future_status::timeout, std::future_status::deferred (if the task was deferred)
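    • A minimal sketch of the three ways from item 1, assuming a trivial work() function:
    #include <future>
    int work(){ return 1; }
    int main(){
      // 1. std::async
      std::future<int> f1 = std::async(work);
      // 2. std::packaged_task: must be invoked explicitly
      std::packaged_task<int()> task(work);
      std::future<int> f2 = task.get_future();
      task();
      // 3. std::promise: the value is set manually
      std::promise<int> p;
      std::future<int> f3 = p.get_future();
      p.set_value(work());
      return f1.get() + f2.get() + f3.get();   // 3
    }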

std::shared_future

  1. If you have multiple threads, use a shared future; its shared state is visible to all shared_future copies, like a better version of notify_all. see code (and the sketch below)
    • Pros:
      • No spurious wakeups (those are a condition_variable problem).
      • Bar always knows when Foo finishes, even if Foo finishes before Bar starts! (unlike a condition_variable)
      • No mutex needed.
      • No polling!
      • Just one variable for the "notification", very clean!
      • One application of shared_future is a spreadsheet, where multiple cells with formulas depend on one input cell. Once that input becomes available, all of them should be notified.
    • Cons:
      • A promise can be used only once, so this is for one-shot applications.
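    • A minimal sketch of this one-shot broadcast; each worker takes its own copy of the shared_future, and no wakeup is lost even if set_value() runs before the workers start waiting:
    #include <future>
    #include <iostream>
    #include <thread>
    #include <vector>
    int main(){
      std::promise<void> ready;
      std::shared_future<void> sf = ready.get_future().share();
      std::vector<std::thread> workers;
      for (int i = 0; i < 3; ++i)
        workers.emplace_back([sf, i]{   // each thread copies the shared_future
          sf.wait();                    // blocks until set_value(); no mutex, no polling
          std::cout << "thread " << i << " released\n";
        });
      ready.set_value();                // the "notify_all", usable exactly once
      for (auto& t : workers) t.join();
    }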

========================================================================

std::packaged_task (C++11)

========================================================================

  1. Basic Usage: you create a future, then designate the task to another thread

    1. packaged_task can wrap around a callable, i.e., a lambda or a regular function
      • it doesn't run until you invoke it, typically on another thread
      • movable, not copyable, just like promise
    2. std::packaged_task is a higher-level abstraction over std::promise
    3. std::packaged_task stores the result in an internal shared state, so a std::future can read it.
    4. Canonical use of packaged_task with a regular function - so it can be pushed to a thread pool. see code
  2. Usage: have a queue, and let another thread process these tasks

  • can be created from std::bind or a lambda; it can also be run via std::thread(std::move(pt))
  #include <future>
  #include <iostream>
  #include <deque>
  #include <functional>
  #include <thread>
  #include <utility>
  int addNumbers(int a, int b) {
    return a + b;
  }
  std::deque<std::packaged_task<int()>> dq; 
  void print_num(){
    while (dq.size() > 0){
      std::packaged_task<int()> task = std::move(dq.at(0)); 
      dq.pop_front(); 
      task();     // it doesn't return the int here, it just executes. You need the future to get the return value
    }
  }
  int main() {
    // std::packaged_task<int()> pt ([](){return 1+1; });           // a lambda works too
    // std::packaged_task<int()> pt = std::bind(addNumbers, 1,2);   // doesn't work: the converting ctor is explicit, so copy-initialization fails
    // std::thread t1(std::move(pt));                               // alternative: run it directly on a thread (pt is move-only)
    std::packaged_task<int()> pt(std::bind(addNumbers, 1,2)); 
    dq.push_back(std::move(pt)); 
    std::future<int> fu = dq.front().get_future(); 
    std::thread t1 (print_num); 
    t1.join(); 
    std::cout<<"sum: "<<fu.get()<<std::endl;  
  }
  3. Can std::packaged_task be converted to std::function<void()>? There's no implicit conversion (std::function requires a copy-constructible callable, and packaged_task is move-only), but there's a trick
    #include <vector>
    #include <functional>
    #include <future>
    #include <memory>
    int main () 
    {
        std::vector<std::function<void()>> vec;
        using task_t = std::packaged_task<int()>; 
        task_t task( [] { return 100; } );
        auto future = task.get_future();        // still valid even after moving the task
        // vec.emplace_back( std::move(task) );    // ERROR: you're passing a packaged_task into std::function, which needs a copy-constructible callable
        vec.emplace_back([t = std::make_shared<task_t>(std::move(task))]{
          // holding the task in a shared_ptr makes this lambda copy-constructible
          (*t)();    // invoke the task inside; this is the trick to make a std::function
        });
        vec.front()();    // runs the task; future.get() now returns 100
    }
    • of course you may want to get the std::future of the packaged_task.
      • even though we have moved the packaged_task, we can still get a valid result. See here
      • a std::future can be moved into a shared_future too, so the data channel cannot store its result inside either the std::future or the std::promise. Instead, it's stored in the shared state. That's why we can still get a valid result back after moving.
    • do std::bind(std::forward<F>(f), std::forward<Args>(args)...) before making the packaged_task; this way the packaged_task always has an empty argument list. See the sketch below.
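    • A minimal sketch of that bind-first pattern, using a hypothetical submit() helper (a real thread pool would enqueue the task instead of invoking it in place; std::invoke_result_t is C++17):
    #include <functional>
    #include <future>
    #include <type_traits>
    #include <utility>
    template <typename F, typename... Args>
    auto submit(F&& f, Args&&... args) {
        using R = std::invoke_result_t<F, Args...>;
        // bind first, so the packaged_task's signature is always R()
        std::packaged_task<R()> task(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        auto fut = task.get_future();
        task();   // a real pool would push std::move(task) onto a work queue here instead
        return fut;
    }
    int add(int a, int b) { return a + b; }
    int main() { return submit(add, 1, 2).get(); }   // 3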

========================================================================

std::async

========================================================================

  1. See above for basic usage of std::async. see code

    • std::async can take arguments by value (moved or copied), by pointer, or by reference (via std::ref).
    • launch policy: std::launch::async, std::launch::deferred, or (the default) either
  2. Advantages of async, see code

    1. No over-subscription: when there are more software threads than hardware threads, the scheduler creates a time slice for each thread; when one time slice finishes, a context switch is performed. After a software thread is switched onto another hardware thread, the CPU caches there contain little of its data & instructions, and it pollutes the CPU caches of the other threads. std::async's default policy lets the runtime's scheduler avoid creating excess threads.
    2. Fewer load-balancing issues: the default gives no guarantee of std::launch::async (which runs on a new thread immediately), so we let the scheduler defer some executions.
    3. std::async internally may use an optimized thread pool. Tested: around the same performance as a hand-written thread pool.

  3. Cons of Async

    1. Does not work well with TLS (thread-local storage)
      #include <future>
      thread_local int i = 0; 
      int f(){return ++i;}
      int main(){
        i = 1; 
        auto fut = std::async(f); 
        int new_i = fut.get();  // may be 1 (f ran on a fresh thread with its own i) or 2 (deferred: f ran on main()'s i)
      }
    2. With the default launch policy, you cannot predict whether:
      • f runs concurrently with the caller
      • f runs at all (a deferred task never runs if get()/wait() is never called)
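    • A minimal sketch of the launch policies, for contrast:
      #include <future>
      int f(){ return 1; }
      int main(){
        auto fut1 = std::async(std::launch::async, f);     // guaranteed to run on another thread
        auto fut2 = std::async(std::launch::deferred, f);  // runs lazily on the thread that calls get()
        auto fut3 = std::async(f);                         // default: async | deferred, the implementation chooses
        return fut1.get() + fut2.get() + fut3.get();       // 3
      }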

Task vs threads

  1. Motivation: imagine we want to compress a file in a separate thread, then return the file name and size once we're done.
    • If we use std::thread, the traditional way, we
      1. lock a mutex
      2. need shared variables for storing the new values
      3. use a condition_variable to wake up the main thread.
    • If we want to return 100 different values at different points in time, this method is not practical. std::future handles the mutex and condition_variable for you. (Also, if you create too many std::threads, you'll get std::system_error.) A sketch follows below.
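    • A minimal sketch of the compress-and-return-result idea, assuming a stand-in compress() function:
      #include <cstddef>
      #include <future>
      #include <string>
      struct Result { std::string name; std::size_t size; };
      Result compress(const std::string& file){
        return {file + ".gz", 1024};   // stand-in for the real compression work
      }
      int main(){
        // no mutex, shared variable, or condition_variable needed:
        auto fut = std::async(std::launch::async, compress, std::string("log.txt"));
        Result r = fut.get();          // blocks until the worker is done, then moves the result back
      }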

========================================================================

Comparisons

========================================================================

  1. std::async vs packaged_task vs std::promise
    1. packaged_task is like std::function, but stores its result in a future via a shared state, so it can be run asynchronously.

      • Task-based programming using std::async assumes no access to the same data by different threads. Haskell uses this model
      • "Communicating Sequential Processes" (the model behind Go channels, similar in spirit to Erlang's message passing) creates channels for communication between threads.
    2. std::async automatically launches on some thread, while a packaged_task can be launched on any thread you choose.

      // async
      auto as = std::async(std::launch::deferred, sleep);
      cout<<"hehe"<<endl;     // even though as is deferred, we still see this instantly
      // this will block (the deferred task runs now, on this thread)
      as.get();
      
      // packaged_task
      std::packaged_task<int ()> task(sleep);
      auto fut = task.get_future();
      task();      // explicit invocation, runs synchronously right here
      cout << "heee"<<endl;     // this prints only after sleep finishes, since we ran it on the main thread
      cout<<fut.get();          // this prints the final value.
    3. std::async doesn't need to be invoked; std::packaged_task does. Also, invoke the task before calling fut.get(), otherwise get() blocks forever!

    4. A std::future from std::async blocks in its destructor; one from std::packaged_task won't (see the demo after this list)

      bool get_pizza(){
        std::this_thread::sleep_for(10s);
        return true;
      }
      std::future<bool> get_pizza_as = std::async(get_pizza);
      if (/* need to go */)
        return;     // blocks ~10s here even though there's no explicit wait!
      • Reason:
        1. C++ requires that the last reference to the shared state of a std::promise - std::future pair from a non-deferred std::async must wait for the task to finish before destruction.
        2. std::packaged_task needs explicit invocation, like a function, so any blocking is explicit.
        3. std::promise just sets a value. std::future::get() on a promise's future will block, but that's quite explicit; if std::future::get() is never called, there's no blocking.
          std::promise<int> pr;
          auto fut = pr.get_future();   // if we exit here, no blocking.
          fut.get();
        4. But future = std::async() starts immediately or a bit later, so the wait is hidden: destruction blocks even without future.get().
    5. As a summary, std::packaged_task is a wrapper that calls a function and "sets" the result into a future (through its internal shared state, promise-style). std::async does something like running a packaged_task automatically, possibly on a different thread.
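    • A minimal runnable demo of the hidden blocking from point 4:
      #include <chrono>
      #include <future>
      #include <iostream>
      #include <thread>
      using namespace std::chrono_literals;
      int main(){
        auto t0 = std::chrono::steady_clock::now();
        {
          auto fut = std::async(std::launch::async, []{ std::this_thread::sleep_for(1s); });
        }   // fut's destructor blocks ~1s here, even though get() was never called
        auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                      std::chrono::steady_clock::now() - t0).count();
        std::cout << "destruction took " << ms << " ms\n";
      }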

========================================================================

atomic

========================================================================

  0. Motivation
    • std::atomic is much cheaper than std::mutex, because a mutex goes through the operating system and generates a lot of code
    • use std::atomic<T>::is_lock_free to check whether a type is lock free:
      a.is_lock_free()

  1. Basic Usage: std::atomic
    #include <atomic>
    std::atomic<bool> if_write_data_(true); 
    if_write_data_ = false; 
  2. std::atomic restrictions
    1. It's for concurrency, not designed for special registers. Special registers should use volatile
      volatile int x;   // e.g. a memory-mapped register
      auto i = x;       // read
      x = 1;            // write
      x = 2;            // seemingly redundant write
      • auto here deduces i to int, because auto drops cv-qualifiers (i is NOT volatile)
        • volatile means: no optimization of these reads/writes
        • volatile is reserved for memory-mapped I/O: sensors, printers, network ports that look like normal memory
      • imagine replacing volatile above with std::atomic
        • those accesses would be a "redundant load (read), and store (write)"
        • so the compiler is allowed to optimize them away
      • If you should use std::atomic but instead you use volatile,
        • the value might be changed by another thread at any time, with no atomicity guarantee
        • you get unpredictable values when reading and writing the same variable concurrently
        • this is called a "data race"
      • But you can have volatile std::atomic<T>, which means: it can be changed externally at any time, and each thread has to change it atomically.
    2. std::atomic restrictions:
      1. cannot be copied (ctor, =) or moved (ctor, =)
        • because on hardware it's very difficult to read one value and write it to another location in a single atomic operation.
      2. needs explicit load and store because of the above restriction. Also, since = from another atomic would mean copy construction, initialization needs direct-initialization () or list initialization {}.
      std::atomic<int> x(1); 
      std::atomic<int> y(x.load()); 
      y.store(x.load()); 
      3. std::atomic cannot be moved, so classes with a std::atomic member CANNOT be moved either
    3. Other implementation details
      • std::atomic usually does not use a mutex internally (but may fall back to one when the type is not lock-free; check is_lock_free())
      • std::atomic maps to RMW (read-modify-write) hardware operations.
      • std::atomic ensures sequential consistency by default (memory_order_seq_cst)
  3. Atomic Container
    • std::atomic<std::string> doesn't exist
      • std::string is not trivially copyable
      • trivially copyable roughly means: copyable byte-by-byte, like contiguous storage or arrays.
    • std::unique_ptr cannot be atomic because it's not copyable
    • std::shared_ptr can be used atomically.
      • there's a set of atomic free functions for shared_ptr, see here
      • std::shared_ptr's control block (the reference counts) is atomic, but not the managed resource.
      • value = std::atomic_load(&ptr) atomically loads the pointer. A sketch follows below.
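    • A minimal sketch of the shared_ptr atomic free functions (available since C++11; deprecated in C++20 in favor of std::atomic<std::shared_ptr<T>>):
      #include <atomic>
      #include <memory>
      std::shared_ptr<int> ptr = std::make_shared<int>(0);
      void swap_in_new_value(){
        auto local = std::atomic_load(&ptr);                          // atomic read of the pointer itself
        std::atomic_store(&ptr, std::make_shared<int>(*local + 1));   // atomic replacement
        // note: only the pointer/control block is protected; the pointee is not
      }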
  4. Theory: closer look at std::atomic.

    1. everything can be built on top of std::atomic_flag (the one guaranteed lock-free type), which can be used to build a lock
      • it's very basic, don't use it directly.
    2. std::atomic is a template, and has load(), store(), and assignment
      • assignment returns a copy of the value, not a reference (otherwise a race condition would be easy)
      • load can also happen through the implicit conversion operator T()
      • store: essentially the same as assignment
    3. std::atomic may fall back to locks for types that aren't lock-free (TODO)
  5. compare_exchange(T& expected, T desired):

    1. this is compare-and-swap (CAS) in C++: if the current value equals expected, store desired and return true. Else, store the actual value into expected and return false. (Note: expected is a T&, so it gets overwritten on failure)
    2. compare_exchange_weak() can fail spuriously, even when the actual value equals expected. In a while loop that's fine.
    3. compare_exchange_strong() never fails spuriously.
    4. a simple std::mutex (spinlock) implementation:
      #include <iostream>
      #include <atomic>
      #include <thread>
      #include <chrono>
      using namespace std;
      using namespace std::chrono_literals; 
      
      int main()
      {
          std::atomic<bool> rico_mtx(false);      // note: std::atomic<bool> rico_mtx = false; is deleted copy-init before C++17
          int sum = 0; 
          auto func = [&rico_mtx, &sum]{          // note: NOT rico_mtx&! pay attention to the position of &
                          bool expected = false;  // a separate variable to receive the actual value
                          // spin until we flip rico_mtx from false to true, i.e., acquire the lock
                          while(!rico_mtx.compare_exchange_weak(expected, true)){   // compare_exchange_strong works too
                              expected = false;   // IMPORTANT: on failure, expected was overwritten with the actual value (true)
                          }
                          for (int i = 0; i < 10; ++i){
                              ++sum; 
                              std::this_thread::sleep_for(10ms);     
                          }
                          rico_mtx = false;       // release the lock
                      }; 
          std::thread th1(func); 
          
          std::thread th2(func); 
          
          th1.join(); 
          th2.join(); 
          cout<<sum<<endl;    // always 20: the spinlock serializes the two threads
      }

========================================================================

Memory Model

========================================================================

  0. IMPORTANT: std::atomic with std::memory_order_seq_cst does ensure ordering, even though a change may not become visible immediately. Great answer

  1. Motivation:

    1. Before C++11, the language primarily assumed single-threaded apps.
    2. The Pthreads lib assumes no data races, and it's faster because it has more relaxed rules. However, pthreads is not guaranteed to work the same way with every compiler. Therefore, std::thread was added as a standard (non-third-party) lib
      • C++11 added a memory model, threading, atomics, and mutexes. Before, this could only be done via POSIX, Boost, or the Windows Thread API
    3. Reordering of operations can happen at compilation time. Also, because writing to memory takes longer than reading from shared memory, there's a store buffer that batches writes to memory. All of this is due to optimization
      • Memory-ordering rule: "Thou shalt not modify the behavior of a single-threaded program." As long as the behavior of a single-threaded program is unchanged, the compiler is free to reorder.

      (figure: Simplified Modern Chip Architecture)
    • Memory Ordering by the compiler, source
      • C++ code
        int A, B;
        void foo()
        {
            A = B + 1;
            B = 0;
        }
      • without optimization:
        $ gcc -S -masm=intel foo.c
        $ cat foo.s
        ...
        mov     eax, DWORD PTR _B
        add     eax, 1
        mov     DWORD PTR _A, eax
        mov     DWORD PTR _B, 0
        ...
      • With Optimization:
        $ gcc -O2 -S -masm=intel foo.c
        $ cat foo.s
        ...
        mov     eax, DWORD PTR B
        mov     DWORD PTR B, 0
        add     eax, 1
        mov     DWORD PTR A, eax
        ...
      • Therefore, it's possible that B (which may be atomic) is written before A (which may or may not be atomic)
      • To prevent this, use std::atomic: atomic operations (and opaque function calls) automatically act as compiler fences.
  2. Ensuring sequential consistency across multiple threads is costly, because of:

    1. code optimization by compiler
    2. instruction reordering
    3. write buffers in processors
  3. Need a guarantee that a write to a var by one thread is visible to another

    1. atomic variables ≈ volatile in Java
    2. memory_order_seq_cst
      std::atomic<int> a;
      a.store(12, std::memory_order_seq_cst); // sequentially consistent: the easiest, the default; all threads observe one single total order
      int b = a.load(std::memory_order_seq_cst);
    3. 6 orders. Note that each memory_order is specified per operation on an atomic object (though it also constrains the ordering of surrounding reads/writes):
      1. memory_order_relaxed
      2. memory_order_acquire
      3. memory_order_release
      4. memory_order_consume
      5. memory_order_acq_rel
      6. memory_order_seq_cst
    4. seq_cst is the default, and the strongest (also the most expensive)
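    5. A minimal sketch of acquire/release from the list above (the canonical producer/consumer handoff):
      #include <atomic>
      #include <cassert>
      #include <thread>
      std::atomic<bool> ready{false};
      int payload = 0;   // plain, non-atomic data
      void producer(){
          payload = 42;                                   // happens-before the release store
          ready.store(true, std::memory_order_release);
      }
      void consumer(){
          while (!ready.load(std::memory_order_acquire)); // synchronizes-with the release store
          assert(payload == 42);                          // guaranteed to see the payload write
      }
      int main(){
          std::thread t1(producer), t2(consumer);
          t1.join(); t2.join();
      }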
  4. What is a memory fence? Aka memory barrier: an instruction that guarantees the CPU executes certain instructions before others.

  5. CAS: compare and swap

    CAS(*val, expected, newval){
        if (*val == expected){
            *val = newval; 
            return 0;       // success
        } else {
            return 1;       // failure
        } 
    }
    • Implement a mutex with it:
    Lock(*mutex){
        while(CAS(mutex, 1, 0));    // spin: succeed only when we see 1 (unlocked) and set it to 0 (locked)
    }
    
    Unlock(*mutex){
        *mutex = 1;     // set the value back to 1 (unlocked)
    }
  6. Locks for synchronization are slow; atomics are a lot faster. The memory_order enum restricts the reordering of memory accesses

    • the default is memory_order_seq_cst, which generates a memory fence around every single load/store.
    • relaxed: no ordering restrictions. I.e., atomic stores and loads are still atomic actions, but operations on different objects can be observed in an arbitrary order; they don't have to obey the order in the source code at all.
        // Thread 1:
        r1 = y.load(std::memory_order_relaxed); // A
        x.store(r1, std::memory_order_relaxed); // B
        // Thread 2:
        r2 = x.load(std::memory_order_relaxed); // C 
        y.store(42, std::memory_order_relaxed); // D
      • You may still see r1 == r2 == 42, since the order D, A, B, C is allowed (D may be reordered before C)
    • E.g., relaxed can be used to implement a counter, where we only care whether the total count has passed a value. That requires adding atomically, but no ordering or synchronization. See the sketch below.
    • The other models will generate fences around load/store operations on the same thread
    • I tested with my compiler; it seems memory_order_seq_cst semantics were always observed anyway...
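    • A minimal sketch of the relaxed counter use-case above:
      #include <atomic>
      #include <iostream>
      #include <thread>
      #include <vector>
      std::atomic<long> counter{0};
      int main(){
        std::vector<std::thread> ts;
        for (int i = 0; i < 4; ++i)
          ts.emplace_back([]{
            for (int j = 0; j < 100000; ++j)
              counter.fetch_add(1, std::memory_order_relaxed);  // atomicity only, no ordering needed
          });
        for (auto& t : ts) t.join();
        std::cout << counter.load() << std::endl;   // always 400000
      }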

========================================================================

Make const function thread-safe

========================================================================

  • There are scenarios where a const member function modifies a mutable member, and this function is called from different threads
  • So you
    • use a std::atomic if there's only one memory location requiring synchronization, as std::atomic is less expensive than acquiring and releasing a lock
      • std::atomic is neither copyable nor movable (see the sketch at the end of this section)
    • use a lock if there is more than one memory location requiring synchronization
      • lock() and unlock() are non-const member functions, so the mutex has to be declared mutable
      • std::mutex is neither copyable nor movable either
    • E.g. good example: a lock used for managing more than one synced variable
        class Widget{
            public: 
            int readValue() const{    // declared const: we tell the user we're only reading. But we also want to cache the value if it's not there yet
               std::lock_guard<std::mutex> guard(m); // we place a lock here, because there's more than one var to worry about!
               if (cacheValid) return val; 
               else{
                  val = expensive_calculation(); 
                  cacheValid = true; 
                  return val; 
               }
            }
      
            private: 
            mutable std::mutex m;     // mutable because lock() and unlock() are NOT const member functions
            mutable int val; 
            mutable bool cacheValid = false;  // this value might be flipped multiple times
        };
    • E.g. bad example: using std::atomic on two synced vars:
        int readValue() const{
           if (cacheValid) return val; 
           else{
              val = expensive_calculation(); 
              cacheValid = true;    // if threads 1 and 2 both get here, the expensive calculation runs twice!
              return val; 
           }
        }
        std::atomic<bool> cacheValid{false}; 
        std::atomic<int> val; 
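    • For the single-location case, a minimal hypothetical sketch where one std::atomic suffices and no lock is needed:
        #include <atomic>
        class Widget{
            public:
            int magicValue() const{
                ++callCount;          // the only shared mutable state: one atomic is enough
                return 42;            // stand-in for the real computation
            }
            private:
            mutable std::atomic<unsigned> callCount{0};  // mutable: modified inside a const function
        };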

========================================================================

Reminders

========================================================================

  1. std::future::valid() checks whether the future (still) has a shared state.
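    • A minimal sketch, assuming a plain promise/future pair:
      #include <cassert>
      #include <future>
      int main(){
        std::promise<int> p;
        std::future<int> f = p.get_future();
        assert(f.valid());        // f refers to a shared state
        p.set_value(1);
        f.get();
        assert(!f.valid());       // get() releases the shared state
      }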

========================================================================

More Tools

========================================================================

  1. C++17: synchronized memory pool: each pool has chunks of contiguous arrays; when you request memory, it returns a pointer to the smallest chunk that fits. see here

========================================================================

POSIX Threads

========================================================================

  1. Access POSIX threads (pthreads) through std::thread::native_handle(); a sketch follows below
    pthread_getschedparam(t1.native_handle(), &policy, &sch);
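    • A minimal sketch around that call, assuming Linux and SCHED_FIFO (raising the priority may require privileges):
      #include <pthread.h>
      #include <thread>
      int main(){
        std::thread t1([]{ /* work */ });
        sched_param sch;
        int policy;
        pthread_getschedparam(t1.native_handle(), &policy, &sch);     // read current policy & priority
        sch.sched_priority = 20;
        pthread_setschedparam(t1.native_handle(), SCHED_FIFO, &sch);  // may fail without privileges
        t1.join();
      }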