Code development platform for open source projects from the European Union institutions

Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ai4xml/playground
1 result
Show changes
Commits on Source (2)
......@@ -79,10 +79,7 @@ def extract_preface(content):
return None
def extract_preamble_from_text(content, file_path):
import re
print("processing ", file_path)
if "32019R0537" in file_path:
print("just breakpoint!")
# Pattern to capture the preamble components
pattern = r"((?:THE EUROPEAN COMMISSION,|THE COUNCIL OF THE EUROPEAN UNION,|THE EUROPEAN PARLIAMENT AND THE COUNCIL OF THE EUROPEAN UNION,|THE EUROPEAN PARLIAMENT AND THE COUNCIL OF THE EUROPEAN UNION|THE EUROPEAN COMMISSION|THE GOVERNING COUNCIL OF THE EUROPEAN CENTRAL BANK,|THE COUNCIL OF THE EUROPEAN UNION|THE COUNCIL OF THE OPEAN UNION,|THE EUROPEAN PARLIAMENT,|THE EUROPEAN COMMUNITIES,|THE EUROPEAN PARLIAMENT AND THE COUNCIL,|The GOVERNING COUNCIL OF THE EUROPEAN CENTRAL BANK,|THE MANAGEMENT BOARD,).*?)(?=HAS ADOPTED THIS REGULATION:|HAVE ADOPTED THIS REGULATION:|HAS ADOPTED THIS DIRECTIVE:|HAVE ADOPTED THIS DIRECTIVE:|HAS ADOPTED THIS DECISION:|HAVE ADOPTED THIS DECISION:|HAS ADOPTED THE FOLLOWING REGULATION:)"
......@@ -101,16 +98,23 @@ def extract_preamble_from_text(content, file_path):
citations_part, recitals_part = extract_citations_and_recitals(preamble)
citations = extract_citations(citations_part)
if citations and citations[0] == formula:
citations = citations[1:] # Remove the formula from citations
else:
print("formula is not the first citation in the citations part of file: ", file_path)
print("formula: ", formula)
print("citations: ", citations)
recitals = extract_recitals(recitals_part)
# Extract preamble final using a separate function
preamble_final = extract_preamble_final(content)
return {
"formula": formula.strip(),
"citations": citations,
"recitals": recitals,
"preamble_final": preamble_final.strip()
"preamble_final": preamble_final.strip(),
}
else:
print("No preamble found in the content of file: ", file_path)
......@@ -143,7 +147,6 @@ def extract_formula(preamble):
return formula_match.group(1) if formula_match else None
def extract_citations_and_recitals(preamble):
import re
# Split the preamble into citations and recitals using 'Whereas' as the separator
splitters = ['Whereas:', 'WHEREAS:', 'Whereas,', 'Whereas','whereas:']
......@@ -166,22 +169,36 @@ def extract_citations(citations_part):
return []
def extract_recitals(recitals_text):
    r"""Split the recitals part of a preamble into individually numbered recitals.

    A recital starts at a blank line followed by a parenthesised number, written
    either plainly as ``(12)`` or markdown-escaped as ``\(12\)``. The stored text
    of each recital keeps its leading number marker.

    Some converted documents repeat a recital number (see doc 32017R1270). When
    a number is seen a second time, the recital is stored under
    ``max(existing numbers) + 1`` and only its *leading* marker is rewritten to
    that number; parenthesised numbers inside the recital body (for example
    "Article 3(1)") are left untouched.

    Args:
        recitals_text: The text that follows the "Whereas" separator.

    Returns:
        A dict mapping recital number (int) to recital text (str). Empty when no
        recital marker is found.
    """
    import re

    marker = r'(?:\\\((\d+)\\\)|\((\d+)\))'
    recital_start = re.compile(r'\n\n' + marker)
    leading_marker = re.compile(marker)

    recitals = {}
    duplicates = []

    matches = list(recital_start.finditer(recitals_text))
    for index, match in enumerate(matches):
        # Start at the marker itself so the recital text keeps its number.
        start = match.start()
        end = matches[index + 1].start() if index + 1 < len(matches) else len(recitals_text)
        # Exactly one of the two groups is set, depending on the marker form.
        recital_number = int(match.group(1) or match.group(2))
        recital_text = recitals_text[start:end].strip()

        if recital_number not in recitals:
            recitals[recital_number] = recital_text
            continue

        # Duplicate number: renumber instead of overwriting the earlier recital.
        duplicates.append(recital_number)
        number_match = leading_marker.match(recital_text)
        if number_match:
            corrected_number = max(recitals) + 1
            # Replace only the leading marker, never numbers in the body.
            recital_text = f'({corrected_number})' + recital_text[number_match.end():]
            recitals[corrected_number] = recital_text
        else:
            print(f"Recital with no number found: {recital_text}")

    if duplicates:
        print(f"Recitals with duplicate numbers: {', '.join(map(str, duplicates))}")

    return recitals
