Python threading


See python concurrency for other concurrency options and primitives.

Examples

single function

The simplest usage: run a single function in a background thread.

import threading

def my_func(x, y, a=0):
  print('my_func called with %s, %s, a=%s' % (x, y, a))

t = threading.Thread(
    target = my_func,       # plain function; use self.my_func from inside a class
    args   = ('var', 'varb'),
    kwargs = { 'a': 1 }
)
t.start()
t.join()  # wait for the thread to finish

producer/consumer

The canonical producer/consumer example, with a single consumer pulling work from a queue.

import time 
import threading 
import Queue 

class Consumer(threading.Thread): 
  def __init__(self, queue): 
    threading.Thread.__init__(self)
    self._queue = queue 

  def run(self):
    while True: 
      # blocks thread until item retrieved. 
      msg = self._queue.get() 
      if isinstance(msg, str) and msg == 'quit':
        break
      print("I'm a thread, and I received %s!!" % msg)


def producer():
  queue = Queue.Queue()

  worker = Consumer(queue)
  worker.start()  # launches the thread, which calls run()

  start_time = time.time() 
  while time.time() - start_time < 5: 
    queue.put('something at %s' % time.time())
    time.sleep(1)  # sleep a bit to avoid flooding the queue

  # "poison pill" kill thread
  queue.put('quit')
  worker.join()


if __name__ == '__main__':
  producer()

producer/multiple consumers

The previous example, expanded to handle multiple consumers. Apparently, this example is derived from IBM's documentation.

import time 
import threading 
import Queue 
import urllib2 

class Consumer(threading.Thread): 
  def __init__(self, queue): 
    threading.Thread.__init__(self)
    self._queue = queue 

  def run(self):
    while True: 
      content = self._queue.get()  # blocks until an item is available
      if isinstance(content, str) and content == 'quit':
        break
      response = urllib2.urlopen(content)  # fetch the url; response is unused in this demo


def producer():
  urls = [
    'http://www.python.org', 'http://www.yahoo.com',
    'http://www.scala.org', 'http://www.google.com'
  ]
  queue = Queue.Queue()
  worker_threads = build_worker_pool(queue, 4)
  start_time = time.time()

  # enqueue the urls, then one 'quit' poison pill per worker, then wait for
  # every worker to finish (each worker gets its own pill, so all of them
  # exit even if a single worker handled most of the urls)
  for url in urls: 
    queue.put(url)  
  for worker in worker_threads:
    queue.put('quit')
  for worker in worker_threads:
    worker.join()

  print('Done! Time taken: {}'.format(time.time() - start_time))

def build_worker_pool(queue, size):
  workers = []
  for _ in range(size):
    worker = Consumer(queue)
    worker.start() 
    workers.append(worker)
  return workers

if __name__ == '__main__':
  producer()

multiprocessing.Pool (2.6+)

Most simple producer/consumer patterns can be handled with a pool from the standard library instead of hand-written worker threads.

multiprocessing.dummy exposes the same Pool API as multiprocessing, but uses threads instead of processes.

import urllib2 
from multiprocessing.dummy import Pool as ThreadPool

urls = [
  'http://www.python.org', 
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  # etc.. 
  ]

pool = ThreadPool(4)  # 4 worker threads
results = pool.map(urllib2.urlopen, urls)  # blocks until every url is fetched
pool.close() 
pool.join()
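
The same map/close/join pattern works with the process-based multiprocessing.Pool when the work is CPU-bound rather than I/O-bound (threads won't help there because of the GIL). A minimal sketch, with a hypothetical square function standing in for the real work:

import multiprocessing

def square(n):
  return n * n

if __name__ == '__main__':
  pool = multiprocessing.Pool(4)           # one worker process per slot
  results = pool.map(square, range(10))    # blocks until all results are back
  pool.close()
  pool.join()
  print(results)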

Communication between Threads

Events & LockFiles

A common way of signalling between threads or processes is to use an event, which is either set or cleared. This binary nature can be replicated with lockfiles if you need to synchronize processes running from different interpreters.

import multiprocessing

event = multiprocessing.Event()
event.set()            # flag on
if event.is_set():
  event.clear()        # flag off
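
A minimal lockfile sketch of the same set/clear semantics, assuming a hypothetical path that both interpreters agree on (note that checking os.path.exists and then creating the file is not race-free; atomic creation via os.open with O_CREAT | O_EXCL would be safer):

import os

LOCKFILE = '/tmp/myapp.event'  # hypothetical, agreed-upon path

def set_event():
  open(LOCKFILE, 'w').close()  # "set" = the file exists

def clear_event():
  if os.path.exists(LOCKFILE):
    os.remove(LOCKFILE)        # "clear" = the file is gone

def is_set():
  return os.path.exists(LOCKFILE)

set_event()
if is_set():
  clear_event()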

Shared Primitives

You can create a manager object, which can be used to create and share particular data structures between threads or processes started from the same interpreter session.

import multiprocessing

mgr = multiprocessing.Manager()  # manager process that owns the shared objects
mlist = mgr.list()               # proxy to a list living in the manager process

mlist.append('abc')
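
A slightly fuller sketch of the same idea: the managed list is handed to a child process, and the appends made there are visible back in the parent (the worker function is illustrative):

import multiprocessing

def worker(shared_list):
  # appends go through the manager process, so the parent sees them
  shared_list.append('from child')

if __name__ == '__main__':
  mgr = multiprocessing.Manager()
  mlist = mgr.list()

  p = multiprocessing.Process(target=worker, args=(mlist,))
  p.start()
  p.join()

  print(list(mlist))  # ['from child']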


Shared Objects

This is possible, but my last attempt failed. See:
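
One approach that does work is registering a custom class with multiprocessing.managers.BaseManager, which hands out proxies to an instance living in the manager process; a minimal sketch (Counter and MyManager are illustrative names):

from multiprocessing.managers import BaseManager

class Counter(object):
  def __init__(self):
    self._value = 0
  def increment(self):
    self._value += 1
  def value(self):
    return self._value

class MyManager(BaseManager):
  pass

MyManager.register('Counter', Counter)  # expose Counter through the manager

if __name__ == '__main__':
  mgr = MyManager()
  mgr.start()                 # starts the manager process
  counter = mgr.Counter()     # proxy to a Counter in the manager process
  counter.increment()
  print(counter.value())      # 1
  mgr.shutdown()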


References

https://docs.python.org/2/library/multiprocessing.html
http://sebastianraschka.com/Articles/2014_multiprocessing_intro.html
http://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/