Skip to content

Bakta

annotate_aa(aas)

Combines IPS and PSC annotations and marks hypotheticals.

Parameters:

Name Type Description Default
aas Sequence[dict]

A sequence of amino acid dictionaries to annotate.

required

Returns:

Type Description

None

Examples:

>>> aas = [{'sequence': 'ATG', 'start': 1, 'stop': 3, 'strand': '+'}]
>>> annotate_aa(aas)
>>> aas
[{'sequence': 'ATG', 'start': 1, 'stop': 3, 'strand': '+'}]
Source code in src/baktfold/bakta/annotation.py
def annotate_aa(aas: Sequence[dict]):
    """
    Combines IPS and PSC annotations and marks hypotheticals.

    Args:
      aas (Sequence[dict]): A sequence of amino acid dictionaries to annotate.

    Returns:
      None

    Examples:
      >>> aas = [{'sequence': 'ATG', 'start': 1, 'stop': 3, 'strand': '+'}]
      >>> annotate_aa(aas)
      >>> aas
      [{'sequence': 'ATG', 'start': 1, 'stop': 3, 'strand': '+'}]
    """

    print('\tcombine annotations and mark hypotheticals...')

    for aa in aas:
        print(aa)
        combine_annotation(aa)  # combine IPS & PSC annotations and mark hypothetical
    logger.debug('analyze hypotheticals')
    hypotheticals = [aa for aa in aas if 'hypothetical' in aa]
    if(len(hypotheticals) > 0):
        print(f'\tanalyze hypothetical proteins: {len(hypotheticals)}')
        print('\tcalculated proteins statistics')

calc_annotation_score(orf)

Calculates the annotation score for a given ORF.

Parameters:

Name Type Description Default
orf dict

The ORF to calculate the annotation score for.

required

Returns:

Name Type Description
int int

The annotation score for the given ORF.

Examples:

>>> calc_annotation_score(orf)
Source code in src/baktfold/bakta/annotation.py
def calc_annotation_score(orf:dict) -> int:
    """
    Calculates the annotation score for a given ORF.

    Args:
      orf (dict): The ORF to calculate the annotation score for.

    Returns:
      int: The annotation score for the given ORF.

    Examples:
      >>> calc_annotation_score(orf)
    """
    score = 0
    if(orf.get('gene', None)):
        score += 1
    if(orf.get('product', None)):
        score += 1
    return score

combine_annotation(feature, fast)

Combines annotation information from different sources into a single feature.

Parameters:

Name Type Description Default
feature dict

The feature to combine annotation for.

required
fast bool

If True, skips AFDB

required

Returns:

Type Description

None

Examples:

>>> combine_annotation(feature)
Source code in src/baktfold/bakta/annotation.py
def combine_annotation(feature: dict, fast: bool):
    """
    Combines annotation information from different sources into a single feature.

    Args:
      feature (dict): The feature to combine annotation for.
      fast (bool): If True, skips AFDB
    Returns:
      None

    Examples:
      >>> combine_annotation(feature)
    """


    # ups = feature.get('ups', None)
    # ips = feature.get('ips', None)
    # psc = feature.get('psc', None)
    # pscc = feature.get('pscc', None)
    pstc = feature.get('pstc', None)
    # expert_hits = feature.get('expert', [])

    # gene = None
    # genes = set()
    # product = None

    product = feature.get('product', None)
    db_xrefs = feature.get('db_xrefs', [])

    if(pstc):

        # Always normalize pstc to a list
        if isinstance(pstc, dict):
            pstc = [pstc]
        elif isinstance(pstc, str):
            pstc = [pstc]

        # afdb
        afdb_entry = None if fast else next(
            (p for p in pstc if isinstance(p, dict) and p.get('source') == 'afdb'),
            None
        )
        # swissprot
        swissprot_entry = next((p for p in pstc if isinstance(p, dict) and p.get('source') == 'swissprot'), None)
        # pdb
        pdb_entry = next((p for p in pstc if isinstance(p, dict) and p.get('source') == 'pdb'), None)
        # cath
        cath_entry = next((p for p in pstc if isinstance(p, dict) and p.get('source') == 'cath'), None)
        # custom
        custom_entry = next((p for p in pstc if isinstance(p, dict) and p.get('source') == 'custom_db'), None)

        ####
        # hierarchy
        # if it exists, custom is at the top
        # custom
        # if not
        # 1. SwissProt
        # 2. AFDB
        # 3. PDB
        # 4. CATH
        ####

        if custom_entry:
            pstc_product = custom_entry['description'] 
        elif swissprot_entry:
            pstc_product = swissprot_entry['description']
        elif afdb_entry:
            pstc_product = afdb_entry['description'] 
        elif pdb_entry:
            pstc_product = pdb_entry['description'] 
        elif cath_entry:
            pstc_product = cath_entry['description'] 
        else:
            pstc_product = None

        if(pstc_product):
            product = pstc_product

        # Collect all db_xref IDs
        for entry in pstc:
            if isinstance(entry, dict):
                src = entry.get('source', '').lower()
                eid = entry.get('id')
                if eid:
                    if src == 'afdb':
                        if not fast:
                            db_xrefs.append(f"afdb_v6:afdbclusters_{eid}")
                    elif src == 'swissprot':
                        db_xrefs.append(f"afdb_v6:swissprot_{eid}")
                    elif src == 'pdb':
                        db_xrefs.append(f"pdb:pdb_{eid}")
                    elif src == 'cath':
                        db_xrefs.append(f"cath:cath_{eid}")
                    elif src == 'custom_db':
                        db_xrefs.append(f"custom:custom_{eid}")
                    else:
                        db_xrefs.append(eid)
            elif isinstance(entry, str):
                # Preserve any existing string cross-references
                db_xrefs.append(entry)

        # mark as baktfold
        mark_as_baktfold(feature)




    # if(len(expert_hits) > 0):
    #     top_expert_hit = sorted(expert_hits,key=lambda k: (k['rank'], k.get('score', 0), calc_annotation_score(k)), reverse=True)[0]
    #     expert_genes = top_expert_hit.get('gene', None)
    #     if(expert_genes):
    #         expert_genes = expert_genes.replace('/', ',').split(',')
    #         genes.update(expert_genes)
    #         gene = expert_genes[0]
    #     product = top_expert_hit.get('product', None)
    #     for hit in expert_hits:
    #         db_xrefs.update(hit.get('db_xrefs', []))

    if product and "hypothetical protein" not in product.lower():
        product = revise_cds_product(product)
        if(product):
            if(cfg.compliant):
                product = insdc.revise_product_insdc(product)
            feature['product'] = product

            unmark_as_hypothetical(feature)

            # protein_gene_symbol = extract_protein_gene_symbol(product)
            # if(protein_gene_symbol):
            #     genes.add(protein_gene_symbol)
            # revised_genes = revise_cds_gene_symbols(genes)
            # revised_gene = None
            # if gene is not None:
            #     revised_gene = revise_cds_gene_symbols([gene])  # special treatment for selected gene symbol
            #     revised_gene = revised_gene[0] if len(revised_gene) > 0 else None
            # if(revised_gene is None  and  len(revised_genes) >= 1):  # select first from gene symbol list if no symbol was selected before
            #     revised_gene = revised_genes[0]

            # feature['gene'] = revised_gene
            # feature['genes'] = sorted(revised_genes)
        else:
            mark_as_hypothetical(feature)
    else:
        mark_as_hypothetical(feature)

    feature['db_xrefs'] = sorted(list(db_xrefs))

