Skip to content

Commit

Permalink
Basic parallelism machinery (#195)
Browse files Browse the repository at this point in the history
  • Loading branch information
sampsyo authored Dec 7, 2024
2 parents d952536 + 49dfe2e commit a84e029
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 21 deletions.
52 changes: 52 additions & 0 deletions flatgfa-py/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 52 additions & 0 deletions flatgfa/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions flatgfa/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ bstr = "1.10.0"
memchr = "2.7.4"
memmap = "0.7.0"
num_enum = "0.7.3"
rayon = "1.10.0"
tinyvec = "1.8.0"
zerocopy = { version = "0.7.35", features = ["derive"] }

Expand Down
77 changes: 62 additions & 15 deletions flatgfa/src/cmds.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::flatgfa::{self, Handle, Link, Orientation, Path, Segment};
use crate::memfile;
use crate::pool::{self, Id, Span, Store};
use crate::{GFAStore, HeapFamily};
use argh::FromArgs;
use rayon::iter::ParallelIterator;
use std::collections::{HashMap, HashSet};

/// print the FlatGFA table of contents
Expand Down Expand Up @@ -136,6 +138,33 @@ pub fn position(gfa: &flatgfa::FlatGFA, args: Position) -> Result<(), &'static s
Ok(())
}

/// benchmarks
#[derive(FromArgs, PartialEq, Debug)]
#[argh(subcommand, name = "bench")]
pub struct Bench {
/// count lines in a text file
#[argh(option)]
wcl: Option<String>,

/// enable parallelism when available
#[argh(switch, short = 'p')]
parallel: bool,
}

pub fn bench(args: Bench) {
// TODO: We don't need a GFA for (some of) these? So avoid opening it.
if let Some(filename) = args.wcl {
let buf = memfile::map_file(&filename);
let split = memfile::MemchrSplit::new(b'\n', &buf);
let count = if args.parallel {
ParallelIterator::count(split)
} else {
Iterator::count(split)
};
println!("{}", count);
}
}

/// create a subset graph
#[derive(FromArgs, PartialEq, Debug)]
#[argh(subcommand, name = "extract")]
Expand All @@ -149,12 +178,17 @@ pub struct Extract {
link_distance: usize,

/// maximum number of basepairs allowed between subpaths s.t. the subpaths are merged together
#[argh(option, short = 'd', long = "max-distance-subpaths", default = "300000")]
#[argh(
option,
short = 'd',
long = "max-distance-subpaths",
default = "300000"
)]
max_distance_subpaths: usize, // TODO: possibly make this bigger

/// maximum number of iterations before we stop merging subpaths
#[argh(option, short = 'e', long = "max-merging-iterations", default = "6")]
num_iterations: usize // TODO: probably make this smaller
num_iterations: usize, // TODO: probably make this smaller
}

pub fn extract(
Expand All @@ -165,7 +199,12 @@ pub fn extract(

let mut subgraph = SubgraphBuilder::new(gfa);
subgraph.add_header();
subgraph.extract(origin_seg, args.link_distance, args.max_distance_subpaths, args.num_iterations);
subgraph.extract(
origin_seg,
args.link_distance,
args.max_distance_subpaths,
args.num_iterations,
);
Ok(subgraph.store)
}

Expand All @@ -192,10 +231,10 @@ impl<'a> SubgraphBuilder<'a> {

/// Include the old graph's header
fn add_header(&mut self) {
// pub fn add_header(&mut self, version: &[u8]) {
// assert!(self.header.as_ref().is_empty());
// self.header.add_slice(version);
// }
// pub fn add_header(&mut self, version: &[u8]) {
// assert!(self.header.as_ref().is_empty());
// self.header.add_slice(version);
// }
assert!(self.store.header.as_ref().is_empty());
self.store.header.add_slice(self.old.header.all());
}
Expand Down Expand Up @@ -242,7 +281,10 @@ impl<'a> SubgraphBuilder<'a> {
// We just entered the subgraph. End the current subpath.
if !ignore_path && subpath_length <= max_distance_subpaths {
// TODO: type safety
let subpath_span = Span::new(path.steps.start + *start as u32, path.steps.start + idx as u32);
let subpath_span = Span::new(
path.steps.start + *start as u32,
path.steps.start + idx as u32,
);
for step in &self.old.steps[subpath_span] {
if !self.seg_map.contains_key(&step.segment()) {
self.include_seg(step.segment());
Expand All @@ -254,7 +296,7 @@ impl<'a> SubgraphBuilder<'a> {
} else if let (None, false) = (&cur_subpath_start, in_neighb) {
// We've exited the current subgraph, start a new subpath
cur_subpath_start = Some(idx);
}
}

// Track the current bp position in the path.
subpath_length += self.old.get_handle_seg(*step).len();
Expand Down Expand Up @@ -313,7 +355,13 @@ impl<'a> SubgraphBuilder<'a> {
///
/// Include any links between the segments in the neighborhood and subpaths crossing
/// through the neighborhood.
fn extract(&mut self, origin: Id<Segment>, dist: usize, max_distance_subpaths: usize, num_iterations: usize) {
fn extract(
&mut self,
origin: Id<Segment>,
dist: usize,
max_distance_subpaths: usize,
num_iterations: usize,
) {
self.include_seg(origin);

// Find the set of all segments that are c links away.
Expand Down Expand Up @@ -412,10 +460,9 @@ pub fn chop<'a>(
gfa: &'a flatgfa::FlatGFA<'a>,
args: Chop,
) -> Result<flatgfa::HeapGFAStore, &'static str> {
let mut flat = flatgfa::HeapGFAStore::default();

let mut flat = flatgfa::HeapGFAStore::default();

// when segment S is chopped into segments S1 through S2 (exclusive),
// when segment S is chopped into segments S1 through S2 (exclusive),
// seg_map[S.name] = Span(Id(S1.name), Id(S2.name)). If S is not chopped: S=S1, S2.name = S1.name+1
let mut seg_map: Vec<Span<Segment>> = Vec::new();
// The smallest id (>0) which does not already belong to a segment in `flat`
Expand Down Expand Up @@ -448,14 +495,14 @@ pub fn chop<'a>(
let mut offset = seg.seq.start.index();
let segs_start = flat.segs.next_id();
// Could also generate end_id by setting it equal to the start_id and
// updating it for each segment that is added - only benefits us if we
// updating it for each segment that is added - only benefits us if we
// don't unroll the last iteration of this loop
while offset < seq_end.index() - args.c {
// Generate a new segment of length c
flat.segs.add(Segment {
name: max_node_id,
seq: Span::new(Id::new(offset), Id::new(offset + args.c)),
optional: Span::new_empty()
optional: Span::new_empty(),
});
offset += args.c;
max_node_id += 1;
Expand Down
4 changes: 4 additions & 0 deletions flatgfa/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ enum Command {
Depth(cmds::Depth),
Chop(cmds::Chop),
GafLookup(gaf::GAFLookup),
Bench(cmds::Bench),
}

fn main() -> Result<(), &'static str> {
Expand Down Expand Up @@ -132,6 +133,9 @@ fn main() -> Result<(), &'static str> {
Some(Command::GafLookup(sub_args)) => {
gaf::gaf_lookup(&gfa, sub_args);
}
Some(Command::Bench(sub_args)) => {
cmds::bench(sub_args);
}
None => {
// Just emit the GFA or FlatGFA file.
dump(&gfa, &args.output);
Expand Down
Loading

0 comments on commit a84e029

Please sign in to comment.