您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

如何在可观察的流中处理前n个项目并以不同的方式处理其余项

如何在可观察的流中处理前n个项目并以不同的方式处理其余项

你可以分享你M的流,然后合并到一起take()skip()流,是这样的:

    int m = 10;
    int n = 8;
    Observable<Integer> numbeRSStream = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .publish();

    Observable<Integer> firstNItemsStream = numbeRSStream.take(n)
            .map(i -> i * 2);

    Observable<Integer> remainingItemsStream = numbeRSStream.skip(n)
            .map(i -> i * 3);

    Observable.merge(firstNItemsStream, remainingItemsStream)
            .subscribe(integer -> System.out.println("result = " + integer));
    numbeRSStream.connect();

@AE daphne指出,share()它将与第一个订阅者一起开始发射,因此,如果Observable已开始发射项目,则第二个订阅者可能会错过通知,因此在这种情况下,还有其他可能性: -将答复所有缓存发出的项目并将其回复给每个新订户,但会牺牲取消订阅的能力,因此需要谨慎使用。 -将创建Observable的是reply()以前所有项目的每个新用户(类似缓存),但会取消时,从它的最后一个订户退订。

在这两种情况下,都应考虑内存,因为Observable它将在内存中缓存所有发出的项目。

-在不缓存所有先前项目的情况下,另一种可能性是使用publish()createConnectableObservable,并connect()在所有必需的订阅者都订阅之后调用它的方法来开始发射,这样将获得同步,并且所有订阅者将正确获得所有通知

其他 2022/1/1 18:30:30 有531人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