There are several parallel programming models enabled by a variety of hardware (multicore, cloud computing, supercomputers, GPU).
A thread of execution is the smallest sequence of programmed instructions that can be managed independently by an operating system scheduler.
A process is an instance of a computer program.
A process has its own address space. An address space is a mapping of virtual memory addresses to physical memory addresses managed by the operating system.
Address spaces prevent processes from crashing other applications or the operating system - they can only access their own memory.
import threading,time, math
cnt = [0]
def incrementCnt(cnt):
    for i in range(1000000): # a million times
        cnt[0] += math.sqrt(1)
t1 = threading.Thread(target=incrementCnt,args=(cnt,))
t2 = threading.Thread(target=incrementCnt,args=(cnt,))
t1.start()
t2.start()
print(cnt[0])
What do we expect to print out?
cnt = [0]
t1 = threading.Thread(target=incrementCnt,args=(cnt,))
t2 = threading.Thread(target=incrementCnt,args=(cnt,))
t1.start()
t2.start()
print(cnt)
time.sleep(1)
print(cnt)
time.sleep(1)
print(cnt)
[75813.0]
[1182366.0]
[1182366.0]
import multiprocess,time
def incrementCnt(cnt):
    for i in range(1000000): # a million times
        cnt[0] += math.sqrt(1)
cnt = [0]
p1 = multiprocess.Process(target=incrementCnt,args=(cnt,))
p2 = multiprocess.Process(target=incrementCnt,args=(cnt,))
p1.start()
p2.start()
#what do we expect when we print out cnt[0]?
cnt = [0]
p1 = multiprocess.Process(target=incrementCnt,args=(cnt,))
p2 = multiprocess.Process(target=incrementCnt,args=(cnt,))
p1.start()
p2.start()
print(cnt[0])
time.sleep(3)
print(cnt[0])
0
0
Thread and Process objects have the same interface:
start - start running the target function with (optional) args
join - wait for the thread to finish before doing anything else
is_alive - returns True if the thread is still running
t1 = threading.Thread(target=incrementCnt,args=(cnt,))
t2 = threading.Thread(target=incrementCnt,args=(cnt,))
t1.start() #start launches the thread to run target with args
t2.start()
print(cnt[0],t1.is_alive(),t2.is_alive())
t1.join() #join waits for a thread to finish
t2.join() #if you don't join a Process, it will become a zombie
print(cnt[0],t1.is_alive(),t2.is_alive())
78226.0 True True
1246141.0 False False
When two threads update the same resource, their access to that resource needs to be gated.
There are a variety of synchronization primitives provided for both threads and processes (although they usually aren't needed for processes):
Lock: Can be acquired by exactly one thread at a time (calling acquire twice from the same thread will hang). Must be released to be acquired by another thread. Basically, just wrap the critical section with an acquire-release pair.
RLock: A recursive lock. This is a lock that can be acquired multiple times by the same thread (and then must be released exactly the same number of times). This is especially useful for modular programming. For example, you can acquire/release a lock around a function call without worrying about what that function is doing with the lock.
lock = threading.Lock()
def incrementCnt(cnt):
    for i in range(1000000): # a million times
        lock.acquire() #only one thread can acquire the lock at a time
        cnt[0] += math.sqrt(1) #this is the CRITICAL SECTION
        lock.release()
cnt = [0]
t1 = threading.Thread(target=incrementCnt,args=(cnt,))
t2 = threading.Thread(target=incrementCnt,args=(cnt,))
t1.start() #start launches the thread to run target with args
t2.start()
t1.join()
t2.join()
print(cnt[0])
2000000.0
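The difference between the two kinds of lock can be sketched with a recursive lock held around a call that takes the same lock again. This is a minimal illustration, not part of the original example: the helper names add_one and add_many are hypothetical, and the `with` statement is used as shorthand for an acquire-release pair. With a plain threading.Lock the inner acquire would hang.

```python
import threading

lock = threading.RLock()  # reentrant: the owning thread may acquire it again

def add_one(cnt):
    # hypothetical helper that protects its own update
    with lock:            # inner acquire; released when the block exits
        cnt[0] += 1

def add_many(cnt, n):
    # hypothetical caller that holds the lock around a call that also takes it
    for _ in range(n):
        with lock:        # outer acquire
            add_one(cnt)

cnt = [0]
threads = [threading.Thread(target=add_many, args=(cnt, 10000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()              # wait for every thread before reading the counter
print(cnt[0])
```

Because every update happens while the lock is held and all threads are joined before printing, the final count is the number of threads times the number of increments.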
Threads communicate through their shared address space, and use locks to protect sensitive state. Several classes provide a simplified interface for communication (these are available with processes as well, but the underlying implementation is different).
queue.Queue (multiprocess.Queue for processes) provides a simple, thread-safe, first-in-first-out queue.
put: put an element on the queue. This will block if the queue has filled up.
get: get an element from the queue. This will block if the queue is empty.
A pipe is a communication channel between processes that can send and receive messages. Unlike a queue, it is not okay for multiple threads to write to the same end of the pipe at the same time.
Pipe() returns a pair of Connection objects representing the ends of the pipe. Each connection object has the following methods:
send: sends data to the other end of the pipe
recv: waits for data from the other end of the pipe (raises EOFError if the pipe has been closed)
close: close the pipe
Since they don't have a shared address space, it is recommended that processes use exclusively either multiprocess.Queue or multiprocess.Pipe to communicate.
Use a pipe for communication between two processes (server-client architecture).
Use a queue for one-way communication between many threads (producer-consumer architecture).
import multiprocess
def chatty(conn): #this takes a Connection object representing one end of a pipe
    msg = conn.recv()
    conn.send("you sent me "+msg)
(c1,c2) = multiprocess.Pipe()
p1 = multiprocess.Process(target=chatty,args=(c2,))
p1.start()
c1.send("Hello!")
result = c1.recv()
p1.join()
print(result)
you sent me Hello!
multiprocess supports the concept of a pool of workers. You initialize with the number of processes you want to run in parallel (the default is the number of CPUs on your system) and they are available for doing parallel work:
map: the most important function - just like the built-in map, this will map a function to an iterable and return the result, but the mapping will be done in parallel. Blocks until the full result is computed and the result is in the proper order.
map_async: map without blocking
imap: parallel imap - returns an iterable instead of a list. The next() method of the returned iterable will block if necessary.
imap_unordered: same as imap but returns values in the order they are computed
close: close the pool to prevent additional jobs from being submitted
join: must call close first; waits for all jobs to complete
def f(x):
    return x*x
pool = multiprocess.Pool(processes=4)
print(pool.map(f,range(20)))
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
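The other pool methods listed above can be sketched side by side. As an assumption made purely for portability, this sketch swaps in the standard library's multiprocessing.pool.ThreadPool, which exposes the same methods as a process pool but runs the work in threads, so it runs as a plain script without pickling or start-method concerns. The calls are meant to read the same way against a multiprocess.Pool.

```python
from multiprocessing.pool import ThreadPool  # assumption: thread-backed stand-in for Pool

def f(x):
    return x * x

pool = ThreadPool(processes=4)

ordered = pool.map(f, range(5))          # blocks; results are in input order
pending = pool.map_async(f, range(5))    # returns immediately with an AsyncResult
lazy = list(pool.imap(f, range(5)))      # iterator; results are in input order
# imap_unordered yields results as they finish, so sort before comparing
unordered = sorted(pool.imap_unordered(f, range(5)))

pool.close()  # no more jobs may be submitted
pool.join()   # wait for the workers to exit (close must be called first)

print(ordered, pending.get(), lazy, unordered)
```

The get() call on the AsyncResult is where map_async finally blocks, which lets the caller do other work between submitting the job and collecting the answer.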
def dowork(inQ, outQ):
    val = inQ.get()
    outQ.put(val*val)
inQ = multiprocess.Queue()
outQ = multiprocess.Queue()
pool = multiprocess.Pool(4, dowork, (inQ, outQ))
inQ.put(4)
outQ.get()
16
#What is wrong with this code?
for i in range(10):
    inQ.put(i)
while not outQ.empty():
    print(outQ.get())
You should not produce and consume in the same thread. If outQ fills up, inQ will fill up and the put will block.
The empty/full methods of the Queue class are pretty much useless since their result depends on when they are called. Here, no values had been generated when empty was called, so nothing is printed.
In order to communicate that there are no more values, you must send a sentinel value.
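One way to apply these three points is sketched below, using threads and the standard library's queue.Queue so it runs anywhere. The names SENTINEL, produce and worker are hypothetical, and using None as the sentinel is an assumption that only works when None is never a real value. Production happens in its own thread, the consumer blocks on get instead of polling empty, and the sentinel is passed along to mark the end of the stream.

```python
import queue
import threading

SENTINEL = None  # hypothetical end-of-stream marker; must never be a real value

def produce(inQ, n):
    # producer runs in its own thread so a full queue cannot stall the consumer
    for i in range(n):
        inQ.put(i)               # blocks while inQ is full
    inQ.put(SENTINEL)

def worker(inQ, outQ):
    while True:
        val = inQ.get()          # blocks until a value is available
        if val is SENTINEL:
            outQ.put(SENTINEL)   # pass the marker downstream and stop
            break
        outQ.put(val * val)

inQ = queue.Queue(maxsize=2)     # small bounds to show that blocking is safe here
outQ = queue.Queue(maxsize=2)
threads = [threading.Thread(target=produce, args=(inQ, 10)),
           threading.Thread(target=worker, args=(inQ, outQ))]
for t in threads:
    t.start()

results = []
while True:
    val = outQ.get()             # block on get rather than checking empty()
    if val is SENTINEL:
        break
    results.append(val)
for t in threads:
    t.join()
print(results)
```

With a single worker the results arrive in input order. With several workers, each one would need its own sentinel, and the consumer would have to count how many it has seen before stopping.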
Normally, the choice between threads and processes depends on how data will be accessed and the level of communication between parallel tasks etc.
However, in Python, the answer is almost always to use multiprocessing, not threading.
Why? The CPython interpreter has a Global Interpreter Lock. This means that only one thread of Python code is actually executed at any given time when using threads. With processes, each process has its own interpreter (with its own lock).
Writing correct, high performance parallel code can be difficult, but in some cases it's trivial.
A problem is embarrassingly parallel if it can be easily separated into independent subtasks (i.e., no need to communicate between threads) each of which does a substantial amount of computation.
Fortunately this is often the case.
In cases like these, using Pools will get you a significant speedup (up to the number of cores you have).
multiprocess not threads
from Bio import Entrez
import re
seqfile = 'searchresults.fasta'
Entrez.email = "dkoes@pitt.edu"
records = Entrez.read(Entrez.esearch(db='protein',term='covid AND pdb[filter]',retmax=1000))
result = Entrez.efetch(db='protein',id=records['IdList'],rettype='fasta',retmode='text').read()
set(re.findall(r'>pdb\|(\S+?)\|',result))
{'6KD5', '6LU7', '6W61', '6WUU', '6WX4', '6XA4', '6XBG', '6XBH', '6XBI', '6XFN', '7AKU', '7BQY', '7EIN', '7GAV', '7GAW', '7GAX', '7GAY', '7GAZ', '7GB0', '7GB1', '7GB2', '7GB3', '7GB4', '7GB5', '7GB6', '7GB7', '7GB8', '7GB9', '7GBA', '7GBB', '7GBC', '7GBD', '7GBE', '7GBF', '7GBG', '7GBH', '7GBI', '7GBJ', '7GBK', '7GBL', '7GBM', '7GBN', '7GBO', '7GBP', '7GBQ', '7GBR', '7GBS', '7GBT', '7GBU', '7GBV', '7GBW', '7GBX', '7GBY', '7GBZ', '7GC0', '7GC1', '7GC2', '7GC3', '7GC4', '7GC5', '7GC6', '7GC7', '7GC8', '7GC9', '7GCA', '7GCB', '7GCC', '7GCD', '7GCE', '7GCF', '7GCG', '7GCI', '7GCJ', '7GCK', '7GCL', '7GCM', '7GCN', '7GCO', '7GCP', '7GCQ', '7GCR', '7GCS', '7GCT', '7GCU', '7GCV', '7GCW', '7GCX', '7GCY', '7GCZ', '7GD0', '7GD1', '7GD2', '7GD3', '7GD4', '7GD5', '7GD6', '7GD7', '7GD8', '7GD9', '7GDA', '7GDB', '7GDC', '7GDD', '7GDE', '7GDF', '7GDG', '7GDH', '7GDI', '7GDJ', '7GDK', '7GDL', '7GDM', '7GDN', '7GDO', '7GDP', '7GDQ', '7GDR', '7GDS', '7GDT', '7GDU', '7GDV', '7GDW', '7GDX', '7GDY', '7GDZ', '7GE0', '7GE1', '7GE2', '7GE3', '7GE4', '7GE5', '7GE6', '7GE7', '7GE8', '7GE9', '7GEA', '7GEB', '7GEC', '7GED', '7GEE', '7GEF', '7GEG', '7GEH', '7GEI', '7GEJ', '7GEK', '7GEL', '7GEM', '7GEN', '7GEO', '7GEQ', '7GER', '7GES', '7GET', '7GEU', '7GEV', '7GEW', '7GEX', '7GEY', '7GEZ', '7GF0', '7GF1', '7GF2', '7GF3', '7GF4', '7GF5', '7GF6', '7GF7', '7GF8', '7GF9', '7GFA', '7GFB', '7GFC', '7GFD', '7GFE', '7GFF', '7GFG', '7GFH', '7GFI', '7GFJ', '7GFK', '7GFL', '7GFM', '7GFN', '7GFO', '7GFP', '7GFQ', '7GFR', '7GFS', '7GFT', '7GFU', '7GFV', '7GFW', '7GFX', '7GFY', '7GFZ', '7GG0', '7GG1', '7GG2', '7GG3', '7GG4', '7GG5', '7GG6', '7GG7', '7GG8', '7GG9', '7GGA', '7GGB', '7GGC', '7GGD', '7GGE', '7GGF', '7GGG', '7GGH', '7GGI', '7GGJ', '7GGK', '7GGL', '7GGM', '7GGN', '7GGO', '7GGP', '7GGQ', '7GGR', '7GGS', '7GGT', '7GGU', '7GGV', '7GGW', '7GGX', '7GGY', '7GGZ', '7GH0', '7GH1', '7GH2', '7GH3', '7GH4', '7GH5', '7GH6', '7GH7', '7GH8', '7GH9', '7GHA', '7GHB', '7GHC', '7GHD', '7GHE', '7GHF', '7GHG', 
'7GHH', '7GHI', '7GHJ', '7GHK', '7GHL', '7GHM', '7GHN', '7GHO', '7GHP', '7GHQ', '7GHR', '7GHS', '7GHT', '7GHU', '7GHV', '7GHW', '7GHX', '7GHY', '7GHZ', '7GI0', '7GI1', '7GI2', '7GI3', '7GI4', '7GI5', '7GI6', '7GI7', '7GI8', '7GI9', '7GIA', '7GIB', '7GIC', '7GID', '7GIE', '7GIF', '7GIG', '7GIH', '7GII', '7GIJ', '7GIK', '7GIL', '7GIM', '7GIN', '7GIO', '7GIP', '7GIQ', '7GIR', '7GIS', '7GIT', '7GIU', '7GIV', '7GIW', '7GIX', '7GIY', '7GIZ', '7GJ0', '7GJ1', '7GJ2', '7GJ3', '7GJ4', '7GJ5', '7GJ6', '7GJ7', '7GJ8', '7GJ9', '7GJA', '7GJB', '7GJC', '7GJD', '7GJE', '7GJF', '7GJG', '7GJH', '7GJI', '7GJJ', '7GJK', '7GJL', '7GJM', '7GJN', '7GJO', '7GJP', '7GJQ', '7GJR', '7GJS', '7GJT', '7GJU', '7GJV', '7GJW', '7GJX', '7GJY', '7GJZ', '7GK0', '7GK1', '7GK2', '7GK3', '7GK4', '7GK5', '7GK6', '7GK7', '7GK8', '7GK9', '7GKA', '7GKB', '7GKC', '7GKD', '7GKE', '7GKF', '7GKG', '7GKH', '7GKI', '7GKJ', '7GKK', '7GKL', '7GKM', '7GKN', '7GKO', '7GKP', '7GKQ', '7GKR', '7GKS', '7GKT', '7GKU', '7GKV', '7GKW', '7GKX', '7GKY', '7GKZ', '7GL0', '7GL1', '7GL2', '7GL3', '7GL4', '7GL5', '7GL6', '7GL7', '7GL8', '7GL9', '7GLA', '7GLB', '7GLC', '7GLD', '7GLE', '7GLF', '7GLG', '7GLH', '7GLI', '7GLJ', '7GLK', '7GLL', '7GLM', '7GLN', '7GLO', '7GLP', '7GLQ', '7GLR', '7GLS', '7GLT', '7GLU', '7GLV', '7GLW', '7GLX', '7GLY', '7GLZ', '7GM0', '7GM1', '7GM2', '7GM3', '7GM4', '7GM5', '7GM6', '7GM7', '7GM8', '7GM9', '7GMA', '7GMB', '7GMC', '7GMD', '7GME', '7GMF', '7GMG', '7GMH', '7GMI', '7GMJ', '7GMK', '7GML', '7GMM', '7GMN', '7GMO', '7GMP', '7GMQ', '7GMR', '7GMS', '7GMT', '7GMU', '7GMV', '7GMW', '7GMX', '7GMY', '7GMZ', '7GN0', '7GN1', '7GN2', '7GN3', '7GN4', '7GN5', '7GN6', '7GN7', '7GN8', '7GN9', '7GNA', '7GNB', '7GNC', '7GND', '7GNE', '7GNF', '7GNG', '7GNH', '7GNI', '7GNJ', '7GNK', '7GNL', '7GNM', '7GNN', '7GNO', '7GNP', '7GNQ', '7GNR', '7GNS', '7GNT', '7GNU', '7JPY', '7JPZ', '7JQ0', '7JQ1', '7JQ2', '7JQ3', '7JQ4', '7JQ5', '7LCR', '7M2P', '7MW2', '7MW3', '7MW4', '7MW5', '7MW6', '7RVN', '7RVQ', '7RVS', '7RVX', '7S4S', 
'7S6W', '7SH7', '7TLT', '7TUQ', '7TV0', '7UKL', '7URQ', '7URS', '7UU6', '7UU7', '7UU8', '7UU9', '7UUA', '7UUB', '7UUC', '7UUD', '7UUE', '8AJA', '8AJL', '8BWU', '8D4P', '8DDL', '8DGU', '8DGV', '8DGW', '8DGX', '8DIB', '8DIC', '8DID', '8DIE', '8DIF', '8DIG', '8DIH', '8DII', '8DL9', '8DLB', '8DMD', '8DPR', '8DS2', '8DSU', '8DTR', '8DTT', '8DTX', '8DW9', '8DWA', '8DZ0', '8DZ1', '8DZ2', '8DZ6', '8DZA', '8DZB', '8DZC', '8E25', '8E26', '8E63', '8E6A', '8E7C', '8EL2', '8ELG', '8ELH', '8EOO', '8EUA', '8F44', '8F45', '8F46', '8GIA', '8H3G', '8H3K', '8H3L', '8H4Y', '8H51', '8H57', '8H5F', '8H5P', '8H6I', '8H6N', '8H7K', '8H7W', '8H82', '8HBK', '8HI9', '8IX3', '8J5J', '8SUZ', '8TBE', '8URB'}