Module backend.classifier.libs.lib_db

Classes

class DB (db_cnf: dict)

Database class to handle various database operations.

Attributes

db_cnf : dict
Dictionary containing database configuration.

Initialize the DB object with database configuration.

Args

db_cnf : dict
Database configuration dictionary.
Expand source code
class DB:
    """
    Database class to handle various database operations.

    Attributes:
        db_cnf (dict): Dictionary containing database configuration.
    """
    db_cnf: dict

    def __init__(self, db_cnf: dict):
        """
        Initialize the DB object with database configuration.

        Args:
            db_cnf (dict): Database configuration dictionary.
        """
        self.db_cnf = db_cnf

    def __del__(self):
        """Destroy Database object and print a message."""
        print('DB object destroyed')

    def connect_to_db(self):
        """
        Context manager for database connection.

        Returns:
            ConnectionManager: A context manager for database connections.
        """
        class ConnectionManager:
            def __init__(self, db_cnf):
                self.db_cnf = db_cnf
                self.conn = None

            def __enter__(self):
                self.conn = psycopg2.connect(**self.db_cnf)
                return self.conn

            def __exit__(self, exc_type, exc_val, exc_tb):
                if self.conn:
                    self.conn.close()

        return ConnectionManager(self.db_cnf)

    def get_classifiers(self):
        """
        Get the classifiers from the database.

        Returns:
            list: List of classifiers with their IDs and names.
        """
        with self.connect_to_db() as conn:
            cur = conn.cursor(cursor_factory=RealDictCursor)
            cur.execute("SELECT classifier.id, classifier.name FROM classifier")
            conn.commit()
            classifiers = cur.fetchall()
        return classifiers

    def get_search_engines(self, results):
        """
        Get search engines for results.

        Args:
            results (list): List of results.

        Returns:
            list: Updated list of results with search engines.
        """
        with self.connect_to_db() as conn:
            for result in results:
                result_id = result['id']
                cur = conn.cursor(cursor_factory=RealDictCursor)
                cur.execute("SELECT searchengine.name FROM result, scraper, searchengine WHERE result.scraper = scraper.id AND scraper.searchengine = searchengine.id AND result.id = %s", (result_id,))
                conn.commit()
                searchengine = cur.fetchone()
                result['searchengine'] = searchengine['name'] if searchengine else "N/A"
        return results

    def get_queries(self, results):
        """
        Get queries for results.

        Args:
            results (list): List of results.

        Returns:
            list: Updated list of results with queries.
        """
        with self.connect_to_db() as conn:
            for result in results:
                result_id = result['id']
                cur = conn.cursor(cursor_factory=RealDictCursor)
                cur.execute("SELECT query.query FROM query, result WHERE result.query = query.id AND result.id = %s", (result_id,))
                conn.commit()
                query = cur.fetchone()
                result['query'] = query['query'] if query else "N/A"
        return results

    def get_results(self, classifier_id):
        """
        Get the results for a given classifier ID.

        Args:
            classifier_id (int): ID of the classifier.

        Returns:
            list: List of results for the given classifier ID.
        """
        with self.connect_to_db() as conn:
            cur = conn.cursor(cursor_factory=RealDictCursor)
            cur.execute("""
                SELECT result.id, result.url, result.main, result.position, result.title, result.description, result.ip, 
                       result.final_url, source.code, source.bin, source.content_type, source.error_code, source.status_code, 
                       result_source.source 
                FROM result, source, result_source, classifier_study 
                WHERE result.study = classifier_study.study AND classifier_study.classifier = %s 
                      AND result_source.result = result.id AND result_source.source = source.id 
                      AND (source.progress = 1 OR source.progress = -1) 
                      AND result.id NOT IN (SELECT classifier_result.result FROM classifier_result WHERE classifier_result.classifier = %s) 
                ORDER BY result.created_at, result.id 
                LIMIT 10
            """, (classifier_id, classifier_id))
            conn.commit()
            results = cur.fetchall()
        results = self.get_search_engines(results)
        results = self.get_queries(results)
        return results

    def insert_classification_result(self, classifier_id, value, result, job_server):
        """
        Insert a classification result into the database.

        Args:
            classifier_id (int): ID of the classifier.
            value (str): Value of the classification.
            result (int): ID of the result.

        Returns:
            None
        """
        try:
            created_at = datetime.now()
            with self.connect_to_db() as conn:
                cur = conn.cursor(cursor_factory=DictCursor)
                cur.execute("INSERT INTO classifier_result (classifier, value, result, created_at, job_server) VALUES (%s, %s, %s, %s, %s);", 
                            (classifier_id, value, result, created_at, job_server))
                conn.commit()
        except Exception as e:
            print(f"Error inserting classification result: {e}")

    def insert_indicator(self, indicator, value, classifier_id, result, job_server):
        """
        Insert an indicator into the database.

        Args:
            indicator (str): Indicator name.
            value (str): Value of the indicator.
            classifier_id (int): ID of the classifier.
            result (int): ID of the result.

        Returns:
            None
        """
        try:
            created_at = datetime.now()
            with self.connect_to_db() as conn:
                cur = conn.cursor(cursor_factory=DictCursor)
                cur.execute("INSERT INTO classifier_indicator (indicator, value, classifier, result, created_at, job_server) VALUES (%s, %s, %s, %s, %s, %s);", 
                            (indicator, value, classifier_id, result, created_at, job_server))
                conn.commit()
        except Exception as e:
            print(f"Error inserting indicator: {e}")

    def update_classification_result(self, classifier_id, value, result):
        """
        Update a classification result in the database.

        Args:
            classifier_id (int): ID of the classifier.
            value (str): Updated value of the classification.
            result (int): ID of the result.

        Returns:
            None
        """
        try:
            created_at = datetime.now()
            with self.connect_to_db() as conn:
                cur = conn.cursor(cursor_factory=DictCursor)
                cur.execute("UPDATE classifier_result SET classifier=%s, value=%s, created_at=%s WHERE result = %s", 
                            (classifier_id, value, created_at, result))
                conn.commit()
        except Exception as e:
            print(f"Error updating classification result: {e}")

    def reset_classifiers(self, result):
        """
        Reset the classifiers for a given result.

        Args:
            result (int): ID of the result.

        Returns:
            None
        """
        with self.connect_to_db() as conn:
            cur = conn.cursor(cursor_factory=DictCursor)
            cur.execute("DELETE FROM classifier_indicator WHERE result = %s", (result,))
            cur.execute("DELETE FROM classifier_result WHERE result = %s", (result,))
            cur.execute("DELETE FROM classifier_result WHERE value = 'in process'", (result,))
            conn.commit()

    def reset(self, job_server):
        """
       Reset any unfinished classifiers in the database if not all indicators could be resolved.

        Returns:
            None
        """
        with self.connect_to_db() as conn:
            cur = conn.cursor(cursor_factory=RealDictCursor)
            cur.execute("SELECT classifier_indicator.result FROM classifier_indicator, classifier_result WHERE classifier_indicator.job_server = %s AND classifier_indicator.result NOT IN (SELECT classifier_result.result FROM classifier_result) GROUP BY classifier_indicator.result", (job_server,))
            conn.commit()
            values = cur.fetchall()
        for v in values:
            result = v['result']
            self.reset_classifiers(result)

    def check_classification_result(self, classifier, result):
        """
        Check if a result is already declared as a scraping job.

        Args:
            classifier (int): ID of the classifier.
            result (int): ID of the result.

        Returns:
            bool: True if the result is already declared, False otherwise.
        """
        with self.connect_to_db() as conn:
            cur = conn.cursor(cursor_factory=DictCursor)
            cur.execute("SELECT id FROM classifier_result WHERE classifier = %s AND result = %s AND value !='in process'", 
                        (classifier, result))
            conn.commit()
            check_progress = cur.fetchall()
        return bool(check_progress)

    def check_indicator_result(self, classifier, result):
        """
        Check if a result is already declared as a classifier job.

        Args:
            classifier (int): ID of the classifier.
            result (int): ID of the result.

        Returns:
            bool: True if the result is already declared, False otherwise.
        """
        with self.connect_to_db() as conn:
            cur = conn.cursor(cursor_factory=DictCursor)
            cur.execute("SELECT id FROM classifier_indicator WHERE classifier = %s AND result = %s", 
                        (classifier, result))
            conn.commit()
            check_progress = cur.fetchall()
        return bool(check_progress)

    def check_source_dup(self, source):
        """
        Check for duplicate sources in the database.

        Args:
            source (int): ID of the source.

        Returns:
            list: List of duplicate sources.
        """
        with self.connect_to_db() as conn:
            cur = conn.cursor(cursor_factory=DictCursor)
            cur.execute("SELECT * FROM result_source WHERE source = %s", (source,))
            conn.commit()
            check_source = cur.fetchall()
        return check_source

    def duplicate_classification_result(self, source):
        """
        Get duplicate classification results for a given source.

        Args:
            source (int): ID of the source.

        Returns:
            list: List of duplicate classification results.
        """
        return self.get_results_result_source(source)

    def get_results_result_source(self, source):
        """
        Get results for a given source.

        Args:
            source (int): ID of the source.

        Returns:
            list: List of results for the given source.
        """
        with self.connect_to_db() as conn:
            cur = conn.cursor(cursor_factory=DictCursor)
            cur.execute("SELECT result FROM result_source WHERE source = %s", (source,))
            conn.commit()
            result_sources = cur.fetchall()
        return result_sources

    def get_classifier_result(self, result):
        """
        Get the classifier result for a given result ID.

        Args:
            result (int): ID of the result.

        Returns:
            list: List of classifier results for the given result ID.
        """
        with self.connect_to_db() as conn:
            cur = conn.cursor(cursor_factory=DictCursor)
            cur.execute("SELECT value FROM classifier_result WHERE result = %s and value !='in process'", 
                        (result,))
            conn.commit()
            result_sources = cur.fetchall()
        return result_sources

    def get_indicators(self, result):
        """
        Get the indicators for a given result ID.

        Args:
            result (int): ID of the result.

        Returns:
            list: List of indicators for the given result ID.
        """
        with self.connect_to_db() as conn:
            cur = conn.cursor(cursor_factory=DictCursor)
            cur.execute("SELECT * FROM classifier_indicator WHERE result = %s", (result,))
            conn.commit()
            result_indicators = cur.fetchall()
        return result_indicators

    def deleteClassifierDuplicates(self):
        """
        Delete duplicate classifier results.

        Returns:
            None
        """
        with self.connect_to_db() as conn:
            cur = conn.cursor(cursor_factory=DictCursor)
            cur.execute("""
                DELETE FROM classifier_result 
                WHERE id IN (
                    SELECT id FROM (
                        SELECT id, ROW_NUMBER() OVER (PARTITION BY result ORDER BY id) AS row_num 
                        FROM classifier_result
                    ) t WHERE t.row_num > 1
                );
            """)
            conn.commit()

    def check_db_connection(self):
        """
        Test the database connection.

        Returns:
            bool: True if the connection is successful, False otherwise.
        """
        try:
            with self.connect_to_db():
                return True
        except Exception as e:
            print(f"Error checking DB connection: {e}")
            return False

