`

【Java核心-进阶】Semaphore、CountDownLatch、CyclicBarrier、Phaser

    博客分类:
  • Java
 
阅读更多

Java并发包中有很多可以实现同步的结构。以下几种就属于典型的“非显式”锁:

  • Semaphore:Java版本的信号量实现。
  • CountDownLatch:允许一个或多个线程等待某些操作完成。
  • CyclicBarrier:允许多个线程等待达到某个屏障。
  • Phaser:分阶段地等待多个线程达到某个屏障。

CountDownLatch vs CyclicBarrier

CountDownLatch 和 CyclicBarrier 都可用于等待多个线程完成相关操作(达到某个同步点)。
CountDownLatch 侧重于 让一个线程等待其它多个线程执行完成任务
CyclicBarrier 侧重于 多个线程线程相互等待至某个状态,然后它们再开始同时执行

最主要区别

  • CountDownLatch 不可以被重置,所以无法重用;
  • CyclicBarrier 可以被重置,所以可以重用。

    正常情况下会被自动重置。如果主动调用 reset() 时,有线程还在等待,等待线程会抛异常 BrokenBarrierException。

 

CountDownLatch vs Phaser

Phaser 和 CountDownLatch、CyclicBarrier 都有很相似的地方。
Phaser 顾名思义,就是可以分阶段的进行线程同步。

  • CountDownLatch 只能在创建实例时,通过构造方法指定同步数量;
  • Phaser 支持线程动态地向它注册。

    利用这个动态注册的特性,可以达到分阶段同步控制的目的:
    注册一批操作,等待它们执行结束;再注册一批操作,等它们结束...

 

Semaphore

Semaphore,信号量,就是计数器。它可以限制对资源的访问,防止过多使用者同时占用。

 

两个主要方法:acquire()release(int permits)

典型使用模式:

public static void main(String[] args) {
  Semaphore semaphore = new Semaphore(5);
  for (int i=0; i<10; i++) {
    Thread t = new Thread(new Worker(semaphore));
    t.start();
  }
}

static class Worker implements Runnable {
  private Semaphore semaphore;
  Worker(Semaphore semaphore) {
    this.semaphore = semaphore;
  }

  @Override
  public void run() {
    try {
      semaphore.acquire();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      semaphore.release();
    }
  }
}

CountDownLatch

两个主要方法:await()countDown()

典型使用模式:

public static void main(String[] args) {
  CountDownLatch latch = new CountDownLatch(5);
  for (int i=0; i<5; i++) {
    Thread t = new Thread(new Worker(latch));
    t.start();
  }

  latch.await();
}

static class Worker implements Runnable {
  private CountDownLatch latch;

  Worker(CountDownLatch latch) {
    this.latch = latch;
  }

  @Override
  public void run() {
    lathc.countDown();
  }
}

 

CyclicBarrier

主要方法:await()。正常情况下,CyclicBarrier 是被自动重置。

典型使用模式:

public static void main(String[] args) {
  CyclicBarrier barrier = new CyclicBarrier(5,
      () -> System.out.println("Action go again."));

  for (int i=0; i<5; i++) {
    String workerName = "Worker-" + i;
    Thread t = new Thread(new Worker(workerName, barrier));
    t.start();
  }
}

static class Worker implements Runnable {
  private String name;
  private CyclicBarrier barrier;

  Worker(String name, CyclicBarrier barrier) {
    this.name = name;
    this.barrier = barrier;
  }

  @Override
  public void run() {
    try {
      for (int i=0; i<3; i++) {
        System.out.println(name + " executed " + i);
        barrier.await();
      }
    } catch (BrokenBarrierException | InterruptedException e) {
      e.printStackTrace();
    }
  }
}

 

Phaser

典型使用模式:

public static void main(String[] args) {
  // 初始 party 数为1,是为了通过主线程控制 phaser 不同阶段的操作
  Phaser phaser = new Phaser(1);

  // 第一阶段,5个Worker
  for (int i = 0; i < 5; i++) {
    String workerName = "Phase-1-Worker-" + i;
    Thread t = new Thread(new Worker(workerName, phaser));
    t.start();
  }
  // 开始执行第一阶段:主线程也到达第一阶段
  phaser.arriveAndAwaitAdvance();

  // 第二阶段,3个Worker
  for (int i = 0; i < 3; i++) {
    String workerName = "Phase-2-Worker-" + i;
    Thread t = new Thread(new Worker(workerName, phaser));
    t.start();
  }
  // 开始执行第二阶段:主线程也到达第二阶段
  phaser.arriveAndAwaitAdvance();

  // 各阶段都结束:(主线程)从 phaser 撤销
  phaser.arriveAndDeregister();
}

static class Worker implements Runnable {
  private String name;
  private Phaser phaser;

  Worker(String name, Phaser phaser) {
    this.name = name;
    this.phaser = phaser;

    // 将当前线程注册到 phaser
    phaser.register();
  }

  @Override
  public void run() {
    // 到达 phaser,并等待其它线程也到达
    phaser.arriveAndAwaitAdvance();

    System.out.println(name + " done.");

    // 到达 phaser,并将 Worker 从 phaser 撤销
    phaser.arriveAndDeregister();
  }
}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics