package reactor.rx;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.fn.Function;
import reactor.fn.Supplier;
import reactor.fn.timer.Timer;
import reactor.fn.tuple.Tuple;
import reactor.fn.tuple.Tuple2;
import reactor.fn.tuple.Tuple3;
import reactor.fn.tuple.Tuple4;
import reactor.fn.tuple.Tuple5;
import reactor.fn.tuple.Tuple6;
import reactor.fn.tuple.Tuple7;
import reactor.fn.tuple.Tuple8;
import reactor.rx.action.Action;
import reactor.rx.action.combination.CombineLatestAction;
import reactor.rx.action.combination.ConcatAction;
import reactor.rx.action.combination.DynamicMergeAction;
import reactor.rx.action.combination.MergeAction;
import reactor.rx.action.combination.SwitchAction;
import reactor.rx.action.combination.ZipAction;
import reactor.rx.action.support.DefaultSubscriber;
import reactor.rx.stream.DeferredStream;
import reactor.rx.stream.ErrorStream;
import reactor.rx.stream.FutureStream;
import reactor.rx.stream.IterableStream;
import reactor.rx.stream.PeriodicTimerStream;
import reactor.rx.stream.PublisherStream;
import reactor.rx.stream.RangeStream;
import reactor.rx.stream.SingleTimerStream;
import reactor.rx.stream.SingleValueStream;
import reactor.rx.stream.SupplierStream;

/* loaded from: input_file:reactor/rx/Streams.class */
public class Streams {
    private static final Stream NEVER = new Stream() { // from class: reactor.rx.Streams.3
        final Subscription NEVER_SUBSCRIPTION = new Subscription() { // from class: reactor.rx.Streams.3.1
            public void request(long j) {
            }

            public void cancel() {
            }
        };

        public void subscribe(Subscriber subscriber) {
            if (subscriber != null) {
                subscriber.onSubscribe(this.NEVER_SUBSCRIPTION);
            }
        }
    };

    public static <T> Stream<T> create(Publisher<T> publisher) {
        return Stream.class.isAssignableFrom(publisher.getClass()) ? (Stream) publisher : new PublisherStream(publisher);
    }

    public static <T> Stream<T> wrap(final Publisher<T> publisher) {
        return Stream.class.isAssignableFrom(publisher.getClass()) ? (Stream) publisher : new Stream<T>() { // from class: reactor.rx.Streams.1
            public void subscribe(Subscriber<? super T> subscriber) {
                try {
                    publisher.subscribe(subscriber);
                } catch (Throwable th) {
                    subscriber.onError(th);
                }
            }
        };
    }

    public static <T> Stream<T> defer(Supplier<? extends Publisher<T>> supplier) {
        return new DeferredStream(supplier);
    }

    public static <T> Stream<T> empty() {
        return SingleValueStream.EMPTY;
    }

    public static <T> Stream<T> never() {
        return NEVER;
    }

    public static <O, T extends Throwable> Stream<O> fail(T t) {
        return new ErrorStream(t);
    }

    public static <T> Stream<T> from(Iterable<? extends T> iterable) {
        return new IterableStream(iterable);
    }

    public static <T> Stream<T> from(T[] tArr) {
        return new IterableStream(Arrays.asList(tArr));
    }

    public static <T> Stream<T> from(Future<? extends T> future) {
        return new FutureStream(future);
    }

    public static <T> Stream<T> from(Future<? extends T> future, long j, TimeUnit timeUnit) {
        return new FutureStream(future, j, timeUnit);
    }

    public static Stream<Long> range(long j, long j2) {
        return new RangeStream(j, j2);
    }

    public static Stream<Long> timer(long j) {
        return timer(Environment.timer(), j, TimeUnit.SECONDS);
    }

    public static Stream<Long> timer(Timer timer, long j) {
        return timer(timer, j, TimeUnit.SECONDS);
    }

    public static Stream<Long> timer(long j, TimeUnit timeUnit) {
        return new SingleTimerStream(j, timeUnit, Environment.timer());
    }

    public static Stream<Long> timer(Timer timer, long j, TimeUnit timeUnit) {
        return new SingleTimerStream(j, timeUnit, timer);
    }