Class variables

var db_cnf : dict

Methods

def check_classification_result(self, classifier, result)

Check if a result is already declared as a scraping job.

Args

classifier : int
ID of the classifier.
result : int
ID of the result.

Returns

bool
True if the result is already declared, False otherwise.
def check_db_connection(self)

Test the database connection.

Returns

bool
True if the connection is successful, False otherwise.
def check_indicator_result(self, classifier, result)

Check if a result is already declared as a classifier job.

Args

classifier : int
ID of the classifier.
result : int
ID of the result.

Returns

bool
True if the result is already declared, False otherwise.
def check_source_dup(self, source)

Check for duplicate sources in the database.

Args

source : int
ID of the source.

Returns

list
List of duplicate sources.
def connect_to_db(self)

Context manager for database connection.

Returns

ConnectionManager
A context manager for database connections.
def deleteClassifierDuplicates(self)

Delete duplicate classifier results.

Returns

None

def duplicate_classification_result(self, source)

Get duplicate classification results for a given source.

Args

source : int
ID of the source.

Returns

list
List of duplicate classification results.
def get_classifier_result(self, result)

Get the classifier result for a given result ID.

Args

result : int
ID of the result.

Returns

list
List of classifier results for the given result ID.
def get_classifiers(self)

Get the classifiers from the database.

