/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.usecases.DurableSubscriptionOfflineTestBase;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class ActiveDurableSubscriptionBrowseExpireTest
extends DurableSubscriptionOfflineTestBase {
    private boolean enableExpiration = true;

    public ActiveDurableSubscriptionBrowseExpireTest(boolean enableExpiration) {
        this.keepDurableSubsActive = true;
        this.enableExpiration = enableExpiration;
    }

    @Parameterized.Parameters(name="enableExpiration_{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList({false}, {true});
    }

    @Override
    public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException {
        return super.setPersistenceAdapter(broker, TestSupport.PersistenceAdapterChoice.MEM);
    }

    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + this.getName(true));
        connectionFactory.setWatchTopicAdvisories(false);
        return connectionFactory;
    }

    @Test(timeout=60000L)
    public void testBrowseExpireActiveSub() throws Exception {
        int numberOfMessages = 10;
        this.broker.setEnableMessageExpirationOnActiveDurableSubs(this.enableExpiration);
        Connection con = this.createConnection("consumer");
        Session session = con.createSession(false, 1);
        session.createDurableSubscriber((Topic)this.topic, "SubsId");
        long timeStamp = System.currentTimeMillis();
        this.sendMessages(10, timeStamp);
        ObjectName[] subs = this.broker.getAdminView().getDurableTopicSubscribers();
        Assert.assertEquals((long)1L, (long)subs.length);
        ObjectName subName = subs[0];
        DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)this.broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
        Assert.assertEquals((Object)true, (Object)sub.isActive());
        CompositeData[] data = sub.browse();
        Assert.assertNotNull((Object)data);
        Assert.assertEquals((long)10L, (long)data.length);
        org.apache.activemq.broker.region.Destination dest = this.broker.getDestination((ActiveMQDestination)this.topic);
        Assert.assertEquals((long)0L, (long)dest.getDestinationStatistics().getExpired().getCount());
        TopicMessageStore topicStore = (TopicMessageStore)dest.getMessageStore();
        final LinkedList messagesToExpire = new LinkedList();
        topicStore.recover(new MessageRecoveryListener(){

            public boolean recoverMessage(Message message) throws Exception {
                int index = (Integer)message.getProperty("index");
                if (index % 3 == 0) {
                    messagesToExpire.add(message);
                }
                return true;
            }

            public boolean recoverMessageReference(MessageId messageReference) throws Exception {
                return true;
            }

            public boolean hasSpace() {
                return true;
            }

            public boolean canRecoveryNextMessage() {
                return true;
            }

            public boolean isDuplicate(MessageId id) {
                return false;
            }
        });
        for (Message message : messagesToExpire) {
            message.setExpiration(timeStamp - 1L);
            topicStore.updateMessage(message);
        }
        data = sub.browse();
        Assert.assertNotNull((Object)data);
        Assert.assertEquals((long)(this.enableExpiration ? (long)messagesToExpire.size() : 0L), (long)dest.getDestinationStatistics().getExpired().getCount());
        session.close();
        con.close();
    }

    private void sendMessages(int numberOfMessages, long timeStamp) throws Exception {
        Connection con = this.createConnection("producer");
        Session session = con.createSession(false, 1);
        MessageProducer producer = session.createProducer(null);
        for (int i = 0; i < numberOfMessages; ++i) {
            javax.jms.Message message = session.createMessage();
            message.setIntProperty("index", i);
            message.setJMSTimestamp(timeStamp);
            producer.send((Destination)this.topic, message);
        }
        session.close();
        con.close();
    }
}

