0%

对象展示

Python中有两种方式将对象以字符串的形式表示

repr()

返回开发者想要到的字符串形式

str()

返回用户想要的字符串形式

通过实现特殊的方法__repr____str__来支持repr()str()

有两个额外的方法来支持对象的展示形式__bytes____format__.__byte__方法和__str__类似,它被bytes()调用来显示对象的字节序列。__format__用于格式化对象显示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Vector2d:
typecode = 'd'

def __init__(self, x, y):
self.x = x
self.y = y

def __iter__(self):
return (i for i in (self.x, self.y))

def __repr__(self):
class_name = type(self).__name__
return '{}({!r}, {!r})'.format(class_name, *self)

def __str__(self):
return str(tuple(self))

def __bytes__(self):
return bytes([ord(self.typecode)]) + bytes(array(self.typecode, self))

def __eq__(self, other):
return tuple(self) == tuple(other)

def __abs__(self):
return math.hypot(self.x, self.y)

def __bool__(self):
return bool(abs(self))

替代构造函数

因为能够以字节的形式导出Vector2d,因此需要一个方法能够从二进制序列中导出一个对象。标准库array中有这么一个方法frombytes

1
2
3
4
5
@classmethod
def frombytes(cls, octets):
typecode = chr(octets[0])
memv = memoryview(octets[1:]).cast(typecode)
return cls(*memv)

classmethod vs staticmethod

classmethod对类而不是实例进行操作,其改变了方法调用的方式,它接受类自身作为第一个参数,最常用于替代构造函数

staticmethod改变方法以便它收到的第一个参数不是特殊参数。静态方法就像一个纯净的函数存活在类中,而不是定义在模块层次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Demo:
@classmethod
def klassmeth(*args):
return args

@staticmethod
def statmeth(*args):
return args


print(Demo.klassmeth())

print(Demo.klassmeth('spam'))

print(Demo.statmeth())

print(Demo.statmeth('spam'))


result:

(<class '__main__.Demo'>,)
(<class '__main__.Demo'>, 'spam')
()
('spam',)

格式化显示

内置方法format()实际上调用__format__(format_spec)

1
2
3
4
5
6
7
8
9
10
brl = 1/2.43
format(brl, '0.4f')
'1 BRL = {rate:0.2f} USD'.format(rate=brl) => '1 BRL = 0.41 USD'

format(42, 'b') => '101010'
format(2/3, '.1%') => '66.7%'

now = datetime.now()
format(now, '%H:%M:%S') => '18:49:05'
"It's now {:%I:%M %p}".format(now) => "It's now 06:49 PM"
  • 如果一个对象没有重写__format__那么将会调用str(),如果传入了格式化规格,那么会报错

自定义格式化

1
2
3
4
5
6
7
8
9
10
11
def __format__(self, format_spec=''):
if format_spec.endswith('p'):
format_spec = format_spec[:-1]
coords = (abs(self), self.angle())
outer_fmt = '<{}, {}>'

else:
coords = self
outer_fmt = '({}, {})'
components = (format(c, format_spec) for c in coords)
return outer_fmt.format(*components)

Hashable Vector2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@property
def x(self):
return self.__x

@property
def y(self):
return self.__y

def __iter__(self):
return (i for i in (self.__x, self.__y))

def __hash__(self):
return hash(self.__x) ^ hash(self.__y)

Python中私有和保护的属性

Python无法像Java那样创建private属性。但可以通过__前缀来表示属性是私有的,_用于表示受保护的

使用__slot__节省空间

默认情况,Python存储对象属性在__dict__。当你处理大数据量的时候,__slots__能够节省很多内存

1
__slots__ = ('__x', '__y')

通过定义__slots__来告诉解释器,这是这个所有的属性,Python会将它们存在每个对象的一个类似tuple的结构中

__slots__的弊端

  • 必须为每个子类重新定义__slots__,因为继承属性会被忽略
  • 对象只能拥有__slots__中的属性,除非将__dict__包括在__slots__
  • 对象不能够作为弱引用的目标,除非将__weakref__放到__slots__

变量不是盒子

Python变量就像Java的引用变量。Python变量应该当成是对象的标签

定义,相等,别名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
charles = {'name': 'Charles L. Dodgson', 'born': 1832}

lewis = charles
print(lewis is charles)

print(id(charles), id(lewis))

lewis['balance'] = 950
print(charles)

alex = {'name': 'Charles L. Dodgson', 'born': 1832, 'balance': 950}
print(alex == charles)
print(alex is charles)

  • charleslewis引用的是相同的对象
  • alex对象的内容与charles相同,但不是引用相同的对象,Python==,比较的是对象的属相。is操作符比较的是对象的id,对象的id是创建时就确定了,永远不会改变了。

==和is

==操作符比较对象的值

判断对象是否为None,更合理的方式是

1
x is not None

is操作符速度比==快,因为它不能被重载,因此Python不用寻找特殊的方法来计算它。只需要简单的比较它们的id就可以了。对于a==b,其实背后使用的是a.__eq__(b),__eq__是从Object继承用于比较对象的id,因此效果和is一样.但是很多内置的类型重载了__eq__,做了更多的操作比较对象属性的值。因此==可能会产生额外的计算。

Tuple相对不可修改

元组跟大多数的Python的集合–lists,dicts,sets一样,都是持有对象们的引用。如果引用类型和修改,那么元组自身不可修改,而内部条目可以修改。

1
2
3
4
5
6
7
t1 = (1, 2, [30, 40])
t2 = (1, 2, [30, 40])
print(t1 == t2)
print(id(t1[-1]))
t1[-1].append(99)
print(t1)
print(t1 == t2)
  • t1是不可修改的,t[-1]可修改
  • t1t2的内容一样。修改t2的条目的内容,这个时候t1t2不相同了

默认浅拷贝

1
2
3
4
5
6
7
l1 = [3, [55, 44], (7, 8, 9)]
l2 = list(l1)
print(l2)
print(l1 == l2)
print(l1 is l2)

l3 = l1[:]
  • l2l1拷贝,值相等,但对象不相同
  • l3l1拷贝
  • 两种方式的拷贝都属于浅拷贝。
1
2
3
4
5
6
7
8
9
10
l1 = [3, [66, 55, 44], (7, 8, 9)]
l2 = list(l1)
l1.append(100)
l1[1].remove(55)
print('l1:', l1)
print('l2:', l2)
l2[1] += [33, 22]
l2[2] += (10, 11)
print('l1:', l1)
print('l2:', l2)
  • l2l1的浅拷贝。l1[1]移除55。这也会影响到l2

任意对象的深浅拷贝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

class Bus:
def __init__(self, passengers=None):
if passengers is None:
self.passengers = []
else:
self.passengers = list(passengers)

def pick(self, name):
self.passengers.append(name)

def drop(self, name):
self.passengers.remove(name)


import copy

bus1 = Bus(['Alice', 'Bill', 'Claire', 'David'])
bus2 = copy.copy(bus1)
bus3 = copy.deepcopy(bus1)

print(id(bus1), id(bus2), id(bus3))

bus1.drop('Bill')
print(bus1.passengers)
print(bus2.passengers)
print(id(bus1.passengers), id(bus2.passengers), id(bus3.passengers))
print(bus3.passengers)

  • bus2bus1的浅拷贝,bus3bus1的深拷贝

删除和垃圾回收

del表达式仅删除名字,不会删除对象。执行del表达式之后,对象可能会被回收。但只有在删除了最后一个引用对象的变量,才会被回收掉。CPython垃圾回收的基本算法是引用计数。CPython2.0,引入了新的垃圾回收算法,可以删除循环引用的对象组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
s1 = {1, 2, 3}
s2 = s1


def bye():
print('Gone with the wind')


ender = weakref.finalize(s1, bye)
print(ender.alive)
del s1
print(ender.alive)
s2 = 'spam'
print(ender.alive)

  • s1,s2指向相同的对象
  • 使用weakref来监控s1的状态
  • 执行del s1,对象还被s2引用没有被回收
  • 改变s2的引用,这个时候对象被回收

弱引用

弱引用不增加引用计数。弱引用不阻止垃圾回收

1
2
3
4
5
6
7
8
9
10
a_set = {0, 1}
wref = weakref.ref(a_set)
print(wref)
print(wref())

a_set = {2, 3, 4}
print(wref())
print(wref() is None)
print(wref() is None)

WeakValueDictionary

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

class Cheese:
def __init__(self, kind):
self.kind = kind

def __repr__(self):
return 'Cheese(%r)' % self.kind


import weakref

stock = weakref.WeakKeyDictionary()
catalog = [Cheese('Red Leicester'), Cheese('Tilsit'), Cheese('Brie'), Cheese('Parmesan')]

for cheese in catalog:
stock[cheese.kind] = cheese

print(sorted(stock.keys()))

del catalog
print(stock.keys())
del cheese
print(sorted(stock.keys()))

弱引用的限制

不是所有的Python对象都能使用弱引用。基本listdict都不能被引用。但它们的纯净子类可以。Set也可以

前言

函数装饰器用于标识函数来增强功能。它很强大,掌握它需要理解闭包。本文主要讲述函数装饰器如何工作。

装饰器

装饰器101

1
2
3
4
5
6
7
8
9
@decorate
def target():
print('running target()')

# the same
def target():
print('running target()')
target = decorate(target)

  • 装饰器就是以装饰的函数作为参数的函数调用
1
2
3
4
5
6
7
8
9
10
11
12
13

def deco(func):
def inner():
print('running inner()')

return inner


@deco
def target():
print('running target()')

# result running innner()

Python什么时候执行装饰器

装饰器的一项关键功能是被装饰的函数定义之后立即被执行,通常在import的时候

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
registry = []


def register(func):
print('running register(%s)' % func)
registry.append(func)
return func


@register
def f1():
print('running f1()')


@register
def f2():
print('running f2()')


@register
def f3():
print('running f3()')


if __name__ == '__main__':
print('running main()')
print('registry ->', registry)
f1()
f2()
f3()

# result:
running register(<function f1 at 0x1022a8268>)
running register(<function f2 at 0x1022a82f0>)
running register(<function f3 at 0x1022a8378>)
running main()
registry -> [<function f1 at 0x1022a8268>, <function f2 at 0x1022a82f0>, <function f3 at 0x1022a8378>]
running f1()
running f2()
running f3()
  • 装饰器在模块载入的时候执行,但被装饰的函数只有在显式触发时才会执行

变量作用域

1
2
3
4
5
6
7
8
9
10
b = 6


def f1(a):
global b
print(a)
print(b)
# b is treat as local variable
b = 9

  • f1(a)中定于b,其会被当做本地变量,此时未赋值,直接引用会抛出异常
  • 如果在函数体内加上global关键字,那么这个变量就表示引用的是全局变量

闭包

1
2
3
4
5
6
7
8
9
10
def make_averager():
series = []

def averger(new_value):
series.append(new_value)
total = sum(series)
return total / len(series)

return averger

  • series是自由变量,能被内部函数使用。闭包就是一种函数,其会保留自由变量的绑定,以便能够之后使用