extract_protein_gene_symbol(product)

Extracts a valid gene symbol from a protein name.

Parameters:

Name Type Description Default
product str

The protein name to extract a gene symbol from.

required

Returns:

Name Type Description
str str

The extracted gene symbol.

Examples:

>>> extract_protein_gene_symbol(product)
Source code in src/baktfold/bakta/annotation.py
def extract_protein_gene_symbol(product: str) -> str:
    """
    Extracts a valid gene symbol from a protein name.

    Args:
      product (str): The protein name to extract a gene symbol from.

    Returns:
      str: The extracted gene symbol.

    Examples:
      >>> extract_protein_gene_symbol(product)
    """
    gene_symbols = []
    for part in product.split(' '):  # try to extract valid gene symbols
        m = RE_GENE_SYMBOL.fullmatch(part)
        if(m):
            symbol = m[0]
            logger.info('fix gene: extract symbol from protein name. symbol=%s', symbol)
            gene_symbols.append(symbol)
        else:
            m = RE_PROTEIN_SYMBOL.fullmatch(part)  # extract protein names
            if(m):
                symbol = m[0]
                symbol = symbol[0].lower() + symbol[1:]
                logger.info('fix gene: extract symbol from protein name. symbol=%s', symbol)
                gene_symbols.append(symbol)
    if(len(gene_symbols) == 0):  # None found
        return None
    elif(len(gene_symbols) == 1):  # found 1
        return gene_symbols[0]
    else:  # found more than one, take the 2nd as the 1st often describes a broader gene family like "xyz family trancsriptional regulator ..."
        return gene_symbols[1]

mark_as_baktfold(feature)

Adds the baktfold key to the given feature dictionary.

Parameters:

Name Type Description Default
feature dict

The feature dictionary to add the baktfold key to.

required

Returns:

Type Description

None

Examples:

>>> feature = {'sequence': 'ATG', 'start': 1, 'stop': 3, 'strand': '+'}
>>> mark_as_baktfold(feature)
>>> feature
{'sequence': 'ATG', 'start': 1, 'stop': 3, 'strand': '+', 'baktfold': True}
Source code in src/baktfold/bakta/annotation.py
def mark_as_baktfold(feature: dict):
    """
    Adds the baktfold key to the given feature dictionary.

    Args:
      feature (dict): The feature dictionary to add the baktfold key to.

    Returns:
      None

    Examples:
      >>> feature = {'sequence': 'ATG', 'start': 1, 'stop': 3, 'strand': '+'}
      >>> mark_as_baktfold(feature)
      >>> feature
      {'sequence': 'ATG', 'start': 1, 'stop': 3, 'strand': '+', 'baktfold': True}
    """
    # logger.info(
    #     f'baktfold found hit(s) for: seq={feature['sequence']}, start={feature['start']}, stop={feature['stop']}, strand={feature['strand']}'
    # )
    feature['baktfold'] = True

mark_as_hypothetical(feature)

Marks a feature as hypothetical.

Parameters:

Name Type Description Default
feature dict

The feature to mark as hypothetical.

required

Returns:

Type Description

None

Examples:

>>> mark_as_hypothetical(feature)
Source code in src/baktfold/bakta/annotation.py
def mark_as_hypothetical(feature: dict):
    """
    Marks a feature as hypothetical.

    Args:
      feature (dict): The feature to mark as hypothetical.

    Returns:
      None

    Examples:
      >>> mark_as_hypothetical(feature)
    """
    # no need to actually print this I think
    # logger.info(
    #     f'marked as hypothetical: seq={feature['sequence']}, start={feature['start']}, stop={feature['stop']}, strand={feature['strand']}'
    # )
    feature['hypothetical'] = True
    feature['gene'] = None
    feature['genes'] = []
    feature['product'] = bc.HYPOTHETICAL_PROTEIN

revise_cds_gene_symbols(raw_genes)

Revises a list of gene symbols to ensure they are valid.

Parameters:

Name Type Description Default
raw_genes Sequence[str]

The list of gene symbols to revise.

required

Returns:

Name Type Description
list

The revised list of gene symbols.

Examples:

