Skip to content

DAG for monthly Iceberg table maintenance

Xcollazo requested to merge table-maintenance into main

In this DAG, we launch monthly maintenance for Iceberg tables.

wmf_airflow_common.dataset.IcebergDataset.DEFAULT_MAINTENANCE defines the default Spark CALL procedures that will be scheduled for all Iceberg tables. These defaults can be overridden via config file analytics/config/datasets.yaml.

Example configuration from analytics/config/datasets.yaml:

...
iceberg_wmf_dumps_wikitext_raw:
  datastore: iceberg
  table_name: wmf_dumps.wikitext_raw_rc2

iceberg_wmf_dumps_wikitext_inconsistent_rows:
  datastore: iceberg
  table_name: wmf_dumps.wikitext_inconsistent_rows_rc1
  maintenance:
    data_delete:
      enabled: True
      where_clause: "computation_dt <= TIMESTAMP '{{ data_interval_end | subtract_days(30) | to_dt() }}'"
...

The fact that iceberg_wmf_dumps_wikitext_raw is defined as an iceberg dataset above resolves into picking up the following default Spark procedure CALLs:

  • spark_catalog.system.remove_orphan_files() -> This removes any data files that are not tied to any Iceberg snapshot. These orphan files can occur when writes fail and leave files behind.
  • spark_catalog.system.expire_snapshots() -> Iceberg keeps an immutable copy of the state of the table for each commit, so that we can 'go back in time' This procedure removes older snapshots and thus older files that are not needed anymore.
  • spark_catalog.system.rewrite_manifests() -> Over time, Iceberg tables may accumulate many manifest files that can slow down query planning. This procedure rewrites them so that reading is more efficient.

Data deletion is also possible, although disabled by default. The example iceberg_wmf_dumps_wikitext_inconsistent_rows above shows how to enable data deletion and how to define the WHERE clause to utilize. You can include jinja templates.

Bug: T338065

Edited by Xcollazo

Merge request reports