From fdba5af54d198d535581616b6bc8f40cf521e666 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Uro=C5=A1=20Marolt?=
Date: Thu, 3 Jul 2025 14:07:23 +0200
Subject: [PATCH] fix(integrations): store integration results before emit
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Uroš Marolt
---
 crowdgit/cm_maintainers_data/cm_database.py | 52 +++++++++++++---
 crowdgit/get_bad_commits.py                 | 24 ++++++--
 crowdgit/ingest.py                          | 68 ++++++++++++++++++---
 crowdgit/maintainers.py                     |  3 +-
 crowdgit/server.py                          | 23 +++----
 5 files changed, 128 insertions(+), 42 deletions(-)

diff --git a/crowdgit/cm_maintainers_data/cm_database.py b/crowdgit/cm_maintainers_data/cm_database.py
index 935c3af..1a000a1 100644
--- a/crowdgit/cm_maintainers_data/cm_database.py
+++ b/crowdgit/cm_maintainers_data/cm_database.py
@@ -8,8 +8,29 @@
 
 load_dotenv()
 
+_read_conn = None
+_write_conn = None
+
+
+async def close_db_connections():
+    global _read_conn, _write_conn
+
+    logger.info("Closing database connections")
+
+    if _read_conn:
+        await _read_conn.close()
+    if _write_conn:
+        await _write_conn.close()
+
 
 async def get_db_connection(is_read_operation: bool = True):
+    global _read_conn, _write_conn
+
+    if is_read_operation and _read_conn:
+        return _read_conn
+    elif not is_read_operation and _write_conn:
+        return _write_conn
+
     db_params = {
         "database": os.getenv("DB_DATABASE"),
         "user": os.getenv("DB_USER"),
@@ -30,17 +51,20 @@ async def get_db_connection(is_read_operation: bool = True):
         raise ValueError(
             f"The following environment variables are not set: {', '.join(missing_env_vars)}"
         )
-    return await asyncpg.connect(**db_params)
+
+    if is_read_operation:
+        _read_conn = await asyncpg.connect(**db_params)
+        return _read_conn
+    else:
+        _write_conn = await asyncpg.connect(**db_params)
+        return _write_conn
 
 
 async def query(sql: str, params: tuple = None) -> List[Dict[str, Any]]:
     try:
         conn = await get_db_connection(is_read_operation=True)
-        try:
-            results = await conn.fetch(sql, *params) if params else await conn.fetch(sql)
-            return [dict(row) for row in results]
-        finally:
-            await conn.close()
+        results = await conn.fetch(sql, *params) if params else await conn.fetch(sql)
+        return [dict(row) for row in results]
     except Exception as error:
         logger.error(f"Error executing query: {error}")
         raise
@@ -49,10 +73,18 @@ async def query(sql: str, params: tuple = None) -> List[Dict[str, Any]]:
 async def execute(sql: str, params: tuple = None) -> None:
     try:
         conn = await get_db_connection(is_read_operation=False)
-        try:
-            await conn.execute(sql, *params) if params else await conn.execute(sql)
-        finally:
-            await conn.close()
+        await conn.execute(sql, *params) if params else await conn.execute(sql)
     except Exception as error:
         logger.error(f"Error executing query: {error}")
         raise
+
+
+async def batch_insert(sql: str, records: List[Any], batch_size=100) -> None:
+    try:
+        conn = await get_db_connection(is_read_operation=False)
+        for i in range(0, len(records), batch_size):
+            batch = records[i : i + batch_size]
+            await conn.executemany(sql, batch)
+    except Exception as error:
+        logger.error(f"Error executing batch insert: {error}")
+        raise
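Note for reviewers: the helpers above are meant to be driven as sketched below. This is a minimal usage sketch, assuming the DB_* environment variables are set; the table name and rows are illustrative and not part of the patch.

    import asyncio

    from crowdgit.cm_maintainers_data.cm_database import (
        batch_insert,
        close_db_connections,
        query,
    )


    async def demo():
        # The first write opens the cached write connection; later calls reuse it.
        rows = [("id-1", "pending"), ("id-2", "pending")]  # illustrative rows
        await batch_insert(
            "insert into example_table(id, state) values($1, $2)",  # illustrative table
            rows,
        )
        # Reads go through the separately cached read connection.
        print(await query("select count(*) as n from example_table"))
        # Connections are now cached per process, so close them once at shutdown.
        await close_db_connections()


    asyncio.run(demo())

One design consequence worth noting: asyncpg's executemany() executes atomically, so a failing record rolls back the whole batch it belongs to (up to batch_size rows), not just the single row.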
diff --git a/crowdgit/get_bad_commits.py b/crowdgit/get_bad_commits.py
index fa47681..76bc10f 100644
--- a/crowdgit/get_bad_commits.py
+++ b/crowdgit/get_bad_commits.py
@@ -1,9 +1,11 @@
+import asyncio
 from git import Repo, Git
 from pprint import pprint as pp
 import os
 import requests
 import json
 from tqdm import tqdm
+from crowdgit.cm_maintainers_data.cm_database import close_db_connections
 from crowdgit.ingest import Queue
 from dotenv import load_dotenv
 import time
@@ -40,7 +42,7 @@ def send_api_call(endpoint, body=None, method="POST"):
     return False
 
 
-def get_commit_info(repo, segment_id, integration_id, remote, commit_id, repo_path="."):
+async def get_commit_info(repo, segment_id, integration_id, remote, commit_id, repo_path="."):
     try:
         commit = repo.commit(commit_id)
     except Exception as e:
@@ -114,10 +116,10 @@ def get_commit_info(repo, segment_id, integration_id, remote, commit_id, repo_pa
 
     queue = Queue()
 
-    return queue.send_messages(segment_id, integration_id, [activity])
+    return await queue.send_messages(segment_id, integration_id, [activity])
 
 
-def parse_commit_file(commit_file_path, repo_path):
+async def parse_commit_file(commit_file_path, repo_path):
     with open(commit_file_path, "r") as f:
         commit_data = f.read()
 
@@ -147,7 +149,7 @@ def parse_commit_file(commit_file_path, repo_path):
     bad_commits = []
     for commit in tqdm(commits):
         commit_id = commit.split("\n")[0]
-        commit_info = get_commit_info(
+        commit_info = await get_commit_info(
             repo, segment_id, integration_id, remote, commit_id, repo_path
         )
         if not commit_info:
@@ -155,14 +157,14 @@ def parse_commit_file(commit_file_path, repo_path):
     return bad_commits
 
 
-def main():
+async def process_bad_commits():
     import glob
     from crowdgit import LOCAL_DIR
 
     commit_files = glob.glob(f"{LOCAL_DIR}/bad-commits/[!DONE]*.txt")
     for commit_file_path in commit_files:
         repo_path = f"{LOCAL_DIR}/repos/" + os.path.basename(commit_file_path).replace(".txt", "")
-        parse_commit_file(commit_file_path, repo_path)
+        await parse_commit_file(commit_file_path, repo_path)
         os.rename(
             commit_file_path,
             f"{LOCAL_DIR}/bad-commits/DONE_"
@@ -172,5 +174,15 @@ def main():
     )
 
 
+async def run():
+    async with asyncio.TaskGroup() as tg:
+        tg.create_task(process_bad_commits())
+    await close_db_connections()
+
+
+def main():
+    asyncio.run(run())
+
+
 if __name__ == "__main__":
     main()
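Note for reviewers: the new run()/main() entrypoints in this patch rely on asyncio.TaskGroup, which requires Python 3.11 or newer. Two details are easy to get wrong and are shown in this minimal sketch, where work() is a stand-in for process_bad_commits():

    import asyncio

    from crowdgit.cm_maintainers_data.cm_database import close_db_connections


    async def work():
        # stand-in for process_bad_commits() / ingest()
        ...


    async def run():
        async with asyncio.TaskGroup() as tg:
            # create_task() takes a coroutine object, so the function must be
            # called; tg.create_task(work) without parentheses raises TypeError.
            tg.create_task(work())
        # The async with block exits only after every task has finished, so
        # the shared connections are closed strictly after the work is done.
        await close_db_connections()


    def main():
        asyncio.run(run())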
""" - self.kafka_topic = os.environ['KAFKA_TOPIC'] + self.kafka_topic = os.environ["KAFKA_TOPIC"] self.kafka_producer = Producer( { - 'bootstrap.servers': os.environ['KAFKA_BROKERS'], - 'client.id': 'git-integration', - **json.loads(os.environ['KAFKA_CONFIG']), + "bootstrap.servers": os.environ["KAFKA_BROKERS"], + "client.id": "git-integration", + **json.loads(os.environ["KAFKA_CONFIG"]), } ) - def send_messages( + async def send_messages( self, segment_id: str, integration_id: str, @@ -116,13 +123,17 @@ def send_messages( operation = "upsert_activities_with_members" - def get_body_json(record): + data_to_insert = [] + payloads_to_emit = [] + + def get_body_json(result_id, record): body = json.dumps( { "type": "create_and_process_activity_result", "tenantId": os.environ["TENANT_ID"], "segmentId": segment_id, "integrationId": integration_id, + "resultId": result_id, "activityData": record, }, default=string_converter, @@ -142,6 +153,7 @@ def get_body_json(record): "tenantId": os.environ["TENANT_ID"], "segmentId": segment_id, "integrationId": integration_id, + "resultId": result_id, "activityData": record, }, default=string_converter, @@ -151,6 +163,24 @@ def get_body_json(record): return body + for record in records: + result_id = str(uuid()) + record["segmentId"] = segment_id + + payloads_to_emit.append(get_body_json(result_id, record)) + data_to_insert.append( + { + "id": result_id, + "state": "pending", + "tenantId": os.environ["TENANT_ID"], + "integrationId": integration_id, + "data": { + "type": "activity", + "data": record, + }, + } + ) + platform = "git" responses = [] @@ -159,6 +189,14 @@ def get_body_json(record): else: commits_iter = records + await batch_insert( + """ + insert into integration.results(id, state, data, "tenantId", "integrationId") + values($1, $2, $3, $4, $5) + """, + records, + ) + for record in commits_iter: deduplication_id = str(uuid()) message_id = f"{os.environ['TENANT_ID']}-{operation}-{platform}-{deduplication_id}" @@ -170,7 +208,7 @@ def get_body_json(record): return responses - def ingest_remote( + async def ingest_remote( self, segment_id: str, integration_id: str, @@ -206,7 +244,7 @@ def ingest_remote( return try: - self.send_messages(segment_id, integration_id, activities, verbose=verbose) + await self.send_messages(segment_id, integration_id, activities, verbose=verbose) except Exception as e: logger.error("Failed trying to send messages for %s", remote, str(e)) finally: @@ -218,7 +256,7 @@ def make_id() -> str: return str(uuid()) -def main(): +async def ingest(): import argparse parser = argparse.ArgumentParser(description="Ingest remote.") @@ -285,7 +323,7 @@ def main(): logger.info("Bad commits for repo %s not found", remote) logger.info(f"Ingesting {remote} for segment {segment_id} ") - queue.ingest_remote( + await queue.ingest_remote( segment_id, integration_id, remote, @@ -295,5 +333,15 @@ def main(): ) +async def run(): + async with asyncio.TaskGroup() as tg: + tg.create_task(ingest()) + await close_db_connections() + + +def main(): + asyncio.run(run()) + + if __name__ == "__main__": main() diff --git a/crowdgit/maintainers.py b/crowdgit/maintainers.py index ecb7734..e7a4955 100644 --- a/crowdgit/maintainers.py +++ b/crowdgit/maintainers.py @@ -1,6 +1,6 @@ from urllib.parse import urlparse from crowdgit.cm_maintainers_data.scraper import scrape, check_for_updates -from crowdgit.cm_maintainers_data.cm_database import query, execute +from crowdgit.cm_maintainers_data.cm_database import close_db_connections, query, execute import os from datetime 
diff --git a/crowdgit/server.py b/crowdgit/server.py
index b2c4a1e..383d350 100644
--- a/crowdgit/server.py
+++ b/crowdgit/server.py
@@ -27,11 +27,12 @@
 
 semaphore = Semaphore(3)
 
+
 def get_local_repo(remote: str, repos_dir: str) -> str:
     return os.path.join(repos_dir, get_repo_name(remote))
 
 
-def reonboard_repo(remote: str, since: str = None, until: str = None):
+async def reonboard_repo(remote: str, since: str = None, until: str = None):
     """Reonboard a repository by deleting and re-ingesting it.
 
     :param remote: The remote URL of the repository to reonboard
@@ -65,7 +66,7 @@ def reonboard_repo(remote: str, since: str = None, until: str = None):
             logging.info("Bad commits for repo %s not found", remote)
 
     logging.info("Ingesting %s for segment %s", remote, segment_id)
-    queue.ingest_remote(
+    await queue.ingest_remote(
         segment_id=segment_id,
         integration_id=integration_id,
         remote=remote,
@@ -161,10 +162,7 @@ async def get_user_name(
 
 @app.get("/commits-in-range")
 async def get_commits_in_range(
-    remote: str,
-    since: str,
-    until: str,
-    token: HTTPAuthorizationCredentials = Depends(auth_scheme)
+    remote: str, since: str, until: str, token: HTTPAuthorizationCredentials = Depends(auth_scheme)
 ):
     if not secrets.compare_digest(token.credentials, os.environ["AUTH_TOKEN"]):
         raise HTTPException(
@@ -180,7 +178,7 @@ async def get_commits_in_range(
     # Use git rev-list to count commits between dates
     cmd = f"""git -C {repo_dir} rev-list --count HEAD --since="{since}" --until="{until}" """
-    
+
     process = await asyncio.create_subprocess_shell(
         cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
     )
 
@@ -191,12 +189,7 @@ async def get_commits_in_range(
         logging.error(f"Error while executing command: {cmd}. Error: {error_message}")
         raise HTTPException(status_code=500, detail="Internal server error")
 
-    return {
-        "remote": remote,
-        "since": since,
-        "until": until,
-        "num_commits": int(stdout)
-    }
+    return {"remote": remote, "since": since, "until": until, "num_commits": int(stdout)}
 
 
 @app.get("/reonboard-period")
@@ -213,7 +206,7 @@ async def reonboard_remote_period(
             detail="Incorrect bearer token",
             headers={"WWW-Authenticate": "Bearer"},
         )
-    
+
     repo_dir = get_local_repo(remote, REPOS_DIR)
 
     if not os.path.exists(repo_dir):
@@ -227,7 +220,7 @@ async def reonboard_remote_period(
             timestamp = fin.read().strip()
         logging.info("Skipping %s, already running since %s", repo_name, timestamp)
         return {"message": f"Repository {repo_name} is already being processed since {timestamp}"}
-    
+
     bg_tasks.add_task(reonboard_repo, remote, since, until)
     return {"message": "Reonboarding started"}
 
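Note for reviewers: reonboard_repo() becoming a coroutine does not change the /reonboard-period handler, because Starlette's BackgroundTasks detects coroutine functions and awaits them after the response is sent. A trimmed sketch of that relationship, with the handler reduced to its essentials:

    from fastapi import BackgroundTasks, FastAPI

    app = FastAPI()


    async def reonboard_repo(remote: str, since: str = None, until: str = None):
        ...  # now awaits queue.ingest_remote(...) internally


    @app.get("/reonboard-period")
    async def reonboard_remote_period(
        remote: str, since: str, until: str, bg_tasks: BackgroundTasks
    ):
        # add_task() stores the coroutine function plus its arguments;
        # Starlette awaits it once the HTTP response has gone out.
        bg_tasks.add_task(reonboard_repo, remote, since, until)
        return {"message": "Reonboarding started"}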