1
2
3
4
5
6
7
8
9
10
11
12
def make_averager_nonlocal():
count = 0
total = 0

def averager(new_value):
nonlocal count, total
count += 1
total += new_value
return total / count

return averager

  • nonlocalPython3引入的关键字,用于标识变量作为自由变量,当变量在函数内部需要被重新赋值使用

实现装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def clock(func):
@functools.wraps(func)
def clocked(*args, **kwargs):
t0 = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - t0
name = func.__name__
arg_lst = []
if args:
arg_lst.append(', '.join(repr(arg) for arg in args))
if kwargs:
pairs = ['%s=%r' % (k, w) for k, w in sorted(kwargs.items())]
arg_lst.append(', '.join(pairs))
arg_str = ', '.join(arg_lst)
print('[%0.8fs] %s(%s) -> %r' % (elapsed, name, arg_str, result))
return result

return clocked

@clock
def snooze(seconds):
time.sleep(seconds)


@clock
def factorial(n):
return 1 if n < 2 else n * factorial(n - 1)

标准库中的装饰器

functools.lru_cache

其实现内存优化,保存之前计算的结果,避免重复计算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

@functools.lru_cache()
@clock
def fibonacci(n):
return n if n < 2 else fibonacci(n - 2) * fibonacci(n - 1)

# result 未加入lru_cache
[0.00000054s] fibonacci(0) -> 0
[0.00000082s] fibonacci(1) -> 1
[0.00008297s] fibonacci(2) -> 0
[0.00000038s] fibonacci(1) -> 1
[0.00000046s] fibonacci(0) -> 0
[0.00000041s] fibonacci(1) -> 1
[0.00001262s] fibonacci(2) -> 0
[0.00002402s] fibonacci(3) -> 0
[0.00012032s] fibonacci(4) -> 0
[0.00000041s] fibonacci(1) -> 1
[0.00000041s] fibonacci(0) -> 0
[0.00000044s] fibonacci(1) -> 1
[0.00001188s] fibonacci(2) -> 0
[0.00002324s] fibonacci(3) -> 0
[0.00000033s] fibonacci(0) -> 0
[0.00000041s] fibonacci(1) -> 1
[0.00001188s] fibonacci(2) -> 0
[0.00000039s] fibonacci(1) -> 1
[0.00000050s] fibonacci(0) -> 0
[0.00000038s] fibonacci(1) -> 1
[0.00001248s] fibonacci(2) -> 0
[0.00002375s] fibonacci(3) -> 0
[0.00004736s] fibonacci(4) -> 0
[0.00008152s] fibonacci(5) -> 0
[0.00021393s] fibonacci(6) -> 0

# result 加入lru_cache
[0.00000059s] fibonacci(0) -> 0
[0.00000067s] fibonacci(1) -> 1
[0.00006346s] fibonacci(2) -> 0
[0.00000114s] fibonacci(3) -> 0
[0.00007960s] fibonacci(4) -> 0
[0.00000084s] fibonacci(5) -> 0
[0.00009436s] fibonacci(6) -> 0

  • lru_cache的加入,减少了一些重复的计算

带着SingleDispatch的通用函数

Python3.4中引入的singledispatch。其允许每个模块定制总体方案,为类提供特定的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@singledispatch
def htmlize(obj):
content = html.escape(repr(obj))
return '<pre>{}</pre>'.format(content)


@htmlize.register(str)
def _(text):
content = html.escape(text).replace('\n', '<br>\n')
return '<p>{0}</p>'.format(content)


@htmlize.register(numbers.Integral)
def _(n):
return '<pre>{0} (0x{0:x})</pre>'.format(n)


@htmlize.register(tuple)
@htmlize.register(abc.MutableSequence)
def _(seq):
inner = '</li>\n<li>'.join(htmlize(item) for item in seq)
return '<ul>\n<li>' + inner + '</li>\n</ul>'

  • singledispatch 定义了通用模板,然后可以使用特定的方法定制化

堆放装饰器

1
2
3
4
5
6
7
8
9
10
@d1
@d2
def f():
print('f')

# same to
def f():
print('f')
f = d1(d2(f))

接收多个参数的装饰器

1
2
3
4
5
6
7
def register(active=True):
...

@register(active=True)
def f1():
print('running f1()')

前言

虽然设计模式是语言无关。但并不意味着每种设计模式都适用于任何语言。Python中可以多用一些设计模式如: 策略,命令,模板方法和访问者模式。

案例学习: Pyhton中的策略模式

Python使用策略模式能让你的代码更简单。

传统策略模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81

Customer = namedtuple("Customer", 'name fidelity')


class LineItem:
def __init__(self, product, quantity, price):
self.product = product
self.quantity = quantity
self.price = price

def total(self):
return self.price * self.quantity


class Order: # the context

def __init__(self, customer, cart, promotion=None):
self.customer = customer
self.cart = list(cart)
self.promotion = promotion

def total(self):
if not hasattr(self, '__total'):
self.__total = sum(item.total() for item in self.cart)
return self.__total

def due(self):
if self.promotion is None:
discount = 0
else:
discount = self.promotion.discount(self)
return self.total() - discount

def __repr__(self):
fmt = '<Order total: {:.2f} due: {:.2f}>'
return fmt.format(self.total(), self.due())


class Promotion(ABC): # the Strategy: an abstract base class

@abstractclassmethod
def discount(self, order):
"""Return discount as a positive dollar amount"""


class FidelityPromo(Promotion): # first Concrete strategy
"""%5 discount for customers with 1000 or more fidelity points"""

def discount(self, order):
return order.total() * .05 if order.customer.fidelity >= 1000 else 0


class BulkItemPromo(Promotion): # second Concrete strategy
"""10% discount for each LineItem wit 20 or more units"""

def discount(self, order):
discount = 0
for item in order.cart:
if item.quantity >= 20:
discount += item.total * .1
return discount


class LaregOrderPromo(Promotion): # third concrete startegy
"""%7 discount for orders with 10 or more distinct items"""

def discount(self, order):
distinct_items = {item.product for item in order.cart}
if len(distinct_items) >= 10:
return order.total() * .07
return 0


if __name__ == '__main__':
joe = Customer('John Doe', 0)
ann = Customer('Ann Smith', 1100)
cart = [LineItem('banana', 4, .5),
LineItem('apple', 10, 1.5),
LineItem('watermellon', 5, 5.0)]

print(Order(joe, cart, FidelityPromo()))
  • Promotion继承ABC作为抽象基类,以便使用@abstractclassmethod装饰器

面向函数的策略模式

对于上述的例子,每个实体策略有一个单独的方法discount。进一步讲,策略对象没有状态。它们看起来就像纯净的函数。因此我们重构上述代码

对于Order类进行如下改造

1
2
3
4
5
6
7
8
9
def due(self):
if self.promotion is None:
discount = 0
else:
# discount = self.promotion.discount(self)
#2
discount = self.promotion(self)
return self.total() - discount

添加如下方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

def fideilty_promo(order):
return order.total() * .05 if order.customer.fidelity >= 1000 else 0


def bulk_item_promo(order):
discount = 0
for item in order.cart:
if item.quantity >= 20:
discount += item.total * .1
return discount


def large_order_promo(order):
distinct_items = {item.product for item in order.cart}
if len(distinct_items) >= 10:
return order.total() * .07
return 0

  • 将原来的Promotion以及其子类由上述方法替代,不需要使用抽象函数

选择最佳策略: 简单方案

1
2
3
4
5
promos = [fideilty_promo, bulk_item_promo, large_order_promo]


def best_promo(order):
return max(promo(order) for promo in promos)
  • 使用best_promo方法来查询最佳方案

系统模块中寻找策略

内置globals

返回表示当前全局符号表的字典

1
2
3
promos = [globals()[name] for name in globals() if name.endswith('_promo') and name != 'best_promo']

promos = [func for name, func in inspect.getmembers(promos, inspect.isfunction)]

命令模式

class MacroCommand:
    def __init__(self, commands):
        self.commands = list(commands)

    def __call__(self):
        for command in self.commands:
            command()
``

函数在Python是首要类对象。
首要对象:

  • 运行时创建
  • 能赋值给变量或者数据结构中的元素
  • 接收参数
  • 返回结果

把函数当做对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

def factorial(n):
'''returns n!'''
return 1 if n < 2 else n * factorial(n - 1)


def test_factorial():
print(factorial(42))
print(factorial.__doc__)
print(type(factorial))

fact = factorial
print(fact)
print(fact(5))
print(map(factorial, range(11)))
print(list(map(fact, range(11))))

  • 创建函数,读取__doc__属性显示函数注释,使用type显示函数类型,其是个function

  • 能够将函数赋值给变量

高阶函数

函数接收一个函数作为参数,并返回一个函数作为结果,称为高阶函数

1
2
3
4
5
6
7

fruits = ['strawberry', 'fig', 'apple', 'cherry', 'raspberry', 'banana']
print(sorted(fruits, key=len))
print(sorted(fruits, key=reverse))

def reverse(word):
return word[::-1]

map, filter, reduce的现代替代品

map, filter

Python3map,filter返回生成器,在pythn2中返回序列

1
2
3
4
print(list(map(factorial, range(6))))
print(list(map(factorial, filter(lambda n: n % 2, range(6)))))
print([factorial(n) for n in range(6) if n % 2])

reduce

python3中,reduce被移到functools

1
2
3
4
from functools import reduce
from operator import add
print(reduce(add, range(100)))

  • 这里reduce的作用相当于sum做累加

匿名函数

lambda关键字创建一个匿名函数

1
print(sorted(fruits, key=lambda word: word[::-1]))

可调用对象的七种风格

用户定义函数

使用def或者lambda创建的表达式

内建的函数

C语言实现的函数

内建方法

C语言实现的方法

方法

在类体中定义的函数

当触发时,类运行__new__方法创建对象,然后在__init__中初始化。最后返回对象给调用者。因为Python中没有new操作符,因此调用类就像调用函数

类实例

if类定义了__call__方法,它的对象可能像函数一样触发

生成器函数

函数或者方法使用yield关键字。当调用时,返回生成器对象

  • 可以使用callable来检查对象是否可以调用
1
[callable(obj) for obj in (abs, str, 13)]

用户定义的可调用类型

只要实现__call__就能让对象表现得像函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class BingoCage:
def __init__(self, items):
self._items = list(items)
random.shuffle(self._items)

def pick(self):
try:
return self._items.pop()
except IndexError:
raise LookupError('pick from empty BingoCage')

def __call__(self):
return self.pick()

bingo = BingoCage(range(3))
print(bingo.pick())
print(bingo())
print(callable(bingo))

函数自省

1
2
3
4
5
6
7
8
9
class C: pass

obj = C()

def func(): pass

print(sorted(set(dir(func)) - set(dir(obj))))

result: ['__annotations__', '__call__', '__closure__', '__code__', '__defaults__', '__get__', '__globals__', '__kwdefaults__', '__name__', '__qualname__']