>>> revise_cds_gene_symbols(raw_genes)
Source code in src/baktfold/bakta/annotation.py
def revise_cds_gene_symbols(raw_genes: Sequence[str]):
    """
    Revises a list of gene symbols to ensure they are valid.

    Args:
      raw_genes (Sequence[str]): The list of gene symbols to revise.

    Returns:
      list: The revised list of gene symbols.

    Examples:
      >>> revise_cds_gene_symbols(raw_genes)
    """
    revised_genes = set()
    for gene in raw_genes:
        old_gene = gene
        if(RE_GENE_SUSPECT_CHARS.search(gene)):  # check for suspect characters -> remove gene symbol
            logger.info('fix gene: remove gene symbol containing suspect chars. old=%s', old_gene)
            continue

        old_gene = gene
        gene = gene.replace('gene', '')
        if(gene != old_gene):  # remove gene literal
            logger.info('fix gene: remove gene literal. new=%s, old=%s', gene, old_gene)

        old_gene = gene
        if(gene[-1] == '-'):  # remove orphan hyphen
            gene = gene[:-1]
            logger.info('fix gene: remove orphan hypen. new=%s, old=%s', gene, old_gene)

        old_gene = gene
        gene = RE_MULTIWHITESPACE.sub(' ', gene).strip()  # revise whitespaces
        if(gene != old_gene):
            logger.info('fix gene: revise whitespaces. new=%s, old=%s', gene, old_gene)

        old_gene = gene
        if(RE_GENE_CAPITALIZED.fullmatch(gene)):
            gene = gene[0].lower() + gene[1:]
            logger.info('fix gene: lowercase first char. new=%s, old=%s', gene, old_gene)

        if(len(gene) >= 3):
            if(len(gene) <= 12):
                revised_genes.add(gene)
            else:
                old_gene = gene
                gene = extract_protein_gene_symbol(gene)
                if(gene):
                    revised_genes.add(gene)
    return list(revised_genes)

revise_cds_product(product)

Revise product name for INSDC compliant submissions

Source code in src/baktfold/bakta/annotation.py
def revise_cds_product(product: str):
    """Revise product name for INSDC compliant submissions"""

    # from gb 
    # grep "Uncharacterized protein" AFDBClusters.tsv | wc -l
    #     805448

    if "Uncharacterized protein" in product:
        old_product = product
        product = "hypothetical protein"
        if product != old_product:
            logger.info(f'fix product: renamed uncharacterized protein as hypothetical. new={product}, old={old_product}')

    # from bakta

    old_product = product
    product = RE_PROTEIN_WEIGHT.sub(' ', product)  # remove protein weight in (k)Da
    if(product != old_product):
        logger.info('fix product: remove protein weight in (k)Da. new=%s, old=%s', product, old_product)

    old_product = product
    product = re.sub(RE_PROTEIN_PERIOD_SEPARATOR, r'\1-\2', product)  # replace separator periods
    if(product != old_product):
        logger.info('fix product: replace separator periods. new=%s, old=%s', product, old_product)

    old_product = product
    if(product[0] in RE_PROTEIN_SUSPECT_CHARS_BEGINNING):  # remove suspect first character
        product = product[1:]
        logger.info('fix product: replace invalid first character. new=%s, old=%s', product, old_product)

    old_product = product
    product = RE_PROTEIN_SUSPECT_CHARS_DISCARD.sub('', product)  # remove suspect characters
    if(product != old_product):
        logger.info('fix product: replace invalid characters. new=%s, old=%s', product, old_product)

    old_product = product
    product = RE_PROTEIN_SUSPECT_CHARS_REPLACE.sub(' ', product)  # replace suspect characters by single whitespace
    if(product != old_product):
        logger.info('fix product: replace invalid characters. new=%s, old=%s', product, old_product)

    old_product = product
    product = RE_PROTEIN_WRONG_PRIMES.sub('\u0027', product)  # replace wrong prime characters with single quote (U+0027) (') according to https://www.ncbi.nlm.nih.gov/genome/doc/internatprot_nomenguide/
    if(product != old_product):
        logger.info('fix product: replace wrong prime characters. new=%s, old=%s', product, old_product)

    old_product = product
    product = product.replace('FOG:', '')  # remove FOG ids
    if(product != old_product):
        logger.info('fix product: replace FOG ids. new=%s, old=%s', product, old_product)

    old_product = product
    product = RE_PROTEIN_REMNANT.sub('', product)  # remove 'Remnant of's
    if(product != old_product):
        logger.info('fix product: replace remnant ofs. new=%s, old=%s', product, old_product)

    old_product = product
    dufs = []  # replace DUF-containing products
    for m in RE_DOMAIN_OF_UNKNOWN_FUNCTION.finditer(product):
        dufs.append(m.group(1).upper())
    if(len(dufs) >= 1):
        product = f"{' '.join(dufs)} domain{'s' if len(dufs) > 1 else ''}-containing protein"
        if(product != old_product):
            logger.info('fix product: revise DUF. new=%s, old=%s', product, old_product)

    old_product = product
    if('conserved' in product.lower()):  # replace conserved UPF proteins
        upfs = []
        for m in RE_UNCHARACTERIZED_PROTEIN_FAMILY.finditer(product):
            upfs.append(m.group(1).upper())
        if(len(upfs) >= 1):
            product = f"{' '.join(upfs)} protein"
            if(product != old_product):
                logger.info('fix product: revise UPF. new=%s, old=%s', product, old_product)

    old_product = product
    product = RE_PROTEIN_HOMOLOG.sub('-like protein', product)  # replace Homologs
    if(product != old_product):
        if(product.count('protein') == 2):
            product = product.replace('protein', '', 1)  # remove former protein term if existing
        logger.info('fix product: replace Homolog. new=%s, old=%s', product, old_product)

    old_product = product
    product = RE_MULTIWHITESPACE.sub(' ', product).strip()  # revise whitespaces
    if(product != old_product):
        logger.info('fix product: revise whitespaces. new=%s, old=%s', product, old_product)

    old_product = product
    product = RE_PROTEIN_PUTATIVE.sub('putative', product)  # replace putative synonyms)
    if(product != old_product):
        logger.info('fix product: replace putative synonyms. new=%s, old=%s', product, old_product)

    old_product = product
    if(RE_PROTEIN_DOMAIN_CONTAINING.search(product)):  # replace domain name underscores in domain names
        product = product.replace('_', '-')
        if(product != old_product):
            logger.info('fix product: replace domain name underscores. new=%s, old=%s', product, old_product)

    old_product = product
    if(RE_PROTEIN_TMRNA.fullmatch(product)):
        product = ''
        logger.info('fix product: discard pure tmRNA product descriptions. new=%s, old=%s', product, old_product)

    old_product = product
    if(
        RE_PROTEIN_CONTIG.search(product) or  # protein containing 'sequence'
        RE_PROTEIN_NODE.search(product) or  # potential contig name (SPAdes)
        RE_PROTEIN_POTENTIAL_CONTIG_NAME.search(product) or  # potential contig name (SPAdes)
        RE_PROTEIN_NO_LETTERS.fullmatch(product)  # no letters -> set to Hypothetical
        ):  # remove suspect products and mark as hypothetical
        product = None
        logger.info('remove product: mark proteins with suspect products as hypothetical. old=%s', old_product)

    return product

unmark_as_hypothetical(feature)

Removes the hypothetical key from the given feature dictionary.

Parameters:

Name Type Description Default
feature dict

The feature dictionary to remove the hypothetical key from.

required

Returns:

Type Description

None

Examples:

>>> feature = {'sequence': 'ATG', 'start': 1, 'stop': 3, 'strand': '+'}
>>> unmark_as_hypothetical(feature)
>>> feature
{'sequence': 'ATG', 'start': 1, 'stop': 3, 'strand': '+'}
Source code in src/baktfold/bakta/annotation.py
def unmark_as_hypothetical(feature: dict):
    """
    Removes the hypothetical key from the given feature dictionary.

    Args:
      feature (dict): The feature dictionary to remove the hypothetical key from.

    Returns:
      None

    Examples:
      >>> feature = {'sequence': 'ATG', 'start': 1, 'stop': 3, 'strand': '+'}
      >>> unmark_as_hypothetical(feature)
      >>> feature
      {'sequence': 'ATG', 'start': 1, 'stop': 3, 'strand': '+'}
    """
    # logger.info(
    #     f'unmarked as hypothetical: seq={feature['sequence']}, start={feature['start']}, stop={feature['stop']}, strand={feature['strand']}'
    # )
    feature.pop('hypothetical', None)  # remove completely

check_content_size(file_name, file_path)

Checks if a file is empty.

Parameters:

Name Type Description Default
file_name str

The name of the file to check.

required
file_path Path

The path to the file to check.

required

Returns:

Type Description

None.

Examples:

>>> check_content_size('file', Path('path/to/file'))
None
Source code in src/baktfold/bakta/config.py
def check_content_size(file_name: str, file_path: Path):
    """
    Checks if a file is empty.

    Args:
      file_name (str): The name of the file to check.
      file_path (Path): The path to the file to check.

    Returns:
      None.

    Examples:
      >>> check_content_size('file', Path('path/to/file'))
      None
    """
    if(file_path.stat().st_size == 0):
        log.error('empty %s file! path=%s', file_name, file_path)
        sys.exit(f'ERROR: {file_name} file ({file_path}) is empty!')

check_db_path(args)

Checks the path to the database.

Parameters:

Name Type Description Default
args Namespace

The arguments passed to the program.

required

Returns:

Name Type Description
Path Path

The path to the database.

Examples:

>>> check_db_path(args)
Path('path/to/db')
Source code in src/baktfold/bakta/config.py
def check_db_path(args: Namespace) -> Path:
    """
    Checks the path to the database.

    Args:
      args (Namespace): The arguments passed to the program.

    Returns:
      Path: The path to the database.

    Examples:
      >>> check_db_path(args)
      Path('path/to/db')
    """
    global db_path
    env = os.environ.copy()
    if(args.db):
        db_dir = args.db
        log.debug('test parameter db: db_tmp=%s', db_dir)
        try:
            db_tmp_path = Path(db_dir).resolve()
            if(db_tmp_path.is_dir()):
                db_path = db_tmp_path
                log.info('database: type=parameter, path=%s', db_path)
            else:
                log.error('unvalid database path: type=parameter, path=%s', db_tmp_path)
                raise IOError()
        except:
            sys.exit(f'ERROR: wrong database path! --db={db_dir}')
    elif('BAKTA_DB' in env):
        db_dir = env['BAKTA_DB']
        log.debug('test env db: db_tmp=%s', db_dir)
        try:
            db_tmp_path = Path(db_dir).resolve()
            if(db_tmp_path.is_dir()):
                db_path = db_tmp_path
                log.info('database: type=environment, path=%s', db_path)
            else:
                log.error('unvalid database path: type=environment, path=%s', db_tmp_path)
                raise IOError()
        except:
            sys.exit(f'ERROR: wrong database path! BAKTA_DB={db_dir}')
    else:
        base_dir = Path(__file__).parent
        db_tmp_path = base_dir.joinpath('db')
        log.debug('test base_dir db: db_tmp=%s', db_tmp_path)
        if(db_tmp_path.is_dir()):
            db_path = db_tmp_path
            log.info('database: type=base-dir, path=%s', db_path)
        else:
            log.error('unvalid database path: type=base-dir, path=%s', db_tmp_path)
            sys.exit('ERROR: database neither provided nor auto-detected!\nPlease, download the mandatory db and provide it via either the --db parameter, a BAKTA_DB environment variable or copy it into the Bakta base directory.\nFor further information please read the readme.md')
    return db_path

check_output_path(output, force_override)

Check provided output path

Parameters:

Name Type Description Default
output string

The output directory destination path

required
force_override Bool

Whether to override existing output directories

required
Source code in src/baktfold/bakta/config.py
def check_output_path(output: str, force_override: bool) -> Path:
    """Check provided output path
    Args:
        output (string): The output directory destination path
        force_override (Bool): Whether to override existing output directories
    """
    global output_path
    output_path = Path(output)
    if(not output_path.exists()):
        try:
            output_path.mkdir(parents=True, exist_ok=True)
        except:
            sys.exit(f'ERROR: could not resolve or create output directory ({output})!')
    else:
        if(output_path == Path(os.getcwd())):
            pass
        elif(force_override is False):
            sys.exit(f'ERROR: output path ({output_path}) already exists! Either provide a non-existent new path or force overwriting it via \'--force\'')
        elif(not os.access(str(output_path), os.X_OK)):
            sys.exit(f'ERROR: output path ({output_path}) not accessible!')
        elif(not os.access(str(output_path), os.W_OK)):
            sys.exit(f'ERROR: output path ({output_path}) not writable!')
    output_path = output_path.resolve()
    return output_path

check_readability(file_name, file_Path)

Checks if a file is readable.

Parameters:

Name Type Description Default
file_name str

The name of the file to check.

required
file_Path Path

The path to the file to check.

required

Returns:

Type Description

None.

Examples:

>>> check_readability('file', Path('path/to/file'))
None
Source code in src/baktfold/bakta/config.py
def check_readability(file_name: str, file_Path: Path):
    """
    Checks if a file is readable.

    Args:
      file_name (str): The name of the file to check.
      file_Path (Path): The path to the file to check.

    Returns:
      None.

    Examples:
      >>> check_readability('file', Path('path/to/file'))
      None
    """
    if(not os.access(str(file_Path), os.R_OK)):
        log.error('%s file not readable! path=%s', file_name, file_Path)
        sys.exit(f'ERROR: {file_name} file ({file_Path}) not readable!')

check_threads(args)

Checks the number of threads to use.

Parameters:

Name Type Description Default
args Namespace

The arguments passed to the program.

required

Returns:

Name Type Description
int int

The number of threads to use.

Examples:

>>> check_threads(args)
4
Source code in src/baktfold/bakta/config.py
def check_threads(args: Namespace) -> int:
    """
    Checks the number of threads to use.

    Args:
      args (Namespace): The arguments passed to the program.

    Returns:
      int: The number of threads to use.

    Examples:
      >>> check_threads(args)
      4
    """
    global threads
    threads = args.threads

    try:
        max_threads = len(os.sched_getaffinity(0))
        log.debug(f"max-threads={max_threads}")
    except AttributeError:
        max_threads = mp.cpu_count()
        log.debug(f"scheduler affinity not availabe! max-threads={max_threads}")

    if(threads == 0):
        threads = max_threads
        log.debug("request max threads.")
    elif(threads < 0):
        log.error("wrong argument for 'threads' parameter! threads=%i", threads)
        sys.exit(f"ERROR: wrong argument ({threads}) for 'threads' parameter! Value must be larger than/equal to 0.")
    elif(threads > max_threads):
        log.error("wrong argument for 'threads' parameter! More threads requested than available: requested=%i, available=%i", threads, max_threads)
        sys.exit(f"ERROR: wrong argument ({threads}) for 'threads' parameter! More threads requested ({threads}) than available ({max_threads}).")
    log.info('threads=%i', threads)
    return threads

check_tmp_path(args)

Checks the path to the temporary directory.

Parameters:

Name Type Description Default
args Namespace

The arguments passed to the program.

required

Returns:

Name Type Description
Path Path

The path to the temporary directory.

Examples:

>>> check_tmp_path(args)
Path('path/to/tmp_dir')
Source code in src/baktfold/bakta/config.py
def check_tmp_path(args: Namespace) -> Path:
    """
    Checks the path to the temporary directory.

    Args:
      args (Namespace): The arguments passed to the program.

    Returns:
      Path: The path to the temporary directory.

    Examples:
      >>> check_tmp_path(args)
      Path('path/to/tmp_dir')
    """
    global tmp_path
    if(args.tmp_dir is not None):
        tmp_path = Path(args.tmp_dir)
        if(not tmp_path.exists()):
            log.debug('dedicated temp dir does not exist! tmp-dir=%s', tmp_path)
            sys.exit(f'ERROR: dedicated temporary directory ({tmp_path}) does not exist!')
        else:
            log.info('use dedicated temp dir: path=%s', tmp_path)
            tmp_path = Path(tempfile.mkdtemp(dir=str(tmp_path))).resolve()
    else:
        tmp_path = Path(tempfile.mkdtemp()).resolve()
    log.info('tmp-path=%s', tmp_path)
    return tmp_path

check_user_proteins(args)

Checks the path to the user proteins file.

Parameters:

Name Type Description Default
args Namespace

The arguments passed to the program.

required

Returns:

Name Type Description
Path

The path to the user proteins file.

Examples:

>>> check_user_proteins(args)
Path('path/to/user_proteins')
Source code in src/baktfold/bakta/config.py
def check_user_proteins(args: Namespace):
    """
    Checks the path to the user proteins file.

    Args:
      args (Namespace): The arguments passed to the program.

    Returns:
      Path: The path to the user proteins file.

    Examples:
      >>> check_user_proteins(args)
      Path('path/to/user_proteins')
    """
    global user_proteins
    user_proteins = args.proteins
    if(user_proteins is not None):
        try:
            if(user_proteins == ''):
                raise ValueError('File path argument must be non-empty')
            user_proteins_path = Path(args.proteins).resolve()
            check_readability('user proteins', user_proteins_path)
            check_content_size('user proteins', user_proteins_path)
            user_proteins = user_proteins_path
            log.info('user-proteins=%s', user_proteins)
            return user_proteins
        except:
            log.error('provided user proteins file not valid! path=%s', user_proteins)
            sys.exit(f'ERROR: user proteins file ({user_proteins}) not valid!')
    else:
        return None

setup(args)

Test environment and build a runtime configuration.

Source code in src/baktfold/bakta/config.py
def setup(args):
    """Test environment and build a runtime configuration."""
    # runtime configurations
    global env, threads, verbose, debug
    env['BLAST_USAGE_REPORT'] = 'false'  # prevent BLAST from contacting NCBI

    threads = check_threads(args)
    verbose = args.verbose
    log.info('verbose=%s', verbose)
    debug = args.debug
    log.info('debug=%s', debug)
    if(debug):
        verbose = True

    # input / output path configurations
    global db_path, db_info, tmp_path, genome_path, min_sequence_length, prefix, output_path, force
    db_path = check_db_path(args)
    tmp_path = check_tmp_path(args)

    try:
        if(args.genome == ''):
            raise ValueError('File path argument must be non-empty')
        genome_path = Path(args.genome).resolve()
        check_readability('genome', genome_path)
        check_content_size('genome', genome_path)
    except:
        log.error('provided genome file not valid! path=%s', args.genome)
        sys.exit(f'ERROR: genome file ({args.genome}) not valid!')
    log.info('genome-path=%s', genome_path)

    # input / output configurations
    min_sequence_length = args.min_contig_length
    if(min_sequence_length <= 0):
        log.error("wrong argument for 'min-contig-length' parameter! min_contig_length=%s", min_sequence_length)
        sys.exit(f"ERROR: wrong argument ({min_sequence_length}) for 'min- contig-length' parameter! Value must be larger than 0")
    log.info('min_contig_length=%s', min_sequence_length)
    log.info('prefix=%s', prefix)  # set in main.py before global logger config
    log.info('output-path=%s', output_path)
    force = args.force
    log.info('force=%s', force)

    # organism configurations
    global genus, species, strain, plasmid, taxon
    genus = args.genus
    if(genus is not None):
        genus = genus.strip()
        if(genus == ''):
            log.error("Empty 'genus' parameter! genus=%s", genus)
            sys.exit(f"ERROR: empty 'genus' parameter!")
        else:
            genus = genus.capitalize()
    log.info('genus=%s', genus)
    species = args.species
    if(species is not None):
        species = species.strip()
        if(species == ''):
            log.error("Empty 'species' parameter! species=%s", species)
            sys.exit(f"ERROR: empty 'species' parameter!")
        else:
            species = species.lower()
    log.info('species=%s', species)
    strain = args.strain
    if(strain is not None):
        strain = strain.strip()
        if(strain == ''):
            log.error("Empty 'strain' parameter! strain=%s", species)
            sys.exit(f"ERROR: empty 'strain' parameter!")
    log.info('strain=%s', strain)
    plasmid = args.plasmid
    if(plasmid is not None):
        plasmid = plasmid.strip()
        if(plasmid == ''):
            log.error("Empty 'plasmid' parameter! plasmid=%s", plasmid)
            sys.exit(f"ERROR: empty 'plasmid' parameter!")
        elif('plasmid' in plasmid.lower()):
            log.error("Wrong 'plasmid' parameter! plasmid=%s", plasmid)
            sys.exit(f"ERROR: wrong 'plasmid' parameter! The plasmid name mustn't contain the word 'plasmid'.")
        elif(PLASMID_NAME_PATTERN.fullmatch(plasmid) is None and PLASMID_UNNAMED_PATTERN.fullmatch(plasmid) is None):
            log.error("Wrong 'plasmid' name! plasmid=%s", plasmid)
            sys.exit(f"ERROR: wrong 'plasmid' name! Plasmid names must either be named as 'unnamed', 'unnamed1', ... or start with a lower 'p', contain only digits, dots, underscores and letters, and are limited to 20 characters in total.")
    log.info('plasmid=%s', plasmid)
    taxon = ' '.join([t for t in [genus, species, strain] if t is not None])
    if(taxon == ''):
        taxon = None

    # annotation configurations
    global complete, prodigal_tf, translation_table, keep_sequence_headers, locus, locus_tag, locus_tag_increment, gram, replicons, compliant, user_proteins, user_hmms, meta, regions
    complete = args.complete
    log.info('complete=%s', complete)
    prodigal_tf = args.prodigal_tf
    if(prodigal_tf is not None):
        try:
            if(prodigal_tf == ''):
                raise ValueError('File path argument must be non-empty')
            prodigal_tf_path = Path(args.prodigal_tf).resolve()
            check_readability('prodigal training', prodigal_tf_path)
            check_content_size('prodigal training', prodigal_tf_path)
            prodigal_tf = prodigal_tf_path
        except:
            log.error('provided prodigal training file not valid! path=%s', prodigal_tf)
            sys.exit(f'ERROR: Prodigal training file ({prodigal_tf}) not valid!')
    log.info('prodigal_tf=%s', prodigal_tf)
    translation_table = args.translation_table
    log.info('translation_table=%s', translation_table)
    gram = args.gram
    log.info('gram=%s', gram)
    compliant = args.compliant
    log.info('compliant=%s', compliant)
    if(compliant):
        min_sequence_length = 200
        log.info('compliant mode! min_contig_length=%s', min_sequence_length)
    meta = args.meta
    log.info('meta=%s', meta)
    locus = args.locus
    if(locus is not None):
        if(locus == ''):
            log.error("Empty 'locus' parameter! locus=%s", locus)
            sys.exit(f"ERROR: empty 'locus' parameter!")
        if(' ' in locus):
            log.error("Whitespace character in 'locus' parameter! locus=%s", locus)
            sys.exit(f"ERROR: whitespace character ({locus}) in 'locus' parameter!")
        if(bc.RE_INSDC_ID_PREFIX.fullmatch(locus) is None):
            log.error("Invalid 'locus' parameter! locus=%s", locus)
            sys.exit(f"ERROR: invalid 'locus' parameter ({locus})!\nLocus prefixes must contain between 1 and 20 alphanumeric or '-_' characters.")
    log.info('locus=%s', locus)
    locus_tag = args.locus_tag
    if(locus_tag is not None):
        if(locus_tag == ''):
            log.error("Empty 'locus-tag' parameter! locus=%s", locus_tag)
            sys.exit(f"ERROR: empty 'locus-tag' parameter!")
        if(' ' in locus_tag):
            log.error("Whitespace character in 'locus-tag' parameter! locus-tag=%s", locus_tag)
            sys.exit(f"ERROR: whitespace character ({locus_tag}) in 'locus-tag' parameter!")
        if(compliant):
            if(bc.RE_INSDC_LOCUSTAG_PREFIX.fullmatch(locus_tag) is None):
                log.error("INSDC-incompliant 'locus-tag' parameter! locus-tag=%s", locus_tag)
                sys.exit(f"ERROR: INSDC-incompliant 'locus-tag' parameter ({locus_tag})!\nINSDC Locus tag prefixes must contain between 3 and 12 alphanumeric uppercase characters and start with a letter.")
        else:
            if(bc.RE_LOCUSTAG_PREFIX.fullmatch(locus_tag) is None):
                log.error("Invalid 'locus-tag' parameter! locus-tag=%s", locus_tag)
                sys.exit(f"ERROR: invalid 'locus-tag' parameter ({locus_tag})!\nLocus tag prefixes must contain between 1 and 24 alphanumeric characters or '_.-' signs.")
    log.info('locus-tag=%s', locus_tag)
    locus_tag_increment = args.locus_tag_increment
    log.info('locus-tag-increment=%s', locus_tag_increment)
    keep_sequence_headers = args.keep_contig_headers
    log.info('keep_contig_headers=%s', keep_sequence_headers)
    replicons = args.replicons
    if(replicons is not None):
        try:
            if(replicons == ''):
                raise ValueError('File path argument must be non-empty')
            replicon_table_path = Path(args.replicons).resolve()
            check_readability('replicon table', replicon_table_path)
            check_content_size('replicon table', replicon_table_path)
            replicons = replicon_table_path
        except:
            log.error('provided replicon file not valid! path=%s', replicons)
            sys.exit(f'ERROR: replicon table file ({replicons}) not valid!')
    log.info('replicon-table=%s', replicons)
    user_proteins = check_user_proteins(args)
    user_hmms = args.hmms
    if(user_hmms is not None):
        try:
            if(user_hmms == ''):
                raise ValueError('File path argument must be non-empty')
            user_hmms_path = Path(user_hmms).resolve()
            check_readability('HMM', user_hmms_path)
            check_content_size('HMM', user_hmms_path)
            user_hmms = user_hmms_path
        except:
            log.error('provided HMM file not valid! path=%s', user_hmms)
            sys.exit(f'ERROR: HMM file ({user_hmms}) not valid!')

    regions = args.regions
    if(regions is not None):
        try:
            if(regions == ''):
                raise ValueError('File path argument must be non-empty')
            regions_path = Path(args.regions).resolve()
            check_readability('regions', regions_path)
            check_content_size('regions', regions_path)
            regions = regions_path
        except:
            log.error('provided regions file not valid! path=%s', regions)
            sys.exit(f'ERROR: regions file ({regions}) not valid!')
    log.info('regions=%s', regions)


    # workflow configurations
    global skip_trna, skip_tmrna, skip_rrna, skip_ncrna, skip_ncrna_region, skip_crispr, skip_cds, skip_pseudo, skip_sorf, skip_gap, skip_ori, skip_filter, skip_plot
    skip_trna = args.skip_trna
    log.info('skip-tRNA=%s', skip_trna)
    skip_tmrna = args.skip_tmrna
    log.info('skip-tmRNA=%s', skip_tmrna)
    skip_rrna = args.skip_rrna
    log.info('skip-rRNA=%s', skip_rrna)
    skip_ncrna = args.skip_ncrna
    log.info('skip-ncRNA=%s', skip_ncrna)
    skip_ncrna_region = args.skip_ncrna_region
    log.info('skip-ncRNA-region=%s', skip_ncrna_region)
    skip_crispr = args.skip_crispr
    log.info('skip-CRISPR=%s', skip_crispr)
    skip_cds = args.skip_cds
    log.info('skip-CDS=%s', skip_cds)
    skip_pseudo = args.skip_pseudo
    log.info('skip-pseudo=%s', skip_pseudo)
    skip_sorf = args.skip_sorf
    log.info('skip-sORF=%s', skip_sorf)
    skip_gap = args.skip_gap
    log.info('skip-gap=%s', skip_gap)
    skip_ori = args.skip_ori
    log.info('skip-ori=%s', skip_ori)
    skip_filter = args.skip_filter
    log.info('skip-filter=%s', skip_filter)
    skip_plot = args.skip_plot
    log.info('skip-plot=%s', skip_plot)

