办公问答网

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 180|回复: 0

Reactor

[复制链接]

2

主题

5

帖子

7

积分

新手上路

Rank: 1

积分
7
发表于 2022-11-26 21:02:30 | 显示全部楼层 |阅读模式
Reactor

初识Reactor
! ! Reactor讲解
Reactor:响应式编程,区别于一般:Client去询问Server是否有变化并拉取数据的模式,响应式编程是后台有数据变化时主动push给订阅者,也就是Publisher-Subscriber模型。
一、特点


  • 可组合性,即可以协调多个异步任务。
  • 提供丰富的操作符。
  • 订阅之前不会产生任何的动作。(即只有订阅的时候才执行操作)
  • 冷热响应

    • 冷响应:对于每个订阅者,包括在数据源处,数据推送都会从头开始。例如,如果源包装了HTTP调用,则会为每个订阅发出一个新的HTTP请求。
    • 热响应:并不是每个订阅者都可以从头开始接收数据。后面的订阅者会在订阅后接收已推送的信号。

  • 异步非堵塞
  • 流量控制(背压控制)
    Publisher的数据推送的过于频繁,Subscriber处理数据的速度跟不上Publisher推送的速度。这时候就用到背压控制:Subscriber向Publisher发送信号来控制Publisher push数据的速度
    可以用reactive库中的WebClient来进行实现。
二、Operators

1、filter: 过滤

对Flux流中包含的元素进行过滤,只留下满足指定条件的元素。


//只留下偶数元素
Flux.range(1,10).filter(i->i%2==0).subscribe(System.out::println);

2、map: 数据映射

把Flux的数据通过某些逻辑转换成另外的形式。


//结果为A B C
Flux.just("a","b","c").map(String::toUpperCase).subscribe(System.out::println);3、index: 编号

给Flux中的每个元素加上编号。


4、window: 分组

把当前流中的元素收集到另外的 Flux 序列中,也就是分组,分组结果为多个Flux序列。


//把一个包含1-100的flux序列分组,20个为一组,每组为一个新的Flux序列
//这里window为一个包含5个Flux序列的Flux序列数组,其中每个Flux有20个元素
Flux<Flux<Integer>> window = Flux.range(1,100).window(20);
window.subscribe(System.out::println);

为什么输出的结果是UnicastProcessor?

其实,在调用window()方法的时候,其调用过程大致如下:

  • 序列得到订阅,则开始首次发送元素,这时候会创建一个新的源序列UnicastProcessor。
  • 当UnicastProcessor接受的元素数量到达20个时,执行对应的onComplete()方法。
  • 那么当第21个元素发送的时候,又会创建一个新的源序列UnicastProcessor,并向下游传递,以此类推。
注意: window和buffer的区别

(首先有个前提,我们数据都是从上游往下游进行传输的)

  • window是先创建一个UnicastProcessor序列,然后直接向下游传递。
  • buffer是先收集满20个元素,再向下游进行传递。
5、buffer:分组

对一个Flux的元素进行分组,分组结果还是1个Flux序列。


5.1 buffer:有一个参数,指定了每组所包含的元素的最大数量

//结果:输出5组数据,每一组数据包含20个元素
Flux.range(1,100).buffer(20).subscribe(System.out::println)

5.2 bufferUntil:参数表示每个集合中的元素要满足的条件。bufferUntil会一直收集,直到条件==true

//结果:输出5组数据,每一组包含2个元素
//只有当前元素为偶数的时候,才会停止当前元素的收集(当前组包含当前元素),接下来的元素另起组
Flux.range(1,10).bufferUntil(i->i%2==0).subscribe(System.out::println);

5.3 bufferTimeout:2个参数,分别制定了每组所包含的元素的最大数量、收集的时间间隔

5.4 bufferWhile:参数表示每个集合中的元素要满足的条件。bufferWhile则只有当条件==true时才会收集。一旦为false,会立即开始下一次收集。

//结果:输出5组数据,每组包含1个元素
//只有当前元素为偶数时,才进行元素收集
Flux.range(1,10).bufferWhile(i->i%2==0).subscribe(System.out::println);

6、take: 提取数据





6.1 take

//取前三个元素
//结果为1 2 3
Flux.range(1,100).take(3).subscribe(System.out::println);6.2 takeLast

//取最后三个元素
//结果为98 99 100
Flux.range(1,100).takeLast(3).subscribe(System.out::println);6.3 takeWhile

//当条件为真时才进行提取
//结果为1 2
Flux.range(1,100).takeWhile(i->i<3).subscribe(System.out::println);6.4 takeUntil

//当条件为真时停止提取
//结果为1 2
Flux.range(1,100).takeWhile(i->i==3).subscribe(System.out::println);7、zipWith:元素合并

将两个Flux流中的元素按照一对一的方式进行合并


注意:


  • 在合并时可以不做任何处理,由此得到的是一个元素类型为 Tuple2 的流。
  • 若某个序列中的元素数量偏多或者偏少,那么多余的结果并不会输出
// 结果:[a,c] [b,d]
Flux.just("a", "b")
    .zipWith(Flux.just("c", "d","e","f"))
    .subscribe(System.out::println);

// 结果:a-c  b-d
//第二个参数为逻辑处理流程
Flux.just("a", "b")
    .zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2))
    .subscribe(System.out::println);


8、reduce:累加、累积

