函数式接口

  • 函数:Function

    public interface Function<T, R> {
        R apply(T t);
    }
  • 消费者:Consumer

    public interface Consumer<T> {
        void accept(T t);
    }
  • 提供者:Supplier

    public interface Supplier<T> {
        T get();
    }
  • 断言:Predicate

    public interface Predicate<T> {
        boolean test(T t);
    }

Stream API

流程图


流处理

List<Integer> list = List.of(1, 2, -2, 3453, 4657, 6, 876, 87978, 0, 89, -234);
list.stream() // 获取流
    .filter(i -> i % 2 == 0) // 中间操作,intermediate operation
    .max(Integer::compareTo) // 终止操作,terminal operation
    .ifPresent(System.out::println); // 如果存在值,则执行

并发流处理

需自行解决并发安全问题

System.out.println("主线程 => " + Thread.currentThread());
​
List<Integer> list = List.of(1, 2, -2, 3453, 4657, 6, 876, 87978, 0, 89, -234);
list.stream() // 获取流
    .parallel() // 修改流的操作为并发操作
    .filter(i -> {
        System.out.println("filter => " + i + ", 线程 => " + Thread.currentThread());
        return i % 2 == 0;
    }) // 中间操作,intermediate operation
    .max((i1, i2) -> {
        System.out.println("max => " + i1 + "," + i2 + ", 线程 => " + Thread.currentThread());
        return i1.compareTo(i2);
    }) // 终止操作,terminal operation
    .ifPresent(System.out::println);

控制台输出:

主线程 => Thread[main,5,main]
filter => -234, 线程 => Thread[ForkJoinPool.commonPool-worker-8,5,main]
filter => 876, 线程 => Thread[main,5,main]
filter => 4657, 线程 => Thread[ForkJoinPool.commonPool-worker-6,5,main]
filter => 87978, 线程 => Thread[ForkJoinPool.commonPool-worker-9,5,main]
filter => 3453, 线程 => Thread[ForkJoinPool.commonPool-worker-10,5,main]
filter => -2, 线程 => Thread[ForkJoinPool.commonPool-worker-1,5,main]
filter => 1, 线程 => Thread[ForkJoinPool.commonPool-worker-7,5,main]
filter => 2, 线程 => Thread[ForkJoinPool.commonPool-worker-2,5,main]
filter => 89, 线程 => Thread[ForkJoinPool.commonPool-worker-3,5,main]
filter => 6, 线程 => Thread[ForkJoinPool.commonPool-worker-4,5,main]
filter => 0, 线程 => Thread[ForkJoinPool.commonPool-worker-5,5,main]
max => 876,87978, 线程 => Thread[ForkJoinPool.commonPool-worker-9,5,main]
max => 2,-2, 线程 => Thread[ForkJoinPool.commonPool-worker-2,5,main]
max => 6,87978, 线程 => Thread[ForkJoinPool.commonPool-worker-9,5,main]
max => 0,-234, 线程 => Thread[ForkJoinPool.commonPool-worker-5,5,main]
max => 87978,0, 线程 => Thread[ForkJoinPool.commonPool-worker-5,5,main]
max => 2,87978, 线程 => Thread[ForkJoinPool.commonPool-worker-5,5,main]
87978

中间操作

  • filter:过滤,挑出需要的元素

  • takeWhile:过滤,当不满足条件时,立刻结束流操作

  • map:映射,将元素a映射为元素b

  • flatMap:散列,一对多映射,将元素a映射为元素b,元素c等等


Reactive Stream

Flow

java.util.concurrent.Flow类中有java定义的响应式流API

  • Publisher:发布者

  • Subscriber:订阅者

  • Processor:处理器(即是发布者亦是订阅者)

  • Subscription:订阅关系


响应示例

package com.tlovo.reactive_stream;
​
import java.util.Scanner;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
​
public class FlowDemo {
    public static void main(String[] args) {
        // 1.定义发布者
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
​
        // 2.定义中间操作
        MyProcessor processor = new MyProcessor();
​
        // 3.定义订阅者
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;
​
            /**
             * 当订阅时
             * @param subscription a new subscription
             */
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                System.out.println(Thread.currentThread() + "订阅开始了 => " + subscription);
                this.subscription = subscription;
​
                // 从上游请求数据
                subscription.request(1);
            }
​
            /**
             * 当写一个元素到达时
             * @param item the item
             */
            @Override
            public void onNext(String item) {
                System.out.println(Thread.currentThread() + "订阅者,接收到数据 => " + item);
                // 从上游请求数据
                subscription.request(1);
            }
​
            /**
             * 当发生错误时
             * @param ex the exception
             */
            @Override
            public void onError(Throwable ex) {
                System.out.println(Thread.currentThread() + "订阅者,发生异常 => " + ex);
            }
​
            /**
             * 当完成时
             */
            @Override
            public void onComplete() {
                System.out.println(Thread.currentThread() + "订阅者,接收到完成信号");
            }
        };
​
        // 4.绑定发布者和处理器
        publisher.subscribe(processor);
        // 5.绑定处理器和订阅者
        processor.subscribe(subscriber);
​
        // 5.发布者发布数据
        for (int i = 0; i < 10; i++) {
//            if (i >= 9)
//                publisher.closeExceptionally(new RuntimeException("num greater than 9."));
//            else
//                publisher.submit("p-" + i);
            publisher.submit("p-" + i);
        }
​
        // 5.通道关闭
        publisher.close();
​
        new Scanner(System.in).next();
    }
​
    /**
     * 自定义处理器
     */
    private static class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
        private Flow.Subscription subscription;
​
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("Processor订阅绑定完成");
            this.subscription = subscription;
            subscription.request(1);
        }
​
        @Override
        public void onNext(String item) {
            // 再加工数据
            item = "pro+" + item;
            // 发送加工的数据
            submit(item);
            // 继续获取上游数据
            this.subscription.request(1);
        }
​
        @Override
        public void onError(Throwable ex) {
        }
​
        @Override
        public void onComplete() {
        }
    }
}

总结

规则,就是用来打破的( ̄へ ̄)!