A Tight Squeeze

Data is the new science. Big Data holds the answers.
~ Pat Gelsinger

A strong feeling of adventure is animating those who are working on bacterial viruses, a feeling that they have a small part in the great drive towards a fundamental problem in biology.
~ Max Ludwig Henning Delbrück

Since about this time last year, I have been tracking mutations in the Sars-CoV-2 virus. I have mostly been relying on data from NCBI. Right now, the NCBI database contains 51,650 Sars-CoV-2 genomic sequences. On a well-powered desktop computer, it's possible to analyze that amount of data, albeit slowly. However, the major depository of pandemic sequences is at GISAID. At this point, GISAID contains over 500,000 genomic sequences. Dealing with that amount of data on a desktop system presents some new problems.

On Jan. 27, 2021 I downloaded a multiple sequence alignment from GISAID. It contained 399,796 sequences, each with 32,280 letters. The MSA file weighs in at about 13 Gb. The first thing I wanted to do was to find the nucleotide variation across the alignment. This involves counting the numbers of each type of nucleotide in each column of the alignment.

Thirteen Gb seemed a bit large for testing, so I pulled out a subset of the alignment containing sequences from USA cases. This subset contained 78,193 sequences and takes up about 2.5 GB.

A naïve approach to counting the variation across the aligned columns would be something like this.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
test_biopython.py

@author: Bill Thompson
@license: GPL 3
@copyright: 2020_03_05
"""
import os
import psutil
from collections import Counter
from Bio import AlignIO

def print_memory_use():
    """
    Print percent of memory used.
    From https://stackoverflow.com/questions/276052/how-to-get-current-cpu-and-ram-usage-in-python

    Returns
    -------
    None.

    """    
    pid = os.getpid()
    py = psutil.Process(pid)
    memoryUse = py.memory_info()[0]/2.**30  # memory use in GB...I think
    print('memory use:', memoryUse)

def gisaid_get_columns_variation(align):
    """
    Get counts of each nucleotide type in each column of MSA.

    Parameters
    ----------
    align : Bio.Align,MultipleSeqAlignmnet object
        Alignment returned by Bio.AlignIO

    Returns
    -------
    column_variation : a list
        A list counts for each column of the MSA.

    """
    column_variation = list()
    for col in range(align.get_alignment_length()):
        c = Counter(align[:, col])
        column_variation.append(c)

    return column_variation
           
def main():
    align_file = 'msa_usa.fasta'
    
    align = AlignIO.read(align_file, 'fasta')
    print_memory_use()
    
    column_variation = gisaid_get_columns_variation(align)
    print_memory_use()

if __name__ == "__main__":
    main()

The code uses Biopython's AlignIO routine to read the data and Counter to get  a count of each nucleotide type. This approach is simple, but it has problems. On my 32 Gb i9 system, this method takes over 22 minutes to run and uses about 2.5 Gb of memory. Of that 22 minutes, about 42 seconds involves reading the MSA file. The rest of the time is spend iterating through the columns. Scaling up to the full MSA would take several  hours and uses about 13 Gb of memory. Not impossible, but it's only a small part of the analysis.

If we need to have all of the data in memory, there's not much that can be done about the memory consumption, but maybe something can be done about the speed.

Python Multiprocessing


On a multithreading machine, a natural approach to decreasing runtime is to use multiple processes. Python provides a multiprocessing module with a variety of options for spreading a task over a number of processes.  My current machine has 8 cores, effectively giving me the ability to run 16 processes at once. Essentially, I can pretend that I have 16 CPUs.

Here's a version that tries to use multiprocessing and pretends to have 15 CPUs. It uses multiprocessing.Pool to allocate 15 processes and iterate across the columns.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
test_multi.py

@author: Bill Thompson
@license: GPL 3
@copyright: 2021_-3_05
"""
import time
import os
import psutil
from collections import Counter
from Bio import AlignIO
import multiprocessing as mp

def print_memory_use():
    """
    Print percent of memory used.
    From https://stackoverflow.com/questions/276052/how-to-get-current-cpu-and-ram-usage-in-python

    Returns
    -------
    None.

    """    
    pid = os.getpid()
    py = psutil.Process(pid)
    memoryUse = py.memory_info()[0]/2.**30  # memory use in GB...I think
    print('memory use:', memoryUse)
    
def col_variation(align, col):
    return Counter(align[:, col])

def gisaid_get_columns_variation(align, num_processes = 10):
    """
    Get counts of each nucleotide type in each column of MSA.

    Parameters
    ----------
    align : Bio.Align,MultipleSeqAlignmnet object
        Alignment returned by Bio.AlignIO

    Returns
    -------
    column_variation : a list
        A list counts for each column of the MSA.

    """
    pool = mp.Pool(processes = num_processes)
    column_variation = pool.starmap(col_variation, [(align, col)
                                                    for col in range(align.get_alignment_length())])

    return column_variation

def main():
    align_file = 'msa_usa.fasta'
    
    t1 =time.time()
    align = AlignIO.read(align_file, 'fasta')
    print('Load time:', time.time() - t1)
    print_memory_use()
    
    num_processors = mp.cpu_count()
    
    t1 =time.time()
    column_variation = gisaid_get_columns_variation(align, num_processors - 1)
    print('Calculation time:', time.time() - t1)
    print_memory_use()
    
if __name__ == "__main__":
    main()


Unfortunately, this code runs for several seconds once the MSA is loaded, but then crashes with a "MemoryError Exception in thread Thread-1" error. 

The difference from the previous version is
   pool = mp.Pool(processes = num_processes)
   column_variation = pool.starmap(col_variation, [(align, col)
                                                    for col in range(align.get_alignment_length())])


The first creates a pool of processors and the second line spawns a series tasks and asks col_variation to do the work. 

In trying this, I make two assumptions. First, Python is not passing large objects like align to functions. Instead, It is passing a reference to the object. col_variation doesn't modify align. It just extracts a column as a string. Therefore, I would assume that align isn't being modified and python shouldn't make a copy of the object.

Secondly, when the multiprocessing module spawns a task, it performs a fork() call. fork creates a copy of the code for the new processor and launches it. The program is running on Linux. When forking, Linux uses copy on write. This means that memory pages that aren't modified don't get copied to the new process. They are only copied when the page is modified by the forked task.

Ideally, we should not be chewing up much more memory than the original task. However, that's not what occurs. Python keeps track of active objects using reference counting. That is, when an object is created, it gets a reference count of one. When it is accessed, for example in an assignment statement, the reference count is increased. When the variable that refers to the original object goes out of scope the reference count is decremented. When the reference count goes to zero, the object is available for garbage collection. An operation like Align[:, col] apparently increments the reference count, which tells Linux that the page has been modified, causing a copy.

I'll Never Forget You


A second subtle problem arises because it appears that BioPython 1.78's Bio.AlignIO, Bio.MultipleSeqAlignment, or some other underlying class has a memory leak. A leak occurs when an object goes out of scope and is no longer referenced but can't be reclaimed by garbage collection. Memory used by the object should be collected by the garbage collector and available for reuse. 

This code loads the MSA file and immediately allows it to go out of scope. Garbage collection is forced and the amount of used memory is printed.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
test_memory.py

