python twisted deferToThread

要使用twisted做server部分的代码,一定离不开Protocol和Factory两个类,下面是个简单的例子:


Python

from twisted.internet.protocol import Protocol, Factory
from twisted.internet import reactor, defer, threads
from twisted.protocols.basic import LineReceiver
import time

class EchoProtocol(LineReceiver):
    def lineReceived(self, line):
        if line == "add":
            self.factory.status = "busy"
            self.process()
            self.sendLine('added')
        elif line == "status":
            self.sendLine("%s"%self.factory.status)

   def process(self):
        # do something which may takes a long time
        time.sleep(10)
        self.factory.status = "idle"

class EchoFactory(Factory):
    protocol = EchoProtocol
    def __init__(self):
        self.status = "idle"

if __name__ == '__main__':
    reactor.listenTCP(8087, EchoFactory())
    reactor.run()

之前也就看过几个例子,我表面上理解,我以为是每来一个tcp connection和这个server通讯,server就会启动一个EchoProtocol handler来处理这个链接,类似apache的fork模型,今天才发现根本就不是这回事,本来上面的代码的意思是,如果client(测试client发指令可以直接用telnet)发送一个add的指令,服务器做一些process函数的调用操作,然后返回added;另一个client再发送一个status指令,服务器返回当前服务器的status,busy或者idle。

按照我之前的理解,应该1:是当一个client发送add到server时,server调用process操作,这时候这个connection会一直在阻塞状态,等time.sleep完后才能返回;2:如果此时再起一个client发送status指令到server,factory会再起一个protocol处理这个请求,也就是马上返回当前factory的status给它。但是实际情况是,1是没错的,而2有问题,由于第1个connection阻塞了,导致第2个也被阻塞!结果是第1个connection返回结果后,才处理了第2个status的指令请求!

搜了一下,(这些本应该详细看文档的,twisted的文档比较大,一下子也摸不到头绪),搜到这么句话:

twisted采用的是纯事件驱动的一种模式,这意味着一个事件未处理完毕的情况下,是不能够在处理新事件的。但并不是所有事情都能马上或在短时间内做完,例如读写文件等。

这和实验结果相同,也就意味着,当一个connection的请求server没有处理完,阻塞了这个twisted server,后面的请求都被阻塞了。(这也配叫Factory?太sb了吧,我还以为用twisted能简单几行就搞定一个多线程/进程并发处理的server~)

google了下“twisted 阻塞”的问题,发现twisted有个defer.Deferred的东西,可惜大家给的例子都不是很清楚,搞了半天我也没弄明白defer的机制,捣鼓的几个就是不回调指定的函数。要弄明白估计还是要仔细读下文档。

后来找到这个函数:threads.deferToThread,可以把会阻塞server的函数(上例中的process函数)丢给这个函数去做,从函数名上应该是会起一个线程了处理process函数,而在本处理线程内就不再阻塞,下面是EchoProtocol修改后的代码,改动也就一行:


Python

class EchoProtocol(LineReceiver):
    def lineReceived(self, line):
        if line == "add":
            self.factory.status = "busy"
            #self.process()
            threads.deferToThread(self.process)
            self.sendLine('added')
        elif line == "status":
            self.sendLine("%s"%self.factory.status)

    def process(self):
        time.sleep(10)
        self.factory.status = "idle"

这样把process这个耗时的函数让deferToThread去处理,当一个client发送add指令时,server可以迅速回应,同时“默默”的启动线程调用process函数;另一个client发送status时也可以迅速得到回应,因为现在没有耗时的函数阻塞server了。所以这样的运行结果就是:开始一个client发送status指令时得到idle的结果,发送add指令时,迅速得到回应”added”,再迅速发送status指令时,得到”busy”(time.sleep的10秒之内发送多少次status的指令都会得到busy的结果),10秒后再发送status将再次得到idle的结果,因为此时process函数在sleep后把factory.status设置回了”idle”状态。

结论是1:deferToThread很好用;2:不看文档,一知半解害死人

– EOF –

0 comments:

Leave a Reply

Your email address will not be published. Required fields are marked *