diff --git a/plugins/netbox-rps-plugin/netbox_rps_plugin/api/serializers.py b/plugins/netbox-rps-plugin/netbox_rps_plugin/api/serializers.py index 76724b78ce4b8a6f1ad3dd519b23448ac8763bd2..f537df8658dd7ec2ec5503b5c0fc891393555645 100644 --- a/plugins/netbox-rps-plugin/netbox_rps_plugin/api/serializers.py +++ b/plugins/netbox-rps-plugin/netbox_rps_plugin/api/serializers.py @@ -2,7 +2,7 @@ from rest_framework import serializers from netbox.api.serializers import NetBoxModelSerializer, WritableNestedSerializer -from ..models import Mapping, HttpHeader, SamlConfig +from ..models import Mapping, HttpHeader, SamlConfig, clean_url class NestedMappingSerializer(WritableNestedSerializer): @@ -132,3 +132,23 @@ class MappingSerializer(NetBoxModelSerializer): "http_headers", "saml_config", ) + + def create(self, validated_data): + """Be sure that URL is cleaned""" + + instance = super().create(validated_data) + + instance.source = clean_url(instance.source) + instance.target = clean_url(instance.target) + + instance.save() + + return instance + + def update(self, instance, validated_data): + """Be sure that URL is cleaned""" + + validated_data["source"] = clean_url(validated_data["source"]) + validated_data["target"] = clean_url(validated_data["target"]) + + return super().update(instance, validated_data) diff --git a/plugins/netbox-rps-plugin/netbox_rps_plugin/models.py b/plugins/netbox-rps-plugin/netbox_rps_plugin/models.py index 2f64d8141350c3182e69b182a0336fd30b9662a3..30d19f4ae44b79eec89a13d4d4a387c1559f793a 100644 --- a/plugins/netbox-rps-plugin/netbox_rps_plugin/models.py +++ b/plugins/netbox-rps-plugin/netbox_rps_plugin/models.py @@ -1,8 +1,11 @@ """Models definitions""" +from typing import Any +from urllib.parse import urlparse from django.core.exceptions import ValidationError from django.conf import settings from django.db import models +from django.db.models import Model from django.urls import reverse from django.core.validators import URLValidator, MaxValueValidator, MinValueValidator from django.contrib.postgres.fields.array import ArrayField @@ -13,6 +16,33 @@ URL_MAX_SIZE = 2000 DEFAULT_SORRY_PAGE = settings.PLUGINS_CONFIG["netbox_rps_plugin"]["default_sorry_page"] +def clean_url(raw_url): + """Clean an URL""" + + o = urlparse(raw_url) + + credential = ":".join( + tuple(filter(lambda item: item is not None, (o.username, o.password))) + ) + + hostname = o.hostname if o.port is None else ":".join((o.hostname, str(o.port))) + + return ( + o._replace(netloc=hostname) + if len(credential) == 0 + else o._replace(netloc="@".join((credential, hostname))) + ).geturl() + + +class FilteredURLField(models.URLField): + """URLField definition class""" + + def clean(self, value: Any, model_instance: Model | None) -> Any: + """Clean Field value""" + + return clean_url(super().clean(value, model_instance)) + + class AuthenticationChoices(ChoiceSet): """Authentication choices definition class""" @@ -60,18 +90,18 @@ def default_protocol(): class Mapping(NetBoxModel): """Mapping definition class""" - source = models.CharField( + source = FilteredURLField( max_length=URL_MAX_SIZE, blank=False, verbose_name="Source", - validators=[URLValidator(message="It must be a url")], + validators=[URLValidator(schemes=["http", "https"])], unique=True, ) - target = models.CharField( + target = FilteredURLField( max_length=URL_MAX_SIZE, blank=False, verbose_name="Target", - validators=[URLValidator(message="It must be a url")], + validators=[URLValidator(schemes=["http", "https"])], ) authentication = models.CharField( max_length=30, @@ -80,11 +110,11 @@ class Mapping(NetBoxModel): blank=False, verbose_name="Auth", ) - testingpage = models.CharField( + testingpage = models.URLField( max_length=URL_MAX_SIZE, blank=True, null=True, - validators=[URLValidator(message="It must be a url")], + validators=[URLValidator(schemes=["http", "https"])], ) webdav = models.BooleanField( default=False, @@ -104,11 +134,11 @@ class Mapping(NetBoxModel): client_max_body_size = models.IntegerField( default=1, validators=[MinValueValidator(1), MaxValueValidator(255)] ) - sorry_page = models.CharField( + sorry_page = models.URLField( max_length=URL_MAX_SIZE, blank=False, verbose_name="Sorry Page", - validators=[URLValidator(message="It must be a url")], + validators=[URLValidator(schemes=["http", "https"])], default=DEFAULT_SORRY_PAGE, ) extra_protocols = ArrayField( diff --git a/plugins/netbox-rps-plugin/tests/e2e/test_mapping_unique.py b/plugins/netbox-rps-plugin/tests/e2e/test_mapping_unique.py index e0c17cd8b77c0ce502d297a98f95839bc03d0302..72f84e380e5dfd286500ba207369a052fae92626 100644 --- a/plugins/netbox-rps-plugin/tests/e2e/test_mapping_unique.py +++ b/plugins/netbox-rps-plugin/tests/e2e/test_mapping_unique.py @@ -56,8 +56,8 @@ class TestMappingUnique(Base): response = requests.post( url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/", json={ - "source": "https://truc8.com/api", - "target": "https://truc8.com/api", + "source": "https://truc9.com/api", + "target": "https://truc9.com/api", "authentication": "none", "testingpage": None, }, @@ -71,6 +71,152 @@ class TestMappingUnique(Base): b'{"target":["Target URL cannot be equal to source URL."]}', ) + def test_that_mapping_is_case_sensitive_unique(self) -> None: + """Test that mapping is case sensitive unique""" + + response = requests.post( + url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/", + json={ + "source": "https://truc10.com/api", + "target": "http://10.10.10.10:1888/api", + "authentication": "none", + "testingpage": None, + }, + headers={"Authorization": f"Token {API_KEY}"}, + timeout=5, + ) + + self.assertEqual(response.status_code, 201) + + self.mapping_id = json.loads(response.content)["id"] + + response = requests.post( + url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/", + json={ + "source": "HTTPS://truc10.com/api", + "target": "http://10.10.10.10:1888/api", + "authentication": "none", + "testingpage": None, + }, + headers={"Authorization": f"Token {API_KEY}"}, + timeout=5, + ) + + self.assertEqual(response.status_code, 400) + self.assertEqual( + response.content, b'{"source":["Mapping with this Source already exists."]}' + ) + + response = requests.post( + url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/", + json={ + "source": "https://TRUC10.COM/api", + "target": "http://10.10.10.10:1888/api", + "authentication": "none", + "testingpage": None, + }, + headers={"Authorization": f"Token {API_KEY}"}, + timeout=5, + ) + + self.assertEqual(response.status_code, 400) + self.assertEqual( + response.content, b'{"source":["Mapping with this Source already exists."]}' + ) + + response = requests.post( + url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/", + json={ + "source": "HTTPS://TRUC10.com/API", + "target": "HTTP://Toto.10.com:1888/aPi", + "authentication": "none", + "testingpage": None, + }, + headers={"Authorization": f"Token {API_KEY}"}, + timeout=5, + ) + + self.assertEqual(response.status_code, 201) + + content = json.loads(response.content) + + self.assertEqual(content["source"], "https://truc10.com/API") + self.assertEqual(content["target"], "http://toto.10.com:1888/aPi") + + response = requests.get( + url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/{content['id']}/", + headers={"Authorization": f"Token {API_KEY}"}, + timeout=5, + ) + + self.assertEqual(response.status_code, 200) + + content = json.loads(response.content) + + self.assertEqual(content["source"], "https://truc10.com/API") + self.assertEqual(content["target"], "http://toto.10.com:1888/aPi") + + requests.delete( + url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/", + json=[{"id": content["id"]}], + headers={"Authorization": f"Token {API_KEY}"}, + timeout=5, + ) + + def test_that_mapping_update_is_case_sensitive_unique(self) -> None: + """Test that mapping update is case sensitive unique""" + response = requests.post( + url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/", + json={ + "source": "HTTPS://TRUC11.com/API", + "target": "HTTP://Toto.11.com:1888/aPi", + "authentication": "none", + "testingpage": None, + }, + headers={"Authorization": f"Token {API_KEY}"}, + timeout=5, + ) + + self.assertEqual(response.status_code, 201) + + content = json.loads(response.content) + + response = requests.patch( + url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/{content['id']}/", + json={ + "source": "HTTPS://MUCHE11.com/API", + "target": "HTTP://Titi.11.com:1888/aPi", + }, + headers={"Authorization": f"Token {API_KEY}"}, + timeout=5, + ) + + self.assertEqual(response.status_code, 200) + + content = json.loads(response.content) + + self.assertEqual(content["source"], "https://muche11.com/API") + self.assertEqual(content["target"], "http://titi.11.com:1888/aPi") + + response = requests.get( + url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/{content['id']}/", + headers={"Authorization": f"Token {API_KEY}"}, + timeout=5, + ) + + self.assertEqual(response.status_code, 200) + + content = json.loads(response.content) + + self.assertEqual(content["source"], "https://muche11.com/API") + self.assertEqual(content["target"], "http://titi.11.com:1888/aPi") + + requests.delete( + url=f"http://{HOST}:{PORT}/api/plugins/rps/mapping/", + json=[{"id": content["id"]}], + headers={"Authorization": f"Token {API_KEY}"}, + timeout=5, + ) if __name__ == "__main__": unittest.main()