
Implement an Airflow operator for moving data from point A to B
Open, Needs Triage, Public

Description

In this task, we want to implement a new Airflow operator for moving data from point A to B.

The first use case will be to move data from HDFS to the clouddumps servers.

Something like:

publish = PublishData(
    source="hdfs://wmf/data/archives/file_export",
    targets=["sftp://clouddumps1001", "sftp://clouddumps1002"],
)

Context:

We need to add the step that syncs from hdfs to public. Not sure how that should be done.

In HDFS, we have folder /wmf/data/archive where you can move your files to. Let's say you do it to /wmf/data/archive/cirrus-search-index/{date}/blah.

Then you can set an hdfs_tools::hdfs_rsync_job in puppet to rsync from that HDFS path to the clouddumps* nodes that serve the dumps (examples here).

I'd say that there are some other options to be considered, too. The puppet-based mechanism that calls hdfs-rsync will work, but it's maybe a bit of a legacy way to do it.
When we migrated the dumps v1 to Airflow recently, we needed to find a way to publish from the CephFS mount point /mnt/dumpsdata to the clouddumps hosts.

We created a sync-utils container image and then added specific tasks to our DAG that are responsible for publishing the files created.
In the case of the dumps, we found that the best option was to use parallel-rsync and specify both clouddumps1001 and clouddumps1002 as the targets.
This allows us to have one task that either successfully publishes to both target locations, or it fails.

So for example, if you look at the current cirrussearch dumps, you will see that they have a sync_cirrussearch_dumps task, which calls parallel-rsync with these custom arguments.

[Screenshot: the sync_cirrussearch_dumps task and the custom parallel-rsync arguments it uses]

There are many options around how you would schedule and trigger these publishing tasks, so they don't all need to be sequential like the example shown here.

However, for this requirement it would be a little different, because the source files are presumably going to be created on HDFS, rather than CephFS.
This means that we won't be able to have the source directory mounted as a locally available file system.

I think that we have at least a few ways that we could tackle this, though.
One that occurs to me immediately is that we could use rclone instead of parallel-rsync.

rclone already has an HDFS remote built in, so we could use this for one side of the connection and an SFTP remote on the other side.
We would be able to give it access to the kerberos credential cache and Hadoop configuration files for the HDFS connection, and the SSH private key for the SFTP connection.

Then we could just execute an rclone sync command and supply the source and destination paths.
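
As a rough illustration, a minimal sketch of such a task follows. It is only an assumption of how this could look: the DAG id, namenode address, Kerberos cache path, SSH user, key file and destination path are all made up, and the rclone remotes are defined via environment variables (a mechanism rclone supports) rather than a config file.

from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id="publish_file_export",  # hypothetical DAG
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    schedule=None,
):
    for host in ["clouddumps1001.wikimedia.org", "clouddumps1002.wikimedia.org"]:
        BashOperator(
            task_id=f"publish_file_export_{host.split('.')[0]}",
            # rclone sync <source remote>:<path> <target remote>:<path>
            bash_command=(
                "rclone sync "
                "hdfs:/wmf/data/archives/file_export "
                "target:/srv/dumps/file_export"  # destination path is assumed
            ),
            append_env=True,  # keep the worker's PATH and other variables
            env={
                # HDFS side: rclone's built-in hdfs backend, using the Kerberos
                # credential cache made available to the task.
                "RCLONE_CONFIG_HDFS_TYPE": "hdfs",
                "RCLONE_CONFIG_HDFS_NAMENODE": "analytics-hadoop:8020",  # assumed
                "KRB5CCNAME": "/tmp/krb5cc_analytics",  # assumed ticket cache location
                # SFTP side: one remote per clouddumps host, authenticated by SSH key.
                "RCLONE_CONFIG_TARGET_TYPE": "sftp",
                "RCLONE_CONFIG_TARGET_HOST": host,
                "RCLONE_CONFIG_TARGET_USER": "dumpsgen",  # assumed
                "RCLONE_CONFIG_TARGET_KEY_FILE": "/etc/dumps/ssh/id_ed25519",  # assumed
            },
        )

One task per target means each host can fail and be retried independently, which is a different trade-off from the single all-or-nothing parallel-rsync task used by dumps_v1.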

@xcollazo - I can see this being a good option for T384381: Airflow jobs to do monthly XML dumps as well.

Event Timeline

Blunderbuss could easily do this for you, with minimal resource usage on the Airflow executor side :-)

> Blunderbuss could easily do this for you, with minimal resource usage on the Airflow executor side :-)

Would data sizes be any concern? One of our use cases is a weekly transfer of a few hundred GB spread across ~10k files in a nested directory structure.

> Blunderbuss could easily do this for you, with minimal resource usage on the Airflow executor side :-)

Interesting. We are looking at an equivalent to rsync. In my use case, we need to be able to sync ~9 TB (the typical size of a full dump), and also remove data on the target when it is removed from the source. Would Blunderbuss do that?

> Would data sizes be any concern? One of our use cases is a weekly transfer of a few hundred GB spread across ~10k files in a nested directory structure.

> we need to be able to sync ~9 TB (the typical size of a full dump), and also remove data on the target when it is removed from the source. Would Blunderbuss do that?

I haven't tested the performance on datasets this big. The artifact cache warming functionality moves files about 1.5GB big with ease and it's quite fast too. The copy is buffered (1MB buffer as far as I recall) although we do have to use the local filesystem for temporary cache.

How big are the individual files we need to move for this?

For this specific use case (HDFS <-> filesystem) we have created hdfs-rsync, which is already used to move data to the clouddumps hosts.

> For this specific use case (HDFS <-> filesystem) we have created hdfs-rsync, which is already used to move data to the clouddumps hosts.

The idea for this ticket is to have an Airflow operator rather than a puppet definition. Running hdfs-rsync underneath the operator is a reasonable choice, but we wanted to explore whether a non-HDFS-specific mechanism would work as well. @BTullis had mentioned rclone, which would also support Ceph out of the box.

> ...
> How big are the individual files we need to move for this?

For my use case files are ~512MB-1GB each.

> The idea for this ticket is to have an Airflow operator rather than a puppet definition. Running hdfs-rsync underneath the operator is a reasonable choice, but we wanted to explore whether a non-HDFS-specific mechanism would work as well. @BTullis had mentioned rclone, which would also support Ceph out of the box.

rclone is advertised to support HDFS out of the box too!

> How big are the individual files we need to move for this?

In my dataset we've targeted 0.5GB - 1.5GB for the file sizes, although some wikis are smaller and have a single file smaller than that.

TIL rclone!

Agree that the puppet-scheduled pulls of hdfs-rsync are not ideal.

If we can use this (or hdfs-rsync?) to rsync push from HDFS (scheduled by airflow) elsewhere, that would solve a lot of use cases. Maybe T380626 (although that might involve git commits)? I think @CDanis has another traffic use case for pushing computed data to traffic servers? (can't find ticket rn).

Imagine if you could airflow schedule a rsync push to Toolforge Ceph/S3!

This all feels very achievable, but I wonder if we might be making things difficult for ourselves by trying to define one operator that can do it all, like a single Swiss Army knife.
It might be a little more practical to think of it as a small number of closely related operators, each using a different underlying tool for the transfer.

Or perhaps I'm just not thinking about it with a sufficiently Pythonic mindset and there wouldn't be any trouble making this work.

To start with, we have a number of different source and target technologies that are under consideration.
Let's just think about this from the point of view of an Airflow task, for now.

Sync Sources:

| Type | URL | Already implemented in an Airflow task |
| --- | --- | --- |
| A locally mounted CephFS file system path | e.g. /mnt/dumpsdata in the dumps_v1 pods | Yes - dumps_v1 |
| An HDFS path | e.g. hdfs://wmf/data/archives/file_export | No - although we do move files to other HDFS locations |
| An HDFS FUSE mounted path | e.g. /mnt/hdfs/wmf/data/archives/file_export | No - no requirement has arisen yet |

Others, not currently considered: S3 buckets, Swift containers, CephFS FUSE, LibCephFS, Locally mounted Ceph block devices.

Sync Targets:

| Type | URL | Already implemented in an Airflow task |
| --- | --- | --- |
| A single SSH/SFTP server | e.g. clouddumps1001 | No - no requirement has arisen yet |
| Multiple SSH/SFTP servers | e.g. clouddumps1001 and clouddumps1002 | Yes - dumps_v1 |
| An S3 bucket on the DPE clusters | https://rgw.eqiad.dpe.anycast.wmnet or https://rgw.codfw.dpe.anycast.wmnet | No - no requirement has arisen yet |
| An S3 bucket on the WMCS cluster | https://object.eqiad1.wikimediacloud.org/ | No - no requirement has arisen yet |

Others, not currently considered: An HDFS path, CephFS paths, Locally mounted Ceph block devices.


Now let's think about the different tools that we have at our disposal. The dimensions to compare are support for local file systems and HDFS, the SSH/SFTP, rsync and S3 protocols, the ability to sync to multiple targets, and whether the tool is already included in sync-utils.

| Tool | Binary executable | Notes |
| --- | --- | --- |
| SFTP | sftp | No built-in sync protocol |
| rsync | rsync | Can use SSH or rsync protocol transport |
| parallel-rsync | parallel-rsync | Wraps rsync for multiple targets |
| hdfs-rsync | hdfs-rsync | Maintained by WMF |
| rclone | rclone | Feature rich in terms of sync |

I am sure that there will be other tools that we could add to this comparison.

So for dumps_v1 we chose parallel-rsync (a task along these lines is sketched after this list) because:

  • The source was a locally mounted cephfs file system.
  • The target was multiple SSH servers.
  • We wanted to use the rsync protocol over SSH for an efficient sync.
  • We wanted to minimise the number of tasks, so having one task that synced to both servers was ideal.
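
For reference, a dumps_v1-style publishing task could be sketched roughly as below. This is only an approximation: the real sync tasks run inside the sync-utils container image rather than a plain BashOperator, and the hostnames, paths and parallel-rsync arguments here are illustrative, not the actual ones from the screenshot above.

from airflow.operators.bash import BashOperator

# Hypothetical stand-in for the real task (which lives in the existing DAG, so
# DAG boilerplate is omitted). A single invocation covers both targets, so the
# task succeeds only if both hosts receive the files.
sync_cirrussearch_dumps = BashOperator(
    task_id="sync_cirrussearch_dumps",
    bash_command=(
        "parallel-rsync "
        "-H 'clouddumps1001.wikimedia.org clouddumps1002.wikimedia.org' "  # assumed host list
        "-a "  # archive mode, as with plain rsync (flag assumed)
        "/mnt/dumpsdata/xmldatadumps/public/cirrussearch/ "  # assumed CephFS source
        "/srv/dumps/xmldatadumps/public/cirrussearch/"  # assumed path on the target hosts
    ),
)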

@brouberol has already been discussing making our parallel-rsync based tasks into an operator, but I think that this is a good opportunity to take a look at the big picture and work out whether it's one multi-purpose operator or a small collection of closely related data-transfer operators, or something else.

My feeling is that rclone would be a good tool to look at for the current requirement to publish files from HDFS to multiple SSH/SFTP servers, but we would lose the ability to have a single task sync to multiple targets. In this case, I think that's probably a good trade-off.

There is no real impediment to adding hdfs-rsync to the sync-utils container spec either.
However, hdfs-rsync does not support remote SSH/SFTP targets, so might not be an efficient route to take for this requirement.

> This all feels very achievable, but I wonder if we might be making things difficult for ourselves by trying to define one operator that can do it all, like a single Swiss Army knife.
> It might be a little more practical to think of it as a small number of closely related operators, each using a different underlying tool for the transfer.

Agreed. Immediately, we just want to support the use cases at hand. Perhaps what we should do is have an interface that allows us to evolve the operator in the future to support more targets and syncs.
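
To make that concrete, here is one possible shape for such an interface. It is purely a sketch with made-up class names: a thin base operator owns the source/targets parameters and the execution loop, and tool-specific subclasses only decide which command(s) to run.

import subprocess

from airflow.models.baseoperator import BaseOperator


class DataTransferOperator(BaseOperator):
    """Hypothetical common interface for data-transfer tasks."""

    def __init__(self, *, source: str, targets: list[str], **kwargs):
        super().__init__(**kwargs)
        self.source = source
        self.targets = targets

    def build_commands(self) -> list[list[str]]:
        """Return one command per transfer; implemented by tool-specific subclasses."""
        raise NotImplementedError

    def execute(self, context):
        for command in self.build_commands():
            self.log.info("Running: %s", " ".join(command))
            subprocess.run(command, check=True)


class RclonePublishOperator(DataTransferOperator):
    """rclone-backed implementation: one rclone sync per target remote."""

    def build_commands(self):
        return [["rclone", "sync", self.source, target] for target in self.targets]

    # A parallel-rsync or hdfs-rsync backed subclass would override
    # build_commands() in the same way, without changing any DAG code.

A DAG would then instantiate RclonePublishOperator with a source and a list of target remotes, and swapping the underlying tool later would not require touching the DAG code.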

> However, hdfs-rsync does not support remote SSH/SFTP targets,

Took a look at the code; it seems like it needs to run locally on the target, and the target needs to have the HDFS client utils. Is that correct, @JAllemandou?

>> However, hdfs-rsync does not support remote SSH/SFTP targets,
>
> Took a look at the code; it seems like it needs to run locally on the target, and the target needs to have the HDFS client utils. Is that correct, @JAllemandou?

That's right!

> This all feels very achievable, but I wonder if we might be making things difficult for ourselves by trying to define one operator that can do it all, like a single Swiss Army knife.

Other than copying to multiple targets using one command/operation, am I wrong in assuming that rclone would indeed be the Swiss Army knife in this scenario? Fast, talks to all the different systems we need it to talk to, doesn't need a Java installation, would be super easy to implement using a Docker image.

> Other than copying to multiple targets using one command/operation, am I wrong in assuming that rclone would indeed be the Swiss Army knife in this scenario? Fast, talks to all the different systems we need it to talk to, doesn't need a Java installation, would be super easy to implement using a Docker image.

Also, rclone is already packaged in Debian, so it's easily installable on both production Docker images and bare-metal hosts. (And at least trixie's rclone version lists hdfs in the output of rclone help backends.)

We already use rclone for the out-of-band backups of our postgresql clusters behind the airflow instances. https://wikitech.wikimedia.org/wiki/Data_Platform/Systems/PostgreSQL/Backup_and_Restore

Using it in puppetland and dse-k8s land are very similar.
Much like envoyproxy.

Ottomata added a subscriber: Htriedman.

cc @Htriedman I think there may be a WME use case here?

@Ottomata Yes, correct! I'm not 100% sure on the details, but it's likely some kind of S3 bucket sync to WME's infra.

We discussed this work on today's DPE SRE / DE sync up.

T384382: Production-level file export (aka dump) of MW Content in XML, for which we wanted to use this as a first use case, will be shipping soon. However, DPE SRE folks are committed to other work.

Thus we have decided to not block T384382 on this new mechanism, as we have an existing puppet-based mechanism that we can use for the time being.

Once DPE SRE folks have more time we can explore this idea.

As an experiment, I'm going to build a version of sync-utils that has support for hdfs-fuse mounts.

I think that this might be a quicker win than getting rclone to work, because if we can get a FUSE mount working for HDFS then we can reuse all of the parallel-rsync stuff that we already have in place for dumps_v1.

I'm just flagging here an investigation that I looked at as part of T405360.
In T402943#11297764 we can see that we currently use hdfs-rsync with an NFS source (clouddumps1002) and an HDFS target.

Each of these system timers running on an-launcher1003 is importing files that are generated using the dumps_v1 mechanism into HDFS.
There's a massive inefficiency here, because the files are already available on a cephfs volume, before being published to the clouddumps100[1-2] servers using rsync over ssh.

The systemd timer is then pulling them from the clouddumps servers over NFS, via an-launcher1003 and then writing them to HDFS.

We could skip the middle-man and have either a DAG or a task that simply copies the completed dumps from cephfs to HDFS.
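
As a sketch of what that could look like: a task along these lines, added to the existing dumps DAG (boilerplate omitted), would push the finished files straight from the CephFS mount into HDFS. The paths and archive layout are assumptions, and the task environment would need the Hadoop client and a valid Kerberos ticket.

from airflow.operators.bash import BashOperator

# Hypothetical task: copy finished dumps from the CephFS mount directly into
# HDFS, skipping the clouddumps -> NFS -> an-launcher1003 round trip.
import_dumps_to_hdfs = BashOperator(
    task_id="import_dumps_to_hdfs",
    bash_command=(
        "hdfs dfs -mkdir -p /wmf/data/archive/dumps/{{ ds }} && "
        "hdfs dfs -put -f /mnt/dumpsdata/xmldatadumps/public/{{ ds }}/* "
        "/wmf/data/archive/dumps/{{ ds }}/"
    ),
)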