-- Page Menu · Home · Phabricator
-- Paste P43588
--
-- Test geocoded data with Spark on Caffeine
-- Active · Public
--
-- Authored by Antoine_Quhen on Feb 6 2023, 8:24 AM.
-- Test that the generation of geocoded data by Spark with Caffeine caching generates the same result as with Hive with
-- Guava caching.
-- Note: reading wmf_raw.webrequest is slow as the json file format does not support reading only a column.
-- Note: distinct behaves differently on Hive and on Spark (distinct on structs is not supported)
-- Note: The other UDFs are not ready for Spark preventing the generation of the exact same table
-- spark3-sql \
-- --master yarn \
-- --deploy-mode client \
-- --conf spark.driver.memory=2g \
-- --conf spark.driver.cores=1 \
-- --conf spark.executor.memory=6g \
-- --conf spark.executor.cores=2 \
-- --conf spark.dynamicAllocation.maxExecutors=64 \
-- --conf spark.executor.memoryOverhead=2048 \
-- --conf spark.sql.shuffle.partitions=1024 \
-- --conf spark.memory.fraction=0.8 \
-- --name test-webrequest-maxmind
-- Compression codec for any Parquet output written in this session.
SET parquet.compression = SNAPPY;
-- Raise the field limit so wide rows (e.g. webrequest) can be rendered in debug output.
SET spark.sql.debug.maxToStringFields = 1000;
-- HCatalog core jar, needed to read Hive-managed tables from Spark.
ADD JAR /usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar;
-- Locally built refinery jar that contains the UDF under test.
ADD JAR /home/aqu/analytics-refinery-source/refinery-hive/target/refinery-hive-0.2.11-SNAPSHOT-shaded.jar;
-- Register the geocoding UDF under test (IP string -> geocoded data).
CREATE TEMPORARY FUNCTION geocoded_data as 'org.wikimedia.analytics.refinery.hive.GeocodedDataUDF';
-- Size the source partition before running the UDF over it.
SELECT COUNT(1)
FROM wmf_raw.webrequest
WHERE webrequest_source = 'text'
    AND year = 2023
    AND month = 1
    AND day = 26
    AND hour = 17;
-- => 379_222_409
-- Smoke-test the UDF on a small sample of IPs.
SELECT geocoded_data(ip) AS geocoded_data
FROM wmf_raw.webrequest
WHERE webrequest_source = 'text'
    AND year = 2023
    AND month = 1
    AND day = 26
    AND hour = 17
LIMIT 100;
-- => OK
-- Generate a small sample of (ip, geocoded_data) pairs to eyeball the output.
SELECT
    ip AS client_ip,
    geocoded_data(ip) AS geocoded_data
FROM wmf_raw.webrequest
WHERE webrequest_source = 'text'
    AND year = 2023
    AND month = 1
    AND day = 26
    AND hour = 17
LIMIT 10;
-- OK
-- Look for differences in generated geocoded data
SELECT ip, count(distinct(cast(geocoded_data(ip) as string))) as total
FROM wmf_raw.webrequest
WHERE webrequest_source='text' AND year=2023 AND month=2 AND day=1 AND hour=7
group by ip
having total > 1
limit 10 ;
-- no line returned
-- Put data into an ad-hoc table for later comparison with the old pipeline.
USE aqu;
-- IF EXISTS avoids a failure on the first run, when the table does not yet
-- exist. The table is EXTERNAL, so dropping it removes only the metadata,
-- not the underlying HDFS files.
DROP TABLE IF EXISTS geocoded_data;
CREATE EXTERNAL TABLE IF NOT EXISTS `geocoded_data`(
    `ip` string COMMENT 'Client IP address',
    `geocoded_data` string COMMENT 'Geocoded result cast to string for comparison'
)
PARTITIONED BY (
    `webrequest_source` string COMMENT 'Source cluster',
    `year` int COMMENT 'Unpadded year of request',
    `month` int COMMENT 'Unpadded month of request',
    `day` int COMMENT 'Unpadded day of request',
    `hour` int COMMENT 'Unpadded hour of request'
)
STORED AS PARQUET
LOCATION 'hdfs://analytics-hadoop/user/aqu/geocoded_data'
;
-- Materialize one hour of distinct (ip, geocode-string) pairs into the
-- ad-hoc table for the comparison query below.
INSERT OVERWRITE TABLE aqu.geocoded_data
PARTITION (webrequest_source = 'text', year = 2023, month = 2, day = 1, hour = 7)
SELECT DISTINCT
    ip,
    CAST(geocoded_data(ip) AS STRING) AS geocoded_data
FROM wmf_raw.webrequest
WHERE webrequest_source = 'text'
    AND year = 2023
    AND month = 2
    AND day = 1
    AND hour = 7;
-- Look for differences with the old process
-- Anti-join: count (ip, geocoded_data) pairs produced by the old pipeline
-- (wmf.webrequest) that have no exact match in the new Spark output
-- (aqu.geocoded_data). 0 means every old pair also exists in the new data.
-- NOTE(review): this checks only old -> new; pairs present only in the new
-- table would not be counted here.
-- NOTE(review): a NULL geocoded_data on the old side can never match through
-- the USING join and would always count as a difference — presumably the UDF
-- never returns NULL; verify if the count is unexpectedly > 0.
with new_geocoded_data as (
SELECT ip, geocoded_data
FROM aqu.geocoded_data
WHERE webrequest_source='text' AND year=2023 AND month=2 AND day=1 AND hour=7
), old_geocoded_data as (
-- Cast the old column to string so it compares cleanly with the new table.
SELECT client_ip as ip, cast(geocoded_data as string) as geocoded_data
FROM wmf.webrequest
WHERE webrequest_source='text' AND year=2023 AND month=2 AND day=1 AND hour=7
)
select count(1) as differences
from old_geocoded_data
left outer join new_geocoded_data using (ip, geocoded_data)
where new_geocoded_data.ip is null ;
-- 0