[viff-devel] Mystery of the quadratic running time solved?

Marcel Keller mkeller at cs.au.dk
Sun Mar 8 08:16:30 PDT 2009


> Wow, this is nice! I had sort of given up finding the cause of this :-(
> Thank you for looking at this, and just in time for my presentation at
> PKC in 10 days :-)

You're welcome. :-)

>> --- /usr/lib/python2.5/site-packages/twisted/internet/base.py	2008-07-29 22:13:54.000000000 +0200
>> +++ internet/base.py	2009-02-20 12:27:42.000000000 +0100
>> @@ -1127,17 +1127,32 @@
>>          self.startRunning(installSignalHandlers=installSignalHandlers)
>>          self.mainLoop()
>>  
>> +        
>> +    def setLoopCall(self, f, *args, **kw):
>> +        self.loopCall = (f, args, kw)
>> +
>> +
>> +    def myIteration(self, t):
>> +        # Advance simulation time in delayed event
>> +        # processors.
>> +        self.runUntilCurrent()
>> +
>> +        if (t is None):
>> +            t2 = self.timeout()
>> +            t = self.running and t2
>> +
>> +        self.doIteration(t)
>> +
>> +        if ("loopCall" in dir(self)):
>> +            f, args, kw = self.loopCall
>> +            f(*args, **kw)
>> +
>>  
>>      def mainLoop(self):
>>          while self._started:
>>              try:
>>                  while self._started:
>> -                    # Advance simulation time in delayed event
>> -                    # processors.
>> -                    self.runUntilCurrent()
>> -                    t2 = self.timeout()
>> -                    t = self.running and t2
>> -                    self.doIteration(t)
>> +                    self.myIteration(None)
>>              except:
>>                  log.msg("Unexpected error in main loop.")
>>                  log.err()
> 
> The changes above basically insert a call to self.loopCall after each
> doIteration invocation, right?
> 
> When the loopCall is process_deferred_queue the main loop becomes:
> 
>   self.runUntilCurrent()
>   self.doIteration(t)
>   runtime.process_deferred_queue
> 
>   self.runUntilCurrent()
>   self.doIteration(t)
>   runtime.process_deferred_queue
> 
>   ...

Yes, exactly.

> I think we would get the same result if we started a LoopingCall that
> executes process_deferred_queue with an interval of, say, 100 ms:
> 
>   http://twistedmatrix.com/documents/8.2.0/api/twisted.internet.task.LoopingCall.html
> 
> This should work since the runUntilCurrent method runs through the
> waiting calls and will trigger our process_deferred_queue method.
> 
> And voilá -- no hacking of the Twisted source needed.

I'm not sure but LoopingCall._reschedule() looks more like it schedules 
the calls at certain tick, not as soon as possible after the interval is 
elapsed. This might not cost too much time but still doesn't feel very 
elegant. Furthermore, setting the interval very low leads to high CPU 
usage when waiting. Again, this is not too bad but not elegant either. 
The same applies if using reactor.callLater() directly.

Of course, we can avoid hacking the Twisted code by extending it within 
VIFF. Still, I'm in favor of the two-threaded solution because it's more 
elegant, doesn't have the danger of recursing too deep, and, in my 
opinion, it should be feasible.

>> diff -r e2759515f57f viff/runtime.py
>> --- a/viff/runtime.py	Thu Mar 05 21:02:57 2009 +0100
>> +++ b/viff/runtime.py	Fri Mar 06 13:43:14 2009 +0100
>> @@ -306,6 +306,8 @@
>>                  deferred = deq.popleft()
>>                  if not deq:
>>                      del self.incoming_data[key]
>> +                # just queue
>> +                self.factory.runtime.queue_deferred(deferred)
> 
> Why is this done?

At this time, we shouldn't call the callbacks because we might recurse 
into selectreactor.doSelect(). However, we want to know which deferreds 
are ready so we can call deferred.callback() later.

>> +        #: Activation depth counter
>> +        self.depth_counter = 0
> 
> This is for keeping track of the recursion depth in the future?

Yes. It was used in some debug output earlier but I removed it to 
simplify the patch.

>> +    def queue_deferred(self, deferred):
>> +        deferred.pause()
>> +        self.deferred_queue.append(deferred)
>> +
>> +    def process_deferred_queue(self):
>> +        while(self.deferred_queue):
>> +            deferred = self.deferred_queue.pop(0)
>> +            deferred.unpause()
> 
> Are you doing it this way to ensure that the Deferreds are unpaused in
> the same order as they were added to the list?

Yes. I'm not sure whether this is really necessary but it seems just 
cleaner because the callback of the some deferred might do a lot of 
computations and recurse, which unnecessarily extends the lifetime of 
the remaining deferreds.

> If that doesn't matter, then I think this would be faster:
> 
>   queue, self.deferred_queue = self.deferred_queue, []
>   map(Deferred.unpause, queue)
> 
> My idea is that looping over the list with map is faster than repeatedly
> popping items from the beginning (an O(n) operation).

But map() still would need O(n) time because that is the nature of 
calling a function n times, isn't it? Maybe the function calls are 
optimized but the code in the function still is called n times.

>> +    def activate_reactor(self):
>> +        self.activation_counter += 1
>> +
>> +        # setting the number to n makes the reactor called 
>> +        # only every n-th time
>> +        if (self.activation_counter >= 2):
>> +            self.depth_counter += 1
>> +            reactor.myIteration(0)
>> +            self.depth_counter -= 1
>> +            self.activation_counter = 0
> 
> If we remove the myIteration code above, we'll have to move the calls to
> reactor.runUntilCurrent and reactor.doIteration here.
> 
> A question springs to my mind: calling
> 
>   reactor.runUntilCurrent()
>   reactor.doIteration(0)
> 
> is the same as calling
> 
>   reactor.iterate(0)
> 
> and the documentation for that method says:
> 
>   [...] This method is not re-entrant: you must not call it recursively;
>   in particular, you must not call it while the reactor is running.
> 
>   http://twistedmatrix.com/documents/8.2.0/api/twisted.internet.interfaces.IReactorCore.html#iterate
> 
> How does your code ensure that we only call myIteration when we're not
> in a call made by the reactor? And could we simply call reactor.iterate
> instead?

We actually call it recursively but it should be reentrant if it's not 
called from doIteration(). doIteration() is a the same as 
select.doSelect(), which certainly is not reentrant. We however call it 
from the loop call (process_deferred_queue()) after doIterate().

Calling reactor.iterate() is not enough because it doesn't call 
process_deferred_queue(). The principle is that not only the 
communication with doSelect() is done but also the callbacks with 
process_deferred_queue() are processed. Only the latter triggers further 
communication and ends the lifetime of deferreds, which frees memory.

The following graph illustrates my hack:

                           myIteration()
                          /            \
                    doSelect()     process_deferred_queue()
                        |                    |
                  communication          callbacks
                        |                    |
                 stringReceived()    mul() / open() etc.
                        |                    |
                 queue_deferred()       myIteration()
                                             |
                                            ...


More information about the viff-devel mailing list