您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

如何在分区之间平衡我的数据?

如何在分区之间平衡我的数据?

我认为内存开销限制超出了问题,这是由于在提取过程中使用了DirectMemory缓冲区。我认为它已在2.0.0中修复。(我们遇到了同样的问题,但是当发现升级到2.0.0可以解决问题时,我们就不再进行深入研究了。不幸的是,我没有Spark问题编号来支持我。)

之后的不均匀分隔repartition@H_419_5@令人惊讶。与https://github.com/apache/spark/blob/v2.0.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L443对比。Spark甚至会在中生成随机密钥repartition@H_419_5@,因此不会使用可能有偏差的哈希来完成。

我尝试了您的示例,并使用Spark 1.6.2和Spark 2.0.0获得了 完全相同的 结果。但不是来自Scala spark-shell@H_419_5@:

scala> val data = sc.parallelize(1 to 3, 3).mapPartitions { it => (1 to it.next * 1000).iterator }
data: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:24

scala> data.mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res1: Seq[Int] = WrappedArray(1000, 2000, 3000)

scala> data.repartition(3).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res2: Seq[Int] = WrappedArray(1999, 2001, 2000)

scala> data.repartition(6).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res3: Seq[Int] = WrappedArray(999, 1000, 1000, 1000, 1001, 1000)

scala> data.repartition(12).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res4: Seq[Int] = WrappedArray(500, 501, 501, 501, 501, 500, 499, 499, 499, 499, 500, 500)
@H_419_5@

如此美丽的分区!

(对不起,这不是一个完整的答案。到目前为止,我只想分享我的发现。)

其他 2022/1/1 18:46:31 有459人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