工作中涉及到的数据,其数量越来越大。这就是说,我用 Python 写的工具,需要更高的性能。今天琢磨着把小工具的性能一下子提升了一个数量级,每秒能发出、接收和处理 800 次请求。所谓「事无好坏,过则成灾」,800 QPS 虽然不至于将线上服务压垮,但是多少会对线上服务造成一定的影响(比如响应时间显著延长)。这是不好的。
为了防止架构组的同学过来撕,有必要给我的小工具加上 EVA 的拘束器,限制一下 QPS。
思路
Y 是 QA 组的同学,开发了一系列的平台,对此比较有经验。和 Y 的讨论中,他提出,应该实时监控远程服务的响应时间,动态调整发出请求的速度和工作的进程数量,将 QPS 固定住。
从研究的角度来说,这是一个比较精确的思路。但是要给上述工具加上这样一层壳,工作量还是有一些的。这样,不符合补丁「短平快」的要求。
为了找到更合适的方案,我重新理了一下需求:我只需要给 QPS 设置一个上限,防止压力过大影响线上服务就可以了,并不需要将 QPS 固定在定值上。这样一来,动态监控远程服务的响应时间,就变得不重要了。
那么怎样解决这个问题呢?
让我们先回顾一下 QPS 的含义。所谓 QPS,就是单位时间($\tau$)的请求次数($k$)。限制 QPS,就是要限制单位时间 $\tau$ 内的请求次数 $k$。换而言之,**我们需要保证,在接踵而来的请求中,任意连续的 $k+1$ 个请求,其首尾的时间间隔都严格大于 $\tau$**。
因此,我们需要用一个数据结构,顺序保存最近的 $k$ 个请求发生的时间戳。当第 $k + 1$ 个请求来临时,我们需要对比它和保存的时间戳中的最早者之间的时间间隔 $t$。
- 如果 $t > \tau$,那么对该请求放行,将当前时间戳保存到数据结构并淘汰掉最早的那个时间戳;
- 如果 $t \leqslant \tau$,那么抛弃或阻塞该请求,等待下一次调用。
算法浅显易懂,而实现起来,我们只需要一个固定长度的双向队列即可。
collections
库中的 deque
在 Python 的 list
基础上加上一些限制条件,当然可以满足我们的需要。不过 list
是用线性表实现的;而在我们的环境里,需要频繁快速地进行元素的插入和删除操作。所以,我们对选用 list
作为基础数据结构需要慎重考虑。
Python 的标准库 collections
是一组高性能的容器,其中提供了 deque
的双向队列。deque
使用链表实现的,因此能够在 O(1)
时间复杂度内进行队列首尾的插入和删除操作。从性能上说,deque
在我们的问题里,完胜 list
。
插入和删除首尾的元素
deque
提供了简便的方法,在队列首尾增减元素。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| In [1]: from collections import deque
In [2]: q = deque(['a', 'b', 'c'])
In [3]: q.append('x')
In [4]: q.appendleft('y')
In [5]: q Out[5]: deque(['y', 'a', 'b', 'c', 'x'])
In [6]: q.pop() Out[6]: 'x'
In [7]: q Out[7]: deque(['y', 'a', 'b', 'c'])
In [8]: q.popleft() Out[8]: 'y'
In [9]: q Out[9]: deque(['a', 'b', 'c'])
|
限制长度
在初始化一个 deque
的时候,可以指定它的最大长度(maxlen
)。这个长度一旦指定,就是该 deque
实例的只读属性。当实例的长度达到设定的最大长度后
- 执行
append(element)
会先执行 popleft()
删除队首元素,再将 element
加在队尾;
- 执行
appendleft(element)
会先执行 pop()
删除队尾元素,再将 element
加在队首。
这真是大快人心的大好事啊,完美符合我们的需求。
轮转
deque
还提供了一个有趣的成员函数,名字是 rotate(step)
。其作用是:
- 当
step
是正整数时,从队首到队尾,循环挪动 step
步;
- 当
step
是负整数时,从队尾到队首,循环挪动 step
步。
通过 rotate
函数,我们可以实现一个有趣是跑马灯程序。
marquee.py1 2 3 4 5 6 7 8 9 10 11
| import sys import time from collections import deque
marquee = deque('>>-----' * 10)
while True: print '\r%s' % ''.join(marquee), marquee.rotate(1) sys.stdout.flush() time.sleep(0.04)
|
实现频率限制器
有了上面的分析,又有了 deque
这一好用的工具,实现一个这样的限制器,是很容易的。我们将它封装成一个类,方便调用。
rate_limiter.py1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| from collections import deque import time
class RateLimiter: def __init__(self, maxCount = 5, timeUnit = 1): self.timeUnit = timeUnit self.deque = deque (maxlen = maxCount)
def __call__(self): if self.deque.maxlen == len(self.deque): cTime = time.time() if cTime - self.deque[0] > self.timeUnit: self.deque.append (cTime) return False else: return True self.deque.append (time.time()) return False
if __name__ == '__main__': import random
checkLim = RateLimiter(maxCount = 8)
for i in xrange(50): rTime = random.uniform(0, 0.2) time.sleep(rTime) print i, "block[%f]" % (rTime) if checkLim() else "pass[%f]" % (rTime)
|
代码很简单,唯一可能有趣的是 __call__
这个函数。它和一般的函数在功能上没有什么不同,但是有特别的作用:让 RateLimiter
这个类的实例成为可以调用的函数。这就是为什么我们可以用 if checkLim()
的原因。
代码的输出如下(由于加入了随机数,在你的计算机上执行可能不会完全相同),pass
表示被放行,block
表示频率过快,受到限制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| 0 pass[0.109310] 1 pass[0.090743] 2 pass[0.038481] 3 pass[0.086077] 4 pass[0.101313] 5 pass[0.173419] 6 pass[0.159947] 7 pass[0.135079] 8 block[0.011212] 9 block[0.070294] 10 block[0.029171] 11 block[0.018660] 12 pass[0.080185] 13 pass[0.143896] 14 pass[0.191853] 15 pass[0.188746] 16 pass[0.198628] 17 pass[0.123896] 18 pass[0.144908] 19 pass[0.107941] 20 pass[0.080700] 21 pass[0.006529] 22 pass[0.171683] 23 block[0.040397] 24 pass[0.187498] 25 block[0.047226] 26 pass[0.124452] 27 pass[0.103683] 28 pass[0.153917] 29 block[0.021065] 30 pass[0.196360] 31 pass[0.042572] 32 pass[0.093290] 33 pass[0.113496] 34 pass[0.178457] 35 pass[0.079711] 36 pass[0.129189] 37 block[0.025027] 38 pass[0.130345] 39 block[0.037810] 40 block[0.020917] 41 pass[0.171373] 42 pass[0.031104] 43 pass[0.089182] 44 pass[0.175822] 45 pass[0.137213] 46 block[0.018022] 47 block[0.003023] 48 pass[0.167048] 49 pass[0.120170]
|