Personally, I would go with a Python UDF and wouldn't bother with anything else.
However, if you really want other options, you can:
Create a Scala package with the following structure:
.
├── build.sbt
└── udfs.scala
Edit build.sbt (adjust to reflect your Scala and Spark versions):
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.4",
  "org.apache.spark" %% "spark-mllib" % "2.4.4"
)
Edit udfs.scala:
package com.example.spark.udfs
import org.apache.spark.sql.functions.udf
import org.apache.spark.ml.linalg.DenseVector
object udfs {
  // Converts an array<double> column into an ML DenseVector column
  val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
}
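Package:
sbt package
and include (or the equivalent, depending on your Scala version):
$PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar
as the --driver-class-path argument when starting the shell or submitting the application.
Define a wrapper in PySpark: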
from pyspark.sql.column import _to_java_column, _to_seq, Column
from pyspark import SparkContext
def as_vector(col):
    sc = SparkContext.getOrCreate()
    # Look up the Scala UDF through the JVM gateway
    f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
    # Apply it to the Java column and wrap the result as a PySpark Column
    return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
Test:
with_vec = df.withColumn("vector", as_vector("temperatures"))
with_vec.show()
+--------+------------------+----------------+
|    city|      temperatures|          vector|
+--------+------------------+----------------+
| Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
|New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
+--------+------------------+----------------+
with_vec.printSchema()
root
|-- city: string (nullable = true)
|-- temperatures: array (nullable = true)
| |-- element: double (containsNull = true)
|-- vector: vector (nullable = true)
Dump the data to a JSON format reflecting the DenseVector schema and read it back:
from pyspark.sql.functions import to_json, from_json, col, struct, lit
from pyspark.sql.types import StructType, StructField
from pyspark.ml.linalg import VectorUDT
json_vec = to_json(struct(struct(
    lit(1).alias("type"),  # type 1 is dense, type 0 is sparse
    col("temperatures").alias("values")
).alias("v")))
schema = StructType([StructField("v", VectorUDT())])
with_parsed_vector = df.withColumn(
    "parsed_vector", from_json(json_vec, schema).getItem("v")
)
with_parsed_vector.show()
+--------+------------------+----------------+
|    city|      temperatures|   parsed_vector|
+--------+------------------+----------------+
| Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
|New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
+--------+------------------+----------------+
with_parsed_vector.printSchema()
root
|-- city: string (nullable = true)
|-- temperatures: array (nullable = true)
| |-- element: double (containsNull = true)
|-- parsed_vector: vector (nullable = true)
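To make the intermediate JSON in the approach above concrete, here is a small sketch that builds the same shape of string with Python's standard `json` module, outside Spark. The helper name `dense_vector_json` is hypothetical, and the compact formatting is an assumption about what `to_json` emits. The point is only the structure: a `type` field of 1 (dense) and a `values` array, nested under `v`.

```python
import json


def dense_vector_json(values):
    """Build a JSON string with the dense-vector layout used above:
    {"v": {"type": 1, "values": [...]}} (type 1 = dense)."""
    payload = {"v": {"type": 1, "values": list(values)}}
    # Compact separators, so there are no spaces after ',' or ':'
    return json.dumps(payload, separators=(",", ":"))


print(dense_vector_json([-1.0, -2.0, -3.0]))
# {"v":{"type":1,"values":[-1.0,-2.0,-3.0]}}
```

Parsing such a string with `from_json` and a schema containing `VectorUDT()` is what turns the plain array back into a vector column.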