IT虾米网

Java多线程编程之同步器详解

admin 2018年06月15日 编程语言 372 0

同步器

为每种特定的同步问题提供了解决方案

Semaphore

Semaphore【信号标;旗语】,通过计数器控制对共享资源的访问。

测试类:

    package concurrent; 
 
    import concurrent.thread.SemaphoreThread; 
 
    import java.util.concurrent.Semaphore; 
 
    /** 
    * 拿客 
    * www.coderknock.com 
    * QQ群:213732117 
    * 创建时间:2016年08月08日 
    * 描述: 
    */ 
    public class SemaphoreTest { 
 
       public static void main(String[] args) { 
           //在Thread里声明并不是同一个对象 
           Semaphore semaphore = new Semaphore(3); 
           SemaphoreThread testA = new SemaphoreThread("A", semaphore); 
           SemaphoreThread testB = new SemaphoreThread("B", semaphore); 
           SemaphoreThread testC = new SemaphoreThread("C", semaphore); 
           SemaphoreThread testD = new SemaphoreThread("D", semaphore); 
           SemaphoreThread testE = new SemaphoreThread("E", semaphore); 
           SemaphoreThread testF = new SemaphoreThread("F", semaphore); 
           SemaphoreThread testG = new SemaphoreThread("G", semaphore); 
           testA.start(); 
           testB.start(); 
           testC.start(); 
           testD.start(); 
           testE.start(); 
           testF.start(); 
           testG.start(); 
       } 
   }

线程写法:

   package concurrent.thread; 
 
   import org.apache.logging.log4j.LogManager; 
   import org.apache.logging.log4j.Logger; 
 
   import java.util.concurrent.Semaphore; 
 
   /** 
    * 拿客 
    * www.coderknock.com 
    * QQ群:213732117 
    * 创建时间:2016年08月08日 
    * 描述: 
    */ 
   public class SemaphoreThread extends Thread { 
       private static final Logger logger = LogManager.getLogger(SemaphoreThread.class); 
       //创建有3个信号量的信号量计数器 
       public Semaphore semaphore; 
 
       public SemaphoreThread(String name, Semaphore semaphore) { 
           setName(name); 
           this.semaphore = semaphore; 
       } 
 
       @Override 
       public void run() { 
           try { 
               logger.debug(getName() + " 取号等待... " + System.currentTimeMillis()); 
               //取出一个信号 
               semaphore.acquire(); 
               logger.debug(getName() + " 提供服务... " + System.currentTimeMillis()); 
               sleep(1000); 
               logger.debug(getName() + " 完成服务... " + System.currentTimeMillis()); 
 
           } catch (InterruptedException e) { 
               e.printStackTrace(); 
           } 
           logger.debug(getName() + " 释放... " + System.currentTimeMillis()); 
           //释放一个信号 
           semaphore.release(); 
       } 
   }

执行结果【以下所有输出结果中[]中为线程名称- 后为输出的内容】:

    [C] - C 取号等待... 1470642024037 
    [F] - F 取号等待... 1470642024036 
    [E] - E 取号等待... 1470642024036 
    [B] - B 取号等待... 1470642024037 
    [D] - D 取号等待... 1470642024037 
    [A] - A 取号等待... 1470642023965 
    [D] - D 提供服务... 1470642024039 
    [C] - C 提供服务... 1470642024039 
    [G] - G 取号等待... 1470642024036 
    [F] - F 提供服务... 1470642024040 
    [D] - D 完成服务... 1470642025039 
    [C] - C 完成服务... 1470642025039 
    [D] - D 释放... 1470642025040 
    [F] - F 完成服务... 1470642025040 
    [C] - C 释放... 1470642025041 
    [B] - B 提供服务... 1470642025042 
    [A] - A 提供服务... 1470642025042 
    [F] - F 释放... 1470642025043 
    [E] - E 提供服务... 1470642025043 
    [A] - A 完成服务... 1470642026043 
    [B] - B 完成服务... 1470642026043 
    [B] - B 释放... 1470642026043 
    [A] - A 释放... 1470642026043 
    [G] - G 提供服务... 1470642026044 
    [E] - E 完成服务... 1470642026045 
    [E] - E 释放... 1470642026045 
    [G] - G 完成服务... 1470642027045 
    [G] - G 释放... 1470642027046

