您需要做的是减小进入爆炸状态的分区的大小。有2个选项可以执行此操作。首先,如果您的输入数据是可拆分的,则可以减小的大小,spark.sql.files.maxPartitionBytes
以便Spark读取较小的拆分。另一种选择是在爆炸之前重新分区。
该默认值的maxPartitionBytes
为128MB,这样的Spark将尝试在128MB块读取数据。如果数据不可拆分,则它将整个文件读入单个分区,在这种情况下,您需要执行一个操作repartition
。
在您的情况下,由于您正在爆炸,说它增加了100倍,每个分区有128MB的输入,最终每个分区有12GB +的输出!
您可能需要考虑的另一件事是混搭分区,因为您正在进行聚合。再次重申一下,您可能需要通过设置spark.sql.shuffle.partitions
为比默认值200高的值来增加爆炸后聚合的分区。您可以使用Spark UI查看随机播放阶段,并查看每个任务正在读取多少数据,以及相应地进行调整。
我在欧洲Spark Summit上的演讲中讨论了此调整建议以及其他调优建议。