package reactor.net;

import java.nio.ByteBuffer;
import java.util.NoSuchElementException;
import java.util.Queue;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.composable.Composable;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Promises;
import reactor.core.support.NotifyConsumer;
import reactor.event.Event;
import reactor.event.dispatch.Dispatcher;
import reactor.event.registry.Registration;
import reactor.event.routing.EventRouter;
import reactor.event.selector.Selector;
import reactor.event.selector.Selectors;
import reactor.event.support.EventConsumer;
import reactor.function.Consumer;
import reactor.function.Function;
import reactor.function.batch.BatchConsumer;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import reactor.queue.BlockingQueueFactory;
import reactor.util.Assert;

/* loaded from: input_file:reactor/net/AbstractNetChannel.class */
/**
 * Skeleton {@link NetChannel} that wires a pair of {@link Reactor}s around a transport.
 *
 * <p>Outgoing data is scheduled on a dedicated IO reactor and handed to the transport-specific
 * {@code write}/{@code flush} methods that subclasses implement. Incoming data is published on an
 * events reactor under a channel-private "read" selector, either directly or through the
 * decoder obtained from the optional {@link Codec}.
 *
 * @param <IN>  type of the objects delivered to consumers of this channel
 * @param <OUT> type of the objects accepted for sending
 */
public abstract class AbstractNetChannel<IN, OUT> implements NetChannel<IN, OUT> {
    protected final Logger log = LoggerFactory.getLogger(getClass());

    /** Channel-private selector under which every inbound object is published. */
    private final Selector read = Selectors.$();
    private final Environment env;
    /** Reactor on which writes are scheduled. */
    private final Reactor ioReactor;
    /** Reactor on which inbound data and errors are published to user consumers. */
    private final Reactor eventsReactor;
    private final Codec<Buffer, IN, OUT> codec;
    /** Decoder obtained from the codec, or {@code null} when no codec was supplied. */
    private final Function<Buffer, IN> decoder;
    /** Encoder obtained from the codec, or {@code null} when no codec was supplied. */
    private final Function<OUT, Buffer> encoder;
    /** Keys of pending {@link #sendAndReceive(Object)} calls, consumed in insertion order. */
    private final Queue<Object> replyToKeys;

    /**
     * Consumer that pushes outgoing objects to the transport.
     *
     * <p>Between {@link #start()} and {@link #end()} writes are issued with auto-flush disabled
     * and a single flush is performed by {@link #end()}.
     */
    public final class WriteConsumer implements BatchConsumer<OUT> {
        /** Completion handle notified on write failure; may be {@code null}. */
        private final Deferred<Void, Promise<Void>> onComplete;
        private volatile boolean autoflush;

        private WriteConsumer(Deferred<Void, Promise<Void>> deferred) {
            this.autoflush = true;
            this.onComplete = deferred;
        }

        /** Disables auto-flush for the writes that follow. */
        public void start() {
            this.autoflush = false;
        }

        /** Flushes the channel and re-enables auto-flush. */
        public void end() {
            AbstractNetChannel.this.flush();
            this.autoflush = true;
        }

        /**
         * Writes one object to the channel.
         *
         * <p>With an encoder present the object is encoded and written only if the resulting
         * buffer has bytes remaining. Without an encoder, {@link Buffer} instances go to the
         * buffer-based write and anything else to the object-based write. Any failure is
         * published on the events reactor keyed by the throwable's class and, when a completion
         * handle was supplied, forwarded to it as well.
         *
         * @param out the object to write
         */
        @Override
        public void accept(OUT out) {
            try {
                if (null != AbstractNetChannel.this.encoder) {
                    Buffer encoded = AbstractNetChannel.this.encoder.apply(out);
                    if (encoded.remaining() > 0) {
                        AbstractNetChannel.this.write(encoded, this.onComplete, this.autoflush);
                    }
                } else if (out instanceof Buffer) {
                    AbstractNetChannel.this.write((Buffer) out, this.onComplete, this.autoflush);
                } else {
                    AbstractNetChannel.this.write(out, this.onComplete, this.autoflush);
                }
            } catch (Throwable th) {
                AbstractNetChannel.this.eventsReactor.notify(th.getClass(), Event.wrap(th));
                if (null != this.onComplete) {
                    this.onComplete.accept(th);
                }
            }
        }
    }

    /**
     * Creates the channel and its two reactors.
     *
     * <p>The IO reactor uses {@code dispatcher}; the events reactor uses the dispatcher of
     * {@code reactor2}. Both reuse the error handlers of {@code reactor2}. The events reactor's
     * consumer registry is cleared and then populated with a copy of every registration found
     * in {@code reactor2}.
     *
     * @param environment the environment used when creating streams and promises
     * @param codec       optional codec supplying the decoder and encoder; may be {@code null}
     * @param dispatcher  dispatcher for the IO reactor
     * @param reactor2    reactor whose dispatcher, error handlers and registrations are reused
     */
    public AbstractNetChannel(@Nonnull Environment environment, @Nullable Codec<Buffer, IN, OUT> codec, @Nonnull Dispatcher dispatcher, @Nonnull Reactor reactor2) {
        // Each argument is validated against its own message; previously only the environment
        // was checked (twice), so a null dispatcher or reactor slipped past the assertions.
        Assert.notNull(environment, "Environment cannot be null");
        Assert.notNull(dispatcher, "IO Dispatcher cannot be null");
        Assert.notNull(reactor2, "Events Reactor cannot be null");
        this.env = environment;
        this.ioReactor = new Reactor(dispatcher, (EventRouter) null, reactor2.getDispatchErrorHandler(), reactor2.getUncaughtErrorHandler());
        this.eventsReactor = new Reactor(reactor2.getDispatcher(), (EventRouter) null, reactor2.getDispatchErrorHandler(), reactor2.getUncaughtErrorHandler());
        this.eventsReactor.getConsumerRegistry().clear();
        for (Registration<? extends Consumer<? extends Event<?>>> registration : reactor2.getConsumerRegistry()) {
            this.eventsReactor.getConsumerRegistry().register(registration.getSelector(), registration.getObject());
        }
        this.codec = codec;
        if (null != codec) {
            // Decoded objects are re-published on the events reactor under the read selector.
            this.decoder = codec.decoder(new NotifyConsumer<IN>(this.read.getObject(), this.eventsReactor));
            this.encoder = codec.encoder();
        } else {
            this.decoder = null;
            this.encoder = null;
        }
        this.replyToKeys = BlockingQueueFactory.createQueue();
        // Route each inbound object to the oldest pending sendAndReceive() key, if there is one.
        consume(new Consumer<IN>() {
            @Override
            public void accept(IN in) {
                // poll() returns null on an empty queue, so no isEmpty()/remove() race and no
                // exception to swallow.
                Object replyKey = AbstractNetChannel.this.replyToKeys.poll();
                if (null != replyKey) {
                    AbstractNetChannel.this.eventsReactor.notify(replyKey, Event.wrap(in));
                }
            }
        });
    }

    /**
     * @return the decoder obtained from the codec, or {@code null} if no codec was supplied
     */
    public Function<Buffer, IN> getDecoder() {
        return this.decoder;
    }

    /**
     * @return the encoder obtained from the codec, or {@code null} if no codec was supplied
     */
    public Function<OUT, Buffer> getEncoder() {
        return this.encoder;
    }

    /**
     * Creates a new {@link Stream} bound to the events reactor and feeds it every inbound object.
     *
     * @return a stream of the objects read from this channel
     */
    @Override
    public Stream<IN> in() {
        final Deferred<IN, Stream<IN>> deferred =
                new Deferred<IN, Stream<IN>>(new Stream<IN>(this.eventsReactor, -1, (Composable) null, this.env));
        consume(new Consumer<IN>() {
            @Override
            public void accept(IN in) {
                deferred.accept(in);
            }
        });
        return deferred.compose();
    }

    /**
     * @return a new write consumer with no completion handle attached
     */
    @Override
    public BatchConsumer<OUT> out() {
        return new WriteConsumer(null);
    }

    /**
     * Registers a consumer for errors of the given type published on the events reactor.
     *
     * @param cls      the throwable type to select on
     * @param consumer the consumer invoked with the throwable
     * @return this channel
     */
    @Override
    public <T extends Throwable> NetChannel<IN, OUT> when(Class<T> cls, Consumer<T> consumer) {
        this.eventsReactor.on(Selectors.T(cls), new EventConsumer<T>(consumer));
        return this;
    }

    /**
     * Registers a consumer that receives the payload of every inbound event.
     *
     * @param consumer the consumer invoked with each inbound object
     * @return this channel
     */
    @Override
    public NetChannel<IN, OUT> consume(final Consumer<IN> consumer) {
        this.eventsReactor.on(this.read, new Consumer<Event<IN>>() {
            @Override
            public void accept(Event<IN> event) {
                consumer.accept(event.getData());
            }
        });
        return this;
    }

    /**
     * Registers a function whose result for each inbound object is sent back on this channel.
     *
     * @param function maps an inbound object to the reply to send
     * @return this channel
     */
    @Override
    public NetChannel<IN, OUT> receive(final Function<IN, OUT> function) {
        consume(new Consumer<IN>() {
            @Override
            public void accept(IN in) {
                // No cast here: the result is an OUT and must resolve to send(OUT).
                AbstractNetChannel.this.send(function.apply(in));
            }
        });
        return this;
    }

    /**
     * Sends every element of the given stream without tracking completion.
     *
     * @param stream the source of objects to send
     * @return this channel
     */
    @Override
    public NetChannel<IN, OUT> send(Stream<OUT> stream) {
        stream.consume(new Consumer<OUT>() {
            @Override
            public void accept(OUT out) {
                AbstractNetChannel.this.send(out, null);
            }
        });
        return this;
    }

    /**
     * Sends one object and exposes the completion handle passed to the transport write.
     *
     * @param out the object to send
     * @return the promise composed from the completion handle given to the write
     */
    @Override
    public Promise<Void> send(OUT out) {
        Deferred<Void, Promise<Void>> defer = Promises.defer(this.env, this.eventsReactor.getDispatcher());
        send(out, defer);
        return defer.compose();
    }

    /**
     * Sends one object without a completion handle.
     *
     * @param out the object to send
     * @return this channel
     */
    @Override
    public NetChannel<IN, OUT> sendAndForget(OUT out) {
        send(out, null);
        return this;
    }

    /**
     * Sends one object and returns a promise for a reply.
     *
     * <p>A one-shot consumer is registered under a fresh key, and the key is queued so that an
     * inbound object can be routed to it.
     *
     * @param out the object to send
     * @return a promise fulfilled with the inbound object routed to this request
     */
    @Override
    public Promise<IN> sendAndReceive(OUT out) {
        Deferred<IN, Promise<IN>> defer = Promises.defer(this.env, this.eventsReactor.getDispatcher());
        Selector replySelector = Selectors.$();
        this.eventsReactor.on(replySelector, new EventConsumer<IN>(defer)).cancelAfterUse();
        this.replyToKeys.add(replySelector.getObject());
        send(out, null);
        return defer.compose();
    }

    /**
     * Unregisters the read consumers and delegates to {@code close(Consumer)}.
     *
     * @return the promise composed from the handle passed to {@code close(Consumer)}
     */
    @Override
    public Promise<Boolean> close() {
        Deferred<Boolean, Promise<Boolean>> defer = Promises.defer(getEnvironment(), this.eventsReactor.getDispatcher());
        this.eventsReactor.getConsumerRegistry().unregister(this.read.getObject());
        close(defer);
        return defer.compose();
    }

    /**
     * Schedules a write of {@code out} on the IO reactor.
     *
     * @param out      the object to send
     * @param deferred completion handle for the write, or {@code null}
     */
    protected void send(OUT out, Deferred<Void, Promise<Void>> deferred) {
        this.ioReactor.schedule(new WriteConsumer(deferred), out);
    }

    /**
     * Feeds inbound bytes into the channel.
     *
     * <p>Without a decoder, or when the buffer has no backing {@link ByteBuffer}, the buffer
     * itself is published to read consumers; otherwise it is passed to the decoder.
     *
     * @param buffer the inbound data
     * @return {@code true} if the buffer still has bytes remaining afterwards
     */
    public boolean read(Buffer buffer) {
        if (null == this.decoder || null == buffer.byteBuffer()) {
            this.eventsReactor.notify(this.read.getObject(), Event.wrap(buffer));
        } else {
            this.decoder.apply(buffer);
        }
        return buffer.remaining() > 0;
    }

    /**
     * Publishes an object to read consumers, wrapping it in an {@link Event} unless it already
     * is one.
     *
     * @param obj the object or event to publish
     */
    public void notifyRead(Object obj) {
        Event<?> event = obj instanceof Event ? (Event<?>) obj : Event.wrap(obj);
        this.eventsReactor.notify(this.read.getObject(), event);
    }

    /**
     * Publishes an error on the events reactor, keyed by the throwable's class.
     *
     * @param th the error to publish
     */
    public void notifyError(Throwable th) {
        this.eventsReactor.notify(th.getClass(), Event.wrap(th));
    }

    /**
     * Writes a {@link Buffer} by delegating to the {@link ByteBuffer}-based write.
     *
     * @param buffer   the data to write
     * @param deferred completion handle for the write, or {@code null}
     * @param z        whether to flush after this write
     */
    protected void write(Buffer buffer, Deferred<Void, Promise<Void>> deferred, boolean z) {
        write(buffer.byteBuffer(), deferred, z);
    }

    /**
     * Writes raw bytes to the underlying transport.
     *
     * @param byteBuffer the bytes to write
     * @param deferred   completion handle for the write, or {@code null}
     * @param z          whether to flush after this write
     */
    protected abstract void write(ByteBuffer byteBuffer, Deferred<Void, Promise<Void>> deferred, boolean z);

    /**
     * Writes an arbitrary object to the underlying transport.
     *
     * @param obj      the object to write
     * @param deferred completion handle for the write, or {@code null}
     * @param z        whether to flush after this write
     */
    protected abstract void write(Object obj, Deferred<Void, Promise<Void>> deferred, boolean z);

    /** Flushes any pending writes to the underlying transport. */
    protected abstract void flush();

    /**
     * @return the environment this channel was created with
     */
    protected Environment getEnvironment() {
        return this.env;
    }

    /**
     * @return the reactor on which inbound data and errors are published
     */
    public Reactor getEventsReactor() {
        return this.eventsReactor;
    }

    /**
     * @return the reactor on which writes are scheduled
     */
    protected Reactor getIoReactor() {
        return this.ioReactor;
    }
}
