
Prototype Flink job for content Dumps
Closed, Declined · Public

Description

A design document is being written and a lot of decisions need to be made about the next architecture of dumps. It's hard to make some of those decisions without experimenting a little with Flink to see what it can and can't do in terms of scale and developer friendliness.

Update

The Flink prototype has been stuck for a while on a confusing pom configuration issue related to Java service providers and registering everything needed for the Kafka -> Iceberg pipeline. Code with a few different iterations is available on Dan's GitHub. The plan is to break this down into some more exploration and try to solve the problem by walking around it (a quick check of the service-provider side is sketched after the list):

  • write the prototype in Spark Streaming (was already a separate task: T322326: Prototype Spark Streaming Job for Content Dumps)
  • try Maven wrapper plugin and get advice from Guillaume
  • try to write to just hdfs from Kafka (already done, but maybe playing more with it will shed some light)
  • try to write to Iceberg backed by local filesystem (probably same issue of including the fs.impl)
  • talk more with Flink slack community (their suggestions)
  • try gradle instead of maven (this example is the closest I've been able to find to what we're facing and they use gradle) - canceled (got advice that it would have the same issue as maven)
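One quick way to confirm the service-provider theory from the outside (a hedged suggestion; the jar name below is hypothetical, not the prototype's actual artifact): Flink's table planner discovers connectors through META-INF/services, so the merged entry in the built fat jar should list every factory the pipeline needs.

# Hypothetical artifact name; substitute whatever the prototype pom actually builds.
unzip -p flink-dumps-prototype.jar META-INF/services/org.apache.flink.table.factories.Factory
# If the Kafka or Iceberg factory classes are missing from this merged service file, the
# shade/assembly step is overwriting service files instead of concatenating them
# (maven-shade's ServicesResourceTransformer, or the shadow plugin's mergeServiceFiles()
# in gradle, is the usual fix).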

Event Timeline

Restricted Application added a subscriber: Aklapper.

Got the basics set up in the Flink SQL client. Updating my code from before. I think I'm going to leave Flink SQL here. The problem is that it has pretty bad actual SQL support (no built-in timestamp functions, for example), so to use it for the kinds of transformations we need, we'd have to build timestamp-parsing UDFs and the like (a sketch of what that would look like is below). I feel that if you're writing Java/Scala anyway, you might as well stay in Java and write the whole job there; that way all the logic is in one place and understanding the code doesn't require understanding multiple environments. Maybe if we do more work to make the Flink SQL environment painless, we can come back to this. For now, a Scala or Python Flink job seems to me the best way forward.
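For a concrete sense of that overhead (purely illustrative; the class name is hypothetical and no such UDF was written, and the query reuses the mw_page_create_stream table defined further down): every missing function means writing a small Java/Scala ScalarFunction, packaging it into one of the jars passed to sql-client.sh, and registering it before it can be used, roughly like this:

-- Hypothetical class: a ScalarFunction that parses MediaWiki/ISO-8601 timestamp strings.
CREATE TEMPORARY FUNCTION parse_mw_ts
    AS 'org.wikimedia.dumps.ParseMwTimestamp' LANGUAGE JAVA;

SELECT parse_mw_ts(rev_timestamp) AS rev_ts,
       page_id
  FROM hive_catalog.milimetric.mw_page_create_stream;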

(on stat1004.eqiad.wmnet)

Proxies

export PATH=$PATH:/home/milimetric/.local/bin
export http_proxy=http://webproxy:8080
export https_proxy=http://webproxy:8080
export HTTP_PROXY=http://webproxy:8080
export HTTPS_PROXY=http://webproxy:8080
export no_proxy=127.0.0.1,::1,localhost,.wmnet,.wikimedia.org,.wikipedia.org
export NO_PROXY=127.0.0.1,::1,localhost,.wmnet,.wikimedia.org,.wikipedia.org
export REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt

Downloads

wget https://dlcdn.apache.org/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.12/1.15.2/flink-sql-connector-hive-2.3.6_2.12-1.15.2.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-kafka/1.15.2/flink-connector-kafka-1.15.2.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.3.1/kafka-clients-3.3.1.jar
wget https://repo1.maven.org/maven2/org/xerial/snappy/snappy-java/1.1.8.4/snappy-java-1.1.8.4.jar
Build Iceberg for iceberg-flink-runtime
(Latest available for download is this and it has bugs: https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/0.12.1/iceberg-flink-runtime-0.12.1.jar)
git clone https://github.com/apache/iceberg.git
cd iceberg
./gradlew build -x test -x integrationTest
(copy flink/v1.15/flink-runtime/build/libs/iceberg-flink-runtime-1.15-0.15.0-SNAPSHOT.jar to stat1004 if built locally)

Extracts

tar xzvf flink-1.15.2-bin-scala_2.12.tgz

(organize directories however you like and go into extracted flink dir)

Configure

Make these changes in conf/flink-conf.yaml:

security.kerberos.login.use-ticket-cache: true
security.kerberos.krb5-conf.path: /etc/krb5.conf

Make a file like init-flink-iceberg-catalog.sql:

CREATE CATALOG hive_iceberg_catalog WITH (
    'type'='iceberg',
    'catalog-type'='hive',
    'uri'='thrift://analytics-hive.eqiad.wmnet:9083',
    'property-version'='1',
    'hive-conf-dir'='/etc/hive/conf/',
    'hadoop-conf-dir'='/etc/hadoop/conf/',
    'warehouse'='hdfs://analytics-hadoop/user/hive/warehouse'
);

CREATE CATALOG hive_catalog WITH (
    'type'='hive',
    'property-version'='1',
    'hive-conf-dir'='/etc/hive/conf/',
    'hadoop-conf-dir'='/etc/hadoop/conf/'
);

USE hive_iceberg_catalog.from_kafka;
USE hive_catalog.milimetric;

Run

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
export HBASE_CONF_DIR=/etc/hbase/conf
./bin/stop-cluster.sh
./bin/start-cluster.sh
./bin/sql-client.sh embedded                                \
    -i ../init-flink-iceberg-catalog.sql                    \
    -j ../iceberg-flink-runtime-1.15-0.15.0-SNAPSHOT.jar    \
    -j ../flink-sql-connector-hive-2.3.6_2.12-1.15.2.jar    \
    -j ../flink-connector-kafka-1.15.2.jar                  \
    -j ../kafka-clients-3.3.1.jar                           \
    -j ../snappy-java-1.1.8.4.jar                           \
    -l /etc/hadoop/conf                                     \
    -l /usr/lib/hadoop/lib/                                 \
    -l /usr/lib/hadoop/                                     \
    -l /usr/lib/hadoop-hdfs/./                              \
    -l /usr/lib/hadoop-hdfs/lib/                            \
    -l /usr/lib/hadoop-hdfs/                                \
    -l /usr/lib/hadoop-yarn/lib/                            \
    -l /usr/lib/hadoop-yarn/                                \
    -l /usr/lib/hadoop-mapreduce/lib/                       \
    -l /usr/lib/hadoop-mapreduce/                           \
    shell

Create Database and Table

CREATE DATABASE hive_iceberg_catalog.from_kafka;

CREATE TABLE hive_iceberg_catalog.from_kafka.revision (
    wiki_db     string  not null    COMMENT '',
    rev_id      bigint  not null    COMMENT '',
    is_public   boolean not null    COMMENT '',
    content     string  null        COMMENT '',

    PRIMARY KEY (wiki_db, rev_id) NOT ENFORCED
) with ('format-version'='2', 'write.upsert.enabled'='true');

CREATE TABLE hive_iceberg_catalog.from_kafka.page (
    wiki_db     string  not null    COMMENT '',
    page_id     bigint  not null    COMMENT '',
    is_public   boolean not null    COMMENT '',

    PRIMARY KEY (wiki_db, page_id) NOT ENFORCED
) with ('format-version'='2', 'write.upsert.enabled'='true');

Create Kafka Table on top of a topic

CREATE TABLE mw_page_create_stream (
    `comment` string,
    `database` string,
    page_id bigint,
    rev_timestamp string,
    log_ts TIMESTAMP(3),
    WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP column
) WITH (
  'connector' = 'kafka',
  'topic' = 'eqiad.mediawiki.page-create',
  'properties.bootstrap.servers' = 'kafka-jumbo1001.eqiad.wmnet:9092',
  'properties.group.id' = 'milimetricTest',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TABLE mw_page_delete_stream (
    `comment` string,
    `database` string,
    page_id bigint
) WITH (
  'connector' = 'kafka',
  'topic' = 'eqiad.mediawiki.page-delete',
  'properties.bootstrap.servers' = 'kafka-jumbo1001.eqiad.wmnet:9092',
  'properties.group.id' = 'milimetricTest',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

and then play with some data

insert into revision values ('enwiki', -1, true, 'hello world !');
insert into revision values ('rowiki', -2, true, 'bună lume!');
select * from revision;
insert into revision  /*+ OPTIONS('upsert-enabled'='true') */
values ('rowiki', -2, false, 'bună lume!');
select * from revision;

-- this doesn't work
-- (yes, the upsert is not needed here)
-- setting up commits properly is most likely the issue (see the checkpointing note after this block): https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/hive_read_write/#writing
insert into page /*+ OPTIONS('upsert-enabled'='true') */
 select `database` as wiki_db,
        page_id,
        true as is_public
   from hive_catalog.milimetric.mw_page_create_stream
  limit 10
;

SET 'sql-client.execution.result-mode' = 'tableau';
select * from page;
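A likely missing piece for the streaming insert above (an assumption, not something verified at the time): Iceberg's Flink sink only commits data files when a Flink checkpoint completes, so with checkpointing disabled a streaming insert never becomes visible. Enabling it from the SQL client would look something like:

-- Assumption: the Iceberg sink commits on checkpoint completion, so without this
-- the Kafka -> Iceberg insert produces no visible snapshots.
SET 'execution.checkpointing.interval' = '30s';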

I was wrong to think I'd finish this by the end of the week. It's just been a series of errors with no docs to help. The current state: Iceberg is having trouble reading metadata; it seems like somehow it doesn't know how to use HDFS?

2022-11-01 22:17:11,173 WARN  org.apache.iceberg.util.Tasks                                [] - Retrying task after failure: Failed to open input stream for file: hdfs://analytics-hadoop/user/hive/warehouse/from_kafka.db/revision/metadata/00000-a16538fb-5aed-4754-bdd4-2bc3c8966f71.metadata.json
org.apache.iceberg.exceptions.RuntimeIOException: Failed to open input stream for file: hdfs://analytics-hadoop/user/hive/warehouse/from_kafka.db/revision/metadata/00000-a16538fb-5aed-4754-bdd4-2bc3c8966f71.metadata.json
	at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:185) ~[blob_p-a25da36a5d64560abd15cadbe33cf2517cb1ffa8-3653660b09fe11412653c8f0090345f7:?]
	at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:265) ~[blob_p-a25da36a5d64560abd15cadbe33cf2517cb1ffa8-3653660b09fe11412653c8f0090345f7:?]

...

2022-11-01 22:10:56,622 WARN  org.apache.hadoop.hdfs.DFSClient                             [] - Failed to connect to /10.64.5.41:50010 for block BP-1552854784-10.64.21.110-1405114489661:blk_2063953460_990294987, add to deadNodes and continue.
java.io.IOException: The stream is closed
	at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:118) ~[hadoop-common-2.10.2.jar:?]
	at java.io.DataOutputStream.write(DataOutputStream.java:107) ~[?:1.8.0_342]
	at com.google.protobuf.CodedOutputStream.refreshBuffer(CodedOutputStream.java:833) ~[protobuf-java-2.5.0.jar:?]

...

more in stat1004:/home/milimetric/flink-1.15.2/log/flink-milimetric-taskexecutor-0-stat1004.log
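One sanity check worth running (a suggestion; it is not recorded whether this was tried): confirm that the same metadata file is readable over HDFS from the stat host with the same Kerberos ticket the SQL client uses.

kinit   # same principal the Flink SQL client picks up from the ticket cache
hdfs dfs -ls hdfs://analytics-hadoop/user/hive/warehouse/from_kafka.db/revision/metadata/
hdfs dfs -cat hdfs://analytics-hadoop/user/hive/warehouse/from_kafka.db/revision/metadata/00000-a16538fb-5aed-4754-bdd4-2bc3c8966f71.metadata.json | head
# If this works from the shell but the job still fails, the problem is more likely the
# Hadoop/HDFS client classpath handed to Flink than HDFS permissions themselves.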

Deciding against Flink, at least for now. Documenting as a decision record here.

Reason: in trying to write a proof-of-concept job, we ran into too many Java dependency problems. Building a Maven pom with Flink, Iceberg, Kafka, and the jars we need for our Hadoop environment runs into duplications and version mismatches. It's basically jar hell that may not be worth untangling. It's possible that the Iceberg team will solve this problem for us in the near future. It's also the case that Spark (Streaming or plain) seems to be working quite well for this use case.

note for myself: https://github.com/apache/iceberg/pull/6182/files is recent activity about supporting deletes in future Flink / Iceberg APIs