
Draft: Resolve "Add support for new data sources"

Open · Enrico UBALDI requested to merge 4-new-data-sources into main
3 unresolved threads

Closes #4

Merge request reports

Activity

948     logger.info(f"Extracting {tmp_zip_name}...")
949     with ZipFile(tmp_zip_name, "r") as zip_ref:
950         zip_ref.extractall(root_folder)
951     # Remove the zip file
952     os.remove(tmp_zip_name)
953     logger.info(f"Extracted {tmp_zip_name}.")
954     logger.info("Checking for missing days from the aggregates done.")
955
956     logger.info("Starting up the spark session")
957     spark = dsa_tdb.utils.spark_session_factory(
958         app_name="Advanced data factory",
959         memory_limit=memory_limit,
960         n_workers=n_workers,
961         spark_local_dir=spark_local_dir,
962     )
963     df_raw = spark.read.parquet(os.path.join(target_folder, CHUNKED_FILES_SUBFOLDER_NAME, "sor-*/part-*.parquet"))
  • This line, taken from the README, is failing since we don't have the local chunked files in CHUNKED_FILES_SUBFOLDER_NAME (here data/advanced_pipeline/DAILY_CHUNKED). I'm not sure about the proper way to solve it, but I would:

    • either make a call to preprocess right before line 963 to ensure we have all the required daily chunk files (see the sketch after this list),
    • or make the os.path.join(target_folder, CHUNKED_FILES_SUBFOLDER_NAME) path configurable as a CLI arg so the user can point it at a repository of their choice (e.g. data/tdb_data/global___light/daily_dumps_chunked).
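
    A minimal sketch of the first option, assuming a hypothetical dsa_tdb preprocessing entry point (the real function name and signature would need to be checked against the package):

        import os

        # Hypothetical guard right before line 963: only (re)build the daily
        # chunked files when the expected folder is missing or empty.
        chunked_dir = os.path.join(target_folder, CHUNKED_FILES_SUBFOLDER_NAME)
        if not os.path.isdir(chunked_dir) or not os.listdir(chunked_dir):
            # `preprocess_daily_chunks` is a placeholder for whatever dsa_tdb
            # routine produces the sor-*/part-*.parquet chunks.
            dsa_tdb.preprocess_daily_chunks(
                root_folder=root_folder,
                target_folder=target_folder,
            )

        df_raw = spark.read.parquet(
            os.path.join(target_folder, CHUNKED_FILES_SUBFOLDER_NAME, "sor-*/part-*.parquet")
        )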
  • 911 +            T.ADVANCED_FILE_AGGREGATE_DATE_PATTERN.format(aggregation=T.TDB_agg_data_versions.complete)
    912         ),
    913         local_filename=agg_dates_file,
    914         check_sha1=False,
    915     )
    916
    917     dates_file_df = pd.read_csv(agg_dates_file)
    918     days_from_agg_file = dates_file_df["date"].unique()
    919
    920     table["day_str"] = table["date_str"]
    921     logger.debug(f"Dates from the aggregates: {days_from_agg_file}")
    922     logger.debug(f"Dates from the table: {table['day_str'].values}")
    923     missing_days = set(table["day_str"].values) - set(days_from_agg_file)
    924     # Then remove the days that are already available locally
    925     local_table_df = dsa_tdb.fetch.check_local_storage(root_folder=root_folder)
    926     local_table_dates = local_table_df["date"].dt.strftime("%Y-%m-%d").values
    • Failing here for me on a clean system (no pre-existing data/advanced_pipeline folder) due to local_table_df being an empty DataFrame.

      Proposed fix:

      diff --git a/dsa_tdb/cli.py b/dsa_tdb/cli.py
      index 3532d47..1067e81 100644
      --- a/dsa_tdb/cli.py
      +++ b/dsa_tdb/cli.py
      @@ -923,7 +923,10 @@ def create_advanced_data(
           missing_days = set(table["day_str"].values) - set(days_from_agg_file)
           # Then remove the days that are already available locally
           local_table_df = dsa_tdb.fetch.check_local_storage(root_folder=root_folder)
      -    local_table_dates = local_table_df["date"].dt.strftime("%Y-%m-%d").values
      +    if not local_table_df.empty:
      +        local_table_dates = local_table_df["date"].dt.strftime("%Y-%m-%d").values
      +    else:
      +        local_table_dates = local_table_df["date"]
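
      An equivalent one-expression variant (a sketch, assuming local_table_dates is only consumed as an iterable of date strings further down):

          # Fall back to an empty list when no local data exists yet;
          # set(local_table_dates) behaves the same in both branches.
          local_table_dates = (
              local_table_df["date"].dt.strftime("%Y-%m-%d").values
              if not local_table_df.empty
              else []
          )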
    • Enrico UBALDI changed this line in version 19 of the diff

  • 326 332 "--loglevel", type=str, help="The logging level. [DEBUG|INFO|WARNING|ERROR|CRITICAL]", default="INFO"
    327 333 )
    328 334
    335 # Advanced data preparation pipeline command:
    336 parserAdvancedData = subparsers.add_parser(
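
    This new subcommand is also where the configurable chunked-files path suggested in the first thread could hook in. A minimal sketch with a hypothetical --chunked-files-dir flag (the flag name and default are assumptions, not part of this MR):

        # Hypothetical option on the advanced-data subcommand: let the user
        # point the pipeline at an existing folder of daily chunked files
        # instead of <target_folder>/CHUNKED_FILES_SUBFOLDER_NAME.
        parserAdvancedData.add_argument(
            "--chunked-files-dir",
            type=str,
            default=None,
            help="Optional path to a folder with pre-chunked daily files, "
            "e.g. data/tdb_data/global___light/daily_dumps_chunked.",
        )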
  • Enrico UBALDI added 3 commits

    • db5dab20 - Also simple version should be partitioned by month.
    • d597f755 - Raise a not implemented when checking for the version in advanced data.
    • b4dd8ad1 - Handles the nothing to do case.

  • Enrico UBALDI added 1 commit

    • 7928985f - Handling default None start and end dates.

  • Enrico UBALDI added 1 commit

  • Enrico UBALDI added 1 commit

  • Enrico UBALDI added 1 commit

  • Enrico UBALDI added 1 commit

    • 9731f1c1 - Added the build-local option to ignore web situation and replicate the...

  • Enrico UBALDI added 1 commit

    • 09554d80 - Better handling the no-files-to process case.

  • Enrico UBALDI added 1 commit

  • Enrico UBALDI added 1 commit

    • 1c0732e5 - Fixing failed download handling in advanced data and file download.

  • Enrico UBALDI added 1 commit

    • 78b53064 - Fixed changing checksum pipeline.
