
Alternative way of aggregating with polars

Hello, thanks for the work! I needed to change the aggregation a bit (notably to add the columns illegal_content_legal_ground and incompatible_content_ground, which often contain interesting information), and since I had trouble manipulating Spark I wrote a slower polars equivalent.

from dsa_tdb.types import TDB_columnsFull
import polars as pl
from pathlib import Path

# Placeholder: point this at the directory that contains the per-platform dump folders.
base_path = Path("tdb_data")


ALL_GROUPABLE_COLUMNS = [
    TDB_columnsFull.decision_visibility, TDB_columnsFull.decision_monetary, TDB_columnsFull.decision_monetary_other, 
    TDB_columnsFull.decision_provision, TDB_columnsFull.decision_account, TDB_columnsFull.decision_ground, 
    TDB_columnsFull.illegal_content_legal_ground, TDB_columnsFull.incompatible_content_ground, 
    TDB_columnsFull.incompatible_content_illegal, TDB_columnsFull.category, TDB_columnsFull.content_type, 
    TDB_columnsFull.content_type_other, TDB_columnsFull.content_date, TDB_columnsFull.source_type, 
    TDB_columnsFull.automated_detection, TDB_columnsFull.automated_decision, TDB_columnsFull.platform_name,
    TDB_columnsFull.created_at, TDB_columnsFull.application_date
]

def aggregate_platform(platform_name: str, use_dummies=False):
    """Aggregate every chunked daily dump of a platform, one day at a time."""
    folder_path = base_path / f"{platform_name}___light" / "daily_dumps_chunked"
    for day_file in sorted(folder_path.iterdir()):
        aggregate_day_files(day_file, use_dummies=use_dummies)

def aggregate_day_files(day_folder: Path, use_dummies=False):
    """Group one day's statements by all groupable columns and count them."""
    print(day_folder.name)
    target_file = day_folder / "daily_aggregate.parquet"
    if target_file.exists():
        print("already aggregated")
        return
    file_list = list(day_folder.glob("part-*.parquet"))
    print(file_list)
    if not file_list:
        print("nothing to aggregate")
        return
    aggregated_dataframe = (
        pl.concat([pl.scan_parquet(file) for file in file_list])
        .cast({"application_date": pl.Date, "content_date": pl.Date, "created_at": pl.Date})
        .sort(by="application_date")
        .select(ALL_GROUPABLE_COLUMNS)
        # These two columns hold JSON-encoded lists; decode and sort them so that
        # identical sets of values fall into the same group.
        .with_columns(
            decision_visibility=pl.col("decision_visibility").str.json_decode().list.sort(),
            content_type=pl.col("content_type").str.json_decode().list.sort(),
        )
        # Count the statements for each unique combination of groupable columns.
        .group_by(ALL_GROUPABLE_COLUMNS).len()
        .collect()
    )
    if use_dummies:
        # Optionally one-hot encode the list-valued columns.
        aggregated_dataframe = aggregated_dataframe.to_dummies(["content_type", "decision_visibility"])
    aggregated_dataframe.write_parquet(target_file)

aggregate_platform(platform_name="tiktok")

And afterwards, to recover a complete dataframe with all the aggregated data:

# folder_path for TikTok, mirroring the layout used in aggregate_platform above.
folder_path = base_path / "tiktok___light" / "daily_dumps_chunked"
df_all_tiktok_aggregated = pl.concat([pl.read_parquet(file) for file in folder_path.glob("*/daily_aggregate.parquet")])
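
As a quick sanity check on the combined dataframe, here is a minimal sketch (assuming the per-group count column is named len, the default name produced by group_by(...).len()) that sums the counts per created_at day:

# Sketch: total number of statements per day, summing the per-group counts.
daily_totals = (
    df_all_tiktok_aggregated
    .group_by("created_at")
    .agg(pl.col("len").sum().alias("n_statements"))
    .sort("created_at")
)
print(daily_totals.head())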

While it might not be of much interest since it is a bit slow (~30 min to aggregate all of TikTok), I am leaving it here in case someone finds it useful.