......@@ -211,11 +228,16 @@ def extract_preamble_from_xml(xml_content):
num = num_element.text if num_element is not None else ""
text = " ".join(p.text for p in recital.findall(".//{*}p") if p.text)
recital_texts.append(f"{num} {text}")
# Extract preamble_final
preamble_final = preamble.find(".//{*}block[@name='preamble.final']")
preamble_final_text = preamble_final.text if preamble_final is not None else ""
return {
"formula": formula_text,
"citations": citation_texts,
"recitals": recital_texts
"recitals": recital_texts,
"preamble_final": preamble_final_text
}
from tqdm import tqdm
......@@ -254,10 +276,11 @@ def analyse_preamble_dataset_from_xml(xml_files, output_file):
last_tag_content[xml_file] = extract_last_tag_or_content(preamble_data['recitals'], xml_content)
preamble_dataset.append({
'document': xml_file,
'celex_id': os.path.splitext(os.path.basename(xml_file))[0],
'formula': preamble_data['formula'],
'citations': preamble_data['citations'],
'recitals': preamble_data['recitals']
'recitals': preamble_data['recitals'],
'preamble_final': preamble_data['preamble_final']
})
# Count unique tags in the preamble content
......@@ -334,27 +357,54 @@ def extract_last_tag_or_content(recitals, xml_content):
return "No content found between the last recital and the preamble end tag."
def create_preamble_dataset_from_md(content_files, output_file):
    """Build a JSON preamble dataset from converted markdown documents.

    Each file is read as UTF-8 and passed to ``extract_preamble_from_text``.
    Files whose content starts with an image reference (``![](``) are skipped,
    as are files for which no preamble is extracted.

    Args:
        content_files: Iterable of paths to the markdown files.
        output_file: Path of the JSON file to write; it receives a list of
            records with the keys ``celex_id`` (file name without extension),
            ``formula``, ``citations``, ``recitals`` and ``preamble_final``.
    """
    preamble_dataset = []

    for file_path in content_files:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()

        # Check if the content starts with an image reference
        if content.strip().startswith("![]("):
            print(f"Ignoring file {file_path} as it starts with an image reference.")
            continue

        preamble_data = extract_preamble_from_text(content, file_path)
        if not preamble_data:
            continue

        # The file name without its extension is used as the CELEX id.
        filename_no_ext = os.path.splitext(os.path.basename(file_path))[0]
        preamble_dataset.append({
            'celex_id': filename_no_ext,
            'formula': preamble_data['formula'],
            'citations': preamble_data['citations'],
            'recitals': preamble_data['recitals'],
            'preamble_final': preamble_data['preamble_final'],
        })

    with open(output_file, 'w', encoding='utf-8') as jsonfile:
        json.dump(preamble_dataset, jsonfile, indent=2)


