package reactor.net;

import com.gs.collections.impl.list.mutable.FastList;
import java.util.Collection;
import java.util.Iterator;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.spec.Promises;
import reactor.event.Event;
import reactor.event.registry.CachingRegistry;
import reactor.event.registry.Registration;
import reactor.event.registry.Registry;
import reactor.event.selector.Selector;
import reactor.event.selector.Selectors;
import reactor.function.Consumer;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import reactor.util.Assert;

/* loaded from: input_file:reactor/net/AbstractNetPeer.class */
public abstract class AbstractNetPeer<IN, OUT> {
    private final Registry<NetChannel<IN, OUT>> netChannels = new CachingRegistry();
    private final Event<AbstractNetPeer<IN, OUT>> selfEvent = Event.wrap(this);
    private final Selector open = Selectors.$();
    private final Selector close = Selectors.$();
    private final Selector start = Selectors.$();
    private final Selector shutdown = Selectors.$();
    private final Environment env;

    /* renamed from: reactor, reason: collision with root package name */
    private final Reactor f0reactor;
    private final Codec<Buffer, IN, OUT> codec;
    private final Collection<Consumer<NetChannel<IN, OUT>>> consumers;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractNetPeer(@Nonnull Environment environment, @Nonnull Reactor reactor2, @Nullable Codec<Buffer, IN, OUT> codec, @Nonnull Collection<Consumer<NetChannel<IN, OUT>>> collection) {
        this.env = environment;
        this.f0reactor = reactor2;
        this.codec = codec;
        this.consumers = collection;
        for (final Consumer<NetChannel<IN, OUT>> consumer : collection) {
            reactor2.on(this.open, new Consumer<Event<NetChannel<IN, OUT>>>() { // from class: reactor.net.AbstractNetPeer.1
                public void accept(Event<NetChannel<IN, OUT>> event) {
                    consumer.accept(event.getData());
                }
            });
        }
    }

    public Promise<Boolean> close() {
        Deferred defer = Promises.defer(this.env, this.f0reactor.getDispatcher());
        close((Consumer<Boolean>) defer);
        return defer.compose();
    }

    public void close(@Nullable Consumer<Boolean> consumer) {
        Iterator it = getChannels().iterator();
        while (it.hasNext()) {
            Registration registration = (Registration) it.next();
            if (!registration.isCancelled()) {
                doCloseChannel((NetChannel) registration.getObject());
            }
        }
        if (null != consumer) {
            this.f0reactor.schedule(consumer, true);
        }
    }

    public Iterator<NetChannel<IN, OUT>> iterator() {
        FastList newList = FastList.newList();
        Iterator it = getChannels().iterator();
        while (it.hasNext()) {
            newList.add(((Registration) it.next()).getObject());
        }
        return newList.iterator();
    }

    protected <C> Registration<? extends NetChannel<IN, OUT>> register(@Nonnull C c, @Nonnull NetChannel<IN, OUT> netChannel) {
        Assert.notNull(c, "Channel cannot be null.");
        Assert.notNull(netChannel, "NetChannel cannot be null.");
        return this.netChannels.register(Selectors.$(c), netChannel);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <C> NetChannel<IN, OUT> select(@Nonnull C c) {
        Assert.notNull(c, "Channel cannot be null.");
        Iterator it = this.netChannels.select(c).iterator();
        if (it.hasNext()) {
            return (NetChannel) ((Registration) it.next()).getObject();
        }
        NetChannel<IN, OUT> createChannel = createChannel(c);
        register(c, createChannel);
        notifyOpen(createChannel);
        return createChannel;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <C> void close(@Nonnull C c) {
        Assert.notNull(c, "Channel cannot be null");
        for (Registration registration : this.netChannels.select(c)) {
            NetChannel<IN, OUT> netChannel = (NetChannel) registration.getObject();
            registration.cancel();
            notifyClose(netChannel);
        }
    }

    protected abstract <C> NetChannel<IN, OUT> createChannel(C c);

    /* JADX INFO: Access modifiers changed from: protected */
    public void notifyStart(Runnable runnable) {
        getReactor().notify(this.start.getObject(), this.selfEvent);
        if (null != runnable) {
            getReactor().schedule(new Consumer<Runnable>() { // from class: reactor.net.AbstractNetPeer.2
                public void accept(Runnable runnable2) {
                    runnable2.run();
                }
            }, runnable);
        }
    }

    protected void notifyError(@Nonnull Throwable th) {
        Assert.notNull(th, "Error cannot be null.");
        this.f0reactor.notify(th.getClass(), Event.wrap(th));
    }

    protected void notifyOpen(@Nonnull NetChannel<IN, OUT> netChannel) {
        this.f0reactor.notify(this.open.getObject(), Event.wrap(netChannel));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void notifyClose(@Nonnull NetChannel<IN, OUT> netChannel) {
        this.f0reactor.notify(this.close.getObject(), Event.wrap(netChannel));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void notifyShutdown() {
        getReactor().notify(this.shutdown.getObject(), this.selfEvent);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Nullable
    public Codec<Buffer, IN, OUT> getCodec() {
        return this.codec;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Nonnull
    public Environment getEnvironment() {
        return this.env;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Nonnull
    public Reactor getReactor() {
        return this.f0reactor;
    }

    @Nonnull
    protected Collection<Consumer<NetChannel<IN, OUT>>> getConsumers() {
        return this.consumers;
    }

    @Nonnull
    protected Registry<NetChannel<IN, OUT>> getChannels() {
        return this.netChannels;
    }

    protected void doClose(@Nullable Consumer<Boolean> consumer) {
        getReactor().schedule(consumer, true);
    }

    protected void doCloseChannel(NetChannel<IN, OUT> netChannel) {
        netChannel.close();
    }
}
