From ef4da0f7e8e433a93569ccec17a1c2759b786c93 Mon Sep 17 00:00:00 2001 From: Thomas Vliagkoftis <thomas.vliagkoftis@gmail.com> Date: Mon, 21 Nov 2022 12:15:29 +0100 Subject: [PATCH] enh: jrcmatics test scripts for updates --- vliagth/jrcmatics/all_data_per_trip.py | 166 ++++++++++++++++++ vliagth/jrcmatics/set_trip_guid.py | 40 +++-- vliagth/jrcmatics/test.py | 43 +++++ .../jrcmatics/timestamp_jump_correction.py | 112 ++++++++++++ vliagth/jrcmatics/trip_guid_test.py | 52 ++++++ vliagth/jrcmatics/update_negative_timetamp.py | 45 +++++ vliagth/jrcmatics/vin_correction.py | 48 +++++ 7 files changed, 491 insertions(+), 15 deletions(-) create mode 100644 vliagth/jrcmatics/all_data_per_trip.py create mode 100644 vliagth/jrcmatics/test.py create mode 100644 vliagth/jrcmatics/timestamp_jump_correction.py create mode 100644 vliagth/jrcmatics/trip_guid_test.py create mode 100644 vliagth/jrcmatics/update_negative_timetamp.py create mode 100644 vliagth/jrcmatics/vin_correction.py diff --git a/vliagth/jrcmatics/all_data_per_trip.py b/vliagth/jrcmatics/all_data_per_trip.py new file mode 100644 index 0000000..db064ca --- /dev/null +++ b/vliagth/jrcmatics/all_data_per_trip.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python3 + +tvin = 'JM4BP6HE601165733' # A0HNZZ33 +dfrom = 'Oct 01 2022' +dto = 'Nov 08 2022' + +import datetime +import dateparser +import pandas +from src.uds4jrc.db import jrcmatics_data + +list_df = [] + +for vin in jrcmatics_data.find({}).distinct('vin'): + + # if( vin != tvin ): continue + + pipeline = \ + [ + { + '$match': { + 'vin': vin, + 'trip_created_at': {'$gt': dateparser.parse(dfrom), + '$lt': dateparser.parse(dto) + datetime.timedelta(days=1)} + } + }, { + '$sort': { + 'trip_created_at': 1, + 'time': 1, + } + }, { + '$group': { + '_id': '$trip_created_at', + 'vin': {'$first': '$vin'}, + 'dev_id': {'$first': '$dev_id'}, + 'trip_guid': {'$first': '$trip_guid'}, + 'min_time': {'$min': '$time'}, + 'max_time': {'$max': '$time'}, + 'tot_distances': {'$push': '$tot_distance'}, + 'max_tot_distance': {'$max': '$tot_distance'}, + 'tot_fuels': {'$push': '$tot_fuel'}, + 'max_tot_fuel': {'$max': '$tot_fuel'}, + 'min_ambient_temp': {'$min': '$ambient_temp'}, + 'max_ambient_temp': {'$max': '$ambient_temp'}, + 'mean_ambient_temp': {'$avg': '$ambient_temp'}, + 'min_distance_cd_ice_on': {'$min': '$distance_cd_ice_on'}, + 'max_distance_cd_ice_on': {'$max': '$distance_cd_ice_on'}, + 'min_distance_cd_ice_off': {'$min': '$distance_cd_ice_off'}, + 'max_distance_cd_ice_off': {'$max': '$distance_cd_ice_off'}, + 'mean_speed': {'$avg': '$speed'}, + 'std_speed': {'$stdDevPop': '$speed'}, + 'mean_speed_rural': { + '$avg': { + '$cond': [{'$and': [{'$gte': ['$speed', 0.0]}, {'$lte': ['$speed', 60.0]}]}, '$speed', None]}}, + 'mean_speed_urban': { + '$avg': { + '$cond': [{'$and': [{'$gt': ['$speed', 60.0]}, {'$lte': ['$speed', 90.0]}]}, '$speed', None]}}, + 'mean_speed_motorway': {'$avg': { + '$cond': [{'$and': [{'$gt': ['$speed', 90.0]}, {'$lte': ['$speed', 200.0]}]}, '$speed', None]}}, + 'stop_percentage': {'$avg': {'$cond': [{'$eq': ['$speed', 0.0]}, 1.0, 0.0]}}, + 'mean_rpm': {'$avg': '$rpm'}, + 'std_rpm': {'$stdDevPop': '$rpm'}, + 'min_fuel_cd': {'$min': '$fuel_cd'}, + 'max_fuel_cd': {'$max': '$fuel_cd'}, + 'min_fuel_ci': {'$min': '$fuel_ci'}, + 'max_fuel_ci': {'$max': '$fuel_ci'}, + 'coolant_temp_arr': {'$push': '$coolant_temp'}, + 'urban_share': {'$avg': { + '$cond': [ + {'$and': [{'$ne': ['$speed', None]}, {'$gte': ['$speed', 0.0]}, {'$lte': ['$speed', 60.0]}]}, + 1.0, 0.0]}}, + 'rural_share': {'$avg': { + '$cond': [ + {'$and': [{'$ne': ['$speed', None]}, {'$gt': ['$speed', 60.0]}, {'$lte': ['$speed', 90.0]}]}, + 1.0, 0.0]}}, + 'motorway_share': {'$avg': { + '$cond': [ + {'$and': [{'$ne': ['$speed', None]}, {'$gt': ['$speed', 90.0]}, {'$lte': ['$speed', 200.0]}]}, + 1.0, 0.0]}}, + 'mean_acc_pedal_pos_d': {'$avg': '$acc_pedal_pos_d'}, + 'std_acc_pedal_pos_d': {'$stdDevPop': '$acc_pedal_pos_d'}, + 'grid_energy_cd_ice_off_arr': {'$push': '$grid_energy_cd_ice_off'}, + 'mean_grid_energy_cd_ice_off': {'$avg': '$grid_energy_cd_ice_off'}, + 'grid_energy_cd_ice_on_arr': {'$push': '$grid_energy_cd_ice_on'}, + 'mean_grid_energy_cd_ice_on': {'$avg': '$grid_energy_cd_ice_on'}, + 'grid_energy_into_bat_arr': {'$push': '$grid_energy_into_bat'}, + 'mean_grid_energy_into_bat': {'$avg': '$grid_energy_into_bat'}, + } + }, { + '$addFields': { + 'min_tot_distance': {'$ifNull': [ + { + '$min': { + '$filter': { + 'input': "$tot_distances", + 'cond': {'$gt': ['$$this', 0]} + } + } + }, + 0 + ]}, + 'min_tot_fuel': {'$ifNull': [ + { + '$min': { + '$filter': { + 'input': "$tot_fuels", + 'cond': {'$gt': ['$$this', 0]} + } + } + }, + 0 + ]}, + } + }, { + '$addFields': { + 'distance': {'$subtract': ['$max_tot_distance', '$min_tot_distance']}, + 'fuel': {'$subtract': ['$max_tot_fuel', '$min_tot_fuel']}, + 'duration': {'$divide': [{'$subtract': ['$max_time', '$min_time']}, 1000]}, + 'distance_fuel_on': {'$subtract': ['$max_distance_cd_ice_on', '$min_distance_cd_ice_on']}, + 'distance_fuel_off': {'$subtract': ['$max_distance_cd_ice_off', '$min_distance_cd_ice_off']}, + 'fuel_ci': {'$subtract': ['$max_fuel_ci', '$min_fuel_ci']}, + 'fuel_cd': {'$subtract': ['$max_fuel_cd', '$min_fuel_cd']}, + 'start_coolant_temp': {'$arrayElemAt': ['$coolant_temp_arr', 0]}, + 'start_grid_energy_cd_ice_off': {'$arrayElemAt': ['$grid_energy_cd_ice_off_arr', 0]}, + 'end_grid_energy_cd_ice_off': {'$arrayElemAt': ['$grid_energy_cd_ice_off_arr', -1]}, + 'start_grid_energy_cd_ice_on': {'$arrayElemAt': ['$grid_energy_cd_ice_on_arr', 0]}, + 'end_grid_energy_cd_ice_on': {'$arrayElemAt': ['$grid_energy_cd_ice_on_arr', -1]}, + 'start_grid_energy_into_bat': {'$arrayElemAt': ['$grid_energy_into_bat_arr', 0]}, + 'end_grid_energy_into_bat': {'$arrayElemAt': ['$grid_energy_into_bat_arr', -1]}, + } + }, { + '$match': {'distance': {'$gt': 0}, 'duration': {'$gt': 0}} + }, { + '$sort': { + '_id': -1 + } + }, { + '$project': { + # '_id': 0, + 'min_distance_cd_ice_on': 0, + 'max_distance_cd_ice_on': 0, + 'min_distance_cd_ice_off': 0, + 'max_distance_cd_ice_off': 0, + 'min_fuel_cd': 0, + 'max_fuel_cd': 0, + 'min_fuel_ci': 0, + 'max_fuel_ci': 0, + 'coolant_temp_arr': 0, + 'grid_energy_cd_ice_off_arr': 0, + 'grid_energy_cd_ice_on_arr': 0, + 'grid_energy_into_bat_arr': 0, + } + } + ] + + results_df = pandas.DataFrame(list(jrcmatics_data.aggregate(pipeline, allowDiskUse=True))) + + if results_df.empty: + continue + else: + list_df.append(results_df) + +final_df = pandas.concat(list_df, ignore_index=True) + +final_df.to_excel("/home/vliagth/aggregated_trips.xlsx", index=False) +final_df.to_csv("/home/vliagth/aggregated_trips.csv", index=False) diff --git a/vliagth/jrcmatics/set_trip_guid.py b/vliagth/jrcmatics/set_trip_guid.py index 0bfbdcc..8f42993 100644 --- a/vliagth/jrcmatics/set_trip_guid.py +++ b/vliagth/jrcmatics/set_trip_guid.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 import multiprocessing as mp -import pandas as pd +import uuid from bson.binary import Binary, UuidRepresentation from munch import munchify @@ -10,12 +10,14 @@ from src.uds4jrc.db import jrcmatics_data def set_trip_guid(trip): trip = munchify(trip) - guid_binary = Binary.from_uuid(trip.guid, UuidRepresentation.STANDARD) + + guid = uuid.uuid4() + guid_binary = Binary.from_uuid(guid, UuidRepresentation.STANDARD) jrcmatics_data.update_many( { 'dev_id': trip.device, - 'trip_created_at': trip.created_at + 'trip_created_at': trip.trip_created_at }, { '$set': { @@ -24,20 +26,28 @@ def set_trip_guid(trip): } ) + print(f"Updated trip. Dev id: {trip.device} created at {trip.trip_created_at}") -if __name__ == "__main__": - trips = pd.read_csv( - '/eos/jeodpp/data/projects/LEGENT/internal/jrcmatics/jrcmatics_trips.csv', - delimiter=',', - header=0, - encoding='utf-8', - index_col=False - ) - - df = pd.DataFrame(trips, columns=['guid', 'device', 'created_at']) - df.to_csv('jrcmatics_trips.csv', index=False) - p = mp.Pool(mp.cpu_count()) +if __name__ == "__main__": + trips = list(jrcmatics_data.aggregate([ + {"$match": + {'trip_guid': {"$exists": False}} + }, + {"$group": + {"_id": { + "device": "$dev_id", + "trip_created_at": "$trip_created_at" + }} + }, + {"$project": { + "_id": 0, + "device": '$_id.device', + "trip_created_at": "$_id.trip_created_at" + }} + ])) + + p = mp.Pool(100) p.map(set_trip_guid, trips) diff --git a/vliagth/jrcmatics/test.py b/vliagth/jrcmatics/test.py new file mode 100644 index 0000000..905b2c4 --- /dev/null +++ b/vliagth/jrcmatics/test.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 + +import multiprocessing as mp +import pandas as pd +import uuid + +from bson.binary import Binary, UuidRepresentation +from munch import munchify +from datetime import datetime +from src.uds4jrc.db import jrcmatics_data + + +if __name__ == "__main__": + guid = uuid.UUID('f57684b13b4646438fc881a142e7dddf') + trip_guid_binary = Binary.from_uuid(guid, UuidRepresentation.STANDARD) + + trip_created_at = datetime.strptime('2021-11-14 14:56:31', '%Y-%m-%d %H:%M:%S') + + rs = pd.DataFrame(list(jrcmatics_data.find( + { + 'dev_id': 'A0HL8M13', + 'trip_created_at': trip_created_at + }, + { + '_id': 1, + 'trip_guid': 1, + 'timestamp_corr': 1, + 'trip_created_at': 1, + 'created_at': 1, + 'time': 1, + 'time_corr': 1, + } + ))) + + test = 3 + + + + +# Example +# www.jrcmatics.com/device/A0HLZXHE/trip/4c0cef49-4442-4499-ab3f-77f329875cb4 +# www.jrcmatics.com/trip/4c0cef49-4442-4499-ab3f-77f329875cb4 +# www.jrcmatics.com/device/A0HLZXHE/trip/20220208-104803 diff --git a/vliagth/jrcmatics/timestamp_jump_correction.py b/vliagth/jrcmatics/timestamp_jump_correction.py new file mode 100644 index 0000000..f486662 --- /dev/null +++ b/vliagth/jrcmatics/timestamp_jump_correction.py @@ -0,0 +1,112 @@ +import datetime +import uuid +import multiprocessing as mp +import pandas as pd + +from munch import Munch +from src.uds4jrc.db import jrcmatics_data +from bson.binary import Binary, UuidRepresentation + + +def update_trip_with_jump(trip): + trip = Munch(trip) + trip_guid_binary = Binary.from_uuid(trip.trip_guid, UuidRepresentation.STANDARD) + + delta = datetime.timedelta(seconds=0) + + timestamp_anchor = 0 + prev_doc = None + jump_without_downtime = False + + time_anchor_point = trip.trip_created_at + + rs = list(jrcmatics_data.find( + { + 'trip_guid': trip_guid_binary + }, + { + '_id': 1, + 'timestamp_corr': 1, + 'trip_created_at': 1, + 'created_at': 1, + 'time': 1, + 'time_corr': 1 + } + )) + + sorted_rs = sorted(rs, key=lambda k: k['_id']) + + df = pd.DataFrame(sorted_rs) + + for doc in sorted_rs: + doc = Munch(doc) + + if timestamp_anchor == 0: + timestamp_anchor = int(doc.timestamp_corr / 1000) + + # if the check is True, it means that the device has reset + if prev_doc and abs(int(doc.timestamp_corr / 1000) - int(prev_doc.timestamp_corr / 1000)) > 180: + reset_downtime = (doc.created_at - prev_doc.created_at).total_seconds() + + if int(doc.timestamp_corr / 1000) - timestamp_anchor < 0: + if reset_downtime: + delta = datetime.timedelta(seconds=reset_downtime) + timestamp_anchor = int(doc.timestamp_corr / 1000) + else: + jump_without_downtime = True + timestamp_anchor = int(prev_doc.timestamp_corr / 1000 + doc.timestamp_corr / 1000) + + time_anchor_point = prev_doc.time + delta + + current_timestamp = int(timestamp_anchor + doc.timestamp_corr / 1000) if jump_without_downtime else int(doc.timestamp_corr / 1000) + time = time_anchor_point + datetime.timedelta(seconds=current_timestamp) - datetime.timedelta(seconds=timestamp_anchor) + + jrcmatics_data.update_one( + { + '_id': doc._id + }, + { + '$set': { + 'time_corr': time + } + } + ) + + doc.time = time + prev_doc = doc + + jrcmatics_data.update_many( + { + 'trip_guid': trip_guid_binary + }, + { + '$set': { + 'jump_updated': True + } + } + ) + + print(f"time_corr values generated for trip {str(trip.trip_guid)}") + + +if __name__ == "__main__": + guid = uuid.UUID('81ea5d4429f54b619b88c29eee21145f') + trip_guid_binary = Binary.from_uuid(guid, UuidRepresentation.STANDARD) + + trips = list(jrcmatics_data.aggregate([ + {"$match": { + 'trip_guid': trip_guid_binary + }}, + {"$group": { + "_id": {"trip_guid": "$trip_guid"}, + "trip_created_at": {"$first": "$trip_created_at"} + }}, + {"$project": { + "_id": 0, + "trip_guid": '$_id.trip_guid', + "trip_created_at": "$trip_created_at" + }} + ])) + + p = mp.Pool(1) + p.map(update_trip_with_jump, trips) diff --git a/vliagth/jrcmatics/trip_guid_test.py b/vliagth/jrcmatics/trip_guid_test.py new file mode 100644 index 0000000..b825110 --- /dev/null +++ b/vliagth/jrcmatics/trip_guid_test.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 + +import multiprocessing as mp +import pandas as pd +import uuid + +from bson.binary import Binary, UuidRepresentation +from munch import munchify +from datetime import datetime +from src.uds4jrc.db import jrcmatics_data + + +if __name__ == "__main__": + csv_trips = pd.read_csv( + '/eos/jeodpp/data/projects/LEGENT/internal/jrcmatics/jrcmatics_trips.csv', + delimiter=',', + header=0, + encoding='utf-8', + index_col=False + ) + + csv_trips.created_at = pd.to_datetime(csv_trips.created_at) + # csv_trips = csv_trips.drop(columns=['guid']) + + db_trips = pd.DataFrame(list(jrcmatics_data.aggregate([ + {"$match": + {'trip_guid': {"$exists": False}} + }, + {"$group": + {"_id": { + "device": "$dev_id", + "trip_created_at": "$trip_created_at" + }} + }, + {"$project": { + "_id": 0, + "device": '$_id.device', + "trip_created_at": "$_id.trip_created_at" + }} + ]))) + + s1 = pd.merge(db_trips, csv_trips, how='inner', on=['device', 'trip_created_at']) + + test = 2 + + + + +# Example +# www.jrcmatics.com/device/A0HLZXHE/trip/4c0cef49-4442-4499-ab3f-77f329875cb4 +# www.jrcmatics.com/trip/4c0cef49-4442-4499-ab3f-77f329875cb4 +# www.jrcmatics.com/device/A0HLZXHE/trip/20220208-104803 diff --git a/vliagth/jrcmatics/update_negative_timetamp.py b/vliagth/jrcmatics/update_negative_timetamp.py new file mode 100644 index 0000000..21f1852 --- /dev/null +++ b/vliagth/jrcmatics/update_negative_timetamp.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 + +from munch import Munch +from src.uds4jrc.db import jrcmatics_data + +ABS_MIN_INT_32 = 2147483648 + +# copy values of positive timestamp in new field +jrcmatics_data.update_many( + { + "timestamp": {"$gte": 0} + }, + [{ + "$set": { + "timestamp_corr": "$timestamp" + } + }] +) + +# find and update the negative timestamp values using the newly generated field +docs = jrcmatics_data.find( + { + 'timestamp': {'$lt': 0} + }, + { + '_id': 1, + 'timestamp': 1 + } +) + +for doc in docs: + doc = Munch(doc) + + jrcmatics_data.update_one( + { + '_id': doc._id + }, + { + '$set': { + 'timestamp_corr': 2 * ABS_MIN_INT_32 + doc.timestamp, + 'had_negative_timestamp': True + } + } + ) + diff --git a/vliagth/jrcmatics/vin_correction.py b/vliagth/jrcmatics/vin_correction.py new file mode 100644 index 0000000..31ec108 --- /dev/null +++ b/vliagth/jrcmatics/vin_correction.py @@ -0,0 +1,48 @@ +from src.uds4jrc.db import jrcmatics_data +import pandas as pd +import numpy as np + +vin_correction_filepath = "/eos/jeodpp/home/users/tansial/vin_correction.xlsx" + +for vin in jrcmatics_data.distinct('vin'): + print(vin) + x = jrcmatics_data.update_many({"vin": {"$eq":vin}}, {"$set": {"vin_corr": vin}}) + print(x.modified_count, " documents copied.\n") + +vin_correction = pd.read_excel(vin_correction_filepath) + +dev_ids = vin_correction['dev_id'].values + +list_to_get = ['time', 'dev_id', 'trip_created_at', 'vin'] +select_cl = {key: 1 for key in list_to_get} + +data = {} + +for dev_id in dev_ids: + rawData = jrcmatics_data.find({'dev_id': dev_id}, select_cl) + data[dev_id] = pd.DataFrame(rawData) + +for idx, row in vin_correction.iterrows(): + dev_id = row.dev_id + print("ID: ", dev_id) + if not data[dev_id].empty: + if 'vin' in data[dev_id].columns: + vin = row.vin + start = pd.to_datetime(np.datetime64(row.date_start)) + end = pd.to_datetime((np.datetime64(row.date_end) if pd.notnull(row.date_end) else np.datetime64('now')) + np.timedelta64(1, 'D')) + + vin_missing = pd.isnull(data[dev_id]['vin']) + dates_ok = (data[dev_id]['trip_created_at'] > start) & (data[dev_id]['trip_created_at'] < end) + select = vin_missing & dates_ok + + ids = list(data[dev_id][select]['_id'].values) + + where_cl = { + "_id": {"$in": ids}, + } + update_cl = {"$set": {"vin_corr": vin}} + x = jrcmatics_data.update_many(where_cl, update_cl) + print(x.modified_count, " documents updated.") + else: + print("VIN not available, skipping.") + print("\n") \ No newline at end of file -- GitLab