数据流
数据流的链式操作是以之前为条件
根据元素数量不同有
Flux
和Mono
Flux
一个数据流由如下构成:
元素:
0~N
信号:有且仅有一个(为正常或异常)
上述弹珠图表示,数据流经过一些操作(operator
)后,产生新数据流
Mono
元素:
0~1
信号:有且仅有一个(为正常或异常)
API触发时机
doOnNext:每个数据(流的数据)到达时
doOnEach:每个数据(流的数据和信号)到达时
doOnRequest:消费者请求元素时
doOnError:数据流操作时发送异常时
doOnSubscribe:流被订阅时
doOnTerminate:流被中断(取消/异常信号)时
doOnCancel:流被取消时
doOnDiscard:流中元素被忽略时
Reactor API
subscribe
流在订阅后才能被消费,否则流什么都没干
自定义消费者
Flux<Integer> range = Flux.range(1, 10);
range.subscribe(new BaseSubscriber<>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
// 流被订阅时触发
System.out.println("绑定了 => " + subscription);
// 向发布者获取数据
request(1);
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("数据到达 => " + value);
request(1); // 继续获取数据
}
@Override
protected void hookOnComplete() {
System.out.println("流正常关闭");
}
@Override
protected void hookOnError(Throwable throwable) {
System.out.println("流发送异常 => " + throwable.getMessage());
}
@Override
protected void hookOnCancel() {
System.out.println("流被取消");
}
/**
* 最终回调,一定会被执行
*/
@Override
protected void hookFinally(SignalType type) {
System.out.println("流最终执行 => " + type);
}
});
onErrorComplete
将错误信号转化为正常信号
buffer
Flux<List<Integer>> flux = Flux.range(1, 10)
.buffer(3);// 缓冲区,缓冲3个元素,消费者以每组3个元素为1个单位进行请求
flux.subscribe(new BaseSubscriber<>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(2); // request(N)代表找发布者请求N次,总共能得到N*bufferSize个数据
}
@Override
protected void hookOnNext(List<Integer> value) {
System.out.println("元素 => " + value);
}
});
limitRate
Flux.range(1, 1000)
.log()
.limitRate(100) // 一次预取100个元素 request(100)
.subscribe();
// 75%预取策略:
// 第一次抓取100个数据,如果75%的元素已经处理了,继续抓取新的75%个元素
// request(75)
编程方式创建序列
同步环境-generate
Flux<Object> flux = Flux.generate(() -> 0, // 初始state值
(state, sink) -> {
if (state <= 10) {
sink.next(state); // 将元素传出去
} else {
sink.complete(); // 完成
}
return state + 1; // 返回迭代state值
});
flux.log().subscribe();
多线程-create
public static void main(String[] args) {
Flux.create(sink -> {
MyListener listener = new MyListener(sink);
for (int i = 0; i < 100; i++) {
listener.online("用户-" + i);
}
}).log().subscribe();
}
private static class MyListener {
FluxSink<Object> sink;
public MyListener(FluxSink<Object> sink) {
this.sink = sink;
}
// 用户登录,触发online监听器
public void online(Object username) {
System.out.println("用户登录了 => " + username);
sink.next(username); // 传入用户名称
}
}
Handle
Flux.range(1, 10)
.handle((value, sink) -> {
System.out.println("拿到的值 => " + value);
sink.next("自定义处理 - " + value); // 可以向下发送数据的通道
})
.log()
.subscribe();
自定义线程调度规则
// 流的发布、中间操作,默认使用当前线程
Flux<Integer> flux = Flux.range(1, 10)
.publishOn(Schedulers.single()) // 指定发布者所在线程池
.log();
// .subscribeOn(Schedulers.immediate()); // 指定订阅者所在线程池
// 只要不指定线程池,默认发布者用的是订阅者的线程池
new Thread(() -> flux.subscribe(System.out::println)).start();
// 调度器(线程池)
// Schedulers.immediate(); // 默认,无执行上下文,当前线程运行所有操作
// Schedulers.single(); // 使用固定的一个单线程
// 有界、弹性调度,非无限扩充的线程池,线程池中有10*CPU核心个线程,队列默认100k,存活空闲时间60s
// Schedulers.boundedElastic();
// 自定义线程池
// Schedulers.fromExecutor(new ThreadPoolExecutor(4, 8, 60,
// TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000)));
// Schedulers.parallel(); // 并行
错误处理
onErrorReturn
吞掉异常,消费者无异常感知
返回一个默认值或执行一个方法
流直接正常完成
onErrorResume
吞掉异常,消费者无异常感知
调用一个方法返回新流代替异常元素
流直接正常完成
或
吞掉异常,消费者无异常感知
捕获并动态计算一个值
Flux.just(1,2,3,5,6) .onErrorResume(e -> Flux.error(new RuntimeException(e))) .log() .subscribe();
流直接异常完成
onErrorMap
吞掉源异常,抛新异常(包装异常),消费者有异常感知
流直接异常完成
推荐使用
doOnError
捕获异常,记录特殊日志,重新抛出
不吞异常,消费者有异常感知
不影响异常继续顺着流水线传播
doFinally
不管正常、取消还是异常结束,都会执行
onErrorContinue
错误后继续处理下一个元素
Flux.just(1,2,3,0,9,8,5)
.map(i -> 10 / i)
.onErrorContinue((ex, val) -> {
System.out.println("ex => " + ex);
System.out.println("val => " + val);
})
.log()
.subscribe();
onErrorStop
错误后立刻停止流,源头中断,所有订阅者全部结束
onErrorComplete
将错误信号替换为正常信号
超时与重试
Flux.just(1)
.delayElements(Duration.ofSeconds(3))
.log()
.timeout(Duration.ofSeconds(2)) // 超时后抛出异常
.retry(3) // 把流从头到尾重新请求一次,此处是重试3次
.onErrorReturn(2)
.map(i -> i + "hh")
.subscribe(System.out::println);
Sinks
背压、单播、多播和重放
// Sinks.many(); // 发送Flux数据
// Sinks.one(); // 发送Mono数据
// Sinks => 接收器,数据管道,所有数据顺着这个管道往下流
// Sinks.many().unicast(); // 单播 => 该管道只能绑定单个订阅者
// Sinks.many().multicast(); // 多播 => 该管道能绑定多个订阅者
// 订阅者从头消费元素 或 订阅那一刻开始消费元素
// Sinks.many().replay(); // 重放 => 该管道能重放元素(是否给后来的订阅者把之前的元素依然发给它)
Sinks.Many<Object> many = Sinks.many()
.multicast()
.onBackpressureBuffer();// 背压队列,订阅者每次最大处理的元素
// 订阅者默认从订阅的那一刻开始接收元素
// 如果有2个订阅者A和B,有10个元素,每秒处理1个元素
// A在消费5个元素后,B进行订阅,则B从第6个元素开始消费
// replay会使B从头开始消费
// Sinks.many().replay();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
many.tryEmitNext("a-" + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
// 订阅
many.asFlux().subscribe(v -> System.out.println("v1 => " + v));
new Thread(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
many.asFlux().subscribe(v -> System.out.println("v2 => " + v));
}).start();
缓存
Flux<Integer> cache = Flux.range(1, 10)
.delayElements(Duration.ofSeconds(1))
.cache(3);// 缓存元素,不调缓存默认缓存所有元素
cache.subscribe();
new Thread(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 5秒后从最近缓存的元素开始消费
cache.subscribe(System.out::println);
}).start();
阻塞
Integer last = Flux.just(1, 2, 4)
.map(i -> i + 10)
.blockLast();
System.out.println(last);
List<Integer> eleList = Flux.just(1, 2, 3, 4, 5)
.map(i -> i + 10)
.collectList()
.block(); // 也是一种订阅者
System.out.println(eleList);
并发
// 百万数据,8个线程,每个线程处理100个数据,进行分批处理直到结束
Flux.range(1,100_0000)
.buffer(100) // 每个线程处理100个数据
.parallel(8) // 8个线程
.runOn(Schedulers.newParallel("YY")) // 进行分批处理
.log()
.flatMap(Flux::fromIterable)
.collectSortedList(Integer::compareTo)
.subscribe(System.out::println);
Context
// ThreadLocal 在响应式编程中无法使用
// 响应式中,数据流期间共享数据 => Context 可读写 | ContextView 只读
// 必须使用支持Context的API才能使用上下文功能
// Context不可变
Flux.just(1, 2, 3)
.transformDeferredContextual((flux, ctx) -> {
System.out.println("flux=" + flux);
System.out.println("ctx=" + ctx);
return flux.map(i -> i + "+++" + ctx.get("prefix"));
})
// 上游能拿到下游的最近一次数据
.contextWrite(Context.of("prefix", "hh")) // ThreadLocal共享了数据,上游的所有人都能看到,Context由下游传播给上游
.subscribe(v -> System.out.println("v=" + v));
defer
在defer里面定义的代码,有订阅者且流被激活才会动态调用该方法
Mono.defer(() -> {
System.out.println("动态运行代码");
return Mono.just(1);
});