Code development platform for open source projects from the European Union institutions :large_blue_circle: EU Login authentication by SMS has been phased out. To see alternatives please check here

Skip to content
Snippets Groups Projects
Commit 9fae700b authored by Vidas DAUDARAVICIUS's avatar Vidas DAUDARAVICIUS
Browse files

Merge branch 'development' into 'main'

merging development to main before release v3.3.0

See merge request !12
parents 68c9c2e7 0caa60ff
No related branches found
No related tags found
1 merge request!12merging development to main before release v3.3.0
Showing
with 225 additions and 82 deletions
......@@ -54,4 +54,4 @@ COPY ./config/nlp.conf /etc/seta/
COPY ./nlp ./nlp
COPY ./*.py .
CMD ["gunicorn", "main:create_app()", "--timeout=600", "--workers=2", "--worker-class", "uvicorn.workers.UvicornWorker", "--bind=0.0.0.0:8000"]
\ No newline at end of file
CMD ["gunicorn", "--conf", "/home/seta/gunicorn_conf.py", "main:create_app()", "--bind=0.0.0.0:8000"]
\ No newline at end of file
......@@ -19,13 +19,13 @@ propagate=0
[handler_consoleHandler]
class=StreamHandler
level=INFO
level=DEBUG
formatter=normalFormatter
args=(sys.stdout,)
[handler_detailedConsoleHandler]
class=StreamHandler
level=INFO
level=DEBUG
formatter=detailedFormatter
args=(sys.stdout,)
......
import os
# Gunicorn config variables
worker_class = "uvicorn.workers.UvicornWorker"
loglevel = os.environ.get("LOG_LEVEL", default="WARNING")
errorlog = "-" # stderr
accesslog = "-" # stdout
name = "seta-nlp"
worker_tmp_dir = "/dev/shm"
timeout = 600
workers = 2
from nlp.internal.seta_jwt import SetaJwtAccessBearerCookie
from .configuration import configuration
from nlp import configuration
# Read access token from bearer header and cookie (bearer priority)
access_security = SetaJwtAccessBearerCookie(
secret_key=configuration.SECRET_KEY,
secret_key=configuration.app_config.SECRET_KEY,
)
......@@ -4,13 +4,16 @@ from nlp.internal import config
STAGE = "Development"
CONFIG_APP_FILE = "/etc/seta/nlp.conf"
configuration: config.Config = None
app_config: config.Config = None
def init():
"""Initialize configuration"""
global configuration, STAGE # pylint: disable=global-statement
global app_config, STAGE # pylint: disable=global-statement
STAGE = os.environ.get("STAGE", default="Development")
configuration = config.Config(section_name=STAGE, config_file=CONFIG_APP_FILE)
app_config = config.Config(section_name=STAGE, config_file=CONFIG_APP_FILE)
# huggingface/tokenizers: disable parallelism to avoid deadlocks...
os.environ["TOKENIZERS_PARALLELISM"] = "false"
......@@ -2,13 +2,31 @@ import time
import logging
import random
import string
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from nlp import configuration
from nlp import configuration, globals as g
logger = logging.getLogger("nlp")
@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
"""Lifespan context manager."""
g.init()
logger.info("NLP app lifespan initialized.")
yield
g.close()
def create_fastapi_app() -> FastAPI:
"""Web service app factory"""
......@@ -17,6 +35,15 @@ def create_fastapi_app() -> FastAPI:
title="SeTA NLP",
summary="Natural language processing web service.",
version="1.0.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.middleware("http")
......@@ -27,14 +54,14 @@ def create_fastapi_app() -> FastAPI:
return await call_next(request)
idem = "".join(random.choices(string.ascii_uppercase + string.digits, k=6))
logger.info("rid=%s start request path=%s", idem, request.url.path)
logger.debug("rid=%s start request path=%s", idem, request.url.path)
start_time = time.time()
response = await call_next(request)
process_time = (time.time() - start_time) * 1000
formatted_process_time = f"{process_time:.2f}"
logger.info(
logger.debug(
"rid=%s completed_in=%sms status_code=%s",
idem,
formatted_process_time,
......@@ -50,7 +77,10 @@ def create_fastapi_app() -> FastAPI:
def _register_routers(app: FastAPI):
# pylint: disable=import-outside-toplevel
from nlp.routers.file_parser import router as file_parser_router
from nlp.routers.file_parser import (
router as file_parser_router,
internal_router as internal_file_parser_router,
)
from nlp.routers.embeddings import (
router as embeddings_router,
internal_router as internal_embeddings_router,
......@@ -69,3 +99,8 @@ def _register_routers(app: FastAPI):
prefix="/internal",
include_in_schema=configuration.STAGE.lower() == "development",
)
app.include_router(
internal_file_parser_router,
prefix="/internal",
include_in_schema=configuration.STAGE.lower() == "development",
)
from nlp.internal.interfaces import file_parser as ifp, embeddings as ie
from nlp.internal import dependencies
file_parser: ifp.IFileParserAsync = None
embeddings_transformer: ie.IEmbeddingsAsync = None
def init():
"""Initialize globals."""
# pylint: disable-next=global-statement
global file_parser, embeddings_transformer
file_parser = dependencies.get_file_parser()
embeddings_transformer = dependencies.get_embeddings_client()
def close():
"""Clear resources."""
# pylint: disable-next=global-statement
global file_parser, embeddings_transformer
file_parser = None
embeddings_transformer = None
......@@ -5,6 +5,9 @@ import os
class Config:
"""Application configuration"""
SECRET_KEY = None
DISABLE_COMPUTE_EMBEDDINGS = False
def __init__(self, section_name: str, config_file: str) -> None:
config = configparser.ConfigParser()
config.read(config_file)
......@@ -60,6 +63,11 @@ class Config:
# flask
Config.SECRET_KEY = os.environ.get("API_SECRET_KEY")
# nlp
Config.DISABLE_COMPUTE_EMBEDDINGS = Config.get_env_boolean(
name="DISABLE_COMPUTE_EMBEDDINGS", default_value=False
)
@staticmethod
def _init_config_section(config_section: configparser.SectionProxy):
"""Check the seta_config/nlp.conf file for documentation."""
......
import os
from nlp import configuration
from nlp.internal.implementations import (
tika_parser,
sentence_transformer_embeddings as ste,
)
def get_file_parser():
"""Creates a Tika parser client."""
#! see ENV variable in Dockerfile
tika_path = os.environ["TIKA_PATH"]
return tika_parser.TikaParser(tika_path=tika_path)
def get_embeddings_client():
"""Creates embeddings client using SetaTransformer"""
return ste.SentenceTransformerEmbeddings(
use_workers=configuration.app_config.USE_EMBEDDINGS_WORKER,
disable_embeddings=configuration.app_config.DISABLE_COMPUTE_EMBEDDINGS,
)
class NLPException(Exception):
def __init__(self, message: str, status_code: int = 500):
self.message = message
self.status_code = status_code
super().__init__(self.message)
class TikaParserException(NLPException):
def __init__(self, message: str):
super().__init__(message, status_code=500)
......@@ -14,7 +14,9 @@ from nlp.internal.interfaces import embeddings as iemb
logger = logging.getLogger("nlp")
SBERT_MODEL = SentenceTransformer("intfloat/multilingual-e5-large-instruct")
MODEL_NAME = "intfloat/multilingual-e5-large-instruct"
SBERT_MODEL = SentenceTransformer(MODEL_NAME)
SBERT_MODEL.max_seq_length = 512
......@@ -22,7 +24,7 @@ def _create_model():
"""Creates global model."""
global SBERT_MODEL # pylint: disable=global-statement
SBERT_MODEL = SentenceTransformer("intfloat/multilingual-e5-large-instruct")
SBERT_MODEL = SentenceTransformer(MODEL_NAME)
SBERT_MODEL.max_seq_length = 512
......@@ -60,28 +62,10 @@ def _execute_predict(text: str, convert_to_numpy: bool = True):
class SentenceTransformerEmbeddings(implements(iemb.IEmbeddingsAsync)):
CHUNK_SIZE = 300
def __init__(self, use_workers: bool = False):
self.version = "sbert model intfloat/multilingual-e5-large-instruct"
def __init__(self, use_workers: bool = False, disable_embeddings: bool = False):
self.version = f"sbert model {MODEL_NAME}"
self.use_workers = use_workers
async def chunks_and_embeddings_from_doc_fields(
self, title: str, abstract: str, text: str
) -> list[models.ComputedEmbedding]:
"""Compute embeddings from document fields."""
text_doc = ""
if title is not None:
text_doc = title + "\n"
if abstract is not None:
text_doc = text_doc + abstract + "\n"
if text is not None:
text_doc = text_doc + text
embeddings = await self._compute_embeddings(text_doc)
return embeddings
self.disable_embeddings = disable_embeddings
async def chunks_and_embeddings_from_text(self, text: str) -> models.Chunks:
"""Compute embeddings from text"""
......@@ -113,6 +97,9 @@ class SentenceTransformerEmbeddings(implements(iemb.IEmbeddingsAsync)):
sent = " ".join([str(w) for w in ww])
if self.disable_embeddings:
vector_list = None
else:
if self.use_workers:
vector = await _execute_predict_async(sent)
else:
......
import os
import asyncio
import logging
from interface import implements
from nlp.internal.interfaces import file_parser
from nlp.internal import exceptions
logger = logging.getLogger("nlp")
......@@ -12,11 +12,11 @@ class TikaParser(implements(file_parser.IFileParserAsync)):
def __init__(self, tika_path: str):
self.tika_path = tika_path
async def extract_text(self, content: bytes) -> str:
async def extract_text(self, content: bytes, output_format: str = "text") -> str:
"""Extracts text from file content."""
proc = await asyncio.create_subprocess_shell(
f"java -jar {self.tika_path} --text",
f"java -jar {self.tika_path} --{output_format} -r",
stdout=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
......@@ -28,16 +28,7 @@ class TikaParser(implements(file_parser.IFileParserAsync)):
return stdout.decode(encoding="utf-8")
if stderr:
logger.error(stderr.decode(encoding="utf-8"))
# TODO: raise an exception for stderr.decode(encoding="utf-8")
msg = stderr.decode(encoding="utf-8")
raise exceptions.TikaParserException(msg)
return None
async def get_file_parser():
"""Creates a Tika parser client."""
#! see ENV variable in Dockerfile
tika_path = os.environ["TIKA_PATH"]
return TikaParser(tika_path=tika_path)
......@@ -2,12 +2,6 @@ from interface import Interface
class IEmbeddingsAsync(Interface):
async def chunks_and_embeddings_from_doc_fields(
self, title: str, abstract: str, text: str
):
"""Compute embeddings from document fields."""
pass
async def chunks_and_embeddings_from_text(self, text: str):
"""Compute embeddings from text"""
......
......@@ -2,7 +2,7 @@ from interface import Interface
class IFileParserAsync(Interface):
async def extract_text(self, content: bytes) -> str:
async def extract_text(self, content: bytes, output_format: str = "text") -> str:
"""Extracts text from file content."""
pass
......@@ -22,5 +22,6 @@ class SetaJwtAccessBearerCookie(JwtAccessBearerCookie):
payload = await self._get_payload(bearer, cookie)
if payload:
return JwtAuthorizationCredentials(payload["sub"], payload.get("jti", None))
return JwtDecodedToken(payload)
return None
from fastapi import status
from nlp import globals as g
from nlp.internal import file_validation, exceptions
async def extract_text_from_file(accept: str, file_content: bytes) -> tuple[str, str]:
"""Extracts text from file content."""
is_allowed, mime = file_validation.allowed_extension(file_content)
if not is_allowed:
raise exceptions.NLPException(
status_code=status.HTTP_403_FORBIDDEN,
message=f"Detected file extension not allowed: {mime}",
)
match accept:
case "text/html":
extract_format = "html"
case "text/xml":
extract_format = "xml"
case _:
extract_format = "text"
text = await g.file_parser.extract_text(
content=file_content, output_format=extract_format
)
return text, mime
......@@ -5,4 +5,4 @@ class ComputedEmbedding(BaseModel):
chunk: int
version: str
text: str
vector: list[float]
vector: list[float] | None
import logging
from fastapi import APIRouter, HTTPException, Depends, Security, status
from fastapi import APIRouter, HTTPException, Security, status
from nlp import models
from nlp.internal import sentence_transformer_embeddings as ste
from nlp import globals as g
from nlp.internal import seta_jwt
from nlp.configuration import configuration
from nlp.access_security import access_security
router = APIRouter(tags=["Embeddings"])
......@@ -14,14 +14,6 @@ internal_router = APIRouter(tags=["Internal"])
logger = logging.getLogger("nlp")
async def get_embeddings_client():
"""Creates embeddings client using SetaTransformer"""
return ste.SentenceTransformerEmbeddings(
use_workers=configuration.USE_EMBEDDINGS_WORKER
)
@router.post(
"/compute_embeddings",
summary="Embeddings from plain text.",
......@@ -29,7 +21,6 @@ async def get_embeddings_client():
)
async def embeddings_from_text(
text: models.ParserText,
client: ste.SentenceTransformerEmbeddings = Depends(get_embeddings_client),
decoded_token: seta_jwt.JwtDecodedToken = Security(access_security),
) -> models.Chunks:
"""Extracts embeddings from text."""
......@@ -41,12 +32,17 @@ async def embeddings_from_text(
)
try:
return await client.chunks_and_embeddings_from_text(text=text.text)
return await g.embeddings_transformer.chunks_and_embeddings_from_text(
text=text.text
)
except Exception as e:
logger.exception(e)
# pylint: disable-next=raise-missing-from
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=e)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Internal server error.",
)
@router.post(
......@@ -56,7 +52,6 @@ async def embeddings_from_text(
)
async def vector_from_text(
text: models.ParserText,
client: ste.SentenceTransformerEmbeddings = Depends(get_embeddings_client),
decoded_token: seta_jwt.JwtDecodedToken = Security(access_security),
) -> models.Vector:
"""Extracts embedding vector from text."""
......@@ -68,12 +63,15 @@ async def vector_from_text(
)
try:
return await client.embedding_vector_from_text(text.text)
return await g.embeddings_transformer.embedding_vector_from_text(text.text)
except Exception as e:
logger.exception(e)
# pylint: disable-next=raise-missing-from
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=e)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Internal server error.",
)
@internal_router.post(
......@@ -83,14 +81,40 @@ async def vector_from_text(
)
async def internal_embeddings_from_text(
text: models.ParserText,
client: ste.SentenceTransformerEmbeddings = Depends(get_embeddings_client),
) -> models.Chunks:
"""Extracts embeddings from text."""
try:
return await client.chunks_and_embeddings_from_text(text=text.text)
return await g.embeddings_transformer.chunks_and_embeddings_from_text(
text=text.text
)
except Exception as e:
logger.exception(e)
# pylint: disable-next=raise-missing-from
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=e)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Internal server error.",
)
@internal_router.post(
"/compute_embedding",
summary="Vector from text",
description="Given a plain text, related embeddings vector is provided.",
)
async def internal_vector_from_text(
text: models.ParserText,
) -> models.Vector:
"""Extracts embedding vector from text."""
try:
return await g.embeddings_transformer.embedding_vector_from_text(text.text)
except Exception as e:
logger.exception(e)
# pylint: disable-next=raise-missing-from
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Internal server error.",
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment