[Java][Thread][Code Analysis] BoundedBuffer and PutTakeTest - Gukie/learning GitHub Wiki

PutTakeTest

This part mainly about how to use CyclicBarrier

First, let give a basic knowledge about CyclicBarrier

int pairNum  = 10;
CyclicBarrier barrier = new CyclicBarrier(pairNum * 2+1);

Above code means:

  • Barrier will be opened(other word, tripped) when pairNum * 2+1,also called parties, threads have invoked barrier.await().
  • Since doWork() is running in our main thread, so the parties number of CyclicBarriershould be pairNum * 2+1

public void doWork() {
		try {
			for (int i = 0; i < pairNum; i++) {
				executorService.execute(new Consumer());
				executorService.execute(new Producer());
			}
			System.err.println("starting awaiting");
			barrier.await(); // [1] wait all threads are ready
			barrier.await(); // [2]
			System.err.println("finished awaiting");
		} catch (Exception e) {
			e.printStackTrace();
		}finally {
			executorService.shutdownNow();
		}
	}

class Consumer implements Runnable {
		@Override
		public void run() {
			try {
				int sum =0;
				barrier.await(); // [1]
				for(int i =0;i<trialNum;i++){
					int seed = buffer.take();
					sum+=seed;
				}
				takeSum.getAndAdd(sum);
				barrier.await(); // [2]
//				System.err.println("consumed:"+sum);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	class Producer implements Runnable {

		@Override
		public void run() {
			try {
				int sum = 0;
				barrier.await(); // [1]
				int seed = (int) (this.hashCode() ^ System.nanoTime());
				for (int i = 0; i < trialNum; i++) {
					buffer.put(seed);
					sum += seed;
					seed = RandomUtils.generateFakeRandom(seed);
				}
				putSum.getAndAdd(sum);
				barrier.await(); // [2]
//				System.err.println("produced:"+sum);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

	}

Above code is designed for following intend:

1. Code Snippet 1

Following code is waiting all sub-tasks(here is Consumer and Producer) ready, and run at the same time.

barrier.await();//[1]

2. Code Snippet 2

barrier.await();//[2]

Above code is waiting all sub-tasks to finished. If this code snippet is missing, following exception will be thrown since the main thread will be finished but the sub-task thread is not, so the sub-task thread will be interrupted, and InterruptedException will be thrown.

java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
	at java.util.concurrent.Semaphore.acquire(Unknown Source)
	at com.lokia.pm.bean.BoundedBuffer.take(BoundedBuffer.java:34)
	at com.lokia.pm.main.PutTakeTest$Consumer.run(PutTakeTest.java:68)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
	at java.util.concurrent.Semaphore.acquire(Unknown Source)
	at com.lokia.pm.bean.BoundedBuffer.put(BoundedBuffer.java:21)
	at com.lokia.pm.main.PutTakeTest$Producer.run(PutTakeTest.java:89)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)