Code development platform for open source projects from the European Union institutions

Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
universal_init_data.py 7.14 KiB
import json
import os
from django.contrib.auth.models import Group
from extras.scripts import Script, StringVar
from django.contrib.contenttypes.models import ContentType

class InitializeJsonDataScript(Script):
    file_path = StringVar(description="Path to init data file")

    class Meta:
        name = "Initialize Data in NetBox"
        description = "Add data to NetBox"
        field_order = ['file_path']

    def run(self, data, commit):
        file_path = data['file_path']

        if not os.path.exists(file_path):
            self.log_failure(f"File not Found: {file_path}")
            return

        try:
            with open(file_path, "r") as file:
                json_data = json.load(file)
                objects = json_data
        except json.JSONDecodeError as e:
            self.log_failure(f"Failed to parse file: {e}")
            return
        except Exception as e:
            self.log_failure(f"Error reading file: {e}")
            return
        
        # Process Data
        for obj_data in objects:
            self.create_object(obj_data)

        if not commit:
            self.log_info("Dry-run mode enabled; no changes committed.")

    def create_object(self, obj_data):
        object_type = obj_data.pop("object_type", None)
        if not object_type:
            self.log_failure("Missing 'object_type' field in object data.") 
            return

        try:
            content_type = ContentType.objects.filter(model=object_type.lower()).first()
            if not content_type:
                self.log_failure(f"Error: ContentType '{object_type}' not found in NetBox.")
                return
            
            model_class = content_type.model_class()
            if not model_class:
                self.log_failure(f"Error: Model class for '{object_type}' could not be resolved.")
                return

            model_fields = {field.name for field in model_class._meta.get_fields()}
            filtered_data = {}
            m2m_data ={}

            for key, value in obj_data.items():
                if key in model_fields:
                    field = model_class._meta.get_field(key)
                    
                    # Handle object_types assignment for ObjectPermission (e.g., "auth | group")
                    if key == "object_types" and isinstance(value, list):
                        object_cts = []
                        for obj in value:
                            m2m_data["model_name"] = obj["model"]
                            assigned_ct = ContentType.objects.filter(
                                model=obj["model"], app_label=obj["app_label"]
                            ).first()
                            if assigned_ct:
                                object_cts.append(assigned_ct)
                            else:
                                self.log_failure(f"Error: ContentType '{obj}' not found.")
                                return
                        if object_cts:
                            m2m_data["object_types"] = object_cts


                    if key == "groups" and isinstance(value, list): 
                            m2m_data["groups"] = value

                    # Handle assigned_object dynamically
                    if key == "assigned_object" and isinstance(value, dict) and "name" in value and "object_type" in value:
                        assigned_ct = ContentType.objects.filter(model=value["object_type"].lower()).first()
                        if assigned_ct:
                            assigned_model_class = assigned_ct.model_class()
                            if assigned_model_class:
                                assigned_object = assigned_model_class.objects.filter(name=value["name"]).first()
                                if assigned_object:
                                    filtered_data["assigned_object_id"] = assigned_object.id
                                    filtered_data["assigned_object_type"] = assigned_ct
                                else:
                                    self.log_failure(f"Error: Assigned object '{value['name']}' not found in {assigned_ct.model}.")
                                    return
                            else:
                                self.log_failure(f"Error: Assigned object type '{value['object_type']}' is invalid.")
                                return
                        else:
                            self.log_failure(f"Error: ContentType for '{value['object_type']}' not found.")
                            return

                    # Handle ForeignKey relationships dynamically
                    elif field.is_relation and isinstance(value, dict) and "name" in value:
                        related_model = field.related_model
                        related_object = related_model.objects.filter(name=value["name"]).first()
                        if related_object:
                            filtered_data[key] = related_object
                        else:
                            self.log_failure(f"Foreign key '{key}' value '{value['name']}' not found.")
                            return
                    else:
                        
                        #Normal objects
                        filtered_data[key] = value
            
            #Remove original groups and object types from data
            if object_type == "ObjectPermission":
                filtered_data.pop("groups")
                filtered_data.pop("object_types")

            # Create or update the regular object
            if not(object_type == "ObjectPermission"):
                obj, created = model_class.objects.get_or_create(**filtered_data)
            
            # Create Permissions, add groups and objects types
            else:

                # Prepare Permission data
                permission_name = filtered_data.pop("name")
                actions = [f"{permission_name}"]
                if "object_types" in m2m_data:
                    model_name = m2m_data.pop("model_name")
                    permission_name = f"{permission_name.capitalize()} {model_name.capitalize()}"
                filtered_data["name"] = permission_name
                filtered_data["description"] = f"Automatically generated permission: {permission_name}"
                filtered_data["actions"] = actions

                # Create Permission
                permission, created = model_class.objects.get_or_create(**filtered_data)

                # Assign Group
                if "groups" in m2m_data and created:
                    for group in m2m_data["groups"]:
                        group_obj = Group.objects.filter(name=group["name"]).first()
                        permission.groups.add(group_obj)
                
                # Assign Object Types
                if "object_types" in m2m_data and created:
                    permission.object_types.set(m2m_data["object_types"])

            # Final Status Messages
            if created:
                self.log_success(f"Created {object_type}: {obj}")
            else:
                self.log_info(f"Updated {object_type}: {obj} (Skipping re-assignment)")

        except Exception as e:
            self.log_failure(f"Error processing {object_type}: {e}")