twisted 开发(2)-- Deferreds 组件

󰃭 2017-03-19

Deferreds 异步回调序列

Deferred 本质上是一个回调函数的集合,twisted 提供了对函数延迟调用的机制。

在 twisted 内部,使用 Deferred 对象管理回调序列。当异步请求结果返回时,使用 Deferred 对象调用回调序列中的函数。

这里通过几个例子来了解 Deferreds 使用的方式和工作的原理

from twisted.internet import reactor, defer

def getDummyData(x):
    """
    创建一个 Deferred 对象,并返回这个对象
    """
    d = defer.Deferred()
    # 2 秒钟后执行 Deferred 回调函数序列,把 x * 3 作为参数传递给回调序列中的第一个函数
    # reactor.callLater 是一个定时延迟调用的方法
    reactor.callLater(2, d.callback, x * 3)
    return d

def printData(result):
    """
    打印结果
    """
    print result

d = getDummyData(3)

# 添加回调函数到回调函数序列中
d.addCallback(printData)

# 4 秒钟后停止 reactor 循环(退出进程)
reactor.callLater(4, reactor.stop)

# 开始 Twisted reactor 事件循环
reactor.run()

使用 Deferred 的方式:

  1. d = defer.Deferred(), 创建 Deferred 对象
  2. d.addCallback(printData), 添加回调函数,可以添加多个函数到回调序列中
  3. d.callback(result),回调序列开始执行(使用result作为参数)

在 Deferred 中,d.addCallbacks/a.addBoth 等函数都可以添加回调函数到序列中。还可以添加 errback(错误回调函数)在回调函数出错时进行处理

Deferred 的内部机制

虽然调用 addCallback 的时候只传入参数 callback, 但是回调序列中 callback/errback 总是成对出现的。twisted 内部的源代码如下

def addCallbacks(self, callback, errback=None,
            callbackArgs=None, callbackKeywords=None,
            errbackArgs=None, errbackKeywords=None):
         """
         Add a pair of callbacks (success and error) to this L{Deferred}.
         These will be executed when the 'master' callback is run.
         @return: C{self}.
         @rtype: a L{Deferred}
         """
         assert callable(callback)
         assert errback == None or callable(errback)

         # 主要是这里成对添加 callback/errback
         cbs = ((callback, callbackArgs, callbackKeywords),
                (errback or (passthru), errbackArgs, errbackKeywords))
         self.callbacks.append(cbs)
 
         if self.called:
             self._runCallbacks()
         return self

当回调序列中的函数发生异常,或者返回twisted.python.failure.Failure 实例时, 下一个callback/errback回调函数对中的错误回调函数会被调用。

错误回调函数的例子:

#!/usr/bin/env python
# coding: utf-8

from twisted.internet import defer, reactor
from pprint import pprint

def cback1(result):
    raise Exception('cback1')

def cerr1(failure):
    print 'cerr1: %s' % str(failure)

def cback2(result):
    return 'succ cback2'

def cerr2(failure):
    print 'cerr2: %s' % str(failure)


d = defer.Deferred()
d.addCallbacks(callback=cback1, errback=cerr1)
d.addCallbacks(callback=cback2, errback=cerr2)
d.addCallback(callback=lambda ign:pprint('cback3'))

# 执行回调函数序列
d.callback('run')

# 退出事件循环
reactor.callLater(2, reactor.stop)
reactor.run()

运行结果

user@host:~/test $ python testErr.py
cerr2: [Failure instance: Traceback: <type 'exceptions.Exception'>: cback1
testErr.py:26:<module>
/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py:382:callback
/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py:490:_startRunCallbacks
--- <exception caught here> ---
/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py:577:_runCallbacks
testErr.py:8:cback1
]
'cback3'

从例子中看运行机制:

  1. 在回调序列执行过程中,当前回调函数的返回值就是下一个回调函数的第一个参数,d.callback(‘run’) 使用字符串’run’作为第一个回调函数的参数
  2. 当回调函数 cback1 出现异常,下一个错误处理函数 cerr2 被调用
  3. 当 cerr2 返回后,如果返回值不是 twisted.python.failure.Failure, 调用将转回到下个 callback ( cback3 )函数,如果返回值是 Failure 对象,则继续调用下一个的错误处理函数

Deferred 结构图

Deferred 调用图

Deferred 回调函数的参数传递和回调函数内异步操作方法

添加到 Deferred 中的回调函数可以传入参数,并且在回调函数内可以执行异步操作(异步网络请求)

下面是一个从 京东 获取书籍信息的例子: 先从搜索的列表页获取一个书籍信息,再从详情页获取详细信息

#!/usr/bin/env python
# coding: utf-8

import sys
import bs4 as bs

from twisted.web.client import getPage
from twisted.internet import reactor
from twisted.python.util import println
from twisted.internet import defer