fetch_db_pscc_result(conn, uniref50_id)

Fetches the PSCC result for a given uniref50_id from a sqlite3 database.

Parameters:

Name Type Description Default
conn sqlite3.Connection

The connection to the sqlite3 database.

required
uniref50_id str

The uniref50_id to fetch the PSCC result for.

required

Returns:

Name Type Description
tuple

The PSCC result for the given uniref50_id.

Source code in src/baktfold/bakta/pstc.py
def fetch_db_pscc_result(conn: sqlite3.Connection, uniref50_id: str):
    """
    Fetches the PSCC result for a given uniref50_id from a sqlite3 database.

    Args:
      conn (sqlite3.Connection): The connection to the sqlite3 database.
      uniref50_id (str): The uniref50_id to fetch the PSCC result for.

    Returns:
      tuple: The PSCC result for the given uniref50_id.
    """
    c = conn.cursor()
    c.execute('select * from pscc where uniref50_id=?', (uniref50_id,))
    rec = c.fetchone()
    c.close()
    return rec

fetch_sql_description(conn, source, accession)

Fetches the product description for a given source and accession from a sqlite3 database.

Parameters:

Name Type Description Default
conn sqlite3.Connection

The connection to the sqlite3 database.

required
source str

The source of the accession.

required
accession str

The accession to fetch the description for.

required

Returns:

Name Type Description
str

The product description for the given source and accession.

Source code in src/baktfold/bakta/pstc.py
def fetch_sql_description(conn, source, accession):
    """
    Fetches the product description for a given source and accession from a sqlite3 database.

    Args:
      conn (sqlite3.Connection): The connection to the sqlite3 database.
      source (str): The source of the accession.
      accession (str): The accession to fetch the description for.

    Returns:
      str: The product description for the given source and accession.
    """
    table_map = {
        'swissprot': 'swissprot',
        'afdb': 'afdbclusters',
        'pdb': 'pdb',
        'cath': 'cath',
    }

    table = table_map.get(source)
    if table is None:
        return None

    # special case for cath, which can have multiple top hits (greedy) - multidomain proteins
    if table == 'cath':
        cursor = conn.execute("SELECT product FROM cath WHERE id = ?", (accession,))
    else:
        cursor = conn.execute(f"SELECT product FROM {table} WHERE id = ?", (accession,))

    row = cursor.fetchone()
    return row[0] if row else None

fetch_sql_description_threadsafe(db_path, source, accession)

makes new connection every time so don't have 2 CATH accessions colliding (for multi domain proteins)

Source code in src/baktfold/bakta/pstc.py
def fetch_sql_description_threadsafe(db_path, source, accession):
    """
    makes new connection every time so don't have 2 CATH accessions colliding (for multi domain proteins)
    """
    import sqlite3
    conn = sqlite3.connect(db_path, uri=True, check_same_thread=False)
    try:
        result = fetch_sql_description(conn, source, accession)
    finally:
        conn.close()
    return result

