Implementation of parallel de-/compression of BAM files.
BAM [1] files are used for storing alignments of reads to reference sequences. Since next-generation sequencing technologies create huge amounts of data, the data is compressed. The BAM format stores the data as a series of compressed blocks (each holding at most 64 kB of uncompressed data) using the zlib library [2] (the BGZF format).
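For orientation, each BGZF block is a gzip member with a fixed extra field; its header can be sketched as follows (following the SAM/BAM specification; the struct name is illustrative):

```cpp
#include <cstdint>

// Sketch of the fixed 18-byte BGZF block header as given in the SAM/BAM
// specification; the raw-deflate payload and an 8-byte footer (CRC32 and
// uncompressed size) follow each header. For illustration only.
#pragma pack(push, 1)
struct BgzfBlockHeader
{
    uint8_t  id1, id2;    // gzip magic bytes: 31, 139
    uint8_t  cm, flg;     // compression method 8 (deflate); FLG.FEXTRA (4) set
    uint32_t mtime;       // modification time (unused)
    uint8_t  xfl, os;     // extra flags, operating system
    uint16_t xlen;        // total length of the extra field: 6
    uint8_t  si1, si2;    // BGZF subfield identifier: 'B', 'C'
    uint16_t slen;        // subfield data length: 2
    uint16_t bsize;       // total block size minus 1
};
#pragma pack(pop)
```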
When processing such files, one common bottleneck is the decompression of the BAM file. Since the data is stored in blocks, each block can be decompressed independently of the others. This decompression can be performed in parallel.
For the parallelization, standard C++11 threads (<thread>) should be used.
One aim is the later integration into the SeqAn library, i.e. the functionality should be usable from other programs through the existing interface.
This way, all existing applications using BAM I/O from the SeqAn library can benefit from the parallelization.
Technically, parallelization is to be added to the BGZF Stream class.
Requirements:
The following gives a sensible breakdown of the task:
BgzfCompressor and BgzfDecompressor

Write a BgzfCompressor and a BgzfDecompressor class. Each should have a buffer (seqan::CharString) for the input and a buffer for the output, and in a run() method they should do the compression/decompression.
The BgzfCompressor class should try to compress the input data in 64 kB (64*1024 bytes) blocks, as the current BGZF code in SeqAn does [4], and write the compressed blocks one after another. The output buffer should then be resized to fit the resulting size.
The BgzfDecompressor should take the input and assume that it is a concatenation of full BGZF blocks (a check can be performed and execution stopped if the input is invalid).
Processing B BGZF blocks (where B is configurable before opening the files) can be seen as an atomic operation in the parallel program:
The operation cannot be split or performed only half-way.
You could use the following interface:
```cpp
class BgzfCompressor
{
public:
    // Constructor.
    BgzfCompressor(unsigned maxBlockSize = 64*1024);

    // Compress from input to result.
    void run(seqan::CharString & result, seqan::CharString const & input);
};

// BgzfDecompressor could have the same interface.
```
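Internally, run() would deflate each 64 kB slice in zlib's raw mode, since BGZF wraps raw deflate data in its own gzip-style header and footer. A sketch of such a per-slice core, assuming zlib [2] as above (deflateSlice is an illustrative name; BGZF header/footer writing and error handling are omitted):

```cpp
#include <zlib.h>

// Compress one slice (<= 64 kB) with raw deflate, as needed inside a BGZF
// block (sketch; returns the compressed size, 0 on failure).
size_t deflateSlice(unsigned char * out, size_t outCapacity,
                    unsigned char const * in, size_t inLength)
{
    z_stream strm;
    strm.zalloc = Z_NULL;
    strm.zfree = Z_NULL;
    strm.opaque = Z_NULL;
    // windowBits = -15 selects raw deflate (no zlib/gzip wrapper), since the
    // BGZF code writes its own header and footer around the data.
    if (deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -15, 8,
                     Z_DEFAULT_STRATEGY) != Z_OK)
        return 0;

    strm.next_in = const_cast<unsigned char *>(in);
    strm.avail_in = static_cast<uInt>(inLength);
    strm.next_out = out;
    strm.avail_out = static_cast<uInt>(outCapacity);

    int ret = deflate(&strm, Z_FINISH);  // compress everything in one call
    size_t written = (ret == Z_STREAM_END) ? outCapacity - strm.avail_out : 0;
    deflateEnd(&strm);
    return written;
}
```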
Write tests for these classes.
After this step, you will have the two classes and tests for them. You could put them into a module par_bgzf that you can create using util/bin/skel.py, and also use this script for creating the test structure. Create the module and tests in your sandbox.
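A round-trip test for the two classes could look roughly like this (a sketch using SeqAn's test macros; the test name and the module header are illustrative):

```cpp
#include <seqan/basic.h>
#include <seqan/sequence.h>
// #include "par_bgzf.h"  // hypothetical module header with the two classes

SEQAN_DEFINE_TEST(test_par_bgzf_round_trip)
{
    seqan::CharString input = "Some test payload for BGZF round-tripping.";
    seqan::CharString compressed, decompressed;

    BgzfCompressor compressor;
    compressor.run(compressed, input);

    BgzfDecompressor decompressor;
    decompressor.run(decompressed, compressed);

    // Compression followed by decompression must reproduce the input.
    SEQAN_ASSERT_EQ(decompressed, input);
}
```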
Stream<ParallelBgzf>

Copy the current Stream<Bgzf> class into a Stream<ParallelBgzf> class in your sandbox.
Testing against files produced by the bgzf tool by Heng Li probably would not work very well because of ambiguity in the files. Rather, compress and decompress a text file and test whether the result equals the original. Tests against the current BgzfStream should also work well. There could be differences if some blocks do not compress well enough, but that should be rare.
After this step, you will have the new Stream class and tests for it.
Parallelize Stream<ParallelBgzf> in a Fork/Join Fashion

Let n be the number of threads. The stream gets a member buffers with two entries (buffers[0] is the first String of buffers and buffers[1] is the second one) and an integer bufno that gives the index (0 or 1) of the entry of buffers currently read from/written to by the stream while the threads are working on buffers[1 - bufno]. buffers[0] and buffers[1] contain the decompressed data that is given to the stream's caller/user when reading, and the uncompressed data to compress and write to the file when writing. Each entry of buffers has n entries of type CharString that serve as the target buffers.
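To make this layout concrete, the double-buffering state could be sketched as follows (illustrative struct and member names; in the real implementation these would be members of the Stream<ParallelBgzf> class):

```cpp
#include <seqan/sequence.h>

// Sketch of the double-buffering state for the fork/join scheme.
struct ParallelBgzfBuffers
{
    // Two generations of buffers; each entry holds n CharStrings,
    // one per thread.
    seqan::String<seqan::String<seqan::CharString> > buffers;

    // Index (0 or 1) of the entry the stream currently reads from/writes to;
    // the worker threads operate on buffers[1 - bufno].
    unsigned bufno;

    ParallelBgzfBuffers(unsigned n) : bufno(0)
    {
        resize(buffers, 2);
        resize(buffers[0], n);
        resize(buffers[1], n);
    }
};
```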
Write a BgzfDecompressionThread and a BgzfCompressionThread class. Each thread object has a CharString buffer for the data from the file.
The stream gets a readAndDecompress function that does the following: For i = 1..n, read B blocks into the buffer for thread i (this is done sequentially). The threads then decompress in parallel, and thread i writes the result to buffers[1 - bufno][i].
Similarly, a compressAndWrite function does the following: Each thread i compresses buffers[1 - bufno][i] to its internal buffer. The compressed buffers are then written to the file for i = 1..n sequentially.
On writing, the user fills buffers[bufno] while the threads compress and write out buffers[1 - bufno] (if it is filled), and vice versa on reading; a minimal fork/join helper for this pattern is sketched below.
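The fork/join pattern itself is small; a minimal helper, assuming C++11 <thread> (forkJoin is an illustrative name), could look like this:

```cpp
#include <functional>
#include <thread>
#include <vector>

// Minimal fork/join helper (sketch): run work(i) on threads 0..n-1 and wait
// until all of them have finished.
void forkJoin(unsigned n, std::function<void(unsigned)> const & work)
{
    std::vector<std::thread> threads;
    for (unsigned i = 0; i < n; ++i)
        threads.push_back(std::thread(work, i));
    for (unsigned i = 0; i < n; ++i)
        threads[i].join();
}

// Inside readAndDecompress, this could be used along the lines of
// (decompressors and threadInput are illustrative names):
//     forkJoin(n, [&](unsigned i) {
//         decompressors[i].run(buffers[1 - bufno][i], threadInput[i]);
//     });
```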
The class will need some synchronization primitives and counters, and probably also a flag indicating whether buffers[1 - bufno] is filled at the beginning of reading/writing.
Useful synchronization primitives are mutex (plus lock_guard) and condition_variable. A thread barrier class could be constructed using a mutex and a condition_variable.
There should be tests for this with larger files.
The original sequential Stream<Bgzf> is probably correct, so you can compress (or decompress) in parallel using your implementation, apply the inverse operation, and check whether the result is the same as the input.
Then, use readRecord(…, Sam/Bam) and writeRecord(…, Sam/Bam) to write a parallel BAM ↔ SAM converter.
BAM can be sorted by (a) coordinate or (b) query name as follows: Store (a) the coordinate as a Pair<int, int> or (b) the read name plus flag as a Pair<char const *, int> in a second buffer keys. Sort keys using std::stable_sort() or std::sort(). (Parallelizing this step is a no-brainer: the GCC STL provides a parallel std::sort() implementation, and the OMPTL library also offers a drop-in replacement for std::sort().) Finally, write out the records in the order given by keys; see the sketch below.
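A sketch of the key-based sorting for case (a), assuming the records of one chunk are already in memory (BamRecord and sortByCoordinate are illustrative placeholders, using std::pair in place of SeqAn's Pair):

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory record; a real implementation would use the
// BAM record type of the I/O layer.
struct BamRecord
{
    int rID, beginPos;   // reference id and position (the coordinate)
    std::string qName;   // read name
    int flag;
};

void sortByCoordinate(std::vector<BamRecord> & records)
{
    // Build small sort keys (coordinate plus original index) instead of
    // moving the full records around during sorting.
    std::vector<std::pair<std::pair<int, int>, size_t> > keys(records.size());
    for (size_t i = 0; i < records.size(); ++i)
        keys[i] = std::make_pair(
            std::make_pair(records[i].rID, records[i].beginPos), i);

    // Sort only the keys; a parallel std::sort() drop-in could be used here.
    std::stable_sort(keys.begin(), keys.end());

    // Write out the records in the order given by the keys.
    std::vector<BamRecord> sorted(records.size());
    for (size_t i = 0; i < keys.size(); ++i)
        sorted[i] = records[keys[i].second];
    records.swap(sorted);
}
```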
The hardest thing here is probably the handling of incomplete BAM records at the end of the file. Together with the parallel compression/decompression and the parallel sorting, this should make a simple and scalable BAM sorter.
As a refinement, the fork/join scheme above could be replaced by a pipeline built around a Queue data structure:
Consecutive numbers starting with 0 are given to each chunk of B blocks read from the file or written by the user to the stream.
Each thread tries to grab the first available chunk, compresses/decompresses it and then puts it into the queue.
A special thread is responsible for filling the buffer in the stream that the user reads from or for writing out the compressed data to the file.
This thread knows which chunk to write out next and queries the queue for this.
The queue blocks this thread until the chunk is available.
The threads doing the compression/decompression are blocked if the queue is too full and cannot add their results to it.
The exception is that if a thread has a chunk whose index is smaller than the index of the largest chunk already in the queue, it is allowed to put it into the queue anyway; otherwise the consuming thread could wait forever for a chunk that cannot be added.
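Such a queue could be sketched as follows (illustrative names, C++11 primitives; put() implements the "full unless smaller index" rule and get() blocks the consuming thread until the next chunk arrives):

```cpp
#include <condition_variable>
#include <map>
#include <mutex>
#include <utility>

// Ordered chunk queue (sketch): chunks are keyed by their consecutive index.
template <typename TChunk>
class ChunkQueue
{
public:
    explicit ChunkQueue(size_t maxSize) : maxSize(maxSize) {}

    // Called by worker threads. Blocks while the queue is full, unless the
    // chunk's index is smaller than the largest index already in the queue.
    void put(unsigned index, TChunk const & chunk)
    {
        std::unique_lock<std::mutex> lock(mtx);
        cvPut.wait(lock, [this, index] {
            return chunks.size() < maxSize
                   || (!chunks.empty() && index < chunks.rbegin()->first);
        });
        chunks.insert(std::make_pair(index, chunk));
        cvGet.notify_all();
    }

    // Called by the reader/writer thread; blocks until the chunk with the
    // given index is available.
    TChunk get(unsigned index)
    {
        std::unique_lock<std::mutex> lock(mtx);
        cvGet.wait(lock, [this, index] { return chunks.count(index) != 0; });
        auto it = chunks.find(index);
        TChunk result = it->second;
        chunks.erase(it);
        cvPut.notify_all();
        return result;
    }

private:
    std::mutex mtx;
    std::condition_variable cvPut, cvGet;
    std::map<unsigned, TChunk> chunks;  // chunk index -> data, ordered by index
    size_t maxSize;
};
```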