Design Document Release 1.0 - RamanVerma/taskqueues GitHub Wiki

#Design Document for Release 1.0

This document contains the following sections.

  • Section 1 - Aim Of The Document
  • Section 2 - Project Statement
  • Section 3 - Implementation Details
  • Section 3.1 - Design Philosophy
  • Section 3.2 - Technical Overview
  • Section 3.3 - Workflow
  • Section 3.4 - Structures In Detail
  • Section 3.5 - API Overview

###Section 1 - Aim Of The Document

Declare the project goals for release 1.0 and describe the implementation details.

###Section 2 - Project Statement

A. Provide a robust infrastructure that lets user space C programs execute multiple tasks in parallel. The infrastructure must be similar to Linux kernel workqueues, providing sub-structures where tasks can be queued for a thread to process (there being one thread per sub-structure). The program must be able to access this infrastructure through a simple API.

B. Take away the complexity of defining and maintaining the structures required to queue tasks and to synchronize the threads operating on these queues, thereby allowing the programmer to focus on business logic while using a fairly advanced infrastructure to parallelize and synchronize work.

C. Provide a single API call for each of the following jobs.

  • Define a taskqueue with a configurable number of threads.
  • Queue a task to a queue structure, one of which belongs to each thread.
  • Wait for all the currently queued tasks to finish (synchronization call).
  • Destroy the whole taskqueue structure gracefully.

###Section 3 - Implementation Details

####Section 3.1 - Design Philosophy

Application programs should be able to make full use of the computing power at their disposal while executing tasks that can be parallelized, especially compute intensive tasks that can make use of the widely available multi core environments. While PThreads provide a fundamental API for creating and managing multi threaded applications, they involve a fair bit of low level detail that a programmer needs to concern herself with, such as the creation of threads and the synchronization among them.

The programmer has to be an expert on PThreads and the concepts of parallel computing to write code of any decent size. While this expertise is intrinsic to being a good software developer, it is NOT the real challenge that she is trying to solve here. It is better seen as enabling infrastructure for solving a problem. Hence, I want to take it out as a separate library that the application can link against, thereby making the code more manageable and faster to write and test.

####Section 3.2 - Technical Overview

Rather than creating and managing individual threads that deal with individual tasks, the programmer deals with something called a taskqueue. This is similar in functionality to workqueues in Linux kernel space.

A taskqueue has a configurable number of sub structures, appropriately named sub_taskqueues. It also contains a list of special flush structures, each defined and used by a calling thread to sleep while all the currently queued tasks finish executing.

Each sub_taskqueue primarily contains the following elements.

  • queue structure where user defined tasks (functions) can be queued
  • PThread based thread to operate on these tasks
  • PThread based condition variable on which the thread sleeps until it is woken because there is a task in the queue

Each flush structure primarily contains the following elements.

  • PThread based condition variable where the calling thread sleeps while waiting for all the queued tasks to finish execution
  • a queue containing the IDs of the last task in each sub_taskqueue at the time the flush API call was made; all these tasks have to finish before the calling thread is woken up
  • a counter for the number of tasks on which the calling thread will wait

Each task structure primarily contains the following elements.

  • pointer to the function that defines the task
  • pointer to the structure that contains data to be operated upon
  • an ID that is unique among all the tasks in the queue

####Section 3.3 - Workflow

#####Create a taskqueue:

The application creates a taskqueue structure with the following API

taskqueue_t *create_taskqueue(char *);

This call creates a taskqueue with a fixed number of threads, equal to the number of CPU cores in the machine. The user can instead create a taskqueue with a configurable number of sub_taskqueues, using the following API call

taskqueue_t *create_custom_taskqueue(char *, int, int);

Or, a single threaded taskqueue, using the following API call

taskqueue_t *create_singlethread_taskqueue(char *);

NOTE: This library can enhance the performance of multi threaded programs by avoiding needless context switches. If we create a thread for each task, or create a huge number of threads, the kernel ends up doing unnecessary context switching. Instead, we create as many threads as there are CPU cores (the default) and make them work on queues holding tasks. Hence, the overhead of creating and deleting a thread per task goes away, and context switching is reduced. Also, if the tasks are more IO intensive, we can create taskqueues with a larger number of threads, and if the tasks are CPU intensive, we can create the default taskqueues.
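As a rough illustration of the sizing advice in the note above, here is a minimal sketch of how an application might pick a thread count. The helper `suggested_thread_count` and its `io_multiplier` parameter are hypothetical and not part of the library. `sysconf(_SC_NPROCESSORS_ONLN)` is a widely available extension (glibc, macOS) rather than a strict POSIX guarantee, and whether the library itself uses it to find the core count is an assumption.

```c
#include <assert.h>
#include <unistd.h>

/* Hypothetical helper, not part of the taskqueue API.
 * CPU intensive work: one thread per online core (the documented default).
 * IO intensive work: scale the core count by a caller-chosen multiplier. */
int suggested_thread_count(int io_intensive, int io_multiplier)
{
    assert(io_multiplier >= 1);

    long cores = sysconf(_SC_NPROCESSORS_ONLN);
    if (cores < 1)
        cores = 1;               /* fall back if the count is unavailable */

    return (int)(io_intensive ? cores * io_multiplier : cores);
}
```

The result could then be handed to create_custom_taskqueue. The meaning of its two int parameters is not spelled out in this section, so the sketch stops short of calling it.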

#####Queue tasks to the taskqueue:

The application then queues tasks to the taskqueue with the following API

int queue_task(taskqueue_t *, void(*)(void *), void *);

This call creates a task structure out of the user function (parameter 2) and the user data (parameter 3), and queues it to one of the sub_taskqueues. The sub_taskqueue can be selected in various interesting ways. For release 1.0, I have implemented a round robin scheme that distributes the tasks equally among the sub_taskqueues.
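The round robin selection can be sketched roughly as follows. This is an illustration under assumptions, not the library's code: `struct rr_selector` is a hypothetical, trimmed-down stand-in for the selection fields of the taskqueue (a counter and the mutex guarding it), and the function names are invented for the example.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical stand-in for the selection state kept by a taskqueue. */
struct rr_selector {
    int num_stq;                 /* number of sub_taskqueues */
    int next_rr_stq;             /* index that will be handed out next */
    pthread_mutex_t sel_lock;    /* serializes concurrent selections */
};

void rr_selector_init(struct rr_selector *s, int num_stq)
{
    assert(num_stq > 0);
    s->num_stq = num_stq;
    s->next_rr_stq = 0;
    pthread_mutex_init(&s->sel_lock, NULL);
}

/* Return the index of the sub_taskqueue that should receive the next task,
 * then advance the counter, wrapping around at num_stq. */
int rr_select(struct rr_selector *s)
{
    pthread_mutex_lock(&s->sel_lock);
    int chosen = s->next_rr_stq;
    s->next_rr_stq = (s->next_rr_stq + 1) % s->num_stq;
    pthread_mutex_unlock(&s->sel_lock);
    return chosen;
}
```

With three sub_taskqueues, successive calls yield 0, 1, 2, 0 and so on. Taking the lock around the counter keeps the distribution even when several threads queue tasks at the same time.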

NOTE: Interesting work can be done around selection of the correct sub_taskqueue. Think about Markov Chains! They sure beat the crap out of me in grad school :)

#####Barriers:

The application can wait on the completion of a set of tasks before proceeding further, by the following API call

int flush_taskqueue(taskqueue_t *);

A flush structure is created from the last task in each of the sub_taskqueues at the time of making this call. This structure is added to a list maintained by the taskqueue structure. After executing a task, the thread belonging to the sub_taskqueue checks whether any flush structure mentions the task it just finished. If so, it decrements a counter on that flush structure, and wakes the sleeping thread when the counter reaches zero. Remember, other threads can add tasks to the taskqueue while one thread has called flush and is waiting on it. Such tasks do not affect the waiting thread.
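The counter and condition variable handshake described above might look something like the sketch below. It is a simplified illustration, not the library's implementation: `struct flush_sketch` and its functions are hypothetical, and the matching of finished task IDs against the flush structure is left out so that only the decrement-and-wake logic remains.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical, reduced flush structure: lock, condition variable, counter. */
struct flush_sketch {
    pthread_mutex_t lock;
    pthread_cond_t condvar;
    int num_wake_prereq;         /* tasks that must finish before waking */
};

void flush_sketch_init(struct flush_sketch *f, int num_tasks)
{
    assert(num_tasks >= 0);
    pthread_mutex_init(&f->lock, NULL);
    pthread_cond_init(&f->condvar, NULL);
    f->num_wake_prereq = num_tasks;
}

/* Called by a worker after finishing a task that the flush waits on.
 * Returns the number of tasks still outstanding. */
int flush_sketch_task_done(struct flush_sketch *f)
{
    pthread_mutex_lock(&f->lock);
    int remaining = --f->num_wake_prereq;
    if (remaining == 0)
        pthread_cond_broadcast(&f->condvar);   /* wake the flushing thread */
    pthread_mutex_unlock(&f->lock);
    return remaining;
}

/* Called by the flushing thread; returns once the counter reaches zero. */
void flush_sketch_wait(struct flush_sketch *f)
{
    pthread_mutex_lock(&f->lock);
    while (f->num_wake_prereq > 0)             /* guards against spurious wakeups */
        pthread_cond_wait(&f->condvar, &f->lock);
    pthread_mutex_unlock(&f->lock);
}
```

Because the counter is fixed when the flush structure is created, tasks queued afterwards never touch it, which is why they cannot delay the waiting thread.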

#####Destroy the taskqueue:

The application can destroy the taskqueue with the following API call

void destroy_taskqueue(taskqueue_t *);

The design philosophy here is to allow any queued tasks to finish before the library releases the memory held by the taskqueue and its associated structures. A terminal task is added to each of the sub_taskqueues and their ID field is set to -1. This allows any sleeping thread to wake and execute this terminal task. As the thread goes over its loop to find another task, or to sleep, it checks whether the ID field is set to -1. If so, it breaks out of the loop and exits. This way we make sure that any thread waiting on a flush is woken up, or at least that the tasks it was waiting upon are finished.
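A worker loop that drains its queue and exits on a terminal task could be sketched as below. This is one reading of the paragraph above, assuming the -1 sentinel lives in the task's ID. All names (`task_sketch`, `stq_sketch`, `TERMINAL_TASK_ID`, `increment_counter`) are hypothetical, and flush bookkeeping is omitted.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <stdlib.h>

#define TERMINAL_TASK_ID (-1)    /* assumed sentinel for the terminal task */

struct task_sketch {
    int id;
    void (*fn)(void *);
    void *data;
    struct task_sketch *next;
};

struct stq_sketch {
    pthread_mutex_t lock;        /* protects the task list */
    pthread_cond_t more_task;    /* worker sleeps here when the list is empty */
    struct task_sketch *head;
    struct task_sketch *tail;
};

void stq_sketch_init(struct stq_sketch *q)
{
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->more_task, NULL);
    q->head = q->tail = NULL;
}

/* Append a task and wake the worker. Returns 0 on success, -1 on failure. */
int stq_sketch_push(struct stq_sketch *q, int id, void (*fn)(void *), void *data)
{
    struct task_sketch *t = malloc(sizeof *t);
    if (t == NULL)
        return -1;
    t->id = id;
    t->fn = fn;
    t->data = data;
    t->next = NULL;

    pthread_mutex_lock(&q->lock);
    if (q->tail != NULL)
        q->tail->next = t;
    else
        q->head = t;
    q->tail = t;
    pthread_cond_signal(&q->more_task);
    pthread_mutex_unlock(&q->lock);
    return 0;
}

/* Worker loop; the signature matches what pthread_create() expects. */
void *stq_sketch_worker(void *arg)
{
    struct stq_sketch *q = arg;
    assert(q != NULL);

    for (;;) {
        pthread_mutex_lock(&q->lock);
        while (q->head == NULL)
            pthread_cond_wait(&q->more_task, &q->lock);
        struct task_sketch *t = q->head;
        q->head = t->next;
        if (q->head == NULL)
            q->tail = NULL;
        pthread_mutex_unlock(&q->lock);

        int id = t->id;
        if (id != TERMINAL_TASK_ID && t->fn != NULL)
            t->fn(t->data);      /* run the user task outside the lock */
        free(t);

        if (id == TERMINAL_TASK_ID)
            break;               /* terminal task: leave the loop and exit */
    }
    return NULL;
}

/* Example task, used only for demonstration. */
void increment_counter(void *data)
{
    ++*(int *)data;
}
```

Since the terminal task is appended at the tail, every task queued before it runs first. That ordering is what lets destruction wait for pending work without any extra signalling.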

When we set the sub_taskqueue ID to -1, no more tasks can be queued to that sub_taskqueue. A mutex lock makes queuing a task and marking a sub_taskqueue for destruction mutually exclusive operations.
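The mutual exclusion between queuing and marking for destruction can be illustrated with the sketch below. It is hypothetical: `struct stq_gate` uses a separate `marked_for_destruction` flag instead of overloading an ID or counter field with -1, purely to keep the example unambiguous, and the actual list handling is reduced to a counter.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical reduced sub_taskqueue: one lock guards both operations. */
struct stq_gate {
    pthread_mutex_t lock;
    int marked_for_destruction;  /* nonzero once destruction has begun */
    int num_tasks;               /* tasks accepted so far */
};

void stq_gate_init(struct stq_gate *g)
{
    pthread_mutex_init(&g->lock, NULL);
    g->marked_for_destruction = 0;
    g->num_tasks = 0;
}

/* Try to queue a task. Returns 0 if accepted, -1 if the queue is closed. */
int stq_gate_try_queue(struct stq_gate *g)
{
    int rc;

    assert(g != NULL);
    pthread_mutex_lock(&g->lock);
    if (g->marked_for_destruction) {
        rc = -1;                 /* refuse: destruction already started */
    } else {
        g->num_tasks++;          /* stand-in for appending to the task list */
        rc = 0;
    }
    pthread_mutex_unlock(&g->lock);
    return rc;
}

/* Close the queue to new tasks; holds the same lock as the queue path. */
void stq_gate_mark_for_destruction(struct stq_gate *g)
{
    assert(g != NULL);
    pthread_mutex_lock(&g->lock);
    g->marked_for_destruction = 1;
    pthread_mutex_unlock(&g->lock);
}
```

Because both paths take the same lock, a task is either queued entirely before the mark or rejected entirely after it. No task can slip in behind the terminal task.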

Any thread waiting on a flush does some cleanup when it wakes up, and uses certain locks on the flush list to do so. We ensure that this cleanup happens before we free the taskqueue structure by using a condition variable on which the destroy call sleeps until the last of the flush structures has been removed from the flush list.

####Section 3.4 - Structures In Detail

This section is more or less a copy/paste from the taskqueue.h file. It makes sense to me to copy the structure details here while you are reading the design, BUT please double check with the header file for the latest and greatest comments.

/* describes the task queue */
struct taskqueue_s {
    /* number of sub taskqueues in this taskqueue */
    int tq_num_stq;
    /* mutex to protect write access to tq_good_stq */
    pthread_mutex_t tq_count_good_stq_lock;
    /* number of usable sub taskqueues in this taskqueue */
    int tq_good_stq;
    /* array of sub taskqueues, each handled by a separate thread */
    struct sub_taskqueue_s *tq_stq; 
    /* algorithm to be used to select a sub taskqueue for adding a task */
    int tq_stq_sel_algo;
    /* mutex to be used for selecting a sub taskqueue while adding a task */
    pthread_mutex_t tq_stq_sel_lock;
    /* round robin sub taskqueue selection counter */
    int tq_next_rr_stq;
    /* lock for adding/removing flush structures from flush linked list */
    pthread_mutex_t tq_flushlist_lock;
    /* head for the linked list of flush structures */
    flush_t *tq_flushlist_head;
    /* tail for the linked list of flush structures */
    flush_t *tq_flushlist_tail;
    /* mark a taskqueue for destruction */
    int tq_marked_for_destruction;
    /* cond var for destroy_tq fn when waiting for flush structs to be freed */
    pthread_cond_t tq_yield_to_flush_structs_cond;
    /* identifier for the task queue */
    char *tq_id;
};

/* information for a thread waiting for flush_taskqueue operation */
struct flush_s {
    /* lock to protect the count variable and to be used with cond variable */
    pthread_mutex_t f_lock;
    /* condition variable for the calling thread to sleep on */
    pthread_cond_t f_condvar;
    /* counter representing number of tasks to wait upon */
    int f_num_wake_prereq;
    /* array of task sequence number from all the sub taskqueues */
    int *f_id;
    /* pointer to the next flush struct */
    struct flush_s *f_next;
};

/* describes the sub_taskqueue */
struct sub_taskqueue_s {
    /* id of the sub task queue */
    int s_id;
    /* mutex to synchronize worker and destroy/SIGKILL access to task list */
    pthread_mutex_t s_worker_lock;
    /* condition variable for thread waiting for work */
    pthread_cond_t s_more_task;
    /* thread that will execute tasks queued for this sub taskqueue */
    pthread_t s_worker;
    /* mutex to lock the taskqueue */
    pthread_mutex_t s_tasklist_lock;
    /* head of the linked list for tasks in the queue */
    task_t *s_tasklist_head;
    /* tail of the linked list for tasks in the queue */
    task_t *s_tasklist_tail;
    /* number of tasks queued, -1 signifies sub taskqueue locked */
    int s_num_tasks;
    /* pointer to the taskqueue this sub taskqueue struct belongs to */
    taskqueue_t *s_parent_tq;
};

/* describes the task in the queue */
struct task_s {
    /* identifier for the task(unique within each sub taskqueue) */
    int t_tid;
    /* pointer to the next task */
    struct task_s *t_next;
    /* pointer to the pending function */
    void(*t_fn)(void *);
    /* pointer to the data to be passed to the pending function */
    void *t_data;
    /* pointer to the sub_taskqueue where this struct is queued */
    struct sub_taskqueue_s *t_stq;
};

####Section 3.5 - API Overview

Please refer to the API Document for usage details of the API. The declarations are repeated here for the sake of completeness.

/* functions used to create a taskqueue */
taskqueue_t *create_taskqueue(char *);
taskqueue_t *create_singlethread_taskqueue(char *);
taskqueue_t *create_custom_taskqueue(char *, int, int);
/* function to queue a task */
int queue_task(taskqueue_t *, void(*)(void *), void *);
/* function to flush a task queue */
int flush_taskqueue(taskqueue_t *);
/* function used to destroy a taskqueue */
void destroy_taskqueue(taskqueue_t *);