可以看到,当3个信号量被领取完之后,之后的线程会阻塞在领取信号的位置,当有信号量释放之后才会继续执行。

CountDownLatch

CountDownLatch【倒计时锁】,线程中调用countDownLatch.await()使进程进入阻塞状态,当达成指定次数后(通过countDownLatch.countDown())继续执行每个线程中剩余的内容。

测试类:

package concurrent.thread; 
 
import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; 
 
import java.util.concurrent.CountDownLatch; 
 
/** 
 * 拿客 
 * www.coderknock.com 
 * QQ群:213732117 
 * 创建时间:2016年08月08日 
 * 描述: 
 */ 
public class package concurrent; 
 
import concurrent.thread.CountDownLatchThread; 
import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; 
 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.CyclicBarrier; 
 
/** 
 * 拿客 
 * www.coderknock.com 
 * QQ群:213732117 
 * 创建时间:2016年08月08日 
 * 描述: 
 */ 
public class CountDownLatchTest { 
 
    private static final Logger logger = LogManager.getLogger(CountDownLatchTest.class); 
 
    public static void main(String[] args) throws InterruptedException { 
        //设定当达成三个计数时触发 
        CountDownLatch countDownLatch = new CountDownLatch(3); 
        new CountDownLatchThread("A", countDownLatch).start(); 
        new CountDownLatchThread("B", countDownLatch).start(); 
        new CountDownLatchThread("C", countDownLatch).start(); 
        new CountDownLatchThread("D", countDownLatch).start(); 
        new CountDownLatchThread("E", countDownLatch).start(); 
        for (int i = 3; i > 0; i--) { 
            Thread.sleep(1000); 
            logger.debug(i); 
            countDownLatch.countDown(); 
        } 
    } 
}

线程类:

package concurrent.thread; 
 
import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; 
 
import java.util.concurrent.CountDownLatch; 
 
/** 
 * 拿客 
 * www.coderknock.com 
 * QQ群:213732117 
 * 创建时间:2016年08月08日 
 * 描述: 
 */ 
public class CountDownLatchThread extends Thread { 
    private static final Logger logger = LogManager.getLogger(CountDownLatchThread.class); 
    //计数器 
    private CountDownLatch countDownLatch; 
 
    public CountDownLatchThread(String name, CountDownLatch countDownLatch) { 
        setName(name); 
        this.countDownLatch = countDownLatch; 
    } 
 
    @Override 
    public void run() { 
        logger.debug("执行操作..."); 
        try { 
            sleep(1000); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } 
        logger.debug("等待计数器达到标准..."); 
        try { 
            //让线程进入阻塞状态,等待计数达成后释放 
            countDownLatch.await(); 
            logger.debug("计数达成,继续执行..."); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } 
    } 
}

执行结果:

 [E] - 执行操作... 
 [B] - 执行操作... 
 [A] - 执行操作... 
 [C] - 执行操作... 
 [D] - 执行操作... 
 [main] DEBUG concurrent.CountDownLatchTest - 3 
 [B] - 等待计数器达到标准... 
 [E] - 等待计数器达到标准... 
 [C] - 等待计数器达到标准... 
 [D] - 等待计数器达到标准... 
 [A] - 等待计数器达到标准... 
 [main] DEBUG concurrent.CountDownLatchTest - 2 
 [main] DEBUG concurrent.CountDownLatchTest - 1 
 [E] - 计数达成,继续执行... 
 [C] - 计数达成,继续执行... 
 [B] - 计数达成,继续执行... 
 [D] - 计数达成,继续执行... 
 [A] - 计数达成,继续执行...
CyclicBarrier

CyclicBarrier【Cyclic周期,循环的 Barrier屏障,障碍】循环的等待阻塞的线程个数到达指定数量后使参与计数的线程继续执行并可执行特定线程(使用不同构造函数可以不设定到达后执行),其他线程仍处于阻塞等待再一次达成指定个数。

测试类:

package concurrent; 
 
import concurrent.thread.CyclicBarrierThread; 
import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; 
 
import java.util.concurrent.CyclicBarrier; 
 