# Backward-compatible alias for the function's previous name.
create_preamble_dataset_from_content = create_preamble_dataset_from_md
def create_preamble_dataset_from_xml(xml_directory, output_file):
    """Build a JSON preamble dataset from the ``.xml`` files of a directory.

    Each XML file is read as UTF-8 and passed to ``extract_preamble_from_xml``;
    files for which nothing is extracted are skipped. Files are processed in
    sorted name order so the output is reproducible across runs.

    Args:
        xml_directory: Directory that is scanned (non-recursively) for files
            whose name ends with ``.xml``.
        output_file: Path of the JSON file to write; it receives a list of
            records with the keys ``celex_id`` (file name without extension),
            ``formula``, ``citations``, ``recitals`` and ``preamble_final``.
    """
    preamble_dataset = []

    # os.listdir returns entries in arbitrary order; sort for stable output.
    for filename in sorted(os.listdir(xml_directory)):
        if not filename.endswith('.xml'):
            continue

        file_path = os.path.join(xml_directory, filename)
        celex_id = os.path.splitext(filename)[0]  # Remove .xml extension

        with open(file_path, 'r', encoding='utf-8') as file:
            xml_content = file.read()

        preamble_data = extract_preamble_from_xml(xml_content)
        if not preamble_data:
            continue

        preamble_dataset.append({
            'celex_id': celex_id,
            'formula': preamble_data['formula'],
            'citations': preamble_data['citations'],
            'recitals': preamble_data['recitals'],
            # Same key as the other dataset builders; tolerate extractors
            # that do not provide it.
            'preamble_final': preamble_data.get('preamble_final', ""),
        })

    with open(output_file, 'w', encoding='utf-8') as jsonfile:
        json.dump(preamble_dataset, jsonfile, indent=2)
def remove_namespaces(xml_element):
......@@ -682,4 +732,244 @@ def validate_akn(xml_content_or_path, schema, is_content=True):
xml_doc = etree.fromstring(xml_content_or_path)
else:
xml_doc = etree.parse(xml_content_or_path)
return schema.validate(xml_doc), schema.error_log
\ No newline at end of file
return schema.validate(xml_doc), schema.error_log
# ------- Similarity functions
import string
from fuzzywuzzy import fuzz
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from difflib import SequenceMatcher
def calculate_levenshtein_score(text1, text2):
    """Return ``fuzz.ratio(text1, text2)`` divided by 100."""
    percent_score = fuzz.ratio(text1, text2)
    return percent_score / 100
def calculate_cosine_similarity(text1, text2):
    """Return the cosine similarity of the TF-IDF vectors of two texts.

    A fresh ``TfidfVectorizer`` is fitted on just these two texts.
    """
    tfidf_matrix = TfidfVectorizer().fit_transform([text1, text2])
    first_vector, second_vector = tfidf_matrix[0], tfidf_matrix[1]
    similarity_matrix = cosine_similarity(first_vector, second_vector)
    return similarity_matrix[0][0]
def calculate_sequence_matcher_score(text1, text2):
    """Return the ``difflib.SequenceMatcher`` ratio of two texts.

    Punctuation and whitespace characters are passed to the matcher as junk.
    """
    junk_characters = string.punctuation + string.whitespace

    def is_junk(character):
        return character in junk_characters

    matcher = SequenceMatcher(is_junk, text1, text2)
    return matcher.ratio()
def jaccard_similarity(s1, s2):
    """Return the Jaccard similarity of the lower-cased word sets of two strings.

    Words are obtained by splitting on whitespace after lower-casing.

    Args:
        s1: First string.
        s2: Second string.

    Returns:
        ``|A ∩ B| / |A ∪ B|`` as a float. When neither string contains a word
        the two (empty) sets are identical, so 1.0 is returned instead of
        dividing by zero.
    """
    set1 = set(s1.lower().split())
    set2 = set(s2.lower().split())
    union = set1 | set2
    if not union:
        # Both inputs are empty or whitespace-only.
        return 1.0
    return len(set1 & set2) / len(union)
# ---------------
def obsfucate(var):
    """Mask a secret string for display.

    Args:
        var: The string to mask.

    Returns:
        For values longer than four characters, the first four characters
        followed by one ``*`` per remaining character. Values of four
        characters or fewer are masked entirely, so a short secret is never
        shown in full.
    """
    visible_prefix = 4
    if len(var) <= visible_prefix:
        return '*' * len(var)
    return f"{var[:visible_prefix]}{'*' * (len(var) - visible_prefix)}"
def check_env_vars():
    """Print whether each expected credential environment variable is set.

    For a variable that is set to a non-empty value, the value is printed
    masked through ``obsfucate``.
    """
    expected_vars = (
        'OPENAI_API_KEY',
        'AWS_ACCESS_KEY_ID',
        'AWS_SECRET_ACCESS_KEY',
        'AWS_SESSION_TOKEN',
        'AWS_REGION_NAME',
    )
    for name in expected_vars:
        current = os.environ.get(name)
        if not current:
            print(f"{name} is not set.")
            continue
        print(f"{name} is set. Value: {obsfucate(current)}")
import dspy
def get_llms(cache=True):
    """Create the ``dspy.LM`` clients for the enabled Bedrock models.

    Args:
        cache: Forwarded as the ``cache`` argument of every ``dspy.LM``.

    Returns:
        A dict mapping each model id to a ``dspy.LM`` created with
        ``model="bedrock/<model id>"``.
    """
    enabled_model_ids = (
        "us.meta.llama3-2-1b-instruct-v1:0",
        "us.meta.llama3-2-3b-instruct-v1:0",
        # Currently disabled:
        # "us.meta.llama3-2-11b-instruct-v1:0",
        # "meta.llama3-1-70b-instruct-v1:0",
        # "meta.llama3-1-8b-instruct-v1:0",
        "mistral.mistral-7b-instruct-v0:2",
        "mistral.mixtral-8x7b-instruct-v0:1",
        "mistral.mistral-large-2402-v1:0",
        "mistral.mistral-small-2402-v1:0",
        # Also disabled: "gpt4o-mini" -> dspy.LM('openai/gpt-4o-mini')
    )
    return {
        model_id: dspy.LM(model=f"bedrock/{model_id}", cache=cache)
        for model_id in enabled_model_ids
    }
# --------------- dataset
import pandas as pd
from dspy.datasets.dataset import Dataset
class AKNDataset(Dataset):
    """Dataset of records loaded from a JSON file and filtered by similarity.

    Only rows whose ``similarity`` value is strictly greater than
    ``similarity_threshold`` are kept. The first ``train_size`` kept rows become
    the training split and the next ``dev_size`` rows the dev split.
    """

    def __init__(self, file_path, similarity_threshold=0.95, train_size=700, dev_size=100, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        records = pd.read_json(file_path)
        is_similar_enough = records['similarity'] > similarity_threshold
        selected = records[is_similar_enough]

        dev_end = train_size + dev_size
        self._train = selected.iloc[:train_size].to_dict(orient='records')
        self._dev = selected.iloc[train_size:dev_end].to_dict(orient='records')
# --------------- metrics
from rouge_score import rouge_scorer
import xml.etree.ElementTree as ET
import lxml.etree as etree
import os
def insert_xml_into_akn_empty(xml_content):
    """Embed a generated XML fragment in the empty AKN template document.

    The template ``data/akn_files/akn_empty.xml`` is resolved relative to the
    directory containing this module. The parsed fragment is inserted as the
    next sibling of the first AKN 3.0 ``meta`` element found in the template.

    Args:
        xml_content: The generated XML to parse and insert.

    Returns:
        ``(xml_string, None)`` on success, or ``(None, error_message)`` when the
        ``meta`` element is missing or any exception is raised.
    """
    try:
        module_dir = os.path.dirname(os.path.abspath(__file__))
        template_path = os.path.join(module_dir, 'data/akn_files/akn_empty.xml')

        template_root = etree.parse(template_path).getroot()
        fragment = etree.fromstring(xml_content)

        meta_element = template_root.find('.//{http://docs.oasis-open.org/legaldocml/ns/akn/3.0}meta')
        if meta_element is None:
            print("Meta element not found in the empty AKN file.")
            return None, "Meta element not found in the empty AKN file."

        # Place the generated fragment directly after the meta element.
        meta_element.addnext(fragment)
        return etree.tostring(template_root, encoding='unicode'), None
    except Exception as e:
        return None, str(e)
import logging
# Configure logging
def configure_program_compiling_logging(model_name):
    """Configure logging to ``<model_name>_program_compiling.log`` at INFO level.

    Returns:
        The root logger.
    """
    logging.basicConfig(
        filename=f'{model_name}_program_compiling.log',
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
    )
    root_logger = logging.getLogger()
    return root_logger
# ROUGE-L F1 at or below this value triggers a detailed warning dump.
_ROUGE_L_WARN_THRESHOLD = 0.9
# ROUGE-L F1 at or above this value makes the metric pass.
_ROUGE_L_PASS_THRESHOLD = 0.96


def _score_generated_akn(reference, prediction, log_reference_text):
    """Shared implementation of the two validation metrics.

    The generated XML must (1) be insertable into the empty AKN template,
    (2) validate against the ``akomantoso30.xsd`` schema located under
    ``data/akn_files/schema`` next to this module, and (3) reach a ROUGE-L F1
    of at least ``_ROUGE_L_PASS_THRESHOLD`` against the reference XML.

    Args:
        reference: Mapping with ``celex_id`` and ``xml`` (and ``text`` when
            ``log_reference_text`` is true).
        prediction: Mapping with the generated ``xml``.
        log_reference_text: Whether ``reference['text']`` is read and included
            in the low-score warning dump.

    Returns:
        True when all three checks pass, otherwise False.
    """
    logging.info("reference: %s", reference['celex_id'])
    reference_xml = reference['xml']
    reference_text = reference['text'] if log_reference_text else None
    generated_xml = prediction['xml']

    # Insert the generated XML into the empty AKN file
    akn_xml, error = insert_xml_into_akn_empty(generated_xml)
    if akn_xml is None:
        logging.error(error)
        return False

    # Validate XML structure using validate_akn function
    current_dir = os.path.dirname(os.path.abspath(__file__))
    schema_file_path = os.path.join(current_dir, 'data/akn_files/schema/akomantoso30.xsd')
    schema = etree.XMLSchema(file=schema_file_path)
    is_valid, error_log = validate_akn(akn_xml, schema)
    if not is_valid:
        logging.error("XML is not valid according to AKN schema")
        logging.error(error_log)
        return False  # Return false if XML is not valid

    # Only ROUGE-L is used for the decision, so only ROUGE-L is computed.
    scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)
    rougeL_f1 = scorer.score(reference_xml, generated_xml)['rougeL'].fmeasure

    logging.info("rougeL_f1: %s", rougeL_f1)
    if rougeL_f1 <= _ROUGE_L_WARN_THRESHOLD:
        logging.warning("Low ROUGE-L F1 Score Detected: %s", rougeL_f1)
        logging.warning("-" * 10)
        if log_reference_text:
            logging.warning("Reference Text:\n%s", reference_text)
        logging.warning("-" * 10)
        logging.warning("Reference XML:\n%s", reference_xml)
        logging.warning("-" * 10)
        logging.warning("Predicted XML:\n%s", generated_xml)
        logging.warning("-" * 100)

    return rougeL_f1 >= _ROUGE_L_PASS_THRESHOLD


