import json import os from django.contrib.auth.models import Group from extras.scripts import Script, StringVar, FileVar from django.contrib.contenttypes.models import ContentType class InitializeJsonDataScript(Script): file_path = StringVar(description="Path to init data file") json_file = FileVar(description="Upload a CSV file with roles configuration") class Meta: name = "Initialize Data in NetBox" description = "Add data to NetBox via file or path" field_order = ['file_path','json_file'] def __init__(self): super().__init__() self.file_path.field_attrs['required'] = False self.json_file.field_attrs['required'] = False def run(self, data, commit): file_path = data.get('file_path',None) json_file = data.get('json_file',None) if file_path: if not os.path.exists(file_path): self.log_failure(f"File not Found: {file_path}") return if not(json_file) and file_path: try: with open(file_path, "r") as file: json_data = json.load(file) objects = json_data except json.JSONDecodeError as e: self.log_failure(f"Failed to parse file: {e}") return except Exception as e: self.log_failure(f"Error reading file: {e}") return elif not(file_path) and json_file: print("json_file",json_file) json_data= json_file.read().decode("utf-8") try: objects = json.loads(json_data) except json.JSONDecodeError as e: self.log_failure(f"Failed to parse file: {e}") else: self.log_failure("Please specify the path to the data file the data file itself") return # Process Data for obj_data in objects: self.create_object(obj_data) if not commit: self.log_info("Dry-run mode enabled; no changes committed.") def create_object(self, obj_data): object_type = obj_data.pop("object_type", None) if not object_type: self.log_failure("Missing 'object_type' field in object data.") return try: #Specific logic for tag due to duplicated content type name if object_type.lower() == "tag": content_type = ContentType.objects.filter(model=object_type.lower(), app_label="extras").first() else: content_type = ContentType.objects.filter(model=object_type.lower()).first() if not content_type: self.log_failure(f"Error: ContentType '{object_type}' not found in NetBox.") return model_class = content_type.model_class() if not model_class: self.log_failure(f"Error: Model class for '{object_type}' could not be resolved.") return model_fields = {field.name for field in model_class._meta.get_fields()} filtered_data = {} m2m_data ={} for key, value in obj_data.items(): if key in model_fields: field = model_class._meta.get_field(key) # Handle object_types assignment for ObjectPermission (e.g., "auth | group") if key == "object_types" and isinstance(value, list): object_cts = [] for obj in value: m2m_data["model_name"] = obj["model"] m2m_data["app_label"] = obj["app_label"] assigned_ct = ContentType.objects.filter( model=obj["model"], app_label=obj["app_label"] ).first() if assigned_ct: object_cts.append(assigned_ct) else: self.log_failure(f"Error: ContentType '{obj}' not found.") return if object_cts: m2m_data["object_types"] = object_cts if key == "groups" and isinstance(value, list): m2m_data["groups"] = value # Handle assigned_object dynamically if key == "assigned_object" and isinstance(value, dict) and "name" in value and "object_type" in value: assigned_ct = ContentType.objects.filter(model=value["object_type"].lower()).first() if assigned_ct: assigned_model_class = assigned_ct.model_class() if assigned_model_class: assigned_object = assigned_model_class.objects.filter(name=value["name"]).first() if assigned_object: filtered_data["assigned_object_id"] = assigned_object.id filtered_data["assigned_object_type"] = assigned_ct else: self.log_failure(f"Error: Assigned object '{value['name']}' not found in {assigned_ct.model}.") return else: self.log_failure(f"Error: Assigned object type '{value['object_type']}' is invalid.") return else: self.log_failure(f"Error: ContentType for '{value['object_type']}' not found.") return # Handle ForeignKey relationships dynamically elif field.is_relation and isinstance(value, dict) and "name" in value: related_model = field.related_model related_object = related_model.objects.filter(name=value["name"]).first() if related_object: filtered_data[key] = related_object else: self.log_failure(f"Foreign key '{key}' value '{value['name']}' not found.") return else: #Normal objects filtered_data[key] = value #Remove original groups and object types from data if object_type == "ObjectPermission": filtered_data.pop("groups") filtered_data.pop("object_types") # Create or update the regular object if not(object_type == "ObjectPermission"): obj, created = model_class.objects.get_or_create(**filtered_data) # Create Permissions, add groups and objects types else: # Prepare Permission data permission_name = filtered_data.pop("name") actions = [f"{permission_name}"] if "object_types" in m2m_data: model_name = m2m_data.pop("model_name") app_name = m2m_data.pop("app_label") permission_name = f"{permission_name.capitalize()} {model_name.capitalize()} ({app_name.capitalize()})" filtered_data["name"] = permission_name filtered_data["description"] = f"Automatically generated permission: {permission_name}" filtered_data["actions"] = actions # Create Permission permission, created = model_class.objects.get_or_create(**filtered_data) # Assign Group if "groups" in m2m_data and created: for group in m2m_data["groups"]: group_obj = Group.objects.filter(name=group["name"]).first() permission.groups.add(group_obj) # Assign Object Types if "object_types" in m2m_data and created: permission.object_types.set(m2m_data["object_types"]) # Final Status Messages if created: self.log_success(f"Created {object_type}: {obj}") else: self.log_info(f"Updated {object_type}: {obj} (Skipping re-assignment)") except Exception as e: self.log_failure(f"Error processing {object_type}: {e}")