    public static Stream<Long> period(long j) {
        return period(Environment.timer(), -1L, j, TimeUnit.SECONDS);
    }

    public static Stream<Long> period(Timer timer, long j) {
        return period(timer, -1L, j, TimeUnit.SECONDS);
    }

    public static Stream<Long> period(long j, long j2) {
        return period(Environment.timer(), j, j2, TimeUnit.SECONDS);
    }

    public static Stream<Long> period(Timer timer, long j, long j2) {
        return period(timer, j, j2, TimeUnit.SECONDS);
    }

    public static Stream<Long> period(long j, TimeUnit timeUnit) {
        return period(Environment.timer(), -1L, j, timeUnit);
    }

    public static Stream<Long> period(Timer timer, long j, TimeUnit timeUnit) {
        return period(timer, -1L, j, timeUnit);
    }

    public static Stream<Long> period(long j, long j2, TimeUnit timeUnit) {
        return period(Environment.timer(), j, j2, timeUnit);
    }

    public static Stream<Long> period(Timer timer, long j, long j2, TimeUnit timeUnit) {
        return new PeriodicTimerStream(TimeUnit.MILLISECONDS.convert(j, timeUnit), j2, timeUnit, timer);
    }

    public static <T> Stream<T> just(T t) {
        return new SingleValueStream(t);
    }

    public static <T> Stream<T> just(T t, T t2) {
        return from(Arrays.asList(t, t2));
    }

    public static <T> Stream<T> just(T t, T t2, T t3) {
        return from(Arrays.asList(t, t2, t3));
    }

    public static <T> Stream<T> just(T t, T t2, T t3, T t4) {
        return from(Arrays.asList(t, t2, t3, t4));
    }

    public static <T> Stream<T> just(T t, T t2, T t3, T t4, T t5) {
        return from(Arrays.asList(t, t2, t3, t4, t5));
    }

    public static <T> Stream<T> just(T t, T t2, T t3, T t4, T t5, T t6) {
        return from(Arrays.asList(t, t2, t3, t4, t5, t6));
    }

    public static <T> Stream<T> just(T t, T t2, T t3, T t4, T t5, T t6, T t7) {
        return from(Arrays.asList(t, t2, t3, t4, t5, t6, t7));
    }

    public static <T> Stream<T> just(T t, T t2, T t3, T t4, T t5, T t6, T t7, T t8) {
        return from(Arrays.asList(t, t2, t3, t4, t5, t6, t7, t8));
    }

    public static <T> Stream<T> generate(Supplier<? extends T> supplier) {
        if (supplier == null) {
            throw new IllegalArgumentException("Supplier must be provided");
        }
        return new SupplierStream(SynchronousDispatcher.INSTANCE, supplier);
    }

    public static <T> Action<Publisher<? extends T>, T> switchOnNext() {
        return switchOnNext((Dispatcher) SynchronousDispatcher.INSTANCE);
    }

    public static <T> Action<Publisher<? extends T>, T> switchOnNext(Dispatcher dispatcher) {
        return new SwitchAction(dispatcher);
    }

    public static <T> Stream<T> switchOnNext(Publisher<? extends Publisher<? extends T>> publisher) {
        return switchOnNext(publisher, SynchronousDispatcher.INSTANCE);
    }

    public static <T> Stream<T> switchOnNext(Publisher<? extends Publisher<? extends T>> publisher, Dispatcher dispatcher) {
        SwitchAction switchAction = new SwitchAction(dispatcher);
        publisher.subscribe(switchAction);
        return switchAction;
    }