函数关键字参数

这是Python3的新特性, *表示序列, **表示字典,关键字参数是可选参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def tag(name, *content, cls=None, **attrs):
"""Generate one or more HTML tag"""
if cls is not None:
attrs['class'] = cls
if attrs:
attr_str = ''.join(' %s="%s"' % (attr, value) for attr, value in sorted(attrs.items()))

else:
attr_str = ''
if content:
return '\n'.join('<%s%s>%s</%s>' % (name, attr_str, c, name) for c in content)

else:
return '<%s%s />' % (name, attr_str)

函数参数信息

函数对象中,__defaults__持有位置和关键字参数的默认值,是tuple类型。关键字参数的默认值在__kwdefaults__

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def clip(text, max_len=80):
"""Return text clipped at the last space before or after max_len"""
end = None
if len(text) > max_len:
space_before = text.rfind(' ', 0, max_len)
if space_before >= 0:
end = space_before
else:
space_after = text.rfind(' ', max_len)
if space_after >= 0:
end = space_after
if end is None:
end = len(text)
return text[:end].rstrip()

print(clip.__defaults__)
print(clip.__code__)
print(clip.__code__.co_argcount)

from inspect import signature
sig = signature(clip)
print(sig)
print(str(sig))
for name, param in sig.parameters.items():
print(param.kind, ':', name, '=', param.default)

使用inspect/Signature可以实现类似查看函数元数据的功能

函数注解

Python3能够添加元数据到函数声明的参数和返回值上

1
def clip(text: str, max_len: 'int > 0'=80) -> str:

函数编程的包

operator模块

1
2
3
4
5
6
7
8
from functools import reduce
from operator import mul

def fact(n):
return reduce(lambda a, b: a * b, range(1, n + 1))

def fact_op(n):
return reduce(mul, range(1, n + 1))
1
2
3
4
5
6
from operator import methodcaller
s = 'The time has come'
upcase = methodcaller('upper')
print(upcase(s))
hiphenate = methodcaller('replace', ' ', '-')
print(hiphenate(s))

使用functools.partial冻结参数

functools.partial是一种高阶函数,允许应用函数功能的一部分。给定一个函数,部分功能会产生一个新的函数

1
2
3
4
5
from operator import mul
from functools import partial
triple = partial(mul, 3)
print(triple(7))
print(list(map(triple, range(1, 10))))

总结

本文主要描述了Python中的函数。主要思想是函数能够赋值给变量,传给其他函数,将他们存在数据结构中,访问函数属性,允许框架和工具操作相关信息。高阶函数的使用。调用的七种形式

Dictionaries

DictionaryPython中最常用的数据结构之一

推导式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
DIAL_CODES = [
(86, 'China'),
(91, 'India'),
(1, 'United States'),
(62, 'Indonesia'),
(55, 'Brazil'),
(92, 'Pakistan'),
(880, 'Bangladesh'),
(234, 'Nigeria'),
(7, 'Russia'),
(81, 'Japan'),
]

country_code = {country: code for code, country in DIAL_CODES}
# print(country_code)
print({code: country.upper() for country, code in country_code.items() if code < 66})

使用setdefault处理无key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import sys
import re

WORD_RE = re.compile('\w+')

index = {}
with open(sys.argv[1], encoding='utf-8') as fp:
for line_no, line in enumerate(fp, 1):
for match in WORD_RE.finditer(line):
word = match.group()
column_no = match.start() + 1
location = (line_no, column_no)
occurrences = index.get(word, [])
occurrences.append(location)
index[word] = occurrences
for word in sorted(index, key=str.upper):
print(word, index[word])
  • 可以使用get(key, defaultValue)setdefault(key, defaultValue)来处理key不存在的情况,也可以使用collections.defaultdict(value)来设置创建默认new-key的字典。采用这方式,如果dd是一个defaultdict,那么当dd[k]不存在时,会调用default_factory来创建一个默认值,但返回结果依然是None

__missing__

这个方法默认是没有定义的,当dict的子类提供了这个方法的实现,那么当key不存在时,不会抛出异常KeyError__getitem__会调用__missing__

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class StrKeyDict0(dict):

def __missing__(self, key):
if isinstance(key, str):
raise KeyError(key)
return self[str(key)]

def get(self, k, d=None):
try:
return self[k]
except KeyError:
return default

def __contains__(self, item):
return key in self.keys() or str(key) in self.keys()
  • 这里需要注意isintance检查key是否为str,否则会陷入死循环。使用k in dict.keys()效率很高

各种各样的dict

除了defaultdict之外,还会有很多别的dict

collections.OrderedDict

key有顺序的dict,类似Java中的TreeMap

collections.ChainMap

将多个dict当成一个dict,按顺序搜索key,只要搜索到key,结果返回成功

collections.Counter

持有key的计数

1
2
3
4
ct = collections.Counter('dsakfjdjsfakdjfs')
print(ct)

result: Counter({'d': 3, 's': 3, 'f': 3, 'j': 3, 'a': 2, 'k': 2})
userDict

Python实现的标准dict

  • 一般来说userDict被用来集成,其他几个直接使用

UserDict派生

UserDict内部提供了很多默认的实现方法,因此直接从UserDict继承会很方便。UserDict不是继承dict,但其内部有个dict实例。这是真正数据存储的地方

1
2
3
4
5
6
7
8
9
10
11
class StrKeyDict0(collections.UserDict):
def __missing__(self, key):
if isinstance(key, str):
raise KeyError(key)
return self[str(key)]

def __contains__(self, item):
return str(item) in self.data

def __setitem__(self, key, value):
self.data[str(key)] = value

不可改Mapping

标准库提供的映射类型都是可以修改的。你可能想让你的映射不能够被修改。Python3.3之后,标准库提供了只读的MappingProxy

1
2
3
4
5
from types import MappingProxyType
d = {1: 'A'}
d_proxy = MappingProxyType(d)
print(d_proxy)
d_proxy[2] = 'x'
  • 上面的代码会抛出异常,因为d_proxy是不允许被修改的

Set

1
2
3
4
l = ['spam', 'spam', 'eggs', 'spam']
print(set(l))

result: {'spam', 'eggs'}
  • 需要注意Set的元素必须是能哈希的
  • 支持&(intersetion)

Set语法

Python3中,set标准表示方法s = {1}。使用这个方式会比使用set([1])效率高

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from dis import dis
print(dis('{1}'))

result:

1 0 LOAD_CONST 0 (1)
2 BUILD_SET 1
4 RETURN_VALUE


