Skip to content
Snippets Groups Projects

Study late events imported to HDFS by Gobblin - parallelized!

  • Clone with SSH
  • Clone with HTTPS
  • Embed
  • Share
    The snippet can be accessed without any authentication.
    Authored by Ottomata
    late_events_parallelized.py 15.55 KiB
    import requests
    import subprocess
    from datetime import datetime, timedelta
    import re
    
    
    # List all late events within raw directory (event_default)
    # by studying files with create timestamps out of the time range of the partition dir.
    
    # 1/ collect all streams from ESC
    
    def download_and_process_json(url):
        """Fetch *url* and return its parsed JSON payload.

        Fixed: the original did not check the HTTP status (an error body
        would be parsed as if it were data) and had no timeout (a stuck
        server would hang the script forever).
        """
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        return response.json()
    
    
    # MediaWiki API endpoint exposing the EventStreamConfig (ESC) settings
    # for every stream.
    streamconfig_url = "https://meta.wikimedia.org/w/api.php?format=json&action=streamconfigs&all_settings=true"
    streamconfigs_json = download_and_process_json(streamconfig_url)
    # Bare expression: only meaningful in a notebook/REPL, where it echoes
    # the number of configured streams. A no-op when run as a script.
    len(streamconfigs_json["streams"])
    
    
    def get_value_from_path(parsed_json_data, path):
        """Walk a nested JSON object following a slash-separated *path*.

        Leading/trailing slashes in *path* are ignored. Returns the value at
        the path, or None when a segment is missing or a non-indexable value
        is reached along the way.
        """
        current = parsed_json_data
        for segment in path.strip("/").split("/"):
            try:
                current = current[segment]
            except (KeyError, TypeError):
                # Path not found (or traversal hit a scalar): signal with None.
                return None
        return current
    
    
    def tableify(stream_name):
        """Convert a stream name to a Hive-style table name.

        Drops a leading "eventlogging_" prefix, replaces every
        non-alphanumeric character with "_", and lowercases the result.
        """
        prefix = "eventlogging_"
        if stream_name.startswith(prefix):
            stream_name = stream_name[len(prefix):]
        return re.sub(r"[^a-zA-Z0-9]", "_", stream_name).lower()
    
    
    # Table-name regexes reproduced from the old Refine jobs' configuration.
    # Legacy EventLogging tables that the eventlogging_legacy job excluded.
    EVENT_LOGGING_LEGACY_TABLE_EXCLUDE_REGEX = r"^(edit|inputdevicedynamics|pageissues|mobilewebmainmenuclicktracking|kaiosappconsent|mobilewebuiclicktracking|citationusagepageload|citationusage|readingdepth|editconflict|resourcetiming|rumspeedindex|layoutshift|featurepolicyviolation|specialmutesubmit|suggestedtagsaction)$"
    # Legacy EventLogging tables that the eventlogging_legacy job included.
    EVENT_LOGGING_LEGACY_TABLE_INCLUDE_REGEX = r"^(contenttranslationabusefilter|desktopwebuiactionstracking|mobilewebuiactionstracking|prefupdate|quicksurveyinitiation|quicksurveysresponses|searchsatisfaction|specialinvestigate|templatewizard|test|universallanguageselector|wikidatacompletionsearchclicks|editattemptstep|visualeditorfeatureuse|helppanel|homepagemodule|newcomertask|homepagevisit|serversideaccountcreation|cpubenchmark|navigationtiming|painttiming|savetiming|codemirrorusage|referencepreviewsbaseline|referencepreviewscite|referencepreviewspopups|templatedataapi|templatedataeditor|twocolconflictconflict|twocolconflictexit|virtualpageview|visualeditortemplatedialoguse|wikibasetermboxinteraction|wmdebannerevents|wmdebannerinteractions|wmdebannersizeissue|landingpageimpression|centralnoticebannerhistory|centralnoticeimpression|translationrecommendationuseraction|translationrecommendationuirequests|translationrecommendationapirequests|wikipediaportal)$"
    # Tables that the event_default job excluded.
    EVENT_TABLE_EXCLUDE_REGEX = r"^(mediawiki_page_properties_change|mediawiki_recentchange|resource_purge)$"
    
    
    def was_this_stream_included_in_legacy_refine(job_name, stream_name):
        """Replicate the include/exclude regex mechanism from the old Refine code.

        Returns True when the stream (converted to its table/dir name) would
        have been picked up by the given job. Jobs other than the two known
        ones are always considered included.

        Fixed: the original returned a re.Match object / None for the
        eventlogging_legacy branch instead of a boolean; wrap in bool() so
        the function consistently returns bool (same truthiness as before).
        """
        dir_name = tableify(stream_name)
        if job_name == "eventlogging_legacy":
            return bool(
                re.match(EVENT_LOGGING_LEGACY_TABLE_INCLUDE_REGEX, dir_name)
                and not re.match(EVENT_LOGGING_LEGACY_TABLE_EXCLUDE_REGEX, dir_name)
            )
        if job_name == "event_default":
            return not re.match(EVENT_TABLE_EXCLUDE_REGEX, dir_name)
        return True
    
    
    def get_event_streams_by_job_name(config, job_name: str):
        """Return the stream configs ingested to Hadoop by the given job.

        Applies the same filters as the original selection: regex-named
        streams, disabled streams, external eventgate-logging-external
        streams, streams without canary events, and streams excluded by the
        legacy Refine regexes are all dropped.
        """
        selected = []
        for stream_name, stream_config in config.get('streams', {}).items():
            ingestion_job = get_value_from_path(
                stream_config, "consumers/analytics_hadoop_ingestion/job_name")
            if ingestion_job != job_name:
                continue
            # exclude regex streams (not used any more) e.g. /^swift\\.(.+\\.)?upload-complete$/
            # Used to by refined but empty streams anyway since 2021.
            if stream_name.startswith("/") or stream_name.endswith("/"):
                continue
            # exclude disabled streams.
            if not get_value_from_path(stream_config, "consumers/analytics_hadoop_ingestion/enabled"):
                continue
            # exclude external eventgate-logging-external streams
            # Unknown schemas. Were not refined anyway.
            if get_value_from_path(stream_config, "destination_event_service") == "eventgate-logging-external":
                continue
            # A stream without canary events will miss _IMPORTED flags
            if not get_value_from_path(stream_config, "canary_events_enabled"):
                continue
            if not was_this_stream_included_in_legacy_refine(job_name, stream_name):
                continue
            selected.append(stream_config)
        return selected
    
    
    # For each selected event_default stream, pair its name with the raw HDFS
    # directories Gobblin writes to (one per Kafka topic of the stream).
    event_default_streams = [
        {
            "name": stream_config["stream"],
            "hdfs_source_paths": [f"/wmf/data/raw/event/{topic}" for topic in stream_config.get("topics", [])],
        }
        for stream_config
        in get_event_streams_by_job_name(streamconfigs_json, "event_default")
    ]
    
    # 2/ collect all files recursively on HDFS for each stream
    
    # Gobblin's hourly partition layout inside a raw event directory. Note the
    # trailing "/": the partition directory entry itself does not match, only
    # files below it do.
    PARTITION_PATTERN = r"year=(\d{4})/month=(\d{2})/day=(\d{2})/hour=(\d{2})/"


    def get_data_files_desc_by_partition_ts(directory):
        """List files under *directory* (recursively, on HDFS) grouped by the
        hourly partition timestamp parsed from their path.

        Returns a dict {partition_datetime: [file_desc, ...]} where each
        file_desc carries file_path, file_ts (HDFS modification time, minute
        precision) and file_size in bytes. Returns None when the hdfs
        command fails.

        Fixed: lines with fewer than 8 columns (e.g. summary lines) would
        previously crash int(parts[4]); the partition-match check is now done
        before any parsing so non-partition entries cost nothing.
        """
        command = ["hdfs", "dfs", "-ls", "-R", directory]
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        if result.returncode != 0:
            print("Error executing command:", result.stderr)
            return None
        files = {}
        if result.stdout:
            for line in result.stdout.strip().split('\n'):
                # -ls output columns: perms, replication, owner, group, size,
                # date, time, path.
                parts = line.split()
                if len(parts) < 8:
                    continue
                path = parts[-1]
                # Skip in-flight Gobblin files and import flags.
                if path.endswith("_COPYING_") or "_IMPORTED" in path:
                    continue
                # Only keep entries inside an hourly partition.
                match = re.search(PARTITION_PATTERN, path)
                if not match:
                    continue
                year, month, day, hour = match.groups()
                partition_timestamp = datetime.strptime(f"{year}-{month}-{day} {hour}:00", "%Y-%m-%d %H:%M")
                file_desc = {
                    "file_path": path,
                    "file_ts": datetime.strptime(f"{parts[5]} {parts[6]}", "%Y-%m-%d %H:%M"),
                    "file_size": int(parts[4]),
                }
                files.setdefault(partition_timestamp, []).append(file_desc)
        return files
    
    
    # 4/ Detect late arrived events files in partitions
    
    def is_late_event_file(partition_ts, file_ts):
        """Return True when a file is "late" relative to its partition.

        Files created by Gobblin more than 2 hours and 20min after the
        beginning of the partition are considered late. Refine is triggered
        at HH:20 and refines until 2H before the beginning of the hour.
        (abs() also flags files dated *before* the partition start.)
        """
        return abs(file_ts - partition_ts) > timedelta(hours=2, minutes=20)


    def is_partition_with_late_event_file(partition_ts, partition_files):
        """Return True when any file of the partition is late.

        Fixed: pass a generator (not a list) to any() so it can
        short-circuit on the first late file instead of materializing all
        checks.
        """
        return any(
            is_late_event_file(partition_ts, file["file_ts"])
            for file in partition_files
        )
    
    
    # 5/ Report
    
    def report_late_events(files_by_partition, verbose=False):
        """Summarize late-event files across partitions.

        Takes the {partition_ts: [file_desc, ...]} dict produced by
        get_data_files_desc_by_partition_ts and returns a dict of counts,
        byte sizes and per-file latencies (as timedeltas).

        Fixed: an empty input previously raised ZeroDivisionError when
        computing the ratio; it is now reported as 0.
        """
        partitions_nb = len(files_by_partition)
        partitions_with_late_events_nb = 0
        files_with_late_events_total_size = 0
        latencies = []
        for partition_ts, partition_files in files_by_partition.items():
            if is_partition_with_late_event_file(partition_ts, partition_files):
                partitions_with_late_events_nb += 1
                if verbose:
                    print(f"Partition {partition_ts} has late events files:")
                for file in partition_files:
                    if verbose:
                        print(f"  - {file['file_path']} ({file['file_ts']})")
                    if is_late_event_file(partition_ts, file["file_ts"]):
                        files_with_late_events_total_size += file["file_size"]
                        latencies.append(abs(file["file_ts"] - partition_ts))
        # Guard against empty input (no partitions found) to avoid ZeroDivisionError.
        ratio = partitions_with_late_events_nb / partitions_nb if partitions_nb else 0
        if verbose:
            print(f"Number of late events files: {partitions_with_late_events_nb}/{partitions_nb} ({ratio:.2%})")
        return {
            "nb_of_partitions": partitions_nb,
            "nb_of_partitions_with_late_event_files": partitions_with_late_events_nb,
            "ratio": ratio,
            "total_file_sizes": sum(file["file_size"] for files in files_by_partition.values() for file in files),
            "files_with_late_events_total_size": files_with_late_events_total_size,
            "latencies": latencies,
        }
    
    
    # Test on a single stream
    # directory = "/wmf/data/raw/event/codfw.android.article_toc_interaction"
    # files_by_partition = get_data_files_desc_by_partition_ts(directory)
    # report_late_events(files_by_partition, True)
    
    # Partition 2024-05-21 08:00:00 has late events files:
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=05/day=21/hour=08/part.task_event_default_1716278713673_194_2.txt.gz (2024-05-21 08:06:00)
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=05/day=21/hour=08/part.task_event_default_1716282315097_86_0.txt.gz (2024-05-21 09:06:00)
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=05/day=21/hour=08/part.task_event_default_1716289513265_137_1.txt.gz (2024-05-21 11:06:00)
    # Partition 2024-05-21 09:00:00 has late events files:
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=05/day=21/hour=09/part.task_event_default_1716282315097_86_2.txt.gz (2024-05-21 09:06:00)
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=05/day=21/hour=09/part.task_event_default_1716289513265_137_0.txt.gz (2024-05-21 11:06:00)
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=05/day=21/hour=09/part.task_event_default_1716300912369_171_1.txt.gz (2024-05-21 14:16:00)
    # Partition 2024-07-02 04:00:00 has late events files:
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=07/day=02/hour=04/part.task_event_default_1719893715576_38_1.txt.gz (2024-07-02 04:16:00)
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=07/day=02/hour=04/part.task_event_default_1719914699691_37_0.txt.gz (2024-07-02 10:06:00)
    # Partition 2024-07-02 05:00:00 has late events files:
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=07/day=02/hour=05/part.task_event_default_1719914699691_37_1.txt.gz (2024-07-02 10:06:00)
    # Partition 2024-07-02 06:00:00 has late events files:
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=07/day=02/hour=06/part.task_event_default_1719914699691_37_2.txt.gz (2024-07-02 10:06:00)
    # Partition 2024-07-02 07:00:00 has late events files:
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=07/day=02/hour=07/part.task_event_default_1719914699691_37_3.txt.gz (2024-07-02 10:06:00)
    # Partition 2024-07-10 08:00:00 has late events files:
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=07/day=10/hour=08/part.task_event_default_1720599313836_13_1.txt.gz (2024-07-10 08:16:00)
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=07/day=10/hour=08/part.task_event_default_1720610113382_4_0.txt.gz (2024-07-10 11:16:00)
    # Partition 2024-07-11 08:00:00 has late events files:
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=07/day=11/hour=08/part.task_event_default_1720685715311_163_1.txt.gz (2024-07-11 08:16:00)
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=07/day=11/hour=08/part.task_event_default_1720710914625_66_0.txt.gz (2024-07-11 15:24:00)
    # Partition 2024-07-11 09:00:00 has late events files:
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=07/day=11/hour=09/part.task_event_default_1720710914625_66_1.txt.gz (2024-07-11 15:24:00)
    # Partition 2024-07-11 10:00:00 has late events files:
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=07/day=11/hour=10/part.task_event_default_1720710914625_66_2.txt.gz (2024-07-11 15:24:00)
    # Partition 2024-07-11 11:00:00 has late events files:
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=07/day=11/hour=11/part.task_event_default_1720710914625_66_3.txt.gz (2024-07-11 15:24:00)
    # Partition 2024-07-11 12:00:00 has late events files:
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=07/day=11/hour=12/part.task_event_default_1720710914625_66_4.txt.gz (2024-07-11 15:24:00)
    # Partition 2024-07-11 13:00:00 has late events files:
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=07/day=11/hour=13/part.task_event_default_1720710914625_66_5.txt.gz (2024-07-11 15:24:00)
    # Partition 2024-08-13 09:00:00 has late events files:
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=08/day=13/hour=09/part.task_event_default_1723540514357_153_1.txt.gz (2024-08-13 09:16:00)
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=08/day=13/hour=09/part.task_event_default_1723554912751_7_0.txt.gz (2024-08-13 13:16:00)
    # Partition 2024-08-13 10:00:00 has late events files:
    #   - /wmf/data/raw/event/codfw.android.article_toc_interaction/year=2024/month=08/day=13/hour=10/part.task_event_default_1723554912751_7_1.txt.gz (2024-08-13 13:16:00)
    # Number of late events files: 15/2164 (0.69%)
    
    
    # Test on all event_default streams
    # NOTE(review): these module-level accumulators look like leftovers from a
    # serial version of the scan. The parallel path below never reads them
    # (get_late_events_for_paths rebinds same-named locals) — confirm before
    # removing.
    reports = []
    partitions_nb = 0
    partitions_with_late_events_nb = 0
    data_size = 0
    late_event_files_size = 0
    latencies = []
    
    
    from concurrent.futures import ProcessPoolExecutor
    
    
    def get_late_events_for_paths(path):
        """Scan one HDFS raw-event path and return its late-events summary.

        Worker function mapped over paths by the ProcessPoolExecutor below.
        The returned dict flattens the fields print_results needs, alongside
        the full report.
        """
        print(f"Processing path {path}")

        files_by_partition = get_data_files_desc_by_partition_ts(path)
        report = report_late_events(files_by_partition)

        return {
            'path': path,
            'partitions_nb': len(files_by_partition),
            'report': report,
            'partitions_with_late_events_nb': report["nb_of_partitions_with_late_event_files"],
            'data_size': report["total_file_sizes"],
            'late_event_files_size': report["files_with_late_events_total_size"],
            # Latencies converted from timedeltas to hours.
            'latencies': [ts_diff.total_seconds() / 3600 for ts_diff in report["latencies"]],
        }
    
    
    def print_results(late_events_results):
        """Pretty-print one summary per path from get_late_events_for_paths.

        Paths with no late partition are skipped (with a notice).

        Fixed: the skip branch printed `path`, a name undefined in this
        function (NameError, or a stale global left over by the path-listing
        loop); it now prints the result's own path.
        """
        for result in late_events_results:
            if result['partitions_with_late_events_nb'] == 0:
                print("skipping ", result['path'])
                continue
            print(f"Path: {result['path']}")
            print(f"\tNumber of partitions with late events files: {result['partitions_with_late_events_nb']}/{result['partitions_nb']} ({result['partitions_with_late_events_nb']/result['partitions_nb']:.2%})")
            # print(f"\tTotal data size: {result['data_size']/1024/1024/1024/1024:.2f} TB")
            # print(f"\tTotal late events files size: {result['late_event_files_size']/1024/1024/1024:.2f} GB")
            print(f"\tRatio of late events files size: {result['late_event_files_size']/result['data_size']:.2%}")
            print(f"\tLatencies stats (in hours): len={len(result['latencies'])} min={min(result['latencies'])}, max={max(result['latencies'])}, avg={sum(result['latencies'])/len(result['latencies'])}")
    
    
    
    # Flatten all HDFS source paths across the selected streams.
    paths = []
    for stream in event_default_streams:
        for path in stream["hdfs_source_paths"]:
            paths.append(path)

    # NOTE(review): only the first 20 paths are kept — presumably a
    # sampling/debug limit; confirm before a full production run.
    paths = paths[0:20]

    # Fan the per-path HDFS scans out over 10 worker processes (the listing
    # work is dominated by the external hdfs command, so processes are safe).
    with ProcessPoolExecutor(10) as executor:
        results = list(executor.map(get_late_events_for_paths, paths))


    print_results(results)
    
    # Number of partitions with late events files: 4154/525798 (0.79%)
    # Total data size: 18.81 TB
    # Total late events files size: 109.67 GB
    # Ratio of late events files size: 0.57%
    # Latencies stats (in hours): len=4959 min=2.35, max=7.6, avg=4.433860993479679
    0% or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Finish editing this message first!
    Please register or sign in to comment