Module backend.scraper.libs.lib_db

Database class for interacting with the database.

Args

db_cnf : dict
Dictionary containing the database configuration.

Methods

__init__: Initialize the DB object. __del__: Destructor for the DB object. connect_to_db: Connect to the database using psycopg2. get_scraper_jobs: Get up to two open scraper jobs from the database. update_scraper_job: Update the progress, counter, error code, job server, and timestamp of a scraper job in the database. insert_result: Insert a result into the database. insert_serp: Insert a SERP (Search Engine Results Page) into the database unless one already exists for the same scraper, page, and query. check_progress: Check if a scraper job for the given study and query is already in progress. check_scraper_progress: Check if a scraper job is already in progress. check_duplicate_result: Check if a result with the same URL, main, study, scraper ID, and position already exists. reset: Reset the progress of stale scraper jobs of a job server in the database. get_searchengines: Get the list of search engines from the database. update_searchengine_test: Update the test field of a search engine in the database.

Classes

class DB (db_cnf: dict)

Initialize the DB object.

Args

db_cnf : dict
Dictionary containing the database configuration.
Expand source code
class DB:
    """
    Data-access helper for the scraper's database.

    Every public method opens its own psycopg2 connection, executes its
    statement(s), commits, and closes the connection again. The connection
    is closed even when a statement raises, so failed calls do not leak
    connections.

    Args:
        db_cnf (dict): Dictionary containing the database configuration.
            It is unpacked as keyword arguments into ``psycopg2.connect``.
    """

    # SELECT list and join conditions shared by every scraper-job query.
    # The individual methods only append their own filter/ordering suffix,
    # so the SQL sent to the server is unchanged.
    _JOB_QUERY = (
        "SELECT DISTINCT scraper.id AS scraper_id, scraper.searchengine, "
        "scraper.study, scraper.counter, scraper.query AS query_id, "
        "scraper.limit, query.query, searchengine.module "
        "FROM scraper, searchengine, query, searchengine_study "
        "WHERE scraper.searchengine = searchengine.id "
        "AND query.id = scraper.query "
        "AND searchengine_study.searchengine = searchengine.id"
    )
    # Open jobs: progress = 0 and counter still below 11.
    _OPEN_JOB_QUERY = _JOB_QUERY + " AND progress = 0 and counter < 11"
    # Failed jobs: progress = -1 and counter still below 11.
    _FAILED_JOB_QUERY = _JOB_QUERY + " AND progress = -1 and counter < 11"

    def __init__(self, db_cnf: dict):
        """
        Initialize the DB object.

        Args:
            db_cnf (dict): Dictionary containing the database configuration.
        """
        self.db_cnf = db_cnf

    def __del__(self):
        """
        Destructor for the DB object; prints a notice when the object is destroyed.
        """
        print('DB Controller object destroyed')

    def connect_to_db(self):
        """
        Connect to the database using psycopg2.

        Returns:
            A new psycopg2 connection created from ``self.db_cnf``. The
            caller is responsible for closing it.
        """
        return psycopg2.connect(**self.db_cnf)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _fetch_all(self, sql, params, cursor_factory):
        """
        Run one statement on a fresh connection and return all of its rows.

        Args:
            sql (str): Statement to execute.
            params (tuple | None): Values bound to the ``%s`` placeholders.
            cursor_factory: psycopg2 cursor class that determines the row type.

        Returns:
            list: All rows produced by the statement.
        """
        conn = self.connect_to_db()
        try:
            cur = conn.cursor(cursor_factory=cursor_factory)
            cur.execute(sql, params)
            conn.commit()
            return cur.fetchall()
        finally:
            # Release the connection on success *and* on error.
            conn.close()

    def _fetch_jobs(self, sql, params=None):
        """Run a scraper-job query and return its rows as dictionaries."""
        return self._fetch_all(sql, params, psycopg2.extras.RealDictCursor)

    def _execute(self, sql, params=None):
        """Run one data-modifying statement on a fresh connection and commit it."""
        conn = self.connect_to_db()
        try:
            cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
            cur.execute(sql, params)
            conn.commit()
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # Scraper jobs
    # ------------------------------------------------------------------

    def get_scraper_jobs(self):
        """
        Get at most two open scraper jobs (progress = 0, counter < 11),
        ordered by scraper ID.

        Returns:
            list: List of scraper jobs.
        """
        return self._fetch_jobs(
            self._OPEN_JOB_QUERY + " ORDER BY scraper.id ASC  LIMIT 2"
        )

    def get_all_open_scraper_jobs(self):
        """
        Get all open scraper jobs (progress = 0, counter < 11), ordered by
        scraper ID.

        Returns:
            list: List of scraper jobs.
        """
        return self._fetch_jobs(self._OPEN_JOB_QUERY + " ORDER BY scraper.id ASC")

    def get_scraper_jobs_searchengine(self, searchengine):
        """
        Get all open scraper jobs that belong to one search engine.

        Args:
            searchengine: Search engine ID the jobs must belong to.

        Returns:
            list: List of scraper jobs.
        """
        return self._fetch_jobs(
            self._OPEN_JOB_QUERY
            + " AND scraper.searchengine = %s ORDER BY scraper.id ASC",
            (searchengine,),
        )

    def get_scraper_jobs_filter_searchengine(self, searchengine):
        """
        Get at most two open scraper jobs that do NOT belong to the given
        search engine.

        Args:
            searchengine: Search engine ID to exclude.

        Returns:
            list: List of scraper jobs.
        """
        return self._fetch_jobs(
            self._OPEN_JOB_QUERY
            + " AND scraper.searchengine != %s ORDER BY scraper.id ASC  LIMIT 2",
            (searchengine,),
        )

    def get_scraper_job(self, job_id):
        """
        Get the scraper job with the given ID, regardless of its progress.

        Args:
            job_id: Scraper ID to look up.

        Returns:
            list: List of matching scraper jobs (empty if the ID is unknown).
        """
        return self._fetch_jobs(
            self._JOB_QUERY + " AND scraper.id = %s ORDER BY scraper.id ASC  LIMIT 2",
            (job_id,),
        )

    def get_failed_scraper_jobs(self):
        """
        Get all failed scraper jobs (progress = -1, counter < 11).

        Returns:
            list: List of scraper jobs.
        """
        return self._fetch_jobs(self._FAILED_JOB_QUERY)

    def get_failed_scraper_jobs_server(self, job_server):
        """
        Get all failed scraper jobs (progress = -1, counter < 11) recorded
        for one job server.

        Args:
            job_server: Value of ``scraper.job_server`` to match.

        Returns:
            list: List of scraper jobs.
        """
        return self._fetch_jobs(
            self._FAILED_JOB_QUERY + " and scraper.job_server = %s",
            (job_server,),
        )

    def update_scraper_job(self, progress, counter, error_code, job_server, scraper_id, created_at):
        """
        Update the progress of a scraper job in the database.

        Args:
            progress (int): Progress value.
            counter (int): Counter value.
            error_code (str): Error code.
            job_server (str): Job server.
            scraper_id (int): Scraper ID of the row to update.
            created_at (datetime): Timestamp written to the ``created_at`` column.

        Returns:
            None
        """
        self._execute(
            "Update scraper SET progress=%s, counter=%s, error_code=%s, job_server=%s, created_at=%s WHERE id = %s",
            (progress, counter, error_code, job_server, created_at, scraper_id),
        )

    # ------------------------------------------------------------------
    # Results and SERPs
    # ------------------------------------------------------------------

    def insert_result(self, title, description, url, position, created_at, main, ip, study, scraper, query, serp):
        """
        Insert a result into the database.

        Args:
            title (str): Title of the result.
            description (str): Description of the result.
            url (str): URL of the result.
            position (int): Position of the result.
            created_at (datetime): Creation timestamp of the result.
            main (bool): Flag indicating if it's a main result.
            ip (str): IP address of the result.
            study (int): Study ID.
            scraper (int): Scraper ID.
            query (int): Query ID.
            serp (int): SERP ID.

        Returns:
            None
        """
        #result: id     title   description     url     position        created_at      main    ip      origin  imported        study   scraper old_id  resulttype      monitoring      serp    query   final_url
        self._execute(
            "INSERT INTO result (title, description, url, position, created_at, main, ip, study, scraper, query, serp) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);",
            (title, description, url, position, created_at, main, ip, study, scraper, query, serp),
        )

    def insert_serp(self, scraper, page, code, img, created_at, query):
        """
        Insert a SERP (Search Engine Results Page) into the database unless
        one already exists for the same scraper, page, and query.

        Args:
            scraper (int): Scraper ID.
            page (int): Page number.
            code (str): Code of the SERP.
            img (str): Image of the SERP.
            created_at (datetime): Creation timestamp of the SERP.
            query (int): Query ID.

        Returns:
            The row (as produced by ``DictCursor``) holding the ``id`` of the
            existing or newly inserted SERP -- not a bare integer.
        """
        #serp id        scraper page    code    img     progress        created_at      old_id  monitoring      query
        conn = self.connect_to_db()
        try:
            cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
            cur.execute("SELECT id from serp where scraper = %s and page = %s and query = %s", (scraper, page, query,))
            conn.commit()
            serp = cur.fetchone()

            # Only insert when no SERP was found for this scraper/page/query.
            if not serp:
                cur.execute("INSERT INTO serp (scraper, page, code, img, created_at, query) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id;", (scraper, page, code, img, created_at, query))
                conn.commit()
                serp = cur.fetchone()

            return serp
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # Existence checks
    # ------------------------------------------------------------------

    def check_progress(self, study, query_id):
        """
        Check if a scraper job with progress = 2 exists for a study and query.

        Args:
            study (int): Study ID.
            query_id (int): Query ID.

        Returns:
            bool: True if the progress is 2, False otherwise.
        """
        rows = self._fetch_all(
            "SELECT scraper.id FROM scraper WHERE study = %s AND query = %s AND progress = 2",
            (study, query_id),
            psycopg2.extras.DictCursor,
        )
        return bool(rows)

    def check_scraper_progress(self, scraper_id):
        """
        Check if a scraper job is already in progress.

        Args:
            scraper_id (int): Scraper ID.

        Returns:
            bool: True if the progress is 2, False otherwise.
        """
        rows = self._fetch_all(
            "SELECT scraper.id FROM scraper WHERE id = %s AND progress = 2",
            (scraper_id,),
            psycopg2.extras.DictCursor,
        )
        return bool(rows)

    def check_duplicate_result(self, url, main, study, scraper_id, position):
        """
        Check if a result with the same URL, main, study, scraper ID, and
        position already exists.

        Args:
            url (str): URL of the result.
            main (bool): Flag indicating if it's a main result.
            study (int): Study ID.
            scraper_id (int): Scraper ID.
            position (int): Position of the result.

        Returns:
            bool: True if a duplicate result exists, False otherwise.
        """
        rows = self._fetch_all(
            "SELECT id FROM result WHERE url = %s AND main = %s AND study = %s AND scraper = %s AND position = %s",
            (url, main, study, scraper_id, position),
            psycopg2.extras.DictCursor,
        )
        return bool(rows)

    def reset(self, job_server):
        """
        Reset the progress of scraper jobs in the database.

        Sets progress back to 0 for jobs of the given job server whose
        progress is -1 or 2, whose counter is below 11, and whose
        ``created_at`` is more than 30 minutes in the past.

        Args:
            job_server: Value of ``scraper.job_server`` whose jobs are reset.

        Returns:
            None
        """
        self._execute(
            "Update scraper SET progress=0 WHERE (progress = -1 OR progress = 2) and counter < 11 and created_at < NOW() - INTERVAL '30 minutes' and job_server = %s",
            (job_server,),
        )

    # ------------------------------------------------------------------
    # Search engines
    # ------------------------------------------------------------------

    def get_searchengines(self):
        """
        Get the list of search engines from the database, ordered by ID.

        Returns:
            list: List of search engines.
        """
        return self._fetch_all(
            "SELECT * from searchengine ORDER BY id ASC",
            None,
            psycopg2.extras.DictCursor,
        )

    def get_failed_searchengines(self):
        """
        Get the search engines whose ``test`` field is -1, ordered by ID.

        Returns:
            list: List of search engines.
        """
        return self._fetch_all(
            "SELECT * from searchengine WHERE test = -1 ORDER BY id ASC",
            None,
            psycopg2.extras.DictCursor,
        )

    def update_searchengine_test(self, se_id, test):
        """
        Update the test field of a search engine in the database.

        Args:
            se_id (int): Search engine ID.
            test: New value for the ``test`` column.

        Returns:
            None
        """
        self._execute("Update searchengine SET test=%s WHERE id = %s", (test, se_id))

Methods

def check_duplicate_result(self, url, main, study, scraper_id, position)

Check if a result with the same URL, main, study, scraper ID, and position already exists.

Args

url : str
URL of the result.
main : bool
Flag indicating if it's a main result.
study : int
Study ID.
scraper_id : int
Scraper ID.
position : int
Position of the result.

Returns

bool
True if a duplicate result exists, False otherwise.
def check_progress(self, study, query_id)

Check if a scraper job for the given study and query is already in progress (progress = 2).

Args

study : int
Study ID.
query_id : int
Query ID.

Returns

bool
True if the progress is 2, False otherwise.
def check_scraper_progress(self, scraper_id)

Check if a scraper job is already in progress.

Args

scraper_id : int
Scraper ID.

Returns

bool
True if the progress is 2, False otherwise.
def connect_to_db(self)

Connect to the database using psycopg2

def get_all_open_scraper_jobs(self)

Get all open scraper jobs (progress = 0, counter < 11) from the database, ordered by scraper ID.

Returns

list
List of scraper jobs.
def get_failed_scraper_jobs(self)

Get all failed scraper jobs (progress = -1, counter < 11) from the database.

Returns

list
List of scraper jobs.
def get_failed_scraper_jobs_server(self, job_server)

Get all failed scraper jobs (progress = -1, counter < 11) recorded for the given job server.

Returns

list
List of scraper jobs.
def get_failed_searchengines(self)

Get the list of search engines whose test field is -1 from the database.

Returns

list
List of search engines.
def get_scraper_job(self, job_id)

Get the scraper job with the given ID from the database, regardless of its progress.

Returns

list
List of scraper jobs.
def get_scraper_jobs(self)

Get at most two open scraper jobs (progress = 0, counter < 11) from the database, ordered by scraper ID.

Returns

list
List of scraper jobs.
def get_scraper_jobs_filter_searchengine(self, searchengine)

Get at most two open scraper jobs (progress = 0, counter < 11) that do not belong to the given search engine.

Returns

list
List of scraper jobs.
def get_scraper_jobs_searchengine(self, searchengine)

Get all open scraper jobs (progress = 0, counter < 11) that belong to the given search engine.

Returns

list
List of scraper jobs.
def get_searchengines(self)

Get the list of search engines from the database.

Returns

list
List of search engines.
def insert_result(self, title, description, url, position, created_at, main, ip, study, scraper, query, serp)

Insert a result into the database.

Args

title : str
Title of the result.
description : str
Description of the result.
url : str
URL of the result.
position : int
Position of the result.
created_at : datetime
Creation timestamp of the result.
main : bool
Flag indicating if it's a main result.
ip : str
IP address of the result.
study : int
Study ID.
scraper : int
Scraper ID.
query : int
Query ID.
serp : int
SERP ID.

Returns

None

def insert_serp(self, scraper, page, code, img, created_at, query)

Insert a SERP (Search Engine Results Page) into the database.

Args

scraper : int
Scraper ID.
page : int
Page number.
code : str
Code of the SERP.
img : str
Image of the SERP.
created_at : datetime
Creation timestamp of the SERP.
query : int
Query ID.

Returns

psycopg2.extras.DictRow
Row containing the ID of the existing or newly inserted SERP.
def reset(self, job_server)

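Reset the progress of scraper jobs in the database: jobs of the given job server with progress -1 or 2, a counter below 11, and a created_at timestamp older than 30 minutes are set back to progress 0.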

Returns

None

def update_scraper_job(self, progress, counter, error_code, job_server, scraper_id, created_at)

Update the progress of a scraper job in the database.

Args

progress : int
Progress value.
counter : int
Counter value.
error_code : str
Error code.
job_server : str
Job server.
scraper_id : int
Scraper ID.
created_at : datetime
Timestamp written to the created_at column of the scraper job.

Returns

None

def update_searchengine_test(self, se_id, test)

Update the test field of a search engine in the database.

Args

se_id : int
Search engine ID.
test : str
Test value.

Returns

None