    public static <T> Stream<T> concat(Iterable<? extends Publisher<? extends T>> iterable) {
        ArrayList arrayList = new ArrayList();
        Iterator<? extends Publisher<? extends T>> it = iterable.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next());
        }
        int size = arrayList.size();
        if (size == 1) {
            return wrap((Publisher) arrayList.get(0));
        }
        if (size == 0) {
            return empty();
        }
        Subscriber concatAction = new ConcatAction();
        from(iterable).subscribe(concatAction);
        return concatAction;
    }

    public static <T> Stream<T> concat(Publisher<? extends Publisher<? extends T>> publisher) {
        ConcatAction concatAction = new ConcatAction();
        publisher.subscribe(concatAction);
        return concatAction;
    }

    public static <T> Stream<T> concat(Publisher<? extends T> publisher, Publisher<? extends T> publisher2) {
        return concat(Arrays.asList(publisher, publisher2));
    }

    public static <T> Stream<T> concat(Publisher<? extends T> publisher, Publisher<? extends T> publisher2, Publisher<? extends T> publisher3) {
        return concat(Arrays.asList(publisher, publisher2, publisher3));
    }

    public static <T> Stream<T> concat(Publisher<? extends T> publisher, Publisher<? extends T> publisher2, Publisher<? extends T> publisher3, Publisher<? extends T> publisher4) {
        return concat(Arrays.asList(publisher, publisher2, publisher3, publisher4));
    }

    public static <T> Stream<T> concat(Publisher<? extends T> publisher, Publisher<? extends T> publisher2, Publisher<? extends T> publisher3, Publisher<? extends T> publisher4, Publisher<? extends T> publisher5) {
        return concat(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5));
    }

    public static <T> Stream<T> concat(Publisher<? extends T> publisher, Publisher<? extends T> publisher2, Publisher<? extends T> publisher3, Publisher<? extends T> publisher4, Publisher<? extends T> publisher5, Publisher<? extends T> publisher6) {
        return concat(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5, publisher6));
    }

    public static <T> Stream<T> concat(Publisher<? extends T> publisher, Publisher<? extends T> publisher2, Publisher<? extends T> publisher3, Publisher<? extends T> publisher4, Publisher<? extends T> publisher5, Publisher<? extends T> publisher6, Publisher<? extends T> publisher7) {
        return concat(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5, publisher6, publisher7));
    }

    public static <T> Stream<T> concat(Publisher<? extends T> publisher, Publisher<? extends T> publisher2, Publisher<? extends T> publisher3, Publisher<? extends T> publisher4, Publisher<? extends T> publisher5, Publisher<? extends T> publisher6, Publisher<? extends T> publisher7, Publisher<? extends T> publisher8) {
        return concat(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5, publisher6, publisher7, publisher8));
    }

    public static <T> Stream<T> merge(Iterable<? extends Publisher<? extends T>> iterable) {
        ArrayList arrayList = new ArrayList();
        Iterator<? extends Publisher<? extends T>> it = iterable.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next());
        }
        return arrayList.size() == 0 ? empty() : arrayList.size() == 1 ? wrap((Publisher) arrayList.get(0)) : new MergeAction(SynchronousDispatcher.INSTANCE, arrayList);
    }

    public static <T, E extends T> Stream<E> merge(Publisher<? extends Publisher<E>> publisher) {
        DynamicMergeAction dynamicMergeAction = new DynamicMergeAction(null);
        publisher.subscribe(dynamicMergeAction);
        return dynamicMergeAction;
    }

    public static <T> Stream<T> merge(Publisher<? extends T> publisher, Publisher<? extends T> publisher2) {
        return merge(Arrays.asList(publisher, publisher2));
    }

    public static <T> Stream<T> merge(Publisher<? extends T> publisher, Publisher<? extends T> publisher2, Publisher<? extends T> publisher3) {
        return merge(Arrays.asList(publisher, publisher2, publisher3));
    }

    public static <T> Stream<T> merge(Publisher<? extends T> publisher, Publisher<? extends T> publisher2, Publisher<? extends T> publisher3, Publisher<? extends T> publisher4) {
        return merge(Arrays.asList(publisher, publisher2, publisher3, publisher4));
    }

    public static <T> Stream<T> merge(Publisher<? extends T> publisher, Publisher<? extends T> publisher2, Publisher<? extends T> publisher3, Publisher<? extends T> publisher4, Publisher<? extends T> publisher5) {
        return merge(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5));
    }

    public static <T> Stream<T> merge(Publisher<? extends T> publisher, Publisher<? extends T> publisher2, Publisher<? extends T> publisher3, Publisher<? extends T> publisher4, Publisher<? extends T> publisher5, Publisher<? extends T> publisher6) {
        return merge(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5, publisher6));
    }

    public static <T> Stream<T> merge(Publisher<? extends T> publisher, Publisher<? extends T> publisher2, Publisher<? extends T> publisher3, Publisher<? extends T> publisher4, Publisher<? extends T> publisher5, Publisher<? extends T> publisher6, Publisher<? extends T> publisher7) {
        return merge(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5, publisher6, publisher7));
    }

    public static <T> Stream<T> merge(Publisher<? extends T> publisher, Publisher<? extends T> publisher2, Publisher<? extends T> publisher3, Publisher<? extends T> publisher4, Publisher<? extends T> publisher5, Publisher<? extends T> publisher6, Publisher<? extends T> publisher7, Publisher<? extends T> publisher8) {
        return merge(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5, publisher6, publisher7, publisher8));
    }

    public static <T1, T2, V> Stream<V> combineLatest(Publisher<? extends T1> publisher, Publisher<? extends T2> publisher2, Function<Tuple2<T1, T2>, ? extends V> function) {
        return combineLatest(Arrays.asList(publisher, publisher2), function);
    }

    public static <T1, T2, T3, V> Stream<V> combineLatest(Publisher<? extends T1> publisher, Publisher<? extends T2> publisher2, Publisher<? extends T3> publisher3, Function<Tuple3<T1, T2, T3>, ? extends V> function) {
        return combineLatest(Arrays.asList(publisher, publisher2, publisher3), function);
    }

    public static <T1, T2, T3, T4, V> Stream<V> combineLatest(Publisher<? extends T1> publisher, Publisher<? extends T2> publisher2, Publisher<? extends T3> publisher3, Publisher<? extends T4> publisher4, Function<Tuple4<T1, T2, T3, T4>, V> function) {
        return combineLatest(Arrays.asList(publisher, publisher2, publisher3, publisher4), function);
    }

    public static <T1, T2, T3, T4, T5, V> Stream<V> combineLatest(Publisher<? extends T1> publisher, Publisher<? extends T2> publisher2, Publisher<? extends T3> publisher3, Publisher<? extends T4> publisher4, Publisher<? extends T5> publisher5, Function<Tuple5<T1, T2, T3, T4, T5>, V> function) {
        return combineLatest(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5), function);
    }

    public static <T1, T2, T3, T4, T5, T6, V> Stream<V> combineLatest(Publisher<? extends T1> publisher, Publisher<? extends T2> publisher2, Publisher<? extends T3> publisher3, Publisher<? extends T4> publisher4, Publisher<? extends T5> publisher5, Publisher<? extends T6> publisher6, Function<Tuple6<T1, T2, T3, T4, T5, T6>, V> function) {
        return combineLatest(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5, publisher6), function);
    }

    public static <T1, T2, T3, T4, T5, T6, T7, V> Stream<V> combineLatest(Publisher<? extends T1> publisher, Publisher<? extends T2> publisher2, Publisher<? extends T3> publisher3, Publisher<? extends T4> publisher4, Publisher<? extends T5> publisher5, Publisher<? extends T6> publisher6, Publisher<? extends T7> publisher7, Function<Tuple7<T1, T2, T3, T4, T5, T6, T7>, V> function) {
        return combineLatest(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5, publisher6, publisher7), function);
    }

    public static <T1, T2, T3, T4, T5, T6, T7, T8, V> Stream<V> combineLatest(Publisher<? extends T1> publisher, Publisher<? extends T2> publisher2, Publisher<? extends T3> publisher3, Publisher<? extends T4> publisher4, Publisher<? extends T5> publisher5, Publisher<? extends T6> publisher6, Publisher<? extends T7> publisher7, Publisher<? extends T8> publisher8, Function<Tuple8<T1, T2, T3, T4, T5, T6, T7, T8>, ? extends V> function) {
        return combineLatest(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5, publisher6, publisher7, publisher8), function);
    }

    public static <TUPLE extends Tuple, V> Stream<V> combineLatest(Iterable<? extends Publisher<?>> iterable, Function<TUPLE, ? extends V> function) {
        return new CombineLatestAction(SynchronousDispatcher.INSTANCE, function, iterable);
    }

    public static <E, TUPLE extends Tuple, V> Stream<V> combineLatest(Publisher<? extends Publisher<E>> publisher, Function<TUPLE, ? extends V> function) {
        DynamicMergeAction dynamicMergeAction = new DynamicMergeAction(new CombineLatestAction(SynchronousDispatcher.INSTANCE, function, null));
        publisher.subscribe(dynamicMergeAction);
        return dynamicMergeAction;
    }

    public static <T1, T2, V> Stream<V> zip(Publisher<? extends T1> publisher, Publisher<? extends T2> publisher2, Function<Tuple2<T1, T2>, ? extends V> function) {
        return zip(Arrays.asList(publisher, publisher2), function);
    }

    public static <T1, T2, T3, V> Stream<V> zip(Publisher<? extends T1> publisher, Publisher<? extends T2> publisher2, Publisher<? extends T3> publisher3, Function<Tuple3<T1, T2, T3>, ? extends V> function) {
        return zip(Arrays.asList(publisher, publisher2, publisher3), function);
    }

    public static <T1, T2, T3, T4, V> Stream<V> zip(Publisher<? extends T1> publisher, Publisher<? extends T2> publisher2, Publisher<? extends T3> publisher3, Publisher<? extends T4> publisher4, Function<Tuple4<T1, T2, T3, T4>, V> function) {
        return zip(Arrays.asList(publisher, publisher2, publisher3, publisher4), function);
    }

    public static <T1, T2, T3, T4, T5, V> Stream<V> zip(Publisher<? extends T1> publisher, Publisher<? extends T2> publisher2, Publisher<? extends T3> publisher3, Publisher<? extends T4> publisher4, Publisher<? extends T5> publisher5, Function<Tuple5<T1, T2, T3, T4, T5>, V> function) {
        return zip(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5), function);
    }

    public static <T1, T2, T3, T4, T5, T6, V> Stream<V> zip(Publisher<? extends T1> publisher, Publisher<? extends T2> publisher2, Publisher<? extends T3> publisher3, Publisher<? extends T4> publisher4, Publisher<? extends T5> publisher5, Publisher<? extends T6> publisher6, Function<Tuple6<T1, T2, T3, T4, T5, T6>, V> function) {
        return zip(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5, publisher6), function);
    }

    public static <T1, T2, T3, T4, T5, T6, T7, V> Stream<V> zip(Publisher<? extends T1> publisher, Publisher<? extends T2> publisher2, Publisher<? extends T3> publisher3, Publisher<? extends T4> publisher4, Publisher<? extends T5> publisher5, Publisher<? extends T6> publisher6, Publisher<? extends T7> publisher7, Function<Tuple7<T1, T2, T3, T4, T5, T6, T7>, V> function) {
        return zip(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5, publisher6, publisher7), function);
    }

    public static <T1, T2, T3, T4, T5, T6, T7, T8, V> Stream<V> zip(Publisher<? extends T1> publisher, Publisher<? extends T2> publisher2, Publisher<? extends T3> publisher3, Publisher<? extends T4> publisher4, Publisher<? extends T5> publisher5, Publisher<? extends T6> publisher6, Publisher<? extends T7> publisher7, Publisher<? extends T8> publisher8, Function<Tuple8<T1, T2, T3, T4, T5, T6, T7, T8>, ? extends V> function) {
        return zip(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5, publisher6, publisher7, publisher8), function);
    }

    public static <TUPLE extends Tuple, V> Stream<V> zip(Iterable<? extends Publisher<?>> iterable, Function<TUPLE, ? extends V> function) {
        return new ZipAction(SynchronousDispatcher.INSTANCE, function, iterable);
    }

    public static <E, TUPLE extends Tuple, V> Stream<V> zip(Publisher<? extends Publisher<E>> publisher, Function<TUPLE, ? extends V> function) {
        DynamicMergeAction dynamicMergeAction = new DynamicMergeAction(new ZipAction(SynchronousDispatcher.INSTANCE, function, null));
        publisher.subscribe(dynamicMergeAction);
        return dynamicMergeAction;
    }

    public static <T> Stream<List<T>> join(Publisher<? extends T> publisher, Publisher<? extends T> publisher2) {
        return join(Arrays.asList(publisher, publisher2));
    }

    public static <T> Stream<List<T>> join(Publisher<? extends T> publisher, Publisher<? extends T> publisher2, Publisher<? extends T> publisher3) {
        return join(Arrays.asList(publisher, publisher2, publisher3));
    }

    public static <T> Stream<List<T>> join(Publisher<? extends T> publisher, Publisher<? extends T> publisher2, Publisher<? extends T> publisher3, Publisher<? extends T> publisher4) {
        return join(Arrays.asList(publisher, publisher2, publisher3, publisher4));
    }

    public static <T> Stream<List<T>> join(Publisher<? extends T> publisher, Publisher<? extends T> publisher2, Publisher<? extends T> publisher3, Publisher<? extends T> publisher4, Publisher<? extends T> publisher5) {
        return join(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5));
    }

    public static <T> Stream<List<T>> join(Publisher<? extends T> publisher, Publisher<? extends T> publisher2, Publisher<? extends T> publisher3, Publisher<? extends T> publisher4, Publisher<? extends T> publisher5, Publisher<? extends T> publisher6) {
        return join(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5, publisher6));
    }

    public static <T> Stream<List<T>> join(Publisher<? extends T> publisher, Publisher<? extends T> publisher2, Publisher<? extends T> publisher3, Publisher<? extends T> publisher4, Publisher<? extends T> publisher5, Publisher<? extends T> publisher6, Publisher<? extends T> publisher7) {
        return join(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5, publisher6, publisher7));
    }

    public static <T> Stream<List<T>> join(Publisher<? extends T> publisher, Publisher<? extends T> publisher2, Publisher<? extends T> publisher3, Publisher<? extends T> publisher4, Publisher<? extends T> publisher5, Publisher<? extends T> publisher6, Publisher<? extends T> publisher7, Publisher<? extends T> publisher8) {
        return join(Arrays.asList(publisher, publisher2, publisher3, publisher4, publisher5, publisher6, publisher7, publisher8));
    }

    public static <T> Stream<List<T>> join(Iterable<? extends Publisher<? extends T>> iterable) {
        return (Action) zip(iterable, ZipAction.joinZipper());
    }

    public static <T> Stream<List<T>> join(Publisher<? extends Publisher<T>> publisher) {
        return zip(publisher, ZipAction.joinZipper());
    }

    public static void await(Publisher<?> publisher) throws Throwable {
        long j = 30000;
        if (Environment.alive()) {
            j = ((Long) Environment.get().getProperty("reactor.await.defaultTimeout", Long.class, 30000L)).longValue();
        }
        await(publisher, j, TimeUnit.MILLISECONDS, true);
    }

    public static void await(Publisher<?> publisher, long j) throws Throwable {
        await(publisher, j, TimeUnit.SECONDS, true);
    }

    public static void await(Publisher<?> publisher, long j, TimeUnit timeUnit) throws Throwable {
        await(publisher, j, timeUnit, true);
    }

    public static void await(Publisher<?> publisher, long j, TimeUnit timeUnit, final boolean z) throws Throwable {
        final AtomicReference atomicReference = new AtomicReference();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        publisher.subscribe(new DefaultSubscriber<Object>() { // from class: reactor.rx.Streams.2
            Subscription s;

            @Override // reactor.rx.action.support.DefaultSubscriber
            public void onSubscribe(Subscription subscription) {
                this.s = subscription;
                if (z) {
                    subscription.request(Long.MAX_VALUE);
                }
            }

            @Override // reactor.rx.action.support.DefaultSubscriber
            public void onError(Throwable th) {
                atomicReference.set(th);
                cancel();
                countDownLatch.countDown();
            }

            @Override // reactor.rx.action.support.DefaultSubscriber
            public void onComplete() {
                cancel();
                countDownLatch.countDown();
            }

            void cancel() {
                if (this.s != null) {
                    try {
                        this.s.cancel();
                    } catch (Throwable th) {
                        atomicReference.set(th);
                    }
                }
            }
        });
        countDownLatch.await(j, timeUnit);
        if (atomicReference.get() != null) {
            throw ((Throwable) atomicReference.get());
        }
    }
}
