Programming: Concurrency Patterns


Resources

Wikipedia: Concurrency Patterns https://en.wikipedia.org/wiki/Concurrency_pattern

Producer/Consumer

Safely parallelizes task enqueueing and execution;
well suited to a small number of cores.

  • A producer adds data to a synchronized queue
  • A variable number of consumers process data from that same queue in a loop
  • When the producer is finished, it sends one poison pill per worker, telling each worker to break/exit its loop gracefully
  • Join the worker threads (wait for them to exit)
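
The steps above can be sketched with the standard library's queue.Queue and threading modules. This is a minimal illustration, not a reference implementation: the names POISON_PILL, NUM_WORKERS, and consumer, and the squaring "work", are assumptions made up for the example.

```python
import queue
import threading

POISON_PILL = object()  # hypothetical sentinel; any unique object works
NUM_WORKERS = 3         # assumed worker count for this example


def consumer(tasks, results):
    """Process items until a poison pill is received."""
    while True:
        item = tasks.get()          # blocks until an item is available
        if item is POISON_PILL:
            break                   # exit the loop gracefully
        results.put(item * item)    # stand-in for real work


tasks = queue.Queue()
results = queue.Queue()

workers = [
    threading.Thread(target=consumer, args=(tasks, results))
    for _ in range(NUM_WORKERS)
]
for worker in workers:
    worker.start()

# producer: enqueue the work, then one poison pill per worker
for n in range(10):
    tasks.put(n)
for _ in workers:
    tasks.put(POISON_PILL)

# wait for every worker to exit
for worker in workers:
    worker.join()

squares = sorted(results.get() for _ in range(10))
```

One pill per worker matters: each worker consumes exactly one pill and stops, so a single pill would leave the remaining workers blocked on get() forever.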

Fork/Join

A task is recursively broken up into smaller units of work, which are distributed among CPU cores.
Very fast, optimized for a high number of cores (ex. GPU).
It is common in this pattern for idle threads to steal work from the deques of other threads to maximize resource usage.

                          Task
                           |
                  +--------+---------+
                (fork)            (fork)
                  |                 |
                Task               Task
                  |                 |
             +----+-----+      +----+-----+
           (fork)    (fork)  (fork)     (fork)
             |         |       |          |
            Task      Task    Task       Task

             |          |      |          |
             +---(join)-+      +---(join)-+
                   |                  |
                   +------(join)------+
                            |
                          Result

https://en.wikipedia.org/wiki/Fork%E2%80%93join_model
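
A rough sketch of the fork/join shape in the diagram above, using plain threads to sum a list. The function name fork_join_sum and the threshold parameter are assumptions for illustration. It shows only the structure: in CPython the GIL typically prevents a speedup for CPU-bound pure-Python work, and there is no work stealing here.

```python
import threading


def fork_join_sum(values, threshold=1000):
    """Recursively split `values`, sum the halves in parallel, join the results."""
    if len(values) <= threshold:
        return sum(values)              # small enough: do the work directly

    mid = len(values) // 2
    results = [0, 0]

    def run(index, chunk):
        results[index] = fork_join_sum(chunk, threshold)

    # fork: the left half runs in a new thread
    left = threading.Thread(target=run, args=(0, values[:mid]))
    left.start()
    # the right half runs in the current thread
    run(1, values[mid:])
    # join: wait for the forked half, then combine
    left.join()
    return results[0] + results[1]


total = fork_join_sum(list(range(10_000)))
```

Real fork/join frameworks usually run the subtasks on a fixed pool of workers with per-worker deques rather than creating a thread per fork.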

Monitor Object

Ensure only one of an object's methods is executing at any given time.

Safely share an object between multiple threads.
The monitor holds a lock while a method executes; callers from other threads block until that work is complete.
The monitor's unit of work may be synchronous, or performed in a different thread.
The caller does not need to know about the lock; the call simply blocks until the monitor is able to process a new task.

Say we only ever want one thread performing IO on a file at any given time.
We create an IoMonitor, which might have methods read/write

  • in thread-1, we call read(), which takes the lock and blocks while the read is performed (synchronously here, though it could run in a background thread)
  • in thread-2, we call write(); it automatically blocks until thread-1's read() call is finished, then it takes the lock and writes
import threading

class IoMonitor:
    # class-level lock: shared by every IoMonitor instance
    lock = threading.Lock()

    def read(self):
        with self.lock:  # released automatically, even if an exception is raised
            with open('foo.txt', 'r') as fd:
                return fd.read()

    def write(self, data):
        with self.lock:
            with open('foo.txt', 'w') as fd:
                fd.write(data)
# thread-1
mon = IoMonitor()
mon.read()

# thread-2
mon = IoMonitor()
mon.write("foo")  # blocks until 'read' complete, then performs

https://www.youtube.com/watch?v=_p-TM1x48zk

Active Object

Method calls are abstracted so that they run in a different thread.

Safely share an object between threads:
each method call on a proxy object enqueues an execution request for a separate thread.
Unlike Monitor Object, a method can be enqueued several times (the call does not block while the work is performed).
Also unlike Monitor Object, you may choose to perform tasks in a different order than they were enqueued.
Hides the lock/queue logic from the caller.

ex. update a progressbar in the UI thread from a background thread


Overview:

  • Background thread instantiates a proxy object
  • On a method call, the proxy returns a Future/Promise (a request is enqueued to UI thread)
  • The UI thread eventually performs the task
  • You may wait/join on the Future/Promise until its result is available

https://www.youtube.com/watch?v=U9Tf7h-etl0


# bg-thread
class Proxy           # (ActiveObject) methods that will run in sep thread
class Future          # waitable object, will obtain result when performed

# ui-thread
class Queue           # holds method requests
class Scheduler       # (EventLoop) chooses next method request to run
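import queue

# runs in thread-1
@singleton  # placeholder decorator (not defined in these notes): one shared instance
class Scheduler:
    def __init__(self):
        self._queue = queue.Queue()

    def send(self, runnable):
        self._queue.put(runnable)
        return Promise(runnable)  # Promise is a placeholder here, see "Promise / Future"

    def start(self):
        while True:
            runnable = self._queue.get()  # blocks until a request is available
            runnable.run()

# runs in thread-2
class Proxy:
    def say_hi(self):
        return Scheduler().send(SayHiRunnable())  # placeholder class with a run() method

    def say_bye(self):
        return Scheduler().send(SayByeRunnable())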

Sample Usage

# thread-2
proxy = Proxy()
promise_1 = proxy.say_hi()
promise_2 = proxy.say_bye()

promise_1.wait()
promise_2.wait()
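
A self-contained variant of the idea above, as a sketch only. It swaps the placeholder Promise for the standard library's concurrent.futures.Future (whose set_result/set_exception are documented as intended mainly for executor implementations and tests), and passes the scheduler explicitly instead of using a singleton. Greeter, GreeterProxy, and stop() are hypothetical names added for the example.

```python
import queue
import threading
from concurrent.futures import Future


class Scheduler:
    """Owns the request queue; run() executes requests on a single thread."""

    def __init__(self):
        self._queue = queue.Queue()

    def send(self, func, *args):
        future = Future()
        self._queue.put((future, func, args))
        return future                      # caller can wait on this later

    def stop(self):
        self._queue.put(None)              # poison pill ends run()

    def run(self):
        while True:
            request = self._queue.get()    # blocks until a request arrives
            if request is None:
                break
            future, func, args = request
            try:
                future.set_result(func(*args))
            except Exception as exc:       # deliver failures to the waiter
                future.set_exception(exc)


class Greeter:
    """The real object; only ever touched by the scheduler thread."""

    def say_hi(self, name):
        return f"hi {name}"


class GreeterProxy:
    """What callers see: each method call only enqueues a request."""

    def __init__(self, scheduler, servant):
        self._scheduler = scheduler
        self._servant = servant

    def say_hi(self, name):
        return self._scheduler.send(self._servant.say_hi, name)


scheduler = Scheduler()
worker = threading.Thread(target=scheduler.run)
worker.start()

proxy = GreeterProxy(scheduler, Greeter())
future = proxy.say_hi("alice")             # returns immediately
greeting = future.result(timeout=5)        # blocks until the worker has run it

scheduler.stop()
worker.join()
```

Because requests pass through a queue, the scheduler could also reorder them (for example with queue.PriorityQueue) without the caller noticing.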

Reactor Pattern

Post events, and add handlers to subscribe to them.


examples:

  • logging frameworks that send logs to various outputs
  • Qt's signals/slots are a variation of this pattern
import queue

# The reactor is your eventloop.
# It handles emitted events
#
# This should be a singleton
class Reactor:
    def __init__(self):
        self._queue = queue.Queue()
        self._handlers = []

    def send_event(self, event):
        self._queue.put(event)

    def register(self, handler):
        self._handlers.append(handler)

    def handle_events(self):
        while True:
            # block until an event arrives
            # (get(timeout=0) would raise queue.Empty on an empty queue)
            event = self._queue.get()
            self._dispatch(event)

    def _dispatch(self, event):
        for handler in self._handlers:
            handler.handle(event)

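# Events are things that happen
class Event:
    pass
class MouseclickEvent(Event):
    pass
class KeypressEvent(Event):
    def __init__(self, key):
        self.key = key

# EventHandlers decide if they will handle an event
class EventHandler:
    def handle(self, event):
        pass
class KeypressHandler(EventHandler):
    def handle(self, event):
        if isinstance(event, KeypressEvent):  # ignore other event types
            print("key pressed:", event.key)  # illustrative body
class MouseclickHandler(EventHandler):
    def handle(self, event):
        if isinstance(event, MouseclickEvent):
            print("mouse clicked")  # illustrative body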

Sample Usage

# thread-1
reactor = Reactor()
reactor.register(KeypressHandler())
reactor.register(MouseclickHandler())
reactor.handle_events()

# thread-2
reactor = Reactor()  # must be the same (singleton) instance that thread-1 is running
reactor.send_event(KeypressEvent("a"))

# thread-1 handles the event

http://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf
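
A runnable sketch of the queue-based event loop described in this section, with two assumptions that are not in the notes: handlers are plain callables subscribed per event type, and a _STOP sentinel lets the loop exit so the example terminates. The reactor instance is shared directly between threads rather than through a singleton.

```python
import queue
import threading

_STOP = object()  # hypothetical sentinel that ends the event loop


class KeypressEvent:
    def __init__(self, key):
        self.key = key


class MouseclickEvent:
    def __init__(self, x, y):
        self.x, self.y = x, y


class Reactor:
    def __init__(self):
        self._queue = queue.Queue()
        self._handlers = {}            # event type -> list of callables

    def register(self, event_type, handler):
        self._handlers.setdefault(event_type, []).append(handler)

    def send_event(self, event):
        self._queue.put(event)

    def stop(self):
        self._queue.put(_STOP)

    def handle_events(self):
        while True:
            event = self._queue.get()  # blocks until an event is posted
            if event is _STOP:
                break
            for handler in self._handlers.get(type(event), []):
                handler(event)


log = []
reactor = Reactor()
reactor.register(KeypressEvent, lambda e: log.append(("key", e.key)))
reactor.register(MouseclickEvent, lambda e: log.append(("click", (e.x, e.y))))

# thread-1: runs the event loop
loop = threading.Thread(target=reactor.handle_events)
loop.start()

# main thread: posts events, then shuts the loop down
reactor.send_event(KeypressEvent("a"))
reactor.send_event(MouseclickEvent(1, 2))
reactor.stop()
loop.join()
```

All handlers run on the loop thread, so they never run concurrently with each other; a slow handler delays every event queued behind it.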

Promise / Future

Represents a unit of work to be performed asynchronously.
Often chainable on success like a monad (ex foo.then(bar).then(baz)).

TODO:

I need to think on this, specifically implementing a Promise.wait() without polling
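
One possible approach to wait() without polling, sketched under assumptions: threading.Event blocks the waiting thread until another thread calls set(), so no busy loop is needed. The class below is hypothetical (the resolve/wait/then names are chosen for the example) and omits error propagation.

```python
import threading


class Promise:
    def __init__(self):
        self._done = threading.Event()
        self._lock = threading.Lock()
        self._result = None
        self._callbacks = []

    def resolve(self, result):
        """Store the result, wake all waiters, and run chained callbacks."""
        with self._lock:
            self._result = result
            self._done.set()
            callbacks, self._callbacks = self._callbacks, []
        for callback in callbacks:      # run outside the lock
            callback(result)

    def wait(self, timeout=None):
        """Block (without polling) until resolved, then return the result."""
        if not self._done.wait(timeout):
            raise TimeoutError("promise was not resolved in time")
        return self._result

    def then(self, func):
        """Return a new Promise resolved with func(result) on success."""
        next_promise = Promise()

        def callback(result):
            next_promise.resolve(func(result))

        with self._lock:
            if not self._done.is_set():
                self._callbacks.append(callback)
                return next_promise
        callback(self._result)          # already resolved: run immediately
        return next_promise


promise = Promise()
chained = promise.then(lambda x: x + 1).then(lambda x: x * 2)

# resolve from another thread shortly afterwards
threading.Timer(0.05, promise.resolve, args=(20,)).start()

value = chained.wait(timeout=5)  # (20 + 1) * 2
```

For production code, concurrent.futures.Future in the standard library already provides a blocking result() and add_done_callback().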

https://docs.microsoft.com/en-us/cpp/parallel/concrt/walkthrough-implementing-futures?view=msvc-170

Scheduler

See https://en.wikipedia.org/wiki/Scheduling_(computing)

Actor Model

An abstraction of units of work, and processors of work that may represent threads, processes, or servers.

  • Actors have addresses (which may be represented by one or more objects)
  • Actors have message queues (tasks for them to process)
  • Actors can start other actors
  • Actors can monitor other actors
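
A very small, thread-based sketch of the first two points (addresses and message queues), offered tentatively. Here an actor's "address" is just the Python object reference, each actor has its own mailbox and thread, and actors communicate only through send(). The Actor, Doubler, and Collector classes are invented for the example; starting child actors and monitoring are not shown.

```python
import queue
import threading

_STOP = object()  # hypothetical sentinel used to shut an actor down


class Actor:
    def __init__(self):
        self._mailbox = queue.Queue()
        self._thread = threading.Thread(target=self._run)

    def start(self):
        self._thread.start()

    def send(self, message):
        """The only way other actors interact with this one."""
        self._mailbox.put(message)

    def stop(self):
        """Stop after all previously sent messages have been processed."""
        self._mailbox.put(_STOP)
        self._thread.join()

    def _run(self):
        while True:
            message = self._mailbox.get()   # blocks until a message arrives
            if message is _STOP:
                break
            self.receive(message)

    def receive(self, message):
        raise NotImplementedError


class Collector(Actor):
    def __init__(self):
        super().__init__()
        self.seen = []

    def receive(self, message):
        self.seen.append(message)


class Doubler(Actor):
    def __init__(self, target):
        super().__init__()
        self._target = target               # "address" of another actor

    def receive(self, message):
        self._target.send(message * 2)


collector = Collector()
doubler = Doubler(collector)
collector.start()
doubler.start()

for n in (1, 2, 3):
    doubler.send(n)

doubler.stop()      # drains its mailbox first, forwarding to collector
collector.stop()
```

Each actor's state is modified only by its own thread, which is why Collector.seen needs no lock while the actor is running.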

TODO:

I don't have a great understanding of this