# multiprocessing

## 11/30/2023
<a href="?print-pdf">print view</a>

<a href="multiprocessing.ipynb">notebook</a>

In [1]:
%%html
<script src="https://bits.csb.pitt.edu/preamble.js"></script>

# OMETs

Please fill out.

https://teaching.pitt.edu/omet/student-information/

# Parallel Programming

There are several parallel programming models enabled by a variety of hardware (**multicore**, cloud computing, supercomputers, GPU).

<img src='http://images.anandtech.com/reviews/cpu/intel/SNBE/Core_I7_LGA_2011_Die.jpg' width="400">

# Threads vs. Processes

A **thread** of execution is the smallest sequence of programmed instructions that can be managed independently by an operating system scheduler. 

A **process** is an instance of a computer program.

<center><img src='http://upload.wikimedia.org/wikipedia/commons/a/a5/Multithreaded_process.svg' width="400"></center>

# Address Spaces

A process has its own *address space*.  An address space is a mapping of **virtual memory addresses** to **physical memory addresses** managed by the operating system.

Address spaces prevent processes from crashing other applications or the operating system - they can only access their *own* memory.

<center><img src='http://upload.wikimedia.org/wikipedia/commons/thumb/3/32/Virtual_address_space_and_physical_address_space_relationship.svg/587px-Virtual_address_space_and_physical_address_space_relationship.svg.png' width="350"></center>

# Threads vs. Processes

In [None]:
import threading,time, math

cnt = [0]

def incrementCnt(cnt):
    for i in range(1000000): # a million times
        cnt[0] += math.sqrt(1)

t1 = threading.Thread(target=incrementCnt,args=(cnt,))
t2 = threading.Thread(target=incrementCnt,args=(cnt,))

t1.start()
t2.start()

print(cnt[0])

What do we expect to print out?

In [2]:
%%html
<div id="thread1" style="width: 500px"></div>
<script>
    var divid = '#thread1';
	jQuery(divid).asker({
	    id: divid,
	    question: "What will print out?",
		answers: ['0','2','1000000','2000000',"I don't know"],
        server: "https://bits.csb.pitt.edu/asker.js/example/asker.cgi",
		charter: chartmaker})
    
$(".jp-InputArea .o:contains(html)").closest('.jp-InputArea').hide();


</script>

# The Answer

In [28]:
cnt = [0]

t1 = threading.Thread(target=incrementCnt,args=(cnt,))
t2 = threading.Thread(target=incrementCnt,args=(cnt,))

t1.start()
t2.start()

print(cnt) 
time.sleep(1)
print(cnt)
time.sleep(1)
print(cnt)

[75813.0]
[1182366.0]
[1182366.0]


# Threads vs. Processes

In [29]:
import multiprocess,time

def incrementCnt(cnt):
    for i in range(1000000): # a million times
        cnt[0] += math.sqrt(1)
        
cnt = [0]

p1 = multiprocess.Process(target=incrementCnt,args=(cnt,))
p2 = multiprocess.Process(target=incrementCnt,args=(cnt,))

p1.start()
p2.start()

#what do we expect when we print out cnt[0]?

In [3]:
%%html
<div id="proc1" style="width: 500px"></div>
<script>
    var divid = '#proc1';
	jQuery(divid).asker({
	    id: divid,
	    question: "What will print out?",
		answers: ['0','2','1000000','2000000',"I don't know"],
        server: "https://bits.csb.pitt.edu/asker.js/example/asker.cgi",
		charter: chartmaker})
    
$(".jp-InputArea .o:contains(html)").closest('.jp-InputArea').hide();


</script>

# The Answer

In [31]:
cnt = [0]
p1 = multiprocess.Process(target=incrementCnt,args=(cnt,))
p2 = multiprocess.Process(target=incrementCnt,args=(cnt,))

p1.start()
p2.start()

print(cnt[0])
time.sleep(3)
print(cnt[0])

0
0


# Threads/Process Objects

Thread and Process Objects have the same interface 
 * `start` - start running the `target` function with (optional) `args`
 * `join` - wait for thread to finish before doing anything else
 * `is_alive` - returns true if thread is still running

# Example

In [32]:
t1 = threading.Thread(target=incrementCnt,args=(cnt,))
t2 = threading.Thread(target=incrementCnt,args=(cnt,))

t1.start() #start launches the thread to run target with args
t2.start()

print(cnt[0],t1.is_alive(),t2.is_alive())