print(dis('set([1])')

result:
1 0 LOAD_NAME 0 (set)
2 LOAD_CONST 0 (1)
4 BUILD_LIST 1
6 CALL_FUNCTION 1
8 RETURN_VALUE
  • 使用dis来查看两种方式的字节码操作。第一种方式效率明显高于第二种

Set推导式

1
2
from unicodedata import name
print({chr(i) for i in range(32, 256) if 'SIGN' in name(chr(i), '')})

其他

性能

数据量 因子 dict时间 因子
1000 1x 0.000202s 1.00x
10000 10x 0.000140s 0.69x
100000 100x 0.000228s 1.13x
1000000 1000x 0.000290s 1.44x
10000000 10000x 0.000337s 1.67x

dict的高性能多亏了hashtable

dict_hash_algorithm

好处和坏处

  • key必须要能hash,需要满足

    • 支持hash(),同一个对象返回的值总是相同
    • 支持eq()
    • 如果a==b,那么hash(a)==hash(b)
    • 用户定义的类型的hash值是id()
  • 内存开销大

  • key搜索非常快

    • dict是典型的以空间换取时间
  • key的顺序依赖插入顺序

  • 添加数据会影响已存在key的顺序

列表推导式和生成器表达式

列表推导式可读性

1
2
3
4
5
6
7
8
9
10
def test1():
codes = []
for symbol in symbols:
codes.append(ord(symbol))
print(codes)


def test2():
codes = [ord(symbol) for symbol in symbols]
print(codes)
  • 上面两个代码的效果是一样的,test2使用列表推导式的可读性会更好
1
2
3
x = 'my precious'
dummy = [x for x in 'ABC']
print(x)
  • 对于上述代码,在python2.x中,x的值是C。而在python3.x中,x的值依然是my precious。在python3.x中,列表推导式的变量不再泄露,其有自己的内部作用域

列表推导式和map,filter

列表推导式也能做到mapfilter能做到的

1
2
3
4
5
beyond_ascii = [ord(s) for s in symbols if ord(s) > 127]
print(beyond_ascii)

beyond_ascii = list(filter(lambda c: c > 127, map(ord, symbols)))
print(beyond_ascii)
  • 上述两个表达式的结果一样,但使用列表推导式会更简洁。而且它们的效率也差不多。我们来测试一下它们的效率
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
TIMES = 10000

SETUP = """
symbols = '$¢£¥€¤'
def non_ascii(c):
return c > 127
"""


def clock(label, cmd):
res = timeit.repeat(cmd, setup=SETUP, number=TIMES)
print(label, *('{:.3f}'.format(x) for x in res))


def test_speed():
clock('listcomp :', '[ord(s) for s in symbols if ord(s) > 127]')
clock('listcomp + func :', '[ord(s) for s in symbols if non_ascii(ord(s))]')
clock('filter + lambda :', 'list(filter(lambda c: c > 127, map(ord, symbols)))')
clock('filter + func :', 'list(filter(non_ascii, map(ord, symbols)))')
  • 运行结果:
1
2
3
4
listcomp        : 0.015 0.015 0.015
listcomp + func : 0.019 0.019 0.021
filter + lambda : 0.017 0.017 0.017
filter + func : 0.016 0.019 0.016

笛卡尔积

列表推导式能从两个或多个迭代对象中生成笛卡尔积

1
2
3
4
colors = ['black', 'white']
sizes = ['S', 'M', 'L']
tshirts = [(color, size) for color in colors for size in sizes]
print(tshirts)
  • 结果
1
[('black', 'S'), ('black', 'M'), ('black', 'L'), ('white', 'S'), ('white', 'M'), ('white', 'L')]

生成器表达式

1
2
3
4
5
colors = ['black', 'white']
sizes = ['S', 'M', 'L']

for tshirt in ('%s %s' % (c, s) for c in colors for s in sizes):
print(tshirt)
  • 通过使用生成器表达式可以省去构建数组的成本

Tuples不仅是不可变的序列

Tuple作为Records

1
2
3
4
5
6
traveler_ids = [('USA', '31195855'), ('BRA', 'CE342567'), ('ESP', 'XDA205856')]
for passport in sorted(traveler_ids):
print('%s/%s' % passport)

for country, _ in traveler_ids:
print(country)
Tuple解包
1
2
3
4
5
6
7
lax_coordinates = (33.9425, -118.408056)
latitude, longitude = lax_coordinates
print('{0}, {1}'.format(latitude, longitude))
print('%s, %s' % (latitude, longitude))

a, b, *rest = range(5)
print(a, b, rest)
  • python3.x之前还支持函数参数解包。
命名Tuple
1
2
3
4
5
6
7
8
9
from collections import namedtuple
City = namedtuple('City', 'name country population coordinates')
tokyo = City('Tokyo', 'JP', 36.933, (35.689722, 139.691667))
print(tokyo)

LatLong = namedtuple('LatLong', 'lat long')
delhi_data = ('Delhi NCR', 'IN', 21.935, LatLong(28.613889, 77.208889))
delhi = City._make(delhi_data)
print(delhi._asdict())
  • _fieldtuple类的属性名
  • _make允许使用迭代器初始化named tuple
  • _asdict返回一个用name tuple构建的排过序的字典

切片

python中所有的序列类型都支持切片操作

1
2
3
t = (1, 2, [30, 40])
t[2] += [50, 60]
print(t)
  • 执行上述代码会报错,但事实上t已经变成了[30, 40, 50, 60]
  • 不要将可变对象放到tuple
  • +=不是一个原子操作。从上面我们看到,其实异常时在它执行完之后抛出的
  • 使用dis能够很方便的查看python字节码

使用二分法管理序列

查找

1
2
3
4
5
def grade(score, breakpoints=[60, 70, 80, 90], grades='FDCBA'):
i = bisect.bisect(breakpoints, score)
return grades[i]

print([grade(score) for score in [33, 99, 77, 70, 89, 90, 100]])

排序

1
2
3
4
5
6
7
8
SIZE = 7
random.seed(1729)

my_list = []
for i in range(SIZE):
new_item = random.randrange(SIZE * 2)
bisect.insort(my_list, new_item)
print('%sd ->' % new_item, my_list)

数组

使用array.tofile和array.fromfile,效率很高。读取使用
array.tofile创建的二进制文件中的1千万个双精度浮点数仅需0.1s。这速度是从文件读取数字的60倍。写操作是7倍。并且最后生成的文件小

注意,Python3.4之后array已经没有内置的排序方法

MemoryView

1
2
3
4
5
6
7
8
numbers = array('h', [-2, -1, 0, 1, 2])
memv = memoryview(numbers)
print(len(memv))
print(memv[0])
memv_oct = memv.cast('B')
print(memv_oct.tolist())
memv_oct[5] = 4
print(numbers)

双端队列和其他队列

1
2
3
4
5
6
7
8
dq = deque(range(10), maxlen=10)
print(dq)
dq.rotate(3)
print(dq)
dq.rotate(-4)
print(dq)
dq.appendleft(-1)
print(dq)
  • 除了deque之外,其他一些标准库也实现了queues:
    • queue: Queue, LifoQueue, PriorityQueue
    • multiprocessing: JoinableQueue
    • asyncio
    • heapq

数据模型(Data Model)

python提供了很多特殊的方法

不含操作符

种类 方法名称
字符串/字节表示 __repr__, __str__, __format__, __bytes
数值转换 __abs__, __bool__, __complex, __int__, __float__, __hash__, __index__
模拟集合 __len__, __getitem__, __setitem__, __delitem, __contains__
迭代器 __iter__, __reversed__, __next__
模拟调用 __call__
上下文管理 __enter__, __exit__
实例创建和销毁 __new__, __init__, __del__
属性管理 __getattr__, __getattribute__, __setattr__, __delattr__, __dir__
属性描述器 __get__, __set__, __delete__
类服务 __prepare__, __instancecheck__, __subclasscheck__

操作符

种类 方法名和相关操作符
一元数值操作符 __neg__-, __pos__+, __abs__()
比较操作符 __lt__<, __le__<=, __eq__=, __ne__!=, __gt__>, __ge__>=
算术操作符 __add__+, __sub__-, __mul__*, __truediv__/, __floordiv__//, __mod__%, __divmod__ divmod(), __pow__ **或者(pow()), __round__ round()
反向算术操作符 __radd__, __rsub__, __rmul__, __rtruediv__, __rfloordiv__, __rmod__, __rdivmod__, __rpow__
自操作符 _iadd__, __isub__, __imul__, __itruediv__, __ifloordiv__, __imod__, __ipow__
位操作符 __invert__~, __lshift__<<, __rshift__>>, __and__&, __or__
反向位操作符 __rlshift__, __rrshift__, __rand__, __rxor__, __ror__
自增位操作符 __ilshift__, __irshift__, __iand__, __ixor__, __ior__
  • 通过实现上述特殊的方法,你自己的对象能表现得像内置类型。例如为了让对象打印出来可读性更好,这个时候需要实现__repr____str__ 区别

前言

本文主要讲述Ionic在公司最新产品中的应用

技术背景

公司计划发布新的产品。为了利用现有的资源,快速构建产品,采用ionic作为基础框架。ionic是一款H5移动开发框架,打包插件基于apache cordova

实践之路

实践过程:

  • 安装ionic, cordova框架。运行如下命令
1
2
npm install -y .
npm install -y ionic cordova
  • 创建ionic项目。使用如下命令,可以指定包名
1
ionic start xxx(项目名称) --id xxx(包名)
  • 下载需要的cordova插件
  • 获取资源文件
  • 更新配置文件
  • 添加支持平台
1
ionic platform add android
  • 构建项目
1
ionic build xxx(平台)

可持续集成

为了可持续集成,那么我这里将整个项目分为两部分,一部分固定不变的,即每次框架自动生成的文件。另一部分为我们一直需要修改的。因此这里的想法是,将改变的部分分离出去。每次运行脚本,自动生成文件。然后使用改变的部分覆盖自动生成的部分。这里改变的部分,主要包括配置文件,Native代码以及资源文件。替换完毕,打包签名出apk文件,并上传云服务。根据此思路,编写python脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 覆盖配置文件
copy_file('config.xml', subfix)

# 覆盖build.gradle文件
copy_file('build.gradle', androidPath)

# 覆盖 AndroidManifest
copy_file('AndroidManifest.xml', androidPath)

# 覆盖资源文件
# 先删除资源文件
try:
remove_tree(toDirectory + '/resources')
except OSError:
pass

# 覆盖资源文件
copy_tree(fromDirectory, toDirectory + '/resources')

# 覆盖native代码
copy_tree(fromDirectory, toDirectory)

之后签名打包,上传云服务

难点

  1. 由于默认初始化项目,会自动生成一个包名。为了指定包名,因此需要使用--id来指定包名
  2. 自动生成的config.xml文件中包含了自动生成的包名,一旦打包签名时,会根据这个包名自动生成代码,因此需要将此自动包名,修改为需要的包名

总结

此项目目前能实现从项目初始化,构建到最后打包上传完全自动化。大大减少工作量。对于我个人也是一次从新框架,新CI(bitrise)到最终的自动化构建的一次完整系统的学习。

RxJava使用观察者模式,基于事件驱动。主要包含两部分ObservableObservaber。而Rxjava最大的特色在于其灵活强大的操作符(Operators)和调度器(Scheduler)

Operators

Creating Observables

create(OnSubscribe)

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observable.create(subscriber -> {
System.out.println("subscriber");
}).subscribe(new Subscriber<Object>() {
@Override public void onCompleted() {
System.out.println("onCompleted");
}

@Override public void onError(Throwable e) {
System.out.println("onError");
}

@Override public void onNext(Object o) {
System.out.println("onNext");
}
});
  • 这种情况下会打印subscriber
  • 这个时候加上
1
2
3
4
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(subscriber);
subscriber.onCompleted();
}
  • 那么会打印
1
2
onNext
onCompleted
  • 源码分析:
1
2
3
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
  • 通过RxJavaHooks创建OnSubscribe
1
2
3
4
5
6
7
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
if (f != null) {
return f.call(onSubscribe);
}
return onSubscribe;
}
  • RxJavaHooks会在首次使用的时候初始化。

之后还需要订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Observable.java

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
//省略若干代码
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
//省略异常处理代码
}

RxJavaHooks.java

onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() {
@Override
public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) {
return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);
}
};

RxJavaPlugins.java

public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
// pass through by default
return onSubscribe;
}
  • 判断订阅者是否为空,之后直接交给RxJavaHooks执行onObservableStart,这里的参数observable.onSubscribe正是Create传入的,而在RxJavaHooksonObservableStart调用了RxJavaPlugin中的onSubscribeStart。这个方法直接返回原来的OnSubscribe也就是Create传入的。这个时候会回调call

  • create()有另外两种参数专门用于处理backPressure(反向压力,生产者生产速度远大于消费者消费速度)

SyncOnSubscribe里面需要关注的是SubscriptionProducer,而AsyncOnSubscribe需要关注AsyncOuterManager。这两个东西之后再讨论(TODO)

defer

直到观察者订阅之后才创建可观察对象,示例

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.defer(Observable::empty).subscribe(new Subscriber<Object>() {
@Override public void onCompleted() {
System.out.println("onCompleted");
}

@Override public void onError(Throwable e) {
System.out.println("onError");
}

@Override public void onNext(Object o) {
System.out.println("onNext");
}
});
  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable.java

2.0.3

public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier) {
ObjectHelper.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new ObservableDefer<T>(supplier));
}

1.2.4

public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory) {
return create(new OnSubscribeDefer<T>(observableFactory));
}
  • 1.2.4采用OnSubscribeDefer,2.0.3采用ObservableDefer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
OnSubscribeDefer.java

@Override
public void call(final Subscriber<? super T> s) {
Observable<? extends T> o;
try {
o = observableFactory.call();
} catch (Throwable t) {
Exceptions.throwOrReport(t, s);
return;
}
o.unsafeSubscribe(Subscribers.wrap(s));
}

ObservableDefer.java

@Override
public void subscribeActual(Observer<? super T> s) {
ObservableSource<? extends T> pub;
try {
pub = ObjectHelper.requireNonNull(supplier.call(), "null publisher supplied");
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
EmptyDisposable.error(t, s);
return;
}

pub.subscribe(s);
}
Empty Never Throw

Empty

  • 源码分析
1
2
3
4
5
6
EmptyObservableHolder.java

@Override
public void call(Subscriber<? super Object> child) {
child.onCompleted();
}
  • 直接调用onCompleted,这是一个枚举单例

Never

  • 源码分析
1
2
3
4
5
6
NeverObservableHolder.java

@Override
public void call(Subscriber<? super Object> child) {
// deliberately no op
}
  • 什么都不做,主要用于做测试,这是一个枚举单例

Throw

  • 源码分析:
1
2
3
4
@Override 
public void call(Subscriber<? super T> observer) {
observer.onError(exception);
}
From

Future转化成Observable,示例

1
2
3
4
5
Observable.from(Executors.newSingleThreadExecutor().submit(() -> {
System.out.println("before sleep");
Thread.sleep(5000);
return true;
})).subscribe(mSubscriber);
  • 打印结果
1
2
3
before sleep
onNext
onCompleted
  • 源码分析
1
2
3
4
5
Observable.java