对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列
注意:


  • 在操作时可以指定一个初始值。如果没有初始值,则序列的第一个元素作为初始值。
  • 累积操作并不单单代表累加、累积的任意一种,而指的是对所有的元素做一个统一的操作,可能是累积、累加等一系列操作。
//1-100累加,结果为5050
Flux.range(1,100)
    .reduce(Integer::sum)
    .subscribe(System.out::println);
//1-100累加再加上100,结果为5150
Flux.range(1, 100)
    .reduceWith(() -> 100, Integer::sum)
    .subscribe(System.out::println);9、merge:流合并

把多个流合并成一个 Flux 序列
Flux.merge(
    Flux.interval(Duration.ZERO, Duration.ofMillis(100)).take(9),
    Flux.interval(Duration.ofMillis(50), Duration.ofMillis(100)).take(2),
    Flux.interval(Duration.ofMillis(100), Duration.ofMillis(100)).take(3)
).subscribe((t) -> System.out.println("Flux.merge :" + t + ",线程:" + Thread.currentThread().getName()));
    // 让主线程睡眠2s,保证上面的输出能够完整
    Thread.sleep(2000);


注意:


  • merge() 会按照所有流中元素的实际产生顺序来合并,代码中定义了每个流中元素产生的时间间隔。
  • merge()方法的兄弟方法:mergeSequential(),按照所有流被订阅的顺序,以流为单位进行合并。
  • 代码中对Flux.interval()方法生成的流,采用了take()操作来制定获取的元素,若不采用,那么该流是一个无限序列。
10、interval:延迟

Flux.interval(xxx1,xxx2):时间类操作符,按照指定的参数来创建流。

  • xxx1:第一次执行的延迟时间。
  • xxx2:每隔多少秒发送一次事件,发送的内容是Long类型整数,从0开始。
11、flatMap:转换并合并结果

把流中的每个元素转换成一个流,再把所有流中的元素进行合并
Flux<String> stringFlux1 = Flux.just("a", "b", "c", "d", "e", "f", "g", "h", "i");

// [a,b],[c,d],[e,f],[g,h],
Flux<Flux<String>> stringFlux2 = stringFlux1.window(2);

stringFlux2.flatMap(flux1 -> flux1.map(String::toUpperCase)).subscribe(System.out::print);


12、concatMap:转换并合并结果

concatMap 操作同样是根据源中的每个元素来获取一个子源,再把所有子源进行合并。
Flux.just(5, 10)
    // 将源中的每个元素来获取一个子源,那么这里的x指的是5或者10
    // 然后以元素10为例:x=10,通过Flux.interval来生成序列,并取出10-3=7个元素
    .concatMap(x -> Flux
               //延迟x*10ms,之后每100ms开始生成从0开始的元素:0、1、2、3、4……
               .interval(Duration.ofMillis(x * 10), Duration.ofMillis(100))
               //将原本Flux流中的两个元素和interval生成的元素通过map合并
               //形式为x:i
               .map(i -> x + ": " + i)
               //取最终流中的前x-3个元素
               .take(x - 3))
    .toStream()
    .forEach(System.out::println);

注意: concatMap和flatMap有什么不同?


  • 顺序:concatMap 操作会根据初始源中的元素顺序依次将获取到的子源进行合并。而flatMap是不会的。
  • 订阅:concatMap 操作对所获取到的子源的订阅是动态进行的,而flatMap则是在合并开始之前就订阅了由父源下发的所有子源。(?不懂)
13、groupBy:分组

通过一个策略 key 将一个 Flux分割为多个组
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    //分组,偶数为一组,奇数为一组。key分别为even和odd
    .groupBy(i -> i % 2 == 0 ? "even" : "odd")
    //合并两个Flux流
    .concatMap(i -> i
               //defaultIfEmpty:如果没有任何数据完成此序列,则提供默认的唯一值
               .defaultIfEmpty(-1)
               //强制转换为String
               .map(String::valueOf)
               //不确定,应该是以i的key为每个i的前缀,这里i表示两个Flux流
               .startWith(i.key())
              )
    .subscribe(System.out::println);



注意:


  • 几个组之间的元素不会存在交集,也就是一个元素只属于其中1个组。
  • 组永远不会为空,因为如果进行分组的时候发现没有对应的组,则进行创建操作。
14、defaultIfEmpty:分配默认值

如果没有任何数据完成此序列,则提供默认的唯一值


三、流式编程Example

消息驱动模式:subcribe()之前的代码描述应该发生什么,但是都尚未发生,直到有人subscribe(代码最后一行subcribe()),才会进行处理的流程(每个.之后都是一步流程) 。也就是订阅之前不会产生任何的动作






当produces= MediaType.APPLICATION_STREAM_JSON_VALUE时,标注了stream,那么就表述在Publisher和Subcriber之间建立了一个长链接


四、Reactor Publisher

https://skyao.io/learning-reactor/docs/concept/flux/flux.html
https://segmentfault.com/a/1190000024499748
Publisher<T>是一个可以提供0-N个序列元素的提供者,并根据其订阅者Subscriber<? super T>的需求推送元素。一个Publisher<T>可以支持多个订阅者,并可以根据订阅者的逻辑进行推送序列元素。
下面这个Excel计算就能说明一些Publisher<T>的特点。


A1-A9就可以看做Publisher<T>及其提供的元素序列。A10-A13可以看作订阅者Subscriber。假如说我们没有A10-A13,那么A1-A9就没有实际意义,它们并不产生计算。这也是响应式的一个重要特点:当没有订阅时发布者什么也不做
FluxMono都是Publisher<T>在Reactor 3实现。
4.1 Flux(一串数据的抽象)

