数据流

  • 数据流的链式操作是以之前为条件

  • 根据元素数量不同有FluxMono


Flux

一个数据流由如下构成:

  • 元素:0~N

  • 信号:有且仅有一个(为正常或异常)

上述弹珠图表示,数据流经过一些操作(operator)后,产生新数据流


Mono

  • 元素:0~1

  • 信号:有且仅有一个(为正常或异常)


API触发时机

  • doOnNext:每个数据(流的数据)到达时

  • doOnEach:每个数据(流的数据和信号)到达时

  • doOnRequest:消费者请求元素时

  • doOnError:数据流操作时发送异常时

  • doOnSubscribe:流被订阅时

  • doOnTerminate:流被中断(取消/异常信号)时

  • doOnCancel:流被取消时

  • doOnDiscard:流中元素被忽略时


Reactor API

subscribe

流在订阅后才能被消费,否则流什么都没干

自定义消费者

Flux<Integer> range = Flux.range(1, 10);

range.subscribe(new BaseSubscriber<>() {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // 流被订阅时触发
        System.out.println("绑定了 => " + subscription);

        // 向发布者获取数据
        request(1);
    }

    @Override
    protected void hookOnNext(Integer value) {
        System.out.println("数据到达 => " + value);
        request(1); // 继续获取数据
    }

    @Override
    protected void hookOnComplete() {
        System.out.println("流正常关闭");
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        System.out.println("流发送异常 => " + throwable.getMessage());
    }

    @Override
    protected void hookOnCancel() {
        System.out.println("流被取消");
    }

    /**
     * 最终回调,一定会被执行
     */
    @Override
    protected void hookFinally(SignalType type) {
        System.out.println("流最终执行 => " + type);
    }
});

onErrorComplete

将错误信号转化为正常信号


buffer

Flux<List<Integer>> flux = Flux.range(1, 10)
    .buffer(3);// 缓冲区,缓冲3个元素,消费者以每组3个元素为1个单位进行请求
​
flux.subscribe(new BaseSubscriber<>() {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        request(2); // request(N)代表找发布者请求N次,总共能得到N*bufferSize个数据
    }
​
    @Override
    protected void hookOnNext(List<Integer> value) {
        System.out.println("元素 => " + value);
    }
});

limitRate

Flux.range(1, 1000)
    .log()
    .limitRate(100) // 一次预取100个元素 request(100)
    .subscribe();
// 75%预取策略:
//  第一次抓取100个数据,如果75%的元素已经处理了,继续抓取新的75%个元素
//  request(75)

编程方式创建序列

同步环境-generate

Flux<Object> flux = Flux.generate(() -> 0, // 初始state值
                                  (state, sink) -> {
​
                                      if (state <= 10) {
                                          sink.next(state); // 将元素传出去
                                      } else {
                                          sink.complete(); // 完成
                                      }
​
                                      return state + 1; // 返回迭代state值
                                  });
​
flux.log().subscribe();

多线程-create

public static void main(String[] args) {
    Flux.create(sink -> {
        MyListener listener = new MyListener(sink);
        for (int i = 0; i < 100; i++) {
            listener.online("用户-" + i);
        }
    }).log().subscribe();
}
​
private static class MyListener {
    FluxSink<Object> sink;
​
    public MyListener(FluxSink<Object> sink) {
        this.sink = sink;
    }
​
    // 用户登录,触发online监听器
    public void online(Object username) {
        System.out.println("用户登录了 => " + username);
        sink.next(username); // 传入用户名称
    }
}

Handle

Flux.range(1, 10)
    .handle((value, sink) -> {
        System.out.println("拿到的值 => " + value);
        sink.next("自定义处理 - " + value); // 可以向下发送数据的通道
    })
    .log()
    .subscribe();

自定义线程调度规则


// 流的发布、中间操作,默认使用当前线程
Flux<Integer> flux = Flux.range(1, 10)
    .publishOn(Schedulers.single()) // 指定发布者所在线程池
    .log();
// .subscribeOn(Schedulers.immediate()); // 指定订阅者所在线程池
// 只要不指定线程池,默认发布者用的是订阅者的线程池
new Thread(() -> flux.subscribe(System.out::println)).start();
​
// 调度器(线程池)
// Schedulers.immediate(); // 默认,无执行上下文,当前线程运行所有操作
// Schedulers.single(); // 使用固定的一个单线程
​
// 有界、弹性调度,非无限扩充的线程池,线程池中有10*CPU核心个线程,队列默认100k,存活空闲时间60s
// Schedulers.boundedElastic();
​
// 自定义线程池
// Schedulers.fromExecutor(new ThreadPoolExecutor(4, 8, 60,
//         TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000)));
​
// Schedulers.parallel(); // 并行

错误处理

onErrorReturn

  • 吞掉异常,消费者异常感知

  • 返回一个默认值或执行一个方法

  • 流直接正常完成


