diff --git a/DESCRIPTION b/DESCRIPTION index 8c41962..784b8f4 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -22,7 +22,6 @@ Config/testthat/edition: 3 License: BSD_3_clause + file LICENSE Encoding: UTF-8 Roxygen: list(markdown = TRUE) -RoxygenNote: 7.3.3 Depends: R (>= 4.5.0) URL: https://github.com/JRaviLab/amR_data @@ -48,3 +47,4 @@ Suggests: knitr, rmarkdown, testthat (>= 3.0.0) +Config/roxygen2/version: 8.0.0 diff --git a/NAMESPACE b/NAMESPACE index 786ac51..ff6d425 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -3,11 +3,15 @@ export(.extractAMRtable) export(.updateBVBRCdata) export(CDHIT2duckdb) +export(buildClusterFeatureMap) export(cleanData) export(cleanMetaData) export(prepareGenomes) +export(proteinAnnotations2Duckdb) export(retrieveGenomes) export(retrieveMetadata) export(runDataProcessing) export(runPanaroo2Duckdb) +import(DBI) +import(duckdb) importFrom(data.table,":=") diff --git a/R/runHMMER.R b/R/runHMMER.R new file mode 100644 index 0000000..6d77565 --- /dev/null +++ b/R/runHMMER.R @@ -0,0 +1,306 @@ +#' Write a data frame to a compressed Parquet file +#' +#' @param df A data frame or tibble to write. +#' @param path Output file path (`.parquet` extension). +#' +#' @keywords internal +.write_compressed_parquet <- function(df, path) { + arrow::write_parquet( + df, + path, + compression = "zstd", + compression_level = 9, + use_dictionary = TRUE + ) +} + +.runHMMER <- function(duckdb_path, + output_path, + threads = 8L, + database_path, + docker_image = "staphb/hmmer", + split_jobs = TRUE, + num_of_splits = 20L, + n_workers = 4L) { + # Fail fast if Docker is missing + if (!nzchar(Sys.which("docker"))) { + stop("Docker is not available on your PATH but is required to run HMMER.") + } + + duckdb_path <- .docker_path(duckdb_path) + if (missing(output_path) || output_path %in% c(".", "results", "results/")) { + output_path <- dirname(duckdb_path) + } + output_path <- .docker_path(output_path) + if (!dir.exists(output_path)) dir.create(output_path, recursive = TRUE) + + con <- DBI::dbConnect(duckdb::duckdb(), duckdb_path) + on.exit(try(DBI::dbDisconnect(con, shutdown = FALSE), silent = TRUE), add = TRUE) + + prot_seqs <- DBI::dbReadTable(con, "protein_cluster_seq") |> + tibble::as_tibble() + + # derive a clean label from the database filename + database <- tools::file_path_sans_ext(basename(database_path)) + + # clamp splits to the number of sequences available + chunk_count <- min(as.integer(num_of_splits), nrow(prot_seqs)) + + split_fasta <- function(seqs, prefix) { + records <- paste0(">", seqs$name, "\n", seqs$sequence) + chunk_size <- ceiling(length(records) / chunk_count) + chunks <- split(records, ceiling(seq_along(records) / chunk_size)) + + purrr::walk2(chunks, seq_along(chunks), function(chunk, i) { + chunk_path <- file.path(output_path, sprintf("%s_chunk_%02d.fasta", prefix, i)) + readr::write_lines(chunk, chunk_path) + }) + } + + split_fasta(prot_seqs, "protein") + + job_list <- expand.grid( + chunk = sprintf("%02d", seq_len(chunk_count)), + db = database, + stringsAsFactors = FALSE + ) |> + dplyr::mutate( + JOB_NAME = paste0("protein_chunk_", chunk, "_", db), + FASTA = paste0("protein_chunk_", chunk, ".fasta"), + DB = db + ) |> + dplyr::select(JOB_NAME, FASTA, DB) + + .runHmmerJob <- function(JOB_NAME, FASTA, DB) { + hmmer_input <- file.path(output_path, FASTA) + hmmer_output <- file.path(output_path, paste0(JOB_NAME, ".tbl")) + + # database paths + db_host_dir <- dirname(database_path) + db_filename <- basename(database_path) + db_cont_dir <- "/opt/hmmer/data" + db_cont_path <- file.path(db_cont_dir, db_filename) + + # mounts + mount_host <- output_path + mount_cont <- "/work" + + cmd_args <- c( + "run", "--rm", + "-v", paste0(mount_host, ":", mount_cont), + "-v", paste0(db_host_dir, ":", db_cont_dir), + docker_image, + "hmmscan", + "--cpu", as.character(threads), + "--tblout", .to_container(hmmer_output, mount_host, mount_cont), + db_cont_path, + .to_container(hmmer_input, mount_host, mount_cont) + ) + + message("Running hmmscan via Docker...") + output <- tryCatch( + { + system2("docker", args = cmd_args, stdout = TRUE, stderr = TRUE) + }, + error = function(e) { + stop("hmmscan execution failed: ", e$message) + } + ) + + if (!file.exists(hmmer_output)) { + stop("hmmscan failed: output file not found. Check stderr:\n", paste(output, collapse = "\n")) + } + + message("hmmscan completed successfully.") + + hmmer_tbl <- .parseHMMEROutput(hmmer_output) |> + dplyr::select("name", "query_name", "description") + + hmmer_tbl_filename <- file.path( + dirname(hmmer_output), + paste0(tools::file_path_sans_ext(basename(hmmer_output)), ".parquet") + ) + + .write_compressed_parquet(hmmer_tbl, hmmer_tbl_filename) + + hmmer_tbl_filename + } + + hmmer_param <- BiocParallel::SnowParam(workers = max(1L, n_workers)) + parquet_files <- BiocParallel::bpmapply( + FUN = .runHmmerJob, + JOB_NAME = job_list$JOB_NAME, + FASTA = job_list$FASTA, + DB = job_list$DB, + SIMPLIFY = TRUE, + USE.NAMES = FALSE, + BPPARAM = hmmer_param + ) + + final_parquet <- file.path(output_path, paste0("protein_", database, ".parquet")) + + purrr::map(parquet_files, arrow::read_parquet) |> + dplyr::bind_rows() |> + .write_compressed_parquet(final_parquet) + + message("Combined parquet written.") + + arrow::read_parquet(final_parquet) |> + DBI::dbWriteTable(conn = con, name = tools::file_path_sans_ext(basename(final_parquet)), overwrite = TRUE) +} + + +#' Parse HMMER tabular output into a tibble +#' +#' Reads a HMMER `--tblout` file and returns a tidy tibble with one row per +#' target-query hit. Comment lines are stripped and the free-text description +#' field is reunited from the remaining whitespace-delimited columns. +#' +#' @param file Path to a HMMER `.tbl` output file produced with `--tblout`. +#' +#' @return A tibble with 19 columns matching the HMMER per-sequence hit table: +#' `name`, `accession`, `query_name`, `query_accession`, `sequence_evalue`, +#' `sequence_score`, `sequence_bias`, `best_evalue`, `best_score`, +#' `best_bias`, `number_exp`, `number_reg`, `number_clu`, `number_ov`, +#' `number_env`, `number_dom`, `number_rep`, `number_inc`, `description`. +#' +#' @references Adapted from the rhmmer package +#' (). +#' +#' @examples +#' \dontrun{ +#' hits <- .parseHMMEROutput("results/Ecoli/protein_chunk_01_COG.tbl") +#' hits |> dplyr::filter(sequence_evalue < 1e-5) +#' } +#' +#' @keywords internal +.parseHMMEROutput <- function(file) { + col_types <- readr::cols( + name = readr::col_character(), + accession = readr::col_character(), + query_name = readr::col_character(), + query_accession = readr::col_character(), + sequence_evalue = readr::col_double(), + sequence_score = readr::col_double(), + sequence_bias = readr::col_double(), + best_evalue = readr::col_double(), + best_score = readr::col_double(), + best_bias = readr::col_double(), + number_exp = readr::col_double(), + number_reg = readr::col_integer(), + number_clu = readr::col_integer(), + number_ov = readr::col_integer(), + number_env = readr::col_integer(), + number_dom = readr::col_integer(), + number_rep = readr::col_integer(), + number_inc = readr::col_character(), + description = readr::col_character() + ) + # the line delimiter should always be just "\n", even on Windows + lines <- readr::read_lines(file, lazy = FALSE, progress = FALSE) + + # drop comment lines + data_lines <- lines[!grepl("^#", lines)] + + # split: whitespace-separated fields + split_fields <- strsplit(data_lines, "\\s+", perl = TRUE) + + # count space separated fields + N <- max(sapply(split_fields, length)) + + table <- sub( + pattern = sprintf("(%s).*", paste0(rep("\\S+", N), collapse = " +")), + replacement = "\\1", + x = lines, + perl = TRUE + ) |> + gsub(pattern = " *", replacement = "\t") |> + paste0(collapse = "\n") |> + readr::read_tsv( + col_names = names(col_types$cols), + comment = "#", + na = "-", + col_types = col_types, + lazy = FALSE, + progress = FALSE + ) |> + tidyr::unite(description, description:last_col(), sep = " ") + table$description <- gsub("\t", " ", table$description) + + table +} + +#' Map HMMER protein annotations to genome-level count matrix and load into DuckDB +#' +#' Reads a Parquet file of HMMER hits (produced by [.runHMMER()]), joins the +#' annotations to the protein-cluster count matrix already in DuckDB, aggregates +#' counts per genome and annotation, and writes the result both as a Parquet file +#' and as a new table in the DuckDB database. +#' +#' @param annotated_parquet Path to the combined HMMER results Parquet file +#' (e.g. `"results/Ecoli/protein_COG.parquet"`). The filename stem is used as +#' the table name in DuckDB. +#' @param duckdb_path Path to the per-selection DuckDB database containing a +#' `protein_count` table (created by [CDHIT2duckdb()]). +#' +#' @return Invisibly returns the path to the written count Parquet file. +#' +#' @seealso [CDHIT2duckdb()], [runDataProcessing()] +#' +#' @examples +#' \dontrun{ +#' proteinAnnotations2Duckdb( +#' annotated_parquet = "results/Ecoli/protein_COG.parquet", +#' duckdb_path = "data/Ecoli/Eco.duckdb" +#' ) +#' } +#' +#' @export +proteinAnnotations2Duckdb <- function(annotated_parquet, duckdb_path) { + annotated_parquet <- .docker_path(annotated_parquet) + duckdb_path <- .docker_path(duckdb_path) + + # derive table name from the annotation filename stem + database <- tools::file_path_sans_ext(basename(annotated_parquet)) + + con <- DBI::dbConnect(duckdb::duckdb(), duckdb_path) + on.exit(try(DBI::dbDisconnect(con, shutdown = FALSE), silent = TRUE), add = TRUE) + + protein_long <- DBI::dbReadTable(con, "protein_count") |> + tibble::as_tibble() |> + tidyr::pivot_longer( + cols = -genome_id, + names_to = "query_name", + values_to = "count" + ) |> + dplyr::filter(count > 0) + + annotation <- arrow::read_parquet(annotated_parquet) + + genome_annot_matrix <- protein_long |> + # protein IDs are stored with "." separator in DuckDB but "|" in HMMER output + dplyr::mutate(query_name = stringr::str_replace(query_name, "^fig\\.", "fig|")) |> + dplyr::inner_join( + dplyr::select(annotation, name, query_name), + by = "query_name" + ) |> + dplyr::group_by(genome_id, name) |> + dplyr::summarise(count = sum(count), .groups = "drop") |> + tidyr::pivot_wider( + names_from = name, + values_from = count, + values_fill = 0 + ) + + count_path <- file.path(dirname(duckdb_path), paste0(database, "_count.parquet")) + arrow::write_parquet(genome_annot_matrix, count_path) + + DBI::dbWriteTable( + conn = con, + name = tools::file_path_sans_ext(basename(count_path)), + value = genome_annot_matrix, + overwrite = TRUE + ) + + invisible(count_path) +} diff --git a/man/buildClusterFeatureMap.Rd b/man/buildClusterFeatureMap.Rd new file mode 100644 index 0000000..7b56363 --- /dev/null +++ b/man/buildClusterFeatureMap.Rd @@ -0,0 +1,52 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/feature_to_cluster.R +\name{buildClusterFeatureMap} +\alias{buildClusterFeatureMap} +\title{Build a protein cluster-to-feature mapping using DuckDB} +\usage{ +buildClusterFeatureMap(duckdb_parquet_path, output_path = NULL) +} +\arguments{ +\item{duckdb_parquet_path}{Character. Path to the DuckDB database file with parquet file views +containing the input tables.} + +\item{output_path}{Character or NULL. Directory where the output Parquet file +(\code{cluster_feature.parquet}) will be written. If NULL, the output is written +alongside the DuckDB database.} +} +\value{ +Invisibly returns the file path to the generated Parquet file. +} +\description{ +This function constructs a mapping between protein clusters and functional +features (e.g., gene families, domains, COGs, and ARGs) using a DuckDB-backed +workflow. The implementation is SQL-first and memory-efficient, leveraging +DuckDB views and Parquet output. +} +\details{ +The function performs the following steps: +\itemize{ +\item Creates views for each feature type: +\itemize{ +\item Gene → protein features +\item Domain annotations +\item COG annotations +\item Antibiotic resistance gene (ARG) annotations +} +\item Combines all feature mappings into a unified protein–feature view +\item Joins protein–feature mappings to cluster membership +\item Writes the resulting cluster–feature mapping to a compressed Parquet file +} + +All joins and transformations are executed inside DuckDB, ensuring scalability +for large datasets without loading data into R memory. +} +\examples{ +\dontrun{ +buildClusterFeatureMap( + duckdb_parquet_path = "data/Csp_parquet.duckdb", + output_path = NULL +) +} + +} diff --git a/man/class_abbr.Rd b/man/class_abbr.Rd index 7863770..dda73de 100644 --- a/man/class_abbr.Rd +++ b/man/class_abbr.Rd @@ -11,7 +11,7 @@ A data frame Internal reference data } \usage{ -class_abbr +data(class_abbr) } \description{ A dataset mapping drug classes to their abbreviations. diff --git a/man/clean_drug.Rd b/man/clean_drug.Rd index fc4b485..809676c 100644 --- a/man/clean_drug.Rd +++ b/man/clean_drug.Rd @@ -11,7 +11,7 @@ A data frame Internal reference data } \usage{ -clean_drug +data(clean_drug) } \description{ A dataset mapping original drug names to cleaned/standardized names. diff --git a/man/cleaned_bvbrc_countries.Rd b/man/cleaned_bvbrc_countries.Rd index 4edf9aa..dfda707 100644 --- a/man/cleaned_bvbrc_countries.Rd +++ b/man/cleaned_bvbrc_countries.Rd @@ -11,7 +11,7 @@ A data frame Internal reference data } \usage{ -cleaned_bvbrc_countries +data(cleaned_bvbrc_countries) } \description{ A dataset mapping raw country entries to cleaned and standardized names. diff --git a/man/dot-generateDBname.Rd b/man/dot-generateDBname.Rd index 30ce51e..bebcf43 100644 --- a/man/dot-generateDBname.Rd +++ b/man/dot-generateDBname.Rd @@ -26,4 +26,5 @@ For numeric taxon IDs, it prefixes them with "tid_". .generateDBname(c("90371", "Bacillus subtilis")) .generateDBname(c("12345", "Escherichia coli", "Lactobacillus")) } -\keyword{internal} + +} diff --git a/man/dot-audit_gaps.Rd b/man/dot-missing_any.Rd similarity index 77% rename from man/dot-audit_gaps.Rd rename to man/dot-missing_any.Rd index 86009c0..b1b798f 100644 --- a/man/dot-audit_gaps.Rd +++ b/man/dot-missing_any.Rd @@ -1,10 +1,10 @@ % Generated by roxygen2: do not edit by hand % Please edit documentation in R/data_curation.R -\name{.audit_gaps} -\alias{.audit_gaps} +\name{.missing_any} +\alias{.missing_any} \title{Helps in auditing downloaded files to ensure everything's complete per ID} \usage{ -.audit_gaps(out_dir, ids, min_bytes = 100) +.missing_any(dir, genome_ids, min_bytes = 100) } \description{ Helps in auditing downloaded files to ensure everything's complete per ID diff --git a/man/dot-parseHMMEROutput.Rd b/man/dot-parseHMMEROutput.Rd new file mode 100644 index 0000000..fd5e8d9 --- /dev/null +++ b/man/dot-parseHMMEROutput.Rd @@ -0,0 +1,35 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/runHMMER.R +\name{.parseHMMEROutput} +\alias{.parseHMMEROutput} +\title{Parse HMMER tabular output into a tibble} +\usage{ +.parseHMMEROutput(file) +} +\arguments{ +\item{file}{Path to a HMMER \code{.tbl} output file produced with \code{--tblout}.} +} +\value{ +A tibble with 19 columns matching the HMMER per-sequence hit table: +\code{name}, \code{accession}, \code{query_name}, \code{query_accession}, \code{sequence_evalue}, +\code{sequence_score}, \code{sequence_bias}, \code{best_evalue}, \code{best_score}, +\code{best_bias}, \code{number_exp}, \code{number_reg}, \code{number_clu}, \code{number_ov}, +\code{number_env}, \code{number_dom}, \code{number_rep}, \code{number_inc}, \code{description}. +} +\description{ +Reads a HMMER \code{--tblout} file and returns a tidy tibble with one row per +target-query hit. Comment lines are stripped and the free-text description +field is reunited from the remaining whitespace-delimited columns. +} +\examples{ +\dontrun{ +hits <- .parseHMMEROutput("results/Ecoli/protein_chunk_01_COG.tbl") +hits |> dplyr::filter(sequence_evalue < 1e-5) +} + +} +\references{ +Adapted from the rhmmer package +(\url{https://github.com/arendsee/rhmmer}). +} +\keyword{internal} diff --git a/man/dot-write_compressed_parquet.Rd b/man/dot-write_compressed_parquet.Rd new file mode 100644 index 0000000..b9e45de --- /dev/null +++ b/man/dot-write_compressed_parquet.Rd @@ -0,0 +1,17 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/runHMMER.R +\name{.write_compressed_parquet} +\alias{.write_compressed_parquet} +\title{Write a data frame to a compressed Parquet file} +\usage{ +.write_compressed_parquet(df, path) +} +\arguments{ +\item{df}{A data frame or tibble to write.} + +\item{path}{Output file path (\code{.parquet} extension).} +} +\description{ +Write a data frame to a compressed Parquet file +} +\keyword{internal} diff --git a/man/drug_abbr.Rd b/man/drug_abbr.Rd index 39542f2..6912d4f 100644 --- a/man/drug_abbr.Rd +++ b/man/drug_abbr.Rd @@ -11,7 +11,7 @@ A data frame Internal reference data } \usage{ -drug_abbr +data(drug_abbr) } \description{ A dataset mapping drug names to their abbreviations. diff --git a/man/drug_class.Rd b/man/drug_class.Rd index 1cf412a..89e354a 100644 --- a/man/drug_class.Rd +++ b/man/drug_class.Rd @@ -11,7 +11,7 @@ A data frame Internal reference data } \usage{ -drug_class +data(drug_class) } \description{ A dataset mapping drugs to their drug classes. diff --git a/man/proteinAnnotations2Duckdb.Rd b/man/proteinAnnotations2Duckdb.Rd new file mode 100644 index 0000000..30c28c8 --- /dev/null +++ b/man/proteinAnnotations2Duckdb.Rd @@ -0,0 +1,37 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/runHMMER.R +\name{proteinAnnotations2Duckdb} +\alias{proteinAnnotations2Duckdb} +\title{Map HMMER protein annotations to genome-level count matrix and load into DuckDB} +\usage{ +proteinAnnotations2Duckdb(annotated_parquet, duckdb_path) +} +\arguments{ +\item{annotated_parquet}{Path to the combined HMMER results Parquet file +(e.g. \code{"results/Ecoli/protein_COG.parquet"}). The filename stem is used as +the table name in DuckDB.} + +\item{duckdb_path}{Path to the per-selection DuckDB database containing a +\code{protein_count} table (created by \code{\link[=CDHIT2duckdb]{CDHIT2duckdb()}}).} +} +\value{ +Invisibly returns the path to the written count Parquet file. +} +\description{ +Reads a Parquet file of HMMER hits (produced by \code{\link[=.runHMMER]{.runHMMER()}}), joins the +annotations to the protein-cluster count matrix already in DuckDB, aggregates +counts per genome and annotation, and writes the result both as a Parquet file +and as a new table in the DuckDB database. +} +\examples{ +\dontrun{ +proteinAnnotations2Duckdb( + annotated_parquet = "results/Ecoli/protein_COG.parquet", + duckdb_path = "data/Ecoli/Eco.duckdb" +) +} + +} +\seealso{ +\code{\link[=CDHIT2duckdb]{CDHIT2duckdb()}}, \code{\link[=runDataProcessing]{runDataProcessing()}} +}