Source code for atomworks.ml.preprocessing.utils.fasta
from os import PathLike
from pathlib import Path
import pandas as pd
from atomworks.ml.utils.misc import hash_sequence
[docs]
def wrap_sequence(sequence: str, line_length: int = 80) -> str:
    """Wrap a sequence string to a specified line length.

    The sequence is cut into consecutive chunks of at most ``line_length``
    characters, which are joined with newlines. No trailing newline is added,
    and an empty sequence yields an empty string.

    Args:
        sequence (str): The sequence string to wrap.
        line_length (int): The maximum line length. Must be at least 1. Default is 80.

    Returns:
        str: The wrapped sequence string.

    Raises:
        ValueError: If ``line_length`` is less than 1.
    """
    if line_length < 1:
        # A zero step makes range() fail with an opaque message, and a negative
        # step produces no chunks at all, silently dropping the whole sequence.
        raise ValueError(f"line_length must be a positive integer, got {line_length}")
    return "\n".join(sequence[start : start + line_length] for start in range(0, len(sequence), line_length))
[docs]
def create_fasta_file_from_df(
    pn_units_df: PathLike | str | pd.DataFrame, sequence_col_name: str, output_path: PathLike | str
) -> None:
    """Create a de-duplicated FASTA file from a column of sequences.

    Each distinct sequence is written once, in order of first appearance, as a
    record whose header is ``>`` followed by the value of ``hash_sequence`` for
    that sequence and whose body is the sequence wrapped by ``wrap_sequence``.
    Rows with a missing sequence and rows whose sequence consists solely of
    unknown residues ("X") are skipped; empty strings are skipped as well. If
    no sequence qualifies, an empty file is written.

    Args:
        pn_units_df (pd.DataFrame | PathLike | str): Dataframe, given either as an object directly
            or as a path to a Parquet file, containing a column with the sequences to be clustered.
        sequence_col_name (str): The name of the column containing the canonical sequences to be clustered.
        output_path (PathLike | str): Path to where the fasta file will be saved. Must end in .fasta extension.
            Missing parent directories are created.

    Raises:
        ValueError: If ``output_path`` does not end in ``.fasta``.
    """
    # Validate the destination first so that a bad path fails before any
    # (potentially large) Parquet file is read.
    output_path = Path(output_path)
    if output_path.suffix != ".fasta":
        raise ValueError("Output path must end in .fasta")

    # Load the pn_unit_df, if it is not already a DataFrame
    df = pn_units_df if isinstance(pn_units_df, pd.DataFrame) else pd.read_parquet(pn_units_df)

    # Remove rows where the sequence is not given. Working on the column alone
    # (rather than masking the frame with the result of ``Series.apply``) keeps
    # the empty case well-defined: an empty ``apply`` result is not a boolean
    # mask, so indexing the frame with it would select zero columns.
    sequences = df[sequence_col_name].dropna()

    # Create output directory if it does not exist
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Write all sequences to FASTA file, de-duplicating as we go
    seen_sequence_hashes = set()
    with open(output_path, "w", encoding="utf-8") as output_fasta_file:
        for sequence in sequences:
            # Skip sequences that are all unknown ("X"); ``all`` over an empty
            # sequence is True, so empty sequences are skipped here too.
            if all(char == "X" for char in sequence):
                continue

            sequence_hash = hash_sequence(sequence)

            # Skip if we have already seen this sequence
            if sequence_hash in seen_sequence_hashes:
                continue
            seen_sequence_hashes.add(sequence_hash)

            wrapped_sequence = wrap_sequence(sequence)
            output_fasta_file.write(f">{sequence_hash}\n{wrapped_sequence}\n")