您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

Spark上的Redis:任务不可序列化

Spark上的Redis:任务不可序列化

在Spark中,RDDs(如此map处)上的函数被序列化并发送给执行程序进行处理。这意味着这些操作中包含的所有元素都应该可序列化。

Redis连接不可序列化,因为它打开了到目标DB的TCP连接,该TCP连接已绑定到创建它的机器。

解决方案是在本地执行上下文中的执行器上创建那些连接。做到这一点的方法很少。我想到的两个是:

mapPartitions 仅需对程序结构进行少量更改即可轻松实现:

val perhit = perhitFile.mapPartitions{partition => 
    val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
    val res = partition.map{ x =>
        ...
        val refStr = r.hmget(...) // use r to process the local data
    }
    r.close // take care of resources
    res
}

可以使用持有对连接的延迟引用的对象对单例连接管理器进行建模(注意:可变引用也将起作用)。

object RedisConnection extends Serializable {
   lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}

然后可以使用该对象实例化每个辅助JVM的1个连接,并用作Serializable操作闭包中的对象。

val perhit = perhitFile.map{x => 
    val param = f(x)
    val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
    }
}

使用单例对象的优点是开销较小,因为连接仅由JVM创建一次(而不是每个RDD分区1个)

还有一些缺点:

(*)代码用于说明目的。未经编译或测试。

其他 2022/1/1 18:20:36 有529人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