diff --git a/netbox_custom_scripts/universal_init_data.py b/netbox_custom_scripts/universal_init_data.py new file mode 100644 index 0000000000000000000000000000000000000000..8ced858f179039d1f2a4f0af853127aee89b7b46 --- /dev/null +++ b/netbox_custom_scripts/universal_init_data.py @@ -0,0 +1,161 @@ +import json +import os +from django.contrib.auth.models import Group +from extras.scripts import Script, StringVar +from django.contrib.contenttypes.models import ContentType + +class InitializeJsonDataScript(Script): + file_path = StringVar(description="Path to init data file") + + class Meta: + name = "Initialize Data in NetBox" + description = "Add data to NetBox" + field_order = ['file_path'] + + def run(self, data, commit): + file_path = data['file_path'] + + if not os.path.exists(file_path): + self.log_failure(f"File not Found: {file_path}") + return + + try: + with open(file_path, "r") as file: + json_data = json.load(file) + objects = json_data + except json.JSONDecodeError as e: + self.log_failure(f"Failed to parse file: {e}") + return + except Exception as e: + self.log_failure(f"Error reading file: {e}") + return + + # Process Data + for obj_data in objects: + self.create_object(obj_data) + + if not commit: + self.log_info("Dry-run mode enabled; no changes committed.") + + def create_object(self, obj_data): + object_type = obj_data.pop("object_type", None) + if not object_type: + self.log_failure("Missing 'object_type' field in object data.") + return + + try: + content_type = ContentType.objects.filter(model=object_type.lower()).first() + if not content_type: + self.log_failure(f"Error: ContentType '{object_type}' not found in NetBox.") + return + + model_class = content_type.model_class() + if not model_class: + self.log_failure(f"Error: Model class for '{object_type}' could not be resolved.") + return + + model_fields = {field.name for field in model_class._meta.get_fields()} + filtered_data = {} + m2m_data ={} + + for key, value in obj_data.items(): + if key in model_fields: + field = model_class._meta.get_field(key) + + # Handle object_types assignment for ObjectPermission (e.g., "auth | group") + if key == "object_types" and isinstance(value, list): + object_cts = [] + for obj in value: + m2m_data["model_name"] = obj["model"] + assigned_ct = ContentType.objects.filter( + model=obj["model"], app_label=obj["app_label"] + ).first() + if assigned_ct: + object_cts.append(assigned_ct) + else: + self.log_failure(f"Error: ContentType '{obj}' not found.") + return + if object_cts: + m2m_data["object_types"] = object_cts + + + if key == "groups" and isinstance(value, list): + m2m_data["groups"] = value + + # Handle assigned_object dynamically + if key == "assigned_object" and isinstance(value, dict) and "name" in value and "object_type" in value: + assigned_ct = ContentType.objects.filter(model=value["object_type"].lower()).first() + if assigned_ct: + assigned_model_class = assigned_ct.model_class() + if assigned_model_class: + assigned_object = assigned_model_class.objects.filter(name=value["name"]).first() + if assigned_object: + filtered_data["assigned_object_id"] = assigned_object.id + filtered_data["assigned_object_type"] = assigned_ct + else: + self.log_failure(f"Error: Assigned object '{value['name']}' not found in {assigned_ct.model}.") + return + else: + self.log_failure(f"Error: Assigned object type '{value['object_type']}' is invalid.") + return + else: + self.log_failure(f"Error: ContentType for '{value['object_type']}' not found.") + return + + # Handle ForeignKey relationships dynamically + elif field.is_relation and isinstance(value, dict) and "name" in value: + related_model = field.related_model + related_object = related_model.objects.filter(name=value["name"]).first() + if related_object: + filtered_data[key] = related_object + else: + self.log_failure(f"Foreign key '{key}' value '{value['name']}' not found.") + return + else: + + #Normal objects + filtered_data[key] = value + + #Remove original groups and object types from data + if object_type == "ObjectPermission": + filtered_data.pop("groups") + filtered_data.pop("object_types") + + # Create or update the regular object + if not(object_type == "ObjectPermission"): + obj, created = model_class.objects.get_or_create(**filtered_data) + + # Create Permissions, add groups and objects types + else: + + # Prepare Permission data + permission_name = filtered_data.pop("name") + actions = [f"{permission_name}"] + if "object_types" in m2m_data: + model_name = m2m_data.pop("model_name") + permission_name = f"{permission_name.capitalize()} {model_name.capitalize()}" + filtered_data["name"] = permission_name + filtered_data["description"] = f"Automatically generated permission: {permission_name}" + filtered_data["actions"] = actions + + # Create Permission + permission, created = model_class.objects.get_or_create(**filtered_data) + + # Assign Group + if "groups" in m2m_data and created: + for group in m2m_data["groups"]: + group_obj = Group.objects.filter(name=group["name"]).first() + permission.groups.add(group_obj) + + # Assign Object Types + if "object_types" in m2m_data and created: + permission.object_types.set(m2m_data["object_types"]) + + # Final Status Messages + if created: + self.log_success(f"Created {object_type}: {obj}") + else: + self.log_info(f"Updated {object_type}: {obj} (Skipping re-assignment)") + + except Exception as e: + self.log_failure(f"Error processing {object_type}: {e}") \ No newline at end of file