package org.springframework.messaging.tcp.reactor;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Properties;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import reactor.core.Environment;
import reactor.core.composable.Composable;
import reactor.core.composable.Promise;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Promises;
import reactor.core.configuration.ConfigurationReader;
import reactor.core.configuration.DispatcherConfiguration;
import reactor.core.configuration.ReactorConfiguration;
import reactor.function.Consumer;
import reactor.function.Function;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import reactor.net.NetChannel;
import reactor.net.Reconnect;
import reactor.net.netty.tcp.NettyTcpClient;
import reactor.net.tcp.TcpClient;
import reactor.net.tcp.spec.TcpClientSpec;
import reactor.tuple.Tuple;
import reactor.tuple.Tuple2;

/* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.2.1.redhat-211-03.zip:modules/system/layers/fuse/org/springframework/spring-messaging/main/spring-messaging-4.1.6.RELEASE.jar:org/springframework/messaging/tcp/reactor/Reactor11TcpClient.class */
public class Reactor11TcpClient<P> implements TcpOperations<P> {
    public static final Class<NettyTcpClient> REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class;
    private final TcpClient<Message<P>, Message<P>> tcpClient;
    private final Environment environment;

    /* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.2.1.redhat-211-03.zip:modules/system/layers/fuse/org/springframework/spring-messaging/main/spring-messaging-4.1.6.RELEASE.jar:org/springframework/messaging/tcp/reactor/Reactor11TcpClient$SynchronousDispatcherConfigReader.class */
    private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
        private SynchronousDispatcherConfigReader() {
        }

        public ReactorConfiguration read() {
            return new ReactorConfiguration(Arrays.asList(new DispatcherConfiguration[0]), "sync", new Properties());
        }
    }

    public Reactor11TcpClient(String str, int i, Codec<Buffer, Message<P>, Message<P>> codec) {
        this.environment = new Environment(new SynchronousDispatcherConfigReader());
        this.tcpClient = (TcpClient) new TcpClientSpec(REACTOR_TCP_CLIENT_TYPE).env(this.environment).codec(codec).connect(str, i).get();
    }

    public Reactor11TcpClient(TcpClient<Message<P>, Message<P>> tcpClient) {
        Assert.notNull(tcpClient, "'tcpClient' must not be null");
        this.tcpClient = tcpClient;
        this.environment = null;
    }

    @Override // org.springframework.messaging.tcp.TcpOperations
    public ListenableFuture<Void> connect(TcpConnectionHandler<P> tcpConnectionHandler) {
        Promise open = this.tcpClient.open();
        composeConnectionHandling(open, tcpConnectionHandler);
        return new AbstractPromiseToListenableFutureAdapter<NetChannel<Message<P>, Message<P>>, Void>(open) { // from class: org.springframework.messaging.tcp.reactor.Reactor11TcpClient.1
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.springframework.messaging.tcp.reactor.AbstractPromiseToListenableFutureAdapter
            public Void adapt(NetChannel<Message<P>, Message<P>> netChannel) {
                return null;
            }
        };
    }

    @Override // org.springframework.messaging.tcp.TcpOperations
    public ListenableFuture<Void> connect(TcpConnectionHandler<P> tcpConnectionHandler, final ReconnectStrategy reconnectStrategy) {
        Assert.notNull(reconnectStrategy, "ReconnectStrategy must not be null");
        Stream open = this.tcpClient.open(new Reconnect() { // from class: org.springframework.messaging.tcp.reactor.Reactor11TcpClient.2
            public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress inetSocketAddress, int i) {
                return Tuple.of(inetSocketAddress, reconnectStrategy.getTimeToNextAttempt(i));
            }
        });
        composeConnectionHandling(open, tcpConnectionHandler);
        return new PassThroughPromiseToListenableFutureAdapter(Promises.next(open).map(new Function<NetChannel<Message<P>, Message<P>>, Void>() { // from class: org.springframework.messaging.tcp.reactor.Reactor11TcpClient.3
            public Void apply(NetChannel<Message<P>, Message<P>> netChannel) {
                return null;
            }
        }));
    }

    private void composeConnectionHandling(Composable<NetChannel<Message<P>, Message<P>>> composable, final TcpConnectionHandler<P> tcpConnectionHandler) {
        composable.when(Throwable.class, new Consumer<Throwable>() { // from class: org.springframework.messaging.tcp.reactor.Reactor11TcpClient.5
            public void accept(Throwable th) {
                tcpConnectionHandler.afterConnectFailure(th);
            }
        }).consume(new Consumer<NetChannel<Message<P>, Message<P>>>() { // from class: org.springframework.messaging.tcp.reactor.Reactor11TcpClient.4
            public void accept(NetChannel<Message<P>, Message<P>> netChannel) {
                netChannel.when(Throwable.class, new Consumer<Throwable>() { // from class: org.springframework.messaging.tcp.reactor.Reactor11TcpClient.4.3
                    public void accept(Throwable th) {
                        tcpConnectionHandler.handleFailure(th);
                    }
                }).consume(new Consumer<Message<P>>() { // from class: org.springframework.messaging.tcp.reactor.Reactor11TcpClient.4.2
                    public void accept(Message<P> message) {
                        tcpConnectionHandler.handleMessage(message);
                    }
                }).on().close(new Runnable() { // from class: org.springframework.messaging.tcp.reactor.Reactor11TcpClient.4.1
                    @Override // java.lang.Runnable
                    public void run() {
                        tcpConnectionHandler.afterConnectionClosed();
                    }
                });
                tcpConnectionHandler.afterConnected(new Reactor11TcpConnection(netChannel));
            }
        });
    }

    @Override // org.springframework.messaging.tcp.TcpOperations
    public ListenableFuture<Boolean> shutdown() {
        try {
            return new AbstractPromiseToListenableFutureAdapter<Boolean, Boolean>(this.tcpClient.close()) { // from class: org.springframework.messaging.tcp.reactor.Reactor11TcpClient.6
                /* JADX INFO: Access modifiers changed from: protected */
                @Override // org.springframework.messaging.tcp.reactor.AbstractPromiseToListenableFutureAdapter
                public Boolean adapt(Boolean bool) {
                    return bool;
                }
            };
        } finally {
            this.environment.shutdown();
        }
    }
}