t1.join() #join waits for a thread to finish
t2.join() #if you don't join a Process, it will become a zombie

print(cnt[0],t1.is_alive(),t2.is_alive())

78226.0 True True
1246141.0 False False


# Synchronization

When two threads update the same resource, their access to that resource needs to be gated.

There are a variety of synchronization primitives provided for both threads and processes (although they usually aren't needed for processes):

### Lock
Can be acquired by exactly one thread (calling acquire twice from the same thread will hang).  Must be released to be acquired by another thread.  Basically, just wrap the *critical section* with an acquire-release pair.

### RLock
A *recursive* lock. This is a lock that can be acquired multiple times by the same thread (and then must be released exactly the same number times).  This is especially useful for modular programming.  For example, you can acquire/release a lock around a function call without working about what that function is doing with the lock.


# Lock Example

In [33]:
lock = threading.Lock()

def incrementCnt(cnt):
    for i in range(1000000): # a million times
        lock.acquire()  #only one thread can acquire the lock at a time
        cnt[0] += math.sqrt(1) #this is the CRITICAL SECTION
        lock.release()

cnt = [0]
t1 = threading.Thread(target=incrementCnt,args=(cnt,))
t2 = threading.Thread(target=incrementCnt,args=(cnt,))

t1.start() #start launches the thread to run target with args
t2.start()

t1.join() 
t2.join() 

print(cnt[0])

2000000.0


# Communication

Threads communicate through their shared address space, and use locks to protect sensitive state.  Several classes provide a simplified interface for communication (these are available with processes as well, but the underlying implementation is different). 

### Queue
`Queue.Queue` (`multiprocess.Queue` for processes) provides a simple, thread-safe, first-in-first-out queue.

* `put`: put an element on the queue. **This will block if the queue has filled up**
* `get`: get an element from the queue. **This will block if the queue is empty**.  


# Communication

### Pipe
A pipe is a communication channel between processes that can send and receive messages.   *Unlike a queue, it is not okay for multiple threads to write to the same end of the pipe at the same time*.

`Pipe()` return a duple of `Connection` objects representing the ends of the pipe.  Each connection object has the following methods:

* `send`: sends data to other end of the pipe
* `recv`: waits for data from other end of the pipe (unless pipe closed, then `EOFError`)
* `close`: close the pipe

# Communication

Since they don't have a shared address space, it is recommended the *processes* use exclusively either `multiprocess.Queue` or `multiprocess.Pipe` to communicate.

Use a pipe for communication between two processes (server-client architecture).

Use a queue for one-way communication between many threads (producer-consumer architecture).

# Pipes

In [34]:
import multiprocess

def chatty(conn): #this takes a Connection object representing one end of a pipe
    msg = conn.recv()
    conn.send("you sent me "+msg)
    
(c1,c2) = multiprocess.Pipe()

p1 = multiprocess.Process(target=chatty,args=(c2,))
p1.start()

c1.send("Hello!")
result = c1.recv()
p1.join()

print(result)

you sent me Hello!


# Pools

`multiprocess` supports the concept of a *pool* of workers.  You initialize with the number of processes you want to run in parallel (the default is the number of CPUs on your system) and they are available for doing parallel work:

* `map`: the most important function - just like the built-in map, this will map a function to an iterable and return the result, but the *mapping will be done in parallel*.  Blocks until the full result is computed and the result is in the proper order.
* `map_async`: map without blocking
* `imap`: parallel imap - returns iterable instead of list.  The `next()` method of the returned iterable will block if necessary.
* `imap_unordered`: same as imap but returns values in order they are computed
* `close`: close the pool to prevent additional jobs from being submitted
* `join`: must call `close` first; waits for all jobs to complete

# Pool Example

In [35]:
def f(x):
    return x*x

pool = multiprocess.Pool(processes=4)

print(pool.map(f,range(20)))

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]


# Producer/Consumer

In [36]:
def dowork(inQ, outQ):
    val = inQ.get()
    outQ.put(val*val)

inQ = multiprocess.Queue()
outQ = multiprocess.Queue()
pool = multiprocess.Pool(4, dowork, (inQ, outQ))

In [37]:
inQ.put(4)

In [38]:
outQ.get()

16

# Bad Example

In [24]:
#What is wrong with this code?
for i in range(10):
    inQ.put(i)

while not outQ.empty():
    print(outQ.get())

You should not produce and consume in the same thread.  If `outQ` fills up, `inQ` will fill up and the `put` will block.

