Code development platform for open source projects from the European Union institutions

Skip to content
Snippets Groups Projects
Commit 61080146 authored by Frederico SEQUEIRA's avatar Frederico SEQUEIRA
Browse files

:fire: Remove not needed scripts

parent 872e1cd4
No related branches found
No related tags found
1 merge request!140V3 custom script distribution
Pipeline #280355 passed
import csv
from io import StringIO
from dcim.models import Region, SiteGroup, Site, Location, DeviceRole, Manufacturer, DeviceType, Device, Interface, Cable, FrontPort, RearPort
from extras.scripts import Script, ObjectVar, FileVar
from django.core.exceptions import ValidationError
from dcim.choices import LinkStatusChoices
from logging import Logger
class ImportLocationsDevicesAndCabling(Script):
class Meta:
name = "Import Locations, Devices, and Cabling"
description = "Imports locations, devices, and cabling from a CSV file"
job_timeout = 3600
csv_file = FileVar(
description="CSV file containing data for locations, devices, and cabling. Must be utf-8 encoded. Header row must mach: Plug_name,Building,Wing_tr,Wing_up,Floor_tr,Floor_up,Plug_Number,Site_Group,Regions,Connected_to_Device_Name,Connected_to_Device_Interface,tr",
label="CSV File",
)
def parse_csv(self, file_content):
"""
Parses the CSV file into a list of dictionaries.
"""
try:
csv_data = StringIO(file_content.read().decode("utf-8"))
reader = csv.DictReader(csv_data)
rows = [row for row in reader]
return rows
except Exception as e:
raise ValidationError(f"Failed to parse CSV: {e}")
def validate_row(self, row):
"""
Validates the row for required fields.
"""
required_fields = [
"Plug_name", "Building", "Wing_tr","Wing_up","Floor_tr","Floor_up","Site_Group", "Regions", "tr",
]
for field in required_fields:
if not row.get(field):
raise ValidationError(f"Missing required field: {field}, Row: {row}")
def create_or_get_region(self, region):
"""
Creates or retrieves a Region
"""
_region, _ = Region.objects.get_or_create(
name=region,
defaults={
"slug": region.lower().replace(" ", "-")
}
)
return _region
def create_or_get_site_group(self, site_group):
"""
Creates or retrieves a Region
"""
_site_group, _ = SiteGroup.objects.get_or_create(
name=site_group,
defaults={
"slug": site_group.lower().replace(" ", "-")
}
)
return _site_group
def create_or_get_site(self, building_name, region, site_group):
"""
Creates or retrieves a Site based on building name, region, and site group.
"""
site, _ = Site.objects.get_or_create(
name=building_name,
region=region,
group=site_group,
defaults={
"slug": building_name.lower().replace(" ", "-"),
"region": region,
"group": site_group,
"status": "active",
}
)
return site
def create_or_get_location(self, parent, name, site):
"""
Creates or retrieves a Location based on name, parent, and site.
"""
location, _ = Location.objects.get_or_create(
name=name,
parent=parent,
site=site,
defaults={
"name": name,
"parent": parent,
"site": site,
"slug": name.lower().replace(" ", "-")
},
)
return location
def create_network_plug_device_and_ports(self, row, site, location):
"""
Creates a Network Plug device and its interfaces in the specified location.
"""
manufacturer, _ = Manufacturer.objects.get_or_create(
name="PP", defaults={"slug": "pp"}
)
device_type_network_plug, _ = DeviceType.objects.get_or_create(
model="Network_plugs",
manufacturer=manufacturer,
defaults={"slug": "Network_plugs".lower()},
)
device_role_networ_plug, _ = DeviceRole.objects.get_or_create(
name="User plug", defaults={"slug": "User-plug".lower(), "color": "ff0000"}
)
device_networ_plug, _ = Device.objects.get_or_create(
name=row["Plug_name"],
site=site,
location=location,
device_type=device_type_network_plug,
role=device_role_networ_plug,
defaults={"status": "active"},
)
# Create front and rear ports
rear_port, _ = RearPort.objects.get_or_create(
device=device_networ_plug,
name=row["Plug_name"],
type="1000base-t" # Adjust type as needed
)
front_port, _ = FrontPort.objects.get_or_create(
device=device_networ_plug,
rear_port=rear_port,
name=row["Plug_name"],
type="1000base-t" # Adjust type as needed
)
return device_networ_plug, front_port, rear_port
def create_pp_device_and_ports(self, row, site, location):
"""
Creates a Patch Panel device and its ports in the specified location.
"""
device_pp_name = row["Floor_tr"]+"."+row["tr"]
manufacturer, _ = Manufacturer.objects.get_or_create(
name="PP", defaults={"slug": "pp"}
)
device_type_pp, _ = DeviceType.objects.get_or_create(
model="PP_RJ45",
manufacturer=manufacturer,
defaults={"slug": "PP_RJ45".lower()},
)
device_role_pp, _ = DeviceRole.objects.get_or_create(
name="Patch Panel RJ45", defaults={"slug": "PATCH-PANEL-RJ45".lower(), "color": "ff0000"}
)
device_pp, _ = Device.objects.get_or_create(
name=device_pp_name,
site=site,
location=location,
device_type=device_type_pp,
role=device_role_pp,
defaults={"status": "active"},
)
# Create front and rear ports
rear_port, _ = RearPort.objects.get_or_create(
device=device_pp,
name=row["Plug_name"],
defaults={"label":" PP rear_port"},
type="1000base-t" # Adjust type as needed
)
front_port, _ = FrontPort.objects.get_or_create(
device=device_pp,
rear_port=rear_port,
name=row["Plug_name"],
defaults={"label":"PP front_port"},
type="1000base-t" # Adjust type as needed
)
return device_pp, front_port, rear_port
def create_cable(self, source_interface, target_interface):
"""
Creates a cable between two devices.
"""
try:
if source_interface.link:
self.log_info(f"Rear port {source_interface} already connected, skipping.")
return None
cable = Cable(
a_terminations = [ source_interface ],
b_terminations = [ target_interface ],
status = LinkStatusChoices.STATUS_CONNECTED
)
return cable
except Exception as e:
self.log_warning(f"Failed to create cable: {e}")
return None
def run(self, data, commit):
"""
Main execution logic for the script.
"""
file_content = data["csv_file"]
# Parse CSV content
rows = self.parse_csv(file_content)
cables = []
for row in rows:
self.log_debug(f"Processing row: {row}")
self.validate_row(row)
# Create or retrieve Region, Site Group and Site
region = self.create_or_get_region(row["Regions"])
site_group = self.create_or_get_site_group(row["Site_Group"])
site = self.create_or_get_site(row["Building"], region, site_group)
# Create Location hierarchy: Wing -> Floor -> Room
wing1 = self.create_or_get_location(None, row["Wing_tr"], site)
wing2 = self.create_or_get_location(None, row["Wing_up"], site)
floor1 = self.create_or_get_location(wing1, row["Floor_tr"], site)
floor2 = self.create_or_get_location(wing2, row["Floor_up"], site)
room = self.create_or_get_location(floor1, row["tr"], site)
# Create device and its ports
device_network_plug, front_port_network_plug, rear_port_network_plug = self.create_network_plug_device_and_ports(row, site, floor2)
device_pp, front_port_pp, rear_port_pp = self.create_pp_device_and_ports(row, site, room)
cables.append(self.create_cable(rear_port_network_plug, rear_port_pp))
self.log_debug(f"Created or used: Region: {region}, Site group: {site_group}, Site: {site}, Locations: [wings:[{wing1}, {wing2}], floors:[{floor1}, {floor2}], room: {room}], \n"
+ f"Network Plug: {device_network_plug}, front port: {front_port_network_plug}, rear_port: {rear_port_network_plug}], \n"
+ f"Patch panel: {device_pp}, front port: {front_port_pp}, rear port: {rear_port_pp}")
for cable in cables:
if cable is not None:
cable.save()
import csv
from extras.scripts import Script, FileVar
from django.contrib.contenttypes.models import ContentType
from users.models import ObjectPermission
from django.contrib.auth.models import Group
class InitializeRolesScript(Script):
csv_file = FileVar(description="Upload a CSV file with roles configuration")
class Meta:
name = "Initialize Roles and Permissions"
description = "Create or update roles and permissions based on a CSV file."
field_order = ['csv_file']
def run(self, data, commit):
csv_file = data['csv_file']
# Required groups to ensure exist
required_groups = ['SuperAccess', 'Administrator', 'Global Viewer', 'Overall Viewer']
# Ensure groups exist
for group_name in required_groups:
group, created = Group.objects.get_or_create(name=group_name)
if created:
self.log_success(f"Group '{group_name}' created.")
else:
self.log_info(f"Group '{group_name}' already exists.")
# Process the uploaded CSV file
try:
csv_data = csv_file.read().decode('utf-8').splitlines()
reader = csv.DictReader(csv_data)
for row in reader:
# Read necessary columns from the CSV
permission_name = row.get('Permission')
groups = row.get('Groups', '').split(", ")
app_label = row.get('app_label')
model = row.get('model')
# Configure the 'actions' field as a list with the permission name
actions = [permission_name] if permission_name else ['view']
# Get the corresponding content type
try:
content_type = ContentType.objects.get(app_label=app_label, model=model)
# Create or update the permission
permission, created = ObjectPermission.objects.get_or_create(
name=f"{permission_name.capitalize()} {model.capitalize()}",
defaults={
'description': f"Automatically generated permission: {permission_name}",
'enabled': True,
'actions': actions
}
)
# Assign content types and groups only if the permission was newly created
if created:
permission.object_types.set([content_type])
for group_name in groups:
try:
group = Group.objects.get(name=group_name)
permission.groups.add(group)
except Group.DoesNotExist:
self.log_warning(f"Group '{group_name}' does not exist. Skipping...")
self.log_success(f"Permission '{permission_name}' created or updated for the model '{model}'.")
except ContentType.DoesNotExist:
self.log_warning(f"ContentType '{app_label} > {model}' does not exist; skipping permission creation.")
except Exception as e:
self.log_error(f"Error processing the CSV file: {e}")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment