Skip to content

Flowable#groupBy race leads to a back-pressure issue #7100

Description

@bsideup

Hi!

While debugging reactor/reactor-core#2352 we wanted to check whether RxJava has the same issue since, given the history of both projects :)

Apparently, with 3.0.7, the same construction in RxJava fails with a very similar issue (although the failure is different):

final int total = 100;

Long count = Flowable.range(0, total)
                     .groupBy(i -> (i / 2) * 2)
                     .flatMapMaybe(Flowable::firstElement, false, 1)
                     .observeOn(Schedulers.io())
                     .count()
                     .blockingGet();
assertThat(total - count).as("count").isZero();

Gives (not 100% reliably, consider running in "rerun until failure" mode):

io.reactivex.rxjava3.exceptions.MissingBackpressureException: Unable to emit a new group (#97) due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.

	at io.reactivex.rxjava3.internal.operators.flowable.FlowableGroupBy$GroupBySubscriber.onNext(FlowableGroupBy.java:197)

A few interesting observations:

  1. Changing observeOn's buffer size to 131 and higher makes it always pass
  2. 130 would sometimes fail with Unable to emit a new group (#99) due to lack of requests
  3. 129 would sometimes fail with Unable to emit a new group (#98) due to lack of requests
  4. 128 would sometimes fail with Unable to emit a new group (#97) due to lack of requests
  5. etc etc

So it looks like there is a race between cancellation of the group and starting a new one, although I haven't investigated RxJava's issue much.

Metadata

Metadata

Assignees

No one assigned

    Type

    No fields configured for Bug.

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions