三、Java8的CompletableFuture,Java的多线程开发
1、CompletableFuture的常用方法
- 以后用到再加
| runAsync() :开启异步(创建线程执行任务),无返回值 | |
| supplyAsync() :开启异步(创建线程执行任务),有返回值 | |
| thenApply() :然后应用,适用于有返回值的结果,拿着返回值再去处理。 | |
| exceptionally():用于处理异步任务执行过程中出现异常的情况的一个方法:返回默认值或者一个替代的 CompletableFuture 对象,从而避免系统的崩溃或异常处理的问题。 | |
| handle():类似exceptionally() | |
| get() :阻塞线程:主要可以: ①获取线程中的异常然后处理异常、②设置等待时间 | |
| join() :阻塞线程:推荐使用 join() 方法,因为它没有受到 interrupt 的干扰,不需要捕获异常,也不需要强制类型转换。他自己会抛出异常。 | |
| CompletableFuture.allOf() | |
| CompletableFuture.anyOf() |
- get() 和 join() 方法区别?
- 都可以阻塞线程 —— 等所有任务都执行完了再执行后续代码。
| CompletableFuture 中的 get() 和 join() 方法都用于获取异步任务的执行结果,但是在使用时需要注意以下几点区别: | |
| 1. 抛出异常的方式不同:如果异步任务执行过程中出现异常, get() 方法会抛出 ExecutionException 异常,而 join() 方法会抛出 CompletionException 异常,这两个异常都是继承自 RuntimeException 的。 | |
| 2. 方法调用限制不同: join() 方法是不可以被中断的,一旦调用就必须等待任务执行完成才能返回结果;而 get() 方法可以在调用时设置等待的超时时间,如果超时还没有获取到结果,就会抛出 TimeoutException 异常。 | |
| 3. 返回结果类型不同: get() 方法返回的是异步任务的执行结果,该结果是泛型类型 T 的,需要强制转换才能获取真正的结果;而 join() 方法返回的是异步任务的执行结果,该结果是泛型类型 T,不需要强制转换。 | |
| 4. 推荐使用方式不同:推荐在 CompletableFuture 中使用 join() 方法,因为它没有受到 interrupt 的干扰,不需要捕获异常,也不需要强制类型转换。 | |
| 综上所述, get() 方法和 join() 方法都是获取异步任务的执行结果,但是在使用时需要根据具体场景选择使用哪个方法。如果需要获取执行结果并且不希望被中断,推荐使用 join() 方法;如果需要控制等待时间或者需要捕获异常,则可以使用 get() 方法。 |
- anyOf() 和 allOf() 的区别?
| CompletableFuture 是 Java 8 引入的一个强大的异步编程工具,它支持链式调用、组合和转换异步操作等功能。其中,anyOf 和 allOf 都是 CompletableFuture 的两个常用方法,它们的区别如下: | |
| 1. anyOf:任意一个 CompletableFuture 完成,它就会跟随这个 CompletableFuture 的结果完成,返回第一个完成的 CompletableFuture 的结果。 | |
| 2. allOf:所有的 CompletableFuture 都完成时,它才会跟随它们的结果完成,返回一个空的 CompletableFuture。 | |
| 简而言之,anyOf 和 allOf 的最大区别是:anyOf 任意一个 CompletableFuture 完成就跟着它的结果完成,而 allOf 所有的 CompletableFuture 完成才可以完成,并返回一个空的 CompletableFuture。 | |
| 举例来说,如果有三个 CompletableFuture:f1、f2、f3,其中 f1 和 f2 可能会返回一个字符串,而 f3 可能会返回一个整数,那么: | |
| – anyOf(f1, f2, f3) 的结果是 f1、f2、f3 中任意一个 CompletableFuture 的结果; | |
| – allOf(f1, f2, f3) 的结果是一个空的 CompletableFuture,它的完成状态表示 f1、f2、f3 是否全部完成。 | |
| 总之,anyOf 和 allOf 在实际使用中可以根据不同的需求来选择,它们都是 CompletableFuture 中非常强大的组合操作。 |
回到顶部
2、使用CompletableFuture
2.1、实体类准备
| package com.cc.md.entity; | |
| import lombok.Data; | |
| /** | |
| * @author CC | |
| * @since 2023/5/24 0024 | |
| */ | |
| @Data | |
| public class UserCs { | |
| private String name; | |
| private Integer age; | |
| } |
2.2、常用方式
- 无返回值推荐:开启多线程——无返回值的——阻塞:test06
| @Resource(name = “myIoThreadPool”) | |
| private ThreadPoolTaskExecutor myIoThreadPool; | |
| //CompletableFuture开启多线程——无返回值的 | |
| @Test | |
| public void test06() throws Exception { | |
| List<CompletableFuture> futures = new ArrayList(); | |
| //循环,模仿很多任务 | |
| for (int i = 0; i < 1000; i++) { | |
| int finalI = i; | |
| CompletableFuture future = CompletableFuture.runAsync(() -> { | |
| //第一批创建的线程数 | |
| log.info(“打印:{}”, finalI); | |
| //模仿io流耗时 | |
| try { | |
| Thread.sleep(5000); | |
| } catch (InterruptedException e) { | |
| throw new RuntimeException(e); | |
| } | |
| }, myIoThreadPool); | |
| futures.add(future); | |
| } | |
| //阻塞:多线程的任务执行。相当于多线程执行完了,再执行后面的代码 | |
| //如果不阻塞,上面的相当于异步执行了。 | |
| //阻塞方式1:可以获取返回的异常、设置等待时间 | |
| // futures.forEach(future -> { | |
| // try { | |
| // future.get(); | |
| // } catch (Exception e) { | |
| // throw new RuntimeException(e); | |
| // } | |
| // }); | |
| //阻塞方式2(推荐) | |
| CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); | |
| log.info(“打印:都执行完了。。。”); | |
| } |
- 有返回值推荐:开启多线程——有返回值的,返回一个新的List——阻塞——使用stream流的map:test09
- test07、test08 可以转化为 test09 (现在这个)
- 可以返回任务类型的值,不一定要返回下面的user对象。
| @Resource(name = “myIoThreadPool”) | |
| private ThreadPoolTaskExecutor myIoThreadPool; | |
| //CompletableFuture开启多线程——有返回值的,返回一个新的List——先有数据的情况——使用stream流的map | |
| //像这种,需要构建另一个数组的,相当于一个线程执行完了,会有返回值 | |
| //使用stream流的map + CompletableFuture.supplyAsync() | |
| @Test | |
| public void test09() throws Exception { | |
| //先获取数据,需要处理的任务。 | |
| List users = this.getUserCs(); | |
| //莫法处理任务 | |
| List<CompletableFuture> futures = users.stream() | |
| .map(user -> CompletableFuture.supplyAsync(() -> { | |
| // 处理数据 | |
| user.setName(user.getName() + “-改”); | |
| log.info(“打印-改:{}”, user.getName()); | |
| // 其他的业务逻辑。。。 | |
| return user; | |
| }, myIoThreadPool)).collect(Collectors.toList()); | |
| //获取futures | |
| List endList = futures.stream() | |
| //阻塞所有线程 | |
| .map(CompletableFuture::join) | |
| //取age大于10的用户 | |
| .filter(user -> user.getAge() > 10) | |
| //按照age升序排序 | |
| .sorted(Comparator.comparing(UserCs::getAge)) | |
| .collect(Collectors.toList()); | |
| log.info(“打印:都执行完了。。。{}”, endList); | |
| } |
2.3、异常处理
- exceptionally
- handle
| //CompletableFuture 异常处理 | |
| @Test | |
| public void test10() throws Exception { | |
| //先获取数据,需要处理的任务。 | |
| List users = this.getUserCs(); | |
| //莫法处理任务 | |
| List<CompletableFuture> futures = users.stream() | |
| .map(user -> CompletableFuture.supplyAsync(() -> { | |
| if (user.getAge() > 5){ | |
| int a = 1/0; | |
| } | |
| // 处理数据 | |
| user.setName(user.getName() + “-改”); | |
| log.info(“打印-改:{}”, user.getName()); | |
| // 其他的业务逻辑。。。 | |
| return user; | |
| }, myIoThreadPool) | |
| //处理异常方式1:返回默认值或者一个替代的 Future 对象,从而避免系统的崩溃或异常处理的问题。 | |
| .exceptionally(throwable -> { | |
| //可以直接获取user | |
| System.out.println(“异常了:” + user); | |
| //处理异常的方法…… | |
| //1还可以进行业务处理……比如将异常数据存起来,然后导出…… | |
| //2返回默认值,如:user、null | |
| //return user; | |
| //3抛出异常 | |
| throw new RuntimeException(throwable.getMessage()); | |
| }) | |
| //处理异常方式2:类似exceptionally(不推荐) | |
| // .handle((userCs, throwable) -> { | |
| // System.out.println(“handle:” + user); | |
| // if (throwable != null) { | |
| // // 处理异常 | |
| // log.error(“处理用户信息出现异常,用户名为:” + user.getName(), throwable); | |
| // // 返回原始数据 | |
| // return userCs; | |
| // } else { | |
| // // 返回正常数据 | |
| // return userCs; | |
| // } | |
| // }) | |
| ) | |
| .collect(Collectors.toList()); | |
| //获取futures | |
| List endList = futures.stream() | |
| //阻塞所有线程 | |
| .map(CompletableFuture::join) | |
| //取age大于10的用户 | |
| .filter(user -> user.getAge() > 10) | |
| //按照age升序排序 | |
| .sorted(Comparator.comparing(UserCs::getAge)) | |
| .collect(Collectors.toList()); | |
| log.info(“打印:都执行完了。。。{}”, endList); | |
| } |
2.4、CompletableFuture的使用测试
1、推荐使用:test03、test05、test09、test10、test11
2、test07、test08就是test09的前身。
-
test01:获取当前电脑(服务器)的cpu核数
-
test02:线程池原始的使用(不推荐直接这样用)
-
test03:开启异步1 —— @Async
-
test04:开启异步2 —— CompletableFuture.runAsync()
-
test05:开启异步2的改造 —— CompletableFuture.runAsync() 和 supplyAsync() —— 阻塞所有异步方法,一起提交
-
相当于开了3个线程去执行三个不同的方法,然后执行完后一起提交。
-
-
test052:开启异步2的改造 —— 第一个任务执行完了,获取到返回值,给后面的执行,可以连写,也可以单写。 —— 阻塞线程:get、join
-
test06:CompletableFuture开启多线程——无返回值的
-
test07:CompletableFuture开启多线程——无返回值的——构建一个新List
-
1、相当于多线程执行任务,然后把结果插入到List中 2、接收多线程的List必须是线程安全的,ArrayList线程不安全 线程安全的List —— CopyOnWriteArrayList 替代 ArrayList
-
-
test08:CompletableFuture开启多线程——无返回值的——构建一个新List——先有数据的情况(基本和test07是一个方法)
-
test09:CompletableFuture开启多线程——有返回值的,返回一个新的List——先有数据的情况——使用stream流的map
-
test10:CompletableFuture 异常处理。相当于是 test09的增强,处理异常
-
test11:CompletableFuture 异常处理:如果出现异常就舍弃任务。
-
1、想了一下,出现异常后的任务确实没有执行下去了,任务不往下执行,怎么会发现异常呢? 2、发现了异常任务也就完了。而且打印了异常,相当于返回了异常。 3、未发生异常的任务会执行完成。如果发生异常都返回空,最后舍弃空的,就得到任务执行成功的 CompletableFuture
-
↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓所有方式↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
| package com.cc.md; | |
| import com.cc.md.entity.UserCs; | |
| import com.cc.md.service.IAsyncService; | |
| import org.junit.jupiter.api.Test; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| import org.springframework.boot.test.context.SpringBootTest; | |
| import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; | |
| import javax.annotation.Resource; | |
| import java.util.*; | |
| import java.util.concurrent.CompletableFuture; | |
| import java.util.concurrent.CopyOnWriteArrayList; | |
| import java.util.concurrent.ForkJoinPool; | |
| import java.util.concurrent.TimeUnit; | |
| import java.util.stream.Collectors; | |
| @SpringBootTest | |
| class Test01 { | |
| private static final Logger log = LoggerFactory.getLogger(Test01.class); | |
| @Resource(name = “myIoThreadPool”) | |
| private ThreadPoolTaskExecutor myIoThreadPool; | |
| /** | |
| * 异步类 | |
| */ | |
| @Resource | |
| private IAsyncService asyncService; | |
| @Test | |
| void test01() { | |
| //获取当前jdk能调用的CPU个数(当前服务器的处理器个数) | |
| int i = Runtime.getRuntime().availableProcessors(); | |
| System.out.println(i); | |
| } | |
| //线程池原始的使用 | |
| @Test | |
| void test02() { | |
| try { | |
| for (int i = 0; i < 1000; i++) { | |
| int finalI = i; | |
| myIoThreadPool.submit(() -> { | |
| //第一批创建的线程数 | |
| log.info(“打印:{}”, finalI); | |
| //模仿io流耗时 | |
| try { | |
| Thread.sleep(5000); | |
| } catch (InterruptedException e) { | |
| throw new RuntimeException(e); | |
| } | |
| }); | |
| } | |
| }catch(Exception e){ | |
| throw new RuntimeException(e); | |
| }finally { | |
| myIoThreadPool.shutdown(); | |
| } | |
| } | |
| //开启异步1 —— @Async | |
| @Test | |
| public void test03() throws Exception { | |
| log.info(“打印:{}”, “异步测试的-主方法1”); | |
| asyncService.async1(); | |
| asyncService.async2(); | |
| //不会等待异步方法执行,直接返回前端数据 | |
| log.info(“打印:{}”, “异步测试的-主方法2”); | |
| } | |
| //开启异步2 —— CompletableFuture.runAsync() | |
| @Test | |
| public void test04() throws Exception { | |
| log.info(“打印:{}”, “异步测试的-主方法1”); | |
| CompletableFuture.runAsync(() -> { | |
| log.info(“打印:{}”, “异步方法1!”); | |
| //异步执行的代码,也可以是方法,该方法不用单独写到其他类中。 | |
| this.async2(“异步方法1!-end”); | |
| }, myIoThreadPool); | |
| //不会等待异步方法执行,直接返回前端数据 | |
| log.info(“打印:{}”, “异步测试的-主方法2”); | |
| } | |
| //异步需要执行的方法,可以写在同一个类中。 | |
| private void async2(String msg) { | |
| //模仿io流耗时 | |
| try { | |
| Thread.sleep(5000); | |
| } catch (InterruptedException e) { | |
| throw new RuntimeException(e); | |
| } | |
| log.info(“打印:{}”, msg); | |
| } | |
| //开启异步2的改造 —— CompletableFuture.runAsync() 和 supplyAsync() —— 阻塞所有异步方法,一起提交 | |
| //相当于开了3个线程去执行三个不同的方法,然后执行完后一起提交。 | |
| @Test | |
| public void test05() throws Exception { | |
| log.info(“打印:{}”, “异步测试的-主方法1”); | |
| //异步执行1 | |
| CompletableFuture future1 = CompletableFuture.supplyAsync(() -> { | |
| log.info(“打印:{}”, “异步方法1!”); | |
| //异步执行的代码,也可以是方法,该方法不用单独写到其他类中。 | |
| this.async2(“异步方法1-end”); | |
| return “异步方法1”; | |
| }, myIoThreadPool); | |
| //异步执行2 | |
| CompletableFuture future2 = CompletableFuture.supplyAsync(() -> { | |
| log.info(“打印:{}”, “异步方法2!”); | |
| //异步执行的代码,也可以是方法,该方法不用单独写到其他类中。 | |
| this.async2(“异步方法2-end”); | |
| return “异步方法2”; | |
| }, myIoThreadPool); | |
| //异步执行3,不用我们自己的线程池 —— 用的就是系统自带的 ForkJoinPool 线程池 | |
| CompletableFuture future3 = CompletableFuture.runAsync(() -> { | |
| log.info(“打印:{}”, “异步方法3!”); | |
| //异步执行的代码,也可以是方法,该方法不用单独写到其他类中。 | |
| this.async2(“异步方法3-end”); | |
| }); | |
| //阻塞所有异步方法,一起提交后才走下面的代码 | |
| CompletableFuture.allOf(future1, future2, future3).join(); | |
| log.info(“打印:{}”, “异步-阻塞-测试的-主方法2-end”); | |
| } | |
| //开启异步2的改造 —— 第一个任务执行完了,获取到返回值,给后面的执行,可以连写,也可以单写。 —— 阻塞线程:get、join | |
| // CompletableFuture 的 get 和 join 方法区别: | |
| // get:①可以获取线程中的异常、②设置等待时间 | |
| // join:推荐在 CompletableFuture 中使用 join() 方法,因为它没有受到 interrupt 的干扰,不需要捕获异常,也不需要强制类型转换。 | |
| @Test | |
| public void test052() throws Exception { | |
| log.info(“打印:{}”, “异步测试的-主方法1”); | |
| //异步执行1 | |
| CompletableFuture future1 = CompletableFuture.supplyAsync(() -> { | |
| log.info(“打印:{}”, “异步方法1!”); | |
| // 异步执行的代码,也可以是方法,该方法不用单独写到其他类中。 | |
| String str = “异步方法1-end”; | |
| this.async2(str); | |
| return str; | |
| }, myIoThreadPool); | |
| // 异步执行2 – 无返回值 —— 分开写的方式 | |
| CompletableFuture future2 = future1.thenAccept(str1 -> { | |
| log.info(“打印:{}”, “异步方法2!”); | |
| // 异步执行的代码,也可以是方法,该方法不用单独写到其他类中。 | |
| this.async2(String.format(“%s-加-异步方法2! – 无返回值 – “,str1)); | |
| }); | |
| // 异步执行3 – 有返回值 —— 分开写future1,连写future3方式 | |
| CompletableFuture future3 = future1.thenApply(str2 -> { | |
| log.info(“打印:{}”, “异步方法3!”); | |
| // 异步执行的代码,也可以是方法,该方法不用单独写到其他类中。 | |
| this.async2(String.format(“%s-加-异步方法3! – 有返回值 – “, str2)); | |
| return “异步执行3 – 有返回值 “; | |
| //连写的方式。 | |
| }).thenApply(str3 -> { | |
| String format = String.format(“%s- end”, str3); | |
| log.error(“异步3然后应用 – {}”, format); | |
| //返回后面的应用 | |
| return format; | |
| }); | |
| // 获取future3的返回值: | |
| //如果需要捕获异常、设置等待超时时间,则用get | |
| log.info(“future3的返回值(不阻塞):{}”, future3.get()); | |
| // log.info(“future3的返回值(不阻塞-设置等待时间,超时报错:TimeoutException):{}”, | |
| // future3.get(2, TimeUnit.SECONDS)); | |
| //推荐使用 join方法 | |
| // log.info(“future3的返回值(阻塞):{}”, future3.join()); | |
| //阻塞所有异步方法,一起提交后才走下面的代码 | |
| CompletableFuture.allOf(future1, future2).join(); | |
| log.info(“打印:{}”, “异步-阻塞-测试的-主方法2-end”); | |
| } | |
| //CompletableFuture开启多线程——无返回值的 | |
| @Test | |
| public void test06() throws Exception { | |
| List<CompletableFuture> futures = new ArrayList(); | |
| //循环,模仿很多任务 | |
| for (int i = 0; i < 1000; i++) { | |
| int finalI = i; | |
| CompletableFuture future = CompletableFuture.runAsync(() -> { | |
| //第一批创建的线程数 | |
| log.info(“打印:{}”, finalI); | |
| //模仿io流耗时 | |
| try { | |
| Thread.sleep(5000); | |
| } catch (InterruptedException e) { | |
| throw new RuntimeException(e); | |
| } | |
| }, myIoThreadPool); | |
| futures.add(future); | |
| } | |
| //阻塞:多线程的任务执行。相当于多线程执行完了,再执行后面的代码 | |
| //如果不阻塞,上面的相当于异步执行了。 | |
| //阻塞方式1:可以获取返回的异常、设置等待时间 | |
| // futures.forEach(future -> { | |
| // try { | |
| // future.get(); | |
| // } catch (Exception e) { | |
| // throw new RuntimeException(e); | |
| // } | |
| // }); | |
| //阻塞方式2(推荐) | |
| CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); | |
| log.info(“打印:都执行完了。。。”); | |
| } | |
| //CompletableFuture开启多线程——无返回值的——构建一个新List | |
| //相当于多线程执行任务,然后把结果插入到List中 | |
| //接收多线程的List必须是线程安全的,ArrayList线程不安全 | |
| //线程安全的List —— CopyOnWriteArrayList 替代 ArrayList | |
| @Test | |
| public void test07() throws Exception { | |
| List<CompletableFuture> futures = new ArrayList(); | |
| //存数据的List | |
| List addList = new CopyOnWriteArrayList(); | |
| //循环,模仿很多任务 | |
| for (int i = 0; i < 1000; i++) { | |
| int finalI = i; | |
| CompletableFuture future = CompletableFuture.runAsync(() -> { | |
| log.info(“打印:{}”, finalI); | |
| UserCs userCs = new UserCs(); | |
| userCs.setName(String.format(“姓名-%s”, finalI)); | |
| userCs.setAge(finalI); | |
| addList.add(userCs); | |
| }, myIoThreadPool); | |
| futures.add(future); | |
| } | |
| //阻塞 | |
| CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); | |
| //返回新的List:endList,取age大于10的用户 | |
| List endList = addList.stream() | |
| .filter(user -> user.getAge() > 10) | |
| //按照age升序排序 | |
| .sorted(Comparator.comparing(UserCs::getAge)) | |
| .collect(Collectors.toList()); | |
| log.info(“打印:都执行完了。。。{}”, endList); | |
| } | |
| //CompletableFuture开启多线程——无返回值的——构建一个新List——先有数据的情况 | |
| //用CopyOnWriteArrayList 替代 ArrayList接收 | |
| @Test | |
| public void test08() throws Exception { | |
| //先获取数据,需要处理的任务。 | |
| List users = this.getUserCs(); | |
| //开启多线程 | |
| List<CompletableFuture> futures = new ArrayList(); | |
| //存数据的List | |
| List addList = new CopyOnWriteArrayList(); | |
| //莫法处理任务 | |
| users.forEach(user -> { | |
| CompletableFuture future = CompletableFuture.runAsync(() -> { | |
| //添加数据 | |
| user.setName(user.getName() + “-改”); | |
| addList.add(user); | |
| log.info(“打印-改:{}”, user.getName()); | |
| //其他的业务逻辑。。。 | |
| }, myIoThreadPool); | |
| futures.add(future); | |
| }); | |
| //阻塞 | |
| CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); | |
| //返回新的List:endList | |
| List endList = addList.stream() | |
| .filter(user -> user.getAge() > 10) | |
| //按照age升序排序 | |
| .sorted(Comparator.comparing(UserCs::getAge)) | |
| .collect(Collectors.toList()); | |
| log.info(“打印:都执行完了。。。{}”, endList); | |
| } | |
| //CompletableFuture开启多线程——有返回值的,返回一个新的List——先有数据的情况——使用stream流的map | |
| //像这种,需要构建另一个数组的,相当于一个线程执行完了,会有返回值 | |
| //使用stream流的map + CompletableFuture.supplyAsync() | |
| @Test | |
| public void test09() throws Exception { | |
| //先获取数据,需要处理的任务。 | |
| List users = this.getUserCs(); | |
| //莫法处理任务 | |
| List<CompletableFuture> futures = users.stream() | |
| .map(user -> CompletableFuture.supplyAsync(() -> { | |
| // 处理数据 | |
| user.setName(user.getName() + “-改”); | |
| log.info(“打印-改:{}”, user.getName()); | |
| // 其他的业务逻辑。。。 | |
| return user; | |
| }, myIoThreadPool)).collect(Collectors.toList()); | |
| //获取futures | |
| List endList = futures.stream() | |
| //阻塞所有线程 | |
| .map(CompletableFuture::join) | |
| //取age大于10的用户 | |
| .filter(user -> user.getAge() > 10) | |
| //按照age升序排序 | |
| .sorted(Comparator.comparing(UserCs::getAge)) | |
| .collect(Collectors.toList()); | |
| log.info(“打印:都执行完了。。。{}”, endList); | |
| } | |
| //基础数据 | |
| private List getUserCs() { | |
| List users = new ArrayList(); | |
| for (int i = 0; i < 1000; i++) { | |
| UserCs userCs = new UserCs(); | |
| userCs.setName(String.format(“姓名-%s”, i)); | |
| userCs.setAge(i); | |
| users.add(userCs); | |
| } | |
| return users; | |
| } | |
| //CompletableFuture 异常处理 | |
| @Test | |
| public void test10() throws Exception { | |
| //先获取数据,需要处理的任务。 | |
| List users = this.getUserCs(); | |
| //莫法处理任务 | |
| List<CompletableFuture> futures = users.stream() | |
| .map(user -> CompletableFuture.supplyAsync(() -> { | |
| if (user.getAge() > 5){ | |
| int a = 1/0; | |
| } | |
| // 处理数据 | |
| user.setName(user.getName() + “-改”); | |
| log.info(“打印-改:{}”, user.getName()); | |
| // 其他的业务逻辑。。。 | |
| return user; | |
| }, myIoThreadPool) | |
| //处理异常方式1:返回默认值或者一个替代的 Future 对象,从而避免系统的崩溃或异常处理的问题。 | |
| .exceptionally(throwable -> { | |
| //可以直接获取user | |
| System.out.println(“异常了:” + user); | |
| //处理异常的方法…… | |
| //1还可以进行业务处理……比如将异常数据存起来,然后导出…… | |
| //2返回默认值,如:user、null | |
| //return user; | |
| //3抛出异常 | |
| throw new RuntimeException(throwable.getMessage()); | |
| }) | |
| //处理异常方式2:类似exceptionally(不推荐) | |
| // .handle((userCs, throwable) -> { | |
| // System.out.println(“handle:” + user); | |
| // if (throwable != null) { | |
| // // 处理异常 | |
| // log.error(“处理用户信息出现异常,用户名为:” + user.getName(), throwable); | |
| // // 返回原始数据 | |
| // return userCs; | |
| // } else { | |
| // // 返回正常数据 | |
| // return userCs; | |
| // } | |
| // }) | |
| ) | |
| .collect(Collectors.toList()); | |
| //获取futures | |
| List endList = futures.stream() | |
| //阻塞所有线程 | |
| .map(CompletableFuture::join) | |
| //取age大于10的用户 | |
| .filter(user -> user.getAge() > 10) | |
| //按照age升序排序 | |
| .sorted(Comparator.comparing(UserCs::getAge)) | |
| .collect(Collectors.toList()); | |
| log.info(“打印:都执行完了。。。{}”, endList); | |
| } | |
| //CompletableFuture 异常处理:如果出现异常就舍弃任务。 | |
| // 想了一下,出现异常后的任务确实没有执行下去了,任务不往下执行,怎么会发现异常呢? | |
| // 发现了异常任务也就完了。而且打印了异常,相当于返回了异常。 | |
| // 未发生异常的任务会执行完成。如果发生异常都返回空,最后舍弃空的,就得到任务执行成功的 CompletableFuture | |
| @Test | |
| public void test11() { | |
| List users = getUserCs(); | |
| List<CompletableFuture> futures = users.stream() | |
| .map(user -> CompletableFuture.supplyAsync(() -> { | |
| if (user.getAge() > 15) { | |
| int a = 1 / 0; | |
| } | |
| user.setName(user.getName() + “-改”); | |
| log.info(“打印-改:{}”, user.getName()); | |
| return user; | |
| }, myIoThreadPool) | |
| //处理异常 | |
| .exceptionally(throwable -> { | |
| //其他处理异常的逻辑 | |
| return null; | |
| }) | |
| ) | |
| //舍弃返回的对象是null的 CompletableFuture | |
| .filter(e -> Objects.nonNull(e.join())).collect(Collectors.toList()); | |
| //获取futures | |
| List endList = futures.stream() | |
| //阻塞所有线程 | |
| .map(CompletableFuture::join) | |
| //取age大于10的用户 | |
| .filter(user -> user.getAge() > 10) | |
| //按照age升序排序 | |
| .sorted(Comparator.comparing(UserCs::getAge)) | |
| .collect(Collectors.toList()); | |
| log.info(“打印:都执行完了。。。{}”, endList); | |
| } | |
| } | |
本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/a0daded54b.html