Flux作为Publisher,产生0-N个数据对象 ==> 数据对象交给operator()处理器进行处理 ==> 生成另外的数据对象。
Flux 是一个发出(emit)0-N个元素组成的异步序列的Publisher<T>.
在响应流规范中存在三种给下游消费者调用的方法 onNext, onComplete, 和onError。

  • 正常的包含元素的消息:onNext()
  • 序列结束的消息:onComplete()
  • 序列出错的消息:onError()




  • Flux序列中可以有n个元素,每个元素都会经过operator操作进行数据的转换然后输出。
  • 如果由于某些原因发生了错误,那么会终止该序列。
4.1.1 创建Flux

(1) generate

generate()方法的结构:

generate(
    // 用于初始化值
    Callable<S> stateSupplier,
    // 生成器,也就是逻辑处理的函数
    BiFunction<S, SynchronousSink<T>, S> generator,
    // consumer,用于在序列终止的时候调用执行,比如关闭数据源   
    Consumer<? super S> stateConsumer
);
/**
简单来看,该方法的调用就是三步走:
初始化。
逻辑处理。
终止处理(fiinally)
**/   简单例子:

Flux.generate(t->{
    t.next(1);
    t.complete();
}).subscribe(System.out::println);
//创建一个Flux,有1个元素
//注意generate中next只能调用1次复杂例子:

public void testGenerate() {
    Flux.generate(
            // 1.生成一个对象,用来作为状态,即初始化操作。
            AtomicInteger::new,
            // 2.逻辑处理
            (state, sink) -> {
                // 改变状态值
                int i = state.getAndIncrement();
                sink.next("2*" + i + "=" + 2 * i);
                if (i == 5) {
                    // sink事件结束。
                    sink.complete();
                }
                return state;
            },
            // 3.序列终止时,输出状态值。
            (state) -> System.out.println("state:" + state)
    ).subscribe(System.out::println);
}





(2) create

create()方法的结构:

// 一个参数时,第一个参数为逻辑处理函数/lambda表达式,第二个参数不写时默认为Buffer(背压策略之一)
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {
    return create(emitter, OverflowStrategy.BUFFER);
}

//两个参数时,第一个参数为逻辑处理函数/lambda表达式,第二个参数为背压策略
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {
    return onAssembly((Flux)(new FluxCreate(emitter, backpressure, CreateMode.PUSH_PULL)));
}
简单例子

Flux.create((t)->{
    t.next(1);
    t.next(2);
    t.complete();
}).subscribe(System.out::println);
//创建一个Flux,有多个元素,也就是可以多次发射复杂例子

private Flux<EventSource.Event> createFlux(FluxSink.OverflowStrategy strategy) {
        // 事件源注册了一个监听器,负责监听新事件的创建以及事件源的停止
        return Flux.create(sink -> eventSource.register(new MyListener() {
            @Override
            public void newEvent(EventSource.Event event) {
                System.out.println("上游------>数据源创建了新事件:" + event.getMsg());
                sink.next(event);
            }

            @Override
            public void eventSourceStopped() {
                sink.complete();
            }
        }), strategy); // 别忘了这里还有个背压策略的参数
    }Create() 方法跟Generate() 比较来看,有这么几个不同:

1、Create() 可以同步、也可以异步。而Generate() 是同步的。
//自定义一个事件源类EventSource:
public class EventSource {

private List<MyListener> listeners;
public EventSource() {
this.listeners = new ArrayList<>();
    }

public void register(MyListener listener){
listeners.add(listener);
    }

public void newEvent(Event event){
for (MyListener listener : listeners) {
listener.newEvent(event);
        }
    }

public void eventStopped(){
for (MyListener listener : listeners) {
listener.eventSourceStopped();
        }
    }

@Data
@AllArgsConstructor
public static class Event{
private Date time;
private String msg;
    }
}

//定义一个监听器接口MyListener
public interface MyListener<T> {
// 监听新事件
void newEvent(EventSource.Event event);
// 监听事件源的终止
void eventSourceStopped();
}

//测试方法
// 1.构建一个事件源
EventSource eventSource = new EventSource();
Flux.create(sink -> {
// 2.向事件源中添加一个监听器,负责监听事件的产生和事件源的停止。
// 相当于将事件转换成异步的事件流
eventSource.register(new MyListener() {
@Override
public void newEvent(EventSource.Event event) {
// 3.监听器收到事件回调的时候,会通过sink将事件发出
sink.next(event);
        }
@Override
public void eventSourceStopped() {
// 4.监听器收到事件源停止的回调后,由sink发出完成信号,停止sink
sink.complete();
        }
    });
}).subscribe(System.out::println);

// 5.循环产生订阅
for (int i = 0; i < 5; i++) {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
eventSource.newEvent(new EventSource.Event(new Date(), "事件" + i + "注册!"));
}
// 6.停止事件源
eventSource.eventStopped();



2、Create() 可以每次发出多个元素,而Generate() 每次发出一个。
//create()
Flux.create(sink -> {
for (int i = 0; i < 5; i++) {
sink.next(i);
    }
sink.complete();
}).subscribe(System.out::println);
//generate()
Flux.generate(t->{
t.next(1);
t.complete();
}).subscribe(System.out::println);3、Create() 不需要状态值。
4、Create() 可以在回调中触发多个事件。
5、Create() 是多线程的。
(3) push