public static <T> Observable<T> from(Future<? extends T> future) {
return (Observable<T>)create(OnSubscribeToObservableFuture.toObservableFuture(future));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
OnSubscribeToObservableFuture.java

public static <T> OnSubscribe<T> toObservableFuture(final Future<? extends T> that) {
return new ToObservableFuture<T>(that);
}

@Override
public void call(Subscriber<? super T> subscriber) {
//核心代码
if (subscriber.isUnsubscribed()) {
return;
}
T value = (unit == null) ? (T) that.get() : (T) that.get(time, unit);
subscriber.setProducer(new SingleProducer<T>(subscriber, value));
}
  • 等待future结果。将结果作为生产者发出消息.push data

将数组转换成Observable

  • 源码
1
2
3
4
5
OnSubscribeFromArray.java

public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}
  • 这里面依然涉及到反向压力的处理,当请求id等于阈值时采用fastPath处理。反之采用slowPath。正常情况下请求id都是Long.MaxValue,当producer为空时,id会发生改变
Interval

定时发出事件,示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final CountDownLatch latch = new CountDownLatch(5);
long startTime = System.currentTimeMillis();
Observable.interval(1, TimeUnit.SECONDS).subscribe(counter -> {
latch.countDown();
System.out.println("Got: " + counter + " time: " + (System.currentTimeMillis() - startTime));
});
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

结果
Got: 0 time: 1252
Got: 1 time: 2245
Got: 2 time: 3249
Got: 3 time: 4247
Got: 4 time: 5244

  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
OnSubscribeTimerPeriodically.java

默认使用computation调度器
public void call(final Subscriber<? super Long> child) {
final Worker worker = scheduler.createWorker();
child.add(worker);
worker.schedulePeriodically(new Action0() {
long counter;
@Override
public void call() {
try {
child.onNext(counter++);
} catch (Throwable e) {
try {
worker.unsubscribe();
} finally {
Exceptions.throwOrReport(e, child);
}
}
}

}, initialDelay, period, unit);
}

  • 创建工作器。工作器串行执行。
Just

发送特定的消息,有多个重载方法,区分在于单个参数和多个参数,单个参数使用ScalarSynchronousObservable,而多个参数使用from

  • 源码分析
1
2
3
4
5
6
7
8
9
10
ScalarSynchronousObservable.java

protected ScalarSynchronousObservable(final T t) {
super(RxJavaHooks.onCreate(new JustOnSubscribe<T>(t)));
this.t = t;
}

public void call(Subscriber<? super T> s) {
s.setProducer(createProducer(s, value));
}
    • 创建生产者时区分单生产者和弱单生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
WeakSingleProducer.java

public void request(long n) {
if (once) {
return;
}
if (n < 0L) {
throw new IllegalStateException("n >= required but it was " + n);
}
if (n == 0L) {
return;
}
once = true;
Subscriber<? super T> a = actual;
if (a.isUnsubscribed()) {
return;
}
T v = value;
try {
a.onNext(v);
} catch (Throwable e) {
Exceptions.throwOrReport(e, a, v);
return;
}

if (a.isUnsubscribed()) {
return;
}
a.onCompleted();
}
  • 弱单生产者不考虑并发问题,仅仅只使用once来记录此次请求已经发出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void request(long n) {
// negative requests are bugs
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required");
}
// we ignore zero requests
if (n == 0) {
return;
}
// atomically change the state into emitting mode
if (compareAndSet(false, true)) {
// avoid re-reading the instance fields
final Subscriber<? super T> c = child;
// eagerly check for unsubscription
if (c.isUnsubscribed()) {
return;
}
T v = value;
// emit the value
try {
c.onNext(v);
} catch (Throwable e) {
Exceptions.throwOrReport(e, c, v);
return;
}
// eagerly check for unsubscription
if (c.isUnsubscribed()) {
return;
}
// complete the child
c.onCompleted();
}
}
  • 单生产者考虑并发问题,继承AtomicBoolean,通过compareAndSet来确保线程安全
Range

串行发出某一范围的消息

  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void request(long requestedAmount) {
if (get() == Long.MAX_VALUE) {
// already started with fast-path
return;
}
if (requestedAmount == Long.MAX_VALUE && compareAndSet(0L, Long.MAX_VALUE)) {
// fast-path without backpressure
fastPath();
} else if (requestedAmount > 0L) {
long c = BackpressureUtils.getAndAddRequest(this, requestedAmount);
if (c == 0L) {
// backpressure is requested
slowPath(requestedAmount);
}
}
}
  • 策略依然跟大部分producer一样,正常情况下使用fastPathslowPath则会考虑线程安全
Repeat

重复发出消息,示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observable.range(0, 3).repeat(3).subscribe(mSubscriber);

打印结果:
onNext value: 0
onNext value: 1
onNext value: 2
onNext value: 0
onNext value: 1
onNext value: 2
onNext value: 0
onNext value: 1
onNext value: 2
onCompleted

打印三组range
  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
repeat默认使用trampoline作为线程调度器,在当前线程串行执行

OnSubscribeRedo.java

public void call(final Subscriber<? super T> child) {

//订阅源消息
final Action0 subscribeToSource = new Action0() {
@Override
public void call() {
if (child.isUnsubscribed()) {
return;
}

Subscriber<T> terminalDelegatingSubscriber = new Subscriber<T>() {
boolean done;

@Override
public void onCompleted() {
if (!done) {
done = true;
unsubscribe();
terminals.onNext(Notification.createOnCompleted());
}
}

@Override
public void onError(Throwable e) {
if (!done) {
done = true;
unsubscribe();
terminals.onNext(Notification.createOnError(e));
}
}

@Override
public void onNext(T v) {
if (!done) {
child.onNext(v);
decrementConsumerCapacity();
arbiter.produced(1);
}
}

private void decrementConsumerCapacity() {
// use a CAS loop because we don't want to decrement the
// value if it is Long.MAX_VALUE
while (true) {
long cc = consumerCapacity.get();
if (cc != Long.MAX_VALUE) {
if (consumerCapacity.compareAndSet(cc, cc - 1)) {
break;
}
} else {
break;
}
}
}

@Override
public void setProducer(Producer producer) {
arbiter.setProducer(producer);
}
};
// new subscription each time so if it unsubscribes itself it does not prevent retries
// by unsubscribing the child subscription
sourceSubscriptions.set(terminalDelegatingSubscriber);
source.unsafeSubscribe(terminalDelegatingSubscriber);
}
};

//重复发送源消息
final Observable<?> restarts = controlHandlerFunction.call(
terminals.lift(new Operator<Notification<?>, Notification<?>>() {
@Override
public Subscriber<? super Notification<?>> call(final Subscriber<? super Notification<?>> filteredTerminals) {
return new Subscriber<Notification<?>>(filteredTerminals) {
@Override
public void onCompleted() {
filteredTerminals.onCompleted();
}

@Override
public void onError(Throwable e) {
filteredTerminals.onError(e);
}

@Override
public void onNext(Notification<?> t) {
if (t.isOnCompleted() && stopOnComplete) {
filteredTerminals.onCompleted();
} else if (t.isOnError() && stopOnError) {
filteredTerminals.onError(t.getThrowable());
} else {
filteredTerminals.onNext(t);
}
}

@Override
public void setProducer(Producer producer) {
producer.request(Long.MAX_VALUE);
}
};
}
}));
//订阅重复发送的源消息
worker.schedule(new Action0() {
@Override
public void call() {
restarts.unsafeSubscribe(new Subscriber<Object>(child) {
@Override
public void onCompleted() {
child.onCompleted();
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(Object t) {
if (!child.isUnsubscribed()) {
// perform a best endeavours check on consumerCapacity
// with the intent of only resubscribing immediately
// if there is outstanding capacity
if (consumerCapacity.get() > 0) {
worker.schedule(subscribeToSource);
} else {
// set this to true so that on next request
// subscribeToSource will be scheduled
resumeBoundary.compareAndSet(false, true);
}
}
}

@Override
public void setProducer(Producer producer) {
producer.request(Long.MAX_VALUE);
}
});
}
});
}
  • 流程
    1. 订阅源消息
    2. 重复发送源消息
    3. 订阅重复发送的源消息
Timer

延时特定时间后发出一个消息,默认使用computation作为线程调度器

  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void call(final Subscriber<? super Long> child) {
Worker worker = scheduler.createWorker();
child.add(worker);
worker.schedule(new Action0() {
@Override
public void call() {
try {
child.onNext(0L);
} catch (Throwable t) {
Exceptions.throwOrReport(t, child);
return;
}
child.onCompleted();
}
}, time, unit);
}

Transforming Observables

Buffer

缓存源数据发出的消息,再一起发出。示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Integer[] integers = new Integer[9];
for(int i = 0; i < integers.length; i++) {
integers[i] = i;
}
Observable.from(integers).buffer(2).subscribe(mSubscriber);

结果:2个值一组

onNext value: [0, 1]
onNext value: [2, 3]
onNext value: [4, 5]
onNext value: [6, 7]
onNext value: [8]
onCompleted
  • 源码分析
buffer(int count, int skip)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
OperatorBufferWithSize.java

public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
if (skip == count) {
BufferExact<T> parent = new BufferExact<T>(child, count);

child.add(parent);
child.setProducer(parent.createProducer());

return parent;
}
if (skip > count) {
BufferSkip<T> parent = new BufferSkip<T>(child, count, skip);

child.add(parent);
child.setProducer(parent.createProducer());

return parent;
}
BufferOverlap<T> parent = new BufferOverlap<T>(child, count, skip);

child.add(parent);
child.setProducer(parent.createProducer());

return parent;
}
  • skip等于count(默认情况)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
BufferExact

public void onNext(T t) {
List<T> b = buffer;
if (b == null) {
b = new ArrayList<T>(count);
buffer = b;
}

b.add(t);

if (b.size() == count) {
buffer = null;
actual.onNext(b);
}
}
  • 创建大小为count的数组,将接收到的数组添加到数组中,数组满,将数组作为数据发出

  • skip大于count

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
BufferSkip.java

public void onNext(T t) {
long i = index;
List<T> b = buffer;
if (i == 0) {
b = new ArrayList<T>(count);
buffer = b;
}
i++;
if (i == skip) {
index = 0;
} else {
index = i;
}

if (b != null) {
b.add(t);

if (b.size() == count) {
buffer = null;
actual.onNext(b);
}
}
}


  • 每组count个,接收skip个数据,填满count个,剩余丢弃

  • count大于skip

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
BufferOverlap.java

public void onNext(T t) {
long i = index;
if (i == 0) {
List<T> b = new ArrayList<T>(count);
queue.offer(b);
}
i++;
if (i == skip) {
index = 0;
} else {
index = i;
}

for (List<T> list : queue) {
list.add(t);
}

List<T> b = queue.peek();
if (b != null && b.size() == count) {
queue.poll();
produced++;
actual.onNext(b);
}
}
  • 使用ArrayDeque存储每组数据。ArrayDeque会储存之前的数据。
buffer(Func0)
  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
OperatorBufferWithSingleObservable.java

public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
Observable<? extends TClosing> closing;
try {
closing = bufferClosingSelector.call();
} catch (Throwable t) {
Exceptions.throwOrReport(t, child);
return Subscribers.empty();
}
final BufferingSubscriber s = new BufferingSubscriber(new SerializedSubscriber<List<T>>(child));

