Code development platform for open source projects from the European Union institutions

Skip to content
Snippets Groups Projects
Verified Commit e05abb51 authored by Vincent SIMONIN's avatar Vincent SIMONIN
Browse files

:bug: Fixed source and target URL are not cleaned before storage

parent f6ee6220
No related branches found
No related tags found
1 merge request!57🐛 Fixed source and target URL are not cleaned before storage
Pipeline #119200 passed
......@@ -2,7 +2,7 @@
from rest_framework import serializers
from netbox.api.serializers import NetBoxModelSerializer, WritableNestedSerializer
from ..models import Mapping, HttpHeader, SamlConfig
from ..models import Mapping, HttpHeader, SamlConfig, clean_url
class NestedMappingSerializer(WritableNestedSerializer):
......@@ -132,3 +132,23 @@ class MappingSerializer(NetBoxModelSerializer):
"http_headers",
"saml_config",
)
def create(self, validated_data):
"""Be sure that URL is cleaned"""
instance = super().create(validated_data)
instance.source = clean_url(instance.source)
instance.target = clean_url(instance.target)
instance.save()
return instance
def update(self, instance, validated_data):
"""Be sure that URL is cleaned"""
validated_data["source"] = clean_url(validated_data["source"])
validated_data["target"] = clean_url(validated_data["target"])
return super().update(instance, validated_data)
"""Models definitions"""
from typing import Any
from urllib.parse import urlparse
from django.core.exceptions import ValidationError
from django.conf import settings
from django.db import models
from django.db.models import Model
from django.urls import reverse
from django.core.validators import URLValidator, MaxValueValidator, MinValueValidator
from django.contrib.postgres.fields.array import ArrayField
......@@ -13,6 +16,33 @@ URL_MAX_SIZE = 2000
DEFAULT_SORRY_PAGE = settings.PLUGINS_CONFIG["netbox_rps_plugin"]["default_sorry_page"]
def clean_url(raw_url):
"""Clean an URL"""
o = urlparse(raw_url)
credential = ":".join(
tuple(filter(lambda item: item is not None, (o.username, o.password)))
)
hostname = o.hostname if o.port is None else ":".join((o.hostname, str(o.port)))
return (
o._replace(netloc=hostname)
if len(credential) == 0
else o._replace(netloc="@".join((credential, hostname)))
).geturl()
class FilteredURLField(models.URLField):
"""URLField definition class"""
def clean(self, value: Any, model_instance: Model | None) -> Any:
"""Clean Field value"""
return clean_url(super().clean(value, model_instance))
class AuthenticationChoices(ChoiceSet):
"""Authentication choices definition class"""
......@@ -60,18 +90,18 @@ def default_protocol():
class Mapping(NetBoxModel):
"""Mapping definition class"""
source = models.CharField(
source = FilteredURLField(
max_length=URL_MAX_SIZE,
blank=False,
verbose_name="Source",
validators=[URLValidator(message="It must be a url")],
validators=[URLValidator(schemes=["http", "https"])],
unique=True,
)
target = models.CharField(
target = FilteredURLField(
max_length=URL_MAX_SIZE,
blank=False,
verbose_name="Target",
validators=[URLValidator(message="It must be a url")],
validators=[URLValidator(schemes=["http", "https"])],
)
authentication = models.CharField(
max_length=30,
......@@ -80,11 +110,11 @@ class Mapping(NetBoxModel):
blank=False,
verbose_name="Auth",
)
testingpage = models.CharField(
testingpage = models.URLField(
max_length=URL_MAX_SIZE,
blank=True,
null=True,
validators=[URLValidator(message="It must be a url")],
validators=[URLValidator(schemes=["http", "https"])],
)
webdav = models.BooleanField(
default=False,
......@@ -104,11 +134,11 @@ class Mapping(NetBoxModel):
client_max_body_size = models.IntegerField(
default=1, validators=[MinValueValidator(1), MaxValueValidator(255)]
)
sorry_page = models.CharField(
sorry_page = models.URLField(
max_length=URL_MAX_SIZE,
blank=False,
verbose_name="Sorry Page",
validators=[URLValidator(message="It must be a url")],
validators=[URLValidator(schemes=["http", "https"])],
default=DEFAULT_SORRY_PAGE,
)
extra_protocols = ArrayField(
......
......@@ -56,8 +56,8 @@ class TestMappingUnique(Base):
response = requests.post(
url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/",
json={
"source": "https://truc8.com/api",
"target": "https://truc8.com/api",
"source": "https://truc9.com/api",
"target": "https://truc9.com/api",
"authentication": "none",
"testingpage": None,
},
......@@ -71,6 +71,152 @@ class TestMappingUnique(Base):
b'{"target":["Target URL cannot be equal to source URL."]}',
)
def test_that_mapping_is_case_sensitive_unique(self) -> None:
"""Test that mapping is case sensitive unique"""
response = requests.post(
url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/",
json={
"source": "https://truc10.com/api",
"target": "http://10.10.10.10:1888/api",
"authentication": "none",
"testingpage": None,
},
headers={"Authorization": f"Token {API_KEY}"},
timeout=5,
)
self.assertEqual(response.status_code, 201)
self.mapping_id = json.loads(response.content)["id"]
response = requests.post(
url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/",
json={
"source": "HTTPS://truc10.com/api",
"target": "http://10.10.10.10:1888/api",
"authentication": "none",
"testingpage": None,
},
headers={"Authorization": f"Token {API_KEY}"},
timeout=5,
)
self.assertEqual(response.status_code, 400)
self.assertEqual(
response.content, b'{"source":["Mapping with this Source already exists."]}'
)
response = requests.post(
url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/",
json={
"source": "https://TRUC10.COM/api",
"target": "http://10.10.10.10:1888/api",
"authentication": "none",
"testingpage": None,
},
headers={"Authorization": f"Token {API_KEY}"},
timeout=5,
)
self.assertEqual(response.status_code, 400)
self.assertEqual(
response.content, b'{"source":["Mapping with this Source already exists."]}'
)
response = requests.post(
url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/",
json={
"source": "HTTPS://TRUC10.com/API",
"target": "HTTP://Toto.10.com:1888/aPi",
"authentication": "none",
"testingpage": None,
},
headers={"Authorization": f"Token {API_KEY}"},
timeout=5,
)
self.assertEqual(response.status_code, 201)
content = json.loads(response.content)
self.assertEqual(content["source"], "https://truc10.com/API")
self.assertEqual(content["target"], "http://toto.10.com:1888/aPi")
response = requests.get(
url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/{content['id']}/",
headers={"Authorization": f"Token {API_KEY}"},
timeout=5,
)
self.assertEqual(response.status_code, 200)
content = json.loads(response.content)
self.assertEqual(content["source"], "https://truc10.com/API")
self.assertEqual(content["target"], "http://toto.10.com:1888/aPi")
requests.delete(
url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/",
json=[{"id": content["id"]}],
headers={"Authorization": f"Token {API_KEY}"},
timeout=5,
)
def test_that_mapping_update_is_case_sensitive_unique(self) -> None:
"""Test that mapping update is case sensitive unique"""
response = requests.post(
url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/",
json={
"source": "HTTPS://TRUC11.com/API",
"target": "HTTP://Toto.11.com:1888/aPi",
"authentication": "none",
"testingpage": None,
},
headers={"Authorization": f"Token {API_KEY}"},
timeout=5,
)
self.assertEqual(response.status_code, 201)
content = json.loads(response.content)
response = requests.patch(
url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/{content['id']}/",
json={
"source": "HTTPS://MUCHE11.com/API",
"target": "HTTP://Titi.11.com:1888/aPi",
},
headers={"Authorization": f"Token {API_KEY}"},
timeout=5,
)
self.assertEqual(response.status_code, 200)
content = json.loads(response.content)
self.assertEqual(content["source"], "https://muche11.com/API")
self.assertEqual(content["target"], "http://titi.11.com:1888/aPi")
response = requests.get(
url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/{content['id']}/",
headers={"Authorization": f"Token {API_KEY}"},
timeout=5,
)
self.assertEqual(response.status_code, 200)
content = json.loads(response.content)
self.assertEqual(content["source"], "https://muche11.com/API")
self.assertEqual(content["target"], "http://titi.11.com:1888/aPi")
requests.delete(
url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/",
json=[{"id": content["id"]}],
headers={"Authorization": f"Token {API_KEY}"},
timeout=5,
)
if __name__ == "__main__":
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment