52 changes: 42 additions & 10 deletions crowdgit/cm_maintainers_data/cm_database.py
@@ -8,8 +8,29 @@

load_dotenv()

_read_conn = None
_write_conn = None


async def close_db_connections():
    global _read_conn, _write_conn

    logger.info("Closing database connections")

    if _read_conn:
        await _read_conn.close()
        # Reset so a later get_db_connection() reconnects instead of
        # returning a closed connection.
        _read_conn = None
    if _write_conn:
        await _write_conn.close()
        _write_conn = None


async def get_db_connection(is_read_operation: bool = True):
    global _read_conn, _write_conn

    if is_read_operation and _read_conn:
        return _read_conn
    elif not is_read_operation and _write_conn:
        return _write_conn

    db_params = {
        "database": os.getenv("DB_DATABASE"),
        "user": os.getenv("DB_USER"),
@@ -30,17 +51,20 @@ async def get_db_connection(is_read_operation: bool = True):
        raise ValueError(
            f"The following environment variables are not set: {', '.join(missing_env_vars)}"
        )
    return await asyncpg.connect(**db_params)

    if is_read_operation:
        _read_conn = await asyncpg.connect(**db_params)
        return _read_conn
    else:
        _write_conn = await asyncpg.connect(**db_params)
        return _write_conn


async def query(sql: str, params: tuple = None) -> List[Dict[str, Any]]:
    try:
        conn = await get_db_connection(is_read_operation=True)
        try:
            results = await conn.fetch(sql, *params) if params else await conn.fetch(sql)
            return [dict(row) for row in results]
        finally:
            await conn.close()
        results = await conn.fetch(sql, *params) if params else await conn.fetch(sql)
        return [dict(row) for row in results]
    except Exception as error:
        logger.error(f"Error executing query: {error}")
        raise
@@ -49,10 +73,18 @@ async def query(sql: str, params: tuple = None) -> List[Dict[str, Any]]:
async def execute(sql: str, params: tuple = None) -> None:
    try:
        conn = await get_db_connection(is_read_operation=False)
        try:
            await conn.execute(sql, *params) if params else await conn.execute(sql)
        finally:
            await conn.close()
        await conn.execute(sql, *params) if params else await conn.execute(sql)
    except Exception as error:
        logger.error(f"Error executing query: {error}")
        raise


async def batch_insert(sql: str, records: List[Any], batch_size=100) -> None:
    try:
        conn = await get_db_connection(is_read_operation=False)
        for i in range(0, len(records), batch_size):
            batch = records[i : i + batch_size]
            await conn.executemany(sql, batch)
    except Exception as error:
        logger.error(f"Error executing batch insert: {error}")
        raise
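
Since query() and execute() now reuse two cached module-level connections instead of opening and closing one per call, shutdown becomes the caller's job. A minimal usage sketch of the new API (the table and columns here are hypothetical, purely for illustration):

import asyncio

from crowdgit.cm_maintainers_data.cm_database import close_db_connections, execute, query

async def main() -> None:
    # Both calls reuse the cached read/write connections under the hood.
    rows = await query("select id from repos where remote = $1", ("https://github.com/org/repo",))
    await execute("update repos set checked = true where id = $1", (rows[0]["id"],))
    # Close the cached connections exactly once, at shutdown.
    await close_db_connections()

asyncio.run(main())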
24 changes: 18 additions & 6 deletions crowdgit/get_bad_commits.py
@@ -1,9 +1,11 @@
import asyncio
from git import Repo, Git
from pprint import pprint as pp
import os
import requests
import json
from tqdm import tqdm
from crowdgit.cm_maintainers_data.cm_database import close_db_connections
from crowdgit.ingest import Queue
from dotenv import load_dotenv
import time
@@ -40,7 +42,7 @@ def send_api_call(endpoint, body=None, method="POST"):
    return False


def get_commit_info(repo, segment_id, integration_id, remote, commit_id, repo_path="."):
async def get_commit_info(repo, segment_id, integration_id, remote, commit_id, repo_path="."):
    try:
        commit = repo.commit(commit_id)
    except Exception as e:
@@ -114,10 +116,10 @@ def get_commit_info(repo, segment_id, integration_id, remote, commit_id, repo_path="."):

    queue = Queue()

    return queue.send_messages(segment_id, integration_id, [activity])
    return await queue.send_messages(segment_id, integration_id, [activity])


def parse_commit_file(commit_file_path, repo_path):
async def parse_commit_file(commit_file_path, repo_path):
    with open(commit_file_path, "r") as f:
        commit_data = f.read()

@@ -147,22 +149,22 @@ def parse_commit_file(commit_file_path, repo_path):
    bad_commits = []
    for commit in tqdm(commits):
        commit_id = commit.split("\n")[0]
        commit_info = get_commit_info(
        commit_info = await get_commit_info(
            repo, segment_id, integration_id, remote, commit_id, repo_path
        )
        if not commit_info:
            bad_commits.append(commit_id)
    return bad_commits


def main():
async def process_bad_commits():
    import glob
    from crowdgit import LOCAL_DIR

    commit_files = glob.glob(f"{LOCAL_DIR}/bad-commits/[!DONE]*.txt")
    for commit_file_path in commit_files:
        repo_path = f"{LOCAL_DIR}/repos/" + os.path.basename(commit_file_path).replace(".txt", "")
        parse_commit_file(commit_file_path, repo_path)
        await parse_commit_file(commit_file_path, repo_path)
        os.rename(
            commit_file_path,
            f"{LOCAL_DIR}/bad-commits/DONE_"
@@ -172,5 +174,15 @@ def main():
        )


async def run():
    async with asyncio.TaskGroup() as tg:
        # create_task() needs a coroutine object, so the function must be called
        tg.create_task(process_bad_commits())
    await close_db_connections()


def main():
    asyncio.run(run())


if __name__ == "__main__":
    main()
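
For reference, here is the shutdown pattern the new entrypoints follow, sketched standalone (asyncio.TaskGroup requires Python 3.11+; the worker name is illustrative). The async with block only exits once every task has finished, which is what makes it safe to close shared database connections right after it:

import asyncio

async def worker() -> None:
    await asyncio.sleep(0)  # stand-in for process_bad_commits() / ingest()

async def run() -> None:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(worker())  # pass a coroutine object, not the bare function
    # Every task has completed here, so shared resources can be torn down.
    print("closing shared resources")

asyncio.run(run())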
68 changes: 58 additions & 10 deletions crowdgit/ingest.py
@@ -5,6 +5,7 @@
It will prepare the activities (which will include cloning the repo to ENV[REPO_DIR]
if it has not been cloned yet) and send them to Kafka.
"""
import asyncio
import os
import json
from datetime import datetime
@@ -22,6 +23,12 @@

from crowdgit.logger import get_logger

from crowdgit.cm_maintainers_data.cm_database import (
batch_insert,
close_db_connections,
execute as db_execute,
)

logger = get_logger(__name__)


@@ -88,16 +95,16 @@ def __init__(self):
"""
Initialise class to handle SQS requests.
"""
self.kafka_topic = os.environ['KAFKA_TOPIC']
self.kafka_topic = os.environ["KAFKA_TOPIC"]
self.kafka_producer = Producer(
{
'bootstrap.servers': os.environ['KAFKA_BROKERS'],
'client.id': 'git-integration',
**json.loads(os.environ['KAFKA_CONFIG']),
"bootstrap.servers": os.environ["KAFKA_BROKERS"],
"client.id": "git-integration",
**json.loads(os.environ["KAFKA_CONFIG"]),
}
)

    def send_messages(
    async def send_messages(
        self,
        segment_id: str,
        integration_id: str,
@@ -116,13 +123,17 @@ def send_messages(

        operation = "upsert_activities_with_members"

        def get_body_json(record):
        data_to_insert = []
        payloads_to_emit = []

        def get_body_json(result_id, record):
            body = json.dumps(
                {
                    "type": "create_and_process_activity_result",
                    "tenantId": os.environ["TENANT_ID"],
                    "segmentId": segment_id,
                    "integrationId": integration_id,
                    "resultId": result_id,
                    "activityData": record,
                },
                default=string_converter,
@@ -142,6 +153,7 @@ def get_body_json(record):
"tenantId": os.environ["TENANT_ID"],
"segmentId": segment_id,
"integrationId": integration_id,
"resultId": result_id,
"activityData": record,
},
default=string_converter,
Expand All @@ -151,6 +163,24 @@ def get_body_json(record):

            return body

        for record in records:
            result_id = str(uuid())
            record["segmentId"] = segment_id

            payloads_to_emit.append(get_body_json(result_id, record))
            data_to_insert.append(
                {
                    "id": result_id,
                    "state": "pending",
                    "tenantId": os.environ["TENANT_ID"],
                    "integrationId": integration_id,
                    "data": {
                        "type": "activity",
                        "data": record,
                    },
                }
            )

platform = "git"
responses = []

@@ -159,6 +189,14 @@ def get_body_json(record):
        else:
            commits_iter = records

        # asyncpg's executemany() binds positional argument tuples, so insert the
        # pending result rows built above rather than the raw activity records.
        await batch_insert(
            """
            insert into integration.results(id, state, data, "tenantId", "integrationId")
            values($1, $2, $3, $4, $5)
            """,
            [
                (row["id"], row["state"], json.dumps(row["data"], default=string_converter),
                 row["tenantId"], row["integrationId"])
                for row in data_to_insert
            ],
        )

        for record in commits_iter:
            deduplication_id = str(uuid())
            message_id = f"{os.environ['TENANT_ID']}-{operation}-{platform}-{deduplication_id}"
@@ -170,7 +208,7 @@

        return responses

    def ingest_remote(
    async def ingest_remote(
        self,
        segment_id: str,
        integration_id: str,
@@ -206,7 +244,7 @@ def ingest_remote(
            return

        try:
            self.send_messages(segment_id, integration_id, activities, verbose=verbose)
            await self.send_messages(segment_id, integration_id, activities, verbose=verbose)
        except Exception as e:
            logger.error("Failed trying to send messages for %s: %s", remote, str(e))
finally:
Expand All @@ -218,7 +256,7 @@ def make_id() -> str:
return str(uuid())


def main():
async def ingest():
    import argparse

    parser = argparse.ArgumentParser(description="Ingest remote.")
@@ -285,7 +323,7 @@ def main():
logger.info("Bad commits for repo %s not found", remote)

logger.info(f"Ingesting {remote} for segment {segment_id} ")
queue.ingest_remote(
await queue.ingest_remote(
segment_id,
integration_id,
remote,
@@ -295,5 +333,15 @@
    )


async def run():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(ingest())
    await close_db_connections()


def main():
    asyncio.run(run())


if __name__ == "__main__":
    main()
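
For clarity, the row shape send_messages() now writes through batch_insert() can be sketched in isolation. asyncpg's executemany() binds one positional tuple per row against $1..$5, and the jsonb data column travels as encoded text, assuming no custom JSON codec is registered on the connection. Names below are illustrative:

import json

def to_result_row(pending: dict) -> tuple:
    # Column order matches the insert:
    # id, state, data, "tenantId", "integrationId"
    return (
        pending["id"],
        pending["state"],
        json.dumps(pending["data"]),  # jsonb bound as text
        pending["tenantId"],
        pending["integrationId"],
    )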
3 changes: 2 additions & 1 deletion crowdgit/maintainers.py
@@ -1,6 +1,6 @@
from urllib.parse import urlparse
from crowdgit.cm_maintainers_data.scraper import scrape, check_for_updates
from crowdgit.cm_maintainers_data.cm_database import query, execute
from crowdgit.cm_maintainers_data.cm_database import close_db_connections, query, execute
import os
from datetime import datetime
from crowdgit.cm_maintainers_data.process_maintainers import (
@@ -298,6 +298,7 @@ async def run():
        tg.create_task(parse_not_parsed())
        tg.create_task(parse_already_parsed())
        tg.create_task(reidentify_repos_with_no_maintainer_file())
    await close_db_connections()


def main():
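
One caveat worth noting alongside this change: the three tasks above run concurrently and now share the single cached read connection, and an asyncpg connection cannot execute two queries at once (it raises InterfaceError). If these tasks can overlap on the database, serialising access is one minimal mitigation; a hedged sketch, with illustrative names:

import asyncio

from crowdgit.cm_maintainers_data.cm_database import query

_db_lock = asyncio.Lock()  # hypothetical guard, one per process

async def locked_query(sql: str, params: tuple = None):
    # Serialise use of the shared cached connection across tasks.
    async with _db_lock:
        return await query(sql, params)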