def validation_metric(reference, prediction, trace=None):
    """Metric for references that carry a ``text`` field.

    Returns True when the predicted XML is schema-valid AKN and its ROUGE-L F1
    against ``reference['xml']`` is at least 0.96. On a low score the reference
    text is logged together with both XML documents. ``trace`` is accepted but
    not used.
    """
    return _score_generated_akn(reference, prediction, log_reference_text=True)


def validation_metric_p2(reference, prediction, trace=None):
    """Same check as ``validation_metric`` without reading ``reference['text']``.

    Returns True when the predicted XML is schema-valid AKN and its ROUGE-L F1
    against ``reference['xml']`` is at least 0.96. ``trace`` is accepted but
    not used.
    """
    return _score_generated_akn(reference, prediction, log_reference_text=False)
# ---------------
def setup_phoenix():
    """Launch the Phoenix app and instrument DSPy and LiteLLM for tracing.

    Traces are registered against the endpoint
    ``http://127.0.0.1:6006/v1/traces``.
    """
    import phoenix as px
    px.launch_app()

    # Imported after the app launch, as in the original statement order.
    from openinference.instrumentation.dspy import DSPyInstrumentor
    from openinference.instrumentation.litellm import LiteLLMInstrumentor
    from phoenix.otel import register

    traces_endpoint = "http://127.0.0.1:6006/v1/traces"
    register(endpoint=traces_endpoint)

    for instrumentor_class in (DSPyInstrumentor, LiteLLMInstrumentor):
        instrumentor_class().instrument(skip_dep_check=True)
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.