Create有一个变体模式:push()方法,和create()方法不同的是,他是一个单线程的,即只有一个线程可以调用next(),complete()方法。
(4) just

Flux.just("1","2").subscribe(System.out::println);
//just("1","2")可以列举生成序列的所有元素(这里是1、2),
//just()创建出来的序列,在发布元素后(subscribe())会自动结束(5) from(通过Publisher创建一个Flux)

//Flux->Flux
Flux.from(Flux.just("just", "just1", "just2")).subscribe(System.out::println);
//Mono->Mono
Flux.from(Mono.just("just")).subscribe(System.out::println);(6) fromArray(创建发出数组的Flux)

// fromArray()从一个数组对象中创建一个Flux对象
    Integer[] arr = new Integer[]{1, 2, 3};
    Flux<Integer> f4 = Flux.fromArray(arr);(7) fromIterable、fromStream

// fromIterable()从一个Iterable对象中创建一个Flux对象
    Flux<String> f2 = Flux.fromIterable(Arrays.asList("str1", "str2", "str3"));

// fromStream()从一个Stream对象中创建一个Flux对象
    ArrayList<Integer> numList = new ArrayList<>();
    numList.add(1);
    Flux<Integer> f3 = Flux.fromStream(numList.stream());(8) defer(推迟发送)

Flux.defer(()->Flux.range(1,10)).subscirbe(System.out::println);(9) range

Flux.range(1,10).subscirbe(System.out::println);
//range(int starter, int counter),创建包含从starter开始的counter个Integer对象的序列
//例如:range(1,10)创建序列,包含从1起始的10个数量的Integer对象
//Integer 类在对象中包装了一个基本类型 int 的值。该类提供了多个方法:能在 int 类型和 String 类型之间互相转换,还提供了处理 int 类型时非常有用的其他一些常量和方法(10) interval(Duration period )、interval(Duration delay, Duration period)

创建一个包含了从0开始递增的Long对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.just(2, 4)
    .flatMap(x -> Flux.interval(Duration.ofMillis(1000)).take(x))
    .toStream()
    .forEach(System.out::println);
//流中的元素被转换成每隔 1000毫秒产生的数量不同的流,再进行合并https://www.jianshu.com/p/8d80dcb4e7e0—>如何使用flatMap()
(11) intervalMillis(long period)、intervalMillis(long delay, long period)

与interval()方法相同,但该方法通过毫秒数来指定时间间隔和延迟时间
Flux.intervalMillis(1000).subscirbe(System.out::println);(12) empty()、never()、error()

    // 创建一个不包含任何元素,只发布结束消息的序列。
    // 并且这种方式不会进行后续传递,需要switchIfEmpty()方法来进行处理。
    // 因为响应式编程中,流的处理是基于元素的,而empty()是没有元素的!
    Flux<Object> empty = Flux.empty();
   
    // 创建一个只包含错误消息的序列,里面的参数类型是Throwable
    Flux<Object> error = Flux.error(new Exception("error!"));
   
    // 创建一个不包含任何消息通知的序列,注意区别empty(),empty还是会发布结束消息的。
    Flux<Object> never = Flux.never();
4.1.2 订阅Flux流: subscribe()

//subscribe():订阅Flux序列,只有进行订阅后才回触发数据流,不订阅就什么都不会发生

        Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4);
        integerFlux.subscribe(System.out::println);// lambda表达式
        
        Flux<String> stringFlux = Flux.just("hello", "world");
        stringFlux.subscribe(System.out::println);  (1) 完整参数subcribe()方法

/**
  * consumer:即如何处理数据元素,比如可以打印输出print。
  * errorConsumer:若发生错误如何处理,即错误信号(同时终止序列)。
  * completeConsumer:即完成信号(同时终止序列)。
  * subscriptionConsumer:订阅发生时候的处理逻辑
  */
public final Disposable subscribe(@Nullable Consumer<? super T> consumer,
                                  @Nullable Consumer<? super Throwable> errorConsumer,
                                  @Nullable Runnable completeConsumer,
                                  @Nullable Consumer<? super Subscription> subscriptionConsumer)
{
  return (Disposable)this.subscribeWith(new LambdaSubscriber(consumer, errorConsumer, completeConsumer, subscriptionConsumer));
}例如:
public class MySubscriber extends BaseSubscriber {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        System.out.println("订阅开始!");
        // 订阅时首先向上游请求1个元素
        request(1);
    }

    @Override
    protected void hookOnNext(Object value) {
        System.out.println("获取当前元素成功!" + value);
        // 每次处理完后再去请求1个元素
        request(1);
    }
}

public void testOut() {
    // 1.创建一个序列
    Flux<Integer> flux = Flux.range(1, 6)
            .map(i -> {
                if (i <= 4) {
                    return i * 2;
                }
                throw new RuntimeException("数据超过了5!");
            });
    // 2.订阅
    MySubscriber ss = new MySubscriber();
    flux.subscribe(System.out::println,
            error -> System.out.println("Error:" + error),
            () -> System.out.println("Complete!"),
            s -> ss.request(2));
    flux.subscribe(ss);
}

