
Support querying a range of hourly data partitions
Open, LowPublic

Description

A common scenario when working with a big table like wmf.webrequest is having to query one hour of data at a time, wrapping that query in a script that iterates over all the hour values and stitches the results together into a single dataset. The query uses placeholders, e.g.

WHERE year = ${year} AND month = ${month} AND day = ${day} AND hour = ${hour}

and it would be useful to have a helper utility for running the query over a time range (e.g. from 2020-12-31 00:00 to 2021-01-01 23:00) or a date range (e.g. 2021-10-01 to 2021-10-31).

Something like this:

INPUT: start, end, a query with placeholders for year, month, day, and hour (hour is optional and may be omitted in cases where the query is safe to run on a full day of data), **kwargs

  1. generate range of times from start to end in hour increments
  2. for each time, extract year month day hour
  3. fill in placeholders in query with those values and run it (& forwarding kwargs)
  4. append results (with time info as columns for aggregating later)

OUTPUT: DataFrame of per-hour or per-day results from start to end
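The steps above can be sketched as a small helper. This is a minimal, hypothetical implementation (not part of the task): `run_query` stands in for whatever engine callable actually executes SQL and returns a DataFrame, and the `dt` column name is an arbitrary choice for the per-hour time tag.

```python
import pandas as pd

def run_hourly_queries(run_query, query, start, end, **kwargs):
    """Run `query` once per hour from `start` to `end` (inclusive) and
    concatenate the results into one DataFrame.

    `query` must contain {year}, {month}, {day}, and {hour} format fields;
    `run_query` is any callable that takes a SQL string (plus **kwargs)
    and returns a DataFrame.
    """
    results = []
    for ts in pd.date_range(start, end, freq="h"):
        df = run_query(
            query.format(year=ts.year, month=ts.month, day=ts.day, hour=ts.hour),
            **kwargs,
        )
        # keep the hour as a column so results can be aggregated later
        df["dt"] = ts
        results.append(df)
    return pd.concat(results, ignore_index=True)
```

A daily variant would be the same loop without the `hour` field and with the default daily frequency in `pd.date_range`.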

Event Timeline

nshahquinn-wmf renamed this task from [Feature Request] Querying a time range to Support querying a range of hourly data partitions. Oct 29 2021, 6:39 PM

(Editing because, on a closer read of the task description, my use case isn't quite the same; keeping this comment anyway in case it's useful for others.)

I have a Python helper function that creates a partition statement from two datetime.datetime parameters (it works at day resolution, so datetime.date would work too); it's pasted below. It doesn't yet support spanning multiple years outside of a December/January boundary, but that could be added. It supports prefixes because you might want to create these statements for multiple tables. Here's an example of using it, where exp_start_ts and exp_end_ts are variables holding the start and end timestamps of an experiment:

sql_query = '''
SELECT
    *
FROM table
WHERE {partition_statement}'''

df = spark.run(sql_query.format(partition_statement = make_partition_statement(exp_start_ts, exp_end_ts)))

And here's the function:

def make_partition_statement(start_ts, end_ts, prefix = ''):
    '''
    This takes the two timestamps and creates a statement that selects
    partitions based on `year`, `month`, and `day` so that our data
    gathering doesn't scan excessive amounts of data. It assumes that
    `start_ts` and `end_ts` are either in the same year, or if spanning
    a year boundary are at most a month apart.
    This assumption simplifies the code and output a lot.
    
    An optional prefix can be set to enable selecting partitions for
    multiple tables with different aliases.
    
    :param start_ts: start timestamp
    :type start_ts: datetime.datetime
    
    :param end_ts: end timestamp
    :type end_ts: datetime.datetime
    
    :param prefix: prefix to use in front of partition clauses, "." is added automatically
    :type prefix: str
    '''
    
    if prefix:
        prefix = f'{prefix}.' # adds "." after the prefix
    
    # there are five cases:
    # 1: year and month are the same: output a "BETWEEN" statement over the days
    # 2: the years are the same and the months differ by 1: output a statement for each month
    # 3: the years are the same: create a list of statements from start_ts.month to end_ts.month
    #    and return them OR'ed together
    # 4: the years differ by 1, start_ts is in December and end_ts is in January: same as #2
    # 5: anything else: raise an exception because this isn't implemented yet
    
    if start_ts.year == end_ts.year and start_ts.month == end_ts.month:
        return(f'''{prefix}year = {start_ts.year}
AND {prefix}month = {start_ts.month}
AND {prefix}day BETWEEN {start_ts.day} AND {end_ts.day}''')
    elif start_ts.year == end_ts.year and (end_ts.month - start_ts.month) == 1:
        return(f'''
(
    ({prefix}year = {start_ts.year}
     AND {prefix}month = {start_ts.month}
     AND {prefix}day >= {start_ts.day})
 OR ({prefix}year = {end_ts.year}
     AND {prefix}month = {end_ts.month}
     AND {prefix}day <= {end_ts.day})
)''')
    elif start_ts.year == end_ts.year:
        # do the start month as a list
        parts = [f'''({prefix}year = {start_ts.year}
     AND {prefix}month = {start_ts.month}
     AND {prefix}day >= {start_ts.day})''']
        # for month +1 to end month, add each month
        for m in range(start_ts.month+1, end_ts.month):
            parts.append(f'''({prefix}year = {start_ts.year}
            AND {prefix}month = {m})''')
        # then append the end month and return a parenthesis OR'ed together of all of it
        parts.append(f'''({prefix}year = {end_ts.year}
     AND {prefix}month = {end_ts.month}
     AND {prefix}day <= {end_ts.day})''')
        return('({})'.format(
            '\nOR\n'.join(parts)
        ))
    elif (end_ts.year - start_ts.year) == 1 and start_ts.month == 12 and end_ts.month == 1:
        return(f'''
(
    ({prefix}year = {start_ts.year}
     AND {prefix}month = {start_ts.month}
     AND {prefix}day >= {start_ts.day})
 OR ({prefix}year = {end_ts.year}
     AND {prefix}month = {end_ts.month}
     AND {prefix}day <= {end_ts.day})
)''')
    else:
        raise ValueError('Difference between start and end timestamps is not implemented. See code for details.')
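To make the first case concrete, here is a standalone restatement of just that branch (the same-year, same-month case), with a hypothetical helper name chosen for illustration; it is not part of the original function:

```python
import datetime as dt

def same_month_clause(start_ts, end_ts, prefix=""):
    # restates the first branch of make_partition_statement: when the year
    # and month match, a single BETWEEN over the days is enough
    if prefix:
        prefix = f"{prefix}."
    return (
        f"{prefix}year = {start_ts.year}\n"
        f"AND {prefix}month = {start_ts.month}\n"
        f"AND {prefix}day BETWEEN {start_ts.day} AND {end_ts.day}"
    )

# with a table alias prefix "w", an October 2021 range yields:
#   w.year = 2021
#   AND w.month = 10
#   AND w.day BETWEEN 1 AND 31
print(same_month_clause(dt.datetime(2021, 10, 1), dt.datetime(2021, 10, 31), prefix="w"))
```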

ldelench_wmf moved this task from Triage to Backlog on the Product-Analytics board.

I needed something like this for T309036, so I wrote a version that does this on a per-day basis. It can be modified to do hourly queries by adding freq = 'H' to the parameters of the pandas.date_range call; if we want the right end of the range to be exclusive, we'd also add inclusive = "left". I'll file a pull request for this; in the meantime, here's the code for the day-based version, with an example call and the imports that make it work:

import wmfdata
import pandas as pd

def run_daily_queries(engine, query, start_date, end_date):
    '''
    For every day between `start_date` and `end_date` (inclusive), run `query` on
    the given wmfdata query engine (meaning it expects `engine.run()` to exist).
    
    Expects `query` to define format fields `year`, `month`, and `day` so that
    they can be inserted for each day.
    
    Example Spark query showing the format fields:
    
    SELECT
        to_date(concat(year, "-", month, "-", day)) AS view_date,
        access_method,
        agent_type,
        sum(view_count) AS num_page_views
    FROM wmf.pageview_hourly
    WHERE year = {year}
    AND month = {month}
    AND day = {day}
    AND project = "en.wikipedia"
    AND page_id = 5043734 -- the Wikipedia article
    GROUP BY to_date(concat(year, "-", month, "-", day)), access_method, agent_type
    '''
    
    results = list()
    
    for this_day in pd.date_range(start_date, end_date):
        results.append(
            engine.run(
                query.format(
                    year = this_day.year,
                    month = this_day.month,
                    day = this_day.day
                )
            )
        )
    return(pd.concat(results))

# imagine you define `test_query` as the example query above here,
# then the following import and call gets pageviews to the Wikipedia
# article on English Wikipedia for every day of May 2022
import datetime as dt
daily_enwp_pageviews = run_daily_queries(wmfdata.spark, test_query, dt.date(2022, 5, 1), dt.date(2022, 5, 31))
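To make the hourly modification mentioned above concrete, here is a quick standalone check of what those pd.date_range parameters generate (note that the inclusive keyword requires pandas ≥ 1.4, which replaced the older closed parameter):

```python
import pandas as pd

# Hourly timestamps with the right end of the range excluded, as suggested
# above: one timestamp per hourly partition of 2022-05-01.
hours = pd.date_range("2022-05-01", "2022-05-02", freq="h", inclusive="left")
print(len(hours))          # 24
print(hours[0], hours[-1]) # 2022-05-01 00:00:00 2022-05-01 23:00:00
```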

@nettrom_WMF nice, I think this will be very useful! Looking forward to that pull request 😊

@nettrom_WMF Thank you for sharing that code! I recently used it in T353666 and it was very helpful! Just wanted to show my appreciation.