/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.LockOwner;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.SubscriptionStatistics;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;

/**
 * Exercises {@link Queue} subscription removal while producers are concurrently sending.
 *
 * <p>The test registers {@code numSubscriptions} stub subscriptions that lock every message
 * reference handed to them, runs {@code senders} producer tasks against the queue, then removes
 * all subscriptions and asserts that none of the removed subscriptions still records a locked
 * message.
 */
public class SubscriptionAddRemoveQueueTest extends TestCase {
    /** Queue under test; created in {@link #setUp()}. */
    Queue queue;
    /** Consumer info shared by every stub subscription. */
    ConsumerInfo info = new ConsumerInfo();
    /** Every stub subscription added to the queue by the test. */
    List<SimpleImmediateDispatchSubscription> subs = new ArrayList<SimpleImmediateDispatchSubscription>();
    ConnectionContext context = new ConnectionContext();
    ProducerBrokerExchange producerBrokerExchange = new ProducerBrokerExchange();
    ProducerInfo producerInfo = new ProducerInfo();
    ProducerState producerState = new ProducerState(producerInfo);
    ActiveMQDestination destination = new ActiveMQQueue("TEST");
    /** Number of stub subscriptions registered on the queue. */
    int numSubscriptions = 1000;
    /**
     * Stop flag polled by the sender tasks. Volatile because it is written by the test thread
     * and read by executor threads; without it the senders are not guaranteed to see the update.
     */
    volatile boolean working = true;
    /** Number of concurrent sender tasks. */
    int senders = 20;
    /** Broker started in {@link #setUp()}; kept so {@link #tearDown()} can stop it. */
    private BrokerService brokerService;

    /**
     * Starts a broker and creates and initializes the queue under test with a {@code null}
     * message store.
     *
     * @throws Exception if the broker fails to start or the queue cannot be initialized
     */
    @Override
    public void setUp() throws Exception {
        brokerService = new BrokerService();
        brokerService.start();

        DestinationStatistics parentStats = new DestinationStatistics();
        parentStats.setEnabled(true);
        TaskRunnerFactory taskFactory = new TaskRunnerFactory();
        MessageStore store = null;

        info.setDestination(destination);
        info.setPrefetchSize(100);
        producerBrokerExchange.setProducerState(producerState);
        producerBrokerExchange.setConnectionContext(context);

        queue = new Queue(brokerService, destination, store, parentStats, taskFactory);
        queue.initialize();
    }

    /**
     * Stops the broker started by {@link #setUp()} so it does not outlive the test.
     *
     * @throws Exception if stopping the broker fails
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            if (brokerService != null) {
                brokerService.stop();
            }
        } finally {
            super.tearDown();
        }
    }

    /**
     * Sends continuously from several threads, removes every subscription, and verifies that
     * removed subscriptions no longer record any locked message.
     *
     * @throws Exception if the remover task fails or the test thread is interrupted
     */
    public void testNoDispatchToRemovedConsumers() throws Exception {
        final AtomicInteger producerId = new AtomicInteger();

        // NOTE(review): a failure raised inside a sender task is only captured by the Future
        // returned from submit(), which this test never inspects; it is visible solely through
        // the printed stack trace.
        Runnable sender = new Runnable() {

            @Override
            public void run() {
                AtomicInteger id = new AtomicInteger();
                int producerIdAndIncrement = producerId.getAndIncrement();
                while (working) {
                    try {
                        ActiveMQMessage msg = new ActiveMQMessage();
                        msg.setDestination(destination);
                        msg.setMessageId(new MessageId(producerIdAndIncrement + ":0:" + id.getAndIncrement()));
                        queue.send(producerBrokerExchange, msg);
                    } catch (Exception e) {
                        if (!working) {
                            // The test is finishing and has interrupted this task; an
                            // exception at this point is part of shutdown, not a failure.
                            return;
                        }
                        e.printStackTrace();
                        fail("unexpected exception in sendMessage, ex:" + e);
                    }
                }
            }
        };

        Runnable subRemover = new Runnable() {

            @Override
            public void run() {
                for (Subscription subscription : subs) {
                    try {
                        queue.removeSubscription(context, subscription, 0L);
                    } catch (Exception e) {
                        e.printStackTrace();
                        fail("unexpected exception in removeSubscription, ex:" + e);
                    }
                }
            }
        };

        for (int i = 0; i < numSubscriptions; ++i) {
            SimpleImmediateDispatchSubscription sub = new SimpleImmediateDispatchSubscription();
            subs.add(sub);
            queue.addSubscription(context, sub);
        }
        assertEquals("there are X subscriptions", numSubscriptions,
                queue.getDestinationStatistics().getConsumers().getCount());

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < senders; ++i) {
                executor.submit(sender);
            }

            // Give the senders time to dispatch to every subscription.
            Thread.sleep(1000L);
            for (SimpleImmediateDispatchSubscription sub : subs) {
                assertTrue("There are some locked messages in the subscription", hasSomeLocks(sub.dispatched));
            }

            // Remove all subscriptions while the senders are still running; get() rethrows
            // any failure from the remover as an ExecutionException.
            Future<?> result = executor.submit(subRemover);
            result.get();
            working = false;