(2) Note:


  • subscribe()方法的第四个参数只能接收一个自定义的Subscriber,需要重写BaseSubscriber。
  • BaseSubscriber是一个抽象类,定义了多种用于处理不同信号的hook方法。
  • 至少要重写hookOnSubscribe()方法和hookOnNext()方法。
  • 若存在第四个参数,那么在对应的自定义订阅类中,对应重写的方法代替对应位置的lambda表达式。 比如我重写了onNext()方法,替换了原有lambda表达式中单纯的输出结果(加了几个中文字)。那么再简单点,说白了,BaseSubscriber是Lambda的替代品。
4.2 Mono(1\0个数据的抽象)

Mono作为Publisher,产生0-1个数据对象 ==> 数据对象交给operator()处理器进行处理 ==> 生成另外的数据对象
Mono 是一个发出(emit)0-1个元素的Publisher<T>,可以被onComplete信号或者onError信号所终止。
Flux 和 Mono 之间可以进行转换,例如:

  • 对一个 Flux 序列进行计数操作,得到的结果是一个 Mono对象
  • 把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。




  • 最多发出一项元素,并且以成功信号或者失败信号为终止信号。
//创建MONO
Mono.just("test hjf").subscribe(System.out::println);
Mono.empty().subscribe(System.out::println);
Mono.error(new Throwable()).subscribe(System.out::println);
Mono.never().subscribe(System.out::println);

//fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);

//justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);

//delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
Mono.delay(Duration.ofSeconds(10)).subscribe(System.out::println);

//ignoreElements(Publisher<T> source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。
Mono.ignoreElements(null).subscribe(System.out::println);

//还可以通过 create()方法来使用 MonoSink 来创建 Mono。
Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);五、Lambda表达式

六、背压

两种支持异步的生成Flux的方法create()和push(),支持背压操作,两个方法的第二个参数用于声明背压策略。
// 一个参数时,第一个参数为逻辑处理函数/lambda表达式,第二个参数不写时默认为Buffer(背压策略之一)
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {
    return create(emitter, OverflowStrategy.BUFFER);
}

//两个参数时,第一个参数为逻辑处理函数/lambda表达式,第二个参数为背压策略
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {
    return onAssembly((Flux)(new FluxCreate(emitter, backpressure, CreateMode.PUSH_PULL)));
}6.1 背压策略

public static enum OverflowStrategy {
    IGNORE,
    //忽略下游的背压请求,上游push消息过快时会直接报错:IllegalStateException
    ERROR,
    //上游push消息过快时会直接报错:IllegalStateException
    DROP,
    //下游直接丢弃处理不了的event
    LATEST,
    //让下游只从上游获取最新event
    BUFFER;
        //(默认值)以在下游无法跟上时缓冲所有信号。(这会实现无限缓冲,并可能导致OutOfMemoryError)
    private OverflowStrategy() {
    }
}6.2 request()

request()方法用于限制下游向上游的请求个数,避免下游压力过大
@Test
public void () {
    Flux<Integer> flux = Flux.range(1, 10).log();

    flux.subscribe(new BaseSubscriber<Integer>() {
        private int count = 0;
        private final int requestCount = 4;

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            //订阅时,每次只让上游发送requestCount个请求
            request(requestCount);
        }

        @SneakyThrows
        @Override
        protected void hookOnNext(Integer value) {
            count++;
            // 通过count控制每次request 4个元素
            //如果队列里已经有4个元素没处理,那么就睡眠1s
            if (count == requestCount) {
                Thread.sleep(1000);
                request(requestCount);
                count = 0;
            }
        }
    });
}结果(每个输出停顿1秒):


如果将变量requestCount改为2:结果会是2个2个输出


七、打包操作

用于调用外部公共方法。
transform()

//定义一个公共方法
//筛选出不以str开头的元素,并变成大写
Function <Flux<String>, Flux<String>> filterAndMap =
    f->f.filter(color ->!color.startsWith("str")).map(String::toUpperCase);

//定义Flux序列
Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple", "yellow", "str1", "str2"))
    //调用外部函数
    .transform(filterAndMap)
    .subscribe(System.out::println);

compose()

//定义一个公共方法
AtomicInteger i = new AtomicInteger();
Function <Flux<String>,Flux<String>> function = f ->{
    //第1个线程就抛去“str1”,并转换成大写字母
    if(i.incrementAndGet()==1){
        return f.filter(color -> !color.equals("str1")).map(String::toUpperCase);
    }
    //不是第1个线程就抛去“str2”,并转换成大写字母
    return f.filter(color->!color.equals("str2")).map(String::toUpperCase);
}

//定义Flux序列
Flux<String> flux =
        Flux.fromIterable(Arrays.asList("str1", "str2", "str3", "str4"))
                // 调用外部定义好的function
                .compose(function);
//第1个线程
flux.subscribe(d -> System.out.println("订阅者1:获得数据" + d));
System.out.println("==============================================");
//第2个线程
flux.subscribe(d -> System.out.println("订阅者2:获得数据" + d));

如果将compose()改为transform(),结果如下:


结论如下:

  • compose()中打包的函数可以是有状态的,本案例中对应我们的AtomicInteger对象。
  • transform()打包的函数是无状态的。
AtomicInteger