lookup_custom(features, baktfold_db, custom_annotations)

Lookup PSTC information from custom db

Source code in src/baktfold/bakta/pstc.py
def lookup_custom(features: Sequence[dict], baktfold_db: Path, custom_annotations: Path):
    """Lookup PSTC information from custom db """
    no_pstc_lookups = 0

    # custom
    if custom_annotations:
        custom_dict = {}
        with open(f"{custom_annotations}", "r") as f:
            reader = csv.reader(f, delimiter="\t")
            for row in reader:
                if len(row) >= 2:
                    custom_dict[row[0]] = row[1]

    for feat in features:
        pstc = feat.get('pstc')
        if not pstc:
            continue

        # Normalize to list for consistent handling
        pstc_entries = pstc if isinstance(pstc, list) else [pstc]

        for entry in pstc_entries:
            accession = entry.get('id')
            source = entry.get('source')
            if source == 'custom_db':
                if accession in custom_dict:
                    entry['description'] = custom_dict[accession]
                else:
                    entry['description'] = accession # mark as accession if no annotation given for custom for now

        # Write back normalized list or single entry
        feat['pstc'] = pstc_entries if isinstance(pstc, list) else pstc_entries[0]

    return features

lookup_sql(features, baktfold_db, threads)

Lookup PSTC information

Source code in src/baktfold/bakta/pstc.py
def lookup_sql(features: Sequence[dict], baktfold_db: Path, threads: int):
    """Lookup PSTC information"""

    no_pstc_lookups = 0
    # try:
    rec_futures = []
    logger.info("Looking up PSTC descriptions")
    # with sqlite3.connect(f"file:{baktfold_db.joinpath('baktfold.db')}?mode=ro&nolock=1&cache=shared", uri=True, check_same_thread=False) as conn:
    #     conn.execute('PRAGMA omit_readlock;')
    #     conn.row_factory = sqlite3.Row
    with ThreadPoolExecutor(max_workers=max(10, threads)) as tpe:  # use min 10 threads for IO bound non-CPU lookups
        for feat in features:
            pstc = feat.get('pstc')
            if not pstc:
                continue

            # Normalize to list for consistent handling
            pstc_entries = pstc if isinstance(pstc, list) else [pstc]

            rec_futures = []
            for entry in pstc_entries:
                accession = entry.get('id')

                source = entry.get('source')

                # submit database query as a future
                future = tpe.submit(fetch_sql_description_threadsafe, baktfold_db.joinpath('baktfold.db'), source, accession)
                rec_futures.append((entry, future))


            # Collect results
            for entry, future in rec_futures:
                desc = future.result()
                if desc:
                    entry['description'] = desc
                else:
                    if entry.get('source') == 'custom_db':
                        entry['description'] = accession  # keep accession if custom_db but missing
                    else:
                        entry['description'] = "hypothetical protein"

            # Write back normalized list or single entry
            feat['pstc'] = pstc_entries if isinstance(pstc, list) else pstc_entries[0]

    # except Exception as ex:
    #     logger.error('Could not read PSTCs from db!')
    #     raise Exception('SQL error!', ex)
    # log.info('looked-up=%i', no_pstc_lookups)

    return features

parse(features, foldseek_df, db_name='swissprot', has_duplicate_locus=False)

Update CDS in place with PSTC hits from foldseek_df if they pass filters.

has_duplicate_locus - some euks have multiple CDS per locus tag

Source code in src/baktfold/bakta/pstc.py
def parse(features: Sequence[dict], foldseek_df: pd.DataFrame, db_name: str = 'swissprot', has_duplicate_locus: bool = False) -> None:
    """Update CDS in place with PSTC hits from foldseek_df if they pass filters.

    has_duplicate_locus - some euks have multiple CDS per locus tag

    """ 

    if foldseek_df.empty:
        return features

    # Convert foldseek_df to a lookup table keyed by query ID
    foldseek_hits = {row['query']: row for _, row in foldseek_df.iterrows()}

    # each query maps to a list of rows now (to handle multiple CATH greedy tophits for multidomain proteins)
    foldseek_hits = defaultdict(list)
    for _, row in foldseek_df.iterrows():
        foldseek_hits[row['query']].append(row)

    updated_count = 0


    for cds in features:
        if has_duplicate_locus:
            aa_identifier = cds.get('id')
        else:
            aa_identifier = cds.get('locus')

        if aa_identifier not in foldseek_hits:
            continue  # no hits, skip

        cds_updated = False  

        # Iterate over *all* hits for this query
        for row in foldseek_hits[aa_identifier]:
            query_cov = float(row['qCov'])
            subject_cov = float(row['tCov'])
            identity = float(row['fident'])
            evalue = float(row['evalue'])
            bitscore = float(row['bitscore'])
            target_id = row['target']

            # Extract accession depending on database
            if db_name in {"swissprot", "afdb"}:
                accession = target_id.split('-')[1]
            elif db_name == "pdb":
                accession = target_id.split('-')[0]
            else:  # cath and custom
                accession = target_id

            # Apply your filters
            if (
                query_cov >= bc.MIN_PSTC_QCOVERAGE
                and subject_cov >= bc.MIN_PSTC_TCOVERAGE
                and identity >= bc.MIN_PSTC_IDENTITY
            ):
                new_pstc = {
                    'source': db_name,
                    'id': accession,
                    'query_cov': query_cov,
                    'subject_cov': subject_cov,
                    'identity': identity,
                    'score': bitscore,
                    'evalue': evalue,
                }

                # Append or initialize 'pstc'
                if 'pstc' in cds:
                    if isinstance(cds['pstc'], dict):
                        cds['pstc'] = [cds['pstc'], new_pstc]
                    elif isinstance(cds['pstc'], list):
                        cds['pstc'].append(new_pstc)
                    else:
                        cds['pstc'] = [new_pstc]
                else:
                    cds['pstc'] = [new_pstc]  # ← ensure list, since we may have many hits


                cds_updated = True  

        # Increment only once per CDS that had at least one valid hit (CATH might have multiple)
        if cds_updated:
            updated_count += 1

    logger.info(f"PSTC for {db_name} updated in place for {updated_count} CDSs")
    return features