daily 2017 9 6 多线程并发相关操作 - wtdig/study GitHub Wiki

高并发相关知识

同步容器

在早期的JDK中,有两种现成的实现,Vector和Hashtable,可以直接new对象获取;

在JDK1.2中,引入了同步封装类,可以由Collections.synchronizedXxxx等方法创建;

同步容器类的问题

同步容器类虽然都是线程安全的,但是在某些情况下(复合操作),仍然需要加锁来保护;

常见复合操作如下:

迭代:反复访问元素,直到遍历完全部元素;

跳转:根据指定顺序寻找当前元素的下一个(下n个)元素;

条件运算:例如若没有则添加等;

volatile关键字可以使多个线程共享这个变量,使多个线程对这个变量可见,但是,不能起到线程安全的作用

new ThreadLocal,threadlocal相当于一个map集合,可以操作数据。他可以为每个线程都创建一个副本,每个线程都只会操作自己的那部分,不会导致高并发产生的脏读现象


并发容器

jdk1.5之后,使用ConcurrentHashMap代替传统的hashTable,CopyOnWriteArrayList代替Voctor

并发容器中,有一个重要的概念:Queue对象,分为:阻塞队列和非阻塞队列,非阻塞队列的代表为ConcurrentLinkedQueue,阻塞队列的代表为:LinkedBlockingQueue\ArrayBlockingQueue\SysnchonousQueue\ProrityBlockingQueque

ConcurrentMap下有2个实现,ConcurrentHashMap和ConcurrentSkipListMap(支持并发排序功能,类似treeMap)

ConcurrentHashMap实现并发的原理:将一块内存空间,分成16段,每份都是一个hashTable,这样每一块都有一个独立的锁,这样操作不同的数据时,可以并发操作。就比如,多并发时,你操作ConcurrentHashMap下标为1的数据,下标为2的数据,都是可以并发进行修改,如果是同时操作下标为1的数据,不操作其他下标的数据时,那就只有等待了

CopyOnWriteArrayList的实现原理:该容器是写时复制的容器,读写分离,当在容器中进行写,修改,删除操作时,复制该容器,生成一个新容器,在新容器中进行写,修改、删除操作(同时进行该操作是,会上锁的,只能有一个线程操作),这时,有多个线程去读取时,会去旧的容器进行读操作,当在新的容器上的写,修改、删除操作玩之后,将旧容器的指针指向新的容器,进行读操作。通俗说,就是写操作阻塞,读操作不阻塞

ConcurrentLinkedQueue是一个适用于高并发场景下的队列,通过无锁的方式,实现高并发下的该性能,它是一个基于链接节点的无界线程安全队列,遵循先进先出的原则,对队列不允许null元素。其中add()\offer()都是添加元素的方法,poll()\peek()都是去头元素节点,区别在于:前者会删除数据,后置不会

ArrayBlockingQueue是基于数组的阻塞队列实现,内部维护一个定长的数组,内部没有实现读写分离,意味着生成和消费不能完全并行执行,是个有界队列

LinkedBlockingQueue是基于链表的阻塞队列,内容采用读写分离,可以高并发处理数据,可以实现生成和消费完全并行运行,是一个无界队列

ProrityBlockingQueque是一个基于优先级的阻塞无界队列,可以通过compator进行排序

DalayQueue,带有延迟时间的阻塞队列,DalayQueue中的元素必须实现Delayed接口

SysnchonousQueue是一种没有缓存的队列,生产的数据会直接被消费者消费


多线程常用的设计模式

1、futrue模式,采用异步的方式,主线程发送请求,另起一个线程去处理该请求,主线程无需等待,进行执行,当需要获取请求的结果时,才调用callBack方法获取结果

2、Mater work模式,mater负责接受任务,分发任务,统计结果,work负责完成每一个分发的任务(一个work一个线程),通俗来说,就是并行计算

3、生产者消费者模式,生成者负责生成消息,消费者负责消费消息,2个线程同时工作,一般可以应用于消息中间件,例如MQ


并发工具类

CountDownLatch

一个线程在执行过程时,需要询问其他的线程,是否已经给我准备好资源了,只有准备好之后,该线程才可以进行执行下去,否则一直阻塞着

CycliBarrier

多个线程同时处理事情,主要当所有的线程都已经准备好了之后,一个线程发出开始,多个线程同时执行,如果有一个线程没有准备好,大家都不执行,都等待,只有全部准备好,才启动

Semaphore信号量,可以应用于限流,设置只有几个线程可以访问,意思就是:同时,有几个线程可以获取许可(记得线程执行完之后,释放掉权限)

exchanger:用于实现两个人之间的数据交换,每个人在完成一定的事务后想与对方交换数据,第一个先拿出数据的人将一直等待第二个人拿到数据到来时,才能彼此交换数据


主要了解下:重入锁、读写锁

这种锁,是jdk1.5之后,优化锁的表现

采用lock\unLock方法替代synchronized同步方法、通过锁获取condition对象,采用condition的await方法替代wait方法、signal方法替代notify方法

注意:一个lock锁可以有多个condtion对象,这样就可以精准的await和signal某一个线程