在Java语言中,++i和i++操作并不是线程安全的而AtomicInteger是线程安全的,即使不使用synchronized关键字也能保证其是线程安全的,主要用于多线程间共享计数。
比如多个线程从列表中依次读取数据,使用AtomicInteger来计数,每次取第AtomicInteger个。而且由于AtomicInteger由硬件提供原子操作指令实现,在非激烈竞争的情况下,开销更小,速度更快。
AtomicInteger atomicInteger = new AtomicInteger();
//将原先的值进行了一个加1,但是返回的加1之前的值
atomicInteger.getAndIncrement()
//将原先的值进行了一个加1,返回的加1之后的值
atomicInteger.incrementAndGet()如果多线程想要借助AtomicInteger共享计数,需要在主线程(main)中创建并初始化AtomicInteger对象,再通过构造函数的引用传递才能实现多线程共享计数
public class TestThread extends Thread implements Runnable {
        public static void main(String[] args) {
                //使用此种方式,运行结果顺序递增,但是两个线程分别计数
         //两个线程分别新建了一个AtomicInterge实例对象,所以不会共享计数
                TestThread thread1 = new TestThread();
                TestThread thread2 = new TestThread();
                thread1.start();
                thread2.start();
               
                //使用此种方式,运行结果不一定顺序递增,但是统一计数
                AtomicInteger testAtomicInteger = new AtomicInteger(0);
        //两个线程使用同一个AtomicInterge实例对象,所以会共享计数
                TestThread thread1 = new TestThread(testAtomicInteger);
                TestThread thread2 = new TestThread(testAtomicInteger);
                thread1.start();
                thread2.start();
        }

        public TestThread() {
                // 构造函数,每次使用构造函数都会初始化一个AtomicInteger=0
                counterAtomicInteger = new AtomicInteger(0);
        }
       
        public TestThread(AtomicInteger counterAtomicInteger) {
                // 构造函数,每次使用构造函数都会初始化一个作为参数传入的AtomicInteger
                this.counterAtomicInteger = counterAtomicInteger;
        }
}八、Schedule定时器

很多情况下,我们的publisher是需要定时去调用一些方法,来产生元素的。Reactor提供了一个新的Schedule类来负责定时任务的生成和管理。
Schedule是一个接口,定义了一些定时器中必须要实现的方法:
立即执行的:
Disposable schedule(Runnable task);延时执行的:
default Disposable schedule(Runnable task, long delay, TimeUnit unit)定期执行的:
default Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit)8.1 Schedulers调度器

Schedule有一个工具类叫做Schedulers,它提供了多个创建Scheduler的方法,它的本质就是对ExecutorService和ScheduledExecutorService进行封装,将其做为Supplier来创建Schedule。
Scheduler提供的静态方法用于创建如下几种线程环境:
(1) 当前线程:Schedulers.immediate()

提交的Runnable将会立马在当前线程执行。
(2) 可重用的单线程:Schedulers.single()

使用同一个线程来执行所有的任务。当各个调用者调用这个方法的时候,都会重用同一个线程,直到这个Scheduler的状态被设定为disposed。
(3) 弹性线程池:Schedulers.elastic()

该方法是一种将阻塞处理放在一个单独的线程中执行的很好的方式,即可以拿来包装一个堵塞方法,将其变为异步。
1.当首次调用这个方法的时候,会创建一个新的线程池,而且这个线程池中闲置的线程可以被重用。 2.如果一个线程的闲置时间太长(默认60s),则会被销毁。
(4) 固定大小线程池:Schedulers.parallel()

创建固定个数的工作线程,个数和CPU的核数相关。
(5) Schedulers.fromExecutorService(ExecutorService)

从一个现有的线程池创建Scheduler。
(6) Schedulers.newXXX

Schedulers提供了很多new开头的方法,来创建各种各样的Scheduler。
//可以指定特定的Scheduler来产生元素
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))8.2 注意事项


  • 某些Reactor的操作已经是默认使用了特定类型的调度器。例如Flux.interval()创建的源,使用了Schedulers.parallel()等。
  • 可以通过publishOn和subscribeOn来进行切换Scheduler的执行上下文。publishOn()切换的是元素消费操作执行时所在的线程。subscribeOn()切换的是源中元素生产操作执行时所在的线程。
九、publishOn() / subscribeOn()

Flux和Mono不会创建线程,只有当触发subscribe()操作时才会执行对应的方法。
而有些操作符,例如publishOn()subscribeOn()方法,能够创建线程。publishOn和subscribeOn主要用来进行切换Scheduler的执行上下文。
9.1 publishOn():

publishOn() 会强制下一个Operator(或许是下一个的下一个…)运行于不同的线程上。也就相当于publishOn()后面的逻辑都执行在另外一个线程上了
例1:

Flux.range(1, 2)
    .map(i -> {
         System.out.println(Thread.currentThread().getName());
         return i;
     })
     .publishOn(Schedulers.single())
     .map(i -> {
         System.out.println(Thread.currentThread().getName());
         return i;
      })
      .publishOn(Schedulers.newParallel("parallel", 4))
      .map(i -> {
          System.out.println(Thread.currentThread().getName());
          return i;
       })
       .subscribe();


流程是:


例2:

Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i + ":"+ Thread.currentThread())
    .publishOn(s)
    .map(i -> "value " + i+":"+ Thread.currentThread());
new Thread(() -> flux.subscribe(System.out::println),"ThreadA").start();
System.out.println(Thread.currentThread());
Thread.sleep(5000);

  • 上面创建了一个名字为parallel-scheduler的scheduler。
  • 然后创建了一个Flux,Flux先做了一个map操作,然后切换执行上下文到parallel-scheduler线程,最后右执行了一次map操作。
  • 最后,我们采用一个新的线程来进行subscribe的输出。
