java8 函数式和异步编程实践

王强

常用接口

  • Suppier
@FunctionalInterface
public interface Supplier<T> {


// 调用返回内部封装的对象
    T get();
}

example.

提现初始化一些数据,或者返回结果使用

Supplier<Person> a = () -> {  
Person p = new Person();  
p.setName("21312")  
...
return p;

Person p = a.get();

Supplier<Person> a =() ->  Person::New // :: 这种写法调用默认构造方法或者调用对应的方法
  • Predicate(BiPredicate)
@FunctionalInterface
public interface Predicate<T> {


    boolean test(T t);


  // predicate组合,and 两者同时成立
    default Predicate<T> and(Predicate<? super T> other) {
        Objects.requireNonNull(other);
        return (t) -> test(t) && other.test(t);
    }

  // 判断结果去反,类似 if(!A) 中的 !
    default Predicate<T> negate() {
        return (t) -> !test(t);
    }

  // predicate 组合,两者有一个成立
    default Predicate<T> or(Predicate<? super T> other) {
        Objects.requireNonNull(other);
        return (t) -> test(t) || other.test(t);
    }

}

example.
各种断言判断,if else 对应的逻辑,可以拆到predicate里面,便于重用和组合

Predicate<Int> prediateA = (num)-> ...  
Predicate<Int> predicateB = (num) -> ...

predicateA.and(predicateB).test(123132)  
predicateA.or(predicateB).test(123132)

predicateA.negate()

predicateA.test(123)  
predicateA.negate.test(123)
  • Consumer(BIConsumer)
@FunctionalInterface
public interface Consumer<T> {

    // 集合后者optional执行是调用
    void accept(T t);

    // 多个consumer组合,顺序执行
    default Consumer<T> andThen(Consumer<? super T> after) {
        Objects.requireNonNull(after);
        return (T t) -> { accept(t); after.accept(t); };
    }
}

example.

集合循环的时候,进行处理,不需要返回值

List<Integer> c = new ArrayList(1,2,3,4)  
c.stream().forEach(x -> {  
System.out.println(x);  
});
  • Function(BIFunction)
@FunctionalInterface
public interface Function<T, R> {

    // 函数调用是执行,一般用在集合或者Optional的map,flatMap方法
    R apply(T t);

    // 多个函数组合 a compose b,先执行b在执行a
    default <V> Function<V, R> compose(Function<? super V, ? extends T> before) {
        Objects.requireNonNull(before);
        return (V v) -> apply(before.apply(v));
    }

    // 多个函数组合,a andThen b,先执行a在执行b
    default <V> Function<T, V> andThen(Function<? super R, ? extends V> after) {
        Objects.requireNonNull(after);
        return (T t) -> after.apply(apply(t));
    }

    // 返回入参本身不做任何处理
    static <T> Function<T, T> identity() {
        return t -> t;
    }
}

example.
函数式编程第一公民就是函数, 可以将各种业务逻辑封装到含数据中去,将各个函数组合产出最终结果 复杂io调用,网络调用,封装到其中,容易产生副作用,不是pure的函数,真正的函数要纯。

函数可以加载,在使用的时候调用,可以作为参数,实现闭包,高阶函数

Function<Obj1,Obj2> aF = (obj1) -> {  
.....
}
Function<Obj2,Obj3> bf = (obj2) -> {  
...
}

bf.compose(af).apply(new Obj1())  
af.andThen(bF).apply(new Obj1())

BiFunction<String,Integer,String> biF = (o1,o2) -> {  
reutrn o1 + o2;  
}

public void doXX(String ele, int num,BiFunctioin<ele,Integer,String> ff){  
ff.apply(ele,num)  
}
doXX("aaa",11,biF);

Optional

可选值,null值很容易引起空指针,每次都要做判断非空

Optional.empty(); //构造一个空的optional,里面没有任何元素  
Optional.ofNuable(); // 构造一个有可能是空的optional,里面有可能有元素有可能没有  
Optional.map() //对里面的非空对象进行转换,生成新的结果  
Optional.flatMap() //对里面的非空对象进行转换,生成新的结果,但flatMap中返回的是Optional,比如里面的对象也有可能为空的情况  
Optional.filter() 过滤出想要的结果  
Optional.orElse() 如果不存在为空,返回个默认值  
Optional.orElseGet() 如果不存在为空,返回个Supplier  
Optional.get() 获取对应的值  
Optional.isPresent() 判断是否包含值  
Optional.ifPresent(x ->{}) 如果存在执行对应的consume方法  
Optional.orElseThrow  不存在的话抛出异常  

example.

Optioal o = Optioanl.empty();  
Optional<Integer> op = Optional.of(1231)  
op.ifPresent(x -> System.out.println(x));  
op.map(x -> x * 10)  
op.flatMap(x -> {  
    if(x%2 == 0) return Optional.of("odd") 
    return Optional.empty();
});
Optional.empty().orElse(null);  
Optional.empty().orElseGet(() -> {"hello"});  
Optional.empty().orElseThrow(() -> new RuntimeException("no elements"));  

stream

  • 流的使用方便集合的遍历
  • 转换为并行方式,加快处理速度 并行中当心不要有io,网络等阻塞的操作,不然的话莫名奇妙的越来越慢
  • fluent programming
  • lazy stream
  • 永远无法踏入同一条河流,流使用完后就没有了,不可以重复使用
    allMatch // 集合中的所有元素都满足
    anyMatch  // 结合中的任何一个元素满足
    collect // 转换生成新的集合,一般放在最后
    count() // 集合中的元素的数量
    distinct() // 结合元素驱虫
    filter() // 结合元素过滤,通常和predicate结合
    findAny() // 找到集合中任意一个
    findFirst() // 找到符合条件的任意一个
    flatMap() // 集合元素转换
    forEach() // 集合元素循环
    limit() // 取多少个元素
    map() // 集合元素转换
    max() // 集合中最大的一个,可以结合Comparator
    min() // 集合中最小的一个,可以结合Comparator
    skip() // 跳过多少个元素
    sorted() // 结果排序,可以结果Comparator
    reduce() // 结果累计

exmaple.

List<Integer> c = new ArrayList();  
for (int i = 0; i < 10000; i ++) {  
    c.add(i)
}

c.stream().map(c -> c.toString).collect(Collectors.toList());

c.stream().filter(x -> x%2 ==0).findFirst() // 返回是Optional  
c.stream().filter(x -> x%2 != 0).limit(10) // 返回是stream  
c.stream().filter(x -> x%2 != 0).skip(10) // 返回是stream  
c.stream().filter(x -> x%2 != 0).take(10) // 返回是stream  
c.stream().reduce((0,(x,y) -> x + y)  
c.stream().sorted((x,y) -> x > y).collect(Collectors.tolist());

// 遍历获取Person对象的name属性,可以通过 map(x -> x.getName)也可以直接调用person::getName
List<String> list = people.stream().map(Person::getName).collect(Collectors.toList());

更多组合件Colectors,比如分组生成map,比如生成其他的集合可以使用ollectors.toCollection中构建其他结合
     // Accumulate names into a TreeSet
     Set<String> set = people.stream().map(Person::getName).collect(Collectors.toCollection(TreeSet::new));
  • Collectors里面常用方法
        // 字符串拼接,常用
// Convert elements to strings and concatenate them, separated by commas
     String joined = things.stream()
                           .map(Object::toString)
                           .collect(Collectors.joining(", "));

     // Compute sum of salaries of employee
     // 集合中元素求和
     int total = employees.stream()
                          .collect(Collectors.summingInt(Employee::getSalary)));

     // Group employees by department
     Map<Department, List<Employee>> byDept
         = employees.stream()
                    .collect(Collectors.groupingBy(Employee::getDepartment));

     // Compute sum of salaries by department
     Map<Department, Integer> totalByDept
         = employees.stream()
                    .collect(Collectors.groupingBy(Employee::getDepartment,
                                                  Collectors.summingInt(Employee::getSalary)));

     // Partition students into passing and failing
     Map<Boolean, List<Student>> passingFailing =
         students.stream()
                 .collect(Collectors.partitioningBy(s -> s.getGrade() >= PASS_THRESHOLD));

Stream.iterate(1L, i -> i + 1).limit(1000).parallel().reduce(Long::sum).get();  
ongStream.rangeClosed(1, 10000).parallel().reduce(Long::sum).getAsLong();  
  • map 重复合并

map在进行stream转换处理的时候,要注意如果key相同的话,会抛出exception,
这个时候要对value进行处理

 HashMap<String, Integer> m1 = new HashMap<>();
    m1.put("aa", 1);
    m1.put("bb", 2);
    m1.put("cc", 3);
    m1.put("dd", 4);
    m1.put("ee", 5);

    HashMap<String, Integer> m2 = new HashMap<>();
    m2.put("aa", 1);
    m2.put("ff", 2);
    m2.put("gg", 3);
    m2.put("hh", 4);
    m2.put("ii", 5);

    Map<String, Integer> collect = m1.entrySet().stream()
        .collect(Collectors.toMap(entry -> entry.getKey() + "ssss", entry -> entry.getValue()));

    Map<String, Integer> c2 = Stream.concat(m1.entrySet().stream(), m2.entrySet().stream())
        .collect(Collectors
            .toMap(entry -> entry.getKey() + "new", entry -> entry.getValue(), (v1, v2) -> v1));

CompletableFuture

1.7之前每个future的获取,必须调用get阻塞直到获取到结果,并不能对所有future进行组合,使用很不方便,无法进行高效的异步编程

  • supplyAsync 需要异步处理的结果 () -> {}
  • thenApply 对结果进行再次加工
  • thenCompose 调用别的处理结果,组装
  • join 等待结果
  • thenCombine 调用别的处理结果,组装
  • exceptionally 出现异常进行
private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"),  
                                                   new Shop("LetsSaveBig"),
                                                   new Shop("MyFavoriteShop"),
                                                   new Shop("BuyItAll"),
                                                   new Shop("ShopEasy"));

    private final Executor executor = Executors.newFixedThreadPool(shops.size(), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    });

    // stream 方式,最常用方式
    public List<String> findPricesSequential(String product) {
        return shops.stream()
                .map(shop -> shop.getPrice(product))
                .map(Quote::parse)
                .map(Discount::applyDiscount)
                .collect(Collectors.toList());
    }

    // parallel stream 方式,并行执行,加快结合转换
    public List<String> findPricesParallel(String product) {
        return shops.parallelStream()
                .map(shop -> shop.getPrice(product))
                .map(Quote::parse)
                .map(Discount::applyDiscount)
                .collect(Collectors.toList());
    }

    // 
    public List<String> findPricesFuture(String product) {
        List<CompletableFuture<String>> priceFutures = findPricesStream(product)
                .collect(Collectors.<CompletableFuture<String>>toList());

        return priceFutures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    // complefuture 进行组合
    public Stream<CompletableFuture<String>> findPricesStream(String product) {
        return shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
                .map(future -> future.thenApply(Quote::parse))
                .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
    }

    public void printPricesStream(String product) {
        long start = System.nanoTime();
        CompletableFuture[] futures = findPricesStream(product)
                .map(f -> f.thenAccept(s -> System.out.println(s + " (done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs)")))
                .toArray(size -> new CompletableFuture[size]);
        CompletableFuture.allOf(futures).join();
        System.out.println("All shops have now responded in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
    }
 public List<String> findPricesInUSD3(String product) {
        // Here, the for loop has been replaced by a mapping function...
        Stream<CompletableFuture<String>> priceFuturesStream = shops
            .stream()
            .map(shop -> CompletableFuture
                .supplyAsync(() -> shop.getPrice(product))
                .thenCombine(
                    CompletableFuture.supplyAsync(() -> ExchangeService.getRate(Money.EUR, Money.USD)),
                    (price, rate) -> price * rate)
                .thenApply(price -> shop.getName() + " price is " + price));

        List<CompletableFuture<String>> priceFutures = priceFuturesStream.collect(Collectors.toList());
        List<String> prices = priceFutures
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
        return prices;
    }

异常处理(exceptionally or handle)

 CompletableFuture<Object> e1 = CompletableFuture.supplyAsync(() -> {
      try {
        TimeUnit.SECONDS.sleep(1);
      } catch (InterruptedException e) {
      }
      throw new RuntimeException("error1");
    }).exceptionally(throwable -> { // 这个地方不处理的话会排除CompleteException
      System.out.println(throwable.getMessage());
      return null;
    });

    CompletableFuture<String> e2 = CompletableFuture.supplyAsync(() -> {
      try {
        TimeUnit.SECONDS.sleep(2);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      return "no exception";
    });

    CompletableFuture<Object> e3 = CompletableFuture.supplyAsync(() -> {
      try {
        TimeUnit.SECONDS.sleep(2);
      } catch (InterruptedException e) {
      }
      throw new RuntimeException("error2");
    }).handle((msg,ex) -> {  // 这个地方不处理的话会排除CompleteException
      if(msg == null) {
        return null;
      }
      return msg;
    });


    CompletableFuture<String> eResult = e1.thenCombine(e2, (a, b) -> a + b);
    CompletableFuture<String> eResult2 = e1.thenCombine(e3, (a, b) -> {
      if (a == null && b == null)
        return null;
      else {
        return "default";
      }
    });

timeout
每个Future执行设置超时时间,不支持,需要自己实现

CompletableFuture.supplyAsync(() -> findBestPrice("LDN - NYC"), executorService)  
                         .thenCombine(CompletableFuture.supplyAsync(() -> queryExchangeRateFor("GBP")),  
                                                 this::convert)
                         .acceptEither(timeoutAfter(1, TimeUnit.SECONDS), 
                                       amount -> System.out.println("The price is: " + amount + "GBP"));

## 自定义 schedule实现超时机制                                       
public <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {  
    CompletableFuture<T> result = new CompletableFuture<T>();
    delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
    return result;
}

extention

like scala 特性(http://www.vavr.io/) or scala(https://www.scala-lang.org/)

Either  
Try  
Tuple  
match case  
Option  

lambda more

  • 捕获变量

lamada表达始终引用的变量都单独定义,表达式中不依赖变量中的值,而是只依赖参数中传过来的值

    int a = 10;
    int s = "aaaa"
    BiFunction<Integer,String,String> f  = (i,s) -> i + s;
    f.apply(a,s)
  • 不捕获变量

lamada表达始终引用的变量都单独定义,表达式中有一部分依赖变量中的值和参数中的值

    int a = 10;
    int s = "aaaa"
    Function<Integer,String> f  = (i) -> i + s;
    f.apply(a)

使用捕获和不捕获方式性能有些差别,不捕获变量方式,jvm最终还是要翻译成捕获变量方式,在一定程度上有部分性能开销

底层实现:invokedynamic (jdk 1.7+) 类似原理参考java.lang.invoke中的包,参考一下资料

import java.util.function.Function;

public class Lambda {  
    Function<String, Integer> f = s -> Integer.parseInt(s);
}
0: aload_0  
1: invokespecial #1 // Method java/lang/Object."<init>":()V  
4: aload_0  
5: invokedynamic #2, 0 // InvokeDynamic  
                  #0:apply:()Ljava/util/function/Function;
10: putfield #3 // Field f:Ljava/util/function/Function;  
13: return  

相关资料

https://github.com/java8  
https://docs.oracle.com/javase/8/docs/api/  
http://erajasekar.com/posts/better-exception-handling-java8-streams-using-javaslang/  
http://www.esynergy-solutions.co.uk/blog/asynchronous-timeouts-completable-future-java-8-and-9  
http://www.infoq.com/cn/articles/Invokedynamic-Javas-secret-weapon  
http://www.infoq.com/cn/articles/Java-8-Lambdas-A-Peek-Under-the-Hood