生产者消费者模型 - morris131/morris-book GitHub Wiki

生产者消费者模型

原理

生产者和消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

一个生产者和一个消费者:操作值

package com.morris.ch3;

public class OneProviderConsumerTest {

	static volatile String value = null;

	static Object lock = new Object();

	public static void main(String[] args) {
		new Provider().start();
		new Consumer().start();
	}

	static class Provider extends Thread {
		@Override
		public void run() {
			while (true) {
				synchronized (lock) {
					if (null == value) {
						value = System.nanoTime() + "";
						System.out.println("provider set value:" + value);
						lock.notify();
					} else {
						try {
							lock.wait();
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
				}
			}
		}
	}

	static class Consumer extends Thread {
		@Override
		public void run() {
			while (true) {
				synchronized (lock) {
					if (null == value) {
						try {
							lock.wait();
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					} else {
						System.out.println("consumer get value:" + value);
						value = null;
						lock.notify();
					}
				}
			}
		}
	}

}

多个生产者和多个消费者:操作值

package com.morris.ch3;

public class MultiProviderAndMultiConsumerTest {
	
	static volatile String value = null;

	static Object lock = new Object();

	public static void main(String[] args) {
		new Provider("provider 1").start();
		new Provider("provider 2").start();
		new Consumer("consumer 1").start();
		new Consumer("consumer 2").start();
	}

	static class Provider extends Thread {
		
		Provider(String name) {
			super(name);
		}
		
		@Override
		public void run() {
			while (true) {
				synchronized (lock) {
					if (null == value) {
						value = System.nanoTime() + "";
						System.out.println(Thread.currentThread().getName() + " set value:" + value);
						//lock.notify();
						lock.notifyAll();
					} else {
						try {
							System.out.println(Thread.currentThread().getName() + " is waiting");
							lock.wait();
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
				}
			}
		}
	}

	static class Consumer extends Thread {
		
		Consumer(String name) {
			super(name);
		}
		
		@Override
		public void run() {
			while (true) {
				synchronized (lock) {
					if (null == value) {
						try {
							System.out.println(Thread.currentThread().getName() + " is waiting");
							lock.wait();
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					} else {
						System.out.println(Thread.currentThread().getName() + " get value:" + value);
						value = null;
						//lock.notify();
						lock.notifyAll();
					}
				}
			}
		}
	}

}

使用notify会出现假死,解决办法改为notifyAll。

⚠️ **GitHub.com Fallback** ⚠️