Code development platform for open source projects from the European Union institutions :large_blue_circle: EU Login authentication by SMS has been phased out. To see alternatives please check here

Skip to content
Snippets Groups Projects

Script to ingest prediction in postgresql

6 files
+ 503
0
Compare changes
  • Side-by-side
  • Inline

Files

 
import json
import time
from pathlib import Path

import boto3
 
 
 
# Shared Athena client, created once at import time and reused for every call.
CLIENT = boto3.client("athena")

# Glue/Athena database that holds the TED-AI curated tables.
DATABASE_NAME = "ted-ai"

# S3 prefix where Athena writes its result files (required by start_query_execution).
RESULT_OUTPUT_LOCATION = "s3://d-ew1-ted-ai-aws-glue-temporary/"

# Curated notice-data table queried below.
TABLE_NAME = "d_ew1_ted_ai_curated_notice_data"

# Number of result rows fetched per get_query_results page.
PAGE_SIZE = 1000
 
 
def fetchall_athena(database=DATABASE_NAME, table=TABLE_NAME):
    """Run a SELECT over the curated notice table and return every row.

    Starts an Athena query, polls once per second until it reaches a
    terminal state, then pages through the result set and zips each data
    row with the header row.

    Args:
        database: Athena database to query (defaults to ``DATABASE_NAME``).
        table: Table to query (defaults to ``TABLE_NAME``).

    Returns:
        list[dict]: one dict per row, keyed by column name. Values are the
        raw Athena ``VarCharValue`` strings, or ``None`` for NULL cells.

    Raises:
        RuntimeError: if the query ends in the FAILED or CANCELLED state.
    """
    query = f'''SELECT * FROM "{database}"."{table}" WHERE language = 'EN' AND short_description != '' AND title != '';'''

    query_id = CLIENT.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': RESULT_OUTPUT_LOCATION},
    )['QueryExecutionId']

    # Poll until the query leaves the QUEUED/RUNNING states.
    query_status = None
    while query_status in (None, 'QUEUED', 'RUNNING'):
        query_status = CLIENT.get_query_execution(
            QueryExecutionId=query_id
        )['QueryExecution']['Status']['State']
        if query_status in ('FAILED', 'CANCELLED'):
            raise RuntimeError(
                f'Athena query with the string "{query}" failed or was cancelled'
            )
        time.sleep(1)  # avoid hammering the Athena API while waiting

    # Page through the results; Athena returns the header as the very first
    # row of the result set, so the first row seen becomes the column names.
    results_paginator = CLIENT.get_paginator('get_query_results')
    results_iter = results_paginator.paginate(
        QueryExecutionId=query_id,
        PaginationConfig={'PageSize': PAGE_SIZE},
    )

    results = []
    column_names = None
    for results_page in results_iter:
        for row in results_page['ResultSet']['Rows']:
            # A missing 'VarCharValue' key means the cell is NULL.
            column_values = [col.get('VarCharValue', None) for col in row['Data']]
            if not column_names:
                column_names = column_values  # header row
            else:
                results.append(dict(zip(column_names, column_values)))
    return results
 
 
 
# Fetch every English notice with a non-empty title/short_description and
# persist the rows as JSON for later ingestion into PostgreSQL.
results_query = fetchall_athena()

json_object = json.dumps(results_query, indent=4)

# Create the output directory up front so a fresh checkout doesn't crash
# with FileNotFoundError on the open() below.
Path("result_queries_athena").mkdir(parents=True, exist_ok=True)
with open("result_queries_athena/results_query.json", "w", encoding="utf-8") as outfile:
    outfile.write(json_object)
Loading