package io.vertx.rx.java.test;

import io.vertx.lang.rx.test.FakeWriteStream;
import io.vertx.rx.java.RxHelper;
import io.vertx.test.core.Repeat;
import io.vertx.test.core.VertxTestBase;
import java.util.concurrent.Executors;
import org.hamcrest.CoreMatchers;
import org.junit.Test;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

/* loaded from: input_file:io/vertx/rx/java/test/WriteStreamSubscriberTest.class */
public class WriteStreamSubscriberTest extends VertxTestBase {
    @Test
    public void testObservableErrorReported() throws Exception {
        Exception exc = new Exception();
        FakeWriteStream fakeWriteStream = new FakeWriteStream(this.vertx);
        Observable.error(exc).observeOn(RxHelper.scheduler(this.vertx)).subscribeOn(RxHelper.scheduler(this.vertx)).subscribe(RxHelper.toSubscriber(fakeWriteStream).onError(th -> {
            assertThat(th, CoreMatchers.is(CoreMatchers.sameInstance(exc)));
            complete();
        }));
        await();
        assertFalse("Did not expect writeStream end method to be invoked", fakeWriteStream.endInvoked());
    }

    @Test
    @Repeat(times = 10)
    public void testObservableToWriteStreamVertxThread() throws Exception {
        testObservableToWriteStream(RxHelper.scheduler(this.vertx.getOrCreateContext()));
    }

    @Test
    @Repeat(times = 10)
    public void testObservableToWriteStreamNonVertxThread() throws Exception {
        testObservableToWriteStream(Schedulers.from(Executors.newFixedThreadPool(5)));
    }

    private void testObservableToWriteStream(Scheduler scheduler) throws Exception {
        disableThreadChecks();
        FakeWriteStream fakeWriteStream = new FakeWriteStream(this.vertx);
        Observable.range(0, 10000).observeOn(scheduler).subscribeOn(scheduler).subscribe(RxHelper.toSubscriber(fakeWriteStream).onWriteStreamEnd(this::complete));
        await();
        assertTrue("Expected drainHandler to be invoked", fakeWriteStream.drainHandlerInvoked());
        assertEquals(10000, fakeWriteStream.getCount());
        assertTrue("Expected writeStream end method to be invoked", fakeWriteStream.endInvoked());
    }

    @Test
    public void testWriteStreamError() throws Exception {
        waitFor(2);
        RuntimeException runtimeException = new RuntimeException();
        FakeWriteStream failAfterWrite = new FakeWriteStream(this.vertx).failAfterWrite(runtimeException);
        Observable.create(subscriber -> {
            subscriber.onNext(0);
        }).observeOn(RxHelper.scheduler(this.vertx)).doOnUnsubscribe(this::complete).subscribeOn(RxHelper.scheduler(this.vertx)).subscribe(RxHelper.toSubscriber(failAfterWrite).onWriteStreamError(th -> {
            assertThat(th, CoreMatchers.is(CoreMatchers.sameInstance(runtimeException)));
            complete();
        }));
        await();
        assertFalse("Did not expect writeStream end method to be invoked", failAfterWrite.endInvoked());
    }
}
