Module backend.export.libs.lib_db_results
Classes
class DB(db_cnf: dict, job_server: str, refresh_time: int)
Database interaction class for managing and querying a PostgreSQL database.
Initializes the DB object with the given database configuration, job server, and refresh time.
Args
db_cnf : dict
Dictionary containing database configuration details.
job_server : str
Name of the job server.
refresh_time : int
Time in hours after which scraped sources are refreshed.
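A minimal construction sketch, assuming the package is importable under its module path; the connection parameters are placeholders for your own environment, not values from this project:

from backend.export.libs.lib_db_results import DB

# Hypothetical psycopg2 connection settings; the dictionary is passed
# straight to psycopg2.connect(**db_cnf) by connect_to_db().
db_cnf = {
    "host": "localhost",
    "port": 5432,
    "dbname": "results",
    "user": "scraper",
    "password": "secret",
}

db = DB(db_cnf=db_cnf, job_server="worker-01", refresh_time=24)
if not db.check_db_connection():
    raise RuntimeError("database is not reachable")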
Source code
class DB: """Database interaction class for managing and querying a PostgreSQL database.""" db_cnf: dict """Dictionary containing the database connection configuration.""" job_server: str """Name of the job server that is processing the data.""" refresh_time: int """Time in hours to refresh scraped sources.""" def __init__(self, db_cnf: dict, job_server: str, refresh_time: int): """ Initializes the DB object with the given database configuration, job server, and refresh time. Args: db_cnf (dict): Dictionary containing database configuration details. job_server (str): Name of the job server. refresh_time (int): Time in hours to refresh scraped sources. """ self.db_cnf = db_cnf self.job_server = job_server self.refresh_time = refresh_time def __del__(self): """Destructor for the DB object, prints a message when the object is destroyed.""" print('DB object destroyed') def connect_to_db(self): """ Establishes a connection to the PostgreSQL database using psycopg2. Returns: psycopg2.extensions.connection: Database connection object. """ conn = psycopg2.connect(**self.db_cnf) return conn def insert_result_source(self, result_id, progress, created_at, job_server): """ Inserts a placeholder entry into the result_source table to track sources and avoid duplicate scraping jobs. Args: result_id (int): ID of the result. progress (int): Progress status of the scraping job. created_at (datetime): Timestamp when the entry was created. job_server (str): Job server name. """ conn = self.connect_to_db() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute(""" INSERT INTO result_source (result, progress, created_at, job_server) VALUES (%s, %s, %s, %s); """, (result_id, progress, created_at, job_server)) conn.commit() conn.close() def get_sources_pending(self): """ Retrieves sources that have failed (progress = 2 or progress = -1) and have not exceeded retry limit. Returns: list: List of dictionaries containing failed sources with their details. """ conn = self.connect_to_db() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute(""" SELECT source.id, source.created_at, result_source.result FROM source JOIN result_source ON result_source.source = source.id WHERE (source.progress = 2 OR source.progress = -1) AND result_source.counter < 11 """) conn.commit() sources_pending = cur.fetchall() conn.close() return sources_pending def get_source_check(self, url): """ Checks if a source with the specified URL exists and if it should be refreshed based on its creation time. Args: url (str): URL of the source. Returns: int or bool: Source ID if the source should be refreshed, otherwise False. """ conn = self.connect_to_db() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute(""" SELECT id, created_at FROM source WHERE progress = 1 AND url = %s ORDER BY created_at DESC """, (url,)) conn.commit() sc = cur.fetchone() conn.close() if sc: timestamp = datetime.now() source_id = sc[0] created_at = sc[1] diff = timestamp - created_at diff_in_hours = diff.total_seconds() / 3600 if diff_in_hours < self.refresh_time: return source_id else: return False else: return False def get_source_check_by_result_id(self, result_id): """ Checks if a result has already been declared as a scraping job. Args: result_id (int): ID of the result. Returns: tuple or None: Source ID if a scraping job is declared, otherwise None. 
""" conn = self.connect_to_db() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute(""" SELECT id FROM result_source WHERE result = %s AND progress = 2 """, (result_id,)) conn.commit() scr = cur.fetchone() conn.close() return scr def get_result_content(self, source_id): """ Retrieves content from an existing result associated with the specified source. Args: source_id (int): ID of the source. Returns: tuple: Content details of the associated result. """ conn = self.connect_to_db() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute(""" SELECT result.ip, result.main, result.final_url FROM result JOIN result_source ON result_source.result = result.id WHERE result_source.source = %s """, (source_id,)) conn.commit() rc = cur.fetchone() conn.close() return rc def insert_source(self, url, progress, created_at, job_server): """ Inserts a new source into the database. Args: url (str): URL of the source. progress (int): Progress status of the source. created_at (datetime): Timestamp when the source was created. job_server (str): Job server name. Returns: int: ID of the newly inserted source. """ conn = self.connect_to_db() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute(""" INSERT INTO source (url, progress, created_at, job_server) VALUES (%s, %s, %s, %s) RETURNING id; """, (url, progress, created_at, job_server)) conn.commit() lastrowid = cur.fetchone()[0] conn.close() return lastrowid def check_progress(self, url, result_id): """ Checks if a result is already declared as a scraping job for the given URL. Args: url (str): URL of the source. result_id (int): ID of the result. Returns: bool: True if the result is already declared, otherwise False. """ conn = self.connect_to_db() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute(""" SELECT result_source.id FROM result_source JOIN source ON result_source.source = source.id WHERE source.url = %s AND source.progress = 2 AND result_source.result = %s """, (url, result_id)) conn.commit() check_progress = cur.fetchall() conn.close() return bool(check_progress) def update_source(self, source_id, code, bin, progress, content_type, error_code, status_code, created_at, content_dict): """ Updates the content of a source when the scraping job is completed. Args: source_id (int): ID of the source to be updated. code (int): HTTP response code. bin (bytearray): Binary content of the source. progress (int): Progress status of the source. content_type (str): MIME type of the content. error_code (int): Error code if any. status_code (int): Status code of the request. created_at (datetime): Timestamp when the source was created. content_dict (dict): Additional content details. """ conn = self.connect_to_db() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute(""" UPDATE source SET code = %s, bin = %s, progress = %s, content_type = %s, error_code = %s, status_code = %s, created_at = %s, content_dict = %s WHERE id = %s """, (code, bin, progress, content_type, error_code, status_code, created_at, content_dict, source_id)) conn.commit() conn.close() def replace_source_bin(self, source_id, bin): """ Updates the binary content of a source when the scraping job is completed. Args: source_id (int): ID of the source to be updated. bin (bytearray): New binary content for the source. 
""" conn = self.connect_to_db() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute(""" UPDATE source SET bin = %s WHERE id = %s """, (bin, source_id)) conn.commit() conn.close() def update_result(self, result_id, ip, main, final_url): """ Updates the result content when the scraping job is completed. Args: result_id (int): ID of the result to be updated. ip (str): IP address associated with the result. main (str): Main content of the result. final_url (str): Final URL after scraping. """ conn = self.connect_to_db() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute(""" UPDATE result SET ip = %s, main = %s, final_url = %s WHERE id = %s """, (ip, main, final_url, result_id)) conn.commit() conn.close() def get_source_counter_result(self, result_id): """ Retrieves the counter value for a result from the result_source table. Args: result_id (int): ID of the result. Returns: int: Counter value associated with the result. """ conn = self.connect_to_db() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute(""" SELECT counter FROM result_source WHERE result = %s """, (result_id,)) conn.commit() counter = cur.fetchall() conn.close() return counter[0][0] def update_result_source(self, result_id, source_id, progress, counter, created_at, job_server): """ Updates the result_source table with new details for a scraping job. Args: result_id (int): ID of the result. source_id (int): ID of the source. progress (int): Progress status of the scraping job. counter (int): Counter value for the result. created_at (datetime): Timestamp when the entry was created. job_server (str): Name of the job server. """ conn = self.connect_to_db() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute(""" UPDATE result_source SET source = %s, progress = %s, counter = %s, created_at = %s, job_server = %s WHERE result = %s """, (source_id, progress, counter, created_at, job_server, result_id)) conn.commit() conn.close() def update_result_source_result(self, result_id, progress, counter, created_at): """ Updates the result_source table with new details for a result. Args: result_id (int): ID of the result. progress (int): Progress status of the scraping job. counter (int): Counter value for the result. created_at (datetime): Timestamp when the entry was created. """ conn = self.connect_to_db() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute(""" UPDATE result_source SET progress = %s, counter = %s, created_at = %s WHERE result = %s """, (progress, counter, created_at, result_id)) conn.commit() conn.close() def delete_source_pending(self, source_id, progress, created_at): """ Deletes or updates pending sources that have not yet completed. Args: source_id (int): ID of the source to be updated. progress (int): New progress status of the source. created_at (datetime): Timestamp when the entry was created. """ conn = self.connect_to_db() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute(""" UPDATE source SET progress = %s, created_at = %s WHERE id = %s """, (progress, created_at, source_id)) conn.commit() conn.close() def reset_result_source(self, progress, counter, created_at, source_id): """ Resets the result_source entry for a source. Args: progress (int): New progress status of the source. counter (int): New counter value for the result. created_at (datetime): Timestamp when the entry was created. source_id (int): ID of the source to be reset. 
""" conn = self.connect_to_db() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) created_at = datetime.now() cur.execute(""" UPDATE result_source SET progress = %s, counter = %s, created_at = %s WHERE source = %s """, (progress, counter, created_at, source_id)) conn.commit() conn.close() def reset(self): """ Resets all pending sources by deleting or updating them and resetting result_source entries. """ sources_pending = self.get_sources_pending() for source in sources_pending: source_id = source[0] self.delete_source_pending(source_id, source[1], source[2]) self.reset_result_source(source[1], source[2], source[2], source_id) def get_result_source(self, result_id): """ Retrieves the result_source entry for a given result ID. Args: result_id (int): ID of the result. Returns: tuple: Result source entry if exists, otherwise None. """ conn = self.connect_to_db() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute(""" SELECT id FROM result_source WHERE result = %s """, (result_id,)) conn.commit() scr = cur.fetchone() conn.close() return scr def get_result_source_source(self, result_id): """ Retrieves the source ID for a given result ID. Args: result_id (int): ID of the result. Returns: int: Source ID associated with the result. """ conn = self.connect_to_db() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute(""" SELECT source FROM result_source WHERE result = %s """, (result_id,)) conn.commit() scr = cur.fetchone() conn.close() return scr[0] def get_sources(self, job_server): """ Retrieves results that do not have an associated source and are eligible for scraping. Args: job_server (str): Name of the job server requesting the sources. Returns: list: List of tuples containing result IDs and URLs that are ready for scraping. """ conn = self.connect_to_db() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute(""" SELECT result.id, result.url, study.studytype FROM result JOIN study ON result.study = study.id LEFT JOIN result_source ON result_source.result = result.id WHERE (result_source.source IS NULL AND NOT EXISTS (SELECT result FROM result_source WHERE result = result.id) OR result_source.progress = 0 AND result_source.counter < 11) AND study.studytype != 6 LIMIT 5 """) conn.commit() sources = cur.fetchall() conn.close() sources_list = [] for s in sources: progress = 2 result_id = s[0] result_url = s[1] if self.get_result_source(result_id): counter = self.get_source_counter_result(result_id) counter += 1 created_at = datetime.now() self.update_result_source_result(result_id, progress, counter, created_at) else: created_at = datetime.now() self.insert_result_source(result_id, progress, created_at, job_server) sources_list.append([result_id, result_url]) return sources_list def check_db_connection(self): """ Tests the connection to the database to ensure it is active. Returns: bool: True if the connection is successful, False otherwise. """ try: conn = self.connect_to_db() conn.close() return True except: return FalseClass variables
var db_cnf : dict
Dictionary containing the database connection configuration.
var job_server : str
Name of the job server that is processing the data.
var refresh_time : int
Time in hours after which scraped sources are refreshed.
Methods
def check_db_connection(self)
Tests the connection to the database to ensure it is active.
Returns
bool
True if the connection is successful, False otherwise.
def check_progress(self, url, result_id)
Checks if a result is already declared as a scraping job for the given URL.
Args
url : str
URL of the source.
result_id : int
ID of the result.
Returns
bool
True if the result is already declared, otherwise False.
def connect_to_db(self)
Establishes a connection to the PostgreSQL database using psycopg2.
Returns
psycopg2.extensions.connection
Database connection object.
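Callers receive a plain psycopg2 connection and are responsible for closing it; the class opens a fresh connection per call and does not pool. A short sketch using the same DictCursor pattern as the methods in this class (db is the instance from the earlier sketch):

import psycopg2.extras

conn = db.connect_to_db()
try:
    cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    cur.execute("SELECT id, url FROM source WHERE progress = 1 LIMIT 1")
    row = cur.fetchone()  # DictRow supports both row[1] and row["url"]
finally:
    conn.close()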
def delete_source_pending(self, source_id, progress, created_at)
Deletes or updates pending sources that have not yet completed.
Args
source_id : int
ID of the source to be updated.
progress : int
New progress status of the source.
created_at : datetime
Timestamp when the entry was created.
def get_result_content(self, source_id)
Retrieves content from an existing result associated with the specified source.
Args
source_id : int
ID of the source.
Returns
tuple
Content details (ip, main, final_url) of the associated result.
def get_result_source(self, result_id)
Retrieves the result_source entry for a given result ID.
Args
result_id : int
ID of the result.
Returns
tuple
Result source entry if it exists, otherwise None.
def get_result_source_source(self, result_id)
Retrieves the source ID for a given result ID.
Args
result_id : int
ID of the result.
Returns
int
Source ID associated with the result.
def get_source_check(self, url)
Checks if a source with the specified URL exists and whether it is still fresh enough to be reused, based on its creation time.
Args
url : str
URL of the source.
Returns
int or bool
Source ID if a fresh source exists and can be reused, otherwise False.
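A sketch of the reuse decision this method enables; the progress code 2 for declared jobs is inferred from check_progress, and datetime is imported as in the source above:

existing_source = db.get_source_check("https://example.com/article")
if existing_source:
    # A source younger than refresh_time hours exists: reuse its content.
    content = db.get_result_content(existing_source)  # (ip, main, final_url) or None
else:
    # No fresh copy: declare a new scraping job for this URL.
    source_id = db.insert_source("https://example.com/article", 2,
                                 datetime.now(), db.job_server)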
def get_source_check_by_result_id(self, result_id)
Checks if a result has already been declared as a scraping job.
Args
result_id : int
ID of the result.
Returns
tuple or None
Row containing the result_source entry ID if a scraping job is declared, otherwise None.
def get_source_counter_result(self, result_id)
Retrieves the counter value for a result from the result_source table.
Args
result_id : int
ID of the result.
Returns
int
Counter value associated with the result.
def get_sources(self, job_server)
Retrieves results that do not yet have an associated source and are eligible for scraping, and marks each returned result as a declared scraping job (progress = 2) in result_source.
Args
job_server : str
Name of the job server requesting the sources.
Returns
list
List of [result_id, result_url] pairs that are ready for scraping.
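A sketch of a worker loop built on this method; fetch_page is a hypothetical scraping helper, and progress code 1 for completed sources is inferred from get_source_check above:

def run_worker(db):
    # Claims up to 5 eligible results; get_sources marks each as in progress.
    for result_id, url in db.get_sources(db.job_server):
        body = fetch_page(url)  # hypothetical: returns the raw page bytes
        source_id = db.insert_source(url, 1, datetime.now(), db.job_server)
        db.replace_source_bin(source_id, body)
        # Linking the source back to the result is shown in the completion
        # sketch at the end of this page.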
def get_sources_pending(self)
Retrieves sources that are still pending or failed (progress = 2 or progress = -1) and have not exceeded the retry limit (counter < 11).
Returns
list
List of dictionaries containing the pending sources with their details.
def insert_result_source(self, result_id, progress, created_at, job_server)
Inserts a placeholder entry into the result_source table to track sources and avoid duplicate scraping jobs.
Args
result_id : int
ID of the result.
progress : int
Progress status of the scraping job.
created_at : datetime
Timestamp when the entry was created.
job_server : str
Job server name.
def insert_source(self, url, progress, created_at, job_server)
Inserts a new source into the database.
Args
url : str
URL of the source.
progress : int
Progress status of the source.
created_at : datetime
Timestamp when the source was created.
job_server : str
Job server name.
Returns
int
ID of the newly inserted source.
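The method uses PostgreSQL's RETURNING clause to obtain the new row's ID in the same round trip as the INSERT. A minimal call with placeholder values:

source_id = db.insert_source(
    url="https://example.com/page",  # placeholder URL
    progress=2,                      # 2 = declared scraping job (see check_progress)
    created_at=datetime.now(),
    job_server=db.job_server,
)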
def replace_source_bin(self, source_id, bin)
Updates the binary content of a source when the scraping job is completed.
Args
source_id : int
ID of the source to be updated.
bin : bytearray
New binary content for the source.
def reset(self)
Resets all pending sources by deleting or updating them and resetting result_source entries.
def reset_result_source(self, progress, counter, created_at, source_id)
Resets the result_source entry for a source.
Args
progress : int
New progress status of the source.
counter : int
New counter value for the result.
created_at : datetime
Timestamp when the entry was created (note: the method overwrites this with the current time).
source_id : int
ID of the source to be reset.
def update_result(self, result_id, ip, main, final_url)
Updates the result content when the scraping job is completed.
Args
result_id : int
ID of the result to be updated.
ip : str
IP address associated with the result.
main : str
Main content of the result.
final_url : str
Final URL after scraping.
def update_result_source(self, result_id, source_id, progress, counter, created_at, job_server)
Updates the result_source table with new details for a scraping job.
Args
result_id : int
ID of the result.
source_id : int
ID of the source.
progress : int
Progress status of the scraping job.
counter : int
Counter value for the result.
created_at : datetime
Timestamp when the entry was created.
job_server : str
Name of the job server.
def update_result_source_result(self, result_id, progress, counter, created_at)
Updates the result_source table with new details for a result.
Args
result_id : int
ID of the result.
progress : int
Progress status of the scraping job.
counter : int
Counter value for the result.
created_at : datetime
Timestamp when the entry was created.
def update_source(self, source_id, code, bin, progress, content_type, error_code, status_code, created_at, content_dict)
Updates the content of a source when the scraping job is completed.
Args
source_id : int
ID of the source to be updated.
code : int
HTTP response code.
bin : bytearray
Binary content of the source.
progress : int
Progress status of the source.
content_type : str
MIME type of the content.
error_code : int
Error code if any.
status_code : int
Status code of the request.
created_at : datetime
Timestamp when the source was created.
content_dict : dict
Additional content details.
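A sketch of the full completion flow for one scraping job, combining the update methods above; the response object and the progress codes are assumptions based on the queries in this class, not a documented contract:

def finish_job(db, result_id, source_id, response):
    # response is a hypothetical container for the fetched page.
    now = datetime.now()
    db.update_source(
        source_id,
        code=response.status,               # HTTP response code
        bin=response.body,                  # raw page bytes
        progress=1,                         # 1 appears to mark completed sources
        content_type=response.content_type,
        error_code=0,
        status_code=response.status,
        created_at=now,
        content_dict=None,
    )
    db.update_result(result_id, response.ip, response.text, response.final_url)
    counter = db.get_source_counter_result(result_id)
    db.update_result_source(result_id, source_id, 1, counter, now, db.job_server)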