Subscriber<TClosing> closingSubscriber = new Subscriber<TClosing>() {

@Override
public void onNext(TClosing t) {
s.emit();
}

@Override
public void onError(Throwable e) {
s.onError(e);
}

@Override
public void onCompleted() {
s.onCompleted();
}
};

child.add(closingSubscriber);
child.add(s);

closing.unsafeSubscribe(closingSubscriber);

return s;
}


  • bufferClosingSelector产生的Observable订阅BufferingSubscriber其内部使用chunk收集数据,当closingSubscriber被订阅,将chunk数据发出
buffer(long timespan, long timeshift, TimeUnit unit)
  • 源码分析

    默认使用computation线程调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
OperatorBufferWithTime.java

if (timespan == timeshift) {
ExactSubscriber parent = new ExactSubscriber(serialized, inner);
parent.add(inner);
child.add(parent);
parent.scheduleExact();
return parent;
}

InexactSubscriber parent = new InexactSubscriber(serialized, inner);
parent.add(inner);
child.add(parent);
parent.startNewChunk();
parent.scheduleChunk();
return parent;
  • 数据刷新时间和数据块创建时间一样时使用ExactSubscriber
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void onNext(T t) {
List<T> toEmit = null;
synchronized (this) {
if (done) {
return;
}
chunk.add(t);
if (chunk.size() == count) {
toEmit = chunk;
chunk = new ArrayList<T>();
}
}
if (toEmit != null) {
child.onNext(toEmit);
}
}
  • 反之使用InexactSubscriber,其使用LinkedList作为chunk数据结构。用于存储多余的数据。数据生成的速度大于处理速度
FlatMap

转换消息成被观察对象,对map(func)的结果做merge

  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Observable.java

public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
if (source.getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
}
return source.lift(OperatorMerge.<T>instance(false));
}

OperatorMerge.java,合并多个Observable成一个

public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
MergeProducer<T> producer = new MergeProducer<T>(subscriber);
subscriber.producer = producer;

child.add(subscriber);
child.setProducer(producer);

return subscriber;
}

MergeSubscriber.java

public void onNext(Observable<? extends T> t) {
if (t == null) {
return;
}
if (t == Observable.empty()) {
emitEmpty();
} else
if (t instanceof ScalarSynchronousObservable) {
tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
} else {
InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
addInner(inner);
t.unsafeSubscribe(inner);
emit();
}
}

  • 创建InnerSubscriber,其内部会调用tryEmit。之后调用emitLoop,循环从唤醒队列中取出消息发出
GroupBy

将源数据按特征分组,示例

1
2
3
Observable.just(1, 2, 3, 4, 5)
.groupBy(integer -> integer % 2 == 0)
.subscribe(grouped -> grouped.toList().subscribe(integers -> System.out.println(integers + " (Even: " + grouped.getKey() + ")")));
  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
OperatorGroupBy.java

public Subscriber<? super T> call(Subscriber<? super GroupedObservable<K, V>> child) {
final GroupBySubscriber<T, K, V> parent; // NOPMD
try {
parent = new GroupBySubscriber<T, K, V>(child, keySelector, valueSelector, bufferSize, delayError, mapFactory);
} catch (Throwable ex) {
//Can reach here because mapFactory.call() may throw in constructor of GroupBySubscriber
Exceptions.throwOrReport(ex, child);
Subscriber<? super T> parent2 = Subscribers.empty();
parent2.unsubscribe();
return parent2;
}

child.add(Subscriptions.create(new Action0() {
@Override
public void call() {
parent.cancel();
}
}));

child.setProducer(parent.producer);

return parent;
}

GroupBySubscriber.java

public void onNext(T t) {
K key;
try {
key = keySelector.call(t);
} catch (Throwable ex) {
unsubscribe();
errorAll(a, q, ex);
return;
}

...

if (group == null) {
// if the main has been cancelled, stop creating groups
// and skip this value
if (!cancelled.get()) {
group = GroupedUnicast.createWith(key, bufferSize, this, delayError);
groups.put(mapKey, group);

groupCount.getAndIncrement();

notNew = false;
q.offer(group);
drain();
} else {
return;
}
}

...

V v;
try {
v = valueSelector.call(t);
} catch (Throwable ex) {
unsubscribe();
errorAll(a, q, ex);
return;
}

group.onNext(v);
}
  • 使用键选择器生成key,使用key创建GroupedUnicast存储起来。通过值选择器生成值并发出。
Map

与flatmap类似,flatmap多了merge步骤

Scan

对新发出的信息与之前发出的消息一起做某种处理。示例

1
2
3
4
5
6
7
8
Observable.just(1, 2, 3, 4, 5).scan((sum, value) -> sum + value).subscribe(integer -> System.out.println("Sum: " + integer));

结果:
Sum: 1
Sum: 3
Sum: 6
Sum: 10
Sum: 15
  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
OperatorScan.java

//没有初始值
public void onNext(T t) {
R v;
if (!once) {
once = true;
v = (R)t;
} else {
v = value;
try {
//使用之前的结果
v = accumulator.call(v, t);
} catch (Throwable e) {
Exceptions.throwOrReport(e, child, t);
return;
}
}
value = v;
child.onNext(v);
}

//有初始值
@Override
public void onNext(T currentValue) {
R v = value;
try {
v = accumulator.call(v, currentValue);
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, currentValue);
return;
}
value = v;
ip.onNext(v);
}
  • 接收到消息,使用之前计算的结果和当前值做计算并存储
Window

与buffer类似,但其缓存后是以新的对象作为消息发出(Subject)

Filtering Observables

Debounce

只发出经过特定时间从源消息接收到的消息,示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Observable.from(integers).debounce(Observable::just).subscribe(mSubscriber);

结果:
onNext value: 0
onNext value: 1
onNext value: 2
onNext value: 3
onNext value: 4
onNext value: 5
onNext value: 6
onNext value: 7
onNext value: 8
onCompleted

Observable.from(integers).debounce(1, TimeUnit.SECONDS).subscribe(mSubscriber);

结果:
onNext value: 8
onCompleted
  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
OperatorDebounceWithTime.java

public void onNext(final T t) {
final int index = state.next(t);
serial.set(worker.schedule(new Action0() {
@Override
public void call() {
state.emit(index, s, self);
}
}, timeout, unit));
}


  • 接收源消息自增。定时调用emit发送消息,如果源消息的发送速度快于当前调度时间,则这些事件会丢弃。事件发送结束时会发送最后一个消息
Distinct

过滤重复的消息,示例

1
2
3
4
5
6
7
Observable.just(1, 2, 3, 4, 1, 2, 3, 4).distinct().subscribe(integer -> System.out.println("integer: " + integer));

结果:
integer: 1
integer: 2
integer: 3
integer: 4
  • 源码分析
1
2
3
4
5
6
7
8
9
10
OperatorDistinct.java

public void onNext(T t) {
U key = keySelector.call(t);
if (keyMemory.add(key)) {
child.onNext(t);
} else {
request(1);
}
}
  • 使用一个Set来存储消息
ElementAt

只发送第n个消息,示例

1
2
3
4
5
Observable.from(integers).elementAt(5).subscribe(mSubscriber);

结果:
onNext value: 5
onCompleted
  • 源码分析
1
2
3
4
5
6
7
8
9
OperatorElementAt.java

public void onNext(T value) {
if (currentIndex++ == index) {
child.onNext(value);
child.onCompleted();
unsubscribe();
}
}
  • 只有当第n个消息才发送
Filter

按特定条件过滤消息,示例

1
2
3
4
5
6
Observable.just(1, 2, 3, 4).filter(integer -> integer > 2).subscribe(integer -> System.out.println("integer: " + integer));

结果:
integer: 3
integer: 4

  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
OnSubscribeFilter.java

public void onNext(T t) {
boolean result;

try {
result = predicate.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}

if (result) {
actual.onNext(t);
} else {
request(1);
}
}
  • 使用选择器过滤消息
first

发出第一个消息,示例

1
2
3
4
5
6
Observable.from(integers).first(integer -> false).subscribe(mSubscriber);

可以决定第一个消息是否发出

结果:
onError
  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
OperatorTake.java

public void onNext(T i) {
if (!isUnsubscribed() && count++ < limit) {
boolean stop = count == limit;
child.onNext(i);
if (stop && !completed) {
completed = true;
try {
child.onCompleted();
} finally {
unsubscribe();
}
}
}
}
  • 只发出指定数量的消息
IgnoreElements

不发出任何消息,但发出结束的通知,示例

1
2
3
4
Observable.from(integers).ignoreElements().subscribe(mSubscriber);

结果:
onCompleted
last

只发出最后一个消息,示例

1
2
3
4
5
Observable.from(integers).last().subscribe(mSubscriber);

结果:
onNext value: 8
onCompleted
  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void onNext(T t) {
//存储最新的值
value = t;
}

public void onCompleted() {
Object o = value;
if (o == EMPTY) {
complete();
} else {
//结束时,发出最后一个消息
complete((T)o);
}
}
  • 结束时,发出最后的消息
Sample

发出一段时间内最近的消息

  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
OperatorSampleWithTime.java

public void onNext(T t) {
value.set(t);
}

public void onCompleted() {
emitIfNonEmpty();
subscriber.onCompleted();
unsubscribe();
}

private void emitIfNonEmpty() {
Object localValue = value.getAndSet(EMPTY_TOKEN);
if (localValue != EMPTY_TOKEN) {
try {
@SuppressWarnings("unchecked")
T v = (T)localValue;
subscriber.onNext(v);
} catch (Throwable e) {
Exceptions.throwOrReport(e, this);
}
}
}
  • 存储最新的值,结束时发出最新的值
skip

跳过n个消息不发送,示例

1
2
3
4
5
6
7
8
Observable.from(integers).skip(5).subscribe(mSubscriber);

结果:
onNext value: 5
onNext value: 6
onNext value: 7
onNext value: 8
onCompleted
  • 源码分析
1
2
3
4
5
6
7
8
9
OperatorSkip.java

public void onNext(T t) {
if (skipped >= toSkip) {
child.onNext(t);
} else {
skipped += 1;
}
}
  • 跳过n个消息
skipLast

跳过后n个消息不发送,示例

1
2
3
4
5
6
7
8
Observable.from(integers).skipLast(5).subscribe(mSubscriber);

结果:
onNext value: 0
onNext value: 1
onNext value: 2
onNext value: 3
onCompleted
  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
OperatorSkipLast.java

public void onNext(T value) {
if (count == 0) {
// If count == 0, we do not need to put value into deque
// and remove it at once. We can emit the value
// directly.
subscriber.onNext(value);
return;
}
if (deque.size() == count) {
subscriber.onNext(NotificationLite.<T>getValue(deque.removeFirst()));
} else {
request(1);
}
deque.offerLast(NotificationLite.next(value));
}
  • 使用双端队列存储消息,将前面的消息存入,当数量达到要求,从双端队列中取数据发送
