package org.apache.activemq.bugs;

import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.RequestTimedOutIOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ6240Test.class */
/**
 * Test for AMQ-6240: a transacted producer sends to a queue on a broker whose
 * store usage limit is very small, and the test checks that at least one send
 * fails with a request time-out and that the connection can then be closed.
 */
public class AMQ6240Test extends EmbeddedBrokerTestSupport {
    static final Logger LOG = LoggerFactory.getLogger(AMQ6240Test.class);

    /** Queue used by the test; its policy entry disables producer flow control. */
    private static final String QUEUE_NAME = "noPfc";

    /** Payload size of each message sent by the producer thread (8 KiB). */
    private static final int MESSAGE_SIZE_BYTES = 8 * 1024;

    /** Upper bound on the number of messages the producer thread tries to send. */
    private static final int MESSAGE_COUNT = 100;

    /**
     * Starts a background thread that sends up to {@value #MESSAGE_COUNT}
     * bytes messages in a transacted session, waits up to seven seconds for it,
     * closes the connection and asserts that at least one send failed with a
     * {@link JMSException} caused by a {@link RequestTimedOutIOException}.
     *
     * @throws Exception if the connection cannot be created, started or closed,
     *                   or if the test thread is interrupted while waiting
     */
    public void testBlockedTxProducerConnectionTimeoutConnectionCanClose() throws Exception {
        // The explicit cast is a no-op when the inherited factory already returns
        // ActiveMQConnection and is required if it is declared to return the plain
        // JMS Connection type (TODO confirm against EmbeddedBrokerTestSupport).
        final ActiveMQConnection connection = (ActiveMQConnection) createConnection();
        final ActiveMQDestination queue = createDestination(QUEUE_NAME);
        connection.setSendTimeout(4000);
        connection.setCloseTimeout(1000);

        final AtomicInteger timeoutCount = new AtomicInteger(0);
        Runnable sender = new Runnable() {
            @Override
            public void run() {
                // Declared outside the try block so the catch clause can report
                // which message the failure happened on.
                int sent = 0;
                try {
                    LOG.info("Sender thread starting");
                    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
                    MessageProducer producer = session.createProducer(queue);
                    BytesMessage message = session.createBytesMessage();
                    message.writeBytes(new byte[MESSAGE_SIZE_BYTES]);
                    for (; sent < MESSAGE_COUNT; sent++) {
                        producer.send(message);
                    }
                    LOG.info("Done sending..");
                } catch (JMSException e) {
                    if (e.getCause() instanceof RequestTimedOutIOException) {
                        timeoutCount.incrementAndGet();
                        LOG.info("Got expected send time out on message: {}", sent);
                    } else {
                        // Route unexpected failures through the test logger instead of stderr.
                        LOG.error("Unexpected JMSException while sending message {}", sent, e);
                    }
                }
            }
        };

        connection.start();
        Thread senderThread = new Thread(sender);
        senderThread.start();
        try {
            senderThread.join(7000L);
        } finally {
            // Close even when join() is interrupted so the connection is not leaked.
            connection.close();
        }
        assertTrue("No exception from the broker", timeoutCount.get() > 0);
    }

    /**
     * Builds the embedded broker used by this test: no management connector,
     * a transport connector on {@code bindAddress}, producer flow control
     * disabled for the {@code noPfc} queue, a 34 KiB store usage limit, and
     * all messages deleted on startup.
     *
     * @return the configured (not yet started) broker
     * @throws Exception if the transport connector cannot be added
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.EmbeddedBrokerTestSupport
    public BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.getManagementContext().setCreateConnector(false);
        broker.addConnector(this.bindAddress);

        PolicyEntry noFlowControl = new PolicyEntry();
        noFlowControl.setProducerFlowControl(false);
        PolicyMap policies = new PolicyMap();
        policies.put(new ActiveMQQueue(QUEUE_NAME), noFlowControl);
        broker.setDestinationPolicy(policies);

        // NOTE(review): this adapter is configured but never attached to the broker
        // (there is no setPersistenceAdapter call), so the journal size set here has
        // no visible effect. Left as-is to avoid changing the broker's store setup;
        // confirm whether it was meant to be installed.
        new KahaDBPersistenceAdapter().setJournalMaxFileLength(16 * 1024);

        broker.getSystemUsage().getStoreUsage().setLimit(34 * 1024L);
        broker.setDeleteAllMessagesOnStartup(true);
        return broker;
    }
}
