import pysam
import pandas as pd
from collections import defaultdict
import logging
import re
[docs]class BamFileReadParser:
"""
Used to simplify the opening and reading from BAM files. BAMs must be coordinate sorted and indexed.
:Example:
>>> from clubcpg.ParseBam import BamFileReadParser
>>> parser = BamFileReadParser("/path/to/data.BAM", quality_score=20, read1_5=3, read1_3=4, read2_5=7, read2_3=1)
>>> reads = parser.parse_reads("chr7", 10000, 101000)
>>> reads = parser.correct_cpg_positions(reads) # This step is optional, but highly recommended
>>> matrix = parser.create_matrix(reads)
"""
[docs] def __init__(self, bamfile, quality_score, read1_5=None, read1_3=None,
read2_5=None, read2_3=None, no_overlap=True):
"""
Class used to read WGBSeq reads from a BAM file, extract methylation, and convert into data frame
:param bamfile: Path to bam file location
:param quality_score: Only include reads >= this fastq quality
:param read1_5: mbias ignore read1 5'
:param read1_3: mbias ignore read1 3'
:param read2_5: mbias ignore read2 5'
:param read2_3: mbias ignore read2 3'
:param no_overlap: bool. If overlap exists between two reads, ignore that region from read 2.
"""
self.mapping_quality = quality_score
self.bamfile = bamfile
self.read1_5 = read1_5
self.read1_3 = read1_3
self.read2_5 = read2_5
self.read2_3 = read2_3
self.full_reads = []
self.read_cpgs = []
self.no_overlap = no_overlap
if read1_5 or read2_5 or read1_3 or read2_3:
self.mbias_filtering = True
else:
self.mbias_filtering = False
self.OpenBamFile = pysam.AlignmentFile(bamfile, 'rb')
# Check for presence of index file
index_present = self.OpenBamFile.check_index()
if not index_present:
raise FileNotFoundError("BAM file index is not found. Please create it using samtools index")
# From open bam file, get locaiton of first read from the provided chromosome
def get_location_of_first_read(self, chromosome):
# Get reference lenghts
ref_lens = dict(zip(self.OpenBamFile.references, self.OpenBamFile.lengths))
for read in self.OpenBamFile.fetch(chromosome, 0, ref_lens[chromosome]):
reads_start_loc = read.reference_start
break
return reads_start_loc
# Get reads from the bam file, extract methylation state
[docs] def parse_reads(self, chromosome: str, start:int , stop: int):
"""
:param chromosome: chromosome as "chr6"
:param start: start coordinate
:param stop: end coordinate
:return: List of reads and their positional tags as assigned by bismark
"""
reads = []
for read in self.OpenBamFile.fetch(chromosome, start, stop):
if read.mapping_quality >= self.mapping_quality:
reads.append(read)
## CIGAR FILTERING BY C. COARFA
read_cpgs = []
self.skipped_reads = set()
read_index = -1
self.query_count_hash = {}
for read in reads:
read_index +=1
if not (read.query_name in self.query_count_hash):
self.query_count_hash[read.query_name]=0
self.query_count_hash[read.query_name] += 1
# if (self.query_count_hash[read.query_name]>2):
# logging.info("Found read with more than 2 mappings: %s --> %s\n"%(read.query_name, self.query_count_hash[read.query_name]))
# need to check for regular expression though
no_indel_mapping = re.match("^\d+M$", read.cigarstring)
if (no_indel_mapping):
reduced_read = []
# Join EVERY XM tag with its aligned_pair location
for pair, tag in zip(read.get_aligned_pairs(), read.get_tag('XM')):
if pair[1]:
if read.flag == 83 or read.flag == 163 or read.flag == 16:
reduced_read.append((pair[1] - 1, tag))
else:
reduced_read.append((pair[1], tag))
else:
continue
# if MBIAS was set, slice the joined list
if self.mbias_filtering:
if read.is_read1:
mbias_5_prime = self.read1_5
# note taking the NEGATIVE of the value for the 3-prime
mbias_3_prime = -self.read1_3
if mbias_3_prime == 0:
mbias_3_prime = None
reduced_read = reduced_read[mbias_5_prime:mbias_3_prime]
if read.is_read2:
mbias_5_prime = self.read2_5
mbias_3_prime = -self.read2_3
if mbias_3_prime == 0:
mbias_3_prime = None
reduced_read = reduced_read[mbias_5_prime:mbias_3_prime]
read_cpgs.append(reduced_read)
else:
self.skipped_reads.add(read.query_name)
self.full_reads = reads
self.read_cpgs = read_cpgs
# Correct overlapping paired reads if set, this is default behavior
if self.no_overlap:
try:
read_cpgs = self.fix_read_overlap(reads, read_cpgs)
except AttributeError:
pass
# print("Could not determine read 1 or 2. {}:{}-{}".format(chromosome, start, stop))
# sys.stdout.flush()
# Filter the list for positions between start-stop and CpG (Z/z) tags
output = []
read_cpg_index = -1
found_cpg_count = 0
for read_cpg in read_cpgs:
read_cpg_index +=1
temp = []
for pos, tag in read_cpg:
if pos and (pos > start) and (pos <= stop) and ((tag == 'Z') or (tag == 'z')):
temp.append((pos, tag))
found_cpg_count += 1
output.append(temp)
return output
[docs] def create_matrix(self, read_cpgs):
"""
Converted parsed reads into a pandas dataframe.
:param read_cpgs: read CpGs generated by self.parse_reads
:type read_cpgs: iterable
:return: matrix methylated (1) and unmethylated (0) states
:rtype: pd.DataFrame
"""
series = []
data_index = -1
for data in read_cpgs:
data_index += 1
positions = []
statues = []
num_positions = 0
dup_flag = False
positions_set = set()
for pos, status in data:
num_positions += 1
if pos in positions_set:
dup_flag = True
positions_set.add(pos)
positions.append(pos)
statues.append(status)
if dup_flag:
pass
else:
if num_positions > 0:
series.append(pd.Series(statues, positions))
try:
matrix = pd.concat(series, axis=1, ignore_index=True)
except BaseException as e:
raise ValueError("Empty matrix")
matrix = matrix.replace('Z', 1)
matrix = matrix.replace('z', 0)
return matrix.T
[docs] def fix_read_overlap(self, full_reads, read_cpgs):
"""Takes pysam reads and read_cpgs generated during parse reads and removes any
overlap between read1 and read2. If possible it also stitches read1 and read2 together to create
a super read.
:param full_reads: set of reads generated by self.parse_reads()
:param read_cpgs: todoo
:return: A list in the same format as read_cpgs input, but corrected for paired read overlap
"""
# data for return
fixed_read_cpgs = []
# Combine raw reads and extracted tags
combined = []
for read, state in zip(full_reads, read_cpgs):
combined.append((read, state))
# Get names of all the reads present
# query_names = [x.query_name for x in full_reads]
query_names = []
for x in full_reads:
if (not (x.query_name in self.skipped_reads)) and (self.query_count_hash[x.query_name]<=2):
query_names.append(x.query_name)
# Match paired reads by query_name
tally = defaultdict(list)
for i, item in enumerate(query_names):
tally[item].append(i)
for key, value in sorted(tally.items()):
# A pair exists, process it
if len(value) == 2:
# Set read1 and read2 correctly
if combined[value[0]][0].is_read1:
read1 = combined[value[0]]
read2 = combined[value[1]]
elif combined[value[1]][0].is_read1:
read1 = combined[value[1]]
read2 = combined[value[0]]
# both reads have same value, this shouldn't be. Drop one completely, dont
# bother with overlap
elif combined[value[0]][0].is_read1 == combined[value[1]][0].is_read1:
fixed_read_cpgs.append(combined[value[0]][1])
continue
else:
raise AttributeError("Could not determine read 1 or read 2")
# Find amount of overlap
amount_overlap = 0
r1_bps = [x[0] for x in read1[1]]
r2_bps = [x[0] for x in read2[1]]
if min(r1_bps) < min(r2_bps):
trim_direction = 5
for bp in r2_bps:
if bp and bp in r1_bps:
amount_overlap += 1
else:
trim_direction = 3
for bp in r1_bps:
if bp and bp in r2_bps:
amount_overlap += 1
# remove the overlap by trimming or discarding
if amount_overlap == len(read2[1]):
# discard read 2, only append read 1
fixed_read_cpgs.append(read1[1])
else:
# trim overlap
if trim_direction == 5:
new_read2_cpgs = read2[1][amount_overlap:]
elif trim_direction == 3:
new_read2_cpgs = read2[1][:-amount_overlap]
# stitch together read1 and read2
read1[1].extend(new_read2_cpgs)
fixed_read_cpgs.append(read1[1])
elif len(value) == 1:
# No pair, add to output
fixed_read_cpgs.append(combined[value[0]][1])
return fixed_read_cpgs
[docs] @staticmethod
def correct_cpg_positions(output: list):
"""
For some reason, Bismark alignment produces instances where a CpG site location is incorrect by 1 bp, even
after accounting for DNA strand alignmment. This function fixes this. If two cpgs have positions such as 4, 5
(which is impossible because there needs to by a G between them) this function will convert all 5s to 4s. This
only needs to be applied to matrices which are empty after dropna() is called.
:param output: a list of lists of tuples. The output of self.parse_reads()
:return: list of the same style, execpt the first position in the tuple will have a corrected CpG position.
"""
# find all cpg positions
cpg_positions = []
for item in output:
if item:
for cpg in item:
cpg_positions.append(cpg[0])
cpg_positions = sorted(list(set(cpg_positions)))
# determine corrections
corrections = {}
for x in range(len(cpg_positions)):
try:
if cpg_positions[x + 1] == cpg_positions[x] + 1:
corrections[cpg_positions[x + 1]] = cpg_positions[x]
except IndexError: # end of cpg position list
pass
# correct items
corrected_output = []
for item in output:
corrected_item = []
if item:
for cpg in item:
if cpg[0] in corrections.keys():
new_cpg = (corrections[cpg[0]], cpg[1])
corrected_item.append(new_cpg)
else:
corrected_item.append(cpg)
corrected_output.append(corrected_item)
else:
corrected_output.append(item)
return corrected_output