Code development platform for open source projects from the European Union institutions :large_blue_circle: EU Login authentication by SMS has been phased out. To see alternatives please check here

Skip to content
Snippets Groups Projects

[#17] Authorize administrator on taxonomy creation

Merged Andrei Patrascu requested to merge 17-authorize-taxonomy-insertion into development
3 files
+ 168
77
Compare changes
  • Side-by-side
  • Inline

Files

from flask_restx import Namespace, Resource, reqparse, abort, fields
from flask import current_app as app, request, jsonify
import json
from http import HTTPStatus
import jsonschema
from flask_restx import Namespace, Resource, reqparse, abort
from flask import current_app as app, request
from search.infrastructure.auth_validator import auth_validator
from search.infrastructure.ApiLogicError import ApiLogicError
from search.infrastructure.auth_validator import auth_validator, is_administrator
from search.infrastructure.ApiLogicError import ApiLogicError, ForbiddenResourceError
from .taxonomy import Taxonomy
import json
import jsonschema
taxonomy_api = Namespace("Taxonomy")
json_schema = {
"type": "object",
"patternProperties": {
"^[a-zA-Z0-9_]+$": {
"type": "object",
"properties": {
"data": {"type": "array", "items": {"$ref": "#/definitions/node"}}
},
"required": ["data"],
"additionalProperties": False,
}
},
"definitions": {
"node": {
"type": "object",
"properties": {
"label": {"type": "string"},
"data": {"type": "string"},
"notation": {"type": "string"},
"type": {"type": "string"},
"children": {"type": "array", "items": {"$ref": "#/definitions/node"}},
"uuid": {"type": "string"},
},
"required": ["label", "notation"],
"additionalProperties": False,
}
},
}
# Define the parser for the file upload
upload_parser = reqparse.RequestParser()
upload_parser.add_argument('file', type='file', location='files', required=True, help='JSON file to be uploaded')
upload_parser.add_argument(
"file",
type="file",
location="files",
required=True,
help="JSON file to be uploaded",
)
API_DESCRIPTION = f"""
Adds new taxonomies in the taxonomy index.
This is a restricted endpoint, only users with elevated access can access it.
### JSON Schema:
```json
{json.dumps(json_schema, indent=2)}
```
"""
@taxonomy_api.route("taxonomy")
@taxonomy_api.doc(
description="This endpoint will add new taxonomies in the taxonomy index. "
"Tree format is required, JSON schema is described in SEARCH API documentation.",
description=API_DESCRIPTION,
security="apikey",
responses={
int(HTTPStatus.CREATED): "Taxonomy created.",
int(HTTPStatus.BAD_REQUEST): "Errors in request file",
int(HTTPStatus.FORBIDDEN): "Insufficient rights",
int(HTTPStatus.INTERNAL_SERVER_ERROR): "Internal server error",
},
)
class CreateTaxonomy(Resource):
# TODO add link to documentation
@taxonomy_api.doc(params={'file': "JSON file to be uploaded. JSON schema is described in SEARCH API documentation."})
@taxonomy_api.doc(params={"file": "JSON file to be uploaded."})
@taxonomy_api.expect(upload_parser)
@auth_validator()
def post(self):
"""Upload a new taxonomy."""
try:
if 'file' in request.files:
file = request.files['file']
if not is_administrator():
raise ForbiddenResourceError(
resource_id="",
message="Forbidden.",
)
if "file" in request.files:
file = request.files["file"]
else:
args = upload_parser.parse_args()
file = args['file']
file = args["file"]
if not file:
raise ApiLogicError('No file part', 400)
if file.filename == '':
raise ApiLogicError('No selected file', 400)
if file and is_valid_json(file):
raise ApiLogicError("No file part", 400)
if file.filename == "":
raise ApiLogicError("No selected file", 400)
if file and _is_valid_json(file):
json_data = json.load(file)
else:
raise ApiLogicError('Invalid file type, must be json', 400)
raise ApiLogicError("Invalid file type, must be json", 400)
jsonschema.validate(instance=json_data, schema=json_schema)
tax = Taxonomy(app.config["INDEX_TAXONOMY"], app.es)
tax.from_eurovoc_tree_to_index_format(json_data)
return {"ok": "Success"}, 201
except jsonschema.exceptions.ValidationError as e:
app.logger.exception("ValidationError")
message = e.schema.get("error_msg", e.message)
abort(400, f"JSON Validation failed: {message}")
except json.JSONDecodeError:
abort(400, 'Invalid JSON file format')
app.logger.exception("JSONDecodeError")
abort(400, "Invalid JSON file format")
except ForbiddenResourceError as fre:
app.logger.exception("ForbiddenResourceError")
abort(403, fre.message)
except ApiLogicError as aex:
app.logger.exception("ApiLogicError")
abort(400, str(aex))
except Exception:
app.logger.exception("corpus/taxonomy -> post")
abort(500, "Internal server error")
return {"ok": "Success"}, 201
def is_valid_json(file):
#TODO replace with nlp-api call which will use libmagic to identify mimetype
if 'json' in file.mimetype.lower():
def _is_valid_json(file):
if "json" in file.mimetype.lower():
return True
return False
def convert_file_to_json(file):
file_content = file.stream.read().decode('utf-8')
lines = file_content.split('\n')
def _convert_file_to_json(file):
file_content = file.stream.read().decode("utf-8")
lines = file_content.split("\n")
parsed_data = {}
for line in lines:
if ':' in line:
key, value = line.strip().split(':', 1)
if ":" in line:
key, value = line.strip().split(":", 1)
parsed_data[key.strip()] = value.strip()
json_data = json.dumps(parsed_data, indent=2)
return json_data
json_schema = {
"type": "object",
"patternProperties": {
"^[a-zA-Z0-9_]+$": {
"type": "object",
"properties": {
"data": {
"type": "array",
"items": {
"$ref": "#/definitions/node"
}
}
},
"required": ["data"],
"additionalProperties": False
}
},
"definitions": {
"node": {
"type": "object",
"properties": {
"label": {"type": "string"},
"data": {"type": "string"},
"notation": {"type": "string"},
"type": {"type": "string"},
"children": {
"type": "array",
"items": {"$ref": "#/definitions/node"}
},
"uuid": {"type": "string"}
},
"required": ["label", "notation"],
"additionalProperties": False
}
}
}
Loading