rust impl thread pool - ltoddy/blog GitHub Wiki

用 Rust 实现线程池

依稀记得旧版本的Rust官网写着: Rust 是一种系统编程语言。 它有着惊人的运行速度,能够防止段错误,并保证线程安全。 可惜Rust的标准库中并没有提供线程池.

一种实现并发的策略: CSP(communication sequential processes)模型,天然支持并发的Go语言也是基于此模型实现的并发.

使用线程池,除了并发,更好的是为了线程的复用,当我们有大量的可独立运行的Job的时候,如果为每一个Job的运行 都创建一个线程,开销是想当然不小的,如果能够提前创建合适数量的线程,并让这些线程不停的执行Job,那么额外的 开销就会小很多.

实现思路: 使用管道(channel)作为Job的传输载体,线程池初始化时创建多个线程,每个线程等待管道传输的Job. 线程池去提交Job发送到管道中.


使用一个Message作为管道的传输数据,当线程池关闭的时候,释放线程池中的线程.

enum Message {
    Job(Box<dyn FnOnce() + Send + 'static>),
    Terminate,
}

Worker中掌握这线程勾柄,用于让线程执行任务和释放线程,当worker工作时,不断等待管道发来的Message, 收到Message后,对其消费,执行Message中的Job,如果发来的是终止消息,那么worker停止工作.

struct Worker {
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(receiver: Arc<Mutex<Receiver<Message>>>) -> Self {
        let thread = thread::spawn(move || Self::work(receiver));

        Self { thread: Some(thread) }
    }

    fn work(receiver: Arc<Mutex<Receiver<Message>>>) {
        loop {
            let message = receiver
                .lock()
                .expect("Poisoned thread")
                .recv()
                .expect("channel has shut down.");

            match message {
                Message::Job(job) => job(),
                Message::Terminate => break,
            }
        }
    }
}

线程池的执行器会创建多个worker,和管道. 当有Job提交到线程池后,线程池将Job封装成Message,发送给worker.

struct ThreadPoolExecutor {
    workers: Vec<Worker>,
    sender: SyncSender<Message>,
}

impl ThreadPoolExecutor {
    fn new() -> Self {
        Self::with_capacity(16)
    }

    fn with_capacity(capacity: usize) -> Self {
        assert!(capacity > 0);

        let (sender, receiver) = sync_channel::<Message>(1024);
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::<Worker>::with_capacity(capacity);

        for _ in 0..capacity {
            workers.push(Worker::new(Arc::clone(&receiver)));
        }

        Self { workers, sender }
    }

    fn submit<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Message::Job(Box::new(f));

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPoolExecutor {
    fn drop(&mut self) {
        for _ in 0..self.workers.len() {
            self.sender.send(Message::Terminate).unwrap();
        }

        for worker in &mut self.workers {
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

源码: https://github.com/ltoddy/threadpool

⚠️ **GitHub.com Fallback** ⚠️