响应式编程

响应式编程

Scroll Down

响应式编程

响应式

什么是响应性?

响应性指的是服务的应用程序拥有对变化做出响应的能力,即服务的应用程序可以对外部的负载、可用性、压力等可能影响系统响应用户请求的能力。

观察者模式 事件发生 —> 通知订阅者

传统命令式调用方式:调用函数 —> 阻塞等待返回 浪费资源和时间

传统命令式编程处理并发问题解决方案:

  • 使用线程池 ,每收到一个请求就是用一个线程去处理

响应式编程调用方式: 订阅消息 —> 处理其他请求,结果主动推送回来。

相应式编程的好处:

  • 异步非阻塞
  • 使用有限(相对少量)线程,减少线程上下文切换开销
  • 通过生产者的速度来决定消费者的速度(背压)以至于当大量请求进来时服务不至于被压垮

响应式宣言

The Reactive Manifesto

基于响应式宣言的几个特性

  • 即时响应性: :只要有可能, 系统就会及时地做出响应。 即时响应是可用性和实用性的基石, 而更加重要的是,即时响应意味着可以快速地检测到问题并且有效地对其进行处理。 即时响应的系统专注于提供快速而一致的响应时间, 确立可靠的反馈上限, 以提供一致的服务质量。 这种一致的行为转而将简化错误处理、 建立最终用户的信任并促使用户与系统作进一步的互动。
  • **回弹性:**系统在出现失败时依然保持即时响应性。 这不仅适用于高可用的、 任务关键型系统——任何不具备回弹性的系统都将会在发生失败之后丢失即时响应性。 回弹性是通过复制、 遏制、 隔离以及委托来实现的。 失败的扩散被遏制在了每个组件内部, 与其他组件相互隔离, 从而确保系统某部分的失败不会危及整个系统,并能独立恢复。 每个组件的恢复都被委托给了另一个(外部的)组件, 此外,在必要时可以通过复制来保证高可用性。 (因此)组件的客户端不再承担组件失败的处理。
  • 弹性: 系统在不断变化的工作负载之下依然保持即时响应性。 反应式系统可以对输入(负载)的速率变化做出反应,比如通过增加或者减少被分配用于服务这些输入(负载)的资源。 这意味着设计上并没有争用点和中央瓶颈, 得以进行组件的分片或者复制, 并在它们之间分布输入(负载)。 通过提供相关的实时性能指标, 反应式系统能支持预测式以及反应式的伸缩算法。 这些系统可以在常规的硬件以及软件平台上实现成本高效的弹性。
  • **消息驱动:**反应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。 这一边界还提供了将失败作为消息委托出去的手段。 使用显式的消息传递,可以通过在系统中塑造并监视消息流队列, 并在必要时应用回压, 从而实现负载管理、 弹性以及流量控制。 使用位置透明的消息传递作为通信的手段, 使得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。 非阻塞的通信使得接收者可以只在活动时才消耗资源, 从而减少系统开销。

响应式规范

基于响应式宣言,开发出对应的基于java的响应式规范

reactive-streams/reactive-streams-jvm

该java响应式规范抽象出4个具体的类型

  • Publisher

      public interface Publisher<T>; {
          public void subscribe(Subscriber<? super T> s);
      }
    
  • Subscriber

      public interface Subscriber<T>; {
          public void onSubscribe(Subscription s);
          public void onNext(T t);
          public void onError(Throwable t);
          public void onComplete();
      }
    
  • Subscription

      public interface Subscription {
          public void request(long n);
          public void cancel();
      }
    
  • Processor

      public interface Processor<T,R>; extends Subscriber<T>, Publisher<R> {
      }
    

响应式实现 — Project Reactor

后续spring团队实现了上面的响应式规范,开发出了project reactor 这个库,用于实现真正的响应式编程

Reactor 3 Reference Guide

编程查询

Reactor 3 Reference Guide

reactor 响应式流的生命周期

  1. 组装时(assembly-time)

组装就是reactor里的各种操作符(operator)进行组装的过程。类似java的构建器模式(builder pattern) ,例如下面的例子,演示了在假设没有Reactor api的情况下,流程组装的可能表现形式

 Flux<Integer> flux = Flux.just("123445", "123234", "123", "2345", "12")
              .map(Integer::parseInt)
              .filter(p -> p < 1000);

//如果没有Reactor 流式api的情况下,流程的组装会呈现以下形式
Flux<String> just = Flux.just("123445", "123234", "123", "2345", "12");
Flux fluxMap = new FluxMap(just,Integer::parseInt);
Flux filterFlux = new FluxFilter(fluxMap,p -> p < 1000);

所以,在底层,Flux和Mono在组装时对象是相互组合的。在组装过程后,我们得到的是一个Publisher链。每个新的Publisher包装了前一个

 //publisher链
 FluxFilter{
         FluxMap{
             FluxArray(&quot;123445&quot;, &quot;123234&quot;, &quot;123&quot;, &quot;2345&quot;, &quot;12&quot;)
         }
 }
  1. 订阅时 (subscription-time)

当我们subscribe对应的publisher链时,该响应式流就进入了下一个阶段—订阅时阶段。代码如下所示

 flux.subscribe(System.out::Println);

对一个Publisher链,一旦subscribe了顶层包装器,就开始了对整个publisher链的订阅过程。下面的伪代码展示了一个Subscriber在订阅时如何通过Subscriber链进行传播

 filterFlux.subscribe(Subscriber){
         mapFlux.subscribe(new FilterSubscriber(Subscriber)){
                 arrayFlux.subscribe(new MapSubscriber(FilterSubscriber(Subscriber))){
                         //这里开始推送真正的元素
                 }            
         }
 }

可以看到,其实就是根据Publish链条从外到里调用Publisher的subscribe()方法,并将Subscriber传进去。最后我们可以得到如下的一个订阅者序列

 //subscriber链
 ArraySubscriber{
         MapSubsacriber{
                 FilterSubscriber{
                         Subscriber
                 }    
         }
 }
  1. 运行时(runtime)

最后就来到了最后一个阶段,运行时阶段。在该阶段,我们再Publisher和Subscriber之间进行信号交换,在此时,顶端的数据源会调用onSubscribe方法,构建Subscribtion对象并传递给给定的Subscriber ,在上面的例子中,顶端数据源则是ArrayPublisher,它会将它的Subscription对象传给下面的Subscriber,代码如下

 MapSubscriber(FilterSubscriber(Subscriber)).onSubscribe(new ArraySubscription()){
         FilterSubscriber(Subscriber).onSubscribe(    new MapSubscription(ArraySubscription(...))){
                 Subscriber.onSubscribe(new FilterSubscription(MapSubscription(ArraySubscription))))
         }
 }

subscription在进入最里层后,我们就构建出了Subscription包装器的金字塔,如下所示:

 FilterSubscription{
         MapSubscription{
                 ArraySubscription()        
         }
 }

最终,调用Subscription.request方法开始请求元素,并将请求的数据返回代码如下:

 FilterSubscription(MapSubscription(ArraySubscription(...))).
         request(1){
                 MapSubscription(ArraySubscription)
                         .request(1){
                                 ArraySubscription(...)
                                         .request(1){
                                             //开始发送数据
                                                 MapSubscriber(FilterSubscriber(Subscriber)).onNext(&quot;1&quot;){
                                                         //在此处执行Map操作
                                                         FilterSubscriber(Subscriber).onNext(&quot;1&quot;){
                                                                 //在此处执行Filter操作
                                                                 Subscriber.onNext(&quot;1&quot;)
                                                         }
                                                 }
                                         }
                         }
         }