~ Pat Gelsinger
~ Max Ludwig Henning Delbrück
Since about this time last year, I have been tracking mutations in the Sars-CoV-2 virus. I have mostly been relying on data from NCBI. Right now, the NCBI database contains 51,650 Sars-CoV-2 genomic sequences. On a well-powered desktop computer, it's possible to analyze that amount of data, albeit slowly. However, the major depository of pandemic sequences is at GISAID. At this point, GISAID contains over 500,000 genomic sequences. Dealing with that amount of data on a desktop system presents some new problems.
On Jan. 27, 2021 I downloaded a multiple sequence alignment from GISAID. It contained 399,796 sequences, each with 32,280 letters. The MSA file weighs in at about 13 Gb. The first thing I wanted to do was to find the nucleotide variation across the alignment. This involves counting the numbers of each type of nucleotide in each column of the alignment.
Thirteen Gb seemed a bit large for testing, so I pulled out a subset of the alignment containing sequences from USA cases. This subset contained 78,193 sequences and takes up about 2.5 GB.
A naïve approach to counting the variation across the aligned columns would be something like this.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
test_biopython.py
@author: Bill Thompson
@license: GPL 3
@copyright: 2020_03_05
"""
import os
import psutil
from collections import Counter
from Bio import AlignIO
def print_memory_use():
"""
Print percent of memory used.
From https://stackoverflow.com/questions/276052/how-to-get-current-cpu-and-ram-usage-in-python
Returns
-------
None.
"""
pid = os.getpid()
py = psutil.Process(pid)
memoryUse = py.memory_info()[0]/2.**30 # memory use in GB...I think
print('memory use:', memoryUse)
def gisaid_get_columns_variation(align):
"""
Get counts of each nucleotide type in each column of MSA.
Parameters
----------
align : Bio.Align,MultipleSeqAlignmnet object
Alignment returned by Bio.AlignIO
Returns
-------
column_variation : a list
A list counts for each column of the MSA.
"""
column_variation = list()
for col in range(align.get_alignment_length()):
c = Counter(align[:, col])
column_variation.append(c)
return column_variation
def main():
align_file = 'msa_usa.fasta'
align = AlignIO.read(align_file, 'fasta')
print_memory_use()
column_variation = gisaid_get_columns_variation(align)
print_memory_use()
if __name__ == "__main__":
main()
The code uses Biopython's AlignIO routine to read the data and Counter to get a count of each nucleotide type. This approach is simple, but it has problems. On my 32 Gb i9 system, this method takes over 22 minutes to run and uses about 2.5 Gb of memory. Of that 22 minutes, about 42 seconds involves reading the MSA file. The rest of the time is spend iterating through the columns. Scaling up to the full MSA would take several hours and uses about 13 Gb of memory. Not impossible, but it's only a small part of the analysis.Python Multiprocessing
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
test_multi.py
@author: Bill Thompson
@license: GPL 3
@copyright: 2021_-3_05
"""
import time
import os
import psutil
from collections import Counter
from Bio import AlignIO
import multiprocessing as mp
def print_memory_use():
"""
Print percent of memory used.
From https://stackoverflow.com/questions/276052/how-to-get-current-cpu-and-ram-usage-in-python
Returns
-------
None.
"""
pid = os.getpid()
py = psutil.Process(pid)
memoryUse = py.memory_info()[0]/2.**30 # memory use in GB...I think
print('memory use:', memoryUse)
def col_variation(align, col):
return Counter(align[:, col])
def gisaid_get_columns_variation(align, num_processes = 10):
"""
Get counts of each nucleotide type in each column of MSA.
Parameters
----------
align : Bio.Align,MultipleSeqAlignmnet object
Alignment returned by Bio.AlignIO
Returns
-------
column_variation : a list
A list counts for each column of the MSA.
"""
pool = mp.Pool(processes = num_processes)
column_variation = pool.starmap(col_variation, [(align, col)
for col in range(align.get_alignment_length())])
return column_variation
def main():
align_file = 'msa_usa.fasta'
t1 =time.time()
align = AlignIO.read(align_file, 'fasta')
print('Load time:', time.time() - t1)
print_memory_use()
num_processors = mp.cpu_count()
t1 =time.time()
column_variation = gisaid_get_columns_variation(align, num_processors - 1)
print('Calculation time:', time.time() - t1)
print_memory_use()
if __name__ == "__main__":
main()
pool = mp.Pool(processes = num_processes)
column_variation = pool.starmap(col_variation, [(align, col)
for col in range(align.get_alignment_length())])
I'll Never Forget You
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
test_memory.py
@author: Bill Thompson
@license: GPL 3
@copyright: 2020_03_05
"""
import os
import psutil
import gc
from Bio import AlignIO
import numpy as np
import pandas as pd
def print_memory_use():
"""
Print percent of memory used.
From https://stackoverflow.com/questions/276052/how-to-get-current-cpu-and-ram-usage-in-python
Returns
-------
None.
"""
pid = os.getpid()
py = psutil.Process(pid)
memoryUse = py.memory_info()[0]/2.**30 # memory use in GB...I think
print('memory use:', memoryUse)
def read_alignment(filename):
align = AlignIO.read(filename, 'fasta')
# n = np.ones((78193, 32280), dtype = int)
# p = pd.DataFrame(n)
def main():
align_file = 'msa_usa.fasta'
print('Initial:')
print_memory_use()
for r in range(5):
print('Round:', r + 1)
read_alignment(align_file)
gc.collect()
print_memory_use()
if __name__ == "__main__":
main()
$ time python test_memory.py
Initial:
memory use: 0.050556182861328125
Round: 1
memory use: 0.7907676696777344
Round: 2
memory use: 0.7904548645019531
Round: 3
memory use: 0.7904548645019531
Round: 4
memory use: 1.3799095153808594
Round: 5
memory use: 1.3799095153808594
real 3m34.374s
user 1m6.712s
sys 0m16.028s
Sharing is Caring
def read_alignment_to_numpy(filename):
"""
Read a MSA in fasta format into a numpy array.
Sequences are stored in a (seq_count, align_length) numpy array
of characters. The array is stored in coumn major order to speed
up column access.
Parameters
----------
filename : str
The name of an MSA file.
Returns
-------
None.
Requires
-------
All sequences must have the same length.
"""
# First we need to figure out how much memory we will need
# Make a pass through the file and count number of sequences and
# width of each record.
def get_msa_size():
res = subprocess.check_output(['/usr/bin/grep', '>', filename]).split(b'\n')
res.remove(b'')
rec = next(SeqIO.parse(filename, 'fasta'))
seq_count = len(res)
align_length = len(rec.seq)
return (seq_count, align_length)
seq_count, align_length = get_msa_size()
# Reserve some memory for sequence data. .
shm = shared_memory.SharedMemory(create = True,
size = seq_count * align_length)
align_array = np.ndarray((seq_count, align_length),
dtype = '|S1',
order = 'F',
buffer = shm.buf)
# load the sequences
seq_count = 0
curr_line = ''
with open(filename) as f:
for line in f:
line = line.strip()
if line[0] == '>':
if len(curr_line) > 0:
align_array[seq_count, :] = list(curr_line)
seq_count += 1
curr_line = ''
elif len(line) > 0:
curr_line += line
# Add the last sequence.
if len(curr_line) > 0:
align_array[seq_count, :] = list(curr_line)
return (align_array, shm)
def get_col_variation(shm_name, shape, col):
"""
Get count of each nucleotide in each column.
Parameters
----------
shm_name : str
The id of the shared memory block.
shape : a tuple of ints.
(number of sequences, alignmnent width)
col : int
The column number.
Returns
-------
TYPE
A count of each nucleotide in the column.
"""
shm = shared_memory.SharedMemory(name=shm_name)
align = np.ndarray(shape,
dtype = '|S1',
order = 'F',
buffer = shm.buf)
return Counter(align[:, col])
def gisaid_get_columns_variation(shm_name, shape, num_processes = 10):
"""
Get counts of each nucleotide type in each column of MSA.
Parameters
----------
shm_name : str
The assigned name of the shared memory block.
shape : a tuple of ints
(number of sequences, alignmnet width)
num_processes : int
The number of subprocesses to start.
Returns
-------
column_variation : a list
A list counts for each column of the MSA.
"""
pool = mp.Pool(processes = num_processes)
column_variation = pool.starmap(get_col_variation, [(shm_name, shape, col)
for col in range(shape[1])])
return column_variation
def main():
align_file = 'msa_usa.fasta'
t1 =time.time()
align_array, shm = read_alignment_to_numpy(align_file)
print('Load time:', time.time() - t1)
print_memory_use()
num_processors = mp.cpu_count()
t1 =time.time()
column_variation = gisaid_get_columns_variation(shm.name,
align_array.shape,
num_processors - 1)
print('Calculation time:', time.time() - t1)
print_memory_use()
shm.close()
shm.unlink()
$ time python test_sharing.py
Load time: 211.76434803009033
memory use: 2.3973617553710938
Calculation time: 34.26133418083191
memory use: 2.415111541748047
No comments:
Post a Comment