/*
 * Decompiled with CFR 0.152.
 */
package org.hornetq.core.server.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.Xid;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.HornetQNonExistentQueueException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ClientMessageImpl;
import org.hornetq.core.exception.HornetQXAException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.QueueBinding;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.security.CheckType;
import org.hornetq.core.security.SecurityStore;
import org.hornetq.core.server.BindingQueryResult;
import org.hornetq.core.server.HornetQMessageBundle;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServerLogger;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueQueryResult;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.server.impl.ServerConsumerImpl;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.protocol.SessionCallback;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUID;
import org.hornetq.utils.json.JSONArray;
import org.hornetq.utils.json.JSONObject;

public class ServerSessionImpl
implements ServerSession,
FailureListener {
    private static final boolean isTrace = HornetQServerLogger.LOGGER.isTraceEnabled();
    private final String username;
    private final String password;
    private final int minLargeMessageSize;
    private final boolean autoCommitSends;
    private final boolean autoCommitAcks;
    private final boolean preAcknowledge;
    private final boolean strictUpdateDeliveryCount;
    private final RemotingConnection remotingConnection;
    private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
    private Transaction tx;
    private final boolean xa;
    private final StorageManager storageManager;
    private final ResourceManager resourceManager;
    public final PostOffice postOffice;
    private final SecurityStore securityStore;
    private final ManagementService managementService;
    private volatile boolean started = false;
    private final Map<SimpleString, TempQueueCleanerUpper> tempQueueCleannerUppers = new HashMap<SimpleString, TempQueueCleanerUpper>();
    private final String name;
    private final HornetQServer server;
    private final SimpleString managementAddress;
    private volatile LargeServerMessage currentLargeMessage;
    private final RoutingContext routingContext = new RoutingContextImpl(null);
    private final SessionCallback callback;
    private volatile SimpleString defaultAddress;
    private volatile int timeoutSeconds;
    private Map<String, String> metaData;
    private final OperationContext context;
    private final Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
    private final long creationTime = System.currentTimeMillis();
    private volatile boolean closed = false;

    public ServerSessionImpl(String name, String username, String password, int minLargeMessageSize, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean strictUpdateDeliveryCount, boolean xa, RemotingConnection remotingConnection, StorageManager storageManager, PostOffice postOffice, ResourceManager resourceManager, SecurityStore securityStore, ManagementService managementService, HornetQServer server, SimpleString managementAddress, SimpleString defaultAddress, SessionCallback callback, OperationContext context) throws Exception {
        this.username = username;
        this.password = password;
        this.minLargeMessageSize = minLargeMessageSize;
        this.autoCommitSends = autoCommitSends;
        this.autoCommitAcks = autoCommitAcks;
        this.preAcknowledge = preAcknowledge;
        this.remotingConnection = remotingConnection;
        this.storageManager = storageManager;
        this.postOffice = postOffice;
        this.resourceManager = resourceManager;
        this.securityStore = securityStore;
        this.timeoutSeconds = resourceManager.getTimeoutSeconds();
        this.xa = xa;
        this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
        this.managementService = managementService;
        this.name = name;
        this.server = server;
        this.managementAddress = managementAddress;
        this.callback = callback;
        this.defaultAddress = defaultAddress;
        remotingConnection.addFailureListener(this);
        this.context = context;
        if (!xa) {
            this.tx = this.newTransaction();
        }
    }

    @Override
    public OperationContext getSessionContext() {
        return this.context;
    }

    @Override
    public String getUsername() {
        return this.username;
    }

    @Override
    public String getPassword() {
        return this.password;
    }

    @Override
    public int getMinLargeMessageSize() {
        return this.minLargeMessageSize;
    }

    @Override
    public String getName() {
        return this.name;
    }

    @Override
    public Object getConnectionID() {
        return this.remotingConnection.getID();
    }

    @Override
    public Set<ServerConsumer> getServerConsumers() {
        HashSet<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(this.consumers.values());
        return Collections.unmodifiableSet(consumersClone);
    }

    @Override
    public void removeConsumer(long consumerID) throws Exception {
        if (this.consumers.remove(consumerID) == null) {
            throw new IllegalStateException("Cannot find consumer with id " + consumerID + " to remove");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doClose(boolean failed) throws Exception {
        ServerSessionImpl serverSessionImpl = this;
        synchronized (serverSessionImpl) {
            if (this.closed) {
                return;
            }
            if (this.tx != null && this.tx.getXid() == null) {
                try {
                    this.rollback(failed, false);
                }
                catch (Exception e) {
                    HornetQServerLogger.LOGGER.warn(e.getMessage(), e);
                }
            }
            this.server.removeSession(this.name);
            this.remotingConnection.removeFailureListener(this);
            this.callback.closed();
            this.closed = true;
        }
        HashSet<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(this.consumers.values());
        for (ServerConsumer consumer : consumersClone) {
            consumer.close(failed);
        }
        this.consumers.clear();
        if (this.currentLargeMessage != null) {
            try {
                this.currentLargeMessage.deleteFile();
            }
            catch (Throwable error) {
                HornetQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
            }
        }
    }

    @Override
    public void createConsumer(long consumerID, SimpleString queueName, SimpleString filterString, boolean browseOnly) throws Exception {
        this.createConsumer(consumerID, queueName, filterString, browseOnly, true, null);
    }

    public void createConsumer(long consumerID, SimpleString queueName, SimpleString filterString, boolean browseOnly, boolean supportLargeMessage, Integer credits) throws Exception {
        Binding binding = this.postOffice.getBinding(queueName);
        if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE) {
            throw HornetQMessageBundle.BUNDLE.noSuchQueue(queueName);
        }
        this.securityStore.check(binding.getAddress(), CheckType.CONSUME, this);
        Filter filter = FilterImpl.createFilter(filterString);
        ServerConsumerImpl consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding)binding, filter, this.started, browseOnly, this.storageManager, this.callback, this.preAcknowledge, this.strictUpdateDeliveryCount, this.managementService, supportLargeMessage, credits);
        this.consumers.put(consumer.getID(), consumer);
        if (!browseOnly) {
            TypedProperties props = new TypedProperties();
            props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
            props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
            props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
            props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
            Queue theQueue = (Queue)binding.getBindable();
            props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
            props.putSimpleStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(this.username));
            props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, SimpleString.toSimpleString(this.remotingConnection.getRemoteAddress()));
            props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(this.name));
            if (filterString != null) {
                props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
            }
            Notification notification = new Notification(null, NotificationType.CONSUMER_CREATED, props);
            if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
                HornetQServerLogger.LOGGER.debug("Session with user=" + this.username + ", connection=" + this.remotingConnection + " created a consumer on queue " + queueName + ", filter = " + filterString);
            }
            this.managementService.sendNotification(notification);
        }
    }

    @Override
    public void createQueue(SimpleString address, SimpleString name, SimpleString filterString, boolean temporary, boolean durable) throws Exception {
        if (durable) {
            this.securityStore.check(address, CheckType.CREATE_DURABLE_QUEUE, this);
        } else {
            this.securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
        }
        Queue queue = this.server.createQueue(address, name, filterString, durable, temporary);
        if (temporary) {
            TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(this.server, name);
            this.remotingConnection.addCloseListener(cleaner);
            this.remotingConnection.addFailureListener(cleaner);
            this.tempQueueCleannerUppers.put(name, cleaner);
        }
        if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
            HornetQServerLogger.LOGGER.debug("Queue " + name + " created on address " + name + " with filter=" + filterString + " temporary = " + temporary + " durable=" + durable + " on session user=" + this.username + ", connection=" + this.remotingConnection);
        }
    }

    @Override
    public void createSharedQueue(SimpleString address, SimpleString name, boolean durable, SimpleString filterString) throws Exception {
        this.securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
        this.server.createSharedQueue(address, name, filterString, durable);
    }

    @Override
    public RemotingConnection getRemotingConnection() {
        return this.remotingConnection;
    }

    @Override
    public void deleteQueue(SimpleString queueToDelete) throws Exception {
        Binding binding = this.postOffice.getBinding(queueToDelete);
        if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE) {
            throw new HornetQNonExistentQueueException();
        }
        this.server.destroyQueue(queueToDelete, this, true);
        TempQueueCleanerUpper cleaner = this.tempQueueCleannerUppers.remove(queueToDelete);
        if (cleaner != null) {
            this.remotingConnection.removeCloseListener(cleaner);
            this.remotingConnection.removeFailureListener(cleaner);
        }
    }

    @Override
    public QueueQueryResult executeQueueQuery(SimpleString name) throws Exception {
        QueueQueryResult response;
        if (name == null) {
            throw HornetQMessageBundle.BUNDLE.queueNameIsNull();
        }
        Binding binding = this.postOffice.getBinding(name);
        if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE) {
            Queue queue = (Queue)binding.getBindable();
            Filter filter = queue.getFilter();
            SimpleString filterString = filter == null ? null : filter.getFilterString();
            response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(1000L));
        } else {
            response = name.equals(this.managementAddress) ? new QueueQueryResult(name, this.managementAddress, true, false, null, -1, -1L) : new QueueQueryResult();
        }
        return response;
    }

    @Override
    public BindingQueryResult executeBindingQuery(SimpleString address) throws Exception {
        if (address == null) {
            throw HornetQMessageBundle.BUNDLE.addressIsNull();
        }
        ArrayList<SimpleString> names = new ArrayList<SimpleString>();
        if (address.equals(this.managementAddress)) {
            return new BindingQueryResult(true, names);
        }
        Bindings bindings = this.postOffice.getMatchingBindings(address);
        for (Binding binding : bindings.getBindings()) {
            if (binding.getType() != BindingType.LOCAL_QUEUE && binding.getType() != BindingType.REMOTE_QUEUE) continue;
            names.add(binding.getUniqueName());
        }
        return new BindingQueryResult(!names.isEmpty(), names);
    }

    @Override
    public void forceConsumerDelivery(long consumerID, long sequence) throws Exception {
        ServerConsumer consumer = this.consumers.get(consumerID);
        if (consumer != null) {
            consumer.forceDelivery(sequence);
        }
    }

    @Override
    public void acknowledge(long consumerID, long messageID) throws Exception {
        ServerConsumer consumer = this.consumers.get(consumerID);
        if (consumer == null) {
            throw HornetQMessageBundle.BUNDLE.consumerDoesntExist(consumerID);
        }
        if (this.tx != null && this.tx.getState() == Transaction.State.ROLLEDBACK) {
            TransactionImpl newTX = this.newTransaction();
            consumer.acknowledge(this.autoCommitAcks, newTX, messageID);
            newTX.rollback();
        } else {
            consumer.acknowledge(this.autoCommitAcks, this.tx, messageID);
        }
    }

    @Override
    public void individualAcknowledge(long consumerID, long messageID) throws Exception {
        ServerConsumer consumer = this.consumers.get(consumerID);
        if (this.xa && this.tx == null) {
            throw new HornetQXAException(-6, "Invalid transaction state");
        }
        if (this.tx != null && this.tx.getState() == Transaction.State.ROLLEDBACK) {
            TransactionImpl newTX = this.newTransaction();
            consumer.individualAcknowledge(this.autoCommitAcks, this.tx, messageID);
            newTX.rollback();
        } else {
            consumer.individualAcknowledge(this.autoCommitAcks, this.tx, messageID);
        }
    }

    @Override
    public void individualCancel(long consumerID, long messageID, boolean failed) throws Exception {
        ServerConsumer consumer = this.consumers.get(consumerID);
        if (consumer != null) {
            consumer.individualCancel(messageID, failed);
        }
    }

    @Override
    public void expire(long consumerID, long messageID) throws Exception {
        MessageReference ref = this.consumers.get(consumerID).removeReferenceByID(messageID);
        if (ref != null) {
            ref.getQueue().expire(ref);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized void commit() throws Exception {
        if (isTrace) {
            HornetQServerLogger.LOGGER.trace("Calling commit");
        }
        try {
            this.tx.commit();
        }
        finally {
            this.tx = this.newTransaction();
        }
    }

    @Override
    public void rollback(boolean considerLastMessageAsDelivered) throws Exception {
        this.rollback(false, considerLastMessageAsDelivered);
    }

    private synchronized void rollback(boolean clientFailed, boolean considerLastMessageAsDelivered) throws Exception {
        if (this.tx == null) {
            this.tx = this.newTransaction();
        }
        this.doRollback(clientFailed, considerLastMessageAsDelivered, this.tx);
        this.tx = this.xa ? null : this.newTransaction();
    }

    private TransactionImpl newTransaction() {
        return new TransactionImpl(this.storageManager, this.timeoutSeconds);
    }

    private TransactionImpl newTransaction(Xid xid) {
        return new TransactionImpl(xid, this.storageManager, this.timeoutSeconds);
    }

    @Override
    public synchronized void xaCommit(Xid xid, boolean onePhase) throws Exception {
        if (this.tx != null && this.tx.getXid().equals(xid)) {
            String msg = "Cannot commit, session is currently doing work in transaction " + this.tx.getXid();
            throw new HornetQXAException(-6, msg);
        }
        Transaction theTx = this.resourceManager.removeTransaction(xid);
        if (isTrace) {
            HornetQServerLogger.LOGGER.trace("XAcommit into " + theTx + ", xid=" + xid);
        }
        if (theTx == null) {
            if (this.resourceManager.getHeuristicCommittedTransactions().contains(xid)) {
                throw new HornetQXAException(7, "transaction has been heuristically committed: " + xid);
            }
            if (this.resourceManager.getHeuristicRolledbackTransactions().contains(xid)) {
                throw new HornetQXAException(6, "transaction has been heuristically rolled back: " + xid);
            }
            if (isTrace) {
                HornetQServerLogger.LOGGER.trace("XAcommit into " + theTx + ", xid=" + xid + " cannot find it");
            }
            throw new HornetQXAException(-4, "Cannot find xid in resource manager: " + xid);
        }
        if (theTx.getState() == Transaction.State.SUSPENDED) {
            this.resourceManager.putTransaction(xid, theTx);
            throw new HornetQXAException(-6, "Cannot commit transaction, it is suspended " + xid);
        }
        theTx.commit(onePhase);
    }

    @Override
    public synchronized void xaEnd(Xid xid) throws Exception {
        if (this.tx != null && this.tx.getXid().equals(xid)) {
            if (this.tx.getState() == Transaction.State.SUSPENDED) {
                String msg = "Cannot end, transaction is suspended";
                throw new HornetQXAException(-6, "Cannot end, transaction is suspended");
            }
            if (this.tx.getState() == Transaction.State.ROLLEDBACK) {
                String msg = "Cannot end, transaction is rolled back";
                this.tx = null;
                throw new HornetQXAException(-6, "Cannot end, transaction is rolled back");
            }
            this.tx = null;
        } else {
            Transaction theTx = this.resourceManager.getTransaction(xid);
            if (theTx == null) {
                String msg = "Cannot find suspended transaction to end " + xid;
                throw new HornetQXAException(-4, msg);
            }
            if (theTx.getState() != Transaction.State.SUSPENDED) {
                String msg = "Transaction is not suspended " + xid;
                throw new HornetQXAException(-6, msg);
            }
            theTx.resume();
        }
    }

    @Override
    public synchronized void xaForget(Xid xid) throws Exception {
        long id = this.resourceManager.removeHeuristicCompletion(xid);
        if (id != -1L) {
            try {
                this.storageManager.deleteHeuristicCompletion(id);
            }
            catch (Exception e) {
                e.printStackTrace();
                throw new HornetQXAException(-3);
            }
        } else {
            throw new HornetQXAException(-4);
        }
    }

    @Override
    public synchronized void xaJoin(Xid xid) throws Exception {
        Transaction theTx = this.resourceManager.getTransaction(xid);
        if (theTx == null) {
            String msg = "Cannot find xid in resource manager: " + xid;
            throw new HornetQXAException(-4, msg);
        }
        if (theTx.getState() == Transaction.State.SUSPENDED) {
            throw new HornetQXAException(-6, "Cannot join tx, it is suspended " + xid);
        }
        this.tx = theTx;
    }

    @Override
    public synchronized void xaResume(Xid xid) throws Exception {
        if (this.tx != null) {
            String msg = "Cannot resume, session is currently doing work in a transaction " + this.tx.getXid();
            throw new HornetQXAException(-6, msg);
        }
        Transaction theTx = this.resourceManager.getTransaction(xid);
        if (theTx == null) {
            String msg = "Cannot find xid in resource manager: " + xid;
            throw new HornetQXAException(-4, msg);
        }
        if (theTx.getState() != Transaction.State.SUSPENDED) {
            throw new HornetQXAException(-6, "Cannot resume transaction, it is not suspended " + xid);
        }
        this.tx = theTx;
        this.tx.resume();
    }

    @Override
    public synchronized void xaRollback(Xid xid) throws Exception {
        if (this.tx != null && this.tx.getXid().equals(xid)) {
            String msg = "Cannot roll back, session is currently doing work in a transaction " + this.tx.getXid();
            throw new HornetQXAException(-6, msg);
        }
        Transaction theTx = this.resourceManager.removeTransaction(xid);
        if (isTrace) {
            HornetQServerLogger.LOGGER.trace("xarollback into " + theTx);
        }
        if (theTx == null) {
            if (this.resourceManager.getHeuristicCommittedTransactions().contains(xid)) {
                throw new HornetQXAException(7, "transaction has ben heuristically committed: " + xid);
            }
            if (this.resourceManager.getHeuristicRolledbackTransactions().contains(xid)) {
                throw new HornetQXAException(6, "transaction has ben heuristically rolled back: " + xid);
            }
            if (isTrace) {
                HornetQServerLogger.LOGGER.trace("xarollback into " + theTx + ", xid=" + xid + " forcing a rollback regular");
            }
            try {
                this.rollback(false);
            }
            catch (Exception e) {
                HornetQServerLogger.LOGGER.warn(e.getMessage(), e);
            }
            throw new HornetQXAException(-4, "Cannot find xid in resource manager: " + xid);
        }
        if (theTx.getState() == Transaction.State.SUSPENDED) {
            if (isTrace) {
                HornetQServerLogger.LOGGER.trace("xarollback into " + theTx + " sending tx back as it was suspended");
            }
            this.resourceManager.putTransaction(xid, this.tx);
            throw new HornetQXAException(-6, "Cannot rollback transaction, it is suspended " + xid);
        }
        this.doRollback(false, false, theTx);
    }

    @Override
    public synchronized void xaStart(Xid xid) throws Exception {
        boolean added;
        if (this.tx != null) {
            HornetQServerLogger.LOGGER.xidReplacedOnXStart(this.tx.getXid().toString(), xid.toString());
            try {
                if (this.tx.getState() != Transaction.State.PREPARED) {
                    if (this.tx.getXid() != null) {
                        this.resourceManager.removeTransaction(this.tx.getXid());
                    }
                    this.tx.rollback();
                }
            }
            catch (Exception e) {
                HornetQServerLogger.LOGGER.debug("An exception happened while we tried to debug the previous tx, we can ignore this exception", e);
            }
        }
        this.tx = this.newTransaction(xid);
        if (isTrace) {
            HornetQServerLogger.LOGGER.trace("xastart into tx= " + this.tx);
        }
        if (!(added = this.resourceManager.putTransaction(xid, this.tx))) {
            String msg = "Cannot start, there is already a xid " + this.tx.getXid();
            throw new HornetQXAException(-8, msg);
        }
    }

    @Override
    public synchronized void xaFailed(Xid xid) throws Exception {
        boolean added;
        if (this.tx != null) {
            String msg = "Cannot start, session is already doing work in a transaction " + this.tx.getXid();
            throw new HornetQXAException(-6, msg);
        }
        this.tx = this.newTransaction(xid);
        this.tx.markAsRollbackOnly(new HornetQException("Can't commit as a Failover happened during the operation"));
        if (isTrace) {
            HornetQServerLogger.LOGGER.trace("xastart into tx= " + this.tx);
        }
        if (!(added = this.resourceManager.putTransaction(xid, this.tx))) {
            String msg = "Cannot start, there is already a xid " + this.tx.getXid();
            throw new HornetQXAException(-8, msg);
        }
    }

    @Override
    public synchronized void xaSuspend() throws Exception {
        if (isTrace) {
            HornetQServerLogger.LOGGER.trace("xasuspend on " + this.tx);
        }
        if (this.tx == null) {
            String msg = "Cannot suspend, session is not doing work in a transaction ";
            throw new HornetQXAException(-6, "Cannot suspend, session is not doing work in a transaction ");
        }
        if (this.tx.getState() == Transaction.State.SUSPENDED) {
            String msg = "Cannot suspend, transaction is already suspended " + this.tx.getXid();
            throw new HornetQXAException(-6, msg);
        }
        this.tx.suspend();
        this.tx = null;
    }

    @Override
    public synchronized void xaPrepare(Xid xid) throws Exception {
        if (this.tx != null && this.tx.getXid().equals(xid)) {
            String msg = "Cannot commit, session is currently doing work in a transaction " + this.tx.getXid();
            throw new HornetQXAException(-6, msg);
        }
        Transaction theTx = this.resourceManager.getTransaction(xid);
        if (isTrace) {
            HornetQServerLogger.LOGGER.trace("xaprepare into , xid=" + xid + ", tx= " + this.tx);
        }
        if (theTx == null) {
            String msg = "Cannot find xid in resource manager: " + xid;
            throw new HornetQXAException(-4, msg);
        }
        if (theTx.getState() == Transaction.State.SUSPENDED) {
            throw new HornetQXAException(-6, "Cannot prepare transaction, it is suspended " + xid);
        }
        if (theTx.getState() == Transaction.State.PREPARED) {
            HornetQServerLogger.LOGGER.info("ignoring prepare on xid as already called :" + xid);
        } else {
            theTx.prepare();
        }
    }

    @Override
    public List<Xid> xaGetInDoubtXids() {
        ArrayList<Xid> xids = new ArrayList<Xid>();
        xids.addAll(this.resourceManager.getPreparedTransactions());
        xids.addAll(this.resourceManager.getHeuristicCommittedTransactions());
        xids.addAll(this.resourceManager.getHeuristicRolledbackTransactions());
        return xids;
    }

    @Override
    public int xaGetTimeout() {
        return this.resourceManager.getTimeoutSeconds();
    }

    @Override
    public void xaSetTimeout(int timeout) {
        this.timeoutSeconds = timeout;
        if (this.tx != null) {
            this.tx.setTimeout(timeout);
        }
    }

    @Override
    public void start() {
        this.setStarted(true);
    }

    @Override
    public void stop() {
        this.setStarted(false);
    }

    @Override
    public void waitContextCompletion() {
        try {
            if (!this.context.waitCompletion(10000L)) {
                HornetQServerLogger.LOGGER.errorCompletingContext(new Exception("warning"));
            }
        }
        catch (Exception e) {
            HornetQServerLogger.LOGGER.warn(e.getMessage(), e);
        }
    }

    @Override
    public void close(final boolean failed) {
        if (this.closed) {
            return;
        }
        this.context.executeOnCompletion(new IOAsyncTask(){

            @Override
            public void onError(int errorCode, String errorMessage) {
            }

            @Override
            public void done() {
                try {
                    ServerSessionImpl.this.doClose(failed);
                }
                catch (Exception e) {
                    HornetQServerLogger.LOGGER.errorClosingSession(e);
                }
            }
        });
    }

    @Override
    public void closeConsumer(long consumerID) throws Exception {
        ServerConsumer consumer = this.consumers.get(consumerID);
        if (consumer != null) {
            consumer.close(false);
        } else {
            HornetQServerLogger.LOGGER.cannotFindConsumer(consumerID);
        }
    }

    @Override
    public void receiveConsumerCredits(long consumerID, int credits) throws Exception {
        ServerConsumer consumer = this.consumers.get(consumerID);
        if (consumer == null) {
            HornetQServerLogger.LOGGER.debug("There is no consumer with id " + consumerID);
            return;
        }
        consumer.receiveCredits(credits);
    }

    @Override
    public Transaction getCurrentTransaction() {
        return this.tx;
    }

    @Override
    public void sendLarge(MessageInternal message) throws Exception {
        long id = this.storageManager.generateUniqueID();
        LargeServerMessage largeMsg = this.storageManager.createLargeMessage(id, message);
        if (HornetQServerLogger.LOGGER.isTraceEnabled()) {
            HornetQServerLogger.LOGGER.trace("sendLarge::" + largeMsg);
        }
        if (this.currentLargeMessage != null) {
            HornetQServerLogger.LOGGER.replacingIncompleteLargeMessage(this.currentLargeMessage.getMessageID());
        }
        this.currentLargeMessage = largeMsg;
    }

    @Override
    public void send(ServerMessage message, boolean direct) throws Exception {
        if (!message.isLargeMessage()) {
            long id = this.storageManager.generateUniqueID();
            message.setMessageID(id);
            message.encodeMessageIDToBuffer();
        }
        SimpleString address = message.getAddress();
        if (this.defaultAddress == null && address != null) {
            this.defaultAddress = address;
        }
        if (address == null) {
            if (message.isDurable()) {
                message.setAddress(this.defaultAddress);
            } else {
                message.setAddressTransient(this.defaultAddress);
            }
        }
        if (isTrace) {
            HornetQServerLogger.LOGGER.trace("send(message=" + message + ", direct=" + direct + ") being called");
        }
        if (message.getAddress() == null) {
            throw HornetQMessageBundle.BUNDLE.noAddress();
        }
        if (message.getAddress().equals(this.managementAddress)) {
            this.handleManagementMessage(message, direct);
        } else {
            this.doSend(message, direct);
        }
    }

    @Override
    public void sendContinuations(int packetSize, long messageBodySize, byte[] body, boolean continues) throws Exception {
        if (this.currentLargeMessage == null) {
            throw HornetQMessageBundle.BUNDLE.largeMessageNotInitialised();
        }
        this.currentLargeMessage.addBytes(body);
        if (!continues) {
            this.currentLargeMessage.releaseResources();
            if (messageBodySize >= 0L) {
                this.currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
            }
            this.doSend(this.currentLargeMessage, false);
            this.currentLargeMessage = null;
        }
    }

    @Override
    public void requestProducerCredits(final SimpleString address, final int credits) throws Exception {
        PagingStore store = this.server.getPagingManager().getPageStore(address);
        if (!store.checkMemory(new Runnable(){

            @Override
            public void run() {
                ServerSessionImpl.this.callback.sendProducerCreditsMessage(credits, address);
            }
        })) {
            this.callback.sendProducerCreditsFailMessage(credits, address);
        }
    }

    @Override
    public void setTransferring(boolean transferring) {
        HashSet<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(this.consumers.values());
        for (ServerConsumer consumer : consumersClone) {
            consumer.setTransferring(transferring);
        }
    }

    @Override
    public void addMetaData(String key, String data) {
        if (this.metaData == null) {
            this.metaData = new HashMap<String, String>();
        }
        this.metaData.put(key, data);
    }

    @Override
    public boolean addUniqueMetaData(String key, String data) {
        ServerSession sessionWithMetaData = this.server.lookupSession(key, data);
        if (sessionWithMetaData != null && sessionWithMetaData != this) {
            return false;
        }
        this.addMetaData(key, data);
        return true;
    }

    @Override
    public String getMetaData(String key) {
        String data = null;
        if (this.metaData != null) {
            data = this.metaData.get(key);
        }
        return data;
    }

    @Override
    public String[] getTargetAddresses() {
        Map<SimpleString, Pair<UUID, AtomicLong>> copy = this.cloneTargetAddresses();
        Iterator<SimpleString> iter = copy.keySet().iterator();
        int num = copy.keySet().size();
        String[] addresses = new String[num];
        int i = 0;
        while (iter.hasNext()) {
            addresses[i] = iter.next().toString();
            ++i;
        }
        return addresses;
    }

    @Override
    public String getLastSentMessageID(String address) {
        Pair<UUID, AtomicLong> value = this.targetAddressInfos.get(SimpleString.toSimpleString(address));
        if (value != null) {
            return value.getA().toString();
        }
        return null;
    }

    @Override
    public long getCreationTime() {
        return this.creationTime;
    }

    public StorageManager getStorageManager() {
        return this.storageManager;
    }

    @Override
    public void describeProducersInfo(JSONArray array) throws Exception {
        Map<SimpleString, Pair<UUID, AtomicLong>> targetCopy = this.cloneTargetAddresses();
        for (Map.Entry<SimpleString, Pair<UUID, AtomicLong>> entry : targetCopy.entrySet()) {
            JSONObject producerInfo = new JSONObject();
            producerInfo.put("connectionID", this.getConnectionID().toString());
            producerInfo.put("sessionID", this.getName());
            producerInfo.put("destination", entry.getKey().toString());
            producerInfo.put("lastUUIDSent", entry.getValue().getA());
            producerInfo.put("msgSent", entry.getValue().getB().longValue());
            array.put(producerInfo);
        }
    }

    public String toString() {
        StringBuffer buffer = new StringBuffer();
        if (this.metaData != null) {
            for (Map.Entry<String, String> value : this.metaData.entrySet()) {
                String tmpValue;
                if (buffer.length() != 0) {
                    buffer.append(",");
                }
                if ((tmpValue = value.getValue()) == null || tmpValue.toString().isEmpty()) {
                    buffer.append(value.getKey() + "=*N/A*");
                    continue;
                }
                buffer.append(value.getKey() + "=" + tmpValue);
            }
        }
        return "ServerSessionImpl(" + buffer.toString() + ")";
    }

    @Override
    public void connectionFailed(HornetQException me, boolean failedOver) {
        try {
            HornetQServerLogger.LOGGER.clientConnectionFailed(this.name);
            this.close(true);
            HornetQServerLogger.LOGGER.clientConnectionFailedClearingSession(this.name);
        }
        catch (Throwable t) {
            HornetQServerLogger.LOGGER.errorClosingConnection(this);
        }
    }

    public void clearLargeMessage() {
        this.currentLargeMessage = null;
    }

    private Map<SimpleString, Pair<UUID, AtomicLong>> cloneTargetAddresses() {
        return new HashMap<SimpleString, Pair<UUID, AtomicLong>>(this.targetAddressInfos);
    }

    private void setStarted(boolean s) {
        HashSet<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(this.consumers.values());
        for (ServerConsumer consumer : consumersClone) {
            consumer.setStarted(s);
        }
        this.started = s;
    }

    private void handleManagementMessage(ServerMessage message, boolean direct) throws Exception {
        try {
            this.securityStore.check(message.getAddress(), CheckType.MANAGE, this);
        }
        catch (HornetQException e) {
            if (!this.autoCommitSends) {
                this.tx.markAsRollbackOnly(e);
            }
            throw e;
        }
        ServerMessage reply = this.managementService.handleMessage(message);
        SimpleString replyTo = message.getSimpleStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME);
        if (replyTo != null) {
            reply.setAddress(replyTo);
            this.doSend(reply, direct);
        }
    }

    private void doRollback(boolean clientFailed, boolean lastMessageAsDelived, Transaction theTx) throws Exception {
        boolean wasStarted = this.started;
        ArrayList<MessageReference> toCancel = new ArrayList<MessageReference>();
        for (ServerConsumer consumer : this.consumers.values()) {
            if (wasStarted) {
                consumer.setStarted(false);
            }
            toCancel.addAll(consumer.cancelRefs(clientFailed, lastMessageAsDelived, theTx));
        }
        for (MessageReference ref : toCancel) {
            ref.getQueue().cancel(theTx, ref);
        }
        if (wasStarted && !clientFailed) {
            theTx.addOperation(new TransactionOperationAbstract(){

                @Override
                public void afterRollback(Transaction tx) {
                    for (ServerConsumer consumer : ServerSessionImpl.this.consumers.values()) {
                        consumer.setStarted(true);
                    }
                }
            });
        }
        theTx.rollback();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doSend(ServerMessage msg, boolean direct) throws Exception {
        try {
            this.securityStore.check(msg.getAddress(), CheckType.SEND, this);
        }
        catch (HornetQException e) {
            if (!this.autoCommitSends) {
                this.tx.markAsRollbackOnly(e);
            }
            throw e;
        }
        if (this.tx != null && !this.autoCommitSends) {
            this.routingContext.setTransaction(this.tx);
        }
        try {
            this.postOffice.route(msg, this.routingContext, direct);
            Pair<UUID, AtomicLong> value = this.targetAddressInfos.get(msg.getAddress());
            if (value == null) {
                this.targetAddressInfos.put(msg.getAddress(), new Pair<UUID, AtomicLong>(msg.getUserID(), new AtomicLong(1L)));
            } else {
                value.setA(msg.getUserID());
                value.getB().incrementAndGet();
            }
        }
        finally {
            this.routingContext.clear();
        }
    }

    @Override
    public List<MessageReference> getInTXMessagesForConsumer(long consumerId) {
        if (this.tx != null) {
            QueueImpl.RefsOperation oper = (QueueImpl.RefsOperation)this.tx.getProperty(6);
            if (oper == null) {
                return Collections.emptyList();
            }
            return oper.getListOnConsumer(consumerId);
        }
        return Collections.emptyList();
    }

    private static class TempQueueCleanerUpper
    implements CloseListener,
    FailureListener {
        private final SimpleString bindingName;
        private final HornetQServer server;

        TempQueueCleanerUpper(HornetQServer server, SimpleString bindingName) {
            this.server = server;
            this.bindingName = bindingName;
        }

        private void run() {
            try {
                if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
                    HornetQServerLogger.LOGGER.debug("deleting temporary queue " + this.bindingName);
                }
                try {
                    this.server.destroyQueue(this.bindingName, null, false);
                }
                catch (HornetQException e) {
                    HornetQServerLogger.LOGGER.debug(e.getMessage(), e);
                }
            }
            catch (Exception e) {
                HornetQServerLogger.LOGGER.errorRemovingTempQueue(e, this.bindingName);
            }
        }

        @Override
        public void connectionFailed(HornetQException exception, boolean failedOver) {
            this.run();
        }

        @Override
        public void connectionClosed() {
            this.run();
        }

        public String toString() {
            return "Temporary Cleaner for queue " + this.bindingName;
        }
    }
}

