java_消息队列 - YUHAO-ZX/StudyCollection GitHub Wiki
1.生产消费者基本实现BlockQueue
class BoundedBuffer {
<b>final Lock lock = new ReentrantLock();</b>
final Condition notFull = <b>lock.newCondition(); </b>
final Condition notEmpty = <b>lock.newCondition(); </b>
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
<b>lock.lock();
try {</b>
while (count == items.length)
<b>notFull.await();</b>
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
<b>notEmpty.signal();</b>
<b>} finally {
lock.unlock();
}</b>
}
public Object take() throws InterruptedException {
<b>lock.lock();
try {</b>
while (count == 0)
<b>notEmpty.await();</b>
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
<b>notFull.signal();</b>
return x;
<b>} finally {
lock.unlock();
}</b>
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
//如果超出了空间,从0开始继续写
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
问题:
A:blockqueue上述实现进行了加锁,通过通知的方式来唤醒待执行的线程,耗时比较高
B:如果改为忙轮询(自旋)的方式,那么会提高效率,但一直轮询会导致CPU浪费,占用过高
C:disruptor的实现中和了两种方式,规避了效率低下和CPU浪费的问题
2.disruptor
@Override
public long next(int n)
{
long nextValue = pad.nextValue;
long nextSequence = nextValue + n;
long wrapPoint = nextSequence - bufferSize;
long cachedGatingSequence = pad.cachedValue;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
long minSequence;
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
//核心代码:unsafe.park() 将线程挂起1纳秒(已经是最小单位了)
//既节约了CPU,又最大可能的去感知队列的变化
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
pad.cachedValue = minSequence;
}
pad.nextValue = nextSequence;
return nextSequence;
}
3.SleepingWaitStrategy 相对CPU友好的实现,LockSupport.parkNanos(1L); 可能存在延时
private static final int RETRIES = 200;
@Override
public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
int counter = RETRIES;
while ((availableSequence = dependentSequence.get()) < sequence)
{
counter = applyWaitMethod(barrier, counter);
}
return availableSequence;
}
@Override
public void signalAllWhenBlocking()
{
}
private int applyWaitMethod(final SequenceBarrier barrier, int counter)
throws AlertException
{
barrier.checkAlert();
if (counter > 100)
{
--counter;
}
else if (counter > 0)
{
--counter;
Thread.yield();
}
else
{
LockSupport.parkNanos(1L);
}
return counter;
}
4.YieldingWaitStrategy 相对效率友好的方式,比SleepingWaitStrategy 更高效,但同样CPU也相对消耗较高
private static final int SPIN_TRIES = 100;
@Override
public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
int counter = SPIN_TRIES;
while ((availableSequence = dependentSequence.get()) < sequence)
{
counter = applyWaitMethod(barrier, counter);
}
return availableSequence;
}
@Override
public void signalAllWhenBlocking()
{
}
private int applyWaitMethod(final SequenceBarrier barrier, int counter)
throws AlertException
{
barrier.checkAlert();
if (0 == counter)
{
Thread.yield();
}
else
{
--counter;
}
return counter;
}
5.disruptor 其他优化
1.volatile 对象:内存屏障 保证总是读取到最新的值
.对对象写后会将最新值同步到所有CPU缓存
.对对象读之前会将最新值获取
有了内存屏障,那么在做自旋判断的时候将可以放心进行判断,不需要担心取到了别的线程已经改后的值
/**
* Perform a volatile read of this sequence's value.
*
* @return The current value of the sequence.
*/
public long get()
{
return unsafe.getLongVolatile(paddedValue, valueOffset);
}
2.对cpu和内存间缓存的优化
.cpu和内存间缓存目前机器一般用64byte为单位进行缓存,如缓存8byte的long,会将其前或后的变量进行缓存
.如果其中有某个变量发生了变化,那么会对整个64byte进行缓存失效或者重新加载
.这样变量的变动会相互影响
.故disruptor用占为的方式进行了优化
.代码解析
static final long INITIAL_VALUE = -1L;
private static final Unsafe unsafe;
private static final long valueOffset;
static
{
unsafe = Util.getUnsafe();
final int base = unsafe.arrayBaseOffset(long[].class);
final int scale = unsafe.arrayIndexScale(long[].class);
valueOffset = base + (scale * 7);
}
//使用15*8 = 120byte 来占位,取paddedValue[7] 做为其业务值
private final long[] paddedValue = new long[15];
//get方法
/**
* Perform a volatile read of this sequence's value.
*
* @return The current value of the sequence.
*/
public long get()
{
return unsafe.getLongVolatile(paddedValue, valueOffset);
}