# Source code for fepops.fepops_persistent.fepopsdb_sqlite

import bz2
import sqlite3
from pathlib import Path
from typing import Tuple, Union

import numpy as np
from rdkit import Chem

from fepops.fepops import GetFepopStatusCode

from .fepops_persistent_abc import FepopsPersistentAbstractBaseClass


class FepopsDBSqlite(FepopsPersistentAbstractBaseClass):
    """FepopsDBSqlite - allows reading and writing to a sqlite cache/database

    FEPOPS descriptors are stored in a single table, ``fepops_lookup_table``,
    keyed by RDKit canonical SMILES.  Descriptor arrays are serialised as
    bz2-compressed float64 bytes (see ``_register_sqlite_adaptors``).
    """

    def __init__(
        self,
        database_file: Union[str, Path],
        kmeans_method: str = "sklearn",
        parallel: bool = True,
        n_jobs: int = -1,
    ):
        """FepopsDBSqlite constructor

        Allows reading and writing to a sqlite file in place of a
        database/cache

        Parameters
        ----------
        database_file : Union[str, Path]
            Filename as a Path or string denoting where the file is, or
            should be created
        kmeans_method : str, optional
            KMeans method which should be used by the OpenFEPOPS object, by
            default "sklearn"
        parallel : bool, optional
            Run in parallel (using joblib), by default True
        n_jobs : int, optional
            Number of jobs to be spawned with joblib. If -1, then use all
            available cores. By default -1
        """
        super().__init__(
            database_file=database_file,
            kmeans_method=kmeans_method,
            parallel=parallel,
            n_jobs=n_jobs,
        )
        # NOTE(review): self.database_file is presumably set (as a Path) by
        # the base-class constructor - confirm against the ABC.
        if not self.database_file.exists():
            print(f"Database {self.database_file} not found, a new one will be created")
        # Adapters must be registered before connecting so that the "array"
        # declared column type is converted back to numpy on read.
        self._register_sqlite_adaptors()
        self.con = sqlite3.connect(
            self.database_file, detect_types=sqlite3.PARSE_DECLTYPES
        )
        self.cur = self.con.cursor()
        # Look for our table specifically; checking for *any* object in
        # sqlite_master would skip creation in a database that already holds
        # unrelated tables.
        res = self.cur.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
            ("fepops_lookup_table",),
        )
        if res.fetchone() is None:
            print(f"Creating new table in {self.database_file}")
            self.cur.execute(
                "CREATE TABLE fepops_lookup_table(cansmi text primary key, fepops array)"
            )
            self.con.commit()

    def _register_sqlite_adaptors(self) -> None:
        """Function to allow simple read/write of numpy arrays to sqlite DB

        Registers a module-wide sqlite3 adapter for ``np.ndarray`` and a
        converter for columns declared with type ``array``.  Only the raw
        values are stored: shape is not preserved, so arrays are read back
        as flat float64 arrays.
        """

        def adapt_array(nparray):
            """
            Adapted from
            http://stackoverflow.com/a/31312102/190597 (SoulNibbler)
            """
            # The converter always decodes as float64, so normalise the
            # dtype here; otherwise e.g. float32 input would be misread.
            as_float64 = np.asarray(nparray, dtype=np.float64)
            return sqlite3.Binary(bz2.compress(as_float64.tobytes()))

        def convert_array(text):
            return np.frombuffer(bz2.decompress(text), dtype=np.float64)

        sqlite3.register_adapter(np.ndarray, adapt_array)
        sqlite3.register_converter("array", convert_array)

    def add_fepop(self, rdkit_canonical_smiles: str, fepops: Union[np.ndarray, None]):
        """Add a FEPOP to the database using the supplied SMILES as a key

        Parameters
        ----------
        rdkit_canonical_smiles : str
            Canonical SMILES string generated by RDKit which represents the
            molecule used to generate the FEPOPS
        fepops : Union[np.ndarray, None]
            Array containing calculated FEPOPS descriptors. If None, then a
            single-element NaN array is stored in the database as a marker,
            which is useful for indicating that the canonical SMILES
            supplied did not succeed in generating a molecule and subsequent
            FEPOPS. Marking these difficult SMILES in the database means
            they can be checked and ignored without further time being spent
            to regenerate them again.
        """
        if fepops is None:
            # np.nan rather than np.NaN: the capitalised alias was removed
            # in NumPy 2.0.
            fepops = np.array([np.nan])
        super().add_fepop(rdkit_canonical_smiles=rdkit_canonical_smiles, fepops=fepops)
        # Existing entries are left untouched (cansmi is the primary key).
        if not self.fepop_exists(rdkit_canonical_smiles=rdkit_canonical_smiles):
            self.cur.execute(
                "insert into fepops_lookup_table (cansmi, fepops) values (?,?)",
                (rdkit_canonical_smiles, fepops),
            )
        self.con.commit()

    def fepop_exists(self, rdkit_canonical_smiles: str) -> bool:
        """Check if Fepop exists in the database

        If the fepops object was constructed with a database file, then
        query if the supplied canonical SMILES is included. If no database
        is present, then False is returned, as if it is not included.

        Parameters
        ----------
        rdkit_canonical_smiles : str
            Canonical smiles to check

        Returns
        -------
        bool
            True if supplied canonical smiles exists in the database
        """
        if self.database_file is None:
            return False
        # Bound parameter instead of string interpolation: avoids SQL
        # injection and SQLite's treatment of double-quoted text as an
        # identifier.
        res = self.cur.execute(
            "SELECT EXISTS(SELECT 1 FROM fepops_lookup_table WHERE cansmi=? LIMIT 1);",
            (rdkit_canonical_smiles,),
        )
        found = res.fetchone()
        return found[0] == 1

    def get_fepops(
        self,
        smiles: Union[str, Chem.rdchem.Mol, np.ndarray],
        is_canonical: bool = False,
    ) -> Tuple[GetFepopStatusCode, Union[np.ndarray, None]]:
        """Get FEPOPS from the database for a given SMILES

        If the molecule is not yet in the database, its FEPOPS are
        calculated with the OpenFEPOPS object and, on success, stored.

        Parameters
        ----------
        smiles : Union[str, Chem.rdchem.Mol, np.ndarray]
            The SMILES string of the molecule, an RDKit molecule, or an
            already-computed descriptor array (returned unchanged).
        is_canonical : bool, optional
            If True, then we guarantee that the SMILES string supplied is
            canonical and generated by RDKit and in which case, we may skip
            a cleaning step, by default False

        Returns
        -------
        Tuple[GetFepopStatusCode, Union[np.ndarray, None]]
            A status code and the FEPOPS array. The array is None when the
            stored entry is the NaN failure marker
            (``FAILED_RETRIEVED_NONE``) or when generation did not succeed.
        """
        super().get_fepops(smiles=smiles)
        if isinstance(smiles, str):
            if is_canonical:
                rdkit_canonical_smiles = smiles
                mol = None
            else:
                rdkit_canonical_smiles, mol = self._get_can_smi_mol_tuple(
                    smiles, smiles_guaranteed_rdkit_canonical=is_canonical
                )
        elif isinstance(smiles, Chem.rdchem.Mol):
            mol = smiles
            rdkit_canonical_smiles = Chem.MolToSmiles(mol)
        else:
            # At this point it is expected to be np.ndarray (type checking
            # is delegated to super), so just return the array (smiles) and
            # success.
            return GetFepopStatusCode.SUCCESS, smiles

        # One parameterised lookup serves as both the existence test and
        # the fetch; fetchone() yields None when the key is absent.
        row = self.cur.execute(
            "SELECT fepops FROM fepops_lookup_table WHERE cansmi=?",
            (rdkit_canonical_smiles,),
        ).fetchone()
        if row is not None:
            fepop = row[0]
            if np.isnan(fepop).any():
                # NaN is the marker written by add_fepop for "None".
                return GetFepopStatusCode.FAILED_RETRIEVED_NONE, None
            # Arrays are stored flat; restore one row per FEPOP.
            descriptor_length = (
                self.openfepops_object.num_centroids_per_fepop
                * self.openfepops_object.num_features_per_fepop
            ) + self.openfepops_object.num_distances_per_fepop
            return GetFepopStatusCode.SUCCESS, fepop.reshape(-1, descriptor_length)

        # Cache miss: generate, and store only successful results.
        if mol is None:
            mol = rdkit_canonical_smiles
        status, fepops_descriptors = self.openfepops_object.get_fepops(mol)
        if status == GetFepopStatusCode.SUCCESS:
            self.add_fepop(
                rdkit_canonical_smiles=rdkit_canonical_smiles,
                fepops=fepops_descriptors,
            )
            return status, fepops_descriptors
        return status, None