diff --git a/src/main/java/io/reactivex/rxjava4/internal/util/ErrorMode.java b/src/main/java/io/reactivex/rxjava4/core/ErrorMode.java similarity index 94% rename from src/main/java/io/reactivex/rxjava4/internal/util/ErrorMode.java rename to src/main/java/io/reactivex/rxjava4/core/ErrorMode.java index bef79ff748..b5625c66a1 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/util/ErrorMode.java +++ b/src/main/java/io/reactivex/rxjava4/core/ErrorMode.java @@ -11,10 +11,11 @@ * the License for the specific language governing permissions and limitations under the License. */ -package io.reactivex.rxjava4.internal.util; +package io.reactivex.rxjava4.core; /** * Indicates when an error from the main source should be reported. + * @since 4.0.0 */ public enum ErrorMode { /** Report the error immediately, cancelling the active inner source. */ diff --git a/src/main/java/io/reactivex/rxjava4/core/Maybe.java b/src/main/java/io/reactivex/rxjava4/core/Maybe.java index 1baf2232f0..e8b86e318a 100644 --- a/src/main/java/io/reactivex/rxjava4/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava4/core/Maybe.java @@ -31,7 +31,6 @@ import io.reactivex.rxjava4.internal.operators.maybe.*; import io.reactivex.rxjava4.internal.operators.mixed.*; import io.reactivex.rxjava4.internal.operators.observable.ObservableElementAtMaybe; -import io.reactivex.rxjava4.internal.util.ErrorMode; import io.reactivex.rxjava4.observers.TestObserver; import io.reactivex.rxjava4.plugins.RxJavaPlugins; import io.reactivex.rxjava4.schedulers.*; diff --git a/src/main/java/io/reactivex/rxjava4/core/Observable.java b/src/main/java/io/reactivex/rxjava4/core/Observable.java index 5bffb2aca5..d291f5d5f5 100644 --- a/src/main/java/io/reactivex/rxjava4/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava4/core/Observable.java @@ -21,6 +21,7 @@ import io.reactivex.rxjava4.annotations.*; import io.reactivex.rxjava4.core.config.ObservableCombineLatestConfig; +import io.reactivex.rxjava4.core.config.ObservableConcatConfig; import io.reactivex.rxjava4.disposables.*; import io.reactivex.rxjava4.exceptions.*; import io.reactivex.rxjava4.functions.*; @@ -886,36 +887,38 @@ public static int bufferSize() { @NonNull @SchedulerSupport(SchedulerSupport.NONE) public static <@NonNull T> Observable concat(@NonNull Iterable<@NonNull ? extends ObservableSource> sources) { - Objects.requireNonNull(sources, "sources is null"); - return fromIterable(sources).concatMapDelayError((Function)Functions.identity(), false, bufferSize()); + return concat(sources, ObservableConcatConfig.DELAY_ERROR_BOUNDARY); } /** - * Returns an {@code Observable} that emits the items emitted by each of the {@link ObservableSource}s emitted by the - * {@code ObservableSource}, one after the other, without interleaving them. + * Concatenates the {@link Iterable} sequence of {@link ObservableSource}s into a single {@code Observable} sequence + * by subscribing to each {@code ObservableSource}, one after the other, one at a time and delays any errors till + * the all inner {@code ObservableSource}s terminate. *

- * + * *

*
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
*
* * @param the common element base type - * @param sources - * an {@code ObservableSource} that emits {@code ObservableSource}s - * @return the new {@code Observable} instance - * @throws NullPointerException if {@code sources} is {@code null} - * @see ReactiveX operators documentation: Concat + * @param sources the {@code Iterable} sequence of {@code ObservableSource}s + * @param config the configuration record for this operator + * @return the new {@code Observable} with the concatenating behavior + * @throws NullPointerException if {@code sources} or {@code config} is {@code null} */ @CheckReturnValue - @SchedulerSupport(SchedulerSupport.NONE) @NonNull - public static <@NonNull T> Observable concat(@NonNull ObservableSource> sources) { - return concat(sources, bufferSize()); + @SchedulerSupport(SchedulerSupport.NONE) + public static <@NonNull T> Observable concat(@NonNull Iterable<@NonNull ? extends ObservableSource> sources, + @NonNull ObservableConcatConfig config) { + Objects.requireNonNull(sources, "sources is null"); + Objects.requireNonNull(config, "config is null"); + return concat(fromIterable(sources), config); } /** - * Returns an {@code Observable} that emits the items emitted by each of the {@link ObservableSource}s emitted by the outer + * Returns an {@code Observable} that emits the items emitted by each of the {@link ObservableSource}s emitted by the * {@code ObservableSource}, one after the other, without interleaving them. *

* @@ -927,87 +930,20 @@ public static int bufferSize() { * @param the common element base type * @param sources * an {@code ObservableSource} that emits {@code ObservableSource}s - * @param bufferSize - * the number of inner {@code ObservableSource}s expected to be buffered. * @return the new {@code Observable} instance * @throws NullPointerException if {@code sources} is {@code null} - * @throws IllegalArgumentException if {@code bufferSize} is non-positive * @see ReactiveX operators documentation: Concat */ - @SuppressWarnings({ "unchecked", "rawtypes" }) @CheckReturnValue - @NonNull @SchedulerSupport(SchedulerSupport.NONE) - public static <@NonNull T> Observable concat(@NonNull ObservableSource> sources, int bufferSize) { - Objects.requireNonNull(sources, "sources is null"); - ObjectHelper.verifyPositive(bufferSize, "bufferSize"); - return RxJavaPlugins.onAssembly(new ObservableConcatMap(sources, Functions.identity(), bufferSize, ErrorMode.IMMEDIATE)); - } - - /** - * Returns an {@code Observable} that emits the items emitted by two {@link ObservableSource}s, one after the other, without - * interleaving them. - *

- * - *

- *
Scheduler:
- *
{@code concat} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param the common element base type - * @param source1 - * an {@code ObservableSource} to be concatenated - * @param source2 - * an {@code ObservableSource} to be concatenated - * @return the new {@code Observable} instance - * @throws NullPointerException if {@code source1} or {@code source2} is {@code null} - * @see ReactiveX operators documentation: Concat - */ - @CheckReturnValue - @NonNull - @SchedulerSupport(SchedulerSupport.NONE) - public static <@NonNull T> Observable concat(@NonNull ObservableSource source1, ObservableSource source2) { - Objects.requireNonNull(source1, "source1 is null"); - Objects.requireNonNull(source2, "source2 is null"); - return concatArray(source1, source2); - } - - /** - * Returns an {@code Observable} that emits the items emitted by three {@link ObservableSource}s, one after the other, without - * interleaving them. - *

- * - *

- *
Scheduler:
- *
{@code concat} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param the common element base type - * @param source1 - * an {@code ObservableSource} to be concatenated - * @param source2 - * an {@code ObservableSource} to be concatenated - * @param source3 - * an {@code ObservableSource} to be concatenated - * @return the new {@code Observable} instance - * @throws NullPointerException if {@code source1}, {@code source2} or {@code source3} is {@code null} - * @see ReactiveX operators documentation: Concat - */ - @CheckReturnValue @NonNull - @SchedulerSupport(SchedulerSupport.NONE) - public static <@NonNull T> Observable concat( - @NonNull ObservableSource source1, @NonNull ObservableSource source2, - @NonNull ObservableSource source3) { - Objects.requireNonNull(source1, "source1 is null"); - Objects.requireNonNull(source2, "source2 is null"); - Objects.requireNonNull(source3, "source3 is null"); - return concatArray(source1, source2, source3); + public static <@NonNull T> Observable concat(@NonNull ObservableSource> sources) { + return concat(sources, ObservableConcatConfig.DEFAULT); } /** - * Returns an {@code Observable} that emits the items emitted by four {@link ObservableSource}s, one after the other, without - * interleaving them. + * Returns an {@code Observable} that emits the items emitted by each of the {@link ObservableSource}s emitted by the outer + * {@code ObservableSource}, one after the other, without interleaving them. *

* *

@@ -1016,29 +952,23 @@ public static int bufferSize() { *
* * @param the common element base type - * @param source1 - * an {@code ObservableSource} to be concatenated - * @param source2 - * an {@code ObservableSource} to be concatenated - * @param source3 - * an {@code ObservableSource} to be concatenated - * @param source4 - * an {@code ObservableSource} to be concatenated + * @param sources + * an {@code ObservableSource} that emits {@code ObservableSource}s + * @param config + * the configuration record for this operator * @return the new {@code Observable} instance - * @throws NullPointerException if {@code source1}, {@code source2}, {@code source3} or {@code source4} is {@code null} + * @throws NullPointerException if {@code sources} or {@code config} is {@code null} * @see ReactiveX operators documentation: Concat */ + @SuppressWarnings({ "unchecked", "rawtypes" }) @CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.NONE) - public static <@NonNull T> Observable concat( - @NonNull ObservableSource source1, @NonNull ObservableSource source2, - @NonNull ObservableSource source3, @NonNull ObservableSource source4) { - Objects.requireNonNull(source1, "source1 is null"); - Objects.requireNonNull(source2, "source2 is null"); - Objects.requireNonNull(source3, "source3 is null"); - Objects.requireNonNull(source4, "source4 is null"); - return concatArray(source1, source2, source3, source4); + public static <@NonNull T> Observable concat(@NonNull ObservableSource> sources, + @NonNull ObservableConcatConfig config) { + Objects.requireNonNull(sources, "sources is null"); + return RxJavaPlugins.onAssembly(new ObservableConcatMap( + sources, Functions.identity(), config.bufferSize(), config.errorMode())); } /** @@ -1054,53 +984,51 @@ public static int bufferSize() { * @param sources the array of sources * @param the common base value type * @return the new {@code Observable} instance - * @throws NullPointerException if {@code sources} is {@code null} + * @throws NullPointerException if {@code sources} or {@code config} is {@code null} */ @SuppressWarnings({ "unchecked", "rawtypes" }) @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @NonNull @SafeVarargs - public static <@NonNull T> Observable concatArray(@NonNull ObservableSource... sources) { - Objects.requireNonNull(sources, "sources is null"); - if (sources.length == 0) { - return empty(); - } - if (sources.length == 1) { - return wrap((ObservableSource)sources[0]); - } - return RxJavaPlugins.onAssembly(new ObservableConcatMap(fromArray(sources), Functions.identity(), bufferSize(), ErrorMode.BOUNDARY)); + public static <@NonNull T> Observable concatArray( + @NonNull ObservableSource... sources) { + return concatArray(ObservableConcatConfig.DELAY_ERROR_BOUNDARY, sources); } /** - * Concatenates a variable number of {@link ObservableSource} sources and delays errors from any of them - * till all terminate. + * Concatenates a variable number of {@link ObservableSource} sources. + *

+ * Note: named this way because of overload conflict with {@code concat(ObservableSource)} *

* *

*
Scheduler:
- *
{@code concatArrayDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
{@code concatArray} does not operate by default on a particular {@link Scheduler}.
*
+ * @param config the configuration record of this operator * @param sources the array of sources * @param the common base value type * @return the new {@code Observable} instance - * @throws NullPointerException if {@code sources} is {@code null} + * @throws NullPointerException if {@code sources} or {@code config} is {@code null} */ + @SuppressWarnings({ "unchecked", "rawtypes" }) @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @NonNull @SafeVarargs - public static <@NonNull T> Observable concatArrayDelayError(@NonNull ObservableSource... sources) { + public static <@NonNull T> Observable concatArray( + @NonNull ObservableConcatConfig config, + @NonNull ObservableSource... sources) { Objects.requireNonNull(sources, "sources is null"); if (sources.length == 0) { return empty(); } if (sources.length == 1) { - @SuppressWarnings("unchecked") - Observable source = (Observable)wrap(sources[0]); - return source; + return wrap((ObservableSource)sources[0]); } - return concatDelayError(fromArray(sources)); + return RxJavaPlugins.onAssembly(new ObservableConcatMap( + fromArray(sources), Functions.identity(), bufferSize(), config.errorMode())); } /** @@ -1219,82 +1147,6 @@ public static int bufferSize() { return fromArray(sources).concatMapEagerDelayError((Function)Functions.identity(), true, maxConcurrency, bufferSize); } - /** - * Concatenates the {@link Iterable} sequence of {@link ObservableSource}s into a single {@code Observable} sequence - * by subscribing to each {@code ObservableSource}, one after the other, one at a time and delays any errors till - * the all inner {@code ObservableSource}s terminate. - *

- * - *

- *
Scheduler:
- *
{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param the common element base type - * @param sources the {@code Iterable} sequence of {@code ObservableSource}s - * @return the new {@code Observable} with the concatenating behavior - * @throws NullPointerException if {@code sources} is {@code null} - */ - @CheckReturnValue - @NonNull - @SchedulerSupport(SchedulerSupport.NONE) - public static <@NonNull T> Observable concatDelayError(@NonNull Iterable<@NonNull ? extends ObservableSource> sources) { - Objects.requireNonNull(sources, "sources is null"); - return concatDelayError(fromIterable(sources)); - } - - /** - * Concatenates the {@link ObservableSource} sequence of {@code ObservableSource}s into a single {@code Observable} sequence - * by subscribing to each inner {@code ObservableSource}, one after the other, one at a time and delays any errors till the - * all inner and the outer {@code ObservableSource}s terminate. - *

- * - *

- *
Scheduler:
- *
{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param the common element base type - * @param sources the {@code ObservableSource} sequence of {@code ObservableSource}s - * @return the new {@code Observable} with the concatenating behavior - * @throws NullPointerException if {@code sources} is {@code null} - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.NONE) - @NonNull - public static <@NonNull T> Observable concatDelayError(@NonNull ObservableSource> sources) { - return concatDelayError(sources, bufferSize(), true); - } - - /** - * Concatenates the {@link ObservableSource} sequence of {@code ObservableSource}s into a single sequence by subscribing to each inner {@code ObservableSource}, - * one after the other, one at a time and delays any errors till the all inner and the outer {@code ObservableSource}s terminate. - *

- * - *

- *
Scheduler:
- *
{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param the common element base type - * @param sources the {@code ObservableSource} sequence of {@code ObservableSource}s - * @param bufferSize the number of inner {@code ObservableSource}s expected to be buffered - * @param tillTheEnd if {@code true}, exceptions from the outer and all inner {@code ObservableSource}s are delayed to the end - * if {@code false}, exception from the outer {@code ObservableSource} is delayed till the active {@code ObservableSource} terminates - * @return the new {@code Observable} with the concatenating behavior - * @throws NullPointerException if {@code sources} is {@code null} - * @throws IllegalArgumentException if {@code bufferSize} is non-positive - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @CheckReturnValue - @NonNull - @SchedulerSupport(SchedulerSupport.NONE) - public static <@NonNull T> Observable concatDelayError(@NonNull ObservableSource> sources, int bufferSize, boolean tillTheEnd) { - Objects.requireNonNull(sources, "sources is null"); - ObjectHelper.verifyPositive(bufferSize, "bufferSize is null"); - return RxJavaPlugins.onAssembly(new ObservableConcatMap(sources, Functions.identity(), bufferSize, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY)); - } - /** * Concatenates a sequence of {@link ObservableSource}s eagerly into a single stream of values. *

@@ -7500,7 +7352,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function concatWith(@NonNull ObservableSource other) { Objects.requireNonNull(other, "other is null"); - return concat(this, other); + return concatArray(this, other); } /** @@ -12810,7 +12662,7 @@ public final Observable startWithIterable(@NonNull Iterable item @SchedulerSupport(SchedulerSupport.NONE) public final Observable startWith(@NonNull CompletableSource other) { Objects.requireNonNull(other, "other is null"); - return Observable.concat(Completable.wrap(other).toObservable(), this); + return Observable.concatArray(Completable.wrap(other).toObservable(), this); } /** @@ -12832,7 +12684,7 @@ public final Observable startWith(@NonNull CompletableSource other) { @SchedulerSupport(SchedulerSupport.NONE) public final Observable startWith(@NonNull SingleSource other) { Objects.requireNonNull(other, "other is null"); - return Observable.concat(Single.wrap(other).toObservable(), this); + return Observable.concatArray(Single.wrap(other).toObservable(), this); } /** @@ -12854,7 +12706,7 @@ public final Observable startWith(@NonNull SingleSource other) { @SchedulerSupport(SchedulerSupport.NONE) public final Observable startWith(@NonNull MaybeSource other) { Objects.requireNonNull(other, "other is null"); - return Observable.concat(Maybe.wrap(other).toObservable(), this); + return Observable.concatArray(Maybe.wrap(other).toObservable(), this); } /** diff --git a/src/main/java/io/reactivex/rxjava4/core/Single.java b/src/main/java/io/reactivex/rxjava4/core/Single.java index d207192b2e..df21b24942 100644 --- a/src/main/java/io/reactivex/rxjava4/core/Single.java +++ b/src/main/java/io/reactivex/rxjava4/core/Single.java @@ -33,7 +33,6 @@ import io.reactivex.rxjava4.internal.operators.mixed.*; import io.reactivex.rxjava4.internal.operators.observable.ObservableSingleSingle; import io.reactivex.rxjava4.internal.operators.single.*; -import io.reactivex.rxjava4.internal.util.ErrorMode; import io.reactivex.rxjava4.observers.TestObserver; import io.reactivex.rxjava4.plugins.RxJavaPlugins; import io.reactivex.rxjava4.schedulers.*; diff --git a/src/main/java/io/reactivex/rxjava4/core/config/MaybeConcatConfig.java b/src/main/java/io/reactivex/rxjava4/core/config/MaybeConcatConfig.java index 1ab5ce7df4..235cbdfd1c 100644 --- a/src/main/java/io/reactivex/rxjava4/core/config/MaybeConcatConfig.java +++ b/src/main/java/io/reactivex/rxjava4/core/config/MaybeConcatConfig.java @@ -54,9 +54,7 @@ public MaybeConcatConfig(int prefetch) { * @param delayError should the error propagation be delayed? * @param prefetch the number of source sequences to request from a backpressured source */ - public MaybeConcatConfig(boolean delayError, int prefetch) { + public MaybeConcatConfig { ObjectHelper.verifyPositive(prefetch, "prefetch"); - this.delayError = delayError; - this.prefetch = prefetch; } } diff --git a/src/main/java/io/reactivex/rxjava4/core/config/ObservableConcatConfig.java b/src/main/java/io/reactivex/rxjava4/core/config/ObservableConcatConfig.java new file mode 100644 index 0000000000..98cbcc9058 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/core/config/ObservableConcatConfig.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.core.config; + +import io.reactivex.rxjava4.annotations.NonNull; +import io.reactivex.rxjava4.core.Observable; +import io.reactivex.rxjava4.internal.functions.ObjectHelper; +import io.reactivex.rxjava4.core.ErrorMode; + +/** + * Configuration record for Observable.concat() operators. + * @param errorMode how to handle when errors appear from the inner or outer sources + * @param bufferSize the expected number of outer sources to buffer while processing an inner source + * @since 4.0.0 + */ +public record ObservableConcatConfig(@NonNull ErrorMode errorMode, int bufferSize) { + + /** + * The default configuration with no error delays and bufferSize of {@link Observable#bufferSize()}. + */ + public static final ObservableConcatConfig DEFAULT = new ObservableConcatConfig(ErrorMode.IMMEDIATE, Observable.bufferSize()); + + /** + * The default configuration with error delays and bufferSize of {@link Observable#bufferSize()}. + */ + public static final ObservableConcatConfig DELAY_ERROR = new ObservableConcatConfig(ErrorMode.END, Observable.bufferSize()); + + /** + * The default configuration with error delays and bufferSize of {@link Observable#bufferSize()}. + */ + public static final ObservableConcatConfig DELAY_ERROR_BOUNDARY = new ObservableConcatConfig(ErrorMode.BOUNDARY, Observable.bufferSize()); + + /** + * Constructs a configuration record. + * @param errorMode how to handle when errors appear from the inner or outer sources + */ + public ObservableConcatConfig(@NonNull ErrorMode errorMode) { + this(errorMode, Observable.bufferSize()); + } + + /** + * Constructs a configuration record. + * @param bufferSize the expected number of outer sources to buffer while processing an inner source + */ + public ObservableConcatConfig(int bufferSize) { + this(ErrorMode.IMMEDIATE, bufferSize); + } + + /** + * Constructs a configuration record. + * @param errorMode how to handle when errors appear from the inner or outer sources + * @param bufferSize the expected number of outer sources to buffer while processing an inner source + */ + public ObservableConcatConfig { + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); + } +} diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableConcatMapEagerPublisher.java b/src/main/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableConcatMapEagerPublisher.java index 61f6c1917e..e0a8b0557a 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableConcatMapEagerPublisher.java +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableConcatMapEagerPublisher.java @@ -18,7 +18,7 @@ import io.reactivex.rxjava4.core.Flowable; import io.reactivex.rxjava4.functions.Function; import io.reactivex.rxjava4.internal.operators.flowable.FlowableConcatMapEager.ConcatMapEagerDelayErrorSubscriber; -import io.reactivex.rxjava4.internal.util.ErrorMode; +import io.reactivex.rxjava4.core.ErrorMode; /** * ConcatMapEager which works with an arbitrary Publisher source. diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/mixed/ConcatMapXMainObserver.java b/src/main/java/io/reactivex/rxjava4/internal/operators/mixed/ConcatMapXMainObserver.java index 5ec1d65ba0..63b9e8ea85 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/operators/mixed/ConcatMapXMainObserver.java +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/mixed/ConcatMapXMainObserver.java @@ -15,6 +15,7 @@ import java.util.concurrent.atomic.AtomicInteger; +import io.reactivex.rxjava4.core.ErrorMode; import io.reactivex.rxjava4.core.Observer; import io.reactivex.rxjava4.disposables.Disposable; import io.reactivex.rxjava4.internal.disposables.DisposableHelper; diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/mixed/ConcatMapXMainSubscriber.java b/src/main/java/io/reactivex/rxjava4/internal/operators/mixed/ConcatMapXMainSubscriber.java index 7a5e52440e..23234536f3 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/operators/mixed/ConcatMapXMainSubscriber.java +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/mixed/ConcatMapXMainSubscriber.java @@ -17,6 +17,7 @@ import static java.util.concurrent.Flow.*; +import io.reactivex.rxjava4.core.ErrorMode; import io.reactivex.rxjava4.core.FlowableSubscriber; import io.reactivex.rxjava4.exceptions.*; import io.reactivex.rxjava4.internal.subscriptions.SubscriptionHelper; diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/mixed/FlowableConcatMapMaybePublisher.java b/src/main/java/io/reactivex/rxjava4/internal/operators/mixed/FlowableConcatMapMaybePublisher.java index b03d440e5b..446113b2b7 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/operators/mixed/FlowableConcatMapMaybePublisher.java +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/mixed/FlowableConcatMapMaybePublisher.java @@ -18,7 +18,7 @@ import io.reactivex.rxjava4.core.*; import io.reactivex.rxjava4.functions.Function; import io.reactivex.rxjava4.internal.operators.mixed.FlowableConcatMapMaybe.ConcatMapMaybeSubscriber; -import io.reactivex.rxjava4.internal.util.ErrorMode; +import io.reactivex.rxjava4.core.ErrorMode; /** * Maps each upstream item into a {@link MaybeSource}, subscribes to them one after the other terminates diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/mixed/FlowableConcatMapSinglePublisher.java b/src/main/java/io/reactivex/rxjava4/internal/operators/mixed/FlowableConcatMapSinglePublisher.java index 07d0a39a2a..a76e2ecaea 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/operators/mixed/FlowableConcatMapSinglePublisher.java +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/mixed/FlowableConcatMapSinglePublisher.java @@ -18,7 +18,7 @@ import io.reactivex.rxjava4.core.*; import io.reactivex.rxjava4.functions.Function; import io.reactivex.rxjava4.internal.operators.mixed.FlowableConcatMapSingle.ConcatMapSingleSubscriber; -import io.reactivex.rxjava4.internal.util.ErrorMode; +import io.reactivex.rxjava4.core.ErrorMode; /** * Maps each upstream item into a {@link SingleSource}, subscribes to them one after the other terminates diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/parallel/ParallelConcatMap.java b/src/main/java/io/reactivex/rxjava4/internal/operators/parallel/ParallelConcatMap.java index cd7bc7699e..4138c097a9 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/operators/parallel/ParallelConcatMap.java +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/parallel/ParallelConcatMap.java @@ -17,7 +17,7 @@ import io.reactivex.rxjava4.functions.Function; import io.reactivex.rxjava4.internal.operators.flowable.FlowableConcatMap; -import io.reactivex.rxjava4.internal.util.ErrorMode; +import io.reactivex.rxjava4.core.ErrorMode; import io.reactivex.rxjava4.parallel.ParallelFlowable; import io.reactivex.rxjava4.plugins.RxJavaPlugins; diff --git a/src/test/java/io/reactivex/rxjava4/core/config/ObservableConcatConfigTest.java b/src/test/java/io/reactivex/rxjava4/core/config/ObservableConcatConfigTest.java new file mode 100644 index 0000000000..2e909a51e2 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava4/core/config/ObservableConcatConfigTest.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.core.config; + +import io.reactivex.rxjava4.core.ErrorMode; +import io.reactivex.rxjava4.core.RxJavaTest; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +public class ObservableConcatConfigTest extends RxJavaTest { + + @Test + public void validation() { + assertEquals(ErrorMode.IMMEDIATE, new ObservableConcatConfig(ErrorMode.IMMEDIATE).errorMode(), "errorMode - true"); + assertEquals(ErrorMode.BOUNDARY, new ObservableConcatConfig(ErrorMode.BOUNDARY).errorMode(), "errorMode - false"); + assertEquals(ErrorMode.END, new ObservableConcatConfig(ErrorMode.END).errorMode(), "errorMode - false"); + assertEquals(5, new ObservableConcatConfig(5).bufferSize(), "bufferSize - 5"); + assertEquals(5, new ObservableConcatConfig(ErrorMode.IMMEDIATE, 5).bufferSize(), "bufferSize both - true, 5"); + assertEquals(5, new ObservableConcatConfig(ErrorMode.BOUNDARY, 5).bufferSize(), "bufferSize both - false, 5"); + assertEquals(5, new ObservableConcatConfig(ErrorMode.END, 5).bufferSize(), "bufferSize both - false, 5"); + assertEquals(ErrorMode.IMMEDIATE, new ObservableConcatConfig(ErrorMode.IMMEDIATE, 5).errorMode(), "errorMode both - true, 5"); + assertEquals(ErrorMode.BOUNDARY, new ObservableConcatConfig(ErrorMode.BOUNDARY, 5).errorMode(), "errorMode both - false, 5"); + assertEquals(ErrorMode.END, new ObservableConcatConfig(ErrorMode.END, 5).errorMode(), "errorMode both - false, 5"); + } +} diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/mixed/FlowableConcatMapMaybeTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/mixed/FlowableConcatMapMaybeTest.java index 598937689e..c416a535c7 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/mixed/FlowableConcatMapMaybeTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/mixed/FlowableConcatMapMaybeTest.java @@ -29,7 +29,7 @@ import io.reactivex.rxjava4.internal.functions.Functions; import io.reactivex.rxjava4.internal.operators.mixed.FlowableConcatMapMaybe.ConcatMapMaybeSubscriber; import io.reactivex.rxjava4.internal.subscriptions.BooleanSubscription; -import io.reactivex.rxjava4.internal.util.ErrorMode; +import io.reactivex.rxjava4.core.ErrorMode; import io.reactivex.rxjava4.plugins.RxJavaPlugins; import io.reactivex.rxjava4.processors.*; import io.reactivex.rxjava4.schedulers.Schedulers; diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/mixed/FlowableConcatMapSingleTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/mixed/FlowableConcatMapSingleTest.java index 14dd842c01..7444e9eac3 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/mixed/FlowableConcatMapSingleTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/mixed/FlowableConcatMapSingleTest.java @@ -28,7 +28,7 @@ import io.reactivex.rxjava4.internal.functions.Functions; import io.reactivex.rxjava4.internal.operators.mixed.FlowableConcatMapSingle.ConcatMapSingleSubscriber; import io.reactivex.rxjava4.internal.subscriptions.BooleanSubscription; -import io.reactivex.rxjava4.internal.util.ErrorMode; +import io.reactivex.rxjava4.core.ErrorMode; import io.reactivex.rxjava4.plugins.RxJavaPlugins; import io.reactivex.rxjava4.processors.*; import io.reactivex.rxjava4.subjects.SingleSubject; diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/mixed/ObservableConcatMapMaybeTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/mixed/ObservableConcatMapMaybeTest.java index 14f817bc2c..a68366bca8 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/mixed/ObservableConcatMapMaybeTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/mixed/ObservableConcatMapMaybeTest.java @@ -27,7 +27,7 @@ import io.reactivex.rxjava4.functions.Function; import io.reactivex.rxjava4.internal.functions.Functions; import io.reactivex.rxjava4.internal.operators.mixed.ObservableConcatMapMaybe.ConcatMapMaybeMainObserver; -import io.reactivex.rxjava4.internal.util.ErrorMode; +import io.reactivex.rxjava4.core.ErrorMode; import io.reactivex.rxjava4.observers.TestObserver; import io.reactivex.rxjava4.plugins.RxJavaPlugins; import io.reactivex.rxjava4.schedulers.Schedulers; diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/mixed/ObservableConcatMapSingleTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/mixed/ObservableConcatMapSingleTest.java index 724893d7f0..b84b0818fc 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/mixed/ObservableConcatMapSingleTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/mixed/ObservableConcatMapSingleTest.java @@ -26,7 +26,7 @@ import io.reactivex.rxjava4.functions.Function; import io.reactivex.rxjava4.internal.functions.Functions; import io.reactivex.rxjava4.internal.operators.mixed.ObservableConcatMapSingle.ConcatMapSingleMainObserver; -import io.reactivex.rxjava4.internal.util.ErrorMode; +import io.reactivex.rxjava4.core.ErrorMode; import io.reactivex.rxjava4.observers.TestObserver; import io.reactivex.rxjava4.plugins.RxJavaPlugins; import io.reactivex.rxjava4.subjects.*; diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableConcatTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableConcatTest.java index d9fef2c6d2..8555a5608f 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableConcatTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableConcatTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import io.reactivex.rxjava4.core.config.ObservableConcatConfig; import org.junit.Test; import org.mockito.InOrder; @@ -48,7 +49,7 @@ public void concat() { final Observable odds = Observable.fromArray(o); final Observable even = Observable.fromArray(e); - Observable concat = Observable.concat(odds, even); + Observable concat = Observable.concatArray(odds, even); concat.subscribe(observer); verify(observer, times(7)).onNext(anyString()); @@ -106,7 +107,7 @@ public void simpleAsyncConcat() { TestObservable o1 = new TestObservable<>("one", "two", "three"); TestObservable o2 = new TestObservable<>("four", "five", "six"); - Observable.concat(Observable.unsafeCreate(o1), Observable.unsafeCreate(o2)).subscribe(observer); + Observable.concatArray(Observable.unsafeCreate(o1), Observable.unsafeCreate(o2)).subscribe(observer); try { // wait for async observables to complete @@ -377,7 +378,7 @@ public void concatUnsubscribe() { Observer observer = TestHelper.mockObserver(); TestObserver to = new TestObserver<>(observer); - final Observable concat = Observable.concat(Observable.unsafeCreate(w1), Observable.unsafeCreate(w2)); + final Observable concat = Observable.concatArray(Observable.unsafeCreate(w1), Observable.unsafeCreate(w2)); try { // Subscribe @@ -542,7 +543,7 @@ public void multipleObservers() { TestScheduler s = new TestScheduler(); Observable timer = Observable.interval(500, TimeUnit.MILLISECONDS, s).take(2); - Observable o = Observable.concat(timer, timer); + Observable o = Observable.concatArray(timer, timer); o.subscribe(o1); o.subscribe(o2); @@ -637,7 +638,7 @@ public void concatWithNonCompliantSourceDoubleOnComplete() { }); TestObserverEx to = new TestObserverEx<>(); - Observable.concat(o, o).subscribe(to); + Observable.concatArray(o, o).subscribe(to); to.awaitDone(500, TimeUnit.MILLISECONDS); to.assertTerminated(); to.assertNoErrors(); @@ -725,14 +726,14 @@ public void concatMapRangeAsyncLoopIssue2876() { @Test public void concat3() { - Observable.concat(Observable.just(1), Observable.just(2), Observable.just(3)) + Observable.concatArray(Observable.just(1), Observable.just(2), Observable.just(3)) .test() .assertResult(1, 2, 3); } @Test public void concat4() { - Observable.concat(Observable.just(1), Observable.just(2), + Observable.concatArray(Observable.just(1), Observable.just(2), Observable.just(3), Observable.just(4)) .test() .assertResult(1, 2, 3, 4); @@ -740,7 +741,9 @@ public void concat4() { @Test public void concatArrayDelayError() { - Observable.concatArrayDelayError(Observable.just(1), Observable.just(2), + Observable.concatArray( + ObservableConcatConfig.DELAY_ERROR, + Observable.just(1), Observable.just(2), Observable.just(3), Observable.just(4)) .test() .assertResult(1, 2, 3, 4); @@ -748,7 +751,9 @@ public void concatArrayDelayError() { @Test public void concatArrayDelayErrorWithError() { - Observable.concatArrayDelayError(Observable.just(1), Observable.just(2), + Observable.concatArray( + ObservableConcatConfig.DELAY_ERROR, + Observable.just(1), Observable.just(2), Observable.just(3).concatWith(Observable.error(new TestException())), Observable.just(4)) .test() @@ -757,58 +762,68 @@ public void concatArrayDelayErrorWithError() { @Test public void concatIterableDelayError() { - Observable.concatDelayError( + Observable.concat( Arrays.asList(Observable.just(1), Observable.just(2), - Observable.just(3), Observable.just(4))) + Observable.just(3), Observable.just(4)), + ObservableConcatConfig.DELAY_ERROR + ) .test() .assertResult(1, 2, 3, 4); } @Test public void concatIterableDelayErrorWithError() { - Observable.concatDelayError( + Observable.concat( Arrays.asList(Observable.just(1), Observable.just(2), Observable.just(3).concatWith(Observable.error(new TestException())), - Observable.just(4))) + Observable.just(4)), + ObservableConcatConfig.DELAY_ERROR + ) .test() .assertFailure(TestException.class, 1, 2, 3, 4); } @Test public void concatObservableDelayError() { - Observable.concatDelayError( + Observable.concat( Observable.just(Observable.just(1), Observable.just(2), - Observable.just(3), Observable.just(4))) + Observable.just(3), Observable.just(4)), + ObservableConcatConfig.DELAY_ERROR + ) .test() .assertResult(1, 2, 3, 4); } @Test public void concatObservableDelayErrorWithError() { - Observable.concatDelayError( + Observable.concat( Observable.just(Observable.just(1), Observable.just(2), Observable.just(3).concatWith(Observable.error(new TestException())), - Observable.just(4))) + Observable.just(4)), + ObservableConcatConfig.DELAY_ERROR + ) .test() .assertFailure(TestException.class, 1, 2, 3, 4); } @Test public void concatObservableDelayErrorBoundary() { - Observable.concatDelayError( + Observable.concat( Observable.just(Observable.just(1), Observable.just(2), Observable.just(3).concatWith(Observable.error(new TestException())), - Observable.just(4)), 2, false) + Observable.just(4)), + new ObservableConcatConfig(ErrorMode.BOUNDARY, 2)) .test() .assertFailure(TestException.class, 1, 2, 3); } @Test public void concatObservableDelayErrorTillEnd() { - Observable.concatDelayError( + Observable.concat( Observable.just(Observable.just(1), Observable.just(2), Observable.just(3).concatWith(Observable.error(new TestException())), - Observable.just(4)), 2, true) + Observable.just(4)), + new ObservableConcatConfig(ErrorMode.END, 2)) .test() .assertFailure(TestException.class, 1, 2, 3, 4); } @@ -839,12 +854,12 @@ public void concatMapIterableBufferSize() { @Test public void emptyArray() { - assertSame(Observable.empty(), Observable.concatArrayDelayError()); + assertSame(Observable.empty(), Observable.concatArray(ObservableConcatConfig.DELAY_ERROR)); } @Test public void singleElementArray() { - assertSame(Observable.never(), Observable.concatArrayDelayError(Observable.never())); + assertSame(Observable.never(), Observable.concatArray(ObservableConcatConfig.DELAY_ERROR, Observable.never())); } @Test @@ -914,7 +929,7 @@ public void noSubsequentSubscriptionDelayError() { s.onComplete(); }); - Observable.concatArrayDelayError(source, source).firstElement() + Observable.concatArray(ObservableConcatConfig.DELAY_ERROR, source, source).firstElement() .test() .assertResult(1); @@ -948,7 +963,7 @@ public void noSubsequentSubscriptionDelayErrorIterable() { s.onComplete(); }); - Observable.concatDelayError(Arrays.asList(source, source)).firstElement() + Observable.concat(Arrays.asList(source, source), ObservableConcatConfig.DELAY_ERROR).firstElement() .test() .assertResult(1); @@ -959,8 +974,8 @@ public void noSubsequentSubscriptionDelayErrorIterable() { public void concatReportsDisposedOnComplete() { final Disposable[] disposable = { null }; - Observable.concat(Observable.just(1), Observable.just(2)) - .subscribe(new Observer() { + Observable.concatArray(Observable.just(1), Observable.just(2)) + .subscribe(new Observer<>() { @Override public void onSubscribe(Disposable d) { @@ -987,8 +1002,10 @@ public void onComplete() { public void concatReportsDisposedOnCompleteDelayError() { final Disposable[] disposable = { null }; - Observable.concatArrayDelayError(Observable.just(1), Observable.just(2)) - .subscribe(new Observer() { + Observable.concatArray( + ObservableConcatConfig.DELAY_ERROR, + Observable.just(1), Observable.just(2)) + .subscribe(new Observer<>() { @Override public void onSubscribe(Disposable d) { @@ -1015,7 +1032,7 @@ public void onComplete() { public void concatReportsDisposedOnError() { final Disposable[] disposable = { null }; - Observable.concat(Observable.just(1), Observable.error(new TestException())) + Observable.concatArray(Observable.just(1), Observable.error(new TestException())) .subscribe(new Observer() { @Override @@ -1043,7 +1060,9 @@ public void onComplete() { public void concatReportsDisposedOnErrorDelayError() { final Disposable[] disposable = { null }; - Observable.concatArrayDelayError(Observable.just(1), Observable.error(new TestException())) + Observable.concatArray( + ObservableConcatConfig.DELAY_ERROR, + Observable.just(1), Observable.error(new TestException())) .subscribe(new Observer() { @Override diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableFlatMapTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableFlatMapTest.java index 21e73dcdeb..0bac13e941 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableFlatMapTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableFlatMapTest.java @@ -151,7 +151,7 @@ public void flatMapTransformsException() { Observable onComplete = Observable.fromIterable(List.of(4)); Observable onError = Observable.fromIterable(List.of(5)); - Observable source = Observable.concat( + Observable source = Observable.concatArray( Observable.fromIterable(Arrays.asList(10, 20, 30)), Observable. error(new RuntimeException("Forced failure!")) ); diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableGroupByTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableGroupByTest.java index 3e3aa44cab..04dcb1a849 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableGroupByTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableGroupByTest.java @@ -95,7 +95,7 @@ public void empty() { public void error() { Observable sourceStrings = Observable.just("one", "two", "three", "four", "five", "six"); Observable errorSource = Observable.error(new RuntimeException("forced failure")); - Observable source = Observable.concat(sourceStrings, errorSource); + Observable source = Observable.concatArray(sourceStrings, errorSource); Observable> grouped = source.groupBy(length); @@ -795,7 +795,7 @@ public void exceptionIfSubscribeToChildMoreThanOnce() { @Test @SuppressUndeliverable public void error2() { - Observable source = Observable.concat(Observable.just(0), + Observable source = Observable.concatArray(Observable.just(0), Observable. error(new TestException("Forced failure"))); Observable m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER); diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableObserveOnTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableObserveOnTest.java index 60eb10f16f..a2e0a40b04 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableObserveOnTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableObserveOnTest.java @@ -232,8 +232,7 @@ public void observeOnWithThreadPoolScheduler() { * Attempts to confirm that when pauses exist between events, the ScheduledObserver * does not lose or reorder any events since the scheduler will not block, but will * be re-scheduled when it receives new events after each pause. - * - * + *

* This is non-deterministic in proving success, but if it ever fails (non-deterministically) * it is a sign of potential issues as thread-races and scheduling should not affect output. */ @@ -318,7 +317,7 @@ private static int randomIntFrom0to100() { public void delayedErrorDeliveryWhenSafeSubscriberUnsubscribes() { TestScheduler testScheduler = new TestScheduler(); - Observable source = Observable.concat(Observable. error(new TestException()), Observable.just(1)); + Observable source = Observable.concatArray(Observable. error(new TestException()), Observable.just(1)); Observer o = TestHelper.mockObserver(); InOrder inOrder = inOrder(o); @@ -680,7 +679,7 @@ public void onNext(Integer t) { public void workerNotDisposedPrematurelyNormalInNormalOut() { DisposeTrackingScheduler s = new DisposeTrackingScheduler(); - Observable.concat( + Observable.concatArray( Observable.just(1).hide().observeOn(s), Observable.just(2) ) @@ -694,7 +693,7 @@ public void workerNotDisposedPrematurelyNormalInNormalOut() { public void workerNotDisposedPrematurelySyncInNormalOut() { DisposeTrackingScheduler s = new DisposeTrackingScheduler(); - Observable.concat( + Observable.concatArray( Observable.just(1).observeOn(s), Observable.just(2) ) @@ -712,7 +711,7 @@ public void workerNotDisposedPrematurelyAsyncInNormalOut() { us.onNext(1); us.onComplete(); - Observable.concat( + Observable.concatArray( us.observeOn(s), Observable.just(2) ) diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableReduceTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableReduceTest.java index bef4b0a6bc..2db10ae727 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableReduceTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableReduceTest.java @@ -56,7 +56,8 @@ public void aggregateAsIntSumObservable() { @Test public void aggregateAsIntSumSourceThrowsObservable() { - Observable result = Observable.concat(Observable.just(1, 2, 3, 4, 5), + Observable result = Observable.concatArray( + Observable.just(1, 2, 3, 4, 5), Observable. error(new TestException())) .reduce(0, sum).map(v -> v).toObservable(); @@ -132,7 +133,8 @@ public void aggregateAsIntSum() { @Test public void aggregateAsIntSumSourceThrows() { - Single result = Observable.concat(Observable.just(1, 2, 3, 4, 5), + Single result = Observable.concatArray( + Observable.just(1, 2, 3, 4, 5), Observable. error(new TestException())) .reduce(0, sum).map(v -> v); diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableSequenceEqualTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableSequenceEqualTest.java index 00f64434fc..4cd3315821 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableSequenceEqualTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableSequenceEqualTest.java @@ -54,7 +54,7 @@ public void observable3() { @Test public void withError1Observable() { Observable o = Observable.sequenceEqual( - Observable.concat(Observable.just("one"), + Observable.concatArray(Observable.just("one"), Observable. error(new TestException())), Observable.just("one", "two", "three")).toObservable(); verifyError(o); @@ -64,7 +64,7 @@ Observable. error(new TestException())), public void withError2Observable() { Observable o = Observable.sequenceEqual( Observable.just("one", "two", "three"), - Observable.concat(Observable.just("one"), + Observable.concatArray(Observable.just("one"), Observable. error(new TestException()))).toObservable(); verifyError(o); } @@ -72,9 +72,9 @@ public void withError2Observable() { @Test public void withError3Observable() { Observable o = Observable.sequenceEqual( - Observable.concat(Observable.just("one"), + Observable.concatArray(Observable.just("one"), Observable. error(new TestException())), - Observable.concat(Observable.just("one"), + Observable.concatArray(Observable.just("one"), Observable. error(new TestException()))).toObservable(); verifyError(o); } @@ -180,7 +180,7 @@ public void three() { @Test public void withError1() { Single o = Observable.sequenceEqual( - Observable.concat(Observable.just("one"), + Observable.concatArray(Observable.just("one"), Observable. error(new TestException())), Observable.just("one", "two", "three")); verifyError(o); @@ -190,7 +190,7 @@ Observable. error(new TestException())), public void withError2() { Single o = Observable.sequenceEqual( Observable.just("one", "two", "three"), - Observable.concat(Observable.just("one"), + Observable.concatArray(Observable.just("one"), Observable. error(new TestException()))); verifyError(o); } @@ -198,9 +198,9 @@ public void withError2() { @Test public void withError3() { Single o = Observable.sequenceEqual( - Observable.concat(Observable.just("one"), + Observable.concatArray(Observable.just("one"), Observable. error(new TestException())), - Observable.concat(Observable.just("one"), + Observable.concatArray(Observable.just("one"), Observable. error(new TestException()))); verifyError(o); } diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableSkipTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableSkipTest.java index 4228d60fbd..a9919347d9 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableSkipTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableSkipTest.java @@ -125,7 +125,7 @@ public void skipError() { Observable ok = Observable.just("one"); Observable error = Observable.error(e); - Observable skip = Observable.concat(ok, error).skip(100); + Observable skip = Observable.concatArray(ok, error).skip(100); Observer observer = TestHelper.mockObserver(); skip.subscribe(observer); diff --git a/src/test/java/io/reactivex/rxjava4/internal/util/MiscUtilTest.java b/src/test/java/io/reactivex/rxjava4/internal/util/MiscUtilTest.java index 535866d1dc..c882315b27 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/util/MiscUtilTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/util/MiscUtilTest.java @@ -17,6 +17,7 @@ import java.util.*; +import io.reactivex.rxjava4.core.ErrorMode; import org.junit.Test; import io.reactivex.rxjava4.core.RxJavaTest; diff --git a/src/test/java/io/reactivex/rxjava4/observable/ObservableConcatTests.java b/src/test/java/io/reactivex/rxjava4/observable/ObservableConcatTests.java index 3f10a4ee12..9ac5191516 100644 --- a/src/test/java/io/reactivex/rxjava4/observable/ObservableConcatTests.java +++ b/src/test/java/io/reactivex/rxjava4/observable/ObservableConcatTests.java @@ -30,7 +30,7 @@ public void concatSimple() { Observable o1 = Observable.just("one", "two"); Observable o2 = Observable.just("three", "four"); - List values = Observable.concat(o1, o2).toList().blockingGet(); + List values = Observable.concatArray(o1, o2).toList().blockingGet(); assertEquals("one", values.get(0)); assertEquals("two", values.get(1)); @@ -128,7 +128,7 @@ public void concatCovariance3() { Observable o1 = Observable.just(horrorMovie1, movie); Observable o2 = Observable.just(media, horrorMovie2); - List values = Observable.concat(o1, o2).toList().blockingGet(); + List values = Observable.concatArray(o1, o2).toList().blockingGet(); assertEquals(horrorMovie1, values.get(0)); assertEquals(movie, values.get(1)); @@ -153,7 +153,7 @@ public void concatCovariance4() { Observable o2 = Observable.just(media, horrorMovie2); - List values = Observable.concat(o1, o2).toList().blockingGet(); + List values = Observable.concatArray(o1, o2).toList().blockingGet(); assertEquals(horrorMovie1, values.get(0)); assertEquals(movie, values.get(1)); diff --git a/src/test/java/io/reactivex/rxjava4/validators/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/rxjava4/validators/ParamValidationCheckerTest.java index afd9891fb8..fa291bd933 100644 --- a/src/test/java/io/reactivex/rxjava4/validators/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/rxjava4/validators/ParamValidationCheckerTest.java @@ -629,6 +629,7 @@ public void checkParallelFlowable() { defaultValues.put(MaybeMergeConfig.class, MaybeMergeConfig.DEFAULT); defaultValues.put(ObservableCombineLatestConfig.class, ObservableCombineLatestConfig.DEFAULT); + defaultValues.put(ObservableConcatConfig.class, ObservableConcatConfig.DEFAULT); @SuppressWarnings("rawtypes") class MixedConverters implements FlowableConverter, ObservableConverter, SingleConverter,