/** 
 * 拿客 
 * www.coderknock.com 
 * QQ群:213732117 
 * 创建时间:2016年08月08日 
 * 描述: 
 */ 
public class CyclicBarrierTest { 
 
    private static final Logger logger = LogManager.getLogger(CyclicBarrierTest.class); 
 
    public static void main(String[] args) { 
          //可以使用CyclicBarrier(int parties)不设定到达后执行的内容 
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { 
            logger.debug("---计数到达后执行的内容----"); 
        }); 
        new CyclicBarrierThread("A", cyclicBarrier).start(); 
        new CyclicBarrierThread("B", cyclicBarrier).start(); 
        new CyclicBarrierThread("C", cyclicBarrier).start(); 
        new CyclicBarrierThread("D", cyclicBarrier).start(); 
        new CyclicBarrierThread("E", cyclicBarrier).start(); 
        new CyclicBarrierThread("A2", cyclicBarrier).start(); 
        new CyclicBarrierThread("B2", cyclicBarrier).start(); 
        new CyclicBarrierThread("C2", cyclicBarrier).start(); 
        new CyclicBarrierThread("D2", cyclicBarrier).start(); 
        new CyclicBarrierThread("E2", cyclicBarrier).start(); 
        //需要注意的是,如果线程数不是上面设置的等待数量的整数倍,比如这个程序中又加了个线程, 
        // 那么当达到5个数量时,只会执行达到时的五个线程的内容, 
        // 剩余一个线程会出于阻塞状态导致主线程无法退出,程序无法结束 
        // new CyclicBarrierThread("F", cyclicBarrier).start();//将这行注释去掉程序无法自动结束 
    } 
}

线程类:

package concurrent.thread; 
 
import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; 
 
import java.util.Random; 
import java.util.concurrent.BrokenBarrierException; 
import java.util.concurrent.CyclicBarrier; 
 
/** 
 * 拿客 
 * www.coderknock.com 
 * QQ群:213732117 
 * 创建时间:2016年08月08日 
 * 描述: 
 */ 
public class CyclicBarrierThread extends Thread { 
 
    private static final Logger logger = LogManager.getLogger(CyclicBarrierThread.class); 
 
    private CyclicBarrier cyclicBarrier; 
 
    public CyclicBarrierThread(String name, CyclicBarrier cyclicBarrier) { 
        super(name); 
        this.cyclicBarrier = cyclicBarrier; 
    } 
 
    @Override 
    public void run() { 
        logger.debug("执行操作..."); 
        try { 
            int time = new Random().nextInt(10) * 1000; 
            logger.debug("休眠" + time/1000 + "秒"); 
            sleep(time); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } 
        logger.debug("等待计数器达到标准..."); 
        try { 
            //让线程进入阻塞状态,等待计数达成后释放 
            cyclicBarrier.await(); 
            logger.debug("计数达成,继续执行..."); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } catch (BrokenBarrierException e) { 
            e.printStackTrace(); 
        } 
    } 
}

执行结果:

 [A] - 执行操作... 
 [A] - 休眠0秒 
 [E2] - 执行操作... 
 [E2] - 休眠5秒 
 [D2] - 执行操作... 
 [D2] - 休眠4秒 
 [C2] - 执行操作... 
 [C2] - 休眠4秒 
 [B2] - 执行操作... 
 [B2] - 休眠6秒 
 [A2] - 执行操作... 
 [A2] - 休眠8秒 
 [E] - 执行操作... 
 [E] - 休眠5秒 
 [D] - 执行操作... 
 [D] - 休眠0秒 
 [C] - 执行操作... 
 [C] - 休眠3秒 
 [B] - 执行操作... 
 [B] - 休眠7秒 
 [A] - 等待计数器达到标准... 
 [D] - 等待计数器达到标准... 
 [C] - 等待计数器达到标准... 
 [D2] - 等待计数器达到标准... 
 [C2] - 等待计数器达到标准... 
 [C2] DEBUG concurrent.CyclicBarrierTest - ---计数到达后执行的内容---- 
 [C2] - 计数达成,继续执行... 
 [A] - 计数达成,继续执行... 
 [C] - 计数达成,继续执行... 
 [D2] - 计数达成,继续执行... 
 [D] - 计数达成,继续执行... 
 [E2] - 等待计数器达到标准... 
 [E] - 等待计数器达到标准... 
 [B2] - 等待计数器达到标准... 
 [B] - 等待计数器达到标准... 
 [A2] - 等待计数器达到标准... 
 [A2] DEBUG concurrent.CyclicBarrierTest - ---计数到达后执行的内容---- 
 [E] - 计数达成,继续执行... 
 [B2] - 计数达成,继续执行... 
 [E2] - 计数达成,继续执行... 
 [B] - 计数达成,继续执行... 
 [A2] - 计数达成,继续执行...

