1#ifndef BTLLIB_INDEXLR_HPP
2#define BTLLIB_INDEXLR_HPP
4#include "btllib/bloom_filter.hpp"
5#include "btllib/nthash.hpp"
6#include "btllib/order_queue.hpp"
7#include "btllib/seq_reader.hpp"
8#include "btllib/status.hpp"
9#include "btllib/util.hpp"
36 static const unsigned NO_ID = 1;
38 static const unsigned BX = 2;
40 static const unsigned SEQ = 4;
54 static const unsigned QUAL = 128;
57 bool output_id()
const {
return bool(~flags &
Flag::NO_ID); }
58 bool output_bx()
const {
return bool(flags &
Flag::BX); }
59 bool output_seq()
const {
return bool(flags &
Flag::SEQ); }
60 bool output_qual()
const {
return bool(flags &
Flag::QUAL); }
94 , qual(std::move(qual))
102 std::vector<uint64_t> all_hashes)
107 , seq(std::move(seq))
108 , all_hashes(std::move(all_hashes))
118 std::vector<uint64_t> all_hashes)
123 , seq(std::move(seq))
124 , qual(std::move(qual))
125 , all_hashes(std::move(all_hashes))
129 uint64_t min_hash = 0, out_hash = 0;
131 bool forward =
false;
134 std::vector<uint64_t> all_hashes;
147 std::vector<Minimizer> minimizers)
150 , barcode(std::move(barcode))
152 , minimizers(std::move(minimizers))
160 std::vector<Minimizer> minimizers;
162 operator bool()
const
164 return !
id.empty() || !barcode.empty() || !minimizers.empty();
197 unsigned threads = 5,
198 bool verbose =
false,
207 unsigned threads = 5,
208 bool verbose =
false,
214 void close() noexcept;
216 static const
size_t MAX_SIMULTANEOUS_INDEXLRS = 256;
223 void operator++() { record = indexlr.read(); }
224 bool operator!=(
const RecordIterator& i)
226 return bool(record) || bool(i.record);
228 Record operator*() {
return std::move(record); }
232 auto val = operator*();
240 RecordIterator(
Indexlr& indexlr,
bool end)
253 RecordIterator
begin() {
return RecordIterator(*
this,
false); }
254 RecordIterator end() {
return RecordIterator(*
this,
true); }
257 static std::string extract_barcode(
const std::string&
id,
258 const std::string& comment);
259 static void filter_hashed_kmer(Indexlr::HashedKmer& hk,
262 const BloomFilter& filter_in_bf,
263 const BloomFilter& filter_out_bf);
265 static void filter_kmer_qual(Indexlr::HashedKmer& hk,
266 const std::string& kmer_qual,
268 static size_t calc_kmer_quality(
const std::string& qual);
270 static void calc_minimizer(
271 const std::vector<Indexlr::HashedKmer>& hashed_kmers_buffer,
272 const Indexlr::Minimizer*& min_current,
274 ssize_t& min_idx_left,
275 ssize_t& min_idx_right,
276 ssize_t& min_pos_prev,
278 std::vector<Indexlr::Minimizer>& minimizers);
279 std::vector<Minimizer> minimize(
const std::string& seq,
280 const std::string& qual)
const;
282 const std::string seqfile;
285 const unsigned flags;
288 std::atomic<bool> closed{
false };
290 static const BloomFilter& dummy_bf()
292 static const BloomFilter var;
296 const std::reference_wrapper<const BloomFilter> filter_in_bf;
297 const std::reference_wrapper<const BloomFilter> filter_out_bf;
298 bool filter_in_enabled;
299 bool filter_out_enabled;
302 OrderQueueMPSC<Record> output_queue;
304 using OutputQueueType =
decltype(output_queue);
305 static std::unique_ptr<OutputQueueType::Block>* ready_blocks_array()
307 thread_local static std::unique_ptr<
decltype(output_queue)::Block>
308 var[MAX_SIMULTANEOUS_INDEXLRS];
312 static long* ready_blocks_owners()
314 thread_local static long var[MAX_SIMULTANEOUS_INDEXLRS] = { 0 };
318 static size_t* ready_blocks_current()
320 thread_local static size_t var[MAX_SIMULTANEOUS_INDEXLRS] = { 0 };
324 static std::atomic<long>& last_id()
326 static std::atomic<long> var(0);
333 void start() { t = std::thread(do_work,
this); }
334 void join() { t.join(); }
335 void set_id(
const int id) { this->
id = id; }
337 Worker& operator=(
const Worker& worker) =
delete;
338 Worker& operator=(Worker&& worker) =
delete;
344 Worker(
const Worker& worker)
345 : Worker(worker.indexlr)
348 Worker(Worker&& worker) noexcept
349 : Worker(worker.indexlr)
355 static void do_work(Worker* worker) { worker->work(); }
362 std::vector<Worker> workers;
364 std::mutex last_block_num_mutex;
365 uint64_t last_block_num = 0;
366 bool last_block_num_valid =
false;
374 const unsigned flags,
375 const unsigned threads,
377 const BloomFilter& bf1,
378 const BloomFilter& bf2)
379 : seqfile(std::move(seqfile))
386 , filter_in_bf(filter_in() ? bf1 : Indexlr::dummy_bf())
387 , filter_out_bf(filter_out() ? filter_in() ? bf2 : bf1 : Indexlr::dummy_bf())
388 , filter_in_enabled(filter_in())
389 , filter_out_enabled(filter_out())
390 , reader(this->seqfile,
391 short_mode() ? SeqReader::Flag::SHORT_MODE
392 : SeqReader::Flag::LONG_MODE)
393 , output_queue(reader.get_buffer_size(), reader.get_block_size())
394 , workers(std::vector<Worker>(threads, Worker(*this)))
395 , end_barrier(threads)
398 "Indexlr: no mode selected, either short or long mode flag must "
401 "Indexlr: short and long mode are mutually exclusive.");
403 "Indexlr: Number of processing threads cannot be 0.");
405 for (
auto& worker : workers) {
406 worker.set_id(id_counter++);
415 const unsigned flags,
416 const unsigned threads,
420 :
Indexlr(std::move(seqfile), k, w, 0, flags, threads, verbose, bf1, bf2)
424inline Indexlr::~Indexlr()
430Indexlr::close() noexcept
432 bool closed_expected =
false;
433 if (closed.compare_exchange_strong(closed_expected,
true)) {
436 output_queue.close();
437 for (
auto& worker : workers) {
440 }
catch (
const std::system_error& e) {
441 log_error(
"Indexlr thread join failure: " + std::string(e.what()));
442 std::exit(EXIT_FAILURE);
475Indexlr::extract_barcode(
const std::string&
id,
const std::string& comment)
477 const static std::string barcode_prefix =
"BX:Z:";
479 const auto space_pos = comment.find(
' ');
480 if (space_pos != std::string::npos) {
481 return comment.substr(barcode_prefix.size(),
482 space_pos - barcode_prefix.size());
484 return comment.substr(barcode_prefix.size());
486 const auto pound_pos =
id.find(
'#');
487 if (pound_pos != std::string::npos) {
488 const auto slash_pos =
id.find(
'/');
489 if (slash_pos > pound_pos) {
490 return id.substr(pound_pos + 1, slash_pos - (pound_pos + 1));
497Indexlr::filter_hashed_kmer(Indexlr::HashedKmer& hk,
500 const BloomFilter& filter_in_bf,
501 const BloomFilter& filter_out_bf)
503 if (!filter_in && !filter_out) {
507 bool use_all = (filter_in ? filter_in_bf.get_hash_num()
508 : filter_out_bf.get_hash_num()) > 1;
510 auto check_contains = [&](
const BloomFilter& bf) {
512 return bf.contains(hk.all_hashes);
514 return bf.contains({ hk.min_hash });
518 if (filter_in && filter_out) {
519 if (!check_contains(filter_in_bf) || check_contains(filter_out_bf)) {
520 hk.min_hash = std::numeric_limits<uint64_t>::max();
522 }
else if (filter_in) {
523 if (!check_contains(filter_in_bf)) {
524 hk.min_hash = std::numeric_limits<uint64_t>::max();
526 }
else if (filter_out) {
527 if (check_contains(filter_out_bf)) {
528 hk.min_hash = std::numeric_limits<uint64_t>::max();
534Indexlr::filter_kmer_qual(Indexlr::HashedKmer& hk,
535 const std::string& kmer_qual,
538 if (calc_kmer_quality(kmer_qual) < q) {
539 hk.min_hash = std::numeric_limits<uint64_t>::max();
544Indexlr::calc_kmer_quality(
const std::string& qual)
547 std::vector<int> qual_ints;
548 const int thirty_three = 33;
549 qual_ints.reserve(qual.size());
550 for (
auto c : qual) {
551 qual_ints.push_back(c - thirty_three);
555 for (
auto q : qual_ints) {
558 return (sum / qual_ints.size());
562Indexlr::calc_minimizer(
563 const std::vector<Indexlr::HashedKmer>& hashed_kmers_buffer,
564 const Indexlr::Minimizer*& min_current,
566 ssize_t& min_idx_left,
567 ssize_t& min_idx_right,
568 ssize_t& min_pos_prev,
570 std::vector<Indexlr::Minimizer>& minimizers)
572 min_idx_left = ssize_t(idx + 1 - w);
573 min_idx_right = ssize_t(idx + 1);
574 const auto& min_left =
575 hashed_kmers_buffer[min_idx_left % hashed_kmers_buffer.size()];
576 const auto& min_right =
577 hashed_kmers_buffer[(min_idx_right - 1) % hashed_kmers_buffer.size()];
579 if (min_current ==
nullptr || min_current->pos < min_left.pos) {
580 min_current = &min_left;
582 for (ssize_t i = min_idx_left; i < min_idx_right; i++) {
583 const auto& min_i = hashed_kmers_buffer[i % hashed_kmers_buffer.size()];
584 if (min_i.min_hash <= min_current->min_hash) {
585 min_current = &min_i;
588 }
else if (min_right.min_hash <= min_current->min_hash) {
589 min_current = &min_right;
591 if (ssize_t(min_current->pos) > min_pos_prev &&
592 min_current->min_hash != std::numeric_limits<uint64_t>::max()) {
593 min_pos_prev = ssize_t(min_current->pos);
594 minimizers.push_back(*min_current);
598inline std::vector<Indexlr::Minimizer>
599Indexlr::minimize(
const std::string& seq,
const std::string& qual)
const
601 if ((k > seq.size()) || (w > seq.size() - k + 1)) {
604 std::vector<Minimizer> minimizers;
605 minimizers.reserve(2 * (seq.size() - k + 1) / w);
606 std::vector<HashedKmer> hashed_kmers_buffer(w + 1);
607 ssize_t min_idx_left, min_idx_right, min_pos_prev = -1;
608 const Minimizer* min_current =
nullptr;
610 size_t bf_num_hashes = 0;
611 if (!filter_in() && !filter_out()) {
613 }
else if (filter_in()) {
614 bf_num_hashes = filter_in_bf.get().get_hash_num();
616 bf_num_hashes = filter_out_bf.get().get_hash_num();
619 size_t nthash_num_hashes = std::max(
static_cast<size_t>(2), bf_num_hashes);
621 for (NtHash nh(seq, nthash_num_hashes, k); nh.roll(); ++idx) {
622 auto& hk = hashed_kmers_buffer[idx % hashed_kmers_buffer.size()];
623 if (bf_num_hashes == 1) {
624 hk = HashedKmer(nh.hashes()[0],
627 nh.get_forward_hash() <= nh.get_reverse_hash(),
628 output_seq() ? seq.substr(nh.get_pos(), k) :
"",
629 output_qual() ? qual.substr(nh.get_pos(), k) :
"");
636 nh.get_forward_hash() <= nh.get_reverse_hash(),
637 output_seq() ? seq.substr(nh.get_pos(), k) :
"",
638 output_qual() ? qual.substr(nh.get_pos(), k) :
"",
639 std::vector<uint64_t>(nh.hashes(), nh.hashes() + bf_num_hashes));
643 hk, filter_in(), filter_out(), filter_in_bf.get(), filter_out_bf.get());
646 filter_kmer_qual(hk, qual.substr(nh.get_pos(), k), q);
650 calc_minimizer(hashed_kmers_buffer,
663inline Indexlr::Record
666 if (ready_blocks_owners()[
id % MAX_SIMULTANEOUS_INDEXLRS] !=
id) {
667 ready_blocks_array()[
id % MAX_SIMULTANEOUS_INDEXLRS] =
669 decltype(output_queue)::Block>(
670 new decltype(output_queue)::Block(reader.get_block_size()));
671 ready_blocks_owners()[
id % MAX_SIMULTANEOUS_INDEXLRS] = id;
672 ready_blocks_current()[
id % MAX_SIMULTANEOUS_INDEXLRS] = 0;
674 auto& block = *(ready_blocks_array()[
id % MAX_SIMULTANEOUS_INDEXLRS]);
675 auto& current = ready_blocks_current()[
id % MAX_SIMULTANEOUS_INDEXLRS];
676 if (current >= block.count) {
678 output_queue.read(block);
679 if (block.count == 0) {
680 output_queue.close();
681 block =
decltype(output_queue)::Block(reader.get_block_size());
686 return std::move(block.data[current++]);
690Indexlr::Worker::work()
692 decltype(indexlr.output_queue)::Block output_block(
693 indexlr.reader.get_block_size());
694 uint64_t last_block_num = 0;
695 bool last_block_num_valid =
false;
697 auto input_block = indexlr.reader.
read_block();
698 if (input_block.count == 0) {
702 output_block.num = input_block.num;
703 for (
size_t idx = 0; idx < input_block.count; idx++) {
705 auto& reader_record = input_block.data[idx];
706 record.num = reader_record.num;
707 if (indexlr.output_bx()) {
709 indexlr.extract_barcode(reader_record.id, reader_record.comment);
711 if (indexlr.output_id()) {
712 record.id = std::move(reader_record.id);
715 record.readlen = reader_record.seq.size();
717 check_info(indexlr.verbose && indexlr.k > record.readlen,
718 "Indexlr: skipped seq " + std::to_string(record.num) +
720 std::to_string(record.num * (indexlr.reader.get_format() ==
721 SeqReader::Format::FASTA
725 "; k (" + std::to_string(indexlr.k) +
") > seq length (" +
726 std::to_string(record.readlen) +
")");
728 check_info(indexlr.verbose && indexlr.w > record.readlen - indexlr.k + 1,
729 "Indexlr: skipped seq " + std::to_string(record.num) +
731 std::to_string(record.num * (indexlr.reader.get_format() ==
732 SeqReader::Format::FASTA
736 "; w (" + std::to_string(indexlr.w) +
") > # of hashes (" +
737 std::to_string(record.readlen - indexlr.k + 1) +
")");
739 if (indexlr.k <= record.readlen &&
740 indexlr.w <= record.readlen - indexlr.k + 1) {
742 indexlr.minimize(reader_record.seq, reader_record.qual);
744 record.minimizers = {};
747 output_block.data[output_block.count++] = std::move(record);
749 if (output_block.count > 0) {
750 last_block_num = output_block.num;
751 last_block_num_valid =
true;
752 indexlr.output_queue.write(output_block);
753 output_block.count = 0;
756 if (last_block_num_valid) {
757 std::unique_lock<std::mutex> lock(indexlr.last_block_num_mutex);
758 indexlr.last_block_num = std::max(indexlr.last_block_num, last_block_num);
759 indexlr.last_block_num_valid =
true;
762 indexlr.end_barrier.wait();
763 if (last_block_num_valid && indexlr.last_block_num_valid &&
764 last_block_num == indexlr.last_block_num) {
765 output_block.num = last_block_num + 1;
766 indexlr.output_queue.write(output_block);
767 }
else if (!indexlr.last_block_num_valid &&
id == 0) {
768 output_block.num = 0;
769 indexlr.output_queue.write(output_block);
Definition bloom_filter.hpp:73
Definition indexlr.hpp:26
Indexlr(std::string seqfile, size_t k, size_t w, unsigned flags=0, unsigned threads=5, bool verbose=false, const btllib::BloomFilter &bf1=Indexlr::dummy_bf(), const btllib::BloomFilter &bf2=Indexlr::dummy_bf())
Definition indexlr.hpp:412
RecordIterator begin()
Definition indexlr.hpp:253
Record read()
Definition indexlr.hpp:664
OrderQueueMPMC< Record >::Block read_block()
void check_error(bool condition, const std::string &msg)
void log_error(const std::string &msg)
bool startswith(std::string s, std::string prefix)
void check_info(bool condition, const std::string &msg)
Definition indexlr.hpp:34
static const unsigned BX
Definition indexlr.hpp:38
static const unsigned SEQ
Definition indexlr.hpp:40
static const unsigned LONG_MODE
Definition indexlr.hpp:52
static const unsigned QUAL
Definition indexlr.hpp:54
static const unsigned FILTER_IN
Definition indexlr.hpp:43
static const unsigned FILTER_OUT
Definition indexlr.hpp:48
static const unsigned NO_ID
Definition indexlr.hpp:36
static const unsigned SHORT_MODE
Definition indexlr.hpp:50
Definition indexlr.hpp:67
Definition indexlr.hpp:140