函数响应式领域建模(二)
响应式编程
通过new CallBack
从调用者和服务提供者的角度来看,阻塞、非阻塞以及同步、异步可以这么理解:
- 阻塞和非阻塞反映的是调用者的状态,当调用者调用了服务提供者的方法后,如果一直在等
待结果返回,否则无法执行后续的操作,那就是阻塞状态;如果调用之后直接返回,从而可以
继续执行后续的操作,那可以理解为非阻塞的。
- 同步和异步反映的是服务提供者的能力,当调用者调用了服务提供者的方法后,如果服务提
供者能够立马返回,并在处理完成后通过某种方式通知到调用者,那可以理解为异步的;否
则,如果只是在处理完成后才返回,或者需要调用者再去主动查询处理是否完成,就可以理解
为是同步的。
举个例子,老刘买了个洗衣机,当他启动了洗衣机后如果一直在等待洗衣机工作结束好晾衣
服,那他就是阻塞的;如果他启动洗衣机之后就去看电视了,估摸快洗完了就时不时来看看,
那他就是非阻塞的,因为老刘可以去做另一件事。但老刘不能知道这洗衣机啥时候洗完/是否洗
完,那么这台洗衣机就是同步方式工作的;老刘后来换了一台可以在洗完衣服播放音乐的洗衣
机,这样就不用时不时来看了,虽然启动之后洗衣机不能立刻返回给老刘干净的衣服,但是可
以在工作完成之后通知在看电视的老刘,所以新的洗衣机就是异步工作的。
响应式流的两个核心特点:
- 异步非阻塞
- 基于回压机制的流量控制
响应式编程=响应式流+变化传递+声明式范式
只有一个抽象方法的接口叫做函数式接口,这种接口可以用lambda来展示
在Reactor中有两种流,分别是Mono和Flux,其中:
- Mono 代表0到1个元素的数据流
- Flux 代表0到n个元素的数据流
Operator操作数据流中的每一个元素,不管是Mono还是Flux都是一个一个元素并发的操作的,直到遇到终止元素或者错误元素,一个数据流才算结束
zip操作可以把两组或多组元素,对齐发出(感觉可以用来做定时)
Mono Flux可以通过fromXXX得到对应类型组成的数据流,比如Mono.fromCallback(()->{})可以得到对应的callback生成的数据流,以及如fromArray等
通过类似ExecutorService的方式使用Schedulers来得到reactors内置的线程池,通过subscribleOn(Schedulers)将基于Callable的数据流分配给线程池来执行
只有subscrible方法调用的时候才会触发数据流
使用SSE(Server Sent Events)需要在response中加入Content-type=text/event-stream, 如果要用json,则申明application/stream+json(produces = MediaType.APPLICATION_STREAM_JSON)
在响应式编程里,每一个步骤调用都是异步非阻塞的,这样每个线程都在异步地调用别的线程,这样线程就可以一直在做事,而不是被阻塞住了,也就是说每个线程都只是不停重复地做一件事情,每个线程独立地负责一件事而不是一个线程贯穿所有,所以线程在其中是纵向的,一个完整的业务的执行由多个线程,每个线程执行一小部分来完成
其实就是A异步调用B,B异步调用C,所有线程都只做自己该做的事情,并且不被调用其他服务所卡住
通过调用Plusher的subscribe(Subscriber s)方法来触发Subscriber的onSubscribe(Subscription s)的方法,从而开始数据的处理,Subscriber通过Subscription#request(long n)来向Publisher索要数据