可以想象成以前不正规的长途汽车站的模式:

不正规的长途汽车站会等待座位坐满之后才发车,到达目的地之后继续等待然后循环进行。每个人都是一个Thread,上车后触发cyclicBarrier.await();,当坐满时就是达到指定达成数的时候,车辆发车就是达成后统一执行的内容,发车后车上的人们就可以聊天之类的操作了【我们暂且理解为上车后人们就都不能动了O(∩_∩)O~】。

CountDownLatch与CyclicBarrier区别:

CountDownLatch是一个或多个线程等待计数达成后继续执行,await()调用并没有参与计数。

CyclicBarrier则是N个线程等待彼此执行到零界点之后再继续执行,await()调用的同时参与了计数,并且CyclicBarrier支持条件达成后执行某个动作,而且这个过程是循环性的。

Exchanger

Exchanger<T> 用于线程间进行数据交换

测试类:

package concurrent; 
 
import concurrent.pojo.ExchangerPojo; 
import concurrent.thread.ExchangerThread; 
 
import java.util.HashMap; 
import java.util.concurrent.Exchanger; 
 
/** 
 * 拿客 
 * www.coderknock.com 
 * QQ群:213732117 
 * 创建时间:2016年08月08日 
 * 描述: 
 */ 
public class ExchangerTest { 
 
    public static void main(String[] args) { 
        Exchanger<HashMap<String, ExchangerPojo>> exchanger = new Exchanger<>(); 
        new ExchangerThread("A", exchanger).start(); 
        new ExchangerThread("B", exchanger).start(); 
    } 
}

实体类:

package concurrent.pojo; 
 
import com.alibaba.fastjson.JSON; 
 
import java.util.Date; 
import java.util.List; 
 
/** 
 * 拿客 
 * www.coderknock.com 
 * QQ群:213732117 
 * 创建时间:2016年08月08日 
 * 描述: 
 */ 
public class ExchangerPojo { 
    private int intVal; 
    private String strVal; 
    private List<String> strList; 
    private Date date; 
 
    public ExchangerPojo(int intVal, String strVal, List<String> strList, Date date) { 
        this.intVal = intVal; 
        this.strVal = strVal; 
        this.strList = strList; 
        this.date = date; 
    } 
 
    public int getIntVal() { 
        return intVal; 
    } 
 
    public void setIntVal(int intVal) { 
        this.intVal = intVal; 
    } 
 
    public String getStrVal() { 
        return strVal; 
    } 
 
    public void setStrVal(String strVal) { 
        this.strVal = strVal; 
    } 
 
    public List<String> getStrList() { 
        return strList; 
    } 
 
    public void setStrList(List<String> strList) { 
        this.strList = strList; 
    } 
 
    public Date getDate() { 
        return date; 
    } 
 
    public void setDate(Date date) { 
        this.date = date; 
    } 
 
    @Override 
    public String toString() { 
        return JSON.toJSONString(this); 
    } 
}

线程类:

package concurrent.thread; 
 
import concurrent.pojo.ExchangerPojo; 
import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; 
 
import java.util.*; 
import java.util.concurrent.Exchanger; 
 
/** 
 * 拿客 
 * www.coderknock.com 
 * QQ群:213732117 
 * 创建时间:2016年08月08日 
 * 描述: 
 */ 
public class ExchangerThread extends Thread { 
    private Exchanger<HashMap<String, ExchangerPojo>> exchanger; 
 
    private static final Logger logger = LogManager.getLogger(ExchangerThread.class); 
 
