From f69324414919ab6847663530bde0a1ba4cd3a09a Mon Sep 17 00:00:00 2001 From: Vincent Simonin <vincent.simonin@ext.ec.europa.eu> Date: Wed, 26 Jul 2023 17:12:27 +0200 Subject: [PATCH] Add new SAML Configuration for a Mapping Mapping can now have SAML Coniguration: * Only API was implemented --- .../netbox_rps_plugin/__init__.py | 2 +- .../netbox_rps_plugin/api/serializers.py | 96 ++++-- .../netbox_rps_plugin/api/urls.py | 1 + .../netbox_rps_plugin/api/views.py | 17 +- .../migrations/0007_saml_config.py | 50 +++ .../netbox_rps_plugin/models.py | 98 +++--- plugins/netbox-rps-plugin/setup.py | 2 +- .../tests/e2e/test_saml_config.py | 315 ++++++++++++++++++ 8 files changed, 510 insertions(+), 71 deletions(-) create mode 100644 plugins/netbox-rps-plugin/netbox_rps_plugin/migrations/0007_saml_config.py create mode 100644 plugins/netbox-rps-plugin/tests/e2e/test_saml_config.py diff --git a/plugins/netbox-rps-plugin/netbox_rps_plugin/__init__.py b/plugins/netbox-rps-plugin/netbox_rps_plugin/__init__.py index cad9ec1..14b8baf 100644 --- a/plugins/netbox-rps-plugin/netbox_rps_plugin/__init__.py +++ b/plugins/netbox-rps-plugin/netbox_rps_plugin/__init__.py @@ -5,7 +5,7 @@ class NetBoxRpsConfig(PluginConfig): name = 'netbox_rps_plugin' verbose_name = 'NetBox RPS' description = 'A Netbox plugin to add RPS resources' - version = '0.8.5' + version = '0.9.0' author = "Vincent Simonin" author_email = "vincent.simonin@ext.ec.europa.eu" base_url = 'rps' diff --git a/plugins/netbox-rps-plugin/netbox_rps_plugin/api/serializers.py b/plugins/netbox-rps-plugin/netbox_rps_plugin/api/serializers.py index 404b71c..69f21c8 100644 --- a/plugins/netbox-rps-plugin/netbox_rps_plugin/api/serializers.py +++ b/plugins/netbox-rps-plugin/netbox_rps_plugin/api/serializers.py @@ -1,61 +1,113 @@ from rest_framework import serializers -from ..models import Mapping, HttpHeader +from ..models import Mapping, HttpHeader, SamlConfig from netbox.api.serializers import NetBoxModelSerializer, WritableNestedSerializer +from pprint import pp class NestedMappingSerializer(WritableNestedSerializer): url = serializers.HyperlinkedIdentityField( - view_name='plugins-api:netbox_rps_plugin-api:mapping-detail' + view_name="plugins-api:netbox_rps_plugin-api:mapping-detail" ) class Meta: model = Mapping - fields = ('id', 'url', 'display') + fields = ("id", "url", "display") + + +class NestedSamlConfigSerializer(WritableNestedSerializer): + url = url = serializers.HyperlinkedIdentityField( + view_name="plugins-api:netbox_rps_plugin-api:samlconfig-detail" + ) + + class Meta: + model = SamlConfig + fields = ( + "id", + "url", + "acs_url", + "logout_url", + "force_nauth", + ) + + +class SamlConfigSerializer(NetBoxModelSerializer): + url = serializers.HyperlinkedIdentityField( + view_name="plugins-api:netbox_rps_plugin-api:samlconfig-detail" + ) + + mapping = NestedMappingSerializer() + + class Meta: + model = SamlConfig + fields = ( + "id", + "url", + "acs_url", + "logout_url", + "force_nauth", + "mapping", + "custom_fields", + "created", + "last_updated", + "tags", + ) class HttpHeaderSerializer(NetBoxModelSerializer): url = serializers.HyperlinkedIdentityField( - view_name='plugins-api:netbox_rps_plugin-api:httpheader-detail' + view_name="plugins-api:netbox_rps_plugin-api:httpheader-detail" ) mapping = NestedMappingSerializer() class Meta: model = HttpHeader - fields = ('id', 'url', 'name', 'value', 'apply_to', 'mapping', 'custom_fields', 'created', 'last_updated', 'tags') + fields = ( + "id", + "url", + "name", + "value", + "apply_to", + "mapping", + "custom_fields", + "created", + "last_updated", + "tags", + ) class NestedHttpHeaderSerializer(WritableNestedSerializer): url = serializers.HyperlinkedIdentityField( - view_name='plugins-api:netbox_rps_plugin-api:httpheader-detail' + view_name="plugins-api:netbox_rps_plugin-api:httpheader-detail" ) class Meta: model = HttpHeader - fields = ('id', 'url', 'name', 'value', 'apply_to') + fields = ("id", "url", "name", "value", "apply_to") class MappingSerializer(NetBoxModelSerializer): url = serializers.HyperlinkedIdentityField( - view_name='plugins-api:netbox_rps_plugin-api:mapping-detail' + view_name="plugins-api:netbox_rps_plugin-api:mapping-detail" ) http_headers = NestedHttpHeaderSerializer(many=True, read_only=True) + saml_config = NestedSamlConfigSerializer(read_only=True) class Meta: model = Mapping fields = ( - 'id', - 'url', - 'display', - 'source', - 'target', - 'authentication', - 'testingpage', - 'webdav', - 'Comment', - 'custom_fields', - 'created', - 'last_updated', - 'tags', - 'http_headers' + "id", + "url", + "source", + "target", + "authentication", + "testingpage", + "webdav", + "Comment", + "custom_fields", + "created", + "last_updated", + "tags", + "http_headers", + "saml_config", ) diff --git a/plugins/netbox-rps-plugin/netbox_rps_plugin/api/urls.py b/plugins/netbox-rps-plugin/netbox_rps_plugin/api/urls.py index de785ed..73ce7fd 100644 --- a/plugins/netbox-rps-plugin/netbox_rps_plugin/api/urls.py +++ b/plugins/netbox-rps-plugin/netbox_rps_plugin/api/urls.py @@ -6,5 +6,6 @@ app_name = 'netbox_rps_plugin' router = NetBoxRouter() router.register('mapping', views.MappingViewSet) router.register('http_header', views.HttpHeaderViewSet) +router.register('saml_config', views.SamlConfigViewSet) urlpatterns = router.urls diff --git a/plugins/netbox-rps-plugin/netbox_rps_plugin/api/views.py b/plugins/netbox-rps-plugin/netbox_rps_plugin/api/views.py index 24f3840..0f70113 100644 --- a/plugins/netbox-rps-plugin/netbox_rps_plugin/api/views.py +++ b/plugins/netbox-rps-plugin/netbox_rps_plugin/api/views.py @@ -1,18 +1,23 @@ from netbox.api.viewsets import NetBoxModelViewSet from .. import filtersets, models from utilities.utils import count_related -from .serializers import MappingSerializer, HttpHeaderSerializer -from django.db.models import Count, OuterRef, Subquery +from .serializers import MappingSerializer, HttpHeaderSerializer, SamlConfigSerializer class MappingViewSet(NetBoxModelViewSet): - queryset = models.Mapping.objects.prefetch_related('http_headers', 'tags').all() + queryset = models.Mapping.objects.prefetch_related("http_headers", "tags").all() serializer_class = MappingSerializer filterset_class = filtersets.MappingFilterSet - http_method_names = ['get', 'post', 'patch', 'delete'] + http_method_names = ["get", "post", "patch", "delete"] class HttpHeaderViewSet(NetBoxModelViewSet): - queryset = models.HttpHeader.objects.prefetch_related('mapping','tags').all() + queryset = models.HttpHeader.objects.prefetch_related("mapping", "tags").all() serializer_class = HttpHeaderSerializer - http_method_names = ['get', 'post', 'patch', 'delete'] + http_method_names = ["get", "post", "patch", "delete"] + + +class SamlConfigViewSet(NetBoxModelViewSet): + queryset = models.SamlConfig.objects.prefetch_related("mapping", "tags").all() + serializer_class = SamlConfigSerializer + http_method_names = ["get", "post", "patch", "delete"] diff --git a/plugins/netbox-rps-plugin/netbox_rps_plugin/migrations/0007_saml_config.py b/plugins/netbox-rps-plugin/netbox_rps_plugin/migrations/0007_saml_config.py new file mode 100644 index 0000000..4414f85 --- /dev/null +++ b/plugins/netbox-rps-plugin/netbox_rps_plugin/migrations/0007_saml_config.py @@ -0,0 +1,50 @@ +from django.db import migrations, models +import utilities.json +import taggit.managers +import django.db.models.deletion + + +class Migration(migrations.Migration): + dependencies = [("netbox_rps_plugin", "0006_url_max_length")] + + operations = [ + migrations.CreateModel( + name="SamlConfig", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, primary_key=True, serialize=False + ), + ), + ("created", models.DateTimeField(auto_now_add=True, null=True)), + ("last_updated", models.DateTimeField(auto_now=True, null=True)), + ( + "custom_field_data", + models.JSONField( + blank=True, + default=dict, + encoder=utilities.json.CustomFieldJSONEncoder, + ), + ), + ( + "tags", + taggit.managers.TaggableManager( + through="extras.TaggedItem", to="extras.Tag" + ), + ), + ("acs_url", models.CharField(blank=False, null=False, max_length=2000)), + ( + "logout_url", + models.CharField(blank=False, null=False, max_length=2000), + ), + ("force_nauth", models.BooleanField(default=False)), + ('mapping', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, related_name='saml_config', to='netbox_rps_plugin.mapping')), + ], + ), + #migrations.AddField( + # model_name="mapping", + # name="saml_config", + # field=models.OneToOneField(on_delete=models.SET_NULL, related_name="mapping", to="netbox_rps_plugin.samlconfig", null=True, blank=True), + #), + ] diff --git a/plugins/netbox-rps-plugin/netbox_rps_plugin/models.py b/plugins/netbox-rps-plugin/netbox_rps_plugin/models.py index b4c7c43..c09a0bb 100644 --- a/plugins/netbox-rps-plugin/netbox_rps_plugin/models.py +++ b/plugins/netbox-rps-plugin/netbox_rps_plugin/models.py @@ -5,105 +5,121 @@ from utilities.choices import ChoiceSet from django.urls import reverse from django.core.validators import URLValidator +URL_MAX_SIZE = 2000 + class AuthenticationChoices(ChoiceSet): - key = 'Mapping.authentication' + key = "Mapping.authentication" - DEFAULT_VALUE = 'none' + DEFAULT_VALUE = "none" CHOICES = [ - ('none', 'None', 'dark'), - ('ldap', 'Ldap', 'blue'), - ('ecas', 'Ecas', 'blue'), + ("none", "None", "dark"), + ("ldap", "Ldap", "blue"), + ("ecas", "Ecas", "blue"), ] class ApplyToChoices(ChoiceSet): - key = 'HttpHeader.applyTo' + key = "HttpHeader.applyTo" - DEFAULT_VALUE = 'request' + DEFAULT_VALUE = "request" CHOICES = [ - ('request', 'Request', 'blue'), - ('response', 'Response', 'red'), + ("request", "Request", "blue"), + ("response", "Response", "red"), ] class Mapping(NetBoxModel): source = models.CharField( - max_length=2000, + max_length=URL_MAX_SIZE, blank=False, - verbose_name='Source', - validators=[URLValidator(message='It must be a url')], - unique=True + verbose_name="Source", + validators=[URLValidator(message="It must be a url")], + unique=True, ) target = models.CharField( - max_length=2000, + max_length=URL_MAX_SIZE, blank=False, - verbose_name='Target', - validators=[URLValidator(message='It must be a url')], + verbose_name="Target", + validators=[URLValidator(message="It must be a url")], ) authentication = models.CharField( max_length=30, choices=AuthenticationChoices, default=AuthenticationChoices.DEFAULT_VALUE, blank=False, - verbose_name='Auth', + verbose_name="Auth", ) testingpage = models.CharField( - max_length=2000, + max_length=URL_MAX_SIZE, blank=True, null=True, - validators=[URLValidator(message='It must be a url')], + validators=[URLValidator(message="It must be a url")], ) webdav = models.BooleanField( default=False, ) - Comment = models.CharField( - max_length=500, - blank=True - ) + Comment = models.CharField(max_length=500, blank=True) class Meta: - ordering = ('source', 'target') + ordering = ("source", "target") def __str__(self): - return f'{self.source}' + return f"{self.source}" def get_absolute_url(self): - return reverse('plugins:netbox_rps_plugin:mapping', args=[self.pk]) + return reverse("plugins:netbox_rps_plugin:mapping", args=[self.pk]) - def get_authentication_color(self): - return AuthenticationChoices.colors.get(self.authentication) + +class SamlConfig(NetBoxModel): + acs_url = models.CharField( + max_length=URL_MAX_SIZE, + blank=False, + verbose_name="ACS URL", + validators=[URLValidator(message="It must be a url")], + ) + logout_url = models.CharField( + max_length=URL_MAX_SIZE, + blank=False, + verbose_name="Logout URL", + validators=[URLValidator(message="It must be a url")], + ) + force_nauth = models.BooleanField( + default=False, + ) + Mapping = models.OneToOneField( + Mapping, + on_delete=models.CASCADE, + related_name="saml_config", + db_column="mapping_id", + name="mapping", + ) class HttpHeader(NetBoxModel): - mapping = models.ForeignKey(Mapping, on_delete=models.CASCADE, related_name='http_headers') - name = models.CharField( - max_length=120, - blank=False, - verbose_name='Header name' + mapping = models.ForeignKey( + Mapping, on_delete=models.CASCADE, related_name="http_headers" ) + name = models.CharField(max_length=120, blank=False, verbose_name="Header name") value = models.CharField( - max_length=256, - null=True, - blank=True, - verbose_name='Header value' + max_length=256, null=True, blank=True, verbose_name="Header value" ) apply_to = models.CharField( max_length=30, choices=ApplyToChoices, default=ApplyToChoices.DEFAULT_VALUE, blank=False, - verbose_name='Apply to', + verbose_name="Apply to", ) class Meta: - unique_together = ['mapping', 'name', 'apply_to'] - ordering = ['name'] + unique_together = ["mapping", "name", "apply_to"] + ordering = ["name"] def __str__(self): return self.name def get_absolute_url(self): - return reverse('plugins:netbox_rps_plugin:httpheader', args=[self.pk]) + return reverse("plugins:netbox_rps_plugin:httpheader", args=[self.pk]) diff --git a/plugins/netbox-rps-plugin/setup.py b/plugins/netbox-rps-plugin/setup.py index 5ba2bcd..ce13692 100644 --- a/plugins/netbox-rps-plugin/setup.py +++ b/plugins/netbox-rps-plugin/setup.py @@ -2,7 +2,7 @@ from setuptools import find_packages, setup setup( name='netbox_rps_plugin', - version='0.8.5', + version='0.9.0', description='A Netbox plugin to add RPS resources', install_requires=[], packages=find_packages(), diff --git a/plugins/netbox-rps-plugin/tests/e2e/test_saml_config.py b/plugins/netbox-rps-plugin/tests/e2e/test_saml_config.py new file mode 100644 index 0000000..a8783d6 --- /dev/null +++ b/plugins/netbox-rps-plugin/tests/e2e/test_saml_config.py @@ -0,0 +1,315 @@ +import json +import unittest +import requests +import os + +HOST = os.getenv("HOST", default="localhost") +PORT = os.getenv("PORT", default="8080") +API_KEY = os.getenv("API_KEY", "only4testingpurpose") + + +class TestSamlConfig(unittest.TestCase): + mappingId = [] + + def test_that_saml_config_is_created(self): + r = requests.post( + url="http://{}:{}/api/plugins/rps/mapping/".format(HOST, PORT), + json={ + "source": "https://truc7.com/api", + "target": "http://10.10.10.10:1886/api", + "authentication": "none", + "testingpage": None, + }, + headers={"Authorization": "Token {}".format(API_KEY)}, + ) + + self.assertEqual(r.status_code, 201) + + self.mappingId = json.loads(r.content)["id"] + + r = requests.post( + url="http://{}:{}/api/plugins/rps/saml_config/".format(HOST, PORT), + json={ + "acs_url": "http://localhost", + "logout_url": "http://localhost", + "force_nauth": False, + "mapping": self.mappingId, + }, + headers={ + "Authorization": "Token {}".format(API_KEY), + "accept": "application/json", + }, + ) + + self.assertEqual(r.status_code, 201) + + samlConfigId = json.loads(r.content)["id"] + + r = requests.get( + url="http://{}:{}/api/plugins/rps/mapping/{}/".format( + HOST, PORT, self.mappingId + ), + headers={"Authorization": "Token {}".format(API_KEY)}, + ) + + self.assertIsNotNone(json.loads(r.content)["saml_config"]) + self.assertEqual(json.loads(r.content)["saml_config"]["id"], samlConfigId) + + def test_that_saml_config_is_cascade_deleted(self): + r = requests.post( + url="http://{}:{}/api/plugins/rps/mapping/".format(HOST, PORT), + json={ + "source": "https://truc7.com/api", + "target": "http://10.10.10.10:1886/api", + "authentication": "none", + "testingpage": None, + }, + headers={"Authorization": "Token {}".format(API_KEY)}, + ) + + self.assertEqual(r.status_code, 201) + + mappingId = json.loads(r.content)["id"] + + r = requests.post( + url="http://{}:{}/api/plugins/rps/saml_config/".format(HOST, PORT), + json={ + "acs_url": "http://localhost", + "logout_url": "http://localhost", + "force_nauth": False, + "mapping": mappingId, + }, + headers={ + "Authorization": "Token {}".format(API_KEY), + "accept": "application/json", + }, + ) + + self.assertEqual(r.status_code, 201) + + samlConfigId = json.loads(r.content)["id"] + + requests.delete( + url="http://{}:{}/api/plugins/rps/mapping/".format(HOST, PORT), + json=[{"id": mappingId}], + headers={"Authorization": "Token {}".format(API_KEY)}, + ) + + r = requests.get( + url="http://{}:{}/api/plugins/rps/saml_config/{}/".format( + HOST, PORT, samlConfigId + ), + headers={ + "Authorization": "Token {}".format(API_KEY), + "accept": "application/json", + }, + ) + + self.assertEqual(r.status_code, 404) + + def test_that_patch_works(self): + r = requests.post( + url="http://{}:{}/api/plugins/rps/mapping/".format(HOST, PORT), + json={ + "source": "https://truc7.com/api", + "target": "http://10.10.10.10:1886/api", + "authentication": "none", + "testingpage": None, + }, + headers={"Authorization": "Token {}".format(API_KEY)}, + ) + + self.assertEqual(r.status_code, 201) + + self.mappingId = json.loads(r.content)["id"] + + r = requests.post( + url="http://{}:{}/api/plugins/rps/saml_config/".format(HOST, PORT), + json={ + "acs_url": "http://localhost", + "logout_url": "http://localhost", + "force_nauth": False, + "mapping": self.mappingId, + }, + headers={ + "Authorization": "Token {}".format(API_KEY), + "accept": "application/json", + }, + ) + + self.assertEqual(r.status_code, 201) + + samlConfigId = json.loads(r.content)["id"] + + r = requests.patch( + url="http://{}:{}/api/plugins/rps/saml_config/{}/".format( + HOST, PORT, samlConfigId + ), + json={"acs_url": "http://anotherhost.com:8080"}, + headers={"Authorization": "Token {}".format(API_KEY)}, + ) + + self.assertEqual(r.status_code, 200) + + r = requests.get( + url="http://{}:{}/api/plugins/rps/saml_config/{}/".format( + HOST, PORT, samlConfigId + ), + headers={"Authorization": "Token {}".format(API_KEY)}, + ) + + self.assertEqual(r.status_code, 200) + self.assertEqual( + json.loads(r.content)["acs_url"], "http://anotherhost.com:8080" + ) + + def test_that_acs_url_is_an_url(self): + r = requests.post( + url="http://{}:{}/api/plugins/rps/mapping/".format(HOST, PORT), + json={ + "source": "https://truc7.com/api", + "target": "http://10.10.10.10:1886/api", + "authentication": "none", + "testingpage": None, + }, + headers={"Authorization": "Token {}".format(API_KEY)}, + ) + + self.assertEqual(r.status_code, 201) + + self.mappingId = json.loads(r.content)["id"] + + r = requests.post( + url="http://{}:{}/api/plugins/rps/saml_config/".format(HOST, PORT), + json={ + "acs_url": "http://thisisanurl", + "logout_url": "http://localhost", + "force_nauth": False, + "mapping": self.mappingId, + }, + headers={ + "Authorization": "Token {}".format(API_KEY), + "accept": "application/json", + }, + ) + + self.assertEqual(r.status_code, 400) + self.assertEqual(r.content, b'{"acs_url":["It must be a url"]}') + + r = requests.post( + url="http://{}:{}/api/plugins/rps/saml_config/".format(HOST, PORT), + json={ + "acs_url": "verynotanurl", + "logout_url": "http://localhost", + "force_nauth": False, + "mapping": self.mappingId, + }, + headers={ + "Authorization": "Token {}".format(API_KEY), + "accept": "application/json", + }, + ) + + self.assertEqual(r.status_code, 400) + self.assertEqual(r.content, b'{"acs_url":["It must be a url"]}') + + r = requests.post( + url="http://{}:{}/api/plugins/rps/saml_config/".format(HOST, PORT), + json={ + "acs_url": "http://localhost/api" + ("i" * 1981), + "logout_url": "http://localhost", + "force_nauth": False, + "mapping": self.mappingId, + }, + headers={ + "Authorization": "Token {}".format(API_KEY), + "accept": "application/json", + }, + ) + + self.assertEqual(r.status_code, 400) + self.assertEqual( + r.content, + b'{"acs_url":["Ensure this field has no more than 2000 characters."]}', + ) + + def test_that_logout_url_is_an_url(self): + r = requests.post( + url="http://{}:{}/api/plugins/rps/mapping/".format(HOST, PORT), + json={ + "source": "https://truc7.com/api", + "target": "http://10.10.10.10:1886/api", + "authentication": "none", + "testingpage": None, + }, + headers={"Authorization": "Token {}".format(API_KEY)}, + ) + + self.assertEqual(r.status_code, 201) + + self.mappingId = json.loads(r.content)["id"] + + r = requests.post( + url="http://{}:{}/api/plugins/rps/saml_config/".format(HOST, PORT), + json={ + "acs_url": "http://localhost", + "logout_url": "http://thisisanurl", + "force_nauth": False, + "mapping": self.mappingId, + }, + headers={ + "Authorization": "Token {}".format(API_KEY), + "accept": "application/json", + }, + ) + + self.assertEqual(r.status_code, 400) + self.assertEqual(r.content, b'{"logout_url":["It must be a url"]}') + + r = requests.post( + url="http://{}:{}/api/plugins/rps/saml_config/".format(HOST, PORT), + json={ + "acs_url": "http://localhost", + "logout_url": "verynotanurl", + "force_nauth": False, + "mapping": self.mappingId, + }, + headers={ + "Authorization": "Token {}".format(API_KEY), + "accept": "application/json", + }, + ) + + self.assertEqual(r.status_code, 400) + self.assertEqual(r.content, b'{"logout_url":["It must be a url"]}') + + r = requests.post( + url="http://{}:{}/api/plugins/rps/saml_config/".format(HOST, PORT), + json={ + "acs_url": "http://localhost/api", + "logout_url": "http://localhost/api" + ("i" * 1981), + "force_nauth": False, + "mapping": self.mappingId, + }, + headers={ + "Authorization": "Token {}".format(API_KEY), + "accept": "application/json", + }, + ) + + self.assertEqual(r.status_code, 400) + self.assertEqual( + r.content, + b'{"logout_url":["Ensure this field has no more than 2000 characters."]}', + ) + + def tearDown(self) -> None: + requests.delete( + url="http://{}:{}/api/plugins/rps/mapping/".format(HOST, PORT), + json=[{"id": self.mappingId}], + headers={"Authorization": "Token {}".format(API_KEY)}, + ) + + +if __name__ == "__main__": + unittest.main() -- GitLab