/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.network;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.MessageNotWriteableException;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.apache.activemq.network.NetworkBridgeFilterFactory;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.Wait;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BrokerNetworkWithStuckMessagesTest {
    private static final Logger LOG = LoggerFactory.getLogger(BrokerNetworkWithStuckMessagesTest.class);
    private BrokerService localBroker;
    private BrokerService remoteBroker;
    private BrokerService secondRemoteBroker;
    private DemandForwardingBridge bridge;
    protected Map<String, BrokerService> brokers = new HashMap<String, BrokerService>();
    protected ArrayList<StubConnection> connections = new ArrayList();
    protected TransportConnector connector;
    protected TransportConnector remoteConnector;
    protected TransportConnector secondRemoteConnector;
    protected long idGenerator;
    protected int msgIdGenerator;
    protected int tempDestGenerator;
    protected int maxWait = 4000;
    protected String queueName = "TEST";
    protected String amqDomain = "org.apache.activemq";

    @Before
    public void setUp() throws Exception {
        this.createBroker();
        this.createRemoteBroker();
        FileUtils.deleteDirectory((File)new File("activemq-data"));
        NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
        config.setBrokerName("local");
        config.setDispatchAsync(false);
        config.setDuplex(true);
        Transport localTransport = this.createTransport();
        Transport remoteTransport = this.createRemoteTransport();
        this.bridge = new DemandForwardingBridge(config, localTransport, remoteTransport);
        this.bridge.setBrokerService(this.localBroker);
        this.bridge.start();
        this.createSecondRemoteBroker();
        config = new NetworkBridgeConfiguration();
        config.setBrokerName("remote");
        config.setDuplex(true);
        localTransport = this.createRemoteTransport();
        remoteTransport = this.createSecondRemoteTransport();
        this.bridge = new DemandForwardingBridge(config, localTransport, remoteTransport);
        this.bridge.setBrokerService(this.remoteBroker);
        this.bridge.start();
        this.waitForBridgeFormation();
    }

    protected void waitForBridgeFormation() throws Exception {
        for (final BrokerService broker : this.brokers.values()) {
            if (broker.getNetworkConnectors().isEmpty()) continue;
            Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    return !((NetworkConnector)broker.getNetworkConnectors().get(0)).activeBridges().isEmpty();
                }
            });
        }
    }

    @After
    public void tearDown() throws Exception {
        this.bridge.stop();
        this.localBroker.stop();
        this.remoteBroker.stop();
        this.secondRemoteBroker.stop();
    }

    @Test(timeout=120000L)
    public void testBrokerNetworkWithStuckMessages() throws Exception {
        int counter;
        int sendNumMessages = 10;
        int receiveNumMessages = 5;
        StubConnection connection1 = this.createConnection();
        ConnectionInfo connectionInfo1 = this.createConnectionInfo();
        SessionInfo sessionInfo1 = this.createSessionInfo(connectionInfo1);
        ProducerInfo producerInfo = this.createProducerInfo(sessionInfo1);
        connection1.send((Command)connectionInfo1);
        connection1.send((Command)sessionInfo1);
        connection1.send((Command)producerInfo);
        ActiveMQDestination destinationInfo1 = null;
        for (int i = 0; i < sendNumMessages; ++i) {
            destinationInfo1 = this.createDestinationInfo(connection1, connectionInfo1, (byte)1);
            connection1.request((Command)this.createMessage(producerInfo, destinationInfo1, 1));
        }
        Object[] messages = this.browseQueueWithJmx(this.localBroker);
        Assert.assertEquals((long)sendNumMessages, (long)messages.length);
        StubConnection connection2 = this.createRemoteConnection();
        ConnectionInfo connectionInfo2 = this.createConnectionInfo();
        SessionInfo sessionInfo2 = this.createSessionInfo(connectionInfo2);
        connection2.send((Command)connectionInfo2);
        connection2.send((Command)sessionInfo2);
        ActiveMQDestination destinationInfo2 = this.createDestinationInfo(connection2, connectionInfo2, (byte)1);
        ConsumerInfo consumerInfo2 = this.createConsumerInfo(sessionInfo2, destinationInfo2);
        connection2.send((Command)consumerInfo2);
        for (int i = 0; i < receiveNumMessages; ++i) {
            Message message1 = this.receiveMessage(connection2, 20000L);
            Assert.assertNotNull((Object)message1);
            LOG.info("on remote, got: " + message1.getMessageId());
            connection2.send((Command)this.createAck(consumerInfo2, message1, 1, (byte)4));
        }
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                Object[] result = BrokerNetworkWithStuckMessagesTest.this.browseQueueWithJmx(BrokerNetworkWithStuckMessagesTest.this.localBroker);
                return 0 == result.length;
            }
        });
        messages = this.browseQueueWithJmx(this.localBroker);
        Assert.assertEquals((long)0L, (long)messages.length);
        LOG.info("creating demand on second remote...");
        StubConnection connection3 = this.createSecondRemoteConnection();
        ConnectionInfo connectionInfo3 = this.createConnectionInfo();
        SessionInfo sessionInfo3 = this.createSessionInfo(connectionInfo3);
        connection3.send((Command)connectionInfo3);
        connection3.send((Command)sessionInfo3);
        ActiveMQDestination destinationInfo3 = this.createDestinationInfo(connection3, connectionInfo3, (byte)1);
        ConsumerInfo consumerInfoS3 = this.createConsumerInfo(sessionInfo3, destinationInfo3);
        connection3.send((Command)consumerInfoS3);
        Message messageExceedingTtl = this.receiveMessage(connection3, 5000L);
        if (messageExceedingTtl != null) {
            LOG.error("got message on Second remote: " + messageExceedingTtl);
            connection3.send((Command)this.createAck(consumerInfoS3, messageExceedingTtl, 1, (byte)4));
        }
        LOG.info("Closing consumer on remote");
        connection2.send((Command)consumerInfo2.createRemoveCommand());
        connection2.send((Command)connectionInfo2.createRemoveCommand());
        Assert.assertTrue((String)"correct stuck message count", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                Object[] result = BrokerNetworkWithStuckMessagesTest.this.browseQueueWithJmx(BrokerNetworkWithStuckMessagesTest.this.remoteBroker);
                return 5 == result.length;
            }
        }));
        messages = this.browseQueueWithJmx(this.remoteBroker);
        Assert.assertEquals((long)5L, (long)messages.length);
        LOG.info("Messages now stuck on remote");
        ConsumerInfo consumerInfo1 = this.createConsumerInfo(sessionInfo1, destinationInfo1);
        connection1.send((Command)consumerInfo1);
        LOG.info("create local consumer: " + consumerInfo1);
        Message message1 = this.receiveMessage(connection1, 20000L);
        Assert.assertNotNull((String)"Expect to get a replay as remote consumer is gone", (Object)message1);
        connection1.send((Command)this.createAck(consumerInfo1, message1, 1, (byte)4));
        LOG.info("acked one message on origin, waiting for all messages to percolate back");
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                Object[] result = BrokerNetworkWithStuckMessagesTest.this.browseQueueWithJmx(BrokerNetworkWithStuckMessagesTest.this.localBroker);
                return 4 == result.length;
            }
        });
        messages = this.browseQueueWithJmx(this.localBroker);
        Assert.assertEquals((long)4L, (long)messages.length);
        LOG.info("checking for messages on remote again");
        connection2 = this.createRemoteConnection();
        connectionInfo2 = this.createConnectionInfo();
        sessionInfo2 = this.createSessionInfo(connectionInfo2);
        connection2.send((Command)connectionInfo2);
        connection2.send((Command)sessionInfo2);
        ConsumerInfo consumerInfo3 = this.createConsumerInfo(sessionInfo2, destinationInfo2);
        connection2.send((Command)consumerInfo3);
        message1 = this.receiveMessage(connection2, 20000L);
        Assert.assertNull((String)("Messages have migrated back: " + message1), (Object)message1);
        for (counter = 1; counter < receiveNumMessages; ++counter) {
            message1 = this.receiveMessage(connection1);
            LOG.info("local consume of: " + (message1 != null ? message1.getMessageId() : " null"));
            connection1.send((Command)this.createAck(consumerInfo1, message1, 1, (byte)4));
        }
        Assert.assertEquals((long)receiveNumMessages, (long)counter);
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                Object[] result = BrokerNetworkWithStuckMessagesTest.this.browseQueueWithJmx(BrokerNetworkWithStuckMessagesTest.this.remoteBroker);
                return 0 == result.length;
            }
        });
        messages = this.browseQueueWithJmx(this.remoteBroker);
        Assert.assertEquals((long)0L, (long)messages.length);
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                Object[] result = BrokerNetworkWithStuckMessagesTest.this.browseQueueWithJmx(BrokerNetworkWithStuckMessagesTest.this.localBroker);
                return 0 == result.length;
            }
        });
        messages = this.browseQueueWithJmx(this.localBroker);
        Assert.assertEquals((long)0L, (long)messages.length);
        connection2.send((Command)consumerInfo3.createRemoveCommand());
        connection1.stop();
        connection2.stop();
        connection3.stop();
    }

    protected BrokerService createBroker() throws Exception {
        this.localBroker = new BrokerService();
        this.localBroker.setBrokerName("localhost");
        this.localBroker.setUseJmx(true);
        this.localBroker.setPersistenceAdapter(null);
        this.localBroker.setPersistent(false);
        this.connector = this.createConnector();
        this.localBroker.addConnector(this.connector);
        this.configureBroker(this.localBroker);
        this.localBroker.start();
        this.localBroker.waitUntilStarted();
        this.localBroker.getManagementContext().setConnectorPort(2221);
        this.brokers.put(this.localBroker.getBrokerName(), this.localBroker);
        return this.localBroker;
    }

    private void configureBroker(BrokerService broker) {
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry defaultEntry = new PolicyEntry();
        defaultEntry.setExpireMessagesPeriod(0L);
        ConditionalNetworkBridgeFilterFactory filterFactory = new ConditionalNetworkBridgeFilterFactory();
        filterFactory.setReplayWhenNoConsumers(true);
        defaultEntry.setNetworkBridgeFilterFactory((NetworkBridgeFilterFactory)filterFactory);
        policyMap.setDefaultEntry(defaultEntry);
        broker.setDestinationPolicy(policyMap);
    }

    protected BrokerService createRemoteBroker() throws Exception {
        this.remoteBroker = new BrokerService();
        this.remoteBroker.setBrokerName("remotehost");
        this.remoteBroker.setUseJmx(true);
        this.remoteBroker.setPersistenceAdapter(null);
        this.remoteBroker.setPersistent(false);
        this.remoteConnector = this.createRemoteConnector();
        this.remoteBroker.addConnector(this.remoteConnector);
        this.configureBroker(this.remoteBroker);
        this.remoteBroker.start();
        this.remoteBroker.waitUntilStarted();
        this.remoteBroker.getManagementContext().setConnectorPort(2222);
        this.brokers.put(this.remoteBroker.getBrokerName(), this.remoteBroker);
        return this.remoteBroker;
    }

    protected BrokerService createSecondRemoteBroker() throws Exception {
        this.secondRemoteBroker = new BrokerService();
        this.secondRemoteBroker.setBrokerName("secondRemotehost");
        this.secondRemoteBroker.setUseJmx(false);
        this.secondRemoteBroker.setPersistenceAdapter(null);
        this.secondRemoteBroker.setPersistent(false);
        this.secondRemoteConnector = this.createSecondRemoteConnector();
        this.secondRemoteBroker.addConnector(this.secondRemoteConnector);
        this.configureBroker(this.secondRemoteBroker);
        this.secondRemoteBroker.start();
        this.secondRemoteBroker.waitUntilStarted();
        this.brokers.put(this.secondRemoteBroker.getBrokerName(), this.secondRemoteBroker);
        return this.secondRemoteBroker;
    }

    protected Transport createTransport() throws Exception {
        Transport transport = TransportFactory.connect((URI)this.connector.getServer().getConnectURI());
        return transport;
    }

    protected Transport createRemoteTransport() throws Exception {
        Transport transport = TransportFactory.connect((URI)this.remoteConnector.getServer().getConnectURI());
        return transport;
    }

    protected Transport createSecondRemoteTransport() throws Exception {
        Transport transport = TransportFactory.connect((URI)this.secondRemoteConnector.getServer().getConnectURI());
        return transport;
    }

    protected TransportConnector createConnector() throws Exception, IOException, URISyntaxException {
        return new TransportConnector(TransportFactory.bind((URI)new URI(this.getLocalURI())));
    }

    protected TransportConnector createRemoteConnector() throws Exception, IOException, URISyntaxException {
        return new TransportConnector(TransportFactory.bind((URI)new URI(this.getRemoteURI())));
    }

    protected TransportConnector createSecondRemoteConnector() throws Exception, IOException, URISyntaxException {
        return new TransportConnector(TransportFactory.bind((URI)new URI(this.getSecondRemoteURI())));
    }

    protected String getRemoteURI() {
        return "vm://remotehost";
    }

    protected String getSecondRemoteURI() {
        return "vm://secondRemotehost";
    }

    protected String getLocalURI() {
        return "vm://localhost";
    }

    protected StubConnection createConnection() throws Exception {
        Transport transport = TransportFactory.connect((URI)this.connector.getServer().getConnectURI());
        StubConnection connection = new StubConnection(transport);
        this.connections.add(connection);
        return connection;
    }

    protected StubConnection createRemoteConnection() throws Exception {
        Transport transport = TransportFactory.connect((URI)this.remoteConnector.getServer().getConnectURI());
        StubConnection connection = new StubConnection(transport);
        this.connections.add(connection);
        return connection;
    }

    protected StubConnection createSecondRemoteConnection() throws Exception {
        Transport transport = TransportFactory.connect((URI)this.secondRemoteConnector.getServer().getConnectURI());
        StubConnection connection = new StubConnection(transport);
        this.connections.add(connection);
        return connection;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Object[] browseQueueWithJms(BrokerService broker) throws Exception {
        Object[] messages = null;
        Connection connection = null;
        Session session = null;
        try {
            URI brokerUri = this.connector.getUri();
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri.toString());
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, 1);
            Queue destination = session.createQueue(this.queueName);
            QueueBrowser browser = session.createBrowser(destination);
            ArrayList list = new ArrayList();
            Enumeration enumn = browser.getEnumeration();
            while (enumn.hasMoreElements()) {
                list.add(enumn.nextElement());
            }
            messages = list.toArray();
        }
        finally {
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
        LOG.info("+Browsed with JMS: " + messages.length);
        return messages;
    }

    private Object[] browseQueueWithJmx(BrokerService broker) throws Exception {
        Hashtable<String, String> params = new Hashtable<String, String>();
        params.put("brokerName", broker.getBrokerName());
        params.put("type", "Broker");
        params.put("destinationType", "Queue");
        params.put("destinationName", this.queueName);
        ObjectName queueObjectName = ObjectName.getInstance(this.amqDomain, params);
        ManagementContext mgmtCtx = broker.getManagementContext();
        QueueViewMBean queueView = (QueueViewMBean)mgmtCtx.newProxyInstance(queueObjectName, QueueViewMBean.class, true);
        Object[] messages = queueView.browse();
        LOG.info("+Browsed with JMX: " + messages.length);
        return messages;
    }

    protected ConnectionInfo createConnectionInfo() throws Exception {
        ConnectionInfo info = new ConnectionInfo();
        info.setConnectionId(new ConnectionId("connection:" + ++this.idGenerator));
        info.setClientId(info.getConnectionId().getValue());
        return info;
    }

    protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
        SessionInfo info = new SessionInfo(connectionInfo, ++this.idGenerator);
        return info;
    }

    protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
        ProducerInfo info = new ProducerInfo(sessionInfo, ++this.idGenerator);
        return info;
    }

    protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception {
        ConsumerInfo info = new ConsumerInfo(sessionInfo, ++this.idGenerator);
        info.setBrowser(false);
        info.setDestination(destination);
        info.setPrefetchSize(1000);
        info.setDispatchAsync(false);
        return info;
    }

    protected DestinationInfo createTempDestinationInfo(ConnectionInfo connectionInfo, byte destinationType) {
        DestinationInfo info = new DestinationInfo();
        info.setConnectionId(connectionInfo.getConnectionId());
        info.setOperationType((byte)0);
        info.setDestination(ActiveMQDestination.createDestination((String)(info.getConnectionId() + ":" + ++this.tempDestGenerator), (byte)destinationType));
        return info;
    }

    protected ActiveMQDestination createDestinationInfo(StubConnection connection, ConnectionInfo connectionInfo1, byte destinationType) throws Exception {
        if ((destinationType & 4) != 0) {
            DestinationInfo info = this.createTempDestinationInfo(connectionInfo1, destinationType);
            connection.send((Command)info);
            return info.getDestination();
        }
        return ActiveMQDestination.createDestination((String)this.queueName, (byte)destinationType);
    }

    protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) {
        Message message = this.createMessage(producerInfo, destination);
        message.setPersistent(deliveryMode == 2);
        return message;
    }

    protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setMessageId(new MessageId(producerInfo, (long)(++this.msgIdGenerator)));
        message.setDestination(destination);
        message.setPersistent(false);
        try {
            message.setText("Test Message Payload.");
        }
        catch (MessageNotWriteableException e) {
            // empty catch block
        }
        return message;
    }

    protected MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count, byte ackType) {
        MessageAck ack = new MessageAck();
        ack.setAckType(ackType);
        ack.setConsumerId(consumerInfo.getConsumerId());
        ack.setDestination(msg.getDestination());
        ack.setLastMessageId(msg.getMessageId());
        ack.setMessageCount(count);
        return ack;
    }

    public Message receiveMessage(StubConnection connection) throws InterruptedException {
        return this.receiveMessage(connection, this.maxWait);
    }

    public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException {
        Object o;
        do {
            if ((o = connection.getDispatchQueue().poll(timeout, TimeUnit.MILLISECONDS)) != null) continue;
            return null;
        } while (!(o instanceof MessageDispatch));
        MessageDispatch dispatch = (MessageDispatch)o;
        if (dispatch.getMessage() == null) {
            return null;
        }
        dispatch.setMessage(dispatch.getMessage().copy());
        dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter());
        return dispatch.getMessage();
    }
}