    public ExchangerThread(String name, Exchanger<HashMap<String, ExchangerPojo>> exchanger) { 
        super(name); 
        this.exchanger = exchanger; 
    } 
 
    @Override 
    public void run() { 
        HashMap<String, ExchangerPojo> map = new HashMap<>(); 
        logger.debug(getName() + "提供者提供数据..."); 
        Random random = new Random(); 
        for (int i = 0; i < 3; i++) { 
            int index = random.nextInt(10); 
            List<String> list = new ArrayList<>(); 
            for (int j = 0; j < index; j++) { 
                list.add("list ---> " + j); 
            } 
            ExchangerPojo pojo = new ExchangerPojo(index, getName() + "提供的数据", list, new Date()); 
            map.put("第" + i + "个数据", pojo); 
        } 
        try { 
            int time = random.nextInt(10); 
            logger.debug(getName() + "等待" + time + "秒...."); 
            for (int i = time; i > 0; i--) { 
                sleep(1000); 
                logger.debug(getName() + "---->" + i); 
            } 
              //等待exchange是会进入阻塞状态,可以在一个线程中与另一线程多次交互,此处就不写多次了 
            HashMap<String, ExchangerPojo> getMap = exchanger.exchange(map); 
            time = random.nextInt(10); 
            logger.debug(getName() + "接受到数据等待" + time + "秒...."); 
            for (int i = time; i > 0; i--) { 
                sleep(1000); 
                logger.debug(getName() + "---->" + i); 
            } 
            getMap.forEach((x, y) -> { 
                logger.debug(x + " -----> " + y.toString()); 
            }); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } 
    } 
}

执行结果:

 [B] - B提供者提供数据... 
 [A] - A提供者提供数据... 
 [A] - A等待2秒.... 
 [B] - B等待0秒.... 
 [A] - A---->2 
 [A] - A---->1 
 [B] - B接受到数据等待1秒.... 
 [A] - A接受到数据等待4秒.... 
 [B] - B---->1 
 [A] - A---->4 
 [B] - 第0个数据 -----> {"date":1470652252049,"intVal":5,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4"],"strVal":"A提供的数据"} 
 [B] - 第1个数据 -----> {"date":1470652252049,"intVal":1,"strList":["list ---> 0"],"strVal":"A提供的数据"} 
 [B] - 第2个数据 -----> {"date":1470652252049,"intVal":4,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3"],"strVal":"A提供的数据"} 
 [A] - A---->3 
 [A] - A---->2 
 [A] - A---->1 
 [A] - 第0个数据 -----> {"date":1470652252057,"intVal":1,"strList":["list ---> 0"],"strVal":"B提供的数据"} 
 [A] - 第1个数据 -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list ---> 5"],"strVal":"B提供的数据"} 
 [A] - 第2个数据 -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list ---> 5"],"strVal":"B提供的数据"}

Phaser

Phaser个人感觉兼具了CountDownLatch与CyclicBarrier的功能,并提供了分阶段的能力。

实现分阶段的CyclicBarrier的功能

测试代码:

package concurrent; 
 
import concurrent.thread.PhaserThread; 
import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; 
 
import java.util.concurrent.Phaser; 
 
/** 
 * 拿客 
 * 网站:www.coderknock.com 
 * QQ群:213732117 
 * 三产 创建于 2016年08月08日 21:25:30。 
 */ 
public class PhaserTest { 
 
    private static final Logger logger = LogManager.getLogger(PhaserTest.class); 
 
    public static void main(String[] args) { 
        Phaser phaser = new Phaser() { 
            /**此方法有2个作用: 
             * 1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。 
             * 2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。例如:若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止。 
             * */ 
            @Override 
            protected boolean onAdvance(int phase, int registeredParties) { 
                logger.debug("阶段--->" + phase); 
                logger.debug("注册的线程数量--->" + registeredParties); 
                return super.onAdvance(phase, registeredParties); 
            } 
        }; 
 
        for (int i = 3; i > 0; i--) { 
            new PhaserThread("第" + i + "个", phaser).start(); 
        } 
    } 
}

线程代码:

package concurrent.thread; 
 
import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; 
 
import java.util.Random; 
import java.util.concurrent.Phaser; 
 
/** 
 * 拿客 
 * 网站:www.coderknock.com 
 * QQ群:213732117 
 * 三产 创建于 2016年08月08日 21:16:55。 
 */ 
public class PhaserThread extends Thread { 
 
    private Phaser phaser; 
 
    private static final Logger logger = LogManager.getLogger(PhaserThread.class); 
 
    public PhaserThread(String name, Phaser phaser) { 
        super(name); 
        this.phaser = phaser; 
        //把当前线程注册到Phaser 
        this.phaser.register(); 
        logger.debug("name为" + name + "的线程注册了" + this.phaser.getRegisteredParties() + "个线程"); 
    } 
 
    @Override 
    public void run() { 
        logger.debug("进入..."); 
        phaser.arrive(); 
        for (int i = 6; i > 0; i--) { 
            int time = new Random().nextInt(5); 
            try { 
                logger.debug("睡眠" + time + "秒"); 
                sleep(time * 1000); 
                if (i == 1) { 
                    logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties()); 
                    logger.debug("最后一次触发,并注销自身"); 
                    phaser.arriveAndDeregister(); 
                    logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties()); 
                } else { 
                    logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties()); 
                    logger.debug(i + "--->触发并阻塞..."); 
                    phaser.arriveAndAwaitAdvance();//相当于CyclicBarrier.await(); 
                    logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties()); 
                } 
 
            } catch (InterruptedException e) { 
                e.printStackTrace(); 
            } 
        } 
        logger.debug("注销完成之后注册的线程数量--->" + phaser.getRegisteredParties()); 
    } 
}

执行结果:

 [main] - name为第3个的线程注册了1个线程 
 [main] - name为第2个的线程注册了2个线程 
 [main] - name为第1个的线程注册了3个线程 
 [第3个] - 进入... 
 [第2个] - 进入... 
 [第3个] - 睡眠2秒 
 [第2个] - 睡眠1秒 
 [第1个] - 进入... 
 [第1个] - 阶段--->0 
 [第1个] - 注册的线程数量--->3 
 [第1个] - 睡眠4秒 
 [第2个] - 未完成的线程数量:3 
 [第2个] - 6--->触发并阻塞... 
 [第3个] - 未完成的线程数量:2 
 [第3个] - 6--->触发并阻塞... 
 [第1个] - 未完成的线程数量:1 
 [第1个] - 6--->触发并阻塞... 
 [第1个] - 阶段--->1 
 [第1个] - 注册的线程数量--->3 
 [第1个] - 未完成的线程数量:3 
 [第3个] - 未完成的线程数量:3 
 [第2个] - 未完成的线程数量:3 
 [第1个] - 睡眠1秒 
 [第3个] - 睡眠0秒 
 [第2个] - 睡眠4秒 
 [第3个] - 未完成的线程数量:3 
 [第3个] - 5--->触发并阻塞... 
 [第1个] - 未完成的线程数量:2 
 [第1个] - 5--->触发并阻塞... 
 [第2个] - 未完成的线程数量:1 
 [第2个] - 5--->触发并阻塞... 
 [第2个] - 阶段--->2 
 [第2个] - 注册的线程数量--->3 
 [第2个] - 未完成的线程数量:3 
 [第3个] - 未完成的线程数量:3 
 [第1个] - 未完成的线程数量:3 
 [第2个] - 睡眠0秒 
 [第3个] - 睡眠2秒 
 [第2个] - 未完成的线程数量:3 
 [第1个] - 睡眠2秒 
 [第2个] - 4--->触发并阻塞... 
 [第3个] - 未完成的线程数量:2 
 [第1个] - 未完成的线程数量:2 
 [第3个] - 4--->触发并阻塞... 
 [第1个] - 4--->触发并阻塞... 
 [第1个] - 阶段--->3 
 [第1个] - 注册的线程数量--->3 
 [第1个] - 未完成的线程数量:3 
 [第3个] - 未完成的线程数量:3 
 [第2个] - 未完成的线程数量:3 
 [第1个] - 睡眠2秒 
 [第3个] - 睡眠1秒 
 [第2个] - 睡眠4秒 
 [第3个] - 未完成的线程数量:3 
 [第3个] - 3--->触发并阻塞... 
 [第1个] - 未完成的线程数量:2 
 [第1个] - 3--->触发并阻塞... 
 [第2个] - 未完成的线程数量:1 
 [第2个] - 3--->触发并阻塞... 
 [第2个] - 阶段--->4 
 [第2个] - 注册的线程数量--->3 
 [第2个] - 未完成的线程数量:3 
 [第3个] - 未完成的线程数量:3 
 [第1个] - 未完成的线程数量:3 
 [第2个] - 睡眠2秒 
 [第1个] - 睡眠2秒 
 [第3个] - 睡眠4秒 
 [第2个] - 未完成的线程数量:3 
 [第1个] - 未完成的线程数量:3 
 [第2个] - 2--->触发并阻塞... 
 [第1个] - 2--->触发并阻塞... 
 [第3个] - 未完成的线程数量:1 
 [第3个] - 2--->触发并阻塞... 
 [第3个] - 阶段--->5 
 [第3个] - 注册的线程数量--->3 
 [第3个] - 未完成的线程数量:3 
 [第1个] - 未完成的线程数量:3 
 [第2个] - 未完成的线程数量:3 
 [第3个] - 睡眠2秒 
 [第1个] - 睡眠3秒 
 [第2个] - 睡眠0秒 
 [第2个] - 未完成的线程数量:3 
 [第2个] - 最后一次触发,并注销自身 
 [第2个] - 未完成的线程数量:2 
 [第2个] - 注销完成之后注册的线程数量--->2 
 [第3个] - 未完成的线程数量:2 
 [第3个] - 最后一次触发,并注销自身 
 [第3个] - 未完成的线程数量:1 
 [第3个] - 注销完成之后注册的线程数量--->1 
 [第1个] - 未完成的线程数量:1 
 [第1个] - 最后一次触发,并注销自身 
 [第1个] - 阶段--->6 
 [第1个] - 注册的线程数量--->0 
 [第1个] - 未完成的线程数量:0 
 [第1个] - 注销完成之后注册的线程数量--->0

