Page MenuHomePhabricator
Paste P6277

building a paldb from popularity data with spark+scala
Active · Public

Authored by EBernhardson on Nov 7 2017, 12:57 AM.
# Rough draft of converting popularity to paldb (from spark 2.1.2 in scala spark-shell)
import org.apache.spark.sql.{functions => F}
// Weekly popularity scores, one row per (project, page_id).
val df_pop = spark.read.parquet("hdfs://analytics-hadoop/wmf/data/discovery/popularity_score/agg_days=7/year=2017/month=10/day=2")
// Project-hostname -> dbname mapping. The regex strips the final
// dot-separated component of _c0 (presumably the TLD of a hostname like
// "en.wikipedia.org" -> "en.wikipedia" — TODO confirm against the raw data)
// so it lines up with df_pop's project column. Duplicates are dropped.
val df_map = {
  val raw = spark.read.csv("hdfs://analytics-hadoop/wmf/data/raw/mediawiki/project_namespace_map/snapshot=2017-09/project_namespace_map")
  val projectCol = F.regexp_replace(F.col("_c0"), "\\.[^.]+$", "").alias("project")
  val dbnameCol = F.col("_c1").alias("dbname")
  raw.select(projectCol, dbnameCol).dropDuplicates()
}
// Attach dbname to every popularity row, drop the join key, spread the
// result over 200 partitions, and cache it for the multiple passes below.
val df = df_pop.join(df_map, df_pop("project") === df_map("project")).drop("project").repartition(200).cache()
// NOTE(review): JavaConversions is deprecated (prefer JavaConverters with
// explicit .asScala/.asJava), but the import is kept because the
// `for (row <- df.toLocalIterator)` loop below relies on its implicit
// java.util.Iterator -> Scala iterator conversion.
import scala.collection.JavaConversions._
// dbname -> dense ordinal. Short keeps the per-page key at 6 bytes; this
// caps the number of wikis at 32767, which is plenty here.
val ords = new java.util.HashMap[String,Short]()
val wikis = df_map.select("dbname").collect.map { row => row.getAs[String](0) }
// Populate the map directly instead of building an intermediate Scala Map
// and relying on the deprecated implicit Map -> java.util.Map conversion
// for putAll.
wikis.zipWithIndex.foreach { case (wiki, idx) => ords.put(wiki, idx.toShort) }
// Scratch buffer for composing the binary (ordinal, page_id) keys; reused
// for every row in the loop below. Never reassigned, so val (was var).
val bb: java.nio.ByteBuffer = java.nio.ByteBuffer.allocate(128)
import com.linkedin.paldb.api.{Configuration, PalDB}
val config = PalDB.newConfiguration()
val writer = PalDB.createWriter(new java.io.File("/home/ebernhardson/popularity.paldb"), config)
// Store the dbname -> ordinal map under a reserved key so readers can
// decode the binary per-page keys written below.
val baos = new java.io.ByteArrayOutputStream()
val oos = new java.io.ObjectOutputStream(baos)
oos.writeObject(ords)
// BUG FIX: ObjectOutputStream buffers block data internally; without
// flush() the trailing bytes may not have reached baos yet, truncating
// the serialized map returned by toByteArray().
oos.flush()
writer.put("__ordinals__", baos.toByteArray())
// Stream rows to the driver and write one (wiki ordinal, page_id) -> score
// entry per page. try/finally guarantees the store is closed (and its file
// handle released) even if a row fails mid-stream — previously an exception
// left the writer open and the store unfinalized.
try {
  for (row <- df.toLocalIterator) {
    val page_id = row.getInt(0)
    // Float precision is sufficient for a popularity score and halves the
    // stored value size relative to Double.
    val score = row.getDouble(1).toFloat
    val dbname = row.getString(2)
    // Key layout: 2-byte wiki ordinal + 4-byte page id = 6 bytes.
    bb.position(0)
    // NOTE(review): ords.get returns null (unboxing NPE) if dbname is
    // absent — the inner join above should guarantee presence; confirm.
    bb.putShort(ords.get(dbname))
    bb.putInt(page_id)
    val key = new Array[Byte](bb.position())
    bb.position(0)
    bb.get(key)
    writer.put(key, score)
  }
} finally {
  writer.close()
}