Alternative way of aggregating with polars
Hello, thanks for the work!
I needed to change the aggregation a bit (notably to add the columns illegal_content_legal_ground and incompatible_content_ground, which often contain interesting information), and since I had trouble manipulating Spark I wrote a slower Polars equivalent:
```python
from pathlib import Path

import polars as pl

from dsa_tdb.types import TDB_columnsFull

# Root folder containing the "<platform>___light" directories (adjust to your setup).
base_path = Path(".")

ALL_GROUPABLE_COLUMNS = [
    TDB_columnsFull.decision_visibility, TDB_columnsFull.decision_monetary, TDB_columnsFull.decision_monetary_other,
    TDB_columnsFull.decision_provision, TDB_columnsFull.decision_account, TDB_columnsFull.decision_ground,
    TDB_columnsFull.illegal_content_legal_ground, TDB_columnsFull.incompatible_content_ground,
    TDB_columnsFull.incompatible_content_illegal, TDB_columnsFull.category, TDB_columnsFull.content_type,
    TDB_columnsFull.content_type_other, TDB_columnsFull.content_date, TDB_columnsFull.source_type,
    TDB_columnsFull.automated_detection, TDB_columnsFull.automated_decision, TDB_columnsFull.platform_name,
    TDB_columnsFull.created_at, TDB_columnsFull.application_date,
]


def aggregate_platform(platform_name: str, use_dummies: bool = False):
    folder_path = base_path / f"{platform_name}___light" / "daily_dumps_chunked"
    for day_folder in sorted(folder_path.iterdir()):
        aggregate_day_files(day_folder, use_dummies=use_dummies)


def aggregate_day_files(day_folder: Path, use_dummies: bool = False):
    print(day_folder.name)
    target_file = day_folder / "daily_aggregate.parquet"
    if target_file.exists():
        print("already aggregated")
        return
    file_list = list(day_folder.glob("part-*.parquet"))
    print(file_list)
    if not file_list:
        print("nothing to aggregate")
        return
    aggregated_dataframe = (
        pl.concat([pl.scan_parquet(file) for file in file_list])
        .cast({"application_date": pl.Date, "content_date": pl.Date, "created_at": pl.Date})
        .sort(by="application_date")
        .select(ALL_GROUPABLE_COLUMNS)
        .with_columns(
            # These columns hold JSON-encoded lists; decode and sort them so that
            # equivalent combinations group together regardless of element order.
            decision_visibility=pl.col("decision_visibility").str.json_decode().list.sort(),
            content_type=pl.col("content_type").str.json_decode().list.sort(),
        )
        .group_by(ALL_GROUPABLE_COLUMNS)
        .len()
        .collect()
    )
    if use_dummies:
        aggregated_dataframe = aggregated_dataframe.to_dummies(["content_type", "decision_visibility"])
    aggregated_dataframe.write_parquet(target_file)


aggregate_platform(platform_name="tiktok")
```
And afterwards, to recover a single dataframe with all the aggregated data:

```python
# folder_path here is the same "tiktok___light/daily_dumps_chunked" folder as above
df_all_tiktok_aggregated = pl.concat(
    [pl.read_parquet(file) for file in folder_path.glob("*/daily_aggregate.parquet")]
)
```
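In case it is useful, here is a minimal sketch of how the combined frame can then be summarised, e.g. statements per day and per automated-detection flag (assuming the TDB column names used above; `len` is the count column produced by `.group_by(...).len()`):

```python
daily_counts = (
    df_all_tiktok_aggregated
    .group_by("application_date", "automated_detection")
    .agg(pl.col("len").sum().alias("n_statements"))  # sum the per-combination counts
    .sort("application_date")
)
print(daily_counts.head())
```

Note that `pl.scan_parquet` also accepts glob patterns, so the concatenation above could likely be done lazily with a single `pl.scan_parquet(f"{folder_path}/*/daily_aggregate.parquet")` call instead.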
While this may not be of much interest since it is a bit slow (~30 min to aggregate all of TikTok), I am leaving it here in case someone finds it useful.