/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ResourceAllocationException;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ProducerFlowControlTest;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;

/**
 * Variant of {@link ProducerFlowControlTest} that runs against a broker configured with
 * {@code sendFailIfNoSpace = true} and a per-destination memory limit of 1.
 *
 * <p>Each test starts a background "Filler" thread that keeps sending to {@code queueA}, waits
 * (via the inherited {@code waitForBlockedOrResourceLimit}) until the resource limit is reported,
 * and then consumes a handful of messages. The filler thread is always stopped and joined in a
 * {@code finally} block so that a failing test does not leave a producer running in the
 * background.
 */
public class ProducerFlowControlSendFailTest
extends ProducerFlowControlTest {
    /** Number of receive attempts made once the producer has hit the resource limit. */
    private static final int RECEIVE_ATTEMPTS = 10;

    /** Per-receive timeout, in milliseconds. */
    private static final long RECEIVE_TIMEOUT_MS = 1000L;

    /** Pause, in milliseconds, the async filler takes between sends after a resource exception. */
    private static final long FILLER_BACKOFF_MS = 200L;

    /** Upper bound, in milliseconds, on how long a test waits for its filler thread to exit. */
    private static final long FILLER_JOIN_TIMEOUT_MS = 5000L;

    /**
     * Builds a non-persistent, JMX-less broker whose default destination policy has a memory limit
     * of 1, VM pending-message policies and producer flow control enabled, and whose system usage
     * has {@code sendFailIfNoSpace} switched on. A TCP connector on an ephemeral port is added and
     * stored in {@code connector}.
     *
     * @return the configured (not yet started by this method) broker service
     * @throws Exception if the connector cannot be added
     */
    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService service = new BrokerService();
        service.setPersistent(false);
        service.setUseJmx(false);

        PolicyEntry policy = new PolicyEntry();
        policy.setMemoryLimit(1L);
        policy.setPendingSubscriberPolicy((PendingSubscriberMessageStoragePolicy)new VMPendingSubscriberMessageStoragePolicy());
        policy.setPendingQueuePolicy((PendingQueueMessageStoragePolicy)new VMPendingQueueMessageStoragePolicy());
        policy.setProducerFlowControl(true);

        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policy);
        service.setDestinationPolicy(policyMap);

        service.getSystemUsage().setSendFailIfNoSpace(true);
        this.connector = service.addConnector("tcp://localhost:0");
        return service;
    }

    /**
     * Intentionally empty: overrides the inherited test so that it does nothing for this broker
     * configuration.
     *
     * <p>NOTE(review): presumably not applicable because with {@code sendFailIfNoSpace} enabled
     * sends fail rather than block -- confirm against the parent test.
     */
    @Override
    public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
    }

    /**
     * Intentionally empty: overrides the inherited test so that it does nothing for this broker
     * configuration.
     *
     * <p>NOTE(review): presumably not applicable because with {@code sendFailIfNoSpace} enabled
     * sends fail rather than block -- confirm against the parent test.
     */
    @Override
    public void testAsyncPubisherRecoverAfterBlock() throws Exception {
    }

    /**
     * Async-send scenario: a filler thread sends continuously (backing off once a resource
     * exception has been observed), the test waits for the resource limit to be reported and then
     * receives and acknowledges messages from {@code queueA}.
     *
     * @throws Exception on any JMS or test-infrastructure failure
     */
    @Override
    public void testPubisherRecoverAfterBlock() throws Exception {
        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)this.createConnectionFactory();
        factory.setUseAsyncSend(true);
        this.connection = (ActiveMQConnection)factory.createConnection();
        this.connections.add(this.connection);
        this.connection.start();

        final Session session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        final MessageProducer producer = session.createProducer((Destination)this.queueA);
        final AtomicBoolean keepGoing = new AtomicBoolean(true);

        Thread filler = new Thread("Filler"){

            @Override
            public void run() {
                while (keepGoing.get()) {
                    try {
                        producer.send(session.createTextMessage("Test message"));
                        // Once the listener has seen a resource exception, slow down the sends.
                        if (gotResourceException.get()) {
                            Thread.sleep(FILLER_BACKOFF_MS);
                        }
                    }
                    catch (Exception e) {
                        // Best effort: report and keep filling until told to stop.
                        e.printStackTrace();
                    }
                }
            }
        };
        filler.start();
        try {
            this.waitForBlockedOrResourceLimit(new AtomicBoolean(false));
            this.receiveAndAcknowledge(session);
        }
        finally {
            // Always stop the filler, even when the test body throws.
            ProducerFlowControlSendFailTest.stopFiller(keepGoing, filler);
        }
    }

    /**
     * Sync-send scenario: with no connection-level exception listener, every failed send surfaces
     * as a {@link ResourceAllocationException} on the sending thread. The filler counts those,
     * the test drains some messages and then checks the exception count.
     *
     * @throws Exception on any JMS or test-infrastructure failure
     */
    public void testPubisherRecoverAfterBlockWithSyncSend() throws Exception {
        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)this.createConnectionFactory();
        factory.setExceptionListener(null);
        factory.setUseAsyncSend(false);
        this.connection = (ActiveMQConnection)factory.createConnection();
        this.connections.add(this.connection);
        this.connection.start();

        final Session session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        final MessageProducer producer = session.createProducer((Destination)this.queueA);
        final AtomicBoolean keepGoing = new AtomicBoolean(true);
        final AtomicInteger exceptionCount = new AtomicInteger(0);

        Thread filler = new Thread("Filler"){

            @Override
            public void run() {
                while (keepGoing.get()) {
                    try {
                        producer.send(session.createTextMessage("Test message"));
                    }
                    catch (JMSException e) {
                        // Only resource-allocation failures are counted; any other JMS failure is
                        // deliberately ignored and the loop simply retries.
                        if (e instanceof ResourceAllocationException) {
                            gotResourceException.set(true);
                            exceptionCount.incrementAndGet();
                        }
                    }
                }
            }
        };
        filler.start();
        try {
            this.waitForBlockedOrResourceLimit(new AtomicBoolean(false));
            this.receiveAndAcknowledge(session);
            // NOTE(review): the comparison is strict (more than 5), while the message says
            // "at least 5"; both are kept as they were.
            assertTrue("we were blocked at least 5 times", 5 < exceptionCount.get());
        }
        finally {
            // Always stop the filler, even when the assertion above fails; otherwise it would
            // keep spinning on send failures after the test has finished.
            ProducerFlowControlSendFailTest.stopFiller(keepGoing, filler);
        }
    }

    /**
     * Creates a connection factory pointing at the broker's connector, with an exception listener
     * that flags {@code gotResourceException} whenever a {@link ResourceAllocationException} is
     * reported.
     *
     * @return the configured connection factory
     * @throws Exception if the connector URI cannot be obtained
     */
    @Override
    protected ConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.connector.getConnectUri());
        connectionFactory.setExceptionListener(new ExceptionListener(){

            public void onException(JMSException exception) {
                if (exception instanceof ResourceAllocationException) {
                    gotResourceException.set(true);
                }
            }
        });
        return connectionFactory;
    }

    /**
     * Creates a consumer on {@code queueA} and makes {@link #RECEIVE_ATTEMPTS} receive attempts,
     * acknowledging every message that actually arrives within the timeout.
     */
    private void receiveAndAcknowledge(Session session) throws JMSException {
        MessageConsumer consumer = session.createConsumer((Destination)this.queueA);
        for (int attempt = 0; attempt < RECEIVE_ATTEMPTS; ++attempt) {
            TextMessage msg = (TextMessage)consumer.receive(RECEIVE_TIMEOUT_MS);
            if (msg != null) {
                msg.acknowledge();
            }
        }
    }

    /** Signals the filler loop to exit and waits a bounded time for the thread to finish. */
    private static void stopFiller(AtomicBoolean keepGoing, Thread filler) throws InterruptedException {
        keepGoing.set(false);
        filler.join(FILLER_JOIN_TIMEOUT_MS);
    }
}

