package org.apache.activemq.transport;

import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/SoWriteTimeoutClientTest.class */
/**
 * Sends messages through a paused {@link SocketProxy} over a failover transport configured with
 * {@code soWriteTimeout}, resumes the proxy, and verifies that every message is eventually
 * delivered to a consumer that is connected directly to the broker.
 */
public class SoWriteTimeoutClientTest extends JmsTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(SoWriteTimeoutClientTest.class);

    /** Number of messages the producer sends and the consumer must receive. */
    private static final int MESSAGE_COUNT = 20;

    /** Length of the padding assigned to {@code messageTextPrefix} before sending. */
    private static final int MESSAGE_PREFIX_SIZE = 80 * 1024;

    /** How long the proxy stays paused; longer than the 4000 ms soWriteTimeout on the producer URI. */
    private static final long PROXY_PAUSE_SECONDS = 8L;

    /** Per-message receive timeout used once the proxy has been resumed. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 5000L;

    /**
     * Builds the broker under test: messages are deleted on startup, persistence is KahaDB with
     * concurrent store-and-dispatch for queues switched off, and a single TCP connector is bound
     * to port 0 with {@code wireFormat.maxInactivityDuration=0}.
     *
     * @return the configured broker
     * @throws Exception if the connector cannot be added
     */
    @Override
    public BrokerService createBroker() throws Exception {
        BrokerService service = new BrokerService();
        service.setDeleteAllMessagesOnStartup(true);

        KahaDBPersistenceAdapter kahaDb = new KahaDBPersistenceAdapter();
        kahaDb.setConcurrentStoreAndDispatchQueues(false);
        service.setPersistenceAdapter(kahaDb);

        service.addConnector("tcp://localhost:0?wireFormat.maxInactivityDuration=0");
        return service;
    }

    /**
     * Pauses the proxy, starts an asynchronous send of {@link #MESSAGE_COUNT} messages through it,
     * resumes the proxy after {@link #PROXY_PAUSE_SECONDS}, then asserts that all messages arrive
     * and that the broker ends up with no pending messages.
     *
     * <p>Every resource opened here (both connections, the proxy and the sender executor) is
     * released in a {@code finally} block so that a failing assertion does not leak threads or
     * sockets into subsequent tests.
     *
     * @throws Exception on any JMS, proxy or broker failure
     */
    public void testSendWithClientWriteTimeout() throws Exception {
        final Destination queue = new ActiveMQQueue("testClientWriteTimeout");
        this.messageTextPrefix = initMessagePrefix(MESSAGE_PREFIX_SIZE);

        URI brokerUri = URISupport.removeQuery(
                ((TransportConnector) this.broker.getTransportConnectors().get(0)).getConnectUri());
        LOG.info("consuming using uri: {}", brokerUri);

        Connection consumerConnection = null;
        Connection producerConnection = null;
        SocketProxy proxy = null;
        ExecutorService senderPool = null;
        try {
            // The consumer talks to the broker directly, bypassing the proxy.
            consumerConnection = new ActiveMQConnectionFactory(brokerUri).createConnection();
            consumerConnection.start();
            MessageConsumer consumer = consumerConnection
                    .createSession(false, Session.AUTO_ACKNOWLEDGE)
                    .createConsumer(queue);

            proxy = new SocketProxy();
            proxy.setTarget(brokerUri);
            proxy.open();

            // The producer goes through the proxy using failover with a socket write timeout.
            producerConnection = new ActiveMQConnectionFactory(
                    "failover:(" + proxy.getUrl()
                            + "?soWriteTimeout=4000&sleep=500)?jms.useAsyncSend=true&trackMessages=true&maxCacheSize=6638400")
                    .createConnection();
            producerConnection.start();
            proxy.pause();

            // Anonymous classes can only capture final locals.
            final Connection sendingConnection = producerConnection;
            senderPool = Executors.newCachedThreadPool();
            senderPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        sendMessages(sendingConnection, queue, MESSAGE_COUNT);
                    } catch (Exception e) {
                        // A failed send surfaces as a missing message in the receive loop below.
                        LOG.error("failed to send messages through the proxy", e);
                    }
                }
            });

            // Keep the proxy paused past the write timeout, then let traffic flow again.
            TimeUnit.SECONDS.sleep(PROXY_PAUSE_SECONDS);
            proxy.goOn();

            for (int i = 0; i < MESSAGE_COUNT; i++) {
                assertNotNull("Got message " + i + " after reconnect",
                        consumer.receive(RECEIVE_TIMEOUT_MILLIS));
            }

            assertTrue("no pending messages when done", Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    // Read once so the logged value is the one that is actually checked.
                    long pending = broker.getAdminView().getTotalMessageCount();
                    LOG.info("current total message count: " + pending);
                    return pending == 0;
                }
            }));
        } finally {
            if (senderPool != null) {
                senderPool.shutdownNow();
            }
            closeQuietly(producerConnection);
            closeQuietly(proxy);
            closeQuietly(consumerConnection);
        }
    }

    /**
     * Builds a string of {@code i} NUL characters.
     *
     * <p>Uses a {@code char[]} rather than decoding a {@code byte[]} so the result does not
     * depend on the platform default charset.
     *
     * @param i the number of characters in the returned string
     * @return a string of length {@code i}
     */
    private String initMessagePrefix(int i) {
        return new String(new char[i]);
    }

    /** Closes a connection, logging instead of throwing so cleanup never masks a test failure. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            LOG.warn("failed to close connection during cleanup", e);
        }
    }

    /** Closes the proxy, logging instead of throwing so cleanup never masks a test failure. */
    private static void closeQuietly(SocketProxy proxy) {
        if (proxy == null) {
            return;
        }
        try {
            proxy.close();
        } catch (Exception e) {
            LOG.warn("failed to close socket proxy during cleanup", e);
        }
    }

    /**
     * Creates the JUnit 3 suite for this class via the inherited {@code suite(Class)} helper.
     *
     * @return the test suite for {@link SoWriteTimeoutClientTest}
     */
    public static Test suite() {
        return suite(SoWriteTimeoutClientTest.class);
    }
}