重入锁:ReentrantLock,在需要加锁的代码上写lock(),和unLock()方法,一个加锁,一个释放锁,一般情况下,释放锁在finally里面进行

读写锁:ReentrantReadWriteLock,一个读写分离的思想,多线程在读操作时,该锁readLock.lock()允许并发执行,在写操作时,该锁writeLock.lock()是阻塞的,只能当一个线程操作完之后,另一个线程才能修改。读读不互斥、读写互斥、写写互斥


Deque双向队列:

这种队列允许在队列头和尾部进行入队出队操作,因此在功能上比Queue显然要更复杂


线程同步

1、同步方法synchronized;2、同步代码块;3、lock锁;4、Threadlocal;5、volatile

线程同步关键字解析

/***
     * 静态方法的同步方法,不同线程的静态方法互斥
     */
    public synchronized static void go1() {

    }


    /***
     * 成员方法的同步方法,同一个对象的多个线程,成员方法互斥
     */
    public synchronized void go2() {

    }

    /**
     * this的同步代码块,与成员方法互斥
     */
    public void go3() {
        synchronized (this) {

        }
    }

    /**
     * class的同步代码块,与静态方法互斥
     */
    public void go4() {
        synchronized (CodeMain.class) {

        }
    }


    /**
     * 多线程时,数据来自副本,不一定是最新的数据
     */
    private int i;

    public int getI() {
        return i;
    }

    /**
     * 多线程时,synchronized进行了主存和副本之间的数据同步,有一定的延迟,数据是最新的
     */
    private int j;

    public synchronized int getJ() {
        return j;
    }

    /**
     * 多线程时,数据取自主存中,数据是最新的
     */
    private volatile int k;

    public int getK() {
        return k;
    }

master\work模式的实现代码

参考资料

代码:

package masterwork;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MasterThread {

    //1,用来承装任务的集合
    private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();

    //2,用HashMap承装所有的work对象
    private Map<String, Thread> workers = new HashMap<String, Thread>();

    //3,用一个容器承装每一个worker执行任务的结果集
    private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();

    //4,构造方法
    public MasterThread(WorkerThread worker, int workerCount) {

        //每一个worker对象都需要有Master的引用 ,workQueue用于任务领取,resultMap用于结果集提交
        worker.setWorkerQueue(this.workQueue);
        worker.setResultMap(this.resultMap);

        for (int i = 0; i < workerCount; i++) {
            //key表示每一个worker的名字,value表示形成执行对象
            workers.put("子节点" + i, new Thread(worker));
        }
    }

    //5,提交方法
    public void submit(Task task) {
        workQueue.add(task);
    }

    //6,需要一个执行的方法,启动应用程序让所有的work工作
    public void execute() {
        for (Map.Entry<String, Thread> me : workers.entrySet()) {
            me.getValue().start();
        }
    }

    //7,判断线程是否执行完毕
    public boolean isComplete() {
        for (Map.Entry<String, Thread> me : workers.entrySet()) {
            if (me.getValue().getState() != Thread.State.TERMINATED) {
                return false;
            }
        }
        return true;
    }

    //8,返回结果集
    public int getResult() {
        int ret = 0;
        for (Map.Entry<String, Object> me : resultMap.entrySet()) {
            //汇总
            ret += 1;
        }
        return ret;
    }

    public void love() {
        long start = System.currentTimeMillis();
        while (true) {
            if (this.isComplete()) {
                long end = System.currentTimeMillis() - start;
                int result = this.getResult();
                System.out.println("最终结果:" + result + ",最终耗时:" + end);
                break;
            }
        }
    }

}



package masterwork;

public class Task {
    private String taskId;//任务id
    private String taskName;//任务名称

    public String getTaskId() {
        return taskId;
    }

    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    public String getTaskName() {
        return taskName;
    }

    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }

}


package masterwork;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class WorkerThread implements Runnable {

    private ConcurrentLinkedQueue<Task> workQueue;//用来承装任务的集合

    private ConcurrentHashMap<String, Object> resultMap;//用一个容器承装每一个worker执行任务的结果集

    public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
        this.workQueue = workQueue;
    }

    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap = resultMap;
    }


    public void run() {
        while (true) {
            Task task = this.workQueue.poll();
            if (task == null) break;
            //正真的去做业务处理
            Object output = handle(task);
            System.out.println(Thread.currentThread().getName() + "线程 ,执行" + output);
            this.resultMap.put(task.getTaskId(), output);
        }
    }

    //处理任务
    private Object handle(Task task) {
        Object output = null;
        try {
            //表示处理任务的耗时
            Thread.sleep(500);
            output = task.getTaskName();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return output;
    }

}


package masterwork;

public class TestMasterWorker {

    public static void main(String[] args) {

        MasterThread masterThread = new MasterThread(new WorkerThread(), Runtime.getRuntime().availableProcessors());
        //添加100个任务
        for (int i = 0; i < 20; i++) {
            Task task = new Task();
            task.setTaskId("" + i);
            task.setTaskName("任务" + i);
            masterThread.submit(task);
        }
        //执行任务
        masterThread.execute();
        masterThread.love();
    }
}
⚠️ **GitHub.com Fallback** ⚠️