Take, TakeLast

发送前n个消息

  • 源码分析,前面已有

Combining Observable

And/Then/When

通过patter和plan合并多个消息源发出的数据集,不是rxjava核心库的一部分

CombineLast

使用特定方法合并两个消息源最近的数据,示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CountDownLatch countDownLatch = new CountDownLatch(1);
MySubscriber subscriber = new MySubscriber();
subscriber.setCountDownLatch(countDownLatch);
Observable.combineLatest(Observable.interval(1, TimeUnit.SECONDS).take(5), Observable.from(integers), (first, second) -> first + second).subscribe(subscriber);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

结果:
onNext value: 8
onNext value: 9
onNext value: 10
onNext value: 11
onNext value: 12
onCompleted

  • 第一个消息源一秒钟发出一个消息,第二个消息源一次性发完,因此合并消息会合并最后一个消息

  • 源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
LatestCoordinator.java

public void subscribe(Observable<? extends T>[] sources) {
Subscriber<T>[] as = subscribers;
int len = as.length;
for (int i = 0; i < len; i++) {
as[i] = new CombinerSubscriber<T, R>(this, i);
}
lazySet(0); // release array contents
actual.add(this);
actual.setProducer(this);
for (int i = 0; i < len; i++) {
if (cancelled) {
return;
}
((Observable<T>)sources[i]).subscribe(as[i]);
}
}
  • 创建多个CombinerSubscriber对应多个消息源。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
CombinerSubscriber.java

void combine(Object value, int index) {
if (!empty) {
if (value != null && allSourcesFinished) {
//插入最新的数据
queue.offer(combinerSubscriber, latest.clone());
} else
if (value == null && error.get() != null && (o == MISSING || !delayError)) {
done = true; // if this source completed without a value
}
} else {
done = true;
}
drain()
}

drain() {
R v;
try {
v = combiner.call(array);
} catch (Throwable ex) {
cancelled = true;
cancel(q);
a.onError(ex);
return;
}

a.onNext(v);
}
  • 收集最近的数据到SpscLinkedArrayQueue
Join

只要一个数据在另一个数据定义的时间窗口内发出,合并两个数据源发出的数据

Merge

根据时间轴合并多个数据源成一个,示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
CountDownLatch countDownLatch = new CountDownLatch(1);
MySubscriber subscriber = new MySubscriber();
subscriber.setCountDownLatch(countDownLatch);
Observable.merge(Observable.interval(1, TimeUnit.SECONDS).take(5), Observable.from(integers)).subscribe(subscriber);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

结果:
onNext value: 0
onNext value: 1
onNext value: 2
onNext value: 3
onNext value: 4
onNext value: 5
onNext value: 6
onNext value: 7
onNext value: 8
onNext value: 0
onNext value: 1
onNext value: 2
onNext value: 3
onNext value: 4
onCompleted
StartWith

开始发送数据前发送一系列指定的数据,示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable.from(integers).startWith(100).subscribe(mSubscriber);

结果:
onNext value: 100
onNext value: 0
onNext value: 1
onNext value: 2
onNext value: 3
onNext value: 4
onNext value: 5
onNext value: 6
onNext value: 7
onNext value: 8
onCompleted
  • 源码分析,其使用的是concat
1
2
3
4
5
6
7
8
9
10
OnSubscribeConcatMap.java

public void onNext(T t) {
if (!queue.offer(NotificationLite.next(t))) {
unsubscribe();
onError(new MissingBackpressureException());
} else {
drain();
}
}
  • 队列未满时,往队列里插入数据,之后调用drain消费数据
Switch

转换多个数据源为单个数据源,使用最近数据转换,示例

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.from(integers1).switchIfEmpty(Observable.from(integers)).subscribe(mSubscriber);

结果:
onNext value: 0
onNext value: 1
onNext value: 2
onNext value: 3
onNext value: 4
onNext value: 5
onNext value: 6
onNext value: 7
onNext value: 8
onCompleted
zip

按数据合并两个数据源,示例

1
2
3
4
5
6
7
8
9
10
11
Observable<Integer> evens = Observable.just(2, 4, 6, 8, 10);
Observable<Integer> odds = Observable.just(1, 3, 5, 7, 9);
Observable.zip(evens, odds, (v1, v2) -> v1 + " + " + v2 + " is: " + (v1 + v2)).subscribe(System.out::println);

结果:

2 + 1 is: 3
4 + 3 is: 7
6 + 5 is: 11
8 + 7 is: 15
10 + 9 is: 19

Error Handling Operators

Catch

从错误中恢复,继续执行,示例

onErrorReturn

出现错误时,返回一个特定的值,示例

1
2
3
4
5
Observable.from(integers).map(integer -> integer / 0).onErrorReturn(throwable -> 0).subscribe(mSubscriber);

结果:
onNext value: 0
onCompleted
onErrorResumeNext, onExceptionResumeNext

出现错误,构建另一个消息源,示例

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.from(integers).map(integer -> integer / 0).onErrorResumeNext(throwable -> Observable.from(integers1)).subscribe(mSubscriber);

结果:
onNext value: 0
onNext value: -1
onNext value: -2
onNext value: -3
onNext value: -4
onNext value: -5
onNext value: -6
onNext value: -7
onNext value: -8
onCompleted
  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void onError(Throwable e) {
if (done) {
Exceptions.throwIfFatal(e);
RxJavaHooks.onError(e);
return;
}
done = true;
try {
unsubscribe();

Subscriber<T> next = new Subscriber<T>() {
@Override
public void onNext(T t) {
child.onNext(t);
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onCompleted() {
child.onCompleted();
}
@Override
public void setProducer(Producer producer) {
pa.setProducer(producer);
}
};
serial.set(next);

long p = produced;
if (p != 0L) {
pa.produced(p);
}

Observable<? extends T> resume = resumeFunction.call(e);

resume.unsafeSubscribe(next);
} catch (Throwable e2) {
Exceptions.throwOrReport(e2, child);
}
}
  • 出现错误,使用外部提供的方式产生新的消息源并发出
Retry
retry

出现错误,重新订阅并重试,示例

1
2
3
4
5
6
7
8
Observable.from(integers).map(integer -> {
int a = 1;
if (integer == a) {
throw new RuntimeException("aa");
}
return integer;
}).retry(1).subscribe(mSubscriber);

  • retry也可以提供选择器来决定是否重试
retryWhen

捕获错误,生成第二个消息源,并监听此消息源,如果此消息源发出消息,则重新订阅源消息,示例

1
2
3
4
5
6
7
8
9
10
Observable.from(integers).map(integer -> {
int a = 5;
if (integer == a) {
throw new RuntimeException("aa");
}
return integer;
}).retryWhen(observable -> observable.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
System.out.println("delay retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
})).toBlocking().forEach(System.out::println);

Observable Utility Operators

Delay

延迟发出消息,示例

1
Observable.from(integers).delay(5, TimeUnit.SECONDS).toBlocking().forEach(System.out::println);
  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
public void onNext(final T t) {
worker.schedule(new Action0() {

@Override
public void call() {
if (!done) {
child.onNext(t);
}
}

}, delay, unit);
}
  • 线程调度延时
Do

在可观察对象生命周期发生之前调用,示例

1
2
3
Observable.from(integers).doAfterTerminate(() -> {
System.out.println("terminate");
}).subscribe(mSubscriber);
Materialize/Dematerialize

对于发出的每个消息进行包装与拆包装

1
Observable.from(integers).materialize().dematerialize().subscribe(mSubscriber);
ObserveOn

指定观察者在哪个线程观察结果

  • 源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
ScalarSynchronousObservable.java

public Observable<T> scalarScheduleOn(final Scheduler scheduler) {
Func1<Action0, Subscription> onSchedule;
if (scheduler instanceof EventLoopsScheduler) {
final EventLoopsScheduler els = (EventLoopsScheduler) scheduler;
onSchedule = new Func1<Action0, Subscription>() {
@Override
public Subscription call(Action0 a) {
return els.scheduleDirect(a);
}
};
} else {
onSchedule = new Func1<Action0, Subscription>() {
@Override
public Subscription call(final Action0 a) {
final Scheduler.Worker w = scheduler.createWorker();
w.schedule(new Action0() {
@Override
public void call() {
try {
a.call();
} finally {
w.unsubscribe();
}
}
});
return w;
}
};
}

return create(new ScalarAsyncOnSubscribe<T>(t, onSchedule));
}
  • 区分EventLoopsScheduler和其他调度器。如果是EventLoopsScheduler直接调度,否则创建调度器,由调度器运行接收到的消息。解除调度器订阅
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
OperatorObserveOn.java

public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
  • 如果调度器是ImmediateScheduler(即时调度器)或者TrampolineScheduler(串行调度器)则立即执行。否则创建ObserveOnSubscriber
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
ObserveOnSubscriber.java

protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}

while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;

if (checkTerminated(done, empty, localChild, q)) {
return;
}

if (empty) {
break;
}

localChild.onNext(NotificationLite.<T>getValue(v));

currentEmission++;
if (currentEmission == limit) {
requestAmount = BackpressureUtils.produced(requested, currentEmission);
request(currentEmission);
currentEmission = 0L;
}
}
  • 其中有个队列(SpscAtomicArrayQueue),用于接收消息,开始循环从队列中取出消息,如果发送的消息数量和请求消息的数量一致,表示此次事件发送结束
Serialize

强制串行发送消息

SubscribeOn

指定可观察对象在哪个线程上执行

  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();

Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}

@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}

@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}

@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};

source.unsafeSubscribe(s);
}
});
  • 开启线程调度,从当前线程中发出消息到下一级订阅者
TimeInterval

将源消息源发出的消息转换成表示消息与消息直接的时间间隔,示例

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.from(integers).timeInterval().subscribe(mSubscriber);

结果
onNext value: TimeInterval [intervalInMilliseconds=2, value=0]
onNext value: TimeInterval [intervalInMilliseconds=1, value=1]
onNext value: TimeInterval [intervalInMilliseconds=0, value=2]
onNext value: TimeInterval [intervalInMilliseconds=0, value=3]
onNext value: TimeInterval [intervalInMilliseconds=0, value=4]
onNext value: TimeInterval [intervalInMilliseconds=0, value=5]
onNext value: TimeInterval [intervalInMilliseconds=0, value=6]
onNext value: TimeInterval [intervalInMilliseconds=0, value=7]
onNext value: TimeInterval [intervalInMilliseconds=0, value=8]
onCompleted
  • 源码分析
1
2
3
4
5
public void onNext(T args) {
long nowTimestamp = scheduler.now();
subscriber.onNext(new TimeInterval<T>(nowTimestamp - lastTimestamp, args));
lastTimestamp = nowTimestamp;
}
  • 当前时间减去上次接收到消息的时间,封装成TimeIntegerval
TimeOut

如果在一定时间内没有发出消息,发出error通知

Timestamp

给每个发出的消息带上时间戳,示例

