函数式接口
函数:
Function
public interface Function<T, R> { R apply(T t); }
消费者:
Consumer
public interface Consumer<T> { void accept(T t); }
提供者:
Supplier
public interface Supplier<T> { T get(); }
断言:
Predicate
public interface Predicate<T> { boolean test(T t); }
Stream API
流程图
流处理
List<Integer> list = List.of(1, 2, -2, 3453, 4657, 6, 876, 87978, 0, 89, -234);
list.stream() // 获取流
.filter(i -> i % 2 == 0) // 中间操作,intermediate operation
.max(Integer::compareTo) // 终止操作,terminal operation
.ifPresent(System.out::println); // 如果存在值,则执行
并发流处理
需自行解决并发安全问题
System.out.println("主线程 => " + Thread.currentThread());
List<Integer> list = List.of(1, 2, -2, 3453, 4657, 6, 876, 87978, 0, 89, -234);
list.stream() // 获取流
.parallel() // 修改流的操作为并发操作
.filter(i -> {
System.out.println("filter => " + i + ", 线程 => " + Thread.currentThread());
return i % 2 == 0;
}) // 中间操作,intermediate operation
.max((i1, i2) -> {
System.out.println("max => " + i1 + "," + i2 + ", 线程 => " + Thread.currentThread());
return i1.compareTo(i2);
}) // 终止操作,terminal operation
.ifPresent(System.out::println);
控制台输出:
主线程 => Thread[main,5,main]
filter => -234, 线程 => Thread[ForkJoinPool.commonPool-worker-8,5,main]
filter => 876, 线程 => Thread[main,5,main]
filter => 4657, 线程 => Thread[ForkJoinPool.commonPool-worker-6,5,main]
filter => 87978, 线程 => Thread[ForkJoinPool.commonPool-worker-9,5,main]
filter => 3453, 线程 => Thread[ForkJoinPool.commonPool-worker-10,5,main]
filter => -2, 线程 => Thread[ForkJoinPool.commonPool-worker-1,5,main]
filter => 1, 线程 => Thread[ForkJoinPool.commonPool-worker-7,5,main]
filter => 2, 线程 => Thread[ForkJoinPool.commonPool-worker-2,5,main]
filter => 89, 线程 => Thread[ForkJoinPool.commonPool-worker-3,5,main]
filter => 6, 线程 => Thread[ForkJoinPool.commonPool-worker-4,5,main]
filter => 0, 线程 => Thread[ForkJoinPool.commonPool-worker-5,5,main]
max => 876,87978, 线程 => Thread[ForkJoinPool.commonPool-worker-9,5,main]
max => 2,-2, 线程 => Thread[ForkJoinPool.commonPool-worker-2,5,main]
max => 6,87978, 线程 => Thread[ForkJoinPool.commonPool-worker-9,5,main]
max => 0,-234, 线程 => Thread[ForkJoinPool.commonPool-worker-5,5,main]
max => 87978,0, 线程 => Thread[ForkJoinPool.commonPool-worker-5,5,main]
max => 2,87978, 线程 => Thread[ForkJoinPool.commonPool-worker-5,5,main]
87978
中间操作
filter
:过滤,挑出需要的元素takeWhile
:过滤,当不满足条件时,立刻结束流操作map
:映射,将元素a映射为元素bflatMap
:散列,一对多映射,将元素a映射为元素b,元素c等等
Reactive Stream
Flow
在java.util.concurrent.Flow
类中有java定义的响应式流API
Publisher
:发布者Subscriber
:订阅者Processor
:处理器(即是发布者亦是订阅者)Subscription
:订阅关系
响应示例
package com.tlovo.reactive_stream;
import java.util.Scanner;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class FlowDemo {
public static void main(String[] args) {
// 1.定义发布者
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
// 2.定义中间操作
MyProcessor processor = new MyProcessor();
// 3.定义订阅者
Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
/**
* 当订阅时
* @param subscription a new subscription
*/
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println(Thread.currentThread() + "订阅开始了 => " + subscription);
this.subscription = subscription;
// 从上游请求数据
subscription.request(1);
}
/**
* 当写一个元素到达时
* @param item the item
*/
@Override
public void onNext(String item) {
System.out.println(Thread.currentThread() + "订阅者,接收到数据 => " + item);
// 从上游请求数据
subscription.request(1);
}
/**
* 当发生错误时
* @param ex the exception
*/
@Override
public void onError(Throwable ex) {
System.out.println(Thread.currentThread() + "订阅者,发生异常 => " + ex);
}
/**
* 当完成时
*/
@Override
public void onComplete() {
System.out.println(Thread.currentThread() + "订阅者,接收到完成信号");
}
};
// 4.绑定发布者和处理器
publisher.subscribe(processor);
// 5.绑定处理器和订阅者
processor.subscribe(subscriber);
// 5.发布者发布数据
for (int i = 0; i < 10; i++) {
// if (i >= 9)
// publisher.closeExceptionally(new RuntimeException("num greater than 9."));
// else
// publisher.submit("p-" + i);
publisher.submit("p-" + i);
}
// 5.通道关闭
publisher.close();
new Scanner(System.in).next();
}
/**
* 自定义处理器
*/
private static class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Processor订阅绑定完成");
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(String item) {
// 再加工数据
item = "pro+" + item;
// 发送加工的数据
submit(item);
// 继续获取上游数据
this.subscription.request(1);
}
@Override
public void onError(Throwable ex) {
}
@Override
public void onComplete() {
}
}
}