输出结果:
Thread[main,5,main]
value 11:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]可以看到,主线程的名字是Thread。Subscriber线程的名字是ThreadA。
那么在publishOn之前,map使用的线程就是ThreadA。 而在publishOn之后,map使用的线程就切换到了parallel-scheduler线程池。
9.2 subscribeOn

subscribeOn是用来切换Subscriber的执行上下文,不管subscribeOn出现在调用链的哪个部分,最终都会应用到整个调用链上。
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i + ":" + Thread.currentThread())
    .subscribeOn(s)
    .map(i -> "value " + i + ":"+ Thread.currentThread());
new Thread(() -> flux.subscribe(System.out::println), "ThreadA").start();
Thread.sleep(5000);例子同样使用了两个map,然后在两个map中使用了一个subscribeOn用来切换subscribe执行上下文。输出结果:
value 11:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]可以看到,不管哪个map,都是用的是切换过的parallel-scheduler。
9.3 publishOn和subscribeOn的区别


  • publishOn一般使用在订阅链的中间位置,并且从下游获取信号,影响调用位置起后续运算的执行位置
  • subscribeOn一般用于构造向后传播的订阅过程。并且无论放到什么位置,它始终会影响源发射的上下文。同时不会影响对publishOn的后续调用的行为。
  • publishOn会强制让下一个运算符(或者下下个)运行于不同的线程上,subscribeOn会强制让上一个(或者上上个)运算符在不同的线程上执行。
十、Processor

https://blog.csdn.net/get_set/article/details/79799895
https://blog.csdn.net/Zong_0915/article/details/115048153
Processor既是一个Publisher也是一个Subscriber,是一个接口。 所以能够订阅一个Processor,也可以调用它们提供的方法来手动插入数据到序列,或终止序列。
其中的两个实现类:

  • MonoProcessor
  • FluxProcessor
而FluxProcessor又可以衍生出多种Processor,负责应对不同的场景使用:

  • UnicastProcessor
  • DirectProcessor
  • EmitterProcessor
  • ReplayProcessor
  • TopicProcessor
  • WorkQueueProcessor
十一、Context

https://blog.csdn.net/Zong_0915/article/details/115397437
https://blog.csdn.net/LCBUSHIHAHA/article/details/114837031#:~:text=Reactor%E4%B8%AD,ue%E7%9A%84%E5%BD%A2%E5%BC%8F%E5%AD%98%E5%82%A8%E3%80%82
11.1 Context用法:


  • Mono.deferContextual(c-> c.get(key)):获得Context中指定Key对应的Value值。
  • contextWrite(c -> c.put(key, value):往Context中塞入一个键值对。
  • 简单用法的格式:Mono.deferContextual(c ->Mono.just(c.get(key)))
  • contextWrite只对它上游的操作生效,对下游的操作不生效。
  • 如果下游有多个contestWriter,上游会从邻近的contextWrite中读取上下文的信息。
11.2 Context用例

例子1

String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap(s -> Mono
             .deferContextual(ctx ->Mono.just(s + " " + ctx.get(key))))
    .contextWrite(ctx -> ctx.put(key, "World"));

  • 使用contextWrite()往Context中写入一个键值对,key:message,value:World。
  • 在faltMap()中,通过Mono.deferContextual()获得一个Context对象,可以取出对应key的value值,并进行拼接。
  • 结果是Hello World
结果说明:订阅是从下游流向上游的,contextWrite只对它上游的操作生效,对下游的操作不生效,在上边例子中也就是先调用了contextWrite(),然后才调用了Mono.flatMap()。
例子2:

String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap(s -> Mono.deferContextual(ctx ->Mono.just(s + " " + ctx.get(key))))
    .contextWrite(ctx -> ctx.put(key, "Reactor"))
    .contextWrite(ctx -> ctx.put(key, "World"));


结果是Hello Reactor
例子3 ??:

String key = "message";
Mono<String> r = Mono
    .deferContextual(ctx -> Mono.just("Hello " + ctx.get(key)))//3
    .contextWrite(ctx -> ctx.put(key, "Reactor"))//2
    .flatMap( s -> Mono.deferContextual(ctx ->Mono.just(s + " " + ctx.get(key))))//4
    .contextWrite(ctx -> ctx.put(key, "World"));//1结果为Hello Reactor World
第一个需要获取上下文的deferContextual从邻近下游读取到Reactor,第二个flatMap邻近下游读取到的World。
例子4

String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap( s -> Mono
        .deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key)))
    )
    .flatMap( s -> Mono
        .deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key)))
        .contextWrite(ctx -> ctx.put(key, "Reactor"))
    )
    .contextWrite(ctx -> ctx.put(key, "World")); 结果为Hello World Reactor
因为第一个contextWrite只影响内部的操作流。第二个contextWrite只会影响外部主操作流。
11.3 Spring WebFlux处理Context

思路:根据Reactor Context只对上游生效的特性,WebFlux只用在后置处理器中做contextWrite操作就可以了。遗憾的是在Spring WebFlux中没有HandlerInterceptorAdapter,但是可以使用过滤器WebFilter。
@Component
@Slf4j
public class AuthFilter implements WebFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        return chain
            .filter(exchange)
            .contextWrite(ctx -> {
                log.info("设置context内容。");
                return  ctx.put("token", "xx");
                //注意这里一定要return ctx.put
                //put操作是产生一个新的context。如果是return ctx;会导致值没有设置进去
        });
    }}按照上面的处理方式,在我们业务开发中就能取到上下文的信息了。
