diff --git a/ingest_model_predictions_in_postgres/01_query_athena.py b/ingest_model_predictions_in_postgres/01_query_athena.py
new file mode 100755
index 0000000000000000000000000000000000000000..05c0f48e38f9d955a4f7ec1e078a75eb63c8c0df
--- /dev/null
+++ b/ingest_model_predictions_in_postgres/01_query_athena.py
@@ -0,0 +1,57 @@
+import time
+import json
+
+import boto3
+
+
+CLIENT = boto3.client("athena")
+
+DATABASE_NAME = "ted-ai"
+RESULT_OUTPUT_LOCATION = "s3://d-ew1-ted-ai-aws-glue-temporary/"
+TABLE_NAME = "d_ew1_ted_ai_curated_notice_data"
+PAGE_SIZE = 1000
+
+
+def fetchall_athena():
+    query = f'''SELECT * FROM "{DATABASE_NAME}"."{TABLE_NAME}" WHERE language = 'EN' AND short_description != '' AND title != '';'''
+    query_id = CLIENT.start_query_execution(
+        QueryString=query,
+        QueryExecutionContext={
+            'Database': DATABASE_NAME
+        },
+        ResultConfiguration={
+            'OutputLocation': RESULT_OUTPUT_LOCATION
+        }
+    )['QueryExecutionId']
+    query_status = None
+    while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
+        query_status = CLIENT.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
+        if query_status == 'FAILED' or query_status == 'CANCELLED':
+            raise Exception('Athena query with the string "{}" failed or was cancelled'.format(query))
+        time.sleep(1)
+    results_paginator = CLIENT.get_paginator('get_query_results')
+    results_iter = results_paginator.paginate(
+        QueryExecutionId=query_id,
+        PaginationConfig={
+            'PageSize': PAGE_SIZE
+        }
+    )
+    results = []
+    column_names = None
+
+    for results_page in results_iter:
+        for row in results_page['ResultSet']['Rows']:
+            column_values = [col.get('VarCharValue', None) for col in row['Data']]
+            if not column_names:
+                column_names = column_values
+            else:
+                results.append(dict(zip(column_names, column_values)))
+    return results
+
+
+results_query = fetchall_athena()
+
+json_object = json.dumps(results_query, indent=4)
+
+with open("result_queries_athena/results_query.json", "w") as outfile:
+    outfile.write(json_object)
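
A minimal sketch (illustrative, not part of the change) of how the polling loop in `01_query_athena.py` could surface Athena's `StateChangeReason` instead of a generic error message; it assumes the same `boto3` Athena client and `query_id` as in `fetchall_athena`, and the helper name `wait_for_query` is invented for the example:

```python
import time


def wait_for_query(client, query_id, poll_seconds=1):
    """Block until the Athena query reaches a terminal state; raise with the reason on failure."""
    while True:
        status = client.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]
        state = status["State"]
        if state == "SUCCEEDED":
            return
        if state in ("FAILED", "CANCELLED"):
            reason = status.get("StateChangeReason", "no reason reported")
            raise RuntimeError(f"Athena query {query_id} ended in state {state}: {reason}")
        time.sleep(poll_seconds)
```
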
diff --git a/ingest_model_predictions_in_postgres/02_predict_divisions_from_models.py b/ingest_model_predictions_in_postgres/02_predict_divisions_from_models.py
new file mode 100644
index 0000000000000000000000000000000000000000..d2be6b89afe910f8e907087e3343d5e1be57e6b2
--- /dev/null
+++ b/ingest_model_predictions_in_postgres/02_predict_divisions_from_models.py
@@ -0,0 +1,282 @@
+import os
+import glob
+import tarfile
+import json
+from typing import List, Dict
+import re
+from unidecode import unidecode
+
+from pydantic import BaseModel, Field, root_validator
+import spacy
+import boto3
+import joblib
+import numpy as np
+import torch
+from transformers import AutoTokenizer, AutoModelForSequenceClassification
+
+
+# 1. Download models
+
+print("Downloading models from S3")
+
+
+def extract_all_files(tar_file_path, extract_to):
+    with tarfile.open(tar_file_path, 'r') as tar:
+        tar.extractall(extract_to)
+
+
+BUCKET_MODELS_NAME = "d-ew1-ted-ai-ml-models"
+OPENTENDER_MODEL_S3_KEY = "models/opentender-multi-label-division-classifier/v0.0.2/model.tar.gz"
+LINEARSVC_MULTILABEL_CPV_CLASSIFIER_MODEL_S3_KEY = "models/multi-label-division-classifier/v0.0.5/model.tar.gz"
+ROBERTA_MULTILABEL_CPV_CLASSIFIER_MODEL_S3_KEY = "models/roberta-multi-label-division-classifier/v0.0.1/model.tar.gz"
+
+OPENTENDER_MODEL_OUTPUT_NAME = "opentender_model.tar.gz"
+LINEARSVC_MULTILABEL_CPV_CLASSIFIER__OUTPUT_NAME = "linearsvc_model.tar.gz"
+ROBERTA_MULTILABEL_CPV_CLASSIFIER_MODEL_OUTPUT_NAME = "roberta_model.tar.gz"
+OUTPUT_MODELS_PATH = "models"
+
+
+if not os.path.exists(OUTPUT_MODELS_PATH):
+    os.makedirs(OUTPUT_MODELS_PATH)
+else:
+    files = glob.glob(f'{OUTPUT_MODELS_PATH}/**/*.*', recursive=True)
+    for f in files:
+        os.remove(f)
+
+resource = boto3.resource('s3')
+s3_bucket_models = resource.Bucket(BUCKET_MODELS_NAME)
+versions_models = {}
+
+for s3_key, output_filename in zip([OPENTENDER_MODEL_S3_KEY, LINEARSVC_MULTILABEL_CPV_CLASSIFIER_MODEL_S3_KEY,
+                                    ROBERTA_MULTILABEL_CPV_CLASSIFIER_MODEL_S3_KEY],
+                                   [OPENTENDER_MODEL_OUTPUT_NAME, LINEARSVC_MULTILABEL_CPV_CLASSIFIER__OUTPUT_NAME,
+                                    ROBERTA_MULTILABEL_CPV_CLASSIFIER_MODEL_OUTPUT_NAME]):
+    tar_local_path = f"{OUTPUT_MODELS_PATH}/{output_filename}"
+    s3_bucket_models.download_file(s3_key, tar_local_path)
+    extract_path = f"{OUTPUT_MODELS_PATH}/{output_filename.split('.')[0]}/"
+    extract_all_files(tar_local_path, extract_path)
+    os.remove(tar_local_path)
+    version = s3_key.split('/')[-2]
+    model_name = s3_key.split('/')[-3]
+    versions_models[model_name] = version
+
+print("Models downloaded from S3")
+
+# 2. Load models
+print("Loading models")
+
+opentender_model = joblib.load(f"{OUTPUT_MODELS_PATH}/{OPENTENDER_MODEL_OUTPUT_NAME.split('.')[0]}/model.joblib")
+linearsvc_model = joblib.load(
+    f"{OUTPUT_MODELS_PATH}/{LINEARSVC_MULTILABEL_CPV_CLASSIFIER__OUTPUT_NAME.split('.')[0]}/model.joblib")
+ROBERTA_PATH = f"{OUTPUT_MODELS_PATH}/{ROBERTA_MULTILABEL_CPV_CLASSIFIER_MODEL_OUTPUT_NAME.split('.')[0]}/roberta-cpv-multilabel/"
+roberta_tokenizer = AutoTokenizer.from_pretrained(ROBERTA_PATH, local_files_only=True)
+roberta_model = AutoModelForSequenceClassification.from_pretrained(ROBERTA_PATH)
+print("Models loaded")
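
As a rough sanity check (a sketch, not part of the change), the artifacts that the loads above expect could be verified right after extraction; the directory names follow from the `*_OUTPUT_NAME` constants with their extensions stripped, and the layout inside each archive is assumed from the `joblib.load`/`from_pretrained` calls:

```python
import os

# Paths implied by the load calls above; adjust if the archive layout differs.
expected_artifacts = [
    f"{OUTPUT_MODELS_PATH}/opentender_model/model.joblib",
    f"{OUTPUT_MODELS_PATH}/linearsvc_model/model.joblib",
    f"{OUTPUT_MODELS_PATH}/roberta_model/roberta-cpv-multilabel/config.json",
]
missing = [path for path in expected_artifacts if not os.path.exists(path)]
if missing:
    raise FileNotFoundError(f"Model artifacts missing after extraction: {missing}")
```
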
+
+# 3. Load Athena data from local file
+AVAILABLE_DIV = ['85', '44', '50', '80', '73', '45', '71', '79', '90', '30', '35', '33', '55', '72', '48', '38', '09',
+                 '75', '66', '64', '42', '34', '60', '92', '39', '31', '98', '51', '32', '65', '77', '22', '63', '15',
+                 '70', '18', '03', '24', '43', '19', '41', '37', '14', '16', '76']
+
+
+class TableModel(BaseModel):
+    notice_id: str = Field(alias='id')
+    notice_type: str = Field(alias='type')
+    title: str
+    short_description: str
+    cpvs: List[str] = []
+    divisions: List[str] = []
+    divisions_opentender_model: List[str] = []
+    divisions_linearsvc_model: List[str] = []
+    divisions_roberta_model: List[str] = []
+    versions_model: Dict[str, str] = {}
+    version: str
+    is_eu_institution: bool
+
+    @root_validator(pre=True)
+    def _set_fields(cls, values: dict) -> dict:
+        all_cpvs = values["additional_cpvs"].split(",")
+        all_cpvs.append(values["main_cpv"])
+        all_cpvs = [i for i in all_cpvs if i != ""]
+        all_cpvs = list(set(all_cpvs))
+        values["cpvs"] = all_cpvs
+        all_divisions = []
+        for single_cpv in all_cpvs:
+            division = single_cpv[:2]
+            if division in AVAILABLE_DIV and division not in all_divisions:
+                all_divisions.append(division)
+        values["divisions"] = all_divisions
+        return values
+
+
+print("Parsing Athena records")
+
+with open('result_queries_athena/results_query.json') as results_query_athena:
+    file_contents = results_query_athena.read()
+
+parsed_json = json.loads(file_contents)
+
+parsed_object_table = [TableModel(**i) for i in parsed_json]
+number_records_to_process = len(parsed_object_table)
+print(f"Number of records to process: {number_records_to_process}")
+
+# 4. Preprocessing and prediction functions for each model
+
+print("Downloading Spacy model")
+spacy.cli.download("en_core_web_sm")
+print("Loading Spacy model")
+NLP = spacy.load("en_core_web_sm")
+print("Spacy model loaded")
+
+STOP_WORDS = NLP.Defaults.stop_words
+CHARACTERS_TO_REPLACE = ["\\n", "\\r", "\\t", "\\W", "•", "\t", "-", "(", ")", ":", ";", "?", "!", "&", "\n", "\r", ".",
+                         ",", "'", "’", "´",
+                         "‘", "’", '"', "“", "”", "'", "/", "\\", "%", "—", "#", "$", "[", "]", "|", "{", "}", "~", "`",
+                         "+", "*"]
+
+MONTHS = [" january ", " february ", " march ", " april ", " may ", " june ", " july ", " august ", " september ",
+          " october ", " november ", " december ",
+          " jan ", " feb ", " mar ", " apr ", " jun ", " jul ", " aug ", " sep ", " oct ", " nov ", " dec "]
+
+
+def _remove_multiple_spaces(text: str) -> str:
+    return re.sub(r'\s+', ' ', text)
+
+
+def _remove_special_characters(text: str) -> str:
+    for chars in CHARACTERS_TO_REPLACE:
+        text = text.replace(chars, " ")
+    return text
+
+
+def _remove_stop_words(text: str) -> str:
+    token_list = text.split()
+    removed_list = [x for x in token_list if x not in STOP_WORDS]
+    return ' '.join(removed_list)
+
+
+def _replace_digits(text):
+    return re.sub(r'[\d-]+', 'NUMBER', text)
+
+
+def _delete_one_letter_word(text):
+    text_as_list = text.split()
+    text_as_list = [element for element in text_as_list if len(element) > 1]
+    return ' '.join(text_as_list)
+
+
+def _remove_consecutive_duplicates(text):
+    text_as_list = text.split()
+    last_seen = None
+    result = []
+    for x in text_as_list:
+        if x != last_seen:
+            result.append(x)
+        last_seen = x
+    return ' '.join(result)
+
+
+def _replace_months(text: str) -> str:
+    text = " " + text + " "
+    for month in MONTHS:
+        text = text.replace(month, " MONTH ")
+    return text
+
+
+def _replace_with_lemma(text: str) -> str:
+    doc = NLP(text)
+    lemmatized_list = []
+    for token in doc:
+        lemmatized_list.append(token.lemma_)
+    return " ".join(lemmatized_list)
+
+
+def _preprocess_input(text: str) -> str:
+    x = unidecode(str(text).lower())
+    x = _replace_with_lemma(x)
+    x = _remove_special_characters(x)
+    x = _remove_stop_words(x)
+    x = _remove_multiple_spaces(x)
+    x = _replace_digits(x)
+    x = _delete_one_letter_word(x)
+    x = _remove_consecutive_duplicates(x)
+    x = _replace_months(x)
+    return x
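
For illustration only (not in the change), the cleaning pipeline above turns a made-up title into roughly the string shown in the comment; the exact lemmas depend on the `en_core_web_sm` version, so the output is indicative rather than exact:

```python
example = _preprocess_input("Supply of 250 laptop computers for the Ministry, January 2023")
# -> roughly ' supply NUMBER laptop computer ministry MONTH NUMBER '
# (digits collapse to NUMBER, month names to MONTH, stop words and punctuation are dropped,
#  and _replace_months leaves leading/trailing spaces because it pads the text before replacing)
```
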
+
+
+with open(f"./{ROBERTA_PATH}config.json") as file:
+    config_file = file.read()
+
+ID2LABEL = json.loads(config_file)["id2label"]
+
+
+def predict_roberta(tokenizer_model, prediction_model, title, description):
+    if title:
+        title = f'{title}. '
+    else:
+        title = ""
+    if not description:
+        description = ""
+
+    title_description = f'{title}{description}'
+    encoded_input = tokenizer_model(title_description, return_tensors='pt', padding=True, truncation=True)
+
+    results = prediction_model(**encoded_input)
+    logits = results.logits
+    sigmoid = torch.nn.Sigmoid()
+    probs = sigmoid(logits.squeeze().cpu())
+    predictions = np.zeros(probs.shape)
+    predictions[np.where(probs >= 0.5)] = 1
+    predicted_labels = [ID2LABEL[str(idx)] for idx, label in enumerate(predictions) if label == 1.0]
+    return predicted_labels
+
+
+def predict_with_linear_svc_models(model_var, labels, preprocessed_text):
+    predictions = model_var.predict(preprocessed_text)
+    predictions_merged = predictions.max(axis=0)
+    predictions_merged = predictions_merged.tolist()
+    results = []
+    for index, element in enumerate(predictions_merged):
+        if element:
+            cpv_number = labels[index]
+            results.append(cpv_number)
+    return results
+
+
+# 5. Loop over each record and make predictions
+
+ALL_CPVS_OPENTENDER_MODEL = ['71', '44', '50', '80', '73', '45', '85', '79', '90', '30', '35', '33', '55', '72',
+                             '48', '38', '09', '75', '66', '64', '42', '34', '60', '92', '39', '31', '98', '51',
+                             '77', '22', '32', '63', '15', '65', '70', '18', '03', '43', '24', '19', '41', '37',
+                             '14', '16', '76']
+
+ALL_CPVS_LINEAR_SVC_MODEL = ['85', '44', '50', '80', '73', '45', '71', '79', '90', '30', '35', '33', '55', '72', '48',
+                             '38', '09', '75',
+                             '66', '64', '42', '34', '60', '92', '39', '31', '98', '51', '32', '65', '77', '22', '63',
+                             '15', '70', '18',
+                             '03', '24', '43', '19', '41', '37', '14', '16', '76']
+
+print("Predicting divisions")
+i = 0
+for element in parsed_object_table:
+    i += 1
+    preprocessed_title = _preprocess_input(element.title)
+    preprocessed_description = _preprocess_input(element.short_description)
+    preprocessed_title_description = f"{preprocessed_title} {preprocessed_description}"
+    preprocessed_input = [preprocessed_title, preprocessed_description, preprocessed_title_description]
+    predictions_linear_svc_model = predict_with_linear_svc_models(linearsvc_model, ALL_CPVS_LINEAR_SVC_MODEL,
+                                                                  preprocessed_input)
+    predictions_opentender_model = predict_with_linear_svc_models(opentender_model, ALL_CPVS_OPENTENDER_MODEL,
+                                                                  preprocessed_input)
+    predictions_roberta_model = predict_roberta(roberta_tokenizer, roberta_model, element.title, element.short_description)
+    element.divisions_roberta_model = predictions_roberta_model
+    element.divisions_linearsvc_model = predictions_linear_svc_model
+    element.divisions_opentender_model = predictions_opentender_model
+    element.versions_model = versions_models
+    if i % 100 == 0:
+        print(f'{100 * i / number_records_to_process:.1f}%')
+
+output_formatted = [element.dict() for element in parsed_object_table]
+json_object = json.dumps(output_formatted, indent=4)
+with open("predicted_results/predicted_records.json", "w") as outfile:
+    outfile.write(json_object)
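
A small self-contained sketch (not part of the change) of the multi-label decision rule used in `predict_roberta`, applied to a toy logits tensor so the sigmoid-plus-0.5 cut-off is concrete; the three-label `id2label` mapping in the last comment is invented for the example:

```python
import numpy as np
import torch

toy_logits = torch.tensor([2.0, -1.0, 0.3])      # one logit per division label
probs = torch.sigmoid(toy_logits)                # tensor([0.8808, 0.2689, 0.5744])
predictions = np.zeros(probs.shape)
predictions[np.where(probs.numpy() >= 0.5)] = 1  # array([1., 0., 1.])
# With a hypothetical id2label of {"0": "45", "1": "72", "2": "33"}, divisions 45 and 33 are predicted.
```
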
open("predicted_results/predicted_records.json", "w") as outfile: + outfile.write(json_object) diff --git a/ingest_model_predictions_in_postgres/03_ingest_data_in_postgres.py b/ingest_model_predictions_in_postgres/03_ingest_data_in_postgres.py new file mode 100644 index 0000000000000000000000000000000000000000..db79687fc488811befbd2834043dec4845f83a57 --- /dev/null +++ b/ingest_model_predictions_in_postgres/03_ingest_data_in_postgres.py @@ -0,0 +1,55 @@ +import sys +import json +import psycopg2 +from psycopg2.extras import execute_values + +POSTGRES_USERNAME = "" +POSTGRES_PASSWORD = "" +POSTGRES_HOST = "" +POSTGRES_PORT = "" +POSTGRES_DATABASE = "" +FILENAME = "predicted_records.json" + +print(f"Reading file '{FILENAME}'") +with open(FILENAME) as file: + file_content = file.read() + +print(f"Parsing file '{FILENAME}'") +parsed_json = json.loads(file_content) + +print(f"Converting versions_model field to str") +for element in parsed_json: + element["versions_model"] = json.dumps(element.get("versions_model", {})) + +total_notices = len(parsed_json) +print(f"Number of notices to ingest : {total_notices}") + +connection = psycopg2.connect(user=POSTGRES_USERNAME, + password=POSTGRES_PASSWORD, + host=POSTGRES_HOST, + port=POSTGRES_PORT, + database=POSTGRES_DATABASE) + +i = 0 +columns = parsed_json[0].keys() +BATCH_SIZE = 1000 + +for end_slice in range(BATCH_SIZE, total_notices + BATCH_SIZE, BATCH_SIZE): + print("==================================") + start_slice = end_slice - BATCH_SIZE + iter_list = parsed_json[start_slice:end_slice] + print(f"Current slice: '{start_slice}:{end_slice}' \t\t\t {100*end_slice / total_notices}%") + try: + cursor = connection.cursor() + query = "INSERT INTO aggregated_results ({}) VALUES %s".format(','.join(columns)) + values = [[value for value in notice.values()] for notice in iter_list] + execute_values(cursor, query, values) + connection.commit() + except Exception as e: + print(e) + print(f"Exception at iteration: '{start_slice}:{end_slice}'") + cursor.close() + connection.close() + sys.exit(1) +cursor.close() +connection.close() diff --git a/ingest_model_predictions_in_postgres/README.md b/ingest_model_predictions_in_postgres/README.md new file mode 100644 index 0000000000000000000000000000000000000000..72a31e0299b606428edb3725f7032e1be258fdce --- /dev/null +++ b/ingest_model_predictions_in_postgres/README.md @@ -0,0 +1,19 @@ +# Script to extract data from Athena, make predictions from models deployed, and ingest data in RDS + +## Pre-requisites +- Python3.10, +- Pip, +- AWS CLI profile setup, +- postgresql-devel installed. + + +## Dependencies +Install dependencies with following command: +```bash +pip3 install -r requirements.txt +``` + +## Scripts +1. `01_query_athena.py`: Execute this script to extract all notices with language being EN and having both a title and a short description. Output is available in file `result_queries_athena/results_query.json`. +2. `02_predict_divisions_from_models.py`: Execute this script to make division predictions on file `result_queries_athena/results_query.json` with the three models deployed. Output is available in file `predicted_results/predicted_records.json`. +3. `03_ingest_data_in_postgres.py`: To execute this script , you first need to create the table `aggregated_results` if it does not exist. The schema is defined in `schema_table_aggregated_results.sql`. 
diff --git a/ingest_model_predictions_in_postgres/requirements.txt b/ingest_model_predictions_in_postgres/requirements.txt
new file mode 100644
index 0000000000000000000000000000000000000000..bb1e431e50c5246f8e02ef0f86ff4cde8cded1ce
--- /dev/null
+++ b/ingest_model_predictions_in_postgres/requirements.txt
@@ -0,0 +1,73 @@
+annotated-types==0.6.0
+blis==0.7.11
+boto3==1.28.74
+botocore==1.31.74
+catalogue==2.0.10
+certifi==2023.7.22
+charset-normalizer==3.3.2
+click==8.1.7
+cloudpathlib==0.16.0
+confection==0.1.3
+cymem==2.0.8
+en-core-web-sm @ https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.0/en_core_web_sm-3.7.0-py3-none-any.whl#sha256=6215d71a3212690e9aec49408a27e3fe6ad7cd6c715476e93d70dc784041e93e
+filelock==3.13.1
+fsspec==2023.10.0
+huggingface-hub==0.17.3
+idna==3.4
+Jinja2==3.1.2
+jmespath==1.0.1
+joblib==1.3.2
+langcodes==3.3.0
+MarkupSafe==2.1.3
+mpmath==1.3.0
+murmurhash==1.0.10
+networkx==3.2.1
+numpy==1.26.1
+nvidia-cublas-cu12==12.1.3.1
+nvidia-cuda-cupti-cu12==12.1.105
+nvidia-cuda-nvrtc-cu12==12.1.105
+nvidia-cuda-runtime-cu12==12.1.105
+nvidia-cudnn-cu12==8.9.2.26
+nvidia-cufft-cu12==11.0.2.54
+nvidia-curand-cu12==10.3.2.106
+nvidia-cusolver-cu12==11.4.5.107
+nvidia-cusparse-cu12==12.1.0.106
+nvidia-nccl-cu12==2.18.1
+nvidia-nvjitlink-cu12==12.3.52
+nvidia-nvtx-cu12==12.1.105
+packaging==23.2
+pandas==2.1.2
+psycopg2==2.9.9
+preshed==3.0.9
+pydantic==2.1.1
+pydantic_core==2.4.0
+python-dateutil==2.8.2
+pytz==2023.3.post1
+PyYAML==6.0.1
+regex==2023.10.3
+requests==2.31.0
+s3transfer==0.7.0
+safetensors==0.4.0
+scikit-learn==1.2.2
+scipy==1.11.3
+six==1.16.0
+smart-open==6.4.0
+spacy==3.7.2
+spacy-legacy==3.0.12
+spacy-loggers==1.0.5
+srsly==2.4.8
+sympy==1.12
+thinc==8.2.1
+threadpoolctl==3.2.0
+tokenizers==0.14.1
+torch==2.1.0
+tqdm==4.66.1
+transformers==4.34.1
+triton==2.1.0
+typer==0.9.0
+typing_extensions==4.8.0
+tzdata==2023.3
+Unidecode==1.3.7
+urllib3==2.0.7
+wasabi==1.1.2
+weasel==0.3.3
\ No newline at end of file
diff --git a/ingest_model_predictions_in_postgres/schema_table_aggregated_results.sql b/ingest_model_predictions_in_postgres/schema_table_aggregated_results.sql
new file mode 100644
index 0000000000000000000000000000000000000000..bf2fc1ec2ba1ae7a028f034d5a87ba110c8a3bec
--- /dev/null
+++ b/ingest_model_predictions_in_postgres/schema_table_aggregated_results.sql
@@ -0,0 +1,17 @@
+CREATE TABLE IF NOT EXISTS aggregated_results (
+id BIGSERIAL,
+notice_id TEXT,
+notice_type TEXT,
+title TEXT,
+short_description TEXT,
+version TEXT,
+is_eu_institution BOOLEAN,
+cpvs TEXT[],
+divisions TEXT[],
+divisions_opentender_model TEXT[],
+divisions_linearsvc_model TEXT[],
+divisions_roberta_model TEXT[],
+versions_model JSON
+);
+
+ALTER TABLE aggregated_results SET UNLOGGED;
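
Since `aggregated_results` is switched to UNLOGGED (faster bulk writes, but the contents are not crash-safe and are not replicated), a quick post-ingestion check is worth running; the snippet below is a sketch with placeholder connection settings, assuming the same `predicted_records.json` used by `03_ingest_data_in_postgres.py`:

```python
import json

import psycopg2

with open("predicted_records.json") as handle:
    expected = len(json.load(handle))

# Placeholder connection settings; use the same values as in 03_ingest_data_in_postgres.py.
connection = psycopg2.connect(user="", password="", host="", port="", database="")
with connection.cursor() as cursor:
    cursor.execute("SELECT COUNT(*) FROM aggregated_results;")
    ingested = cursor.fetchone()[0]
connection.close()

print(f"Expected {expected} rows, found {ingested}.")
```
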