from clubcpg.ParseBam import BamFileReadParser
import os
import logging
from multiprocessing import Pool
import numpy as np
from collections import defaultdict
import time
from pandas.core.indexes.base import InvalidIndexError
[docs]class CalculateCompleteBins:
"""
Class to calculate the number of reads covering all CpGs
"""
[docs] def __init__(self,
bam_file, bin_size, output_directory, number_of_processors=1, mbias_read1_5=None, mbias_read1_3=None, mbias_read2_5= None, mbias_read2_3=None, no_overlap=True):
"""
This class is initialized with a path to a bam file and a bin size
:param bam_file: One of the BAM files for analysis to be performed
:param bin_size: Size of the bins for the analysis, integer
:number_of_processors: How many CPUs to use for parallel computation, default=1
"""
self.input_bam_file = bam_file
self.bin_size = int(bin_size)
self.number_of_processors = int(number_of_processors)
self.output_directory = output_directory
self.bins_no_reads = 0
self.bins_yes_reads = 0
self.mbias_read1_5 = mbias_read1_5
self.mbias_read1_3 = mbias_read1_3
self.mbias_read2_5 = mbias_read2_5
self.mbias_read2_3 = mbias_read2_3
self.no_overlap = no_overlap
[docs] def calculate_bin_coverage(self, bin):
"""
Take a single bin, return a matrix. This is passed to a multiprocessing Pool.
:param bin: Bin should be passed as "Chr19_4343343"
:return: pd.DataFrame with rows containing NaNs dropped
"""
# Get reads from bam file
parser = BamFileReadParser(self.input_bam_file, 20, self.mbias_read1_5, self.mbias_read1_3,
self.mbias_read2_5, self.mbias_read2_3, self.no_overlap)
# Split bin into parts
chromosome, bin_location = bin.split("_")
bin_location = int(bin_location)
try:
reads = parser.parse_reads(chromosome, bin_location-self.bin_size, bin_location)
matrix = parser.create_matrix(reads)
except BaseException as e:
# No reads are within this window, do nothing
self.bins_no_reads += 1
return None
except:
logging.error("Unknown error: {}".format(bin))
return None
# drop rows of ALL NaN
matrix = matrix.dropna(how="all")
# convert to data_frame of 1s and 0s, drop rows with NaN
matrix = matrix.dropna()
# if matrix is empty, attempt to create it with correction before giving up
if len(matrix) == 0:
original_matrix = matrix.copy()
reads = parser.correct_cpg_positions(reads)
try:
matrix = parser.create_matrix(reads)
except InvalidIndexError as e:
logging.error("Invalid Index error when creating matrices at bin {}".format(bin))
logging.debug(str(e))
return bin, original_matrix
except ValueError as e:
logging.error("Matrix concat error ar bin {}".format(bin))
logging.debug(str(e))
matrix = matrix.dropna()
if len(matrix) > 0:
logging.info("Correction attempt at bin {}: SUCCESS".format(bin))
else:
logging.info("Correction attempt at bin {}: FAILED".format(bin))
return bin, matrix
[docs] def get_chromosome_lengths(self):
"""
Get dictionary containing lengths of the chromosomes. Uses bam file for reference
:return: Dictionary of chromosome lengths, ex: {"chrX": 222222}
"""
parser = BamFileReadParser(self.input_bam_file, 20)
return dict(zip(parser.OpenBamFile.references, parser.OpenBamFile.lengths))
[docs] @staticmethod
def remove_scaffolds(chromosome_len_dict):
"""
Return a dict containing only the standard chromosomes starting with "chr"
:param chromosome_len_dict: A dict generated by get_chromosome_lenghts()
:return: a dict containing only chromosomes starting with "chr"
"""
new_dict = dict(chromosome_len_dict)
for key in chromosome_len_dict.keys():
if not key.startswith("chr"):
new_dict.pop(key)
return new_dict
[docs] def generate_bins_list(self, chromosome_len_dict):
"""
Get a dict of lists of all bins according to desired bin size for all chromosomes in the passed dict
:param chromosome_len_dict: A dict of chromosome length sizes from get_chromosome_lenghts, cleaned up by remove_scaffolds() if desired
:return: dict with each key being a chromosome. ex: chr1
"""
all_bins = defaultdict(list)
for key, value in chromosome_len_dict.items():
bins = list(np.arange(self.bin_size, value + self.bin_size, self.bin_size))
bins = ["_".join([key, str(x)]) for x in bins]
all_bins[key].extend(bins)
return all_bins
[docs] def analyze_bins(self, individual_chrom=None):
"""
Main function in class. Run the Complete analysis on the data
:param individual_chrom: Chromosome to analyze: ie "chr7"
:return: filename of the generated report
"""
# Track the progress of the multiprocessing and output
def track_progress(job, update_interval=60):
while job._number_left > 0:
logging.info("Tasks remaining = {0}".format(
job._number_left * job._chunksize))
time.sleep(update_interval)
# Get and clean dict of chromosome lenghts, convert to list of bins
chromosome_lengths = self.get_chromosome_lengths()
chromosome_lengths = self.remove_scaffolds(chromosome_lengths)
# If one chromosome was specified use only that chromosome
if individual_chrom:
new = dict()
new[individual_chrom] = chromosome_lengths[individual_chrom]
chromosome_lengths = new
bins_to_analyze = self.generate_bins_list(chromosome_lengths)
# Set up for multiprocessing
# Loop over bin dict and pool.map them individually
final_results = []
for key in bins_to_analyze.keys():
pool = Pool(processes=self.number_of_processors)
results = pool.map_async(self.calculate_bin_coverage, bins_to_analyze[key])
track_progress(results)
# once done, get results
results = results.get()
final_results.extend(results)
logging.info("Analysis complete")
# Write to output file
output_file = os.path.join(self.output_directory, "CompleteBins.{}.{}.csv".format(os.path.basename(self.input_bam_file), individual_chrom))
with open(output_file, "w") as out:
for result in final_results:
if result:
# bin
out.write(result[0] + ",")
# num of reads
out.write(str(result[1].shape[0]) + ",")
# num of CpGs
out.write(str(result[1].shape[1]) + "\n")
logging.info("Full read coverage analysis complete!")
return output_file