The `empty`/`full` methods of the Queue class are pretty much **useless** since their result depends on when they are called.  Here, no values had been generated when it was called so nothing is printed.

In order communicate that there are no more values, you must send a *sentinal* value.

# Threads or Processes?

Normally, the choice between threads and processes depends on how data will be accessed and the level of communication between parallel tasks etc.

However, in Python, the answer is almost always **use multiprocessing, not threading**.

Why? The CPython interpretter has a *Global Interpretter Lock*.  This means that only **one** thread of python code is actually executed at any given time when using threads.  With processes, each process has its own interpretter (with its own lock).

<center><img width="300" src='../files/threadsprocesses.png'></center>

# Embarassingly Parallel

Writing correct, high performance parallel code can be difficult, but some in some cases it's trivial.

A problem is *embarassingly parallel* if it can be easily separated into *independent* subtasks (i.e., no need to communicate between threads) each of which does a substantial amount of computation.

Fortunately this is often the case.
 * Apply this filter to 1000 images
 * Process these 5000 protein structures
 * Compute RMSDs of all frames in a trajectory
 
In cases like these, using **Pools** will get you a significant speedup (by however many cores you have).

# Key Concepts

* You can running many things at the same time with threads/processes
* This is an easy way to make things faster, but can get complicated
* Use `multiprocess` not threads
* Processes can communicate with Queues/Pipes
* 90% of the time all you need are Pools, and they are not complicated

# Project Plans?

# Project

 * Take a search term as an argument
 * Use biopython to search the NCBI protein database for entries that match this term and are in the pdb
 * Extract the PDB entries
 * For each PDB entry, use ProDy to calculate the ANM modes (this can be done in parallel)
 * Sort the results based on the highest fractional variance in the first mode
 * Print out the top ten PDBs with the fractional variance of their first three modes

In [39]:
from Bio import Entrez
import re
seqfile = 'searchresults.fasta'
Entrez.email = "dkoes@pitt.edu"
records = Entrez.read(Entrez.esearch(db='protein',term='covid AND pdb[filter]',retmax=1000))
result = Entrez.efetch(db='protein',id=records['IdList'],rettype='fasta',retmode='text').read()
set(re.findall(r'>pdb\|(\S+?)\|',result))

{'6KD5',
 '6LU7',
 '6W61',
 '6WUU',
 '6WX4',
 '6XA4',
 '6XBG',
 '6XBH',
 '6XBI',
 '6XFN',
 '7AKU',
 '7BQY',
 '7EIN',
 '7GAV',
 '7GAW',
 '7GAX',
 '7GAY',
 '7GAZ',
 '7GB0',
 '7GB1',
 '7GB2',
 '7GB3',
 '7GB4',
 '7GB5',
 '7GB6',
 '7GB7',
 '7GB8',
 '7GB9',
 '7GBA',
 '7GBB',
 '7GBC',
 '7GBD',
 '7GBE',
 '7GBF',
 '7GBG',
 '7GBH',
 '7GBI',
 '7GBJ',
 '7GBK',
 '7GBL',
 '7GBM',
 '7GBN',
 '7GBO',
 '7GBP',
 '7GBQ',
 '7GBR',
 '7GBS',
 '7GBT',
 '7GBU',
 '7GBV',
 '7GBW',
 '7GBX',
 '7GBY',
 '7GBZ',
 '7GC0',
 '7GC1',
 '7GC2',
 '7GC3',
 '7GC4',
 '7GC5',
 '7GC6',
 '7GC7',
 '7GC8',
 '7GC9',
 '7GCA',
 '7GCB',
 '7GCC',
 '7GCD',
 '7GCE',
 '7GCF',
 '7GCG',
 '7GCI',
 '7GCJ',
 '7GCK',
 '7GCL',
 '7GCM',
 '7GCN',
 '7GCO',
 '7GCP',
 '7GCQ',
 '7GCR',
 '7GCS',
 '7GCT',
 '7GCU',
 '7GCV',
 '7GCW',
 '7GCX',
 '7GCY',
 '7GCZ',
 '7GD0',
 '7GD1',
 '7GD2',
 '7GD3',
 '7GD4',
 '7GD5',
 '7GD6',
 '7GD7',
 '7GD8',
 '7GD9',
 '7GDA',
 '7GDB',
 '7GDC',
 '7GDD',
 '7GDE',
 '7GDF',
 '7GDG',
 '7GDH',
 '7GDI',
 '7GDJ',
 '7GDK',
 '7GDL',
 