twisted 开发(2)-- Deferreds 组件
2017-03-19
Deferreds 异步回调序列
Deferred 本质上是一个回调函数的集合,twisted 提供了对函数延迟调用的机制。
在 twisted 内部,使用 Deferred 对象管理回调序列。当异步请求结果返回时,使用 Deferred 对象调用回调序列中的函数。
这里通过几个例子来了解 Deferreds 使用的方式和工作的原理
from twisted.internet import reactor, defer
def getDummyData(x):
"""
创建一个 Deferred 对象,并返回这个对象
"""
d = defer.Deferred()
# 2 秒钟后执行 Deferred 回调函数序列,把 x * 3 作为参数传递给回调序列中的第一个函数
# reactor.callLater 是一个定时延迟调用的方法
reactor.callLater(2, d.callback, x * 3)
return d
def printData(result):
"""
打印结果
"""
print result
d = getDummyData(3)
# 添加回调函数到回调函数序列中
d.addCallback(printData)
# 4 秒钟后停止 reactor 循环(退出进程)
reactor.callLater(4, reactor.stop)
# 开始 Twisted reactor 事件循环
reactor.run()
使用 Deferred 的方式:
- d = defer.Deferred(), 创建 Deferred 对象
- d.addCallback(printData), 添加回调函数,可以添加多个函数到回调序列中
- d.callback(result),回调序列开始执行(使用result作为参数)
在 Deferred 中,d.addCallbacks/a.addBoth 等函数都可以添加回调函数到序列中。还可以添加 errback(错误回调函数)在回调函数出错时进行处理
Deferred 的内部机制
虽然调用 addCallback 的时候只传入参数 callback, 但是回调序列中 callback/errback 总是成对出现的。twisted 内部的源代码如下
def addCallbacks(self, callback, errback=None,
callbackArgs=None, callbackKeywords=None,
errbackArgs=None, errbackKeywords=None):
"""
Add a pair of callbacks (success and error) to this L{Deferred}.
These will be executed when the 'master' callback is run.
@return: C{self}.
@rtype: a L{Deferred}
"""
assert callable(callback)
assert errback == None or callable(errback)
# 主要是这里成对添加 callback/errback
cbs = ((callback, callbackArgs, callbackKeywords),
(errback or (passthru), errbackArgs, errbackKeywords))
self.callbacks.append(cbs)
if self.called:
self._runCallbacks()
return self
当回调序列中的函数发生异常,或者返回twisted.python.failure.Failure 实例时, 下一个callback/errback回调函数对中的错误回调函数会被调用。
错误回调函数的例子:
#!/usr/bin/env python
# coding: utf-8
from twisted.internet import defer, reactor
from pprint import pprint
def cback1(result):
raise Exception('cback1')
def cerr1(failure):
print 'cerr1: %s' % str(failure)
def cback2(result):
return 'succ cback2'
def cerr2(failure):
print 'cerr2: %s' % str(failure)
d = defer.Deferred()
d.addCallbacks(callback=cback1, errback=cerr1)
d.addCallbacks(callback=cback2, errback=cerr2)
d.addCallback(callback=lambda ign:pprint('cback3'))
# 执行回调函数序列
d.callback('run')
# 退出事件循环
reactor.callLater(2, reactor.stop)
reactor.run()
运行结果
user@host:~/test $ python testErr.py
cerr2: [Failure instance: Traceback: <type 'exceptions.Exception'>: cback1
testErr.py:26:<module>
/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py:382:callback
/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py:490:_startRunCallbacks
--- <exception caught here> ---
/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py:577:_runCallbacks
testErr.py:8:cback1
]
'cback3'
从例子中看运行机制:
- 在回调序列执行过程中,当前回调函数的返回值就是下一个回调函数的第一个参数,d.callback(‘run’) 使用字符串’run’作为第一个回调函数的参数
- 当回调函数 cback1 出现异常,下一个错误处理函数 cerr2 被调用
- 当 cerr2 返回后,如果返回值不是 twisted.python.failure.Failure, 调用将转回到下个 callback ( cback3 )函数,如果返回值是 Failure 对象,则继续调用下一个的错误处理函数
Deferred 结构图
Deferred 调用图
Deferred 回调函数的参数传递和回调函数内异步操作方法
添加到 Deferred 中的回调函数可以传入参数,并且在回调函数内可以执行异步操作(异步网络请求)
下面是一个从 京东 获取书籍信息的例子: 先从搜索的列表页获取一个书籍信息,再从详情页获取详细信息
#!/usr/bin/env python
# coding: utf-8
import sys
import bs4 as bs
from twisted.web.client import getPage
from twisted.internet import reactor
from twisted.python.util import println
from twisted.internet import defer
class Goods(object):
'''商品信息分析类'''
_urlFmt='http://search.jd.com/search?keyword=%s&enc=utf-8&rt=1&book=y&area=1&wtype=1'
_name=''
_id=''
_price=0
_detail=''
_detailUrl=''
def __init__(self, keyword):
self._url = self._urlFmt % keyword
def getGoodsUrl(self):
return self._url
def getGoodsDetailUrl(self):
return self._detailUrl
def getGoods(self, page):
soup = bs.BeautifulSoup(page)
# 获取商品名称,详情页url
infos = soup.find_all(class_='info')
info = infos[0].select('dt.p-name > a')[0]
self._name = info.get_text().strip()
self._detailUrl = info['href']
def getGoodsDetail(self, page):
#print page
soup = bs.BeautifulSoup(page)
# 获取价格
price = soup.select('#summary-price strong')[0]
self._price = price.get_text().strip()
# 获取详细信息
details = soup.select('#product-detail-1')[0].strings
self._detail = ''.join(details)
return True
def printResult(self, IsPrint=True):
if IsPrint:
print self._name
print self._price
print self._detailUrl
print ' ' * 10, self._detail
def main():
errback=lambda error:(println("an error occurred", error),reactor.stop())
def goodsCallback(page, goods):
''''''
#print page
goods.getGoods(page)
d2 = getPage(goods.getGoodsDetailUrl())
d2.addCallbacks(callback=goods.getGoodsDetail, errback=errback)
return d2
# 根据关键词创建商品信息对象
goods = Goods(sys.argv[1])
# 异步查询商品信息
d1 = getPage(goods.getGoodsUrl())
# 添加获取商品信息的回调函数到 defer 对象
d1.addCallbacks(callback=goodsCallback, errback=errback, callbackArgs=[goods])
d1.addCallbacks(callback=lambda successed:(goods.printResult(successed)), errback=errback)
# 结束主事件监控循环
d1.addCallbacks(callback=lambda ign: reactor.stop(), errback=errback)
reactor.run()
if __name__ == '__main__':
main()
运行例子程序:
python goodsJD.py 机器学习
例子中 Deferred 对象的使用:
- d.addCallbacks 的 callbackArgs 参数接收一个 list 作为添加的回调函数的参数,errbackArgs 也是一个 list, 作为添加的错误处理函数的参数
- 在回调序列中的执行过程中,如果回调函数返回了一个 Deferred 对象,例如上面的 goodsCallback 函数。在新返回的 d2 回调序列被执行完之前,d1的调用序列会暂停执行,在 d2 被执行完之后,d1 回调序列才会继续执行。(这个功能对于嵌套的异步请求很重要,例如我有三个请求A,B,C, 这三个请求有依赖关系,必须根据 A 的响应发送 B 请求,根据 B 的响应发送 C 请求)
DeferredList 对象的使用
有时候需要等待几个异步事件完成,然后再执行某些任务。这时候 DeferredList 就可以发挥作用了。
看官网的一个例子,我修改并注释一些代码
#!/usr/bin/env python
# coding: utf-8
from twisted.internet import defer
# DeferredList 回调函数,参数是它包含的 Deferred 执行后的结果组成的 list
def printResult(result):
for (success, value) in result:
if success:
print 'Success:', value
else:
print 'Failure:', value.getErrorMessage()
# deferred1 的回调函数
def printDeferred(result):
print 'deferred1: %s' % result
return 'deferred1: %s' % result
# 创建三个新的 Deferred
deferred1 = defer.Deferred()
deferred2 = defer.Deferred()
deferred3 = defer.Deferred()
# 打包三个 Deferred 对象到 DeferredList
dl = defer.DeferredList([deferred1, deferred2, deferred3], consumeErrors=True)
# 添加回调函数到 DeferredList,
# 当它包含的 Deferred 都执行完后,将执行 DeferredList 这个调用序列
dl.addCallback(printResult)
# 注意这个回调函数添加的位置,
# 因为在 defer.DeferredList 后面,所以这个回调将不影响 DeferredList 结果
# 只有添加在 defer.DeferredList 之前,结果才会传递给 defer.DeferredList 的回调序列
deferred1.addCallback(printDeferred)
# 调用三个回调序列
deferred1.callback('one')
deferred2.errback(Exception('bang!'))
deferred3.callback('three')
运行结果
user@host ~/test/twisted $ python testDeferredList.py
deferred1: one
Success: one
Failure: bang!
Success: three
DeferredList 运行需要注意的地方
- DeferredList 打包的回调序列 deferred1,deferred2,deferred3,这几个序列中的回调函数要在打包之前(创建DeferredList对象之前)添加。
(我例子中添加的位置是不正确的!!!)
- DeferredList 的回调函数接收的参数是个 list, list 中的每个元素是 (succ, result),表示 deferred1,deferred2,deferred3 执行成功的标志和最后结果