@author: Bill Thompson
@license: GPL 3
@copyright: 2020_03_05
"""
import os
import psutil
import gc
from Bio import AlignIO
import numpy as np
import pandas as pd

def print_memory_use():
    """
    Print percent of memory used.
    From https://stackoverflow.com/questions/276052/how-to-get-current-cpu-and-ram-usage-in-python

    Returns
    -------
    None.

    """    
    pid = os.getpid()
    py = psutil.Process(pid)
    memoryUse = py.memory_info()[0]/2.**30  # memory use in GB...I think
    print('memory use:', memoryUse)
    
def read_alignment(filename):
    align = AlignIO.read(filename, 'fasta')
    # n = np.ones((78193, 32280), dtype = int)
    # p = pd.DataFrame(n)
    
def main():
    align_file = 'msa_usa.fasta'

    print('Initial:')
    print_memory_use()
    for r in range(5):
        print('Round:', r + 1)
        read_alignment(align_file)
        gc.collect()
        print_memory_use()
        
if __name__ == "__main__":
    main()

Here's the output.

$ time python test_memory.py
Initial:
memory use: 0.050556182861328125
Round: 1
memory use: 0.7907676696777344
Round: 2
memory use: 0.7904548645019531
Round: 3
memory use: 0.7904548645019531
Round: 4
memory use: 1.3799095153808594
Round: 5
memory use: 1.3799095153808594

real    3m34.374s
user    1m6.712s
sys     0m16.028s


This isn't just Python overhead. You don't see similar behavior if the call to AlignIO is replaced by the creation of large numpy or pandas objects.

Sharing is Caring


In order to process large alignments efficiently we will need to avoid creating multiple copies of a large object. One solution is to create one copy of the alignment and share it among the subprocesses.

Python introduced multiprocessing.shared_memory in Python 3.8.  shared_memory makes it easy to share large object such as numpy arrays or pandas DataFrames.  It's a bit trickier to share a MultipleSeqAlignment object. Bio.Align.MultipleSeqAlignment objects store the alignment as a list of SeqRecords rather than as an array or other fixed block of memory. It would be possible to create a subclass of  MultipleSeqAlignment and store the alignment in a ShareableList. However, given the possible memory leak, I decided to try a more direct route.

Instead of using AlignIO to create a MultipleSeqAlignment object, we will use a 2D numpy array to hold the sequences. We will ignore the header information for now. Using this method, we would have to capture that information separately.

First, let's set up the shared memory and fill it with the sequence data. The code makes a system call to grep to count the number of sequences. There are many other ways to do this. This method turns out to be relatively quick.

def read_alignment_to_numpy(filename):
    """
    Read a MSA in fasta format into a numpy array.
    
    Sequences are stored in a (seq_count, align_length) numpy array
    of characters. The array is stored in coumn major order to speed
    up column access.
    
    Parameters
    ----------
    filename : str
        The name of an MSA file.

    Returns
    -------
    None.
    
    Requires
    -------
    All sequences must have the same length.

    """         
    # First we need to figure out how much memory we will need
    # Make a pass through the file and count number of sequences and 
    # width of each record.
    
    def get_msa_size():    
        res = subprocess.check_output(['/usr/bin/grep', '>', filename]).split(b'\n')
        res.remove(b'')
        rec = next(SeqIO.parse(filename, 'fasta'))
        seq_count = len(res)
        align_length = len(rec.seq)
        
        return (seq_count, align_length)

    seq_count, align_length = get_msa_size()
    
    # Reserve some memory for sequence data.                .
    shm = shared_memory.SharedMemory(create = True, 
                                      size = seq_count * align_length)
    align_array = np.ndarray((seq_count, align_length), 
                              dtype = '|S1', 
                              order = 'F',
                              buffer = shm.buf)
    
    # load the sequences
    seq_count = 0
    curr_line = ''
    with open(filename) as f:
        for line in f:
            line = line.strip()
            if line[0] == '>':                    
                if len(curr_line) > 0:
                    align_array[seq_count, :] = list(curr_line)
                    seq_count += 1
                curr_line = ''
            elif len(line) > 0:
                curr_line += line

    # Add the last sequence.                
    if len(curr_line) > 0:
        align_array[seq_count, :] = list(curr_line)
       
    return (align_array, shm)


This method takes longer to load that using AlignIO, but it avoids the memory leak and the problem of doubling the amount of memory. It's also quicker than using SeqIO to iterate through the sequences.

To calculate the column variation, we access the shared memory and retrieve the appropriate column.

def get_col_variation(shm_name, shape, col):
    """
    Get count of each nucleotide in each column.

    Parameters
    ----------
    shm_name : str
        The id of the shared memory block.
    shape : a tuple of ints.
        (number of sequences, alignmnent width)
    col : int
        The column number.

    Returns
    -------
    TYPE
        A count of each nucleotide in the column.

    """
    shm = shared_memory.SharedMemory(name=shm_name)
    align = np.ndarray(shape, 
                       dtype = '|S1', 
                       order = 'F',
                       buffer = shm.buf)
    
    return Counter(align[:, col])

def gisaid_get_columns_variation(shm_name, shape, num_processes = 10):
    """
    Get counts of each nucleotide type in each column of MSA.

    Parameters
    ----------
    shm_name : str
        The assigned name of the shared memory block.
    shape : a tuple of ints
        (number of sequences, alignmnet width)
    num_processes : int
        The number of subprocesses to start.

    Returns
    -------
    column_variation : a list
        A list counts for each column of the MSA.

    """
    pool = mp.Pool(processes = num_processes)
    column_variation = pool.starmap(get_col_variation, [(shm_name, shape, col)
                                                        for col in range(shape[1])])
        
    return column_variation

def main():
    align_file = 'msa_usa.fasta'
    
    t1 =time.time()
    align_array, shm = read_alignment_to_numpy(align_file)
    print('Load time:', time.time() - t1)
    print_memory_use()
    
    num_processors = mp.cpu_count()
    
    t1 =time.time()
    column_variation = gisaid_get_columns_variation(shm.name, 
                                                    align_array.shape,
                                                    num_processors - 1)
    print('Calculation time:', time.time() - t1)
    print_memory_use()
    
    shm.close()
    shm.unlink()


Once the array is filled with sequences, we use a number of processes to calculate the contents of the columns. Calculation is quick, about 32 seconds on my system to go though a (78193, 32280) array column by column.

$ time python test_sharing.py
Load time: 211.76434803009033
memory use: 2.3973617553710938
Calculation time: 34.26133418083191
memory use: 2.415111541748047


Because of GISAID's data sharing policy, I can't release the data used in these examples, but the code for the shared memory tests is available at https://github.com/analytic-garden/shared_memory.

No comments:

Post a Comment