Google Agera vs. ReactiveX
介绍
如果你经常关注Android
开发动态的话,或者关注Reactive
相关的动态,最近Google
有个重大发布。它们发布了针对于Android
的反应式编程库:Agera
。
By Google
是Google
一个从事Google Play Movies
的小组。当然,这听起来更像Google
.
不管是谁发布的,我们要关注的是,它与现有的反应式编程库RxJava
,Reactor
以及Akka-Streams
有什么区别
核心API
Agera
基于少值观察者模式:被观察者通过update()
拿到更新和变化的信号.然后响应这些变化来算出什么改变了。以下是一个无参反应式数据流,它依赖一端update
1 | interface Updatable { |
它们看起来挺合理的?不幸的事,它们也有java.util.Observable
和其他基于addListener/removeListener
的反应式API
Agera Observable
这对方法的问题是每个添加了Updatable
行为的Observable
都不得不去记住原始的Updatable
以便能移除同样的Updatable
:
1 | public final class DoOnUpdate implements Observable { |
这导致一个争论点,在管道的每个阶段独立的下游Updatabels
。是的,RxJava's Subjects
和ConnectableObservables
也有类似的争议点,但它们之后的链式操作符不会有争议。不幸的是,反应式流规范,在当前版本,Publishers
也有类似的问题。现在RxJava2.x
,Rsc
和Reactor
完全忽略了这些东西,结果是操作起来变得更严格了。
第二个微不足道的问题是你不能多次添加相同的Updatable
。因为你无法在不同的subscriptions
中通过Map
区分它们,如果这么干,会出现异常。通常这是很少发生的,因为大多数的终端需求都很单一。
第三个比较大的问题:当Updatable
不再注册在Observable
时抛出。这在终端用户触发移除操作时,而这个操作又引发别的操作,它们当中一个会抛出异常。这就是为何现在反应式变成不能够被取消的缘故。
第四个理论问题,addUpdatable
和removeUpdatable
会相互竞争,一些下游的操作符想在某个上游操作符已经调用了addUpdatable
之后断开。一个可能的问题是removeUpdate
调用抛出异常而addUpdatable
成功,这回导致信号流向任何地方和导致相关的对象不想要的持有。
Agera Updatable
我们来从消费者角度看看API。Updatable
是一个单一函数接口的方法,这让它能简单的给Observable
添加监听
1 | Observable source = ... |
相当简单,现在我们移除我们的监听
1 | source.removeUpdatable(() -> System.out.println("Something happened"); |
这会产生一个异常:两个lambdas
不是相同的对象。这在基于addListener/removeListener
的API当中是相当常见的问题。解决方案是存储lambda
,当需要的时候去用它:
1 | Updatable u = () -> System.out.println("Something happened"); |
有点不方便,但不会更糟了。如果你有很多Observables
和很多Updatables
呢?你需要记住谁注册在谁上,在相同的字段保持引用它们。Rx.NET
的原始设计有个好主意来减少这种必须的单引用:
1 | interface Removable extends Closeable { |
当然,我们在调用close()
时同样需要考虑:
1 | public static Removable registerWith(Observable source, Updatable consumer) { |
Agera MutableRepository
如果值发生变化,Agera MutableRepository
会通过update()
通知注册的Updatables
。这和BehaviorSubject
很像,区别是新值如果不调用get()
不会流到消费者:
1 | MutableRepository<integer> repo = Repositories.mutableRepository(0); |
当通过工厂方法创建仓库时,Looper
中会调用update()
。
1 | Set<Integer> set = new HashSet<>(); |
20秒之后Set
最终会有多大?有人可能会认为是100,000。事实上,值可能在1~100,000之间!原因是accept()
和get()
并发运行,如果消费者运行比较慢,accept()
会覆盖当前仓库的值。
错误处理
使用异步通常意味着会碰到异步相关的问题。RxJava
和其他类似的东西都会有这种问题:出现某种错误了,这个流程自动清掉,而开发者希望的是能够重新开始。错误和清理在某些情况下很复杂,成熟的库都花费了大量的精力来解决这些问题,因此开发者不需要在这上面浪费很多时间。
Agera
基础API不会自己处理错误,你需要自己对错误进行处理。如果你用Agera
组成多个服务,你需要建立自己的错误处理框架.由于并发和中断状态使它处理起来很笨重和延迟。
终结
Agera
对一个完成流不会发出通知,你需要自己知道它什么时候会完成。这在简单用户界面上不会有什么问题。然而,后台异步操作需要知道要发出多少信号,没有数据的话,你如何收到update()
相关通知
如何设计现代化的无参反应式API
看看以下例子:
1 | rx.Observable<Void> signaller = ... |
你很简单的就能获得它该有的功能。你如果要处理别的类型信号,将Void
替换成你想要的类型就可以了。
如果一个库由于需要学习很多操作符而让人感到很笨重的话,你可以拷贝它,删除你不需要的东西。当然,你需要更新修复问题和性能优化后的代码。
如果拷贝和修改听起来不够有吸引力,你能按照Reactive-Streams
规定开发自己的库;Publisher<Void>
,<Subscriber<Void>
和其他东西。你能很方便的与其他的Reactive-Streams
库一起工作,你能通过它的兼容性测试来测试你的库。
当然,编写反应式库很麻烦,按照Reactive-Streams
编写更麻烦。所以,综合考虑,你可以拓展API
如果你真的想要编写无参反应式流,这有几个你需要考虑的建议:
1)不要分开addListener
和removeListener
。单一的入口简化中间操作的开发。
1 | interface Observable { |
2)考虑支持取消和移除而不是返回一个可以取消的令牌或者可移除的动作:
1 | interface Observable { |
3)考虑至少添加一个错误信号接收:
1 | interface Updatable { |
4)考虑提供队列的异步操作.
结论
其实作者就是来黑Agera
的。