Google Agera vs. ReactiveX

原文

介绍

如果你经常关注Android开发动态的话,或者关注Reactive相关的动态,最近Google有个重大发布。它们发布了针对于Android的反应式编程库:Agera

By GoogleGoogle一个从事Google Play Movies的小组。当然,这听起来更像Google.

不管是谁发布的,我们要关注的是,它与现有的反应式编程库RxJava,Reactor以及Akka-Streams有什么区别

核心API

Agera基于少值观察者模式:被观察者通过update()拿到更新和变化的信号.然后响应这些变化来算出什么改变了。以下是一个无参反应式数据流,它依赖一端update

1
2
3
4
5
6
7
8
interface Updatable {
void update();
}

interface Observable {
void addUpdatable(Updatable u);
void removeUpdatable(Updatable u);
}

它们看起来挺合理的?不幸的事,它们也有java.util.Observable和其他基于addListener/removeListener的反应式API

Agera Observable

这对方法的问题是每个添加了Updatable行为的Observable都不得不去记住原始的Updatable以便能移除同样的Updatable:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public final class DoOnUpdate implements Observable {
final Observable source;

final Runnable action;

final ConcurrentHashMap<Updatable, DoOnUpdatable> map;

public DoOnUpdate(Observable source, Runnable action) {
this.source = source;
this.action = action;
this.map = new ConcurrentHashMap<>();
}

@Override
public void addUpdatable(Updatable u) {
DoOnUpdatable wrapper = new DoOnUpdatable(u, action);
if (map.putIfAbsent(u, wrapper) != null) {
throw new IllegalStateException("Updatable already registered");
}
source.addUpdatable(wrapper);
}

public void removeUpdatable(Updatable u) {
DoOnUpdatable wrapper = map.remove(u);
if (wrapper == null) {
throw new IllegalStateException("Updatable already removed");
}
source.removeUpdatable(wrapper);
}

static final class DoOnUpdatable {
final Updatable actual;

final Runnable run;

public DoOnUpdatable(Updatable actual, Runnable run) {
this.actual = actual;
this.run = run;
}

@Override
public void update() {
run.run();
actual.update();
}
}
}

这导致一个争论点,在管道的每个阶段独立的下游Updatabels。是的,RxJava's SubjectsConnectableObservables也有类似的争议点,但它们之后的链式操作符不会有争议。不幸的是,反应式流规范,在当前版本,Publishers也有类似的问题。现在RxJava2.x,RscReactor完全忽略了这些东西,结果是操作起来变得更严格了。

第二个微不足道的问题是你不能多次添加相同的Updatable。因为你无法在不同的subscriptions中通过Map区分它们,如果这么干,会出现异常。通常这是很少发生的,因为大多数的终端需求都很单一。

第三个比较大的问题:当Updatable不再注册在Observable时抛出。这在终端用户触发移除操作时,而这个操作又引发别的操作,它们当中一个会抛出异常。这就是为何现在反应式变成不能够被取消的缘故。

第四个理论问题,addUpdatableremoveUpdatable会相互竞争,一些下游的操作符想在某个上游操作符已经调用了addUpdatable之后断开。一个可能的问题是removeUpdate调用抛出异常而addUpdatable成功,这回导致信号流向任何地方和导致相关的对象不想要的持有。

Agera Updatable

我们来从消费者角度看看API。Updatable是一个单一函数接口的方法,这让它能简单的给Observable添加监听

1
2
Observable source = ...
source.addUpdatable(() -> System.out.println("Something happened"));

相当简单,现在我们移除我们的监听

1
source.removeUpdatable(() -> System.out.println("Something happened");

这会产生一个异常:两个lambdas不是相同的对象。这在基于addListener/removeListener的API当中是相当常见的问题。解决方案是存储lambda,当需要的时候去用它:

1
2
3
4
5
6
7
Updatable u = () -> System.out.println("Something happened");

source.addUpdatable(u);

// ...

source.removeUpdatable(u);

有点不方便,但不会更糟了。如果你有很多Observables和很多Updatables呢?你需要记住谁注册在谁上,在相同的字段保持引用它们。Rx.NET的原始设计有个好主意来减少这种必须的单引用:

1
2
3
4
5
6
7
8
9
interface Removable extends Closeable {
@Override
void close(); // remove the necessity of try-catch around close()
}

public static Removable registerWith(Observable source, Updatable consumer) {
source.addUpdatable(consumer);
return () -> source.removeUpdatable(consumer);
}

当然,我们在调用close()时同样需要考虑:

1
2
3
4
5
6
7
8
9
public static Removable registerWith(Observable source, Updatable consumer) {
source.addUpdatable(consumer);
final AtomicBoolean once = new AtomicBoolean();
return () -> {
if (once.compareAndSet(false, true)) {
source.removeUpdatable(consumer);
}
});
}