@PostMapping(name = "测试", value = "/save")
    public Mono<Test> save(@RequestBody Mono<Test> testMono) {
        return testService
            .add(testMono)
            .log("", Level.INFO, true)
            .flatMap(test -> {
                return Mono
                    .contextWrite()
                    .map(ctx -> {
                        log.info("context:" + ctx.get("token"));
                        return test;
            });
        });
    }如果想设置多个值可以像下面这样操作。
@Component
public class WebFluxFilter implements WebFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        return chain
            .filter(exchange)
            .contextWrite(ctx -> {
                return ctx.put("token", "123123123123");
            })
            .contextWrite(ctx -> {
                return ctx.put("liu", "hah");
            });
    }
}十二、Reactor doOn系列函数

在Publisher使用subscribe()方法的时候,Subscriber触发回触发一系列的on方法,如onSubscribe();为了更好的监控以及观测异步序列的传递情况,设置了一系列的doOn方法,在触发onxxx方法的时候作为其副作\附加行为,用于监控行为的运行情况。
12.1 官网doOn方法解释


  • Without modifying the final sequence, I want to:

    • get notified of / execute additional behavior (sometimes referred to as "side-effects") on:
      (也就是在不改变Flux或Mono的情况下,以下情况发生时想被通知/想执行额外的行为)

      • emissions: doOnNext (Flux|Mono)
      • completion: Flux#doOnComplete, Mono#doOnSuccess (includes the result, if any)
      • error termination: doOnError (Flux|Mono)
      • cancellation: doOnCancel (Flux|Mono)
      • "start" of the sequence: doFirst (Flux|Mono)

        • this is tied to Publisher#subscribe(Subscriber)

      • post-subscription : doOnSubscribe (Flux|Mono)

        • Subscription acknowledgment after subscribe
        • this is tied to Subscriber#onSubscribe(Subscription)

      • request: doOnRequest (Flux|Mono)
      • completion or error: doOnTerminate (Flux|Mono)

        • but after it has been propagated downstream: doAfterTerminate (Flux|Mono)

      • any type of signal, represented as a Signal: doOnEach (Flux|Mono)
      • any terminating condition (complete, error, cancel): doFinally (Flux|Mono)

    • log what happens internally: log (Flux|Mono)

  • I want to know of all events:

    • each represented as Signal object:

      • in a callback outside the sequence: doOnEach (Flux|Mono)
      • instead of the original onNext emissions: materialize (Flux|Mono)

        • …and get back to the onNexts: dematerialize (Flux|Mono)


    • as a line in a log: log (Flux|Mono)

12.1 常见doOn方法


  • doOnEach(): 对每一个元素对应的single对象进行监控





  • doOnSubscribe(): 用以监控onSubscribe()方法的执行
/**
Add behavior (side-effect) triggered when the Flux is subscribed.
param onSubscribe the callback to call on onSubscribe
*/
public final Flux<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe) {
Objects.requireNonNull(onSubscribe, "onSubscribe");
return doOnSignal(this, onSubscribe, null, null, null, null, null, null);
    }

  • doOnRequest:对request行为监控产生副作用
  • doOnNext:onNext副作用





  • doOnError:出现error时的副作用,用于监控报错,可以通过错误类型进行筛选
  • doOnComplete:完成时触犯
  • doOnCancel:取消时触发
  • doOnTerminate:终止时触发,无论是成功还是出现异常
例子:
@Test
public void doOnWithMono () {
Mono.just("ffzs")
    .map(String::toUpperCase)
    .doOnSubscribe(subscription -> log.info("test do on subscribe"))
    .doOnRequest(longNumber -> log.info("test do on request"))
    .doOnNext(next -> log.info("test do on next1, value is {}", next))
    .map(String::toLowerCase)
    .doOnNext(next -> log.info("test do on next2, value is {}", next))
    .doOnSuccess(success -> log.info("test do on success: {}", success))
    .subscribe();
    }

@Test
public void doOnWithFlux () {
    Flux.range(1,10)
        .map(i -> {
            if (i == 3) throw new RuntimeException("fake a mistake");
            else return String.valueOf(i);
        })
        .doOnError(error -> log.error("test do on error, error msg is: {}", error.getMessage()))
        .doOnEach(info -> log.info("do on Each: {}", info.get()))
        .doOnComplete(() -> log.info("test do on complete"))  // 因为error没有完成时不触发
        .doOnTerminate(() -> log.info("test do on terminate")) // 无论完成与否,只要终止就触发
        .subscribe();
}
12.2 log()

reactor提供了一个很便利的监控方法:log()。在编写publisher的时候加上log(),在subscriber调用的时候会将触发的每一个behavior以日志的形式打印出来:
@Test
public void logTest () {
    Flux.range(1,5)
        .map(i -> {
            if (i == 3) throw new RuntimeException("fake a mistake");
            else return String.valueOf(i);
        })
        .onErrorContinue((e, val) -> log.error("error type is: {}, msg is : {}", e.getClass(), e.getMessage()))
        .log()
        .subscribe();
}效果如下,日志内容很详细,线程使用,onNext,request这些都会标明

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|办公问答网

GMT+8, 2025-3-16 12:26 , Processed in 0.090531 second(s), 22 queries .

Powered by Discuz! X3.4

© 2001-2013 Comsenz Inc. Templated By 【未来科技 www.veikei.com】设计

快速回复 返回顶部 返回列表