            assertEquals("there are no subscriptions", 0L,
                    queue.getDestinationStatistics().getConsumers().getCount());
            for (SimpleImmediateDispatchSubscription sub : subs) {
                assertFalse("There are no locked messages in any removed subscriptions", hasSomeLocks(sub.dispatched));
            }
        } finally {
            // Always stop the senders, even when an assertion above fails; otherwise they
            // would keep spinning on the working flag after the test has ended.
            working = false;
            executor.shutdownNow();
            // Best effort: bounded wait so a sender that does not respond cannot hang the test.
            executor.awaitTermination(5L, TimeUnit.SECONDS);
        }
    }

    /**
     * Reports whether any reference in the list currently has a lock owner.
     *
     * @param dispatched list of dispatched references; expected to be the synchronized list
     *        held by a {@link SimpleImmediateDispatchSubscription}
     * @return {@code true} if at least one reference has a non-null lock owner
     */
    private boolean hasSomeLocks(List<MessageReference> dispatched) {
        // A Collections.synchronizedList must be locked manually while iterating; sender
        // threads may be appending to it through Subscription.add at the same time.
        synchronized (dispatched) {
            for (MessageReference mr : dispatched) {
                QueueMessageReference qmr = (QueueMessageReference) mr;
                if (qmr.getLockOwner() != null) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Stub subscription that locks each message reference it is given and records it in
     * {@link #dispatched}. Apart from {@code add(MessageReference)}, {@code remove},
     * {@code getConsumerInfo}, {@code matches(MessageReference, ...)}, {@code countBeforeFull}
     * and the statistics accessors, every method is a no-op or returns a fixed default.
     */
    public class SimpleImmediateDispatchSubscription
    implements Subscription,
    LockOwner {
        private final SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
        /** References locked by this subscription; synchronized because senders add concurrently. */
        final List<MessageReference> dispatched = Collections.synchronizedList(new ArrayList<MessageReference>());

        public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
        }

        /** Locks the reference on behalf of this subscription and records it as dispatched. */
        public void add(MessageReference node) throws Exception {
            QueueMessageReference qmr = (QueueMessageReference) node;
            qmr.lock(this);
            dispatched.add(qmr);
        }

        public ConnectionContext getContext() {
            return null;
        }

        public int getCursorMemoryHighWaterMark() {
            return 0;
        }

        public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
        }

        public boolean isSlowConsumer() {
            return false;
        }

        public void unmatched(MessageReference node) throws IOException {
        }

        public long getTimeOfLastMessageAck() {
            return 0L;
        }

        public long getConsumedCount() {
            return 0L;
        }

        public void incrementConsumedCount() {
        }

        public void resetConsumedCount() {
        }

        public void add(ConnectionContext context, Destination destination) throws Exception {
        }

        public void destroy() {
        }

        public void gc() {
        }

        /** Returns the consumer info shared by all subscriptions of the enclosing test. */
        public ConsumerInfo getConsumerInfo() {
            return info;
        }

        public long getDequeueCounter() {
            return 0L;
        }

        public long getDispatchedCounter() {
            return 0L;
        }

        public int getDispatchedQueueSize() {
            return 0;
        }

        public long getEnqueueCounter() {
            return 0L;
        }

        public int getInFlightSize() {
            return 0;
        }

        public int getInFlightUsage() {
            return 0;
        }

        public ObjectName getObjectName() {
            return null;
        }

        public int getPendingQueueSize() {
            return 0;
        }

        public int getPrefetchSize() {
            return 0;
        }

        public String getSelector() {
            return null;
        }

        public boolean isBrowser() {
            return false;
        }

        public boolean isFull() {
            return false;
        }

        public boolean isHighWaterMark() {
            return false;
        }

        public boolean isLowWaterMark() {
            return false;
        }

        public boolean isRecoveryRequired() {
            return false;
        }

        public boolean isSlave() {
            return false;
        }

        /** Accepts every message reference. */
        public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
            return true;
        }

        public boolean matches(ActiveMQDestination destination) {
            return false;
        }

        public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
        }

        public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
            return null;
        }

        public boolean isWildcard() {
            return false;
        }

        /**
         * Returns a snapshot copy of the references recorded in {@link #dispatched}; the
         * recorded list itself is left untouched.
         */
        public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
            return new ArrayList<MessageReference>(dispatched);
        }

        public void setObjectName(ObjectName objectName) {
        }

        public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
        }

        public void updateConsumerPrefetch(int newPrefetch) {
        }

        public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
            return false;
        }

        public ActiveMQDestination getActiveMQDestination() {
            return null;
        }

        public int getLockPriority() {
            return 0;
        }

        public boolean isLockExclusive() {
            return false;
        }

        public void addDestination(Destination destination) {
        }

        public void removeDestination(Destination destination) {
        }

        public int countBeforeFull() {
            return 10;
        }

        public SubscriptionStatistics getSubscriptionStatistics() {
            return subscriptionStatistics;
        }

        public long getInFlightMessageSize() {
            return subscriptionStatistics.getInflightMessageSize().getTotalSize();
        }
    }
}

