/*
 * Decompiled with CFR 0.152.
 */
package org.hornetq.core.postoffice.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.hornetq.api.core.HornetQAddressFullException;
import org.hornetq.api.core.HornetQDuplicateIdException;
import org.hornetq.api.core.HornetQInterruptedException;
import org.hornetq.api.core.HornetQNonExistentQueueException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.CoreNotificationType;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.AddressManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.BindingsFactory;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.QueueInfo;
import org.hornetq.core.postoffice.impl.BindingsImpl;
import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
import org.hornetq.core.postoffice.impl.SimpleAddressManager;
import org.hornetq.core.postoffice.impl.WildcardAddressManager;
import org.hornetq.core.server.HornetQMessageBundle;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServerLogger;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.RouteContextList;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationListener;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUIDGenerator;

public class PostOfficeImpl
implements PostOffice,
NotificationListener,
BindingsFactory {
    private static final boolean isTrace = HornetQServerLogger.LOGGER.isTraceEnabled();
    public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("_HQ_RESET_QUEUE_DATA");
    public static final SimpleString BRIDGE_CACHE_STR = new SimpleString("BRIDGE.");
    private final AddressManager addressManager;
    private final QueueFactory queueFactory;
    private final StorageManager storageManager;
    private final PagingManager pagingManager;
    private volatile boolean started;
    private final ManagementService managementService;
    private Reaper reaperRunnable;
    private volatile Thread reaperThread;
    private final long reaperPeriod;
    private final int reaperPriority;
    private final ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = new ConcurrentHashMap<SimpleString, DuplicateIDCache>();
    private final int idCacheSize;
    private final boolean persistIDCache;
    private final Map<SimpleString, QueueInfo> queueInfos = new HashMap<SimpleString, QueueInfo>();
    private final Object notificationLock = new Object();
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
    private final HornetQServer server;

    public PostOfficeImpl(HornetQServer server, StorageManager storageManager, PagingManager pagingManager, QueueFactory bindableFactory, ManagementService managementService, long reaperPeriod, int reaperPriority, boolean enableWildCardRouting, int idCacheSize, boolean persistIDCache, HierarchicalRepository<AddressSettings> addressSettingsRepository) {
        this.storageManager = storageManager;
        this.queueFactory = bindableFactory;
        this.managementService = managementService;
        this.pagingManager = pagingManager;
        this.reaperPeriod = reaperPeriod;
        this.reaperPriority = reaperPriority;
        this.addressManager = enableWildCardRouting ? new WildcardAddressManager(this) : new SimpleAddressManager(this);
        this.idCacheSize = idCacheSize;
        this.persistIDCache = persistIDCache;
        this.addressSettingsRepository = addressSettingsRepository;
        this.server = server;
    }

    @Override
    public synchronized void start() throws Exception {
        if (this.started) {
            return;
        }
        this.managementService.addNotificationListener(this);
        this.queueFactory.setPostOffice(this);
        this.started = true;
    }

    @Override
    public synchronized void stop() throws Exception {
        this.started = false;
        this.managementService.removeNotificationListener(this);
        if (this.reaperRunnable != null) {
            this.reaperRunnable.stop();
        }
        if (this.reaperThread != null) {
            this.reaperThread.join();
            this.reaperThread = null;
        }
        this.addressManager.clear();
        this.queueInfos.clear();
    }

    @Override
    public boolean isStarted() {
        return this.started;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onNotification(Notification notification) {
        if (!(notification.getType() instanceof CoreNotificationType)) {
            return;
        }
        if (isTrace) {
            HornetQServerLogger.LOGGER.trace("Receiving notification : " + notification + " on server " + this.server);
        }
        Object object = this.notificationLock;
        synchronized (object) {
            CoreNotificationType type = (CoreNotificationType)notification.getType();
            switch (type) {
                case BINDING_ADDED: {
                    TypedProperties props = notification.getProperties();
                    if (!props.containsProperty(ManagementHelper.HDR_BINDING_TYPE)) {
                        throw HornetQMessageBundle.BUNDLE.bindingTypeNotSpecified();
                    }
                    Integer bindingType = props.getIntProperty(ManagementHelper.HDR_BINDING_TYPE);
                    if (bindingType == 2) {
                        return;
                    }
                    SimpleString routingName = props.getSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME);
                    SimpleString clusterName = props.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
                    SimpleString address = props.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS);
                    if (!props.containsProperty(ManagementHelper.HDR_BINDING_ID)) {
                        throw HornetQMessageBundle.BUNDLE.bindingIdNotSpecified();
                    }
                    long id = props.getLongProperty(ManagementHelper.HDR_BINDING_ID);
                    SimpleString filterString = props.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
                    if (!props.containsProperty(ManagementHelper.HDR_DISTANCE)) {
                        throw HornetQMessageBundle.BUNDLE.distancenotSpecified();
                    }
                    int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
                    QueueInfo info = new QueueInfo(routingName, clusterName, address, filterString, id, distance);
                    this.queueInfos.put(clusterName, info);
                    break;
                }
                case BINDING_REMOVED: {
                    TypedProperties props = notification.getProperties();
                    if (!props.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) {
                        throw new IllegalStateException("No cluster name");
                    }
                    SimpleString clusterName = props.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
                    QueueInfo info = this.queueInfos.remove(clusterName);
                    if (info != null) break;
                    throw new IllegalStateException("Cannot find queue info for queue " + clusterName);
                }
                case CONSUMER_CREATED: {
                    TypedProperties props = notification.getProperties();
                    if (!props.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) {
                        throw new IllegalStateException("No cluster name");
                    }
                    SimpleString clusterName = props.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
                    SimpleString filterString = props.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
                    QueueInfo info = this.queueInfos.get(clusterName);
                    if (info == null) {
                        throw new IllegalStateException("Cannot find queue info for queue " + clusterName);
                    }
                    info.incrementConsumers();
                    if (filterString != null) {
                        List<SimpleString> filterStrings = info.getFilterStrings();
                        if (filterStrings == null) {
                            filterStrings = new ArrayList<SimpleString>();
                            info.setFilterStrings(filterStrings);
                        }
                        filterStrings.add(filterString);
                    }
                    if (!props.containsProperty(ManagementHelper.HDR_DISTANCE)) {
                        throw new IllegalStateException("No distance");
                    }
                    int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
                    if (distance <= 0) break;
                    SimpleString queueName = props.getSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME);
                    if (queueName == null) {
                        throw new IllegalStateException("No queue name");
                    }
                    Binding binding = this.getBinding(queueName);
                    if (binding == null) break;
                    Queue queue = (Queue)binding.getBindable();
                    AddressSettings addressSettings = this.addressSettingsRepository.getMatch(binding.getAddress().toString());
                    long redistributionDelay = addressSettings.getRedistributionDelay();
                    if (redistributionDelay == -1L) break;
                    queue.addRedistributor(redistributionDelay);
                    break;
                }
                case CONSUMER_CLOSED: {
                    TypedProperties props = notification.getProperties();
                    SimpleString clusterName = props.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
                    if (clusterName == null) {
                        throw new IllegalStateException("No cluster name");
                    }
                    SimpleString filterString = props.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
                    QueueInfo info = this.queueInfos.get(clusterName);
                    if (info == null) {
                        return;
                    }
                    info.decrementConsumers();
                    if (filterString != null) {
                        List<SimpleString> filterStrings = info.getFilterStrings();
                        filterStrings.remove(filterString);
                    }
                    if (info.getNumberOfConsumers() != 0) break;
                    if (!props.containsProperty(ManagementHelper.HDR_DISTANCE)) {
                        throw new IllegalStateException("No cluster name");
                    }
                    int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
                    if (distance != 0) break;
                    SimpleString queueName = props.getSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME);
                    if (queueName == null) {
                        throw new IllegalStateException("No queue name");
                    }
                    Binding binding = this.getBinding(queueName);
                    if (binding == null) {
                        throw new IllegalStateException("No queue " + queueName);
                    }
                    Queue queue = (Queue)binding.getBindable();
                    AddressSettings addressSettings = this.addressSettingsRepository.getMatch(binding.getAddress().toString());
                    long redistributionDelay = addressSettings.getRedistributionDelay();
                    if (redistributionDelay == -1L) break;
                    queue.addRedistributor(redistributionDelay);
                    break;
                }
            }
        }
    }

    @Override
    public synchronized void addBinding(Binding binding) throws Exception {
        this.addressManager.addBinding(binding);
        TypedProperties props = new TypedProperties();
        props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, binding.getType().toInt());
        props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
        props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
        props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
        props.putLongProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
        props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
        Filter filter = binding.getFilter();
        if (filter != null) {
            props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filter.getFilterString());
        }
        String uid = UUIDGenerator.getInstance().generateStringUUID();
        if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
            HornetQServerLogger.LOGGER.debug("ClusterCommunication::Sending notification for addBinding " + binding + " from server " + this.server);
        }
        this.managementService.sendNotification(new Notification(uid, CoreNotificationType.BINDING_ADDED, props));
    }

    @Override
    public synchronized Binding removeBinding(SimpleString uniqueName, Transaction tx2) throws Exception {
        this.addressSettingsRepository.clearCache();
        Binding binding = this.addressManager.removeBinding(uniqueName, tx2);
        if (binding == null) {
            throw new HornetQNonExistentQueueException();
        }
        if (this.addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null) {
            this.pagingManager.deletePageStore(binding.getAddress());
            this.managementService.unregisterAddress(binding.getAddress());
            this.deleteDuplicateCache(binding.getAddress());
        }
        if (binding.getType() == BindingType.LOCAL_QUEUE) {
            this.managementService.unregisterQueue(uniqueName, binding.getAddress());
        } else if (binding.getType() == BindingType.DIVERT) {
            this.managementService.unregisterDivert(uniqueName);
        }
        if (binding.getType() != BindingType.DIVERT) {
            TypedProperties props = new TypedProperties();
            props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
            props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
            props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
            props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
            props.putLongProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
            if (binding.getFilter() == null) {
                props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, null);
            } else {
                props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, binding.getFilter().getFilterString());
            }
            this.managementService.sendNotification(new Notification(null, CoreNotificationType.BINDING_REMOVED, props));
        }
        binding.close();
        return binding;
    }

    private void deleteDuplicateCache(SimpleString address) throws Exception {
        DuplicateIDCache cache = (DuplicateIDCache)this.duplicateIDCaches.remove(address);
        if (cache != null) {
            cache.clear();
        }
        if ((cache = (DuplicateIDCache)this.duplicateIDCaches.remove(BRIDGE_CACHE_STR.concat(address))) != null) {
            cache.clear();
        }
    }

    @Override
    public boolean isAddressBound(SimpleString address) throws Exception {
        Bindings bindings = this.getBindingsForAddress(address);
        return bindings != null && !bindings.getBindings().isEmpty();
    }

    @Override
    public Bindings getBindingsForAddress(SimpleString address) throws Exception {
        Bindings bindings = this.addressManager.getBindingsForRoutingAddress(address);
        if (bindings == null) {
            bindings = this.createBindings(address);
        }
        return bindings;
    }

    @Override
    public Bindings lookupBindingsForAddress(SimpleString address) throws Exception {
        return this.addressManager.getBindingsForRoutingAddress(address);
    }

    @Override
    public Binding getBinding(SimpleString name) {
        return this.addressManager.getBinding(name);
    }

    @Override
    public Bindings getMatchingBindings(SimpleString address) throws Exception {
        return this.addressManager.getMatchingBindings(address);
    }

    @Override
    public void route(ServerMessage message, boolean direct) throws Exception {
        this.route(message, (Transaction)null, direct);
    }

    @Override
    public void route(ServerMessage message, Transaction tx2, boolean direct) throws Exception {
        this.route(message, new RoutingContextImpl(tx2), direct);
    }

    @Override
    public void route(ServerMessage message, Transaction tx2, boolean direct, boolean rejectDuplicates) throws Exception {
        this.route(message, new RoutingContextImpl(tx2), direct, rejectDuplicates);
    }

    @Override
    public void route(ServerMessage message, RoutingContext context, boolean direct) throws Exception {
        this.route(message, context, direct, true);
    }

    @Override
    public void route(ServerMessage message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception {
        Bindings bindings;
        if (message.getRefCount() > 0) {
            throw new IllegalStateException("Message cannot be routed more than once");
        }
        SimpleString address = message.getAddress();
        this.setPagingStore(message);
        AtomicBoolean startedTX = new AtomicBoolean(false);
        this.applyExpiryDelay(message, address);
        if (!this.checkDuplicateID(message, context, rejectDuplicates, startedTX)) {
            return;
        }
        if (message.hasInternalProperties()) {
            this.cleanupInternalPropertiesBeforeRouting(message);
        }
        if ((bindings = this.addressManager.getBindingsForRoutingAddress(address)) != null) {
            bindings.route(message, context);
        } else if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
            HornetQServerLogger.LOGGER.debug("Couldn't find any bindings for address=" + address + " on message=" + message);
        }
        if (HornetQServerLogger.LOGGER.isTraceEnabled()) {
            HornetQServerLogger.LOGGER.trace("Message after routed=" + message);
        }
        if (context.getQueueCount() == 0) {
            AddressSettings addressSettings = this.addressSettingsRepository.getMatch(address.toString());
            boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute();
            if (sendToDLA) {
                SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
                if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
                    HornetQServerLogger.LOGGER.debug("sending message to dla address = " + dlaAddress + ", message=" + message);
                }
                if (dlaAddress == null) {
                    HornetQServerLogger.LOGGER.noDLA(address);
                } else {
                    message.setOriginalHeaders(message, null, false);
                    message.setAddress(dlaAddress);
                    this.route(message, context.getTransaction(), false);
                }
            } else {
                if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
                    HornetQServerLogger.LOGGER.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address);
                }
                if (message.isLargeMessage()) {
                    ((LargeServerMessage)message).deleteFile();
                }
            }
        } else {
            try {
                this.processRoute(message, context, direct);
            }
            catch (HornetQAddressFullException e) {
                if (startedTX.get()) {
                    context.getTransaction().rollback();
                } else if (context.getTransaction() != null) {
                    context.getTransaction().markAsRollbackOnly(e);
                }
                throw e;
            }
        }
        if (startedTX.get()) {
            context.getTransaction().commit();
        }
    }

    private void applyExpiryDelay(ServerMessage message, SimpleString address) {
        long expirationOverride = this.addressSettingsRepository.getMatch(address.toString()).getExpiryDelay();
        if (expirationOverride >= 0L && message.getExpiration() == 0L) {
            message.setExpiration(System.currentTimeMillis() + expirationOverride);
        }
    }

    @Override
    public MessageReference reroute(ServerMessage message, Queue queue, Transaction tx2) throws Exception {
        this.setPagingStore(message);
        MessageReference reference = message.createReference(queue);
        if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
            Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
            reference.setScheduledDeliveryTime(scheduledDeliveryTime);
        }
        message.incrementDurableRefCount();
        message.incrementRefCount();
        if (tx2 == null) {
            queue.reload(reference);
        } else {
            ArrayList<MessageReference> refs = new ArrayList<MessageReference>(1);
            refs.add(reference);
            tx2.addOperation(new AddOperation(refs));
        }
        return reference;
    }

    @Override
    public Pair<RoutingContext, ServerMessage> redistribute(ServerMessage message, Queue originatingQueue, Transaction tx2) throws Exception {
        RoutingContextImpl context;
        boolean routed;
        ServerMessage copyRedistribute = message.copy(this.storageManager.generateUniqueID());
        Bindings bindings = this.addressManager.getBindingsForRoutingAddress(message.getAddress());
        if (bindings != null && (routed = bindings.redistribute(copyRedistribute, originatingQueue, context = new RoutingContextImpl(tx2)))) {
            return new Pair<RoutingContext, ServerMessage>(context, copyRedistribute);
        }
        return null;
    }

    @Override
    public DuplicateIDCache getDuplicateIDCache(SimpleString address) {
        DuplicateIDCache oldCache;
        DuplicateIDCache cache = (DuplicateIDCache)this.duplicateIDCaches.get(address);
        if (cache == null && (oldCache = this.duplicateIDCaches.putIfAbsent(address, cache = new DuplicateIDCacheImpl(address, this.idCacheSize, this.storageManager, this.persistIDCache))) != null) {
            cache = oldCache;
        }
        return cache;
    }

    public ConcurrentMap<SimpleString, DuplicateIDCache> getDuplicateIDCaches() {
        return this.duplicateIDCaches;
    }

    @Override
    public Object getNotificationLock() {
        return this.notificationLock;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception {
        Binding binding = this.addressManager.getBinding(queueName);
        if (binding == null) {
            throw new IllegalStateException("Cannot find queue " + queueName);
        }
        if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
            HornetQServerLogger.LOGGER.debug("PostOffice.sendQueueInfoToQueue on server=" + this.server + ", queueName=" + queueName + " and address=" + address);
        }
        Queue queue = (Queue)binding.getBindable();
        Object object = this.notificationLock;
        synchronized (object) {
            ServerMessage message = new ServerMessageImpl(this.storageManager.generateUniqueID(), 50);
            message.setAddress(queueName);
            message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
            this.routeQueueInfo(message, queue, false);
            for (QueueInfo info : this.queueInfos.values()) {
                if (HornetQServerLogger.LOGGER.isTraceEnabled()) {
                    HornetQServerLogger.LOGGER.trace("QueueInfo on sendQueueInfoToQueue = " + info);
                }
                if (!info.getAddress().startsWith(address)) continue;
                message = this.createQueueInfoMessage(CoreNotificationType.BINDING_ADDED, queueName);
                message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
                message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
                message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
                message.putLongProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
                message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, info.getFilterString());
                message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
                this.routeQueueInfo(message, queue, true);
                int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
                for (int i = 0; i < info.getNumberOfConsumers() - consumersWithFilters; ++i) {
                    message = this.createQueueInfoMessage(CoreNotificationType.CONSUMER_CREATED, queueName);
                    message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
                    message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
                    message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
                    message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
                    this.routeQueueInfo(message, queue, true);
                }
                if (info.getFilterStrings() == null) continue;
                for (SimpleString filterString : info.getFilterStrings()) {
                    message = this.createQueueInfoMessage(CoreNotificationType.CONSUMER_CREATED, queueName);
                    message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
                    message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
                    message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
                    message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
                    message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
                    this.routeQueueInfo(message, queue, true);
                }
            }
        }
    }

    public String toString() {
        return "PostOfficeImpl [server=" + this.server + "]";
    }

    protected void cleanupInternalPropertiesBeforeRouting(ServerMessage message) {
        LinkedList<SimpleString> valuesToRemove = null;
        for (SimpleString name : message.getPropertyNames()) {
            if (!name.startsWith(MessageImpl.HDR_ROUTE_TO_IDS) || name.equals(MessageImpl.HDR_ROUTE_TO_IDS)) continue;
            if (valuesToRemove == null) {
                valuesToRemove = new LinkedList<SimpleString>();
            }
            valuesToRemove.add(name);
        }
        if (valuesToRemove != null) {
            for (SimpleString removal : valuesToRemove) {
                message.removeProperty(removal);
            }
        }
    }

    private void setPagingStore(ServerMessage message) throws Exception {
        PagingStore store = this.pagingManager.getPageStore(message.getAddress());
        message.setPagingStore(store);
    }

    private void routeQueueInfo(ServerMessage message, Queue queue, boolean applyFilters) throws Exception {
        if (!applyFilters || queue.getFilter() == null || queue.getFilter().match(message)) {
            RoutingContextImpl context = new RoutingContextImpl(null);
            queue.route(message, context);
            this.processRoute(message, context, false);
        }
    }

    @Override
    public void processRoute(ServerMessage message, RoutingContext context, final boolean direct) throws Exception {
        final ArrayList<MessageReference> refs = new ArrayList<MessageReference>();
        Transaction tx2 = context.getTransaction();
        for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
            MessageReference reference;
            PagingStore store = this.pagingManager.getPageStore(entry.getKey());
            if (this.storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) {
                if (message.isLargeMessage()) {
                    this.confirmLargeMessageSend(tx2, message);
                }
                this.schedulePageDelivery(tx2, entry);
                continue;
            }
            for (Queue queue : entry.getValue().getNonDurableQueues()) {
                reference = message.createReference(queue);
                refs.add(reference);
                if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
                    Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
                    reference.setScheduledDeliveryTime(scheduledDeliveryTime);
                }
                message.incrementRefCount();
            }
            Iterator<Queue> iter = entry.getValue().getDurableQueues().iterator();
            while (iter.hasNext()) {
                Queue queue;
                queue = iter.next();
                reference = message.createReference(queue);
                refs.add(reference);
                if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
                    Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
                    reference.setScheduledDeliveryTime(scheduledDeliveryTime);
                }
                if (message.isDurable()) {
                    int durableRefCount = message.incrementDurableRefCount();
                    if (durableRefCount == 1) {
                        if (tx2 != null) {
                            this.storageManager.storeMessageTransactional(tx2.getID(), message);
                        } else {
                            this.storageManager.storeMessage(message);
                        }
                        if (message.isLargeMessage()) {
                            this.confirmLargeMessageSend(tx2, message);
                        }
                    }
                    if (tx2 != null) {
                        this.storageManager.storeReferenceTransactional(tx2.getID(), queue.getID(), message.getMessageID());
                        tx2.setContainsPersistent();
                    } else {
                        this.storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
                    }
                    if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
                        if (tx2 != null) {
                            this.storageManager.updateScheduledDeliveryTimeTransactional(tx2.getID(), reference);
                        } else {
                            this.storageManager.updateScheduledDeliveryTime(reference);
                        }
                    }
                }
                message.incrementRefCount();
            }
        }
        if (tx2 != null) {
            tx2.addOperation(new AddOperation(refs));
        } else {
            this.storageManager.afterCompleteOperations(new IOAsyncTask(){

                @Override
                public void onError(int errorCode, String errorMessage) {
                    HornetQServerLogger.LOGGER.ioErrorAddingReferences(errorCode, errorMessage);
                }

                @Override
                public void done() {
                    PostOfficeImpl.this.addReferences(refs, direct);
                }
            });
        }
    }

    private void confirmLargeMessageSend(Transaction tx2, ServerMessage message) throws Exception {
        LargeServerMessage largeServerMessage = (LargeServerMessage)message;
        if (largeServerMessage.getPendingRecordID() >= 0L) {
            if (tx2 == null) {
                this.storageManager.confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
            } else {
                this.storageManager.confirmPendingLargeMessageTX(tx2, largeServerMessage.getMessageID(), largeServerMessage.getPendingRecordID());
            }
            largeServerMessage.setPendingRecordID(-1L);
        }
    }

    private void schedulePageDelivery(Transaction tx2, Map.Entry<SimpleString, RouteContextList> entry) {
        if (tx2 != null) {
            PageDelivery delivery = (PageDelivery)tx2.getProperty(7);
            if (delivery == null) {
                delivery = new PageDelivery();
                tx2.putProperty(7, delivery);
                tx2.addOperation(delivery);
            }
            delivery.addQueues(entry.getValue().getDurableQueues());
            delivery.addQueues(entry.getValue().getNonDurableQueues());
        } else {
            List<Queue> durableQueues = entry.getValue().getDurableQueues();
            List<Queue> nonDurableQueues = entry.getValue().getNonDurableQueues();
            final ArrayList<Queue> queues = new ArrayList<Queue>(durableQueues.size() + nonDurableQueues.size());
            queues.addAll(durableQueues);
            queues.addAll(nonDurableQueues);
            this.storageManager.afterCompleteOperations(new IOAsyncTask(){

                @Override
                public void onError(int errorCode, String errorMessage) {
                }

                @Override
                public void done() {
                    for (Queue queue : queues) {
                        queue.deliverAsync();
                    }
                }
            });
        }
    }

    private boolean checkDuplicateID(ServerMessage message, RoutingContext context, boolean rejectDuplicates, AtomicBoolean startedTX) throws Exception {
        Object bridgeDup = message.getObjectProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
        if (bridgeDup != null) {
            byte[] bridgeDupBytes = (byte[])bridgeDup;
            DuplicateIDCache cacheBridge = this.getDuplicateIDCache(BRIDGE_CACHE_STR.concat(message.getAddress()));
            if (cacheBridge.contains(bridgeDupBytes)) {
                HornetQServerLogger.LOGGER.duplicateMessageDetectedThruBridge(message);
                if (context.getTransaction() != null) {
                    context.getTransaction().markAsRollbackOnly(new HornetQDuplicateIdException());
                }
                message.decrementRefCount();
                return false;
            }
            if (context.getTransaction() == null) {
                context.setTransaction(new TransactionImpl(this.storageManager));
                startedTX.set(true);
            }
            cacheBridge.addToCache(bridgeDupBytes, context.getTransaction());
            message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
        } else {
            byte[] duplicateIDBytes = message.getDuplicateIDBytes();
            DuplicateIDCache cache = null;
            boolean isDuplicate = false;
            if (duplicateIDBytes != null) {
                cache = this.getDuplicateIDCache(message.getAddress());
                isDuplicate = cache.contains(duplicateIDBytes);
                if (rejectDuplicates && isDuplicate) {
                    HornetQServerLogger.LOGGER.duplicateMessageDetected(message);
                    String warnMessage = "Duplicate message detected - message will not be routed. Message information:" + message.toString();
                    if (context.getTransaction() != null) {
                        context.getTransaction().markAsRollbackOnly(new HornetQDuplicateIdException(warnMessage));
                    }
                    message.decrementRefCount();
                    return false;
                }
            }
            if (cache != null && !isDuplicate) {
                if (context.getTransaction() == null) {
                    context.setTransaction(new TransactionImpl(this.storageManager));
                    startedTX.set(true);
                }
                cache.addToCache(duplicateIDBytes, context.getTransaction());
            }
        }
        return true;
    }

    private void addReferences(List<MessageReference> refs, boolean direct) {
        for (MessageReference ref : refs) {
            ref.getQueue().addTail(ref, direct);
        }
    }

    @Override
    public synchronized void startExpiryScanner() {
        if (this.reaperPeriod > 0L) {
            if (this.reaperRunnable != null) {
                this.reaperRunnable.stop();
            }
            this.reaperRunnable = new Reaper();
            this.reaperThread = new Thread((Runnable)this.reaperRunnable, "hornetq-expiry-reaper-thread");
            this.reaperThread.setPriority(this.reaperPriority);
            this.reaperThread.start();
        }
    }

    private ServerMessage createQueueInfoMessage(NotificationType type, SimpleString queueName) {
        ServerMessageImpl message = new ServerMessageImpl(this.storageManager.generateUniqueID(), 50);
        message.setAddress(queueName);
        String uid = UUIDGenerator.getInstance().generateStringUUID();
        message.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));
        message.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
        message.putStringProperty(new SimpleString("foobar"), new SimpleString(uid));
        return message;
    }

    @Override
    public Bindings createBindings(SimpleString address) throws Exception {
        GroupingHandler groupingHandler = this.server.getGroupingHandler();
        BindingsImpl bindings = new BindingsImpl(address, groupingHandler, this.pagingManager.getPageStore(address));
        if (groupingHandler != null) {
            groupingHandler.addListener(bindings);
        }
        return bindings;
    }

    public AddressManager getAddressManager() {
        return this.addressManager;
    }

    public HornetQServer getServer() {
        return this.server;
    }

    private static final class AddOperation
    implements TransactionOperation {
        private final List<MessageReference> refs;

        AddOperation(List<MessageReference> refs) {
            this.refs = refs;
        }

        @Override
        public void afterCommit(Transaction tx2) {
            for (MessageReference ref : this.refs) {
                ref.getQueue().addTail(ref, false);
            }
        }

        @Override
        public void afterPrepare(Transaction tx2) {
        }

        @Override
        public void afterRollback(Transaction tx2) {
        }

        @Override
        public void beforeCommit(Transaction tx2) throws Exception {
        }

        @Override
        public void beforePrepare(Transaction tx2) throws Exception {
        }

        @Override
        public void beforeRollback(Transaction tx2) throws Exception {
            for (MessageReference ref : this.refs) {
                ServerMessage message = ref.getMessage();
                if (message.isDurable() && ref.getQueue().isDurable()) {
                    message.decrementDurableRefCount();
                }
                message.decrementRefCount();
            }
        }

        @Override
        public List<MessageReference> getRelatedMessageReferences() {
            return this.refs;
        }

        @Override
        public List<MessageReference> getListOnConsumer(long consumerID) {
            return Collections.emptyList();
        }
    }

    private final class Reaper
    implements Runnable {
        private final CountDownLatch latch = new CountDownLatch(1);

        private Reaper() {
        }

        public void stop() {
            this.latch.countDown();
        }

        @Override
        public void run() {
            while (PostOfficeImpl.this.isStarted()) {
                try {
                    if (this.latch.await(PostOfficeImpl.this.reaperPeriod, TimeUnit.MILLISECONDS)) {
                        return;
                    }
                }
                catch (InterruptedException e1) {
                    throw new HornetQInterruptedException(e1);
                }
                if (!PostOfficeImpl.this.isStarted()) {
                    return;
                }
                Map<SimpleString, Binding> nameMap = PostOfficeImpl.this.addressManager.getBindings();
                ArrayList<Queue> queues = new ArrayList<Queue>();
                for (Binding binding : nameMap.values()) {
                    if (binding.getType() != BindingType.LOCAL_QUEUE) continue;
                    Queue queue = (Queue)binding.getBindable();
                    queues.add(queue);
                }
                for (Queue queue : queues) {
                    try {
                        queue.expireReferences();
                    }
                    catch (Exception e) {
                        HornetQServerLogger.LOGGER.errorExpiringMessages(e);
                    }
                }
            }
        }
    }

    private static class PageDelivery
    extends TransactionOperationAbstract {
        private final Set<Queue> queues = new HashSet<Queue>();

        private PageDelivery() {
        }

        public void addQueues(List<Queue> queueList) {
            this.queues.addAll(queueList);
        }

        @Override
        public void afterCommit(Transaction tx2) {
            for (Queue queue : this.queues) {
                queue.deliverAsync();
            }
        }

        @Override
        public List<MessageReference> getRelatedMessageReferences() {
            return Collections.emptyList();
        }
    }
}

