可以简化可能导致错误或空流的元素处理的一种方法是operator handle
。
Mono.just(userId)
.map(repo::findById)
.handle((user, sink) -> {
if(!isValid(user)){
sink.error(new InvalidUserException());
} else if (isSendable(user))
sink.next(user);
}
else {
//just ignore element
}
})
如我们所见,.handle
操作员需要通过BiConsumer<T, SynchronousSink<>
才能处理元素。在这里,我们的BiConsumer中有两个参数。第一个元素是上游元素,第二个元素是上游元素,SynchronousSink
它帮助我们向下游同步提供元素。这样的技术扩展了提供元素处理的不同结果的能力。例如,如果元素无效,我们可以向该元素提供错误,SycnchronousSync
这将取消上游onError
并向下游产生信号。反过来,我们可以使用相同的handle
运算符来“过滤” 。一旦手柄BiConsumer
被执行并且没有提供任何元素,Reactor会将其视为一种过滤,并将为我们请求其他元素。最后,在元素有效的情况下,我们可以简单地在SynchronousSink#next
下游调用和传播我们的元素,或者在其上应用一些映射,因此这里将handle
作为map
操作符。此外,我们可以安全地使用该运算符,而不会影响性能,并提供复杂的元素验证,例如元素验证或向下游发送错误。
在映射期间引发异常的选项之一是替换map
为concatMap
。从本质上讲,concatMap
它的作用几乎相同flatMap
。唯一的区别是concatMap
一次仅允许一个子流。这种行为大大简化了内部实现,并且不影响性能。因此,我们可以使用以下代码来以更实用的方式引发异常:
Mono.just(userId)
.map(repo::findById)
.concatMap(user-> {
if(!isValid(user)){
return Mono.error(new InvalidUserException());
}
return Mono.just(user);
})
在上述示例中,如果用户无效,我们将使用返回异常Mono.error
。我们可以使用Flux.error
以下方法对通量执行相同的操作:
Flux.just(userId1, userId2, userId3)
.map(repo::findById)
.concatMap(user-> {
if(!isValid(user)){
return Flux.error(new InvalidUserException());
}
return Mono.just(user);
})
注意 ,在两种情况下,我们都返回仅包含一个元素的 冷流 。在Reactor中,在返回的流是冷 标量 流的情况下,有一些优化可以提高性能。因此,建议使用流量/单声道concatMap
+.just
,empty
,error
其结果是,当我们需要更复杂的映射,这可能与最终return null
或throw new ...
。
因此,如果repo.findById
返回null,Reactor将为您抛出NullPointerException。