1
Observable.from(integers).timestamp().subscribe(mSubscriber);
Using

创建可分配的资源,其与可观察对象具有一样的生命周期

Conditional and Boolean Operators

All

规定是否所有的消息发出,示例

1
2
3
4
5
Observable.from(integers).all(integer -> integer < 5).subscribe(mSubscriber);

结果:
onNext value: false
onCompleted
Amb

只发送多个消息源中最先产生消息的消息源,示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable.amb(Observable.from(integers).delay(2, TimeUnit.SECONDS), Observable.from(integers1).delay(1, TimeUnit.SECONDS))
.toBlocking()
.forEach(System.out::println);

结果:
0
-1
-2
-3
-4
-5
-6
-7
-8
Contain

发出的消息是否包含某消息,示例

1
2
3
4
5
Observable.from(integers).contains(1).subscribe(mSubscriber);

结果:
onNext value: true
onCompleted
DefaultIfEmpty

如果没有发出任何消息,发出默认消息,示例

1
2
3
4
5
Observable.empty().defaultIfEmpty(11).subscribe(mSubscriber);

结果:
onNext value: 11
onCompleted
SequenceEqual

两个消息源是否发出相同的消息,示例

1
2
3
4
5
Observable.sequenceEqual(Observable.from(integers), Observable.from(integers)).subscribe(mSubscriber);

结果:
onNext value: true
onCompleted
SkipUntil, TakeUntil

丢弃/发送第一个消息源在第二个消息源发出消息之前发出的消息,示例

1
2
3
4
Observable.from(integers).skipUntil(Observable.from(integers1).delay(1, TimeUnit.SECONDS)).subscribe(mSubscriber);

结果:
onCompleted
SkipWhile, TakeWhile

丢弃/发送消息直到某个条件变成false,示例

1
2
3
4
Observable.from(integers).skipWhile(integer -> integer >= 0).subscribe(mSubscriber);

结果:
onCompleted

Mathematical and Aggregate Operators

Average

发出消息的平均值,示例

1
2
3
4
MathObservable.from(Observable.from(integers)).averageInteger(integer -> integer).forEach(System.out::println);

结果:
4
Concat

拼接多个消息源,按顺序发送消息,示例

1
Observable.concat(Observable.from(integers), Observable.from(integers1)).forEach(System.out::println);
Count

统计发出消息的数量,示例

1
assertEquals(Integer.valueOf(integers.length), Observable.from(integers).count().toBlocking().single());
Max/Min

获取消息中最大/最小值,示例

1
assertEquals(integers[integers.length - 1], MathObservable.from(Observable.from(integers)).max((o1, o2) -> o1 - o2).toBlocking().single());
Reduce, Sum

对每个发出的消息做操作,发送最终的值,示例

1
2
3
4
Observable.from(integers).reduce((integer, integer2) -> integer + integer2).forEach(System.out::println);

结果:
36

Backpressure Operators

  • Buffer, Sample, Debounce, Window

Connectable Observable Operators

Connect

连接ObservableSubscribers。可以指定连接几个Subscriber。只有当所有的Subscriber都订阅了才开始发送消息。示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
final AtomicInteger run = new AtomicInteger();

final ConnectableObservable<Integer> co = Observable.defer(() -> Observable.just(run.incrementAndGet())).publish();

//自动连接,生成可观察对象
final Observable<Integer> source = co.autoConnect();

assertEquals(0, run.get());

TestSubscriber<Integer> ts1 = TestSubscriber.create();
//订阅了一次
source.subscribe(ts1);

ts1.assertCompleted();
ts1.assertNoErrors();
ts1.assertValue(1);

assertEquals(1, run.get());

TestSubscriber<Integer> ts2 = TestSubscriber.create();
//第二次订阅,这是无效的,因为默认只指定了一次订阅
source.subscribe(ts2);

ts2.assertNotCompleted();
ts2.assertNoErrors();
ts2.assertNoValues();

assertEquals(1, run.get());
Publish

将普通Observable转换成ConnectableObservable,示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
final AtomicInteger counter = new AtomicInteger();
ConnectableObservable<String> o = Observable.create(new Observable.OnSubscribe<String>() {

@Override public void call(final Subscriber<? super String> observer) {
new Thread(() -> {
counter.incrementAndGet();
observer.onNext("one");
observer.onCompleted();
}).start();
}
}).publish();

final CountDownLatch countDownLatch = new CountDownLatch(2);
o.subscribe(v -> {
assertEquals("one", v);
countDownLatch.countDown();
});

// subscribe again
o.subscribe(v -> {
assertEquals("one", v);
countDownLatch.countDown();
});

//连接,才能收到消息
final Subscription subscription = o.connect();
try {
if (!countDownLatch.await(1000, TimeUnit.MILLISECONDS)) {
fail("subscriptions did not receive values");
}
assertEquals(1, counter.get());
} finally {
subscription.unsubscribe();
}
RefCount

ConnectTableObservable表现像普通Observable。示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final AtomicInteger subscribeCount = new AtomicInteger();
final AtomicInteger nextCount = new AtomicInteger();
final Observable<Integer> observable = Observable.from(integers)
.doOnSubscribe(subscribeCount::incrementAndGet)
.doOnNext(l -> nextCount.incrementAndGet())
.publish()
.refCount();
final AtomicInteger receivedCount = new AtomicInteger();
//订阅一次,doOnNext会执行
final Subscription s1 = observable.subscribe(l -> receivedCount.incrementAndGet());
//订阅第二次, doOnNext会执行
final Subscription s2 = observable.subscribe();
try {
Thread.sleep(50);
} catch (InterruptedException e) {
}
s2.unsubscribe();
s1.unsubscribe();

System.out.println("onNext Count: " + nextCount.get());

assertEquals(nextCount.get(), receivedCount.get() * 2);

assertEquals(2, subscribeCount.get());

  • 普通ObservableConnectTableObservable区别在于,普通的Observable只要订阅,就会发送消息,而ConnectTableObservable会连接几个订阅者,如连接1各订阅者,但是订阅了两次,那么后面一次是不会执行的。
Replay, Cache

确保所有的观察者看到相同的消息时序,虽然它们订阅的时候,消息源可能已经开始发送消息了。示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
final AtomicInteger counter = new AtomicInteger();
Observable<String> o = Observable.create(new Observable.OnSubscribe<String>() {

@Override public void call(final Subscriber<? super String> observer) {
new Thread(() -> {
counter.incrementAndGet();
System.out.println("published observable being executed");
observer.onNext("one");
observer.onCompleted();
}).start();
}
}).replay().autoConnect();

// we then expect the following 2 subscriptions to get that same value
final CountDownLatch latch = new CountDownLatch(2);

// subscribe once
o.subscribe(v -> {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
});

// subscribe again
o.subscribe(v -> {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
});

if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
fail("subscriptions did not receive values");
}
assertEquals(1, counter.get());

ref

Custom Operator

流程

基本流程图

Observable

  • 流程分析:
    1. 通过类方法create构造Observable对象。create内部使用RxJavaHook方法来创建OnSubscribe
    2. 调用subscribe(Subscriber)方法,内部会将Subscriber转换成SafeSubscriber。然后调用RxJavaHooks.onObservableStart(Observable, OnSubscribe).call(subscriber)。这其实就是使用第一步设置的OnSubscribe调用call。上诉流程中RxJavaHooks其实就是对整个流程的一些关键步骤做hook以便可以由后续操作。这是最基本的RxJava流程
通用流程图

Operator

  • 流程分析
    1. 当调用subscribe时会调用离它最近的OnSubscribe,如果是Operator的话。那就会调用最近的OnSubscribeLiftcall。这时RxJavaHooks会调用onObserveLiftcall产生新的订阅者。父类OnSubscribe会调用这个新的订阅者,并通过call将这个订阅者传给父类OnSubscribe中,在其中给子订阅者设置生产者setProducer,这时生产者会调用request。然后会在这里将消息发给子订阅者。
    2. 线程调度: OperatorSubscribeOn,将每个生产者产生的所有的数据单独放到一个Runnable当中运行。 OperatorObserveOn则是使用队列。来一个消息往队列里面插入,并要求队列开始执行。典型的多生产者但但消费者模型。事件产生使用subscribeOn来切换线程。而且只有第一个subscribeOn会生效。而事件加工和消费使用observeonOn来切换线程。影响的是后续的Subscriber

Subject

Subject即可做Observable,也可以做Observer.示例

1
2
3
4
5
6
7
8
9
10
final TestScheduler scheduler = new TestScheduler();

scheduler.advanceTimeTo(100, TimeUnit.SECONDS);
final TestSubject<Object> subject = TestSubject.create(scheduler);
final Observer observer = mock(Observer.class);
subject.subscribe(observer);
subject.onNext(1);
scheduler.triggerActions();

verify(observer, times(1)).onNext(1);
  • subject作为Observable发出事件
  • 默认有四种Subject

AsyncSubject

发出源消息的最后一个消息。必须在源消息发出complete之后。示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final AsyncSubject<Object> subject = AsyncSubject.create();
final Observer observer = mock(Observer.class);
subject.subscribe(observer);

subject.onNext("first");
subject.onNext("second");
subject.onNext("third");
//如果没有发出结束信号,那么AsyncSubject不会发出任何事件。反之,会发出最后一个事件
subject.onCompleted();

//observer收到最后一个事件
verify(observer, times(1)).onNext(anyString());
verify(observer, never()).onError(new Throwable());
//收到结束信号
verify(observer, times(1)).onCompleted();

BehaviorSubject

发送默认值之后发送剩余的事件 示例

1
2
3
4
5
6
7
8
9
10
11
12
final BehaviorSubject<Object> subject = BehaviorSubject.create("default");
final Observer observerA = mock(Observer.class);
final Observer observerB = mock(Observer.class);
final Observer observerC = mock(Observer.class);

final InOrder inOrder = inOrder(observerA, observerB, observerC);

subject.subscribe(observerA);
subject.subscribe(observerB);

inOrder.verify(observerA).onNext("default");
inOrder.verify(observerB).onNext("default");
  • default事件会先于其他事件被发出

PublishSubject

发送从订阅时刻起的数据。示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
final PublishSubject<Object> subject = PublishSubject.create();
final Observer observer = mock(Observer.class);

subject.onNext("one");
subject.onNext("two");

//之后发送的消息,observer才能收到
subject.subscribe(observer);

subject.onNext("three");

verify(observer, never()).onNext("one");
verify(observer, never()).onNext("two");
verify(observer, times(1)).onNext("three");

ReplaySubject

发送所有的事件,不管什么时候订阅,与PublishSubject相反。示例

1
2
3
4
5
6
7
8
9
10
11
12
13
final ReplaySubject<Object> subject = ReplaySubject.create();
final Observer observer = mock(Observer.class);

subject.onNext("one");
subject.onNext("two");

subject.subscribe(observer);

subject.onNext("three");

verify(observer, times(1)).onNext("one");
verify(observer, times(1)).onNext("two");
verify(observer, times(1)).onNext("three");

Rxjava2.0

待续