/*
 * Decompiled with CFR 0.152.
 */
package org.hornetq.core.server.impl;

import java.io.InputStream;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.HornetQPropertyConversionException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.MessageReferenceImpl;
import org.hornetq.core.server.impl.ScheduledDeliveryHandlerImpl;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.utils.LinkedListIterator;
import org.hornetq.utils.ReferenceCounter;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUID;
import org.junit.Assert;
import org.junit.Test;

public class ScheduledDeliveryHandlerTest
extends Assert {
    /**
     * Fills a handler with a large number of randomly timed sequences and checks the result.
     *
     * <p>Each sequence shares one random positive delivery time and contains between 1 and 9
     * messages, every one inserted with a randomly chosen head/tail flag. Afterwards
     * {@code debugList} verifies that no message ID is duplicated, that delivery times never
     * decrease, and that every generated ID is present.
     */
    @Test
    public void testScheduleRandom() throws Exception {
        ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);
        final int numberOfSequences = 100000;
        long nextMessage = 0L;
        for (int i = 0; i < numberOfSequences; ++i) {
            // NOTE(review): randomInt() presumably may return negative values (only the
            // *Positive* helpers promise otherwise). A negative remainder would make the inner
            // loop a no-op and silently drop the whole sequence. Math.abs is overflow-safe here
            // because the remainder is always within -9..9.
            int numberOfMessages = Math.abs(RandomUtil.randomInt() % 10);
            if (numberOfMessages == 0) {
                numberOfMessages = 1;
            }
            long nextScheduledTime = RandomUtil.randomPositiveLong();
            for (int j = 0; j < numberOfMessages; ++j) {
                boolean tail = RandomUtil.randomBoolean();
                this.addMessage(handler, nextMessage++, nextScheduledTime, tail);
            }
        }
        // nextMessage now equals the total number of messages added (IDs 0..nextMessage-1).
        this.debugList(true, handler, nextMessage);
    }

    /**
     * Adds two large batches, each sharing a single delivery time, where the lowest ID of every
     * batch is inserted last but at the head. The later batch is added before the earlier one.
     * The handler must end up holding IDs 0..19999 in strictly consecutive order.
     */
    @Test
    public void testScheduleSameTimeHeadAndTail() throws Exception {
        ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);

        // Later batch: IDs 10001..19999 appended at the tail, then 10000 pushed to the head.
        long deliveryTime = System.currentTimeMillis() + 10000L;
        for (long id = 10001L; id < 20000L; id++) {
            addMessage(handler, id, deliveryTime, true);
        }
        addMessage(handler, 10000L, deliveryTime, false);

        // Earlier batch: IDs 1..9999 appended at the tail, then 0 pushed to the head.
        deliveryTime = System.currentTimeMillis() + 5000L;
        for (long id = 1L; id < 10000L; id++) {
            addMessage(handler, id, deliveryTime, true);
        }
        addMessage(handler, 0L, deliveryTime, false);

        debugList(true, handler, 20000L);
        validateSequence(handler);
    }

    /**
     * Adds five messages with fixed, unsorted delivery times (one of them at the head) and
     * checks that the handler lists all of them in non-decreasing delivery-time order.
     */
    @Test
    public void testScheduleFixedSample() throws Exception {
        ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);

        // Delivery time per message ID (the array index is the ID).
        long[] deliveryTimes = {48L, 75L, 56L, 7L, 69L};
        // Message 3 is the only one inserted at the head; all others go to the tail.
        final int headInsertedId = 3;

        for (int id = 0; id < deliveryTimes.length; id++) {
            boolean tail = id != headInsertedId;
            addMessage(handler, id, deliveryTimes[id], tail);
        }
        debugList(true, handler, deliveryTimes.length);
    }

    /**
     * Adds IDs 0..4 at the tail with non-decreasing delivery times, then IDs 10 down to 5 at the
     * head, all with the same delivery time. The resulting list must read 0, 1, 2, ... in
     * consecutive ID order.
     */
    @Test
    public void testScheduleWithAddHeads() throws Exception {
        ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);

        // Tail insertions: the array index is the message ID, the value its delivery time.
        long[] tailTimes = {1L, 2L, 3L, 3L, 4L};
        for (int id = 0; id < tailTimes.length; id++) {
            addMessage(handler, id, tailTimes[id], true);
        }

        // Head insertions in descending ID order, all scheduled for time 5.
        for (long id = 10L; id >= 5L; id--) {
            addMessage(handler, id, 5L, false);
        }

        validateSequence(handler);
    }

    /**
     * Adds three groups of six messages. Within a group all messages share one delivery time;
     * five are appended at the tail and the lowest ID is then inserted at the head. Groups are
     * added out of delivery-time order (48, 59, 49). The handler must list IDs consecutively.
     */
    @Test
    public void testScheduleFixedSampleTailAndHead() throws Exception {
        ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);

        // Each entry is {ID inserted at the head, delivery time shared by the group}.
        long[][] groups = {
            {0L, 48L},
            {12L, 59L},
            {6L, 49L},
        };

        for (long[] group : groups) {
            long headId = group[0];
            long deliveryTime = group[1];
            // The five IDs following the head ID are appended at the tail first...
            for (long id = headId + 1L; id <= headId + 5L; id++) {
                addMessage(handler, id, deliveryTime, true);
            }
            // ...and only then is the head ID itself pushed in front of them.
            addMessage(handler, headId, deliveryTime, false);
        }

        validateSequence(handler);
    }

    /**
     * Repeats the concurrent scheduling scenario of {@code internalSchedule} 100 times, reusing
     * one 50-thread producer pool and one single-threaded scheduler. Both executors are shut
     * down when the test ends, whether it passed or failed.
     */
    @Test
    public void testScheduleNow() throws Exception {
        ExecutorService producerPool = Executors.newFixedThreadPool(50);
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        try {
            for (int remaining = 100; remaining > 0; remaining--) {
                internalSchedule(producerPool, scheduler);
            }
        } finally {
            scheduler.shutdownNow();
            producerPool.shutdownNow();
        }
    }

    /**
     * Runs one round of the concurrent scheduling scenario.
     *
     * <p>Twenty producer tasks are submitted to {@code executor}; each calls
     * {@code checkAndSchedule} 200 times against a fresh handler, using the timestamp captured
     * at the start of the round as delivery time and {@code tail == false}. The round passes
     * when all producers finish within one minute without throwing and the fake queue has
     * counted one reference per scheduled message within a further two seconds.
     *
     * @param executor  pool on which the producer tasks run
     * @param scheduler scheduler handed to the handler under test
     * @throws Exception if waiting on either latch fails
     */
    private void internalSchedule(ExecutorService executor, ScheduledThreadPoolExecutor scheduler) throws Exception {
        final int numberOfMessages = 200;
        final int numberOfThreads = 20;
        final int expectedTotal = numberOfMessages * numberOfThreads;

        final ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl((ScheduledExecutorService)scheduler);
        final FakeQueueForScheduleUnitTest fakeQueue = new FakeQueueForScheduleUnitTest(expectedTotal);
        final long now = System.currentTimeMillis();
        final CountDownLatch latchDone = new CountDownLatch(numberOfThreads);
        final AtomicInteger error = new AtomicInteger(0);

        // The task keeps no state of its own beyond the captured finals, so a single instance
        // can be submitted once per producer instead of redeclaring a class on every iteration.
        final Runnable producer = new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < numberOfMessages; ++i) {
                        ScheduledDeliveryHandlerTest.this.checkAndSchedule(handler, i, now, false, fakeQueue);
                    }
                } catch (Exception e) {
                    // Record the failure for the main thread; the stack trace aids diagnosis.
                    e.printStackTrace();
                    error.incrementAndGet();
                } finally {
                    latchDone.countDown();
                }
            }
        };

        for (int i = 0; i < numberOfThreads; ++i) {
            executor.execute(producer);
        }

        assertTrue("producers did not finish within one minute", latchDone.await(1L, TimeUnit.MINUTES));
        assertEquals("producers that failed with an exception", 0L, (long)error.get());
        if (!fakeQueue.waitCompletion(2L, TimeUnit.SECONDS)) {
            // Report the real target and the remaining latch count rather than a hard-coded
            // number and the latch's default toString().
            fail("Couldn't complete queue.add, expected " + expectedTotal + ", still missing " + fakeQueue.expectedElements.getCount());
        }
    }

    /**
     * Asserts that the handler's scheduled references carry consecutive message IDs starting
     * at 0, in list order.
     *
     * @param handler handler whose scheduled references are inspected
     */
    private void validateSequence(ScheduledDeliveryHandlerImpl handler) {
        long expectedId = 0L;
        for (MessageReference scheduled : handler.getScheduledReferences()) {
            long actualId = scheduled.getMessage().getMessageID();
            assertEquals(expectedId, actualId);
            // The next reference must carry the ID directly after the one just seen.
            expectedId = actualId + 1L;
        }
    }

    /**
     * Wraps a new {@code FakeMessage} in a queue-less reference, stamps it with the delivery
     * time and places it directly into the handler via {@code addInPlace}.
     *
     * @param handler           handler receiving the reference
     * @param nextMessageID     ID given to the fake message
     * @param nextScheduledTime delivery time set on the reference and passed to the handler
     * @param tail              forwarded unchanged to {@code addInPlace}
     */
    private void addMessage(ScheduledDeliveryHandlerImpl handler, long nextMessageID, long nextScheduledTime, boolean tail) {
        ServerMessage message = new FakeMessage(nextMessageID);
        MessageReferenceImpl reference = new MessageReferenceImpl(message, null);
        reference.setScheduledDeliveryTime(nextScheduledTime);
        handler.addInPlace(nextScheduledTime, reference, tail);
    }

    /**
     * Wraps a new {@code FakeMessage} in a reference bound to {@code queue}, stamps it with the
     * delivery time and submits it through the handler's {@code checkAndSchedule}.
     *
     * @param handler           handler receiving the reference
     * @param nextMessageID     ID given to the fake message
     * @param nextScheduledTime delivery time set on the reference
     * @param tail              forwarded unchanged to the handler
     * @param queue             queue the reference is created for
     */
    private void checkAndSchedule(ScheduledDeliveryHandlerImpl handler, long nextMessageID, long nextScheduledTime, boolean tail, Queue queue) {
        ServerMessage message = new FakeMessage(nextMessageID);
        MessageReferenceImpl reference = new MessageReferenceImpl(message, queue);
        reference.setScheduledDeliveryTime(nextScheduledTime);
        handler.checkAndSchedule(reference, tail);
    }

    /**
     * Checks the handler's scheduled references for duplicates, ordering and completeness.
     *
     * <p>Every message ID may appear only once, and every ID in
     * {@code 0..numberOfExpectedMessages-1} must be present. Delivery times are expected to be
     * non-decreasing in list order; a violation either fails the test or is only printed,
     * depending on {@code fail}.
     *
     * @param fail                     when {@code true} an out-of-order delivery time fails the
     *                                 test; when {@code false} it is only written to stdout
     * @param handler                  handler whose scheduled references are inspected
     * @param numberOfExpectedMessages number of consecutive IDs, starting at 0, that must exist
     */
    private void debugList(boolean fail, ScheduledDeliveryHandlerImpl handler, long numberOfExpectedMessages) {
        // Declared with its element type: iterating a raw List as MessageReference does not
        // compile, because the element type of a raw List is Object.
        List<MessageReference> refs = handler.getScheduledReferences();
        Set<Long> messages = new HashSet<Long>();
        long lastTime = -1L;
        for (MessageReference ref : refs) {
            long messageID = ref.getMessage().getMessageID();
            long deliveryTime = ref.getScheduledDeliveryTime();
            // Set.add returns false when the ID was already recorded, i.e. a duplicate.
            assertTrue("duplicate message id " + messageID, messages.add(messageID));
            if (fail) {
                assertTrue("delivery time " + deliveryTime + " is before previous " + lastTime, deliveryTime >= lastTime);
            } else if (deliveryTime < lastTime) {
                System.out.println("^^^fail at " + deliveryTime);
            }
            lastTime = deliveryTime;
        }
        for (long i = 0L; i < numberOfExpectedMessages; ++i) {
            assertTrue("missing message id " + i, messages.contains(i));
        }
    }

    /**
     * Minimal {@link Queue} stand-in used by the scheduling tests.
     *
     * <p>The only behaviour it implements is counting: every reference passed to
     * {@link #addHead(List)} is stored and counts down {@link #expectedElements}, so a test can
     * wait until the expected number of references has arrived. Every other method is a no-op
     * that returns {@code null}, {@code 0}, {@code 0.0f} or {@code false}.
     */
    public class FakeQueueForScheduleUnitTest
    implements Queue {
        /** Counts down once per reference received through {@link #addHead(List)}. */
        final CountDownLatch expectedElements;
        /**
         * References received so far, most recent first.
         * NOTE(review): not synchronized; presumably only touched from the scheduler thread - confirm.
         */
        LinkedList<MessageReference> messages = new LinkedList<MessageReference>();

        public void unproposed(SimpleString groupID) {
        }

        /**
         * @param expectedElements number of references that must arrive before
         *                         {@link #waitCompletion(long, TimeUnit)} succeeds
         */
        public FakeQueueForScheduleUnitTest(int expectedElements) {
            this.expectedElements = new CountDownLatch(expectedElements);
        }

        /**
         * Waits until the expected number of references has been received.
         *
         * @return {@code true} if the count reached zero before the timeout elapsed
         */
        public boolean waitCompletion(long timeout, TimeUnit timeUnit) throws Exception {
            return this.expectedElements.await(timeout, timeUnit);
        }

        public SimpleString getName() {
            return null;
        }

        public long getID() {
            return 0L;
        }

        public Filter getFilter() {
            return null;
        }

        public PageSubscription getPageSubscription() {
            return null;
        }

        public boolean isDurable() {
            return false;
        }

        public boolean isTemporary() {
            return false;
        }

        public void addConsumer(Consumer consumer) throws Exception {
        }

        public void removeConsumer(Consumer consumer) {
        }

        public int getConsumerCount() {
            return 0;
        }

        public void setConsumersRefCount(HornetQServer server) {
        }

        public ReferenceCounter getConsumersRefCount() {
            return null;
        }

        public void reload(MessageReference ref) {
        }

        public void addTail(MessageReference ref) {
        }

        public void addTail(MessageReference ref, boolean direct) {
        }

        /** Ignored: only the list variant {@link #addHead(List)} is counted. */
        public void addHead(MessageReference ref) {
        }

        /** Stores and counts every reference in {@code refs}, in iteration order. */
        public void addHead(List<MessageReference> refs) {
            for (MessageReference ref : refs) {
                this.addFirst(ref);
            }
        }

        /** Counts the reference down on the latch and puts it at the front of the list. */
        private void addFirst(MessageReference ref) {
            this.expectedElements.countDown();
            this.messages.addFirst(ref);
        }

        public void acknowledge(MessageReference ref) throws Exception {
        }

        public void acknowledge(Transaction tx, MessageReference ref) throws Exception {
        }

        public void reacknowledge(Transaction tx, MessageReference ref) throws Exception {
        }

        public void cancel(Transaction tx, MessageReference ref) {
        }

        public void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck) {
        }

        public void cancel(MessageReference reference, long timeBase) throws Exception {
        }

        public void deliverAsync() {
        }

        public void forceDelivery() {
        }

        public void deleteQueue() throws Exception {
        }

        public void deleteQueue(boolean removeConsumers) throws Exception {
        }

        public void destroyPaging() throws Exception {
        }

        public long getMessageCount() {
            return 0L;
        }

        public int getDeliveringCount() {
            return 0;
        }

        public void referenceHandled() {
        }

        public int getScheduledCount() {
            return 0;
        }

        public List<MessageReference> getScheduledMessages() {
            return null;
        }

        public Map<String, List<MessageReference>> getDeliveringMessages() {
            return null;
        }

        public long getMessagesAdded() {
            return 0L;
        }

        public long getMessagesAcknowledged() {
            return 0L;
        }

        public MessageReference removeReferenceWithID(long id) throws Exception {
            return null;
        }

        public MessageReference getReference(long id) {
            return null;
        }

        public int deleteAllReferences() throws Exception {
            return 0;
        }

        public int deleteAllReferences(int flushLimit) throws Exception {
            return 0;
        }

        public boolean deleteReference(long messageID) throws Exception {
            return false;
        }

        public int deleteMatchingReferences(Filter filter) throws Exception {
            return 0;
        }

        public int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception {
            return 0;
        }

        public boolean expireReference(long messageID) throws Exception {
            return false;
        }

        public int expireReferences(Filter filter) throws Exception {
            return 0;
        }

        public void expireReferences() throws Exception {
        }

        public void expire(MessageReference ref) throws Exception {
        }

        public boolean sendMessageToDeadLetterAddress(long messageID) throws Exception {
            return false;
        }

        public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
            return 0;
        }

        public boolean changeReferencePriority(long messageID, byte newPriority) throws Exception {
            return false;
        }

        public int changeReferencesPriority(Filter filter, byte newPriority) throws Exception {
            return 0;
        }

        public boolean moveReference(long messageID, SimpleString toAddress) throws Exception {
            return false;
        }

        public boolean moveReference(long messageID, SimpleString toAddress, boolean rejectDuplicates) throws Exception {
            return false;
        }

        public int moveReferences(Filter filter, SimpleString toAddress) throws Exception {
            return 0;
        }

        public int moveReferences(int flushLimit, Filter filter, SimpleString toAddress, boolean rejectDuplicates) throws Exception {
            return 0;
        }

        public void addRedistributor(long delay) {
        }

        public void cancelRedistributor() throws Exception {
        }

        public boolean hasMatchingConsumer(ServerMessage message) {
            return false;
        }

        public Collection<Consumer> getConsumers() {
            return null;
        }

        public boolean checkRedelivery(MessageReference ref, long timeBase, boolean ignoreRedeliveryDelay) throws Exception {
            return false;
        }

        public LinkedListIterator<MessageReference> iterator() {
            return null;
        }

        public LinkedListIterator<MessageReference> totalIterator() {
            return null;
        }

        public SimpleString getExpiryAddress() {
            return null;
        }

        public void pause() {
        }

        public void resume() {
        }

        public boolean isPaused() {
            return false;
        }

        public Executor getExecutor() {
            return null;
        }

        public void resetAllIterators() {
        }

        public boolean flushExecutor() {
            return false;
        }

        public void close() throws Exception {
        }

        public boolean isDirectDeliver() {
            return false;
        }

        public SimpleString getAddress() {
            return null;
        }

        public boolean isInternalQueue() {
            return false;
        }

        public void setInternalQueue(boolean internalQueue) {
        }

        public void resetMessagesAdded() {
        }

        public void resetMessagesAcknowledged() {
        }

        public void incrementMesssagesAdded() {
        }

        public List<MessageReference> cancelScheduledMessages() {
            return null;
        }

        public void route(ServerMessage message, RoutingContext context) throws Exception {
        }

        public void routeWithAck(ServerMessage message, RoutingContext context) {
        }

        public void postAcknowledge(MessageReference ref) {
        }

        public float getRate() {
            return 0.0f;
        }
    }

    /**
     * Minimal {@link ServerMessage} stand-in that carries nothing but a message ID.
     *
     * <p>The ID is fixed at construction and returned by {@link #getMessageID()}. Every setter
     * ignores its argument, and every other method is a stub returning {@code null}, {@code 0},
     * {@code false} or an empty byte array.
     */
    class FakeMessage
    implements ServerMessage {
        /** Message ID assigned at construction; never changes. */
        final long id;

        /**
         * @param id the ID reported by {@link #getMessageID()}
         */
        public FakeMessage(long id) {
            this.id = id;
        }

        /** Ignores {@code id}; the ID given to the constructor is kept. */
        public FakeMessage setMessageID(long id) {
            return this;
        }

        /** @return the ID given to the constructor */
        public long getMessageID() {
            return this.id;
        }

        // ---- Everything below is an inert stub. ----

        public MessageReference createReference(Queue queue) {
            return null;
        }

        public void forceAddress(SimpleString address) {
        }

        public int incrementRefCount() throws Exception {
            return 0;
        }

        public int decrementRefCount() throws Exception {
            return 0;
        }

        public int incrementDurableRefCount() {
            return 0;
        }

        public int decrementDurableRefCount() {
            return 0;
        }

        public ServerMessage copy(long newID) {
            return null;
        }

        public void finishCopy() throws Exception {
        }

        public ServerMessage copy() {
            return null;
        }

        public int getMemoryEstimate() {
            return 0;
        }

        public int getRefCount() {
            return 0;
        }

        public ServerMessage makeCopyForExpiryOrDLA(long newID, MessageReference originalReference, boolean expiry, boolean copyOriginalHeaders) throws Exception {
            return null;
        }

        public void setOriginalHeaders(ServerMessage other, MessageReference originalReference, boolean expiry) {
        }

        public void setPagingStore(PagingStore store) {
        }

        public PagingStore getPagingStore() {
            return null;
        }

        public boolean hasInternalProperties() {
            return false;
        }

        public boolean storeIsPaging() {
            return false;
        }

        public void encodeMessageIDToBuffer() {
        }

        public byte[] getDuplicateIDBytes() {
            return new byte[0];
        }

        public Object getDuplicateProperty() {
            return null;
        }

        public void encode(HornetQBuffer buffer) {
        }

        public void decode(HornetQBuffer buffer) {
        }

        public void decodeFromBuffer(HornetQBuffer buffer) {
        }

        public int getEndOfMessagePosition() {
            return 0;
        }

        public int getEndOfBodyPosition() {
            return 0;
        }

        public void checkCopy() {
        }

        public void bodyChanged() {
        }

        public void resetCopied() {
        }

        public boolean isServerMessage() {
            return false;
        }

        public HornetQBuffer getEncodedBuffer() {
            return null;
        }

        public int getHeadersAndPropertiesEncodeSize() {
            return 0;
        }

        public HornetQBuffer getWholeBuffer() {
            return null;
        }

        public void encodeHeadersAndProperties(HornetQBuffer buffer) {
        }

        public void decodeHeadersAndProperties(HornetQBuffer buffer) {
        }

        public BodyEncoder getBodyEncoder() throws HornetQException {
            return null;
        }

        public InputStream getBodyInputStream() {
            return null;
        }

        public void setAddressTransient(SimpleString address) {
        }

        public TypedProperties getTypedProperties() {
            return null;
        }

        public UUID getUserID() {
            return null;
        }

        public FakeMessage setUserID(UUID userID) {
            return this;
        }

        public SimpleString getAddress() {
            return null;
        }

        public Message setAddress(SimpleString address) {
            return null;
        }

        public byte getType() {
            return 0;
        }

        public boolean isDurable() {
            return false;
        }

        public FakeMessage setDurable(boolean durable) {
            return this;
        }

        public long getExpiration() {
            return 0L;
        }

        public boolean isExpired() {
            return false;
        }

        public FakeMessage setExpiration(long expiration) {
            return this;
        }

        public long getTimestamp() {
            return 0L;
        }

        public FakeMessage setTimestamp(long timestamp) {
            return this;
        }

        public byte getPriority() {
            return 0;
        }

        public FakeMessage setPriority(byte priority) {
            return this;
        }

        public int getEncodeSize() {
            return 0;
        }

        public boolean isLargeMessage() {
            return false;
        }

        public HornetQBuffer getBodyBuffer() {
            return null;
        }

        public HornetQBuffer getBodyBufferCopy() {
            return null;
        }

        // ---- Property mutators: nothing is stored. ----

        public Message putBooleanProperty(SimpleString key, boolean value) {
            return null;
        }

        public Message putBooleanProperty(String key, boolean value) {
            return null;
        }

        public Message putByteProperty(SimpleString key, byte value) {
            return null;
        }

        public Message putByteProperty(String key, byte value) {
            return null;
        }

        public Message putBytesProperty(SimpleString key, byte[] value) {
            return null;
        }

        public Message putBytesProperty(String key, byte[] value) {
            return null;
        }

        public Message putShortProperty(SimpleString key, short value) {
            return null;
        }

        public Message putShortProperty(String key, short value) {
            return null;
        }

        public Message putCharProperty(SimpleString key, char value) {
            return null;
        }

        public Message putCharProperty(String key, char value) {
            return null;
        }

        public Message putIntProperty(SimpleString key, int value) {
            return null;
        }

        public Message putIntProperty(String key, int value) {
            return null;
        }

        public Message putLongProperty(SimpleString key, long value) {
            return null;
        }

        public Message putLongProperty(String key, long value) {
            return null;
        }

        public Message putFloatProperty(SimpleString key, float value) {
            return null;
        }

        public Message putFloatProperty(String key, float value) {
            return null;
        }

        public Message putDoubleProperty(SimpleString key, double value) {
            return null;
        }

        public Message putDoubleProperty(String key, double value) {
            return null;
        }

        public Message putStringProperty(SimpleString key, SimpleString value) {
            return null;
        }

        public Message putStringProperty(String key, String value) {
            return null;
        }

        public Message putObjectProperty(SimpleString key, Object value) throws HornetQPropertyConversionException {
            return null;
        }

        public Message putObjectProperty(String key, Object value) throws HornetQPropertyConversionException {
            return null;
        }

        public Object removeProperty(SimpleString key) {
            return null;
        }

        public Object removeProperty(String key) {
            return null;
        }

        // ---- Property accessors: no property is ever reported as present. ----

        public boolean containsProperty(SimpleString key) {
            return false;
        }

        public boolean containsProperty(String key) {
            return false;
        }

        public Boolean getBooleanProperty(SimpleString key) throws HornetQPropertyConversionException {
            return null;
        }

        public Boolean getBooleanProperty(String key) throws HornetQPropertyConversionException {
            return null;
        }

        public Byte getByteProperty(SimpleString key) throws HornetQPropertyConversionException {
            return null;
        }

        public Byte getByteProperty(String key) throws HornetQPropertyConversionException {
            return null;
        }

        public Double getDoubleProperty(SimpleString key) throws HornetQPropertyConversionException {
            return null;
        }

        public Double getDoubleProperty(String key) throws HornetQPropertyConversionException {
            return null;
        }

        public Integer getIntProperty(SimpleString key) throws HornetQPropertyConversionException {
            return null;
        }

        public Integer getIntProperty(String key) throws HornetQPropertyConversionException {
            return null;
        }

        public Long getLongProperty(SimpleString key) throws HornetQPropertyConversionException {
            return null;
        }

        public Long getLongProperty(String key) throws HornetQPropertyConversionException {
            return null;
        }

        public Object getObjectProperty(SimpleString key) {
            return null;
        }

        public Object getObjectProperty(String key) {
            return null;
        }

        public Short getShortProperty(SimpleString key) throws HornetQPropertyConversionException {
            return null;
        }

        public Short getShortProperty(String key) throws HornetQPropertyConversionException {
            return null;
        }

        public Float getFloatProperty(SimpleString key) throws HornetQPropertyConversionException {
            return null;
        }

        public Float getFloatProperty(String key) throws HornetQPropertyConversionException {
            return null;
        }

        public String getStringProperty(SimpleString key) throws HornetQPropertyConversionException {
            return null;
        }

        public String getStringProperty(String key) throws HornetQPropertyConversionException {
            return null;
        }

        public SimpleString getSimpleStringProperty(SimpleString key) throws HornetQPropertyConversionException {
            return null;
        }

        public SimpleString getSimpleStringProperty(String key) throws HornetQPropertyConversionException {
            return null;
        }

        public byte[] getBytesProperty(SimpleString key) throws HornetQPropertyConversionException {
            return new byte[0];
        }

        public byte[] getBytesProperty(String key) throws HornetQPropertyConversionException {
            return new byte[0];
        }

        public Set<SimpleString> getPropertyNames() {
            return null;
        }

        public Map<String, Object> toMap() {
            return null;
        }

        public FakeMessage writeBodyBufferBytes(byte[] bytes) {
            return this;
        }

        public FakeMessage writeBodyBufferString(String string) {
            return this;
        }
    }
}

