package org.apache.activemq.artemis.tests.integration.stomp;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.io.IOException;
import java.net.Socket;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
import org.apache.activemq.artemis.jms.server.config.impl.JMSQueueConfigurationImpl;
import org.apache.activemq.artemis.jms.server.config.impl.TopicConfigurationImpl;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.tests.integration.stomp.util.AbstractStompClientConnection;
import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.After;
import org.junit.Before;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.class */
public abstract class StompTestBase extends ActiveMQTestBase {
    private ConnectionFactory connectionFactory;
    protected Connection connection;
    protected Session session;
    protected Queue queue;
    protected Topic topic;
    protected JMSServerManager server;
    protected final int port = 61613;
    protected String defUser = "brianm";
    protected String defPass = "wombats";
    protected boolean autoCreateServer = true;
    private List<Bootstrap> bootstraps = new ArrayList();
    private List<BlockingQueue<String>> priorityQueues = new ArrayList();
    private List<EventLoopGroup> groups = new ArrayList();
    private List<Channel> channels = new ArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/stomp/StompTestBase$StompClientHandler.class */
    public class StompClientHandler extends SimpleChannelInboundHandler<String> {
        int index;
        StringBuffer currentMessage = new StringBuffer("");

        /* JADX INFO: Access modifiers changed from: package-private */
        public StompClientHandler(int i) {
            this.index = 0;
            this.index = i;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void channelRead0(ChannelHandlerContext channelHandlerContext, String str) throws Exception {
            this.currentMessage.append(str);
            String stringBuffer = this.currentMessage.toString();
            if (stringBuffer.contains("��\n")) {
                int indexOf = stringBuffer.indexOf("��\n");
                String substring = stringBuffer.substring(0, indexOf);
                String substring2 = stringBuffer.substring(indexOf + 2);
                this.currentMessage = new StringBuffer("");
                BlockingQueue blockingQueue = (BlockingQueue) StompTestBase.this.priorityQueues.get(this.index);
                if (blockingQueue == null) {
                    blockingQueue = new ArrayBlockingQueue(1000);
                    StompTestBase.this.priorityQueues.add(this.index, blockingQueue);
                }
                blockingQueue.add(substring);
                if (substring2.length() > 0) {
                    channelRead(channelHandlerContext, substring2);
                }
            }
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
            th.printStackTrace();
            channelHandlerContext.close();
        }
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        if (this.autoCreateServer) {
            this.server = createServer();
            addServer(this.server.getActiveMQServer());
            this.server.start();
            this.connectionFactory = createConnectionFactory();
            createBootstrap();
            if (isSecurityEnabled()) {
                this.connection = this.connectionFactory.createConnection("brianm", "wombats");
            } else {
                this.connection = this.connectionFactory.createConnection();
            }
            this.session = this.connection.createSession(false, 1);
            this.queue = this.session.createQueue(getQueueName());
            this.topic = this.session.createTopic(getTopicName());
            this.connection.start();
        }
    }

    private void createBootstrap() {
        createBootstrap(0, 61613);
    }