class Goods(object):
    '''商品信息分析类'''
    _urlFmt='http://search.jd.com/search?keyword=%s&enc=utf-8&rt=1&book=y&area=1&wtype=1'

    _name=''
    _id=''
    _price=0
    _detail=''
    _detailUrl=''

    def __init__(self, keyword):
        self._url = self._urlFmt % keyword

    def getGoodsUrl(self):
        return self._url

    def getGoodsDetailUrl(self):
        return self._detailUrl

    def getGoods(self, page):
        soup = bs.BeautifulSoup(page)

        # 获取商品名称,详情页url
        infos = soup.find_all(class_='info')
        info = infos[0].select('dt.p-name > a')[0]
        self._name = info.get_text().strip()
        self._detailUrl = info['href']

    def getGoodsDetail(self, page):
        #print page
        soup = bs.BeautifulSoup(page)

        # 获取价格
        price = soup.select('#summary-price strong')[0]
        self._price = price.get_text().strip()

        # 获取详细信息
        details = soup.select('#product-detail-1')[0].strings
        self._detail = ''.join(details)
        return True

    def printResult(self, IsPrint=True):
        if IsPrint:
            print self._name
            print self._price
            print self._detailUrl
            print ' ' * 10, self._detail

def main():
    errback=lambda error:(println("an error occurred", error),reactor.stop())
    def goodsCallback(page, goods):
        ''''''
        #print page
        goods.getGoods(page)
        d2 = getPage(goods.getGoodsDetailUrl())
        d2.addCallbacks(callback=goods.getGoodsDetail, errback=errback)
        return d2

    # 根据关键词创建商品信息对象
    goods = Goods(sys.argv[1])

    # 异步查询商品信息
    d1 = getPage(goods.getGoodsUrl())

    # 添加获取商品信息的回调函数到 defer 对象
    d1.addCallbacks(callback=goodsCallback, errback=errback, callbackArgs=[goods])
    d1.addCallbacks(callback=lambda successed:(goods.printResult(successed)), errback=errback)

    # 结束主事件监控循环
    d1.addCallbacks(callback=lambda ign: reactor.stop(), errback=errback)

    reactor.run()


if __name__ == '__main__':
    main()

运行例子程序:

python goodsJD.py 机器学习

例子中 Deferred 对象的使用:

  1. d.addCallbacks 的 callbackArgs 参数接收一个 list 作为添加的回调函数的参数,errbackArgs 也是一个 list, 作为添加的错误处理函数的参数
  2. 在回调序列中的执行过程中,如果回调函数返回了一个 Deferred 对象,例如上面的 goodsCallback 函数。在新返回的 d2 回调序列被执行完之前,d1的调用序列会暂停执行,在 d2 被执行完之后,d1 回调序列才会继续执行。(这个功能对于嵌套的异步请求很重要,例如我有三个请求A,B,C, 这三个请求有依赖关系,必须根据 A 的响应发送 B 请求,根据 B 的响应发送 C 请求)

DeferredList 对象的使用

有时候需要等待几个异步事件完成,然后再执行某些任务。这时候 DeferredList 就可以发挥作用了。

看官网的一个例子,我修改并注释一些代码

#!/usr/bin/env python
# coding: utf-8

from twisted.internet import defer


# DeferredList 回调函数,参数是它包含的 Deferred 执行后的结果组成的 list
def printResult(result):
    for (success, value) in result:
        if success:
            print 'Success:', value
        else:
            print 'Failure:', value.getErrorMessage()

# deferred1 的回调函数
def printDeferred(result):
    print 'deferred1: %s' % result
    return 'deferred1: %s' % result

# 创建三个新的 Deferred
deferred1 = defer.Deferred()
deferred2 = defer.Deferred()
deferred3 = defer.Deferred()

# 打包三个 Deferred 对象到 DeferredList
dl = defer.DeferredList([deferred1, deferred2, deferred3], consumeErrors=True)

# 添加回调函数到 DeferredList,
# 当它包含的 Deferred 都执行完后,将执行 DeferredList 这个调用序列
dl.addCallback(printResult)

# 注意这个回调函数添加的位置,
# 因为在 defer.DeferredList 后面,所以这个回调将不影响 DeferredList 结果
# 只有添加在 defer.DeferredList 之前,结果才会传递给 defer.DeferredList 的回调序列
deferred1.addCallback(printDeferred)

# 调用三个回调序列
deferred1.callback('one')
deferred2.errback(Exception('bang!'))
deferred3.callback('three')

运行结果

user@host ~/test/twisted $ python testDeferredList.py
deferred1: one
Success: one
Failure: bang!
Success: three

DeferredList 运行需要注意的地方

  1. DeferredList 打包的回调序列 deferred1,deferred2,deferred3,这几个序列中的回调函数要在打包之前(创建DeferredList对象之前)添加。(我例子中添加的位置是不正确的!!!)
  2. DeferredList 的回调函数接收的参数是个 list, list 中的每个元素是 (succ, result),表示 deferred1,deferred2,deferred3 执行成功的标志和最后结果