Draft: Resolve "Add support for new data sources"
Closes #4
Merge request reports
Activity
assigned to @ubalden
added 1 commit
- 25ea6da9 - Added documentation for the new pipeline in cli.
added 1 commit
- e1786fc3 - Release candidate with scripts and documentation.
I probably missed something, but it does not work with the light variant (error due to CloudFront not having a required file):

```
poetry run dsa-tdb-cli advanced_data_pipeline -d data/advanced_pipeline -p global -v light --loglevel INFO -n 20
```
Edited by Lucas VERNEY

```
948 logger.info(f"Extracting {tmp_zip_name}...")
949 with ZipFile(tmp_zip_name, "r") as zip_ref:
950     zip_ref.extractall(root_folder)
951 # Remove the zip file
952 os.remove(tmp_zip_name)
953 logger.info(f"Extracted {tmp_zip_name}.")
954 logger.info("Checking for missing days from the aggregates done.")
955
956 logger.info("Starting up the spark session")
957 spark = dsa_tdb.utils.spark_session_factory(
958     app_name="Advanced data factory",
959     memory_limit=memory_limit,
960     n_workers=n_workers,
961     spark_local_dir=spark_local_dir,
962 )
963 df_raw = spark.read.parquet(os.path.join(target_folder, CHUNKED_FILES_SUBFOLDER_NAME, "sor-*/part-*.parquet"))
```

This, from the README, is failing since we don't have the local chunked files in `CHUNKED_FILES_SUBFOLDER_NAME` (here `data/advanced_pipeline/DAILY_CHUNKED`). I'm not sure about the proper way to solve it, but I would:

- either make a call to preprocess right before line 963 to ensure we have all the required daily chunk files,
- or make the `os.path.join(target_folder, CHUNKED_FILES_SUBFOLDER_NAME)` path configurable as a CLI argument pointing to a directory of the user's choice, e.g. `data/tdb_data/global___light/daily_dumps_chunked` (a minimal sketch of this option follows below).
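Purely as an illustration of the second option, here is a minimal sketch of a configurable chunked-files path. It uses a standalone argparse parser for brevity (the real CLI would hang this off the existing `advanced_data_pipeline` subparser), and the flag name `--chunked-files-dir` plus the defaulting logic are assumptions, not the project's actual interface:

```python
import argparse
import os

# Hypothetical stand-in for the project's constant of the same name.
CHUNKED_FILES_SUBFOLDER_NAME = "DAILY_CHUNKED"

# Standalone parser for brevity; flag names are assumptions.
parser = argparse.ArgumentParser(prog="advanced_data_pipeline")
parser.add_argument("-d", "--data-folder", dest="target_folder", required=True)
parser.add_argument(
    "--chunked-files-dir",
    type=str,
    default=None,
    help="Folder with the daily chunked parquet files "
         "(defaults to <data-folder>/DAILY_CHUNKED).",
)

# Example invocation pointing at an already-populated chunked folder.
args = parser.parse_args(
    ["-d", "data/advanced_pipeline",
     "--chunked-files-dir", "data/tdb_data/global___light/daily_dumps_chunked"]
)

chunked_dir = args.chunked_files_dir or os.path.join(args.target_folder, CHUNKED_FILES_SUBFOLDER_NAME)
parquet_glob = os.path.join(chunked_dir, "sor-*", "part-*.parquet")
# df_raw = spark.read.parquet(parquet_glob)  # same read as line 963, now on a configurable path
```

When the flag is omitted, the current default location is used, so existing behaviour would be preserved.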
```
911         + T.ADVANCED_FILE_AGGREGATE_DATE_PATTERN.format(aggregation=T.TDB_agg_data_versions.complete)
912     ),
913     local_filename=agg_dates_file,
914     check_sha1=False,
915 )
916
917 dates_file_df = pd.read_csv(agg_dates_file)
918 days_from_agg_file = dates_file_df["date"].unique()
919
920 table["day_str"] = table["date_str"]
921 logger.debug(f"Dates from the aggregates: {days_from_agg_file}")
922 logger.debug(f"Dates from the table: {table['day_str'].values}")
923 missing_days = set(table["day_str"].values) - set(days_from_agg_file)
924 # Then remove the days that are already available locally
925 local_table_df = dsa_tdb.fetch.check_local_storage(root_folder=root_folder)
926 local_table_dates = local_table_df["date"].dt.strftime("%Y-%m-%d").values
```

Failing here for me on a clean system (no pre-existing `data/advanced_pipeline` folder) due to `local_table_df` being an empty dataframe.

Proposed fix:

```diff
diff --git a/dsa_tdb/cli.py b/dsa_tdb/cli.py
index 3532d47..1067e81 100644
--- a/dsa_tdb/cli.py
+++ b/dsa_tdb/cli.py
@@ -923,7 +923,10 @@ def create_advanced_data(
     missing_days = set(table["day_str"].values) - set(days_from_agg_file)
     # Then remove the days that are already available locally
     local_table_df = dsa_tdb.fetch.check_local_storage(root_folder=root_folder)
-    local_table_dates = local_table_df["date"].dt.strftime("%Y-%m-%d").values
+    if not local_table_df.empty:
+        local_table_dates = local_table_df["date"].dt.strftime("%Y-%m-%d").values
+    else:
+        local_table_dates = local_table_df["date"]
```
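For reference, a tiny standalone reproduction of the failure mode the guard addresses; it assumes the empty frame returned by `check_local_storage` has a `date` column that is not datetime-typed on a clean system, which would explain the reported error:

```python
import pandas as pd

# An empty frame whose "date" column never got cast to datetime64,
# as would happen when no local files exist yet (assumption).
local_table_df = pd.DataFrame({"date": []})

try:
    local_table_dates = local_table_df["date"].dt.strftime("%Y-%m-%d").values
except AttributeError as exc:
    # pandas raises "Can only use .dt accessor with datetimelike values"
    print(f"Fails on an empty, untyped column: {exc}")
    # The else-branch in the proposed diff sidesteps this by returning the
    # empty column as-is instead of going through the .dt accessor.
    local_table_dates = local_table_df["date"]
```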
changed this line in version 19 of the diff
326 332 "--loglevel", type=str, help="The logging level. [DEBUG|INFO|WARNING|ERROR|CRITICAL]", default="INFO" 327 333 ) 328 334 335 # Advanced data preparation pipeline command: 336 parserAdvancedData = subparsers.add_parser( added 1 commit
- 9731f1c1 - Added the build-local option to ignore web situation and replicate the...
added 1 commit
- 09554d80 - Better handling of the no-files-to-process case.
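Related to the commit above, a generic sketch of the kind of guard that avoids calling Spark on an empty glob; the helper name, logging, and return convention are illustrative, not the project's actual implementation:

```python
import glob
import logging
import os

logger = logging.getLogger(__name__)


def read_chunked_files(spark, chunked_dir: str):
    """Read the daily chunked parquet files, or bail out cleanly if none exist."""
    pattern = os.path.join(chunked_dir, "sor-*", "part-*.parquet")
    if not glob.glob(pattern):
        # Without this check spark.read.parquet raises an AnalysisException
        # ("Path does not exist") on a clean system.
        logger.warning("No chunked files found under %s; nothing to process.", chunked_dir)
        return None
    return spark.read.parquet(pattern)
```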
added 1 commit
- 1c0732e5 - Fixing failed download handling in advanced data and file download.
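On the failed-download side (the CloudFront error reported at the top of the thread), a generic sketch of a download helper that fails loudly on HTTP errors instead of continuing with a missing or partial file; the `requests` usage and the function name are assumptions about one possible shape of such a fix, not the project's code:

```python
import logging

import requests

logger = logging.getLogger(__name__)


def download_file(url: str, local_filename: str, timeout: int = 60) -> bool:
    """Download url to local_filename, returning False (and logging) on any HTTP error."""
    try:
        with requests.get(url, stream=True, timeout=timeout) as response:
            # A 403/404 from CloudFront (e.g. a file missing for the light variant) ends up here.
            response.raise_for_status()
            with open(local_filename, "wb") as fh:
                for chunk in response.iter_content(chunk_size=1 << 20):
                    fh.write(chunk)
    except requests.RequestException as exc:
        logger.error("Download of %s failed: %s", url, exc)
        return False
    return True
```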