Returns

list
List of classifiers with their IDs and names.
def get_indicators(self, result)

Get the indicators for a given result ID.

Args

result : int
ID of the result.

Returns

list
List of indicators for the given result ID.
def get_queries(self, results)

Get queries for results.

Args

results : list
List of results.

Returns

list
Updated list of results with queries.
def get_results(self, classifier_id)

Get the results for a given classifier ID.

Args

classifier_id : int
ID of the classifier.

Returns

list
List of results for the given classifier ID.
def get_results_result_source(self, source)

Get results for a given source.

Args

source : int
ID of the source.

Returns

list
List of results for the given source.
def get_search_engines(self, results)

Get search engines for results.

Args

results : list
List of results.

Returns

list
Updated list of results with search engines.
def insert_classification_result(self, classifier_id, value, result, job_server)

Insert a classification result into the database.

Args

classifier_id : int
ID of the classifier.
value : str
Value of the classification.
result : int
ID of the result.

Returns

None

def insert_indicator(self, indicator, value, classifier_id, result, job_server)

Insert an indicator into the database.

Args

indicator : str
Indicator name.
value : str
Value of the indicator.
classifier_id : int
ID of the classifier.
result : int
ID of the result.

Returns

None

def reset(self, job_server)

Reset any unfinished classifiers in the database if not all indicators could be resolved.

Returns: None

def reset_classifiers(self, result)

Reset the classifiers for a given result.

Args

result : int
ID of the result.

Returns

None

def update_classification_result(self, classifier_id, value, result)

Update a classification result in the database.

Args

classifier_id : int
ID of the classifier.
value : str
Updated value of the classification.
result : int
ID of the result.

Returns

None