Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import json
import os
from django.contrib.auth.models import Group
from extras.scripts import Script, StringVar
from django.contrib.contenttypes.models import ContentType
class InitializeJsonDataScript(Script):
file_path = StringVar(description="Path to init data file")
class Meta:
name = "Initialize Data in NetBox"
description = "Add data to NetBox"
field_order = ['file_path']
def run(self, data, commit):
file_path = data['file_path']
if not os.path.exists(file_path):
self.log_failure(f"File not Found: {file_path}")
return
try:
with open(file_path, "r") as file:
json_data = json.load(file)
objects = json_data
except json.JSONDecodeError as e:
self.log_failure(f"Failed to parse file: {e}")
return
except Exception as e:
self.log_failure(f"Error reading file: {e}")
return
# Process Data
for obj_data in objects:
self.create_object(obj_data)
if not commit:
self.log_info("Dry-run mode enabled; no changes committed.")
def create_object(self, obj_data):
object_type = obj_data.pop("object_type", None)
if not object_type:
self.log_failure("Missing 'object_type' field in object data.")
return
try:
content_type = ContentType.objects.filter(model=object_type.lower()).first()
if not content_type:
self.log_failure(f"Error: ContentType '{object_type}' not found in NetBox.")
return
model_class = content_type.model_class()
if not model_class:
self.log_failure(f"Error: Model class for '{object_type}' could not be resolved.")
return
model_fields = {field.name for field in model_class._meta.get_fields()}
filtered_data = {}
m2m_data ={}
for key, value in obj_data.items():
if key in model_fields:
field = model_class._meta.get_field(key)
# Handle object_types assignment for ObjectPermission (e.g., "auth | group")
if key == "object_types" and isinstance(value, list):
object_cts = []
for obj in value:
m2m_data["model_name"] = obj["model"]
assigned_ct = ContentType.objects.filter(
model=obj["model"], app_label=obj["app_label"]
).first()
if assigned_ct:
object_cts.append(assigned_ct)
else:
self.log_failure(f"Error: ContentType '{obj}' not found.")
return
if object_cts:
m2m_data["object_types"] = object_cts
if key == "groups" and isinstance(value, list):
m2m_data["groups"] = value
# Handle assigned_object dynamically
if key == "assigned_object" and isinstance(value, dict) and "name" in value and "object_type" in value:
assigned_ct = ContentType.objects.filter(model=value["object_type"].lower()).first()
if assigned_ct:
assigned_model_class = assigned_ct.model_class()
if assigned_model_class:
assigned_object = assigned_model_class.objects.filter(name=value["name"]).first()
if assigned_object:
filtered_data["assigned_object_id"] = assigned_object.id
filtered_data["assigned_object_type"] = assigned_ct
else:
self.log_failure(f"Error: Assigned object '{value['name']}' not found in {assigned_ct.model}.")
return
else:
self.log_failure(f"Error: Assigned object type '{value['object_type']}' is invalid.")
return
else:
self.log_failure(f"Error: ContentType for '{value['object_type']}' not found.")
return
# Handle ForeignKey relationships dynamically
elif field.is_relation and isinstance(value, dict) and "name" in value:
related_model = field.related_model
related_object = related_model.objects.filter(name=value["name"]).first()
if related_object:
filtered_data[key] = related_object
else:
self.log_failure(f"Foreign key '{key}' value '{value['name']}' not found.")
return
else:
#Normal objects
filtered_data[key] = value
#Remove original groups and object types from data
if object_type == "ObjectPermission":
filtered_data.pop("groups")
filtered_data.pop("object_types")
# Create or update the regular object
if not(object_type == "ObjectPermission"):
obj, created = model_class.objects.get_or_create(**filtered_data)
# Create Permissions, add groups and objects types
else:
# Prepare Permission data
permission_name = filtered_data.pop("name")
actions = [f"{permission_name}"]
if "object_types" in m2m_data:
model_name = m2m_data.pop("model_name")
permission_name = f"{permission_name.capitalize()} {model_name.capitalize()}"
filtered_data["name"] = permission_name
filtered_data["description"] = f"Automatically generated permission: {permission_name}"
filtered_data["actions"] = actions
# Create Permission
permission, created = model_class.objects.get_or_create(**filtered_data)
# Assign Group
if "groups" in m2m_data and created:
for group in m2m_data["groups"]:
group_obj = Group.objects.filter(name=group["name"]).first()
permission.groups.add(group_obj)
# Assign Object Types
if "object_types" in m2m_data and created:
permission.object_types.set(m2m_data["object_types"])
# Final Status Messages
if created:
self.log_success(f"Created {object_type}: {obj}")
else:
self.log_info(f"Updated {object_type}: {obj} (Skipping re-assignment)")
except Exception as e:
self.log_failure(f"Error processing {object_type}: {e}")