onErrorResume

  • 吞掉异常,消费者异常感知

  • 调用一个方法返回新流代替异常元素

  • 流直接正常完成

  • 吞掉异常,消费者无异常感知

  • 捕获并动态计算一个值

    Flux.just(1,2,3,5,6)
        .onErrorResume(e -> Flux.error(new RuntimeException(e)))
        .log()
        .subscribe();
  • 流直接异常完成


onErrorMap

  • 吞掉源异常,抛新异常(包装异常),消费者异常感知

  • 流直接异常完成

  • 推荐使用


doOnError

捕获异常,记录特殊日志,重新抛出

  • 不吞异常,消费者异常感知

  • 不影响异常继续顺着流水线传播


doFinally

不管正常、取消还是异常结束,都会执行


onErrorContinue

错误后继续处理下一个元素

Flux.just(1,2,3,0,9,8,5)
    .map(i -> 10 / i)
    .onErrorContinue((ex, val) -> {
        System.out.println("ex => " + ex);
        System.out.println("val => " + val);
    })
    .log()
    .subscribe();

onErrorStop

错误后立刻停止流,源头中断,所有订阅者全部结束


onErrorComplete

将错误信号替换为正常信号


超时与重试

Flux.just(1)
    .delayElements(Duration.ofSeconds(3))
    .log()
    .timeout(Duration.ofSeconds(2)) // 超时后抛出异常
    .retry(3) // 把流从头到尾重新请求一次,此处是重试3次
    .onErrorReturn(2)
    .map(i -> i + "hh")
    .subscribe(System.out::println);

Sinks

背压、单播、多播和重放

// Sinks.many(); // 发送Flux数据
// Sinks.one(); // 发送Mono数据
​
// Sinks => 接收器,数据管道,所有数据顺着这个管道往下流
// Sinks.many().unicast(); // 单播 => 该管道只能绑定单个订阅者
// Sinks.many().multicast(); // 多播 => 该管道能绑定多个订阅者
// 订阅者从头消费元素 或 订阅那一刻开始消费元素
// Sinks.many().replay(); // 重放 => 该管道能重放元素(是否给后来的订阅者把之前的元素依然发给它)
​
Sinks.Many<Object> many = Sinks.many()
    .multicast()
    .onBackpressureBuffer();// 背压队列,订阅者每次最大处理的元素
​
// 订阅者默认从订阅的那一刻开始接收元素
//  如果有2个订阅者A和B,有10个元素,每秒处理1个元素
//  A在消费5个元素后,B进行订阅,则B从第6个元素开始消费
// replay会使B从头开始消费
// Sinks.many().replay();
​
new Thread(() -> {
    for (int i = 0; i < 10; i++) {
        many.tryEmitNext("a-" + i);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}).start();
​
// 订阅
many.asFlux().subscribe(v -> System.out.println("v1 => " + v));
​
new Thread(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
​
    many.asFlux().subscribe(v -> System.out.println("v2 => " + v));
}).start();

缓存

Flux<Integer> cache = Flux.range(1, 10)
    .delayElements(Duration.ofSeconds(1))
    .cache(3);// 缓存元素,不调缓存默认缓存所有元素
​
cache.subscribe();
​
new Thread(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
​
    // 5秒后从最近缓存的元素开始消费
    cache.subscribe(System.out::println);
}).start();

阻塞

Integer last = Flux.just(1, 2, 4)
    .map(i -> i + 10)
    .blockLast();
System.out.println(last);
​
List<Integer> eleList = Flux.just(1, 2, 3, 4, 5)
    .map(i -> i + 10)
    .collectList()
    .block(); // 也是一种订阅者
System.out.println(eleList);

并发

// 百万数据,8个线程,每个线程处理100个数据,进行分批处理直到结束
Flux.range(1,100_0000)
    .buffer(100) // 每个线程处理100个数据
    .parallel(8) // 8个线程
    .runOn(Schedulers.newParallel("YY")) // 进行分批处理
    .log()
    .flatMap(Flux::fromIterable)
    .collectSortedList(Integer::compareTo)
    .subscribe(System.out::println);

Context

// ThreadLocal 在响应式编程中无法使用
// 响应式中,数据流期间共享数据 => Context 可读写 | ContextView 只读
// 必须使用支持Context的API才能使用上下文功能
// Context不可变
Flux.just(1, 2, 3)
    .transformDeferredContextual((flux, ctx) -> {
        System.out.println("flux=" + flux);
        System.out.println("ctx=" + ctx);
        return flux.map(i -> i + "+++" + ctx.get("prefix"));
    })
    // 上游能拿到下游的最近一次数据
    .contextWrite(Context.of("prefix", "hh")) // ThreadLocal共享了数据,上游的所有人都能看到,Context由下游传播给上游
    .subscribe(v -> System.out.println("v=" + v));

defer

在defer里面定义的代码,有订阅者且流被激活才会动态调用该方法

Mono.defer(() -> {
    System.out.println("动态运行代码");
    return Mono.just(1);
});

规则,就是用来打破的( ̄へ ̄)!