multiprocessing¶

11/30/2023¶

In [1]:
%%html
<script src="https://bits.csb.pitt.edu/preamble.js"></script>

OMETs¶

Please fill out.

https://teaching.pitt.edu/omet/student-information/

Parallel Programming¶

There are several parallel programming models enabled by a variety of hardware (multicore, cloud computing, supercomputers, GPU).

Threads vs. Processes¶

A thread of execution is the smallest sequence of programmed instructions that can be managed independently by an operating system scheduler.

A process is an instance of a computer program.

Address Spaces¶

A process has its own address space. An address space is a mapping of virtual memory addresses to physical memory addresses managed by the operating system.

Address spaces prevent processes from crashing other applications or the operating system - they can only access their own memory.

Threads vs. Processes¶

In [ ]:
import threading,time, math

cnt = [0]

def incrementCnt(cnt):
    for i in range(1000000): # a million times
        cnt[0] += math.sqrt(1)

t1 = threading.Thread(target=incrementCnt,args=(cnt,))
t2 = threading.Thread(target=incrementCnt,args=(cnt,))

t1.start()
t2.start()

print(cnt[0])

What do we expect to print out?

In [2]:
%%html
<div id="thread1" style="width: 500px"></div>
<script>
    var divid = '#thread1';
	jQuery(divid).asker({
	    id: divid,
	    question: "What will print out?",
		answers: ['0','2','1000000','2000000',"I don't know"],
        server: "https://bits.csb.pitt.edu/asker.js/example/asker.cgi",
		charter: chartmaker})
    
$(".jp-InputArea .o:contains(html)").closest('.jp-InputArea').hide();


</script>

The Answer¶

In [28]:
cnt = [0]

t1 = threading.Thread(target=incrementCnt,args=(cnt,))
t2 = threading.Thread(target=incrementCnt,args=(cnt,))

t1.start()
t2.start()

print(cnt) 
time.sleep(1)
print(cnt)
time.sleep(1)
print(cnt)
[75813.0]
[1182366.0]
[1182366.0]

Threads vs. Processes¶

In [29]:
import multiprocess, time, math  # math is needed for math.sqrt below

def incrementCnt(cnt):
    for i in range(1000000): # a million times
        cnt[0] += math.sqrt(1)
        
cnt = [0]

p1 = multiprocess.Process(target=incrementCnt,args=(cnt,))
p2 = multiprocess.Process(target=incrementCnt,args=(cnt,))

p1.start()
p2.start()

#what do we expect when we print out cnt[0]?
In [3]:
%%html
<div id="proc1" style="width: 500px"></div>
<script>
    var divid = '#proc1';
	jQuery(divid).asker({
	    id: divid,
	    question: "What will print out?",
		answers: ['0','2','1000000','2000000',"I don't know"],
        server: "https://bits.csb.pitt.edu/asker.js/example/asker.cgi",
		charter: chartmaker})
    
$(".jp-InputArea .o:contains(html)").closest('.jp-InputArea').hide();


</script>

The Answer¶

In [31]:
cnt = [0]
p1 = multiprocess.Process(target=incrementCnt,args=(cnt,))
p2 = multiprocess.Process(target=incrementCnt,args=(cnt,))

p1.start()
p2.start()

print(cnt[0])
time.sleep(3)
print(cnt[0])
0
0
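The zeros above follow from the separate address spaces: each child process increments its own copy of the list, and the parent's copy is never touched. The following is a minimal sketch of that behavior, not part of the original notebook. It assumes the standard library `multiprocessing` module (which `multiprocess` mirrors) and the POSIX-only "fork" start method so that it runs as a plain script; the names `increment` and `child_totals` are illustrative.

```python
import multiprocessing

# Assumption: "fork" is available (Linux/macOS). In the notebook,
# multiprocess.Process and multiprocess.Queue can be used directly.
ctx = multiprocessing.get_context("fork")

def increment(cnt, out_q):
    for _ in range(1000):
        cnt[0] += 1          # modifies this process's private copy
    out_q.put(cnt[0])        # report the private total back to the parent

cnt = [0]
out_q = ctx.Queue()
procs = [ctx.Process(target=increment, args=(cnt, out_q)) for _ in range(2)]
for p in procs:
    p.start()
child_totals = [out_q.get() for _ in procs]  # one total per child
for p in procs:
    p.join()

print(cnt[0], child_totals)  # 0 [1000, 1000]
```

Each child counts to 1000 on its own copy, so the only way the parent learns the result is by receiving it explicitly.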

Threads/Process Objects¶

Thread and Process objects have the same interface:

  • start - start running the target function with (optional) args
  • join - wait for the thread or process to finish before doing anything else
  • is_alive - returns True if the thread or process is still running

Example¶

In [32]:
t1 = threading.Thread(target=incrementCnt,args=(cnt,))
t2 = threading.Thread(target=incrementCnt,args=(cnt,))

t1.start() #start launches the thread to run target with args
t2.start()

print(cnt[0],t1.is_alive(),t2.is_alive())

t1.join() #join waits for a thread to finish
t2.join() #if you don't join a Process, it will become a zombie

print(cnt[0],t1.is_alive(),t2.is_alive())
78226.0 True True
1246141.0 False False

Synchronization¶

When two threads update the same resource, their access to that resource needs to be gated.

A variety of synchronization primitives are provided for both threads and processes (although they usually aren't needed for processes, which do not share memory by default):

Lock¶

A lock can be held by at most one thread at a time (calling acquire a second time from the thread that already holds it will hang). It must be released before another thread can acquire it. Basically, just wrap the critical section with an acquire-release pair.

RLock¶

A recursive lock. This is a lock that can be acquired multiple times by the same thread (and then must be released exactly the same number of times). This is especially useful for modular programming. For example, you can acquire/release a lock around a function call without worrying about what that function is doing with the lock.
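As a small sketch of the difference (the function names `deposit` and `deposit_twice` are made up for illustration), the code below nests two acquisitions of the same RLock in one thread, which would hang with a plain Lock. It uses the `with` statement, which acquires on entry and releases on exit.

```python
import threading

rlock = threading.RLock()
balance = [0]

def deposit(amount):
    with rlock:                 # acquired again by the same thread: fine for an RLock
        balance[0] += amount

def deposit_twice(amount):
    with rlock:                 # outer acquire
        deposit(amount)         # inner acquire/release inside the callee
        deposit(amount)

threads = [threading.Thread(target=deposit_twice, args=(1,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(balance[0])               # 8

# A plain Lock is not re-entrant: a non-blocking second acquire fails
# instead of hanging, which makes the difference visible.
plain = threading.Lock()
plain.acquire()
second_try = plain.acquire(blocking=False)
plain.release()
print(second_try)               # False
```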

Lock Example¶

In [33]:
lock = threading.Lock()

def incrementCnt(cnt):
    for i in range(1000000): # a million times
        lock.acquire()  #only one thread can acquire the lock at a time
        cnt[0] += math.sqrt(1) #this is the CRITICAL SECTION
        lock.release()

cnt = [0]
t1 = threading.Thread(target=incrementCnt,args=(cnt,))
t2 = threading.Thread(target=incrementCnt,args=(cnt,))

t1.start() #start launches the thread to run target with args
t2.start()

t1.join() 
t2.join() 

print(cnt[0])
2000000.0

Communication¶

Threads communicate through their shared address space, and use locks to protect sensitive state. Several classes provide a simplified interface for communication (these are available with processes as well, but the underlying implementation is different).

Queue¶

queue.Queue (multiprocess.Queue for processes) provides a simple, thread-safe, first-in-first-out queue.

  • put: put an element on the queue. This will block if the queue has filled up
  • get: get an element from the queue. This will block if the queue is empty.
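A minimal sketch of put and get with threads, using the standard library `queue.Queue`. The bounded `maxsize=2` is chosen here only to make the blocking behavior of put come into play; the fixed item count of 5 is an illustrative simplification so the worker knows when to stop.

```python
import queue
import threading

tasks = queue.Queue(maxsize=2)   # put blocks once 2 items are waiting
results = queue.Queue()          # unbounded: put never blocks

def worker():
    for _ in range(5):
        x = tasks.get()          # blocks while the queue is empty
        results.put(x * x)

t = threading.Thread(target=worker)
t.start()

for i in range(5):
    tasks.put(i)                 # blocks while the queue is full
t.join()

squares = [results.get() for _ in range(5)]
print(squares)                   # [0, 1, 4, 9, 16]
```

With a single worker, the first-in-first-out ordering means the results come back in the order the inputs were put.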

Communication¶

Pipe¶

A pipe is a communication channel between processes that can send and receive messages. Unlike a queue, it is not okay for multiple threads to write to the same end of the pipe at the same time.

Pipe() returns a pair (tuple) of Connection objects representing the two ends of the pipe. Each connection object has the following methods:

  • send: sends data to other end of the pipe
  • recv: waits for data from other end of the pipe (unless pipe closed, then EOFError)
  • close: close the pipe
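The three methods can be tried out within a single process, as in this sketch. It assumes the standard library `multiprocessing.Pipe`, which has the same interface as `multiprocess.Pipe`; the variable names are illustrative.

```python
import multiprocessing

left, right = multiprocessing.Pipe()   # two connected Connection objects

left.send({"step": 1})   # any picklable object can be sent
left.close()             # no more data will come from this end

first = right.recv()     # the message sent before the close is still delivered
try:
    right.recv()         # nothing left and the other end is closed
    saw_eof = False
except EOFError:
    saw_eof = True
right.close()

print(first, saw_eof)    # {'step': 1} True
```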

Communication¶

Since they don't have a shared address space, it is recommended that processes use exclusively either multiprocess.Queue or multiprocess.Pipe to communicate.

Use a pipe for communication between two processes (server-client architecture).

Use a queue for one-way communication between many threads (producer-consumer architecture).

Pipes¶

In [34]:
import multiprocess

def chatty(conn): #this takes a Connection object representing one end of a pipe
    msg = conn.recv()
    conn.send("you sent me "+msg)
    
(c1,c2) = multiprocess.Pipe()

p1 = multiprocess.Process(target=chatty,args=(c2,))
p1.start()

c1.send("Hello!")
result = c1.recv()
p1.join()

print(result)
you sent me Hello!

Pools¶

multiprocess supports the concept of a pool of workers. You initialize with the number of processes you want to run in parallel (the default is the number of CPUs on your system) and they are available for doing parallel work:

  • map: the most important function - just like the built-in map, this will map a function to an iterable and return the result, but the mapping will be done in parallel. Blocks until the full result is computed and the result is in the proper order.
  • map_async: map without blocking
  • imap: a lazy version of map - returns an iterator instead of a list. Calling next() on the returned iterator will block if necessary.
  • imap_unordered: same as imap but returns values in the order they are computed
  • close: close the pool to prevent additional jobs from being submitted
  • join: must call close first; waits for all jobs to complete
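The less familiar methods in this list can be sketched as follows. This is an illustration under stated assumptions: it uses the standard library `multiprocessing` with the POSIX-only "fork" start method so that it runs as a plain script (in the notebook, `multiprocess.Pool` works the same way; on Windows, use the default context and put the driver code under `if __name__ == "__main__":`). The function `square` is made up for the example.

```python
import multiprocessing

# Assumption: "fork" is available (Linux/macOS).
ctx = multiprocessing.get_context("fork")

def square(x):
    return x * x

pool = ctx.Pool(processes=4)

# map_async returns immediately with a result handle.
async_result = pool.map_async(square, range(10))

# imap yields results lazily, in input order.
ordered = list(pool.imap(square, range(10)))

# imap_unordered yields results as they finish, so sort before comparing.
unordered = sorted(pool.imap_unordered(square, range(10)))

squares = async_result.get()   # blocks until the whole map is done

pool.close()                   # no more jobs may be submitted
pool.join()                    # wait for the workers to exit

print(squares)                 # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```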

Pool Example¶

In [35]:
def f(x):
    return x*x

pool = multiprocess.Pool(processes=4)

print(pool.map(f,range(20)))
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]

Producer/Consumer¶

In [36]:
def dowork(inQ, outQ):
    val = inQ.get()
    outQ.put(val*val)

inQ = multiprocess.Queue()
outQ = multiprocess.Queue()
pool = multiprocess.Pool(4, dowork, (inQ, outQ))
In [37]:
inQ.put(4)
In [38]:
outQ.get()
Out[38]:
16

Bad Example¶

In [24]:
#What is wrong with this code?
for i in range(10):
    inQ.put(i)

while not outQ.empty():
    print(outQ.get())

You should not produce and consume in the same thread. If outQ fills up, inQ will fill up and the put will block.

The empty/full methods of the Queue class are pretty much useless since their result depends on when they are called. Here, no values had been generated when it was called so nothing is printed.

In order to communicate that there are no more values, you must send a sentinel value.
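One way to restructure the bad example is sketched below: a separate producer thread fills the input queue, a worker process squares each value, and both mark the end of their stream with a sentinel. This is a sketch, not the notebook's solution. It assumes the standard library `multiprocessing` with the POSIX-only "fork" start method, and it uses `None` as the sentinel, which only works because `None` is never a real value here.

```python
import multiprocessing
import threading

# Assumption: "fork" is available (Linux/macOS). In the notebook,
# multiprocess.Queue and multiprocess.Process can be used directly.
ctx = multiprocessing.get_context("fork")
SENTINEL = None   # assumption: None never appears as real data

def worker(in_q, out_q):
    # iter(callable, sentinel) keeps calling in_q.get() until it returns SENTINEL
    for val in iter(in_q.get, SENTINEL):
        out_q.put(val * val)
    out_q.put(SENTINEL)          # tell the consumer we are done

def producer(in_q, n):
    for i in range(n):
        in_q.put(i)
    in_q.put(SENTINEL)           # tell the worker there is no more input

in_q, out_q = ctx.Queue(), ctx.Queue()

w = ctx.Process(target=worker, args=(in_q, out_q))
w.start()                        # start the process before any extra threads
p = threading.Thread(target=producer, args=(in_q, 10))
p.start()

# Consume in the main thread until the sentinel arrives; no call to empty().
results = list(iter(out_q.get, SENTINEL))

p.join()
w.join()
print(results)                   # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

With several workers, each worker would need its own sentinel on the input queue, and the consumer would have to count one output sentinel per worker.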

Threads or Processes?¶

Normally, the choice between threads and processes depends on how data will be accessed and the level of communication between parallel tasks etc.

However, in Python, the answer is almost always to use multiprocessing, not threading.

Why? The CPython interpreter has a Global Interpreter Lock (GIL). This means that, when using threads, only one thread is actually executing Python code at any given time. With processes, each process has its own interpreter (with its own lock).
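A rough way to see the effect is to time the same CPU-bound function in two threads and then in two processes. The sketch below does that; the function `burn` and the helper `timed` are made up for illustration, and it assumes the POSIX-only "fork" start method. Timings vary by machine, so no expected numbers are shown. On a multicore machine with a standard CPython build, the process version would be expected to finish sooner.

```python
import multiprocessing
import threading
import time

# Assumption: "fork" is available (Linux/macOS).
ctx = multiprocessing.get_context("fork")

def burn(n):
    # Pure-Python arithmetic: holds the GIL for the whole loop
    total = 0
    for i in range(n):
        total += i * i

def timed(make_worker, n_workers=2, n=2_000_000):
    # make_worker is threading.Thread or ctx.Process; both take target and args
    workers = [make_worker(target=burn, args=(n,)) for _ in range(n_workers)]
    start = time.perf_counter()
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return time.perf_counter() - start

thread_time = timed(threading.Thread)
process_time = timed(ctx.Process)
print(f"threads:   {thread_time:.2f} s")
print(f"processes: {process_time:.2f} s")
```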

Embarrassingly Parallel¶

Writing correct, high performance parallel code can be difficult, but in some cases it's trivial.

A problem is embarrassingly parallel if it can be easily separated into independent subtasks (i.e., no need to communicate between threads) each of which does a substantial amount of computation.

Fortunately this is often the case.

  • Apply this filter to 1000 images
  • Process these 5000 protein structures
  • Compute RMSDs of all frames in a trajectory

In cases like these, using Pools will get you a significant speedup (up to a factor of however many cores you have).

Key Concepts¶

  • You can run many things at the same time with threads/processes
  • This is an easy way to make things faster, but can get complicated
  • Use multiprocess not threads
  • Processes can communicate with Queues/Pipes
  • 90% of the time all you need are Pools, and they are not complicated

Project Plans?¶

Project¶

  • Take a search term as an argument
  • Use biopython to search the NCBI protein database for entries that match this term and are in the pdb
  • Extract the PDB entries
  • For each PDB entry, use ProDy to calculate the ANM modes (this can be done in parallel)
  • Sort the results based on the highest fractional variance in the first mode
  • Print out the top ten PDBs with the fractional variance of their first three modes
In [39]:
from Bio import Entrez
import re
seqfile = 'searchresults.fasta'
Entrez.email = "dkoes@pitt.edu"
records = Entrez.read(Entrez.esearch(db='protein',term='covid AND pdb[filter]',retmax=1000))
result = Entrez.efetch(db='protein',id=records['IdList'],rettype='fasta',retmode='text').read()
set(re.findall(r'>pdb\|(\S+?)\|',result))
Out[39]:
{'6KD5',
 '6LU7',
 '6W61',
 '6WUU',
 '6WX4',
 '6XA4',
 '6XBG',
 '6XBH',
 '6XBI',
 '6XFN',
 '7AKU',
 '7BQY',
 '7EIN',
 '7GAV',
 '7GAW',
 '7GAX',
 '7GAY',
 '7GAZ',
 '7GB0',
 '7GB1',
 '7GB2',
 '7GB3',
 '7GB4',
 '7GB5',
 '7GB6',
 '7GB7',
 '7GB8',
 '7GB9',
 '7GBA',
 '7GBB',
 '7GBC',
 '7GBD',
 '7GBE',
 '7GBF',
 '7GBG',
 '7GBH',
 '7GBI',
 '7GBJ',
 '7GBK',
 '7GBL',
 '7GBM',
 '7GBN',
 '7GBO',
 '7GBP',
 '7GBQ',
 '7GBR',
 '7GBS',
 '7GBT',
 '7GBU',
 '7GBV',
 '7GBW',
 '7GBX',
 '7GBY',
 '7GBZ',
 '7GC0',
 '7GC1',
 '7GC2',
 '7GC3',
 '7GC4',
 '7GC5',
 '7GC6',
 '7GC7',
 '7GC8',
 '7GC9',
 '7GCA',
 '7GCB',
 '7GCC',
 '7GCD',
 '7GCE',
 '7GCF',
 '7GCG',
 '7GCI',
 '7GCJ',
 '7GCK',
 '7GCL',
 '7GCM',
 '7GCN',
 '7GCO',
 '7GCP',
 '7GCQ',
 '7GCR',
 '7GCS',
 '7GCT',
 '7GCU',
 '7GCV',
 '7GCW',
 '7GCX',
 '7GCY',
 '7GCZ',
 '7GD0',
 '7GD1',
 '7GD2',
 '7GD3',
 '7GD4',
 '7GD5',
 '7GD6',
 '7GD7',
 '7GD8',
 '7GD9',
 '7GDA',
 '7GDB',
 '7GDC',
 '7GDD',
 '7GDE',
 '7GDF',
 '7GDG',
 '7GDH',
 '7GDI',
 '7GDJ',
 '7GDK',
 '7GDL',
 '7GDM',
 '7GDN',
 '7GDO',
 '7GDP',
 '7GDQ',
 '7GDR',
 '7GDS',
 '7GDT',
 '7GDU',
 '7GDV',
 '7GDW',
 '7GDX',
 '7GDY',
 '7GDZ',
 '7GE0',
 '7GE1',
 '7GE2',
 '7GE3',
 '7GE4',
 '7GE5',
 '7GE6',
 '7GE7',
 '7GE8',
 '7GE9',
 '7GEA',
 '7GEB',
 '7GEC',
 '7GED',
 '7GEE',
 '7GEF',
 '7GEG',
 '7GEH',
 '7GEI',
 '7GEJ',
 '7GEK',
 '7GEL',
 '7GEM',
 '7GEN',
 '7GEO',
 '7GEQ',
 '7GER',
 '7GES',
 '7GET',
 '7GEU',
 '7GEV',
 '7GEW',
 '7GEX',
 '7GEY',
 '7GEZ',
 '7GF0',
 '7GF1',
 '7GF2',
 '7GF3',
 '7GF4',
 '7GF5',
 '7GF6',
 '7GF7',
 '7GF8',
 '7GF9',
 '7GFA',
 '7GFB',
 '7GFC',
 '7GFD',
 '7GFE',
 '7GFF',
 '7GFG',
 '7GFH',
 '7GFI',
 '7GFJ',
 '7GFK',
 '7GFL',
 '7GFM',
 '7GFN',
 '7GFO',
 '7GFP',
 '7GFQ',
 '7GFR',
 '7GFS',
 '7GFT',
 '7GFU',
 '7GFV',
 '7GFW',
 '7GFX',
 '7GFY',
 '7GFZ',
 '7GG0',
 '7GG1',
 '7GG2',
 '7GG3',
 '7GG4',
 '7GG5',
 '7GG6',
 '7GG7',
 '7GG8',
 '7GG9',
 '7GGA',
 '7GGB',
 '7GGC',
 '7GGD',
 '7GGE',
 '7GGF',
 '7GGG',
 '7GGH',
 '7GGI',
 '7GGJ',
 '7GGK',
 '7GGL',
 '7GGM',
 '7GGN',
 '7GGO',
 '7GGP',
 '7GGQ',
 '7GGR',
 '7GGS',
 '7GGT',
 '7GGU',
 '7GGV',
 '7GGW',
 '7GGX',
 '7GGY',
 '7GGZ',
 '7GH0',
 '7GH1',
 '7GH2',
 '7GH3',
 '7GH4',
 '7GH5',
 '7GH6',
 '7GH7',
 '7GH8',
 '7GH9',
 '7GHA',
 '7GHB',
 '7GHC',
 '7GHD',
 '7GHE',
 '7GHF',
 '7GHG',
 '7GHH',
 '7GHI',
 '7GHJ',
 '7GHK',
 '7GHL',
 '7GHM',
 '7GHN',
 '7GHO',
 '7GHP',
 '7GHQ',
 '7GHR',
 '7GHS',
 '7GHT',
 '7GHU',
 '7GHV',
 '7GHW',
 '7GHX',
 '7GHY',
 '7GHZ',
 '7GI0',
 '7GI1',
 '7GI2',
 '7GI3',
 '7GI4',
 '7GI5',
 '7GI6',
 '7GI7',
 '7GI8',
 '7GI9',
 '7GIA',
 '7GIB',
 '7GIC',
 '7GID',
 '7GIE',
 '7GIF',
 '7GIG',
 '7GIH',
 '7GII',
 '7GIJ',
 '7GIK',
 '7GIL',
 '7GIM',
 '7GIN',
 '7GIO',
 '7GIP',
 '7GIQ',
 '7GIR',
 '7GIS',
 '7GIT',
 '7GIU',
 '7GIV',
 '7GIW',
 '7GIX',
 '7GIY',
 '7GIZ',
 '7GJ0',
 '7GJ1',
 '7GJ2',
 '7GJ3',
 '7GJ4',
 '7GJ5',
 '7GJ6',
 '7GJ7',
 '7GJ8',
 '7GJ9',
 '7GJA',
 '7GJB',
 '7GJC',
 '7GJD',
 '7GJE',
 '7GJF',
 '7GJG',
 '7GJH',
 '7GJI',
 '7GJJ',
 '7GJK',
 '7GJL',
 '7GJM',
 '7GJN',
 '7GJO',
 '7GJP',
 '7GJQ',
 '7GJR',
 '7GJS',
 '7GJT',
 '7GJU',
 '7GJV',
 '7GJW',
 '7GJX',
 '7GJY',
 '7GJZ',
 '7GK0',
 '7GK1',
 '7GK2',
 '7GK3',
 '7GK4',
 '7GK5',
 '7GK6',
 '7GK7',
 '7GK8',
 '7GK9',
 '7GKA',
 '7GKB',
 '7GKC',
 '7GKD',
 '7GKE',
 '7GKF',
 '7GKG',
 '7GKH',
 '7GKI',
 '7GKJ',
 '7GKK',
 '7GKL',
 '7GKM',
 '7GKN',
 '7GKO',
 '7GKP',
 '7GKQ',
 '7GKR',
 '7GKS',
 '7GKT',
 '7GKU',
 '7GKV',
 '7GKW',
 '7GKX',
 '7GKY',
 '7GKZ',
 '7GL0',
 '7GL1',
 '7GL2',
 '7GL3',
 '7GL4',
 '7GL5',
 '7GL6',
 '7GL7',
 '7GL8',
 '7GL9',
 '7GLA',
 '7GLB',
 '7GLC',
 '7GLD',
 '7GLE',
 '7GLF',
 '7GLG',
 '7GLH',
 '7GLI',
 '7GLJ',
 '7GLK',
 '7GLL',
 '7GLM',
 '7GLN',
 '7GLO',
 '7GLP',
 '7GLQ',
 '7GLR',
 '7GLS',
 '7GLT',
 '7GLU',
 '7GLV',
 '7GLW',
 '7GLX',
 '7GLY',
 '7GLZ',
 '7GM0',
 '7GM1',
 '7GM2',
 '7GM3',
 '7GM4',
 '7GM5',
 '7GM6',
 '7GM7',
 '7GM8',
 '7GM9',
 '7GMA',
 '7GMB',
 '7GMC',
 '7GMD',
 '7GME',
 '7GMF',
 '7GMG',
 '7GMH',
 '7GMI',
 '7GMJ',
 '7GMK',
 '7GML',
 '7GMM',
 '7GMN',
 '7GMO',
 '7GMP',
 '7GMQ',
 '7GMR',
 '7GMS',
 '7GMT',
 '7GMU',
 '7GMV',
 '7GMW',
 '7GMX',
 '7GMY',
 '7GMZ',
 '7GN0',
 '7GN1',
 '7GN2',
 '7GN3',
 '7GN4',
 '7GN5',
 '7GN6',
 '7GN7',
 '7GN8',
 '7GN9',
 '7GNA',
 '7GNB',
 '7GNC',
 '7GND',
 '7GNE',
 '7GNF',
 '7GNG',
 '7GNH',
 '7GNI',
 '7GNJ',
 '7GNK',
 '7GNL',
 '7GNM',
 '7GNN',
 '7GNO',
 '7GNP',
 '7GNQ',
 '7GNR',
 '7GNS',
 '7GNT',
 '7GNU',
 '7JPY',
 '7JPZ',
 '7JQ0',
 '7JQ1',
 '7JQ2',
 '7JQ3',
 '7JQ4',
 '7JQ5',
 '7LCR',
 '7M2P',
 '7MW2',
 '7MW3',
 '7MW4',
 '7MW5',
 '7MW6',
 '7RVN',
 '7RVQ',
 '7RVS',
 '7RVX',
 '7S4S',
 '7S6W',
 '7SH7',
 '7TLT',
 '7TUQ',
 '7TV0',
 '7UKL',
 '7URQ',
 '7URS',
 '7UU6',
 '7UU7',
 '7UU8',
 '7UU9',
 '7UUA',
 '7UUB',
 '7UUC',
 '7UUD',
 '7UUE',
 '8AJA',
 '8AJL',
 '8BWU',
 '8D4P',
 '8DDL',
 '8DGU',
 '8DGV',
 '8DGW',
 '8DGX',
 '8DIB',
 '8DIC',
 '8DID',
 '8DIE',
 '8DIF',
 '8DIG',
 '8DIH',
 '8DII',
 '8DL9',
 '8DLB',
 '8DMD',
 '8DPR',
 '8DS2',
 '8DSU',
 '8DTR',
 '8DTT',
 '8DTX',
 '8DW9',
 '8DWA',
 '8DZ0',
 '8DZ1',
 '8DZ2',
 '8DZ6',
 '8DZA',
 '8DZB',
 '8DZC',
 '8E25',
 '8E26',
 '8E63',
 '8E6A',
 '8E7C',
 '8EL2',
 '8ELG',
 '8ELH',
 '8EOO',
 '8EUA',
 '8F44',
 '8F45',
 '8F46',
 '8GIA',
 '8H3G',
 '8H3K',
 '8H3L',
 '8H4Y',
 '8H51',
 '8H57',
 '8H5F',
 '8H5P',
 '8H6I',
 '8H6N',
 '8H7K',
 '8H7W',
 '8H82',
 '8HBK',
 '8HI9',
 '8IX3',
 '8J5J',
 '8SUZ',
 '8TBE',
 '8URB'}