上面代码中,当所有线程进行到arriveAndAwaitAdvance()时会触发计数并且将线程阻塞,等计数数量等于注册线程数量【即所有线程都执行到了约定的地方时,会放行,是所有线程得以继续执行,并触发onAction事件】。我们可以在onAction中根据不同阶段执行不同内容的操作。

实现分阶段的CountDownLatch的功能

只需将上面的测试类更改如下:

package concurrent; 
 
import concurrent.thread.PhaserThread; 
import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; 
 
import java.util.concurrent.Phaser; 
 
import static jodd.util.ThreadUtil.sleep; 
 
/** 
 * 拿客 
 * 网站:www.coderknock.com 
 * QQ群:213732117 
 * 三产 创建于 2016年08月08日 21:25:30。 
 */ 
public class PhaserTest { 
 
    private static final Logger logger = LogManager.getLogger(PhaserTest.class); 
 
    public static void main(String[] args) { 
        //这里其实相当于已经注册了3个线程,但是并没有实际的线程 
        int coutNum=3; 
        Phaser phaser = new Phaser(coutNum) { 
            /**此方法有2个作用: 
             * 1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。 
             * 2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。例如:若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止。 
             * */ 
            @Override 
            protected boolean onAdvance(int phase, int registeredParties) { 
                logger.debug("阶段--->" + phase); 
                logger.debug("注册的线程数量--->" + registeredParties); 
                return registeredParties==coutNum;//当后只剩下coutNum个线程时说明所有真实的注册的线程已经运行完成,测试可以终止Phaser 
            } 
        }; 
 
        for (int i = 3; i > 0; i--) { 
            new PhaserThread("第" + i + "个", phaser).start(); 
        } 
 
        //当phaser未终止时循环注册这块儿可以使用实际的业务处理 
        while (!phaser.isTerminated()) { 
            sleep(1000); 
            logger.debug("触发一次"); 
            phaser.arrive(); //相当于countDownLatch.countDown(); 
        } 
    } 
}
发布评论

分享到:

IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!

Java 中 ThreadLocal 内存泄露的实例分析详解
你是第一个吃螃蟹的人
发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。