[viff-devel] Two-threaded VIFF

Martin Geisler mg at lazybytes.net
Wed Apr 22 02:21:43 PDT 2009


Marcel Keller <mkeller at cs.au.dk> writes:

> Hi friends of VIFF,
>
> I've finally completed the patch for a two-threaded VIFF where most of
> the VIFF code runs in a separate thread. The patch is against the tip
> of my repository: http://hg.viff.dk/mkeller

Okay. Your clone is about 70 commits and 6 weeks behind the current
version. There has been some changes in the ShareExchanger code since
which your code also touches. You should therefore pull in the latest
code and merge/rebase your patch.

But also, please look at splitting this up into smaller pieces. There
are a number of changes which do not logically belong together and
which should therefore be put into separate patches. Quoting from a
post on the Mercurial mailinglist:

  http://www.selenic.com/pipermail/mercurial-devel/2008-August/007632.html

  First, the probability of a patch being acceptable is the *product*
  of the probability of each individual feature being acceptable.
  Which means it's a good idea to submit each feature separately.

  Second, the probability of there being a bug in the patch is the sum
  of there being a bug in each feature. Which means you want each
  feature in a separate commit for locating regressions.

  Third, the difficulty of reviewing a patch increases more than
  linearly with the number of features included. Which means we really
  don't want to review patches with more than one feature.

In this case we're changing the basic design of how things are
processed in VIFF -- we're introducing threads and locks, which are
traditionally the source of many funny errors. Therefore we need some
good documentation for this:

* what is waiting on what?

* where can we deadlock -- how do we avoid this?

* what are the hidden assumptions (I saw that putting (None, None)
  into the deferred_queue signals that the thread should stop?

* maybe more :-)

> It turned out be not so straight-forward as I thought. I had to use
> a recursion, as in the hack, but I refined it to ensure that the
> recursion limit isn't exceeded.

Please document this somewhere, maybe in doc/runtime.txt or maybe a
new document which describes the design. There is a saying that if it
was hard to write the code, then it will be even harder to read it
afterwards. Therefore, if you put all your cleverness into writing the
code, you will not be able to understand it when reading it again :-)

> Benchmarks:
> Unfortunately, this solution is slower than the hack, e.g. one AES
> block encryption takes 4 seconds compared to 3 seconds with the hack.

The hack used a single thread but pumped messages out quicker by
directly calling the send-stuff function in the reactor, right? Then
it's no big surprice that it was faster since it had no locking to do.

> On the other hand, the preprocessing time in the actively secure
> multiplication is linear and not quadratic, whereas the online time is
> significantly larger:
>
> 	two-threaded		hack			original
> (n,t)	online	preprocessing	online	preprocessing	online	preproc.
> (4,1)	6	22		4	17		4	20
> (7,2)	10	37		6	29		6	42
> (10,3)	13	53		8	42		8	82
> (13,4)	17	68		10	56		10	136
> (16,5)	20	84		12	68		12	208
> (19,6)	23	106		13	83		14	287
> (22,7)	26	120		15	98		17	377

It is a shame that things become slower, but maybe that is the price
we have to pay for this more elaborate design.

> I did some profiling and didn't find an obvious reason why the
> two-thread is slower. Therefore, I guess that the reason is the
> multi-threading implementation of Python (which could be better, as
> mentioned in the discussion about the hack). The guess is also
> supported by the fact that having an own thread for every callback,
> which I also tried, turned out to be really slow.
>
> Unit tests:
> All unit test get passed, even the previosly skipped
> test_multiple_callbacks.

Excellent!

> This because I added the @increment_pc decorator to
> schedule_callback(). This of course changes the program counters
> heavily but I didn't experience any problems.
>
> Best regards,
> Marcel
> diff -r e2759515f57f apps/aes.py
> --- a/apps/aes.py	Thu Mar 05 21:02:57 2009 +0100
> +++ b/apps/aes.py	Tue Apr 21 17:25:45 2009 +0200
> @@ -82,7 +82,7 @@
>          rt.shutdown()
>  
>      g = gather_shares(opened_ciphertext)
> -    g.addCallback(fin)
> +    rt.schedule_callback(g, fin)
>  
>  def share_key(rt):
>      key =  []
> diff -r e2759515f57f apps/benchmark.py
> --- a/apps/benchmark.py	Thu Mar 05 21:02:57 2009 +0100
> +++ b/apps/benchmark.py	Tue Apr 21 17:25:45 2009 +0200
> @@ -91,6 +91,7 @@
>      print "Total time used: %.3f sec" % (stop-start)
>      print "Time per %s operation: %.0f ms" % (what, 1000*(stop-start) / count)
>      print "*" * 6
> +    sys.stdout.flush()

This looks like an unrelated change.

>  
>  
>  operations = ["mul", "compToft05", "compToft07", "eq"]
> @@ -174,7 +175,7 @@
>              # run with no preprocessing. So they are quite brittle.
>              if self.operation == operator.mul:
>                  key = ("generate_triples", (Zp,))
> -                desc = [(i, 1, 0) for i in range(2, 2 + count)]
> +                desc = [(2, 2, 2 * count + 1, 2, i, 1, 0) for i in range(1, count + 1)]
>                  program_desc.setdefault(key, []).extend(desc)
>              elif isinstance(self.rt, ComparisonToft05Mixin):
>                  key = ("generate_triples", (GF256,))
> @@ -228,7 +229,8 @@
>          if seconds > 0:
>              print "Starting test in %d" % seconds
>              sys.stdout.flush()
> -            reactor.callLater(1, self.countdown, None, seconds - 1)
> +            time.sleep(1)
> +            self.countdown(None, seconds - 1)
>          else:
>              print "Starting test now"
>              sys.stdout.flush()
> @@ -255,6 +257,7 @@
>      def run_test(self, _):
>          c_shares = []
>          record_start("parallel test")
> +        sys.stdout.flush()
>          while self.a_shares and self.b_shares:
>              a = self.a_shares.pop()
>              b = self.b_shares.pop()
> diff -r e2759515f57f apps/millionaires.py
> --- a/apps/millionaires.py	Thu Mar 05 21:02:57 2009 +0100
> +++ b/apps/millionaires.py	Tue Apr 21 17:25:45 2009 +0200
> @@ -97,10 +97,10 @@
>          # the argument (which is None since self.results_ready does
>          # not return anything), so we throw it away using a lambda
>          # expressions which ignores its first argument.
> -        results.addCallback(lambda _: runtime.synchronize())
> +        runtime.schedule_callback(results, lambda _: runtime.synchronize())
>          # The next callback shuts the runtime down, killing the
>          # connections between the players.
> -        results.addCallback(lambda _: runtime.shutdown())
> +        runtime.schedule_callback(results, lambda _: runtime.shutdown())

Please put the foo.addCallback -> runtime.schedule_callback(foo,...)
changes into their own patch.

>      def results_ready(self, results):
>          # Since this method is called as a callback above, the results
> diff -r e2759515f57f viff/active.py
> --- a/viff/active.py	Thu Mar 05 21:02:57 2009 +0100
> +++ b/viff/active.py	Tue Apr 21 17:25:45 2009 +0200
> @@ -501,6 +501,7 @@
>          result = Share(self, share_x.field)
>          # This is the Deferred we will do processing on.
>          triple = self.get_triple(share_x.field)
> +        triple.addCallback(gather_shares)

Why, what does this change do? I think you explained it to me once,
but I don't remember and surely the next guy who looks at the code
wont remember either.

>          self.schedule_callback(triple, finish_mul)
>          # We add the result to the chains in triple.
>          triple.chainDeferred(result)
> diff -r e2759515f57f viff/aes.py
> --- a/viff/aes.py	Thu Mar 05 21:02:57 2009 +0100
> +++ b/viff/aes.py	Tue Apr 21 17:25:45 2009 +0200
> @@ -374,9 +374,9 @@
>                  trigger.addCallback(progress, i, time.time())
>  
>                  if (i < self.rounds - 1):
> -                    self.runtime.schedule_callback(trigger, round, state, i + 1)
> +                    self.runtime.schedule_complex_callback(trigger, round, state, i + 1)
>                  else:
> -                    self.runtime.schedule_callback(trigger, final_round, state)
> +                    self.runtime.schedule_complex_callback(trigger, final_round, state)
>  
>              prep_progress(i, start_round)
>  
> diff -r e2759515f57f viff/equality.py
> --- a/viff/equality.py	Thu Mar 05 21:02:57 2009 +0100
> +++ b/viff/equality.py	Tue Apr 21 17:25:45 2009 +0200
> @@ -74,7 +74,7 @@
>                  xj = (-1) * (1/Zp(2)) * (bj - 1)
>              return xj
>  
> -        x = [cj.addCallback(finish, bj) for cj, bj in zip(c, b)]
> +        x = [self.schedule_callback(cj, finish, bj) for cj, bj in zip(c, b)]
>  
>          # Take the product (this is here the same as the "and") of all
>          # the x'es
> diff -r e2759515f57f viff/passive.py
> --- a/viff/passive.py	Thu Mar 05 21:02:57 2009 +0100
> +++ b/viff/passive.py	Tue Apr 21 17:25:45 2009 +0200
> @@ -98,7 +98,7 @@
>                          d = Share(self, share.field, (share.field(peer_id), share))
>                      else:
>                          d = self._expect_share(peer_id, share.field)
> -                        self.schedule_callback(d, lambda s, peer_id: (s.field(peer_id), s), peer_id)
> +                        d.addCallback(lambda s, peer_id: (s.field(peer_id), s), peer_id)

Unrelated change, but a good one :-)

>                      deferreds.append(d)
>                  return recombine(deferreds)
>  
> diff -r e2759515f57f viff/runtime.py
> --- a/viff/runtime.py	Thu Mar 05 21:02:57 2009 +0100
> +++ b/viff/runtime.py	Tue Apr 21 17:25:45 2009 +0200
> @@ -39,9 +39,10 @@
>  from collections import deque
>  
>  from viff.field import GF256, FieldElement
> -from viff.util import wrapper, rand, deep_wait, track_memory_usage
> +from viff.util import wrapper, rand, deep_wait, track_memory_usage, \
> +     clone_deferred
>  
> -from twisted.internet import reactor
> +from twisted.internet import reactor, selectreactor
>  from twisted.internet.task import LoopingCall
>  from twisted.internet.error import ConnectionDone, CannotListenError
>  from twisted.internet.defer import Deferred, DeferredList, gatherResults, succeed
> @@ -49,6 +50,10 @@
>  from twisted.internet.protocol import ReconnectingClientFactory, ServerFactory
>  from twisted.protocols.basic import Int16StringReceiver
>  
> +from Queue import Queue, Empty
> +from threading import Lock, Event
> +import sys
> +
>  
>  class Share(Deferred):
>      """A shared number.
> @@ -78,6 +83,7 @@
>          self.field = field
>          if value is not None:
>              self.callback(value)
> +        self.priority = 0
>  
>      def __add__(self, other):
>          """Addition."""
> @@ -212,14 +218,19 @@
>  
>          for index, share in enumerate(shares):
>              share.addCallbacks(self._callback_fired, self._callback_fired,
> -                               callbackArgs=(index, True),
> +                               callbackArgs=(index, True, share.priority),
>                                 errbackArgs=(index, False))
>  
> -    def _callback_fired(self, result, index, success):
> +    def _callback_fired(self, result, index, success, priority=0):
>          self.results[index] = (success, result)
>          self.missing_shares -= 1
>          if not self.called and self.missing_shares == 0:
> -            self.callback(self.results)
> +            if self.priority >= priority:
> +                self.callback(self.results)
> +            else:
> +                self.pause()
> +                self.callback(self.results)
> +                self.runtime.deferred_queue.put((self, None))
>          return result
>  
>  
> @@ -266,6 +277,9 @@
>          self.lost_connection = Deferred()
>          #: Data expected to be received in the future.
>          self.incoming_data = {}
> +        #: Lock to protect :attr:`incoming_data`.
> +        self.incoming_lock = Lock()
> +        self.activation_counter = 0
>  
>      def connectionMade(self):
>          self.sendString(str(self.factory.runtime.id))
> @@ -301,21 +315,32 @@
>              program_counter, data_type, data = marshal.loads(string)
>              key = (program_counter, data_type)
>  
> +            self.incoming_lock.acquire()
>              deq = self.incoming_data.setdefault(key, deque())
>              if deq and isinstance(deq[0], Deferred):
>                  deferred = deq.popleft()
>                  if not deq:
>                      del self.incoming_data[key]
> +                self.incoming_lock.release()
> +                # queue deferred
> +                deferred.pause()
>                  deferred.callback(data)
> +                self.factory.runtime.deferred_queue.put((deferred, program_counter))
>              else:
>                  deq.append(data)
> +                self.incoming_lock.release()
>  
>              # TODO: marshal.loads can raise EOFError, ValueError, and
>              # TypeError. They should be handled somehow.
>  
>      def sendData(self, program_counter, data_type, data):
>          send_data = (program_counter, data_type, data)
> -        self.sendString(marshal.dumps(send_data))
> +        reactor.threadCallQueue.append((self.sendString, [marshal.dumps(send_data)], {}))
> +        self.activation_counter +=1
> +
> +        if (self.activation_counter >= 1):
> +            reactor.wakeUp()
> +            self.activation_counter = 0
>  
>      def sendShare(self, program_counter, share):
>          """Send a share.
> @@ -501,6 +526,15 @@
>          # communicating with ourselves.
>          self.add_player(player, None)
>  
> +        #: Blocking queue for Deferreds.
> +        self.deferred_queue = Queue(0)
> +        #: Current recursion depth.
> +        self.recursion_depth = 0
> +        #: Activation counter.
> +        self.activation_counter = 0
> +        #: Event is set while the reactor is waiting (select syscall).
> +        self.select_event = Event()
> +
>      def add_player(self, player, protocol):
>          self.players[player.id] = player
>          self.num_players = len(self.players)
> @@ -522,18 +556,18 @@
>              results = [maybeDeferred(self.port.stopListening)]
>              for protocol in self.protocols.itervalues():
>                  results.append(protocol.lost_connection)
> -                protocol.loseConnection()
> +                reactor.callFromThread(protocol.loseConnection)
>              return DeferredList(results)
>  
> -        def stop_reactor(_):
> +        def stop_reactor(_, self):
>              print "done."
>              print "Stopping reactor... ",
> -            reactor.stop()
> +            reactor.callFromThread(reactor.stop)
>              print "done."

I don't see self used above, why is it there? To keep it from being
garbage collected?

>          sync = self.synchronize()
>          sync.addCallback(close_connections)
> -        sync.addCallback(stop_reactor)
> +        sync.addCallback(stop_reactor, self)
>          return sync
>  
>      def wait_for(self, *vars):
> @@ -542,8 +576,9 @@
>          The runtime is shut down when all variables are calculated.
>          """
>          dl = DeferredList(vars)
> -        dl.addCallback(lambda _: self.shutdown())
> +        self.schedule_callback(dl, lambda _: self.shutdown())
>  
> +    @increment_pc
>      def schedule_callback(self, deferred, func, *args, **kwargs):
>          """Schedule a callback on a deferred with the correct program
>          counter.
> @@ -574,7 +609,64 @@
>              finally:
>                  self.program_counter[:] = current_pc
>  
> -        deferred.addCallback(callback_wrapper, *args, **kwargs)
> +        return deferred.addCallback(callback_wrapper, *args, **kwargs)
> +
> +    def schedule_complex_callback(self, share, func, *args, **kwargs):
> +        """Schedule a complex callback, i.e. a callback which blocks a
> +        long time."""
> +
> +        assert isinstance(share, Share), "Only shares can have complex callbacks."
> +
> +        share.priority = -1
> +        self.schedule_callback(share, func, *args, **kwargs)
> +
> +    def profile_deferred_queue_loop(self):
> +        import cProfile
> +        prof = cProfile.Profile()
> +        prof.runcall(self.deferred_queue_loop)
> +        prof.print_stats(1)
> +
> +    def deferred_queue_loop(self):
> +        while True:
> +            deferred, pc = self.deferred_queue.get()
> +
> +            if deferred is not None:
> +                deferred.unpause()
> +                from twisted.python import failure

You don't want to import stuff in a loop, imports should generally go
to the module top-level.

> +                if isinstance(deferred.result,failure.Failure):

Space after comma.

> +                    deferred.result.printTraceback()
> +                sys.stdout.flush()
> +            else:
> +                return
> +
> +    def process_deferred_queue(self):
> +        self.select_event.wait()
> +
> +        max_depth = 1
> +
> +        if (self.recursion_depth >= max_depth):
> +            return
> +
> +        self.recursion_depth += 1
> +
> +        while True:
> +            try:
> +                deferred, pc = self.deferred_queue.get(block=False)
> +            except Empty:
> +                break
> +            else:
> +                if deferred is not None:
> +                    if isinstance(deferred, Share) and \
> +                           self.recursion_depth / float(max_depth) - deferred.priority > 1:
> +                        self.deferred_queue.put((deferred, None))

This looks interesting -- but what is the logic behind this?

> +                        break
> +                    else:
> +                        deferred.unpause()
> +                else:
> +                    self.deferred_queue.put((None, None))
> +                    break
> +
> +        self.recursion_depth -= 1
>  
>      @increment_pc
>      def synchronize(self):
> @@ -598,16 +690,25 @@
>          pc = tuple(self.program_counter)
>          key = (pc, data_type)
>  
> +        lock = self.protocols[peer_id].incoming_lock
> +        lock.acquire()
>          deq = self.protocols[peer_id].incoming_data.setdefault(key, deque())
>          if deq and not isinstance(deq[0], Deferred):
>              # We have already received some data from the other side.
>              data = deq.popleft()
>              if not deq:
>                  del self.protocols[peer_id].incoming_data[key]
> +            lock.release()
>              deferred.callback(data)
>          else:
>              # We have not yet received anything from the other side.
>              deq.append(deferred)
> +            lock.release()
> +            self.activation_counter += 1
> +
> +            if (self.activation_counter >= 1):

No parenthesis around the condition in if-statements, please.

> +                self.process_deferred_queue()
> +                self.activation_counter = 0
>  
>      def _exchange_shares(self, peer_id, field_element):
>          """Exchange shares with another player.
> @@ -789,24 +890,13 @@
>          # profiler here and stop it upon shutdown, but this triggers
>          # http://bugs.python.org/issue1375 since the start and stop
>          # calls are in different stack frames.
> -        import hotshot
> -        prof = hotshot.Profile("player-%d.prof" % id)
> +        import cProfile
> +        prof = cProfile.Profile()

This hunk should definitely go in its own patch.

>          old_run = reactor.run
>          def new_run(*args, **kwargs):
>              print "Starting reactor with profiling"
>              prof.runcall(old_run, *args, **kwargs)
> -
> -            import sys
> -            import hotshot.stats
> -            print "Loading profiling statistics...",
> -            sys.stdout.flush()
> -            stats = hotshot.stats.load("player-%d.prof" % id)
> -            print "done."
> -            print
> -            stats.strip_dirs()
> -            stats.sort_stats("time", "calls")
> -            stats.print_stats(40)
> -            stats.dump_stats("player-%d.pstats" % id)
> +            prof.print_stats(1)
>          reactor.run = new_run
>  
>      # This will yield a Runtime when all protocols are connected.
> @@ -874,7 +964,38 @@
>              print "Will connect to %s" % player
>              connect(player.host, player.port)
>  
> -    return result
> +    # Start the main callback in the separate thread.
> +    deferred_runtime = clone_deferred(result)
> +    deferred_runtime.pause()
> +
> +    def start_deferred_queue_loop(runtime, deferred_runtime, options):
> +        print "Start VIFF thread"
> +
> +        runtime.deferred_queue.put((deferred_runtime, None))
> +
> +        if options and options.profile:
> +            reactor.callInThread(runtime.profile_deferred_queue_loop)
> +        else:
> +            reactor.callInThread(runtime.deferred_queue_loop)
> +
> +        reactor.addSystemEventTrigger("before", "shutdown",
> +                                      runtime.deferred_queue.put, (None, None))
> +        return runtime
> +
> +    result.addCallback(start_deferred_queue_loop, deferred_runtime, options)
> +
> +    # Monkey patch selectreactor to known when it is waiting.
> +    plain_select = selectreactor._select
> +
> +    def patched_select(*args, **kwargs):
> +        runtime.select_event.set()
> +        result = plain_select(*args, **kwargs)
> +        runtime.select_event.clear()
> +        return result

Do you need to do here, or can select not fail?

  runtime.select_event.set()
  try:
      result = plain_select(*args, **kwargs)
      return result
  finally:
      runtime.select_event.clear()

> +    selectreactor._select = patched_select

This ties us up on the selectreactor -- how does this affect the
integration with GUIs? Badly I fear, the GUI integration uses a
GtkReactor which wont be updated by this code.

-- 
Martin Geisler

VIFF (Virtual Ideal Functionality Framework) brings easy and efficient
SMPC (Secure Multiparty Computation) to Python. See: http://viff.dk/.



More information about the viff-devel mailing list