Agera MutableRepository

如果值发生变化,Agera MutableRepository会通过update()通知注册的Updatables。这和BehaviorSubject很像,区别是新值如果不调用get()不会流到消费者:

1
2
3
4
5
6
7
MutableRepository<integer> repo = Repositories.mutableRepository(0);

repo.addUpdatable(() -> System.out.println("Value: " + repo.get());

new Thread(() -> {
repo.accept(1);
}).start();

当通过工厂方法创建仓库时,Looper中会调用update()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Set<Integer> set = new HashSet<>();

MutableRepository<integer> repo = Repositories.mutableRepository(0);

repo.addUpdatable(() -> set.add(repo.get()));

new Thread(() -> {
for (int i = 0; i < 100_000; i++) {
repo.accept(i);
}
}).start();

Thread.sleep(20_000);

System.out.println(set.size());

20秒之后Set最终会有多大?有人可能会认为是100,000。事实上,值可能在1~100,000之间!原因是accept()get()并发运行,如果消费者运行比较慢,accept()会覆盖当前仓库的值。

错误处理

使用异步通常意味着会碰到异步相关的问题。RxJava和其他类似的东西都会有这种问题:出现某种错误了,这个流程自动清掉,而开发者希望的是能够重新开始。错误和清理在某些情况下很复杂,成熟的库都花费了大量的精力来解决这些问题,因此开发者不需要在这上面浪费很多时间。

Agera基础API不会自己处理错误,你需要自己对错误进行处理。如果你用Agera组成多个服务,你需要建立自己的错误处理框架.由于并发和中断状态使它处理起来很笨重和延迟。

终结

Agera对一个完成流不会发出通知,你需要自己知道它什么时候会完成。这在简单用户界面上不会有什么问题。然而,后台异步操作需要知道要发出多少信号,没有数据的话,你如何收到update()相关通知

如何设计现代化的无参反应式API

看看以下例子:

1
2
3
4
5
6
7
8
9
rx.Observable<Void> signaller = ...

rx.Observer<Void> consumer = ...

Subscription s = signaller.subscribe(consumer);

// ...

s.unsubscribe();

你很简单的就能获得它该有的功能。你如果要处理别的类型信号,将Void替换成你想要的类型就可以了。

如果一个库由于需要学习很多操作符而让人感到很笨重的话,你可以拷贝它,删除你不需要的东西。当然,你需要更新修复问题和性能优化后的代码。

如果拷贝和修改听起来不够有吸引力,你能按照Reactive-Streams规定开发自己的库;Publisher<Void>,<Subscriber<Void>和其他东西。你能很方便的与其他的Reactive-Streams库一起工作,你能通过它的兼容性测试来测试你的库。

当然,编写反应式库很麻烦,按照Reactive-Streams编写更麻烦。所以,综合考虑,你可以拓展API

如果你真的想要编写无参反应式流,这有几个你需要考虑的建议:

1)不要分开addListenerremoveListener单一的入口简化中间操作的开发。

1
2
3
4
5
6
7
interface Observable {
Removable register(Updatable u);
}

interface Removable {
void remove();
}

2)考虑支持取消和移除而不是返回一个可以取消的令牌或者可移除的动作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
interface Observable {
void register(Updatable u);
}

interface Updatable {
void onRegister(Removable remover);
void update();
}

// or

interface Updatable {
void update(Removable remover);
}

3)考虑至少添加一个错误信号接收:

1
2
3
4
5
interface Updatable {
void onRegister(Removable remover);
void update();
void error(Throwable ex);
}

4)考虑提供队列的异步操作.

结论

其实作者就是来黑Agera的。