    protected void createBootstrap(int i) {
        createBootstrap(0, i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void createBootstrap(final int i, int i2) {
        this.priorityQueues.add(i, new ArrayBlockingQueue(1000));
        this.groups.add(i, new NioEventLoopGroup());
        this.bootstraps.add(i, new Bootstrap());
        this.bootstraps.get(i).group(this.groups.get(i)).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() { // from class: org.apache.activemq.artemis.tests.integration.stomp.StompTestBase.1
            public void initChannel(SocketChannel socketChannel) throws Exception {
                StompTestBase.this.addChannelHandlers(i, socketChannel);
            }
        });
        try {
            this.channels.add(i, this.bootstraps.get(i).connect("localhost", i2).sync().channel());
            handshake();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    protected void handshake() throws InterruptedException {
    }

    protected void addChannelHandlers(int i, SocketChannel socketChannel) throws URISyntaxException {
        socketChannel.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8));
        socketChannel.pipeline().addLast("encoder", new StringEncoder(StandardCharsets.UTF_8));
        socketChannel.pipeline().addLast(new ChannelHandler[]{new StompClientHandler(i)});
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setUpAfterServer() throws Exception {
        setUpAfterServer(false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setUpAfterServer(boolean z) throws Exception {
        this.connectionFactory = createConnectionFactory();
        this.connectionFactory.setCompressLargeMessage(z);
        createBootstrap();
        this.connection = this.connectionFactory.createConnection();
        this.connection.start();
        this.session = this.connection.createSession(false, 1);
        this.queue = this.session.createQueue(getQueueName());
        this.topic = this.session.createTopic(getTopicName());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JMSServerManager createServer() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("protocols", AbstractStompClientConnection.STOMP_COMMAND);
        hashMap.put("port", 61613);
        hashMap.put("stompConsumerCredits", "-1");
        TransportConfiguration transportConfiguration = new TransportConfiguration(NettyAcceptorFactory.class.getName(), hashMap);
        TransportConfiguration transportConfiguration2 = new TransportConfiguration(NettyAcceptorFactory.class.getName());
        ConfigurationImpl addAcceptorConfiguration = createBasicConfig().setSecurityEnabled(isSecurityEnabled()).setPersistenceEnabled(true).addAcceptorConfiguration(transportConfiguration).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
        addAcceptorConfiguration.addAcceptorConfiguration(transportConfiguration2);
        ActiveMQServer addServer = addServer(ActiveMQServers.newActiveMQServer(addAcceptorConfiguration, this.defUser, this.defPass));
        if (isSecurityEnabled()) {
            addServer.getSecurityManager().getConfiguration().addRole(this.defUser, "testRole");
            addAcceptorConfiguration.getSecurityRoles().put("#", new HashSet<Role>() { // from class: org.apache.activemq.artemis.tests.integration.stomp.StompTestBase.2
                {
                    add(new Role("testRole", true, true, true, true, true, true, true, true));
                }
            });
        }
        JMSConfigurationImpl jMSConfigurationImpl = new JMSConfigurationImpl();
        jMSConfigurationImpl.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName(getQueueName()).setDurable(false).setBindings(new String[]{getQueueName()}));
        jMSConfigurationImpl.getTopicConfigurations().add(new TopicConfigurationImpl().setName(getTopicName()).setBindings(new String[]{getTopicName()}));
        this.server = new JMSServerManagerImpl(addServer, jMSConfigurationImpl);
        this.server.setRegistry(new JndiBindingRegistry(new InVMNamingContext()));
        return this.server;
    }

    @After
    public void tearDown() throws Exception {
        if (this.autoCreateServer) {
            this.connection.close();
            for (EventLoopGroup eventLoopGroup : this.groups) {
                if (eventLoopGroup != null) {
                    Iterator<Channel> it = this.channels.iterator();
                    while (it.hasNext()) {
                        it.next().close();
                    }
                    eventLoopGroup.shutdownGracefully(0L, 5000L, TimeUnit.MILLISECONDS);
                }
            }
        }
        super.tearDown();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void cleanUp() throws Exception {
        this.connection.close();
        if (this.groups.get(0) != null) {
            this.groups.get(0).shutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void reconnect() throws Exception {
        reconnect(0L);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void reconnect(long j) throws Exception {
        this.groups.get(0).shutdown();
        if (j > 0) {
            Thread.sleep(j);
        }
        createBootstrap();
    }

    protected ConnectionFactory createConnectionFactory() {
        return new ActiveMQJMSConnectionFactory(false, new TransportConfiguration[]{new TransportConfiguration(InVMConnectorFactory.class.getName())});
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Socket createSocket() throws IOException {
        return new Socket("localhost", 61613);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getQueueName() {
        return "test";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getQueuePrefix() {
        return "jms.queue.";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getTopicName() {
        return "testtopic";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getTopicPrefix() {
        return "jms.topic.";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void assertChannelClosed() throws InterruptedException {
        assertChannelClosed(0);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void assertChannelClosed(int i) throws InterruptedException {
        assertTrue("channel not closed", this.channels.get(i).closeFuture().await(5000L));
    }

    public void sendFrame(String str) throws Exception {
        sendFrame(0, str);
    }

    public void sendFrame(int i, String str) throws Exception {
        this.channels.get(i).writeAndFlush(str);
    }

    public void sendFrame(byte[] bArr) throws Exception {
        sendFrame(0, bArr);
    }

    public void sendFrame(int i, byte[] bArr) throws Exception {
        ByteBuf buffer = Unpooled.buffer(bArr.length);
        buffer.writeBytes(bArr);
        this.channels.get(i).writeAndFlush(buffer);
    }

    public String receiveFrame(long j) throws Exception {
        return receiveFrame(0, j);
    }

    public String receiveFrame(int i, long j) throws Exception {
        return this.priorityQueues.get(i).poll(j, TimeUnit.MILLISECONDS);
    }

    public void sendMessage(String str) throws Exception {
        sendMessage(str, (Destination) this.queue);
    }

    public void sendMessage(String str, Destination destination) throws Exception {
        this.session.createProducer(destination).send(this.session.createTextMessage(str));
    }

    public void sendMessage(byte[] bArr, Destination destination) throws Exception {
        sendMessage(bArr, "foo", "xyz", destination);
    }

    public void sendMessage(String str, String str2, String str3) throws Exception {
        sendMessage(str.getBytes(StandardCharsets.UTF_8), str2, str3, this.queue);
    }

    public void sendMessage(byte[] bArr, String str, String str2, Destination destination) throws Exception {
        MessageProducer createProducer = this.session.createProducer(destination);
        BytesMessage createBytesMessage = this.session.createBytesMessage();
        createBytesMessage.setStringProperty(str, str2);
        createBytesMessage.writeBytes(bArr);
        createProducer.send(createBytesMessage);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitForReceipt() throws Exception {
        String receiveFrame = receiveFrame(50000L);
        assertNotNull(receiveFrame);
        assertTrue(receiveFrame.indexOf("RECEIPT") > -1);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitForFrameToTakeEffect() throws InterruptedException {
        Thread.sleep(500L);
    }

    public boolean isSecurityEnabled() {
        return false;
    }
}
