
How do I convert a Spark SchemaRDD into an RDD of my case class?


The best solution I've come up with requires only minimal copy-and-paste for each new class, as follows (I'd still like to see a better one, though).

First, you have to define your case class, plus a (partially) reusable factory method:

import org.apache.spark.sql.catalyst.expressions

case class MyClass(fooBar: Long, fred: Long)

// Here you want to auto gen these functions using macros or something
object Factories extends java.io.Serializable {
  def longLong[T](fac: (Long, Long) => T)(row: expressions.Row): T = 
    fac(row(0).asInstanceOf[Long], row(1).asInstanceOf[Long])
}
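The currying in `Factories.longLong` is what makes the factory reusable: applying it to a constructor yields a `Row => T` function. The same pattern can be sketched standalone, without a Spark dependency, by substituting `Array[Any]` for Spark's `expressions.Row` (that substitution is only for illustration; the real code uses `Row`):

```scala
case class MyClass(fooBar: Long, fred: Long)

object Factories extends java.io.Serializable {
  // Curried: first the constructor, then the row-like value.
  def longLong[T](fac: (Long, Long) => T)(row: Array[Any]): T =
    fac(row(0).asInstanceOf[Long], row(1).asInstanceOf[Long])
}

// Partially applying yields a reusable row-decoding function.
val mk: Array[Any] => MyClass = Factories.longLong[MyClass](MyClass.apply)
println(mk(Array[Any](1L, 2L)))  // MyClass(1,2)
```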

Some of this boilerplate will already be available:

import scala.reflect.runtime.universe._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

The magic:

import scala.reflect.ClassTag
import org.apache.spark.sql.SchemaRDD

def camelToUnderscores(name: String) = 
  "[A-Z]".r.replaceAllIn(name, "_" + _.group(0).toLowerCase())
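`camelToUnderscores` maps Scala's camelCase field names to the snake_case column names used in the table. It runs standalone, for example:

```scala
// Same regex logic as above: each uppercase letter becomes "_" + lowercase.
def camelToUnderscores(name: String): String =
  "[A-Z]".r.replaceAllIn(name, "_" + _.group(0).toLowerCase())

println(camelToUnderscores("fooBar"))  // foo_bar
println(camelToUnderscores("fred"))    // fred
```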

def getCaseMethods[T: TypeTag]: List[String] = typeOf[T].members.sorted.collect {
  case m: MethodSymbol if m.isCaseAccessor => m
}.toList.map(_.toString)

def caseClassTosqlCols[T: TypeTag]: List[String] = 
  getCaseMethods[T].map(_.split(" ")(1)).map(camelToUnderscores)
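Taken together, `getCaseMethods` and `caseClassTosqlCols` use runtime reflection to recover the case-class field names and convert them to column names. A self-contained check (only `scala-reflect` needed, no Spark):

```scala
import scala.reflect.runtime.universe._

case class MyClass(fooBar: Long, fred: Long)

def camelToUnderscores(name: String): String =
  "[A-Z]".r.replaceAllIn(name, "_" + _.group(0).toLowerCase())

// Collect the case accessors; their toString looks like "value fooBar",
// so split(" ")(1) extracts the bare field name.
def getCaseMethods[T: TypeTag]: List[String] = typeOf[T].members.sorted.collect {
  case m: MethodSymbol if m.isCaseAccessor => m
}.toList.map(_.toString)

def caseClassTosqlCols[T: TypeTag]: List[String] =
  getCaseMethods[T].map(_.split(" ")(1)).map(camelToUnderscores)

println(caseClassTosqlCols[MyClass])  // List(foo_bar, fred)
```

Note that `.sorted` keeps the members in declaration order, which is what lets the generated `SELECT` column list line up with the factory's positional `row(0)`, `row(1)` accesses.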

def schemaRDDToRDD[T: TypeTag: ClassTag](schemaRDD: SchemaRDD, fac: expressions.Row => T) = {
  val tmpName = "tmpTableName" // Maybe should use a random string
  schemaRDD.registerAsTable(tmpName)
  sqlContext.sql("SELECT " + caseClassTosqlCols[T].mkString(", ") + " FROM " + tmpName)
  .map(fac)
}

Example usage:

val parquetFile = sqlContext.parquetFile(path)

val normalRDD: RDD[MyClass] = 
  schemaRDDToRDD[MyClass](parquetFile, Factories.longLong[MyClass](MyClass.apply))

See also:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Convert-SchemaRDD-back-to-RDD-td9071.html

Though I failed to find any example or documentation by following the JIRA link.

Other · 2022/1/1 18:34:54 · 571 views
