Java多线程简略完成撤销和进展tengxun - 娱乐之横扫全球

Java多线程简略完成撤销和进展tengxun

2019-02-09 07:57:03 | 作者: 昊然 | 标签: 线程,进展,主线 | 浏览: 5746

 

一个简略的多线程调度完结,一致开端,为了使得一切线程一致开端,相似运动员在听到发令枪时一同进行,运用了CountDownLatch进行操控。

CountDownLatch beginLatch = new CountDownLatch(1);
CountDownLatch endLatch = new CountDownLatch(personCount);

主线程树立线程池,并进行调度,因为要在终究进行汇总成果,运用了FutureTask

List FutureTask String futureTaskList = new ArrayList FutureTask String ();
for (int i = 0; i personCount; i++) {
 futureTaskList.add(new FutureTask String (new ExecuteCallable(beginLatch, endLatch,i)));
ExecutorService execService = Executors.newFixedThreadPool(threadCount);
 for (FutureTask String futureTask : futureTaskList) {
 execService.execute(futureTask);
beginLatch.countDown();

 这样一切线程就会一致开端履行,履行完结后,汇总成果,并封闭线程池。

 

endLatch.await();
System.out.println("");
for (FutureTask String futureTask : futureTaskList) {
 System.out.println(futureTask.get());
execService.shutdown();

 关于每个线程的履行,都需求同享变量beginLatch和endLatch,各线程代码:

public class ExecuteCallable implements Callable String {
 private int id;
 private CountDownLatch beginLatch;
 private CountDownLatch endLatch;
 public ExecuteCallable(CountDownLatch beginLatch, CountDownLatch endLatch,
 Exchanger Integer exchanger, int id,
 ConcurrentTaskExecutor concurrentTaskExecutor) {
 this.beginLatch = beginLatch;
 this.endLatch = endLatch;
 this.id = id;
 @Override
 public String call() throws Exception {
 beginLatch.await();
 long millis = (long) (Math.random() * 10 * 1000);
 String result = String.format("Player :%s arrived, use %s millis", id, millis);
 Thread.sleep(millis);
 System.out.println(result);
 endLatch.countDown();
 return result;
}

 每个线程在开端等候发令枪(beginLatch),随机等候一段时刻(模仿履行时刻),终究告诉endLatch减一(履行结束告诉),并回来成果。

到这儿仅仅一个简略的完结,咱们并不能在主线程中实时了解各线程的履行情况,除非到了一切线程履行结束(endLatch免除阻塞状态)。这时候咱们运用Exchanger机制来进行线程之间数据的交流,在每个线程履行完结后,将其完结的数据量传给主线程进行改写(模仿进展条作业)。

主线程ConcurrentTaskExecutor类中:

Exchanger Integer exchanger = new Exchanger Integer 
beginLatch.countDown();
Integer totalResult = Integer.valueOf(0);
for (int i = 0; i personCount; i++) {
 Integer partialResult = exchanger.exchange(Integer.valueOf(0));
 if(partialResult != 0){
 totalResult = totalResult + partialResult;
 System.out.println(String.format("Progress: %s/%s", totalResult, personCount));
endLatch.await();

 线程类ExecuteCallable结构函数参加exchanger

@Override
public String call() throws Exception {
 beginLatch.await();
 long millis = (long) (Math.random() * 10 * 1000);
 String result = String.format("Player :%s arrived, use %s millis", id, millis);
 Thread.sleep(millis);
 System.out.println(result);
 exchanger.exchange(1);
 endLatch.countDown();
 return result;
}

 在履行完结进行数据交流,回来本次履行进展给主线程(当时默许设置成1,可修正),主线程在一切线程履行完结前,endLatch.await()必定是阻塞状态的,这样主线程就能实时拿到子线程履行完结的进展数据。

 

下面咱们再参加一个可以撤销的功用,参加体系随机在某个时刻点进行撤销操作,那么开端履行的线程是无法进行实时呼应了,只能等候当时操作履行结束;假如线程还没有开端履行,那么就撤销其行为。

更改的ExecuteCallable履行办法如下:

@Override
public String call() throws Exception {
 beginLatch.await();
 if(concurrentTaskExecutor.isCanceled()){
 endLatch.countDown();
 exchanger.exchange(0);
 return String.format("Player :%s is given up", id);
 long millis = (long) (Math.random() * 10 * 1000);
 String result = String.format("Player :%s arrived, use %s millis", id, millis);
 Thread.sleep(millis);
 System.out.println(result);
 exchanger.exchange(1);
 endLatch.countDown();
 return result;
}

 其间concurrentTaskExecutor类中参加一个类型为boolean的canceled变量,留意这个变量有必要是volatile的,以便可以在线程间同享数据,而且该变量的setter和getter办法也是原子性的。

咱们的撤销操作不能放在主线程中操作,需求额定树立一个线程,而且这个线程也不能通过线程池进行调度,新建的InterruptRunnable类:

public class InterruptRunnable implements Runnable {
 private CountDownLatch beginLatch;
 private ConcurrentTaskExecutor concurrentTaskExecutor;
 public InterruptRunnable(ConcurrentTaskExecutor currConcurrentTaskExecutor, CountDownLatch beginLatch) {
 this.beginLatch = beginLatch;
 this.concurrentTaskExecutor = currConcurrentTaskExecutor;
 @Override
 public void run() {
 try {
 beginLatch.await();
 long millis = (long) (Math.random() * 10 * 1000);
 System.out.println(String.format("System need sleep %s millis", millis));
 Thread.sleep(millis);
 } catch (InterruptedException e) {
 e.printStackTrace();
 concurrentTaskExecutor.setCanceled(true);
}

 更改后的ConcurrentTaskExecutor,在履行发令前,先让该中止线程发动,以便一同等候开端指令:

new Thread(new InterruptRunnable(this, beginLatch)).start();
beginLatch.countDown();

 终究履行成果(取决于中止线程的随机时刻长短):

System need sleep 2920 millis
Player :4 arrived, use 917 millis
Progress: 1/10
Player :5 arrived, use 1076 millis
Progress: 2/10
Player :3 arrived, use 2718 millis
Progress: 3/10
Player :1 arrived, use 4013 millis
Progress: 4/10
Player :0 arrived, use 8541 millis
Progress: 5/10
Player :2 arrived, use 8570 millis
Progress: 6/10
Player :6 arrived, use 7261 millis
Progress: 7/10
Player :7 arrived, use 7015 millis
Progress: 8/10

Player :0 arrived, use 8541 millis
Player :1 arrived, use 4013 millis
Player :2 arrived, use 8570 millis
Player :3 arrived, use 2718 millis
Player :4 arrived, use 917 millis
Player :5 arrived, use 1076 millis
Player :6 arrived, use 7261 millis
Player :7 arrived, use 7015 millis
Player :8 is given up
Player :9 is given up

 

终究,附上终究的程序代码

ConcurrentTaskExecutor:

public class ConcurrentTaskExecutor {
 private volatile boolean canceled = false;
 public void executeTask() throws Exception {
 int personCount = 10;
 int threadCount = 5;
 CountDownLatch beginLatch = new CountDownLatch(1);
 CountDownLatch endLatch = new CountDownLatch(personCount);
 Exchanger Integer exchanger = new Exchanger Integer 
 List FutureTask String futureTaskList = new ArrayList FutureTask String ();
 for (int i = 0; i personCount; i++) {
 futureTaskList.add(new FutureTask String (new ExecuteCallable(beginLatch, endLatch, exchanger, i, this)));
 ExecutorService execService = Executors.newFixedThreadPool(threadCount);
 for (FutureTask String futureTask : futureTaskList) {
 execService.execute(futureTask);
 new Thread(new InterruptRunnable(this, beginLatch)).start();
 beginLatch.countDown();
 Integer totalResult = Integer.valueOf(0);
 for (int i = 0; i personCount; i++) {
 Integer partialResult = exchanger.exchange(Integer.valueOf(0));
 if(partialResult != 0){
 totalResult = totalResult + partialResult;
 System.out.println(String.format("Progress: %s/%s", totalResult, personCount));
 endLatch.await();
 System.out.println("");
 for (FutureTask String futureTask : futureTaskList) {
 System.out.println(futureTask.get());
 execService.shutdown();
 public boolean isCanceled() {
 return canceled;
 public void setCanceled(boolean canceled){
 this.canceled = canceled;
 public static void main(String[] args) throws Exception {
 ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor();
 executor.executeTask();
}

 ExecuteCallable

public class ExecuteCallable implements Callable String {
 private int id;
 private CountDownLatch beginLatch;
 private CountDownLatch endLatch;
 private Exchanger Integer exchanger;
 private ConcurrentTaskExecutor concurrentTaskExecutor;
 public ExecuteCallable(CountDownLatch beginLatch, CountDownLatch endLatch,
 Exchanger Integer exchanger, int id,
 ConcurrentTaskExecutor concurrentTaskExecutor) {
 this.beginLatch = beginLatch;
 this.endLatch = endLatch;
 this.exchanger = exchanger;
 this.id = id;
 this.concurrentTaskExecutor = concurrentTaskExecutor;
 @Override
 public String call() throws Exception {
 beginLatch.await();
 if(concurrentTaskExecutor.isCanceled()){
 endLatch.countDown();
 exchanger.exchange(0);
 return String.format("Player :%s is given up", id);
 long millis = (long) (Math.random() * 10 * 1000);
 String result = String.format("Player :%s arrived, use %s millis", id, millis);
 Thread.sleep(millis);
 System.out.println(result);
 exchanger.exchange(1);
 endLatch.countDown();
 return result;
}

 InterruptRunnable

public class InterruptRunnable implements Runnable {
 private CountDownLatch beginLatch;
 private ConcurrentTaskExecutor concurrentTaskExecutor;
 public InterruptRunnable(ConcurrentTaskExecutor currConcurrentTaskExecutor, CountDownLatch beginLatch) {
 this.beginLatch = beginLatch;
 this.concurrentTaskExecutor = currConcurrentTaskExecutor;
 @Override
 public void run() {
 try {
 beginLatch.await();
 long millis = (long) (Math.random() * 10 * 1000);
 System.out.println(String.format("System need sleep %s millis", millis));
 Thread.sleep(millis);
 } catch (InterruptedException e) {
 e.printStackTrace();
 concurrentTaskExecutor.setCanceled(true);
}

 

 

 

 

 

 

 

 

 

 

 

版权声明
本文来源于网络,版权归原作者所有,其内容与观点不代表娱乐之横扫全球立场。转载文章仅为传播更有价值的信息,如采编人员采编有误或者版权原因,请与我们联系,我们核实后立即修改或删除。

猜您喜欢的文章