Programming: Concurrency Patterns
Resources
Wikipedia: Concurrency Patterns https://en.wikipedia.org/wiki/Concurrency_pattern
Producer/Consumer
Safely parallelized task enqueueing/execution,
optimized for a small number of cores.
- A producer adds data to a synchronized queue
- A variable number of consumers process data from a synchronized queue in a loop
- When the producer is finished, it sends one poison pill per worker, telling that worker to break out of its loop gracefully
- Join on worker threads (wait to close)
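The steps above can be sketched roughly as follows. This is a minimal illustration rather than a reference implementation: `POISON_PILL`, `producer`, `consumer`, and `run` are hypothetical names, and squaring each item stands in for real work.

```python
import queue
import threading

POISON_PILL = object()  # hypothetical sentinel telling a worker to stop


def producer(tasks, items, num_workers):
    for item in items:
        tasks.put(item)
    # one poison pill per worker, so every consumer loop exits
    for _ in range(num_workers):
        tasks.put(POISON_PILL)


def consumer(tasks, results):
    while True:
        item = tasks.get()  # blocks until something is queued
        if item is POISON_PILL:
            break
        results.put(item * item)  # stand-in for real work


def run(items, num_workers=3):
    items = list(items)
    tasks, results = queue.Queue(), queue.Queue()
    workers = [
        threading.Thread(target=consumer, args=(tasks, results))
        for _ in range(num_workers)
    ]
    for worker in workers:
        worker.start()
    producer(tasks, items, num_workers)
    for worker in workers:
        worker.join()  # wait for every worker to close
    # workers finish in no fixed order, so sort for a stable result
    return sorted(results.get() for _ in items)


print(run(range(5)))  # [0, 1, 4, 9, 16]
```

`queue.Queue` does its own locking, so neither the producer nor the consumers need an explicit lock.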
Fork/Join
A task is gradually broken up into smaller units of work, and distributed among CPU cores.
Very fast, optimized for high number of cores (ex. GPU).
It is common in this pattern for idle threads to steal work from the deques of other threads to maximize resource usage.

                   Task
                     |
            +--------+---------+
         (fork)             (fork)
            |                  |
          Task               Task
            |                  |
       +----+-----+       +----+-----+
    (fork)     (fork)  (fork)     (fork)
       |          |       |          |
     Task       Task    Task       Task
       |          |       |          |
       +---(join)-+       +---(join)-+
            |                  |
            +------(join)------+
                     |
                  Result
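A rough sketch of the fork/join shape, assuming a recursive sum as the task. `parallel_sum` and `threshold` are hypothetical names. It shows only the structure: there is no work stealing here, and in standard CPython the GIL prevents CPU-bound threads from running Python code in parallel, so a real implementation would more likely use processes or a native thread pool.

```python
import threading


def parallel_sum(values, threshold=1000):
    """Sum values by recursively forking the left half into a new thread."""
    if len(values) <= threshold:
        return sum(values)  # small enough: do the work directly

    mid = len(values) // 2
    result = {}

    def run_left():
        result["left"] = parallel_sum(values[:mid], threshold)

    left_thread = threading.Thread(target=run_left)
    left_thread.start()                              # fork
    right = parallel_sum(values[mid:], threshold)    # right half runs in this thread
    left_thread.join()                               # join
    return result["left"] + right


print(parallel_sum(list(range(10_000))))  # 49995000
```

The `threshold` bounds how far the task is split, which also bounds the number of threads created.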
Monitor Object
Ensures that only one of an object's methods is executing at any given time.
Safely shares an object between multiple threads.
While the monitor is performing work, callers from other threads block until that work is complete.
The monitor's unit of work may be synchronous, or performed in a different thread.
The caller does not need to know about the lock; it simply blocks until the monitor is able to process a new task.

Say we only ever want one thread performing IO on a file at any given time.
We create an IoMonitor, which has the methods read/write
- in thread-1, we call read(), which takes the lock and blocks the caller while the read is performed (synchronously here, though it could run in a background thread)
- in thread-2, we call write(), it will automatically block until thread-1's read() call is finished, then it will take the lock and write()

    import threading

    class IoMonitor:
        """Serializes all IO on foo.txt behind a single lock."""

        def __init__(self):
            self._lock = threading.Lock()

        def read(self):
            with self._lock:  # released automatically, even on error
                with open('foo.txt', 'r') as fd:
                    return fd.read()

        def write(self, data):
            with self._lock:
                with open('foo.txt', 'w') as fd:
                    fd.write(data)

    # one monitor instance, shared by both threads
    mon = IoMonitor()

    # thread-1
    mon.read()

    # thread-2
    mon.write("foo")  # blocks until 'read' is complete, then performs

Active Object
Method calls on the object are executed in a different thread from the caller's.
Safely shares an object between threads: each method call on a proxy object enqueues an execution in a separate thread.
Unlike Monitor Object, a method can be enqueued several times (the call does not block while the work is performed).
Also unlike Monitor Object, you may choose to perform tasks in a different order than they were enqueued.
Hides the lock/queue logic from the caller.
ex. update a progressbar in the UI thread from a background thread
Overview:
- Background thread instantiates a proxy object
- On a method call, the proxy returns a Future/Promise (a request is enqueued to UI thread)
- The UI thread eventually performs the task
- You may wait/join on the Future/Promise to wait for it to return its results
https://www.youtube.com/watch?v=U9Tf7h-etl0

    # bg-thread
    #   Proxy      (ActiveObject) methods that will run in a separate thread
    #   Future     waitable object, will obtain the result when performed
    # ui-thread
    #   Queue      holds method requests
    #   Scheduler  (EventLoop) chooses the next method request to run

    import queue
    import threading
    from concurrent.futures import Future

    # runs in thread-1
    class Scheduler:
        def __init__(self):
            self._queue = queue.Queue()

        def send(self, func, *args):
            future = Future()
            self._queue.put((future, func, args))
            return future

        def start(self):
            while True:
                future, func, args = self._queue.get()
                try:
                    future.set_result(func(*args))
                except Exception as exc:
                    future.set_exception(exc)

    # a single module-level instance stands in for a singleton
    scheduler = Scheduler()

    # runs in thread-2
    class Proxy:
        def say_hi(self):
            return scheduler.send(print, "hi")

        def say_bye(self):
            return scheduler.send(print, "bye")

Sample Usage

    # thread-1
    threading.Thread(target=scheduler.start, daemon=True).start()

    # thread-2
    proxy = Proxy()
    promise_1 = proxy.say_hi()
    promise_2 = proxy.say_bye()
    promise_1.result()  # blocks until thread-1 has performed say_hi
    promise_2.result()

Reactor Pattern
Post events, and add handlers to subscribe to them.
examples:
- logging frameworks that send logs to various outputs
- Qt's signals/slots are a variation of this pattern

    import queue

    # The reactor is your eventloop.
    # It handles emitted events
    #
    # This should be a singleton
    class Reactor:
        def __init__(self):
            self._queue = queue.Queue()
            self._handlers = []

        def send_event(self, event):
            self._queue.put(event)

        def register(self, handler):
            self._handlers.append(handler)

        def handle_events(self):
            while True:
                event = self._queue.get()  # blocks until an event arrives
                self._dispatch(event)

        def _dispatch(self, event):
            for handler in self._handlers:
                handler.handle(event)

    # Events are things that happen
    class Event:
        def __init__(self, value=None):
            self.value = value

    class MouseclickEvent(Event):
        pass

    class KeypressEvent(Event):
        pass

    # EventHandlers decide if they will handle an event
    class EventHandler:
        def handle(self, event):
            pass

    class KeypressHandler(EventHandler):
        def handle(self, event):
            if isinstance(event, KeypressEvent):
                print("key pressed:", event.value)

    class MouseclickHandler(EventHandler):
        def handle(self, event):
            if isinstance(event, MouseclickEvent):
                print("mouse clicked")

Sample Usage

    # one reactor instance, shared by both threads
    reactor = Reactor()

    # thread-1
    reactor.register(KeypressHandler())
    reactor.register(MouseclickHandler())
    reactor.handle_events()

    # thread-2
    reactor.send_event(KeypressEvent("a"))  # thread-1 handles the event

http://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf
Promise / Future
Represents a unit of work to be performed asynchronously.
Often chainable on success like a monad (ex. foo.then(bar).then(baz)).
TODO:
I need to think on this, specifically implementing a Promise.wait() without polling
https://docs.microsoft.com/en-us/cpp/parallel/concrt/walkthrough-implementing-futures?view=msvc-170
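One possible way to get a wait() without polling is to block on a `threading.Event`, which parks the waiting thread until another thread calls `set()`. The sketch below is an assumption about how this could look, not a settled design: `Promise`, `resolve`, `reject`, `then`, and `run_async` are hypothetical names, and each call spawns its own thread for simplicity.

```python
import threading


class Promise:
    """Hypothetical minimal promise: wait() sleeps on an Event, no polling."""

    def __init__(self):
        self._done = threading.Event()
        self._result = None
        self._error = None

    def resolve(self, result):
        self._result = result
        self._done.set()  # wakes every thread blocked in wait()

    def reject(self, error):
        self._error = error
        self._done.set()

    def wait(self, timeout=None):
        # Event.wait() blocks until set() is called or the timeout expires
        if not self._done.wait(timeout):
            raise TimeoutError("promise was not settled in time")
        if self._error is not None:
            raise self._error
        return self._result

    def then(self, func):
        # chain: apply func to this promise's result in a helper thread
        return run_async(lambda: func(self.wait()))


def run_async(func, *args):
    """Run func(*args) in a new thread and return a Promise for its result."""
    promise = Promise()

    def work():
        try:
            promise.resolve(func(*args))
        except Exception as error:
            promise.reject(error)

    threading.Thread(target=work).start()
    return promise


promise = run_async(pow, 2, 10).then(lambda n: n + 1)
print(promise.wait())  # 1025
```

The standard library's `concurrent.futures.Future` already provides a blocking `result()` along the same lines, so in practice it may be simpler to use that than to write a promise by hand.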
Scheduler
Actor Model
An abstraction of units of work, and processors of work that may represent threads, processes, or servers.
- Actors have addresses (which may be represented by one or more objects)
- Actors have message queues (tasks for them to process)
- Actors can start other actors
- Actors can monitor other actors
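A small sketch of the first two points, assuming each actor is a thread with its own mailbox. All names here (`Actor`, `Doubler`, `Collector`, `_STOP`) are hypothetical, the object reference stands in for an address, and starting or monitoring other actors is not shown.

```python
import queue
import threading

_STOP = object()  # hypothetical sentinel that shuts an actor down


class Actor:
    def __init__(self, name):
        self.name = name              # stands in for an address
        self.mailbox = queue.Queue()  # the actor's message queue
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def send(self, message):
        self.mailbox.put(message)

    def stop(self):
        self.mailbox.put(_STOP)  # processed after all earlier messages
        self._thread.join()

    def _run(self):
        while True:
            message = self.mailbox.get()
            if message is _STOP:
                break
            self.receive(message)

    def receive(self, message):
        raise NotImplementedError


class Collector(Actor):
    def __init__(self, name):
        self.received = []  # set before the thread starts in Actor.__init__
        super().__init__(name)

    def receive(self, message):
        self.received.append(message)


class Doubler(Actor):
    def __init__(self, name, target):
        self.target = target
        super().__init__(name)

    def receive(self, message):
        self.target.send(message * 2)  # actors interact only via messages


collector = Collector("collector")
doubler = Doubler("doubler", collector)
for n in (1, 2, 3):
    doubler.send(n)
doubler.stop()
collector.stop()
print(collector.received)  # [2, 4, 6]
```

Each actor touches its own state only from its own thread, so no locks are needed beyond the ones inside `queue.Queue`.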
TODO:
I don't have a great understanding of this