/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import javax.jms.MessageNotWriteableException;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.usage.SystemUsage;

public class BrokerTestSupport
extends CombinationTestSupport {
    /**
     * When {@code true}, {@link #assertNoMessagesLeft(StubConnection)} polls the dispatch
     * queue with a zero timeout instead of waiting {@link #maxWait} milliseconds.
     * Initialised once from the {@code FAST_NO_MESSAGE_LEFT_ASSERT} system property, which
     * defaults to {@code "true"}; only the exact string {@code "true"} enables it.
     */
    public static final boolean FAST_NO_MESSAGE_LEFT_ASSERT = System.getProperty("FAST_NO_MESSAGE_LEFT_ASSERT", "true").equals("true");
    // Used by gc() and cleared in tearDown(); never assigned in this class.
    // NOTE(review): presumably populated by subclasses - confirm before relying on gc().
    protected RegionBroker regionBroker;
    // Broker under test: created and started in setUp(), stopped and cleared in tearDown().
    public BrokerService broker;
    // Source of numeric ids for connections, sessions, producers and consumers (pre-incremented on each use).
    protected long idGenerator;
    // Source of message sequence numbers used by createMessage(...).
    protected int msgIdGenerator;
    // Source of transaction numbers for both local and XA transaction ids.
    protected int txGenerator;
    // Source of the numeric suffix of temporary destination names.
    protected int tempDestGenerator;
    // Cleared in tearDown(); never assigned in this class.
    // NOTE(review): presumably populated by subclasses - confirm.
    public PersistenceAdapter persistenceAdapter;
    // Physical name used by createDestinationInfo(...) for non-temporary destinations.
    protected String queueName = "TEST";
    // Default poll timeout, in milliseconds, for reads from a connection's dispatch queue.
    protected int maxWait = 10000;
    // Cleared in tearDown(); never assigned in this class.
    // NOTE(review): presumably populated by subclasses - confirm.
    protected SystemUsage memoryManager;
    // Destination policy map; setUp() sets its default entry and installs it on the broker.
    protected PolicyMap policyMap = new PolicyMap();

    /**
     * Creates, configures and starts the broker under test.
     * <p>
     * After the superclass set-up, the broker is obtained from {@link #createBroker()},
     * {@link #getDefaultPolicy()} is registered as the default entry of {@link #policyMap},
     * the map is installed as the broker's destination policy and the broker is started.
     *
     * @throws Exception if the superclass set-up, broker creation or broker start-up fails
     */
    protected void setUp() throws Exception {
        super.setUp();
        broker = createBroker();
        // The policy must be in place before start() so the broker starts with it.
        PolicyEntry defaultEntry = getDefaultPolicy();
        policyMap.setDefaultEntry(defaultEntry);
        broker.setDestinationPolicy(policyMap);
        broker.start();
    }

    /**
     * Builds the policy entry that {@link #setUp()} registers as the default destination policy.
     *
     * @return a new entry using a {@link RoundRobinDispatchPolicy} for dispatch and a
     *         default-constructed {@link FixedCountSubscriptionRecoveryPolicy} for recovery
     */
    protected PolicyEntry getDefaultPolicy() {
        DispatchPolicy dispatch = new RoundRobinDispatchPolicy();
        SubscriptionRecoveryPolicy recovery = new FixedCountSubscriptionRecoveryPolicy();

        PolicyEntry entry = new PolicyEntry();
        entry.setDispatchPolicy(dispatch);
        entry.setSubscriptionRecoveryPolicy(recovery);
        return entry;
    }

    /**
     * Creates (but does not start) the broker used by the test.
     *
     * @return the broker built by {@link BrokerFactory} from the URI
     *         {@code broker:()/localhost?persistent=false}
     * @throws Exception if the URI cannot be parsed or the factory fails
     */
    protected BrokerService createBroker() throws Exception {
        URI brokerUri = new URI("broker:()/localhost?persistent=false");
        return BrokerFactory.createBroker(brokerUri);
    }

    /**
     * Stops the broker created in {@link #setUp()} and drops the fixture's references.
     * <p>
     * The references are cleared and {@code super.tearDown()} is invoked even when stopping
     * the broker throws, so a failing shutdown cannot leave the superclass fixture behind.
     * A {@code null} broker (for example when a subclass set-up failed before creating it)
     * is tolerated instead of raising a {@link NullPointerException}.
     *
     * @throws Exception if stopping the broker or the superclass tear-down fails
     */
    protected void tearDown() throws Exception {
        try {
            if (broker != null) {
                broker.stop();
                broker.waitUntilStopped();
            }
        } finally {
            broker = null;
            regionBroker = null;
            persistenceAdapter = null;
            memoryManager = null;
            super.tearDown();
        }
    }

    /**
     * Creates a consumer descriptor for the given session and destination.
     * <p>
     * The consumer takes the next value of {@link #idGenerator}, is not a browser,
     * has a prefetch size of 1000 and has asynchronous dispatch disabled.
     *
     * @param sessionInfo the session the consumer belongs to
     * @param destination the destination to consume from
     * @return the newly created consumer descriptor (not yet sent to the broker)
     * @throws Exception declared for subclasses; nothing in this implementation throws a checked exception
     */
    protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception {
        ConsumerInfo consumer = new ConsumerInfo(sessionInfo, ++idGenerator);
        consumer.setBrowser(false);
        consumer.setDestination(destination);
        consumer.setPrefetchSize(1000);
        consumer.setDispatchAsync(false);
        return consumer;
    }

    /**
     * Builds the command that removes the given consumer.
     *
     * @param consumerInfo the consumer to remove
     * @return the remove command produced by {@code consumerInfo.createRemoveCommand()}
     */
    protected RemoveInfo closeConsumerInfo(ConsumerInfo consumerInfo) {
        RemoveInfo removal = consumerInfo.createRemoveCommand();
        return removal;
    }

    /**
     * Creates a producer descriptor for the given session, using the next value of {@link #idGenerator}.
     *
     * @param sessionInfo the session the producer belongs to
     * @return the newly created producer descriptor (not yet sent to the broker)
     * @throws Exception declared for subclasses; nothing in this implementation throws a checked exception
     */
    protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
        return new ProducerInfo(sessionInfo, ++idGenerator);
    }

    /**
     * Creates a session descriptor for the given connection, using the next value of {@link #idGenerator}.
     *
     * @param connectionInfo the connection the session belongs to
     * @return the newly created session descriptor (not yet sent to the broker)
     * @throws Exception declared for subclasses; nothing in this implementation throws a checked exception
     */
    protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
        return new SessionInfo(connectionInfo, ++idGenerator);
    }

    /**
     * Creates a connection descriptor with a fresh id.
     * <p>
     * The connection id is {@code "connection:"} followed by the next value of
     * {@link #idGenerator}; the client id is set to the value of that connection id.
     *
     * @return the newly created connection descriptor (not yet sent to the broker)
     * @throws Exception declared for subclasses; nothing in this implementation throws a checked exception
     */
    protected ConnectionInfo createConnectionInfo() throws Exception {
        ConnectionId id = new ConnectionId("connection:" + ++idGenerator);

        ConnectionInfo connection = new ConnectionInfo();
        connection.setConnectionId(id);
        connection.setClientId(connection.getConnectionId().getValue());
        return connection;
    }

    /**
     * Creates a non-persistent text message with the payload {@code "Test Message Payload."}.
     * <p>
     * The message id is built from the producer and the next value of {@link #msgIdGenerator}.
     *
     * @param producerInfo the producer the message id is derived from
     * @param destination  the destination stamped on the message
     * @return the newly created message
     * @throws IllegalStateException if the payload cannot be written; this is not expected
     *         for a message that was just constructed, and previously the failure was
     *         swallowed, yielding a message without a body
     */
    protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setMessageId(new MessageId(producerInfo, (long)(++msgIdGenerator)));
        message.setDestination(destination);
        message.setPersistent(false);
        try {
            message.setText("Test Message Payload.");
        } catch (MessageNotWriteableException e) {
            // Fail loudly rather than hand a body-less message to the test.
            throw new IllegalStateException("Could not set the payload of a newly created test message", e);
        }
        return message;
    }

    /**
     * Builds an acknowledgement for the given message on behalf of a consumer.
     *
     * @param consumerInfo the consumer whose id is stamped on the ack
     * @param msg          the message supplying the ack's destination and last message id
     * @param count        the message count recorded on the ack
     * @param ackType      the ack type code passed to {@code MessageAck.setAckType}
     * @return the newly created acknowledgement (not yet sent to the broker)
     */
    protected MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count, byte ackType) {
        MessageAck acknowledgement = new MessageAck();
        acknowledgement.setAckType(ackType);
        acknowledgement.setConsumerId(consumerInfo.getConsumerId());
        acknowledgement.setDestination(msg.getDestination());
        acknowledgement.setLastMessageId(msg.getMessageId());
        acknowledgement.setMessageCount(count);
        return acknowledgement;
    }

    /**
     * Delegates to {@code regionBroker.gc()}.
     * <p>
     * NOTE(review): {@link #regionBroker} is never assigned in this class, so this call
     * fails with a {@link NullPointerException} unless something else has set the field.
     */
    protected void gc() {
        regionBroker.gc();
    }

    /**
     * Pauses the test until the user presses enter, but only when the {@code profiler}
     * system property is set; otherwise returns immediately.
     * <p>
     * Reading stops at the first line feed or at end of stream. The end-of-stream check
     * keeps the method from spinning forever when standard input is closed or redirected
     * from an exhausted source, where {@code read()} keeps returning {@code -1}.
     *
     * @param prompt text printed in front of the pause and resume messages
     * @throws IOException if reading from standard input fails
     */
    protected void profilerPause(String prompt) throws IOException {
        if (System.getProperty("profiler") == null) {
            return;
        }
        System.out.println();
        System.out.println(prompt + "> Press enter to continue: ");
        int ch;
        do {
            ch = System.in.read();
        } while (ch != '\n' && ch != -1);
        System.out.println(prompt + "> Done.");
    }

    /**
     * Builds the command that removes the given connection.
     *
     * @param info the connection to remove
     * @return the remove command produced by {@code info.createRemoveCommand()}
     */
    protected RemoveInfo closeConnectionInfo(ConnectionInfo info) {
        RemoveInfo removal = info.createRemoveCommand();
        return removal;
    }

    /**
     * Builds the command that removes the given session.
     *
     * @param info the session to remove
     * @return the remove command produced by {@code info.createRemoveCommand()}
     */
    protected RemoveInfo closeSessionInfo(SessionInfo info) {
        RemoveInfo removal = info.createRemoveCommand();
        return removal;
    }

    /**
     * Builds the command that removes the given producer.
     *
     * @param info the producer to remove
     * @return the remove command produced by {@code info.createRemoveCommand()}
     */
    protected RemoveInfo closeProducerInfo(ProducerInfo info) {
        RemoveInfo removal = info.createRemoveCommand();
        return removal;
    }

    /**
     * Creates a test message via {@link #createMessage(ProducerInfo, ActiveMQDestination)}
     * and overrides its persistence flag according to the delivery mode.
     *
     * @param producerInfo the producer the message id is derived from
     * @param destination  the destination stamped on the message
     * @param deliveryMode delivery mode code; the message is persistent only when this is 2
     *                     (presumably {@code javax.jms.DeliveryMode.PERSISTENT} - TODO confirm)
     * @return the newly created message
     */
    protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) {
        boolean persistent = deliveryMode == 2;
        Message message = createMessage(producerInfo, destination);
        message.setPersistent(persistent);
        return message;
    }

    /**
     * Creates a local transaction id scoped to the parent of the given session's id,
     * numbered with the next value of {@link #txGenerator}.
     *
     * @param info the session whose parent id owns the transaction
     * @return the newly created local transaction id
     */
    protected LocalTransactionId createLocalTransaction(SessionInfo info) {
        return new LocalTransactionId(info.getSessionId().getParentId(), (long)(++txGenerator));
    }

    /**
     * Creates an XA transaction id whose branch qualifier and global transaction id are
     * both the 8-byte encoding, as written by {@link DataOutputStream#writeLong(long)},
     * of the next value of {@link #txGenerator}. The format id is 55.
     *
     * @param info not used by this implementation
     * @return the newly created XA transaction id
     * @throws IOException if encoding the transaction number fails
     */
    protected XATransactionId createXATransaction(SessionInfo info) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeLong(++txGenerator);
        out.close();
        byte[] encoded = buffer.toByteArray();

        // The same array instance backs both identifiers.
        XATransactionId xaId = new XATransactionId();
        xaId.setBranchQualifier(encoded);
        xaId.setGlobalTransactionId(encoded);
        xaId.setFormatId(55);
        return xaId;
    }

    /**
     * Builds a transaction command with type code 0 for the given connection and transaction.
     * <p>
     * The type code is passed as an explicit {@code byte}: a bare {@code int} literal is
     * not accepted for a {@code byte} constructor parameter.
     * NOTE(review): 0 presumably corresponds to {@code TransactionInfo.BEGIN} - confirm.
     *
     * @param connectionInfo the connection the transaction runs on
     * @param txid           the transaction to begin
     * @return the newly created command (not yet sent to the broker)
     */
    protected TransactionInfo createBeginTransaction(ConnectionInfo connectionInfo, TransactionId txid) {
        return new TransactionInfo(connectionInfo.getConnectionId(), txid, (byte) 0);
    }

    /**
     * Builds a transaction command with type code 1 for the given connection and transaction.
     * <p>
     * The type code is passed as an explicit {@code byte}: a bare {@code int} literal is
     * not accepted for a {@code byte} constructor parameter.
     * NOTE(review): 1 presumably corresponds to {@code TransactionInfo.PREPARE} - confirm.
     *
     * @param connectionInfo the connection the transaction runs on
     * @param txid           the transaction to prepare
     * @return the newly created command (not yet sent to the broker)
     */
    protected TransactionInfo createPrepareTransaction(ConnectionInfo connectionInfo, TransactionId txid) {
        return new TransactionInfo(connectionInfo.getConnectionId(), txid, (byte) 1);
    }

    /**
     * Builds a transaction command with type code 2 for the given connection and transaction.
     * <p>
     * The type code is passed as an explicit {@code byte}: a bare {@code int} literal is
     * not accepted for a {@code byte} constructor parameter.
     * NOTE(review): 2 presumably corresponds to {@code TransactionInfo.COMMIT_ONE_PHASE} - confirm.
     *
     * @param connectionInfo the connection the transaction runs on
     * @param txid           the transaction to commit
     * @return the newly created command (not yet sent to the broker)
     */
    protected TransactionInfo createCommitTransaction1Phase(ConnectionInfo connectionInfo, TransactionId txid) {
        return new TransactionInfo(connectionInfo.getConnectionId(), txid, (byte) 2);
    }

    /**
     * Builds a transaction command with type code 3 for the given connection and transaction.
     * <p>
     * The type code is passed as an explicit {@code byte}: a bare {@code int} literal is
     * not accepted for a {@code byte} constructor parameter.
     * NOTE(review): 3 presumably corresponds to {@code TransactionInfo.COMMIT_TWO_PHASE} - confirm.
     *
     * @param connectionInfo the connection the transaction runs on
     * @param txid           the transaction to commit
     * @return the newly created command (not yet sent to the broker)
     */
    protected TransactionInfo createCommitTransaction2Phase(ConnectionInfo connectionInfo, TransactionId txid) {
        return new TransactionInfo(connectionInfo.getConnectionId(), txid, (byte) 3);
    }

    /**
     * Builds a transaction command with type code 4 for the given connection and transaction.
     * <p>
     * The type code is passed as an explicit {@code byte}: a bare {@code int} literal is
     * not accepted for a {@code byte} constructor parameter.
     * NOTE(review): 4 presumably corresponds to {@code TransactionInfo.ROLLBACK} - confirm.
     *
     * @param connectionInfo the connection the transaction runs on
     * @param txid           the transaction to roll back
     * @return the newly created command (not yet sent to the broker)
     */
    protected TransactionInfo createRollbackTransaction(ConnectionInfo connectionInfo, TransactionId txid) {
        return new TransactionInfo(connectionInfo.getConnectionId(), txid, (byte) 4);
    }

    /**
     * Counts the messages on a destination by browsing it over the given connection.
     * <p>
     * A new session and a browser consumer with a prefetch size of 1 are registered, then
     * the connection's dispatch queue is drained. Every dispatch addressed to the browser
     * that carries a message is counted and acknowledged. Counting stops when a dispatch
     * for the browser arrives without a message, or when no item arrives within
     * {@link #maxWait} milliseconds. Items that are not dispatches for the browser are
     * set aside and put back on the dispatch queue afterwards, and the session is removed.
     *
     * @param connection     the connection used to talk to the broker
     * @param connectionInfo the connection descriptor the browsing session is created under
     * @param destination    the destination to browse
     * @return the number of messages dispatched to the browser
     * @throws Exception if sending a command fails or the wait on the dispatch queue is interrupted
     */
    protected int countMessagesInQueue(StubConnection connection, ConnectionInfo connectionInfo, ActiveMQDestination destination) throws Exception {
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        connection.send(sessionInfo);

        ConsumerInfo browserInfo = createConsumerInfo(sessionInfo, destination);
        browserInfo.setPrefetchSize(1);
        browserInfo.setBrowser(true);
        connection.send(browserInfo);

        ArrayList<Object> skipped = new ArrayList<Object>();
        int count = 0;
        while (true) {
            Object next = connection.getDispatchQueue().poll(maxWait, TimeUnit.MILLISECONDS);
            if (next == null) {
                // Nothing arrived within maxWait.
                break;
            }
            boolean forBrowser = next instanceof MessageDispatch
                    && ((MessageDispatch) next).getConsumerId().equals(browserInfo.getConsumerId());
            if (!forBrowser) {
                // Belongs to some other consumer (or is not a dispatch): hand it back later.
                skipped.add(next);
                continue;
            }
            MessageDispatch dispatch = (MessageDispatch) next;
            if (dispatch.getMessage() == null) {
                // A dispatch without a message ends the browse.
                break;
            }
            ++count;
            // NOTE(review): ack type 2 is presumably MessageAck.STANDARD_ACK_TYPE - confirm.
            connection.send(createAck(browserInfo, dispatch.getMessage(), 1, (byte) 2));
        }

        for (Object pending : skipped) {
            connection.getDispatchQueue().put(pending);
        }
        connection.send(closeSessionInfo(sessionInfo));
        return count;
    }

    /**
     * Builds a destination command for a new destination owned by the given connection.
     * <p>
     * The destination name is the connection id, a colon and the next value of
     * {@link #tempDestGenerator}. The operation type is 0.
     * NOTE(review): 0 is presumably {@code DestinationInfo.ADD_OPERATION_TYPE} - confirm.
     *
     * @param connectionInfo  the connection that owns the destination
     * @param destinationType the destination type code passed to {@code ActiveMQDestination.createDestination}
     * @return the newly created destination command (not yet sent to the broker)
     */
    protected DestinationInfo createTempDestinationInfo(ConnectionInfo connectionInfo, byte destinationType) {
        DestinationInfo tempInfo = new DestinationInfo();
        tempInfo.setConnectionId(connectionInfo.getConnectionId());
        tempInfo.setOperationType((byte)0);
        String physicalName = tempInfo.getConnectionId() + ":" + ++tempDestGenerator;
        tempInfo.setDestination(ActiveMQDestination.createDestination(physicalName, destinationType));
        return tempInfo;
    }

    /**
     * Returns a destination of the requested type.
     * <p>
     * When bit 4 of {@code destinationType} is set, a destination command is built with
     * {@link #createTempDestinationInfo(ConnectionInfo, byte)}, sent over the connection,
     * and its destination is returned. Otherwise a destination named {@link #queueName}
     * is created and nothing is sent.
     * NOTE(review): bit 4 is presumably {@code ActiveMQDestination.TEMP_MASK} - confirm.
     *
     * @param connection      the connection used to register the destination when bit 4 is set
     * @param connectionInfo1 the connection descriptor that owns such a destination
     * @param destinationType the destination type code
     * @return the destination to use in the test
     * @throws Exception if sending the destination command fails
     */
    protected ActiveMQDestination createDestinationInfo(StubConnection connection, ConnectionInfo connectionInfo1, byte destinationType) throws Exception {
        boolean temporary = (destinationType & 4) != 0;
        if (!temporary) {
            return ActiveMQDestination.createDestination(queueName, destinationType);
        }
        DestinationInfo tempInfo = createTempDestinationInfo(connectionInfo1, destinationType);
        connection.send(tempInfo);
        return tempInfo.getDestination();
    }

    /**
     * Changes the given destination command in place: sets operation type 1 and a
     * timeout of 0, then returns the same instance.
     * NOTE(review): 1 is presumably {@code DestinationInfo.REMOVE_OPERATION_TYPE} - confirm.
     *
     * @param info the destination command to modify
     * @return {@code info} itself, after modification
     */
    protected DestinationInfo closeDestinationInfo(DestinationInfo info) {
        final byte operationType = (byte)1;
        final long timeout = 0L;
        info.setOperationType(operationType);
        info.setTimeout(timeout);
        return info;
    }

    /**
     * Opens a new stub connection to the broker under test.
     *
     * @return a new {@link StubConnection} constructed with {@link #broker}
     * @throws Exception if the connection cannot be created
     */
    protected StubConnection createConnection() throws Exception {
        StubConnection connection = new StubConnection(broker);
        return connection;
    }

    /**
     * Receives the next message from the connection, using {@link #maxWait} milliseconds
     * as the poll timeout.
     *
     * @param connection the connection whose dispatch queue is read
     * @return the received message, or {@code null} as described for
     *         {@link #receiveMessage(StubConnection, long)}
     * @throws InterruptedException if the wait is interrupted
     */
    public Message receiveMessage(StubConnection connection) throws InterruptedException {
        long timeout = maxWait;
        return receiveMessage(connection, timeout);
    }

    /**
     * Receives the next message dispatched on the connection.
     * <p>
     * Items on the dispatch queue that are not {@link MessageDispatch} instances are
     * discarded. Each poll waits up to {@code timeout} milliseconds, so the total wait can
     * exceed {@code timeout} when such items are skipped. The dispatch's message is
     * replaced by a copy that carries the dispatch's redelivery counter, and that copy
     * is returned.
     *
     * @param connection the connection whose dispatch queue is read
     * @param timeout    maximum time to wait for each queue item, in milliseconds
     * @return the copied message, or {@code null} when a poll times out or the dispatch
     *         carries no message
     * @throws InterruptedException if the wait is interrupted
     */
    public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException {
        while (true) {
            Object received = connection.getDispatchQueue().poll(timeout, TimeUnit.MILLISECONDS);
            if (received == null) {
                return null;
            }
            if (!(received instanceof MessageDispatch)) {
                // Not a dispatch: drop it and wait for the next item.
                continue;
            }
            MessageDispatch dispatch = (MessageDispatch) received;
            if (dispatch.getMessage() == null) {
                return null;
            }
            dispatch.setMessage(dispatch.getMessage().copy());
            dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter());
            return dispatch.getMessage();
        }
    }

    /**
     * Fails the test if a message is still waiting on the connection's dispatch queue.
     * <p>
     * The queue is drained until a poll returns nothing. Each poll waits 0 milliseconds
     * when {@link #FAST_NO_MESSAGE_LEFT_ASSERT} is set and {@link #maxWait} milliseconds
     * otherwise. Items that are not dispatches, and dispatches without a message, are
     * discarded silently.
     *
     * @param connection the connection whose dispatch queue is checked
     * @throws InterruptedException if the wait is interrupted
     */
    protected void assertNoMessagesLeft(StubConnection connection) throws InterruptedException {
        long wait = FAST_NO_MESSAGE_LEFT_ASSERT ? 0L : (long) maxWait;
        Object item;
        while ((item = connection.getDispatchQueue().poll(wait, TimeUnit.MILLISECONDS)) != null) {
            if (!(item instanceof MessageDispatch)) {
                continue;
            }
            Message leftover = ((MessageDispatch) item).getMessage();
            if (leftover != null) {
                fail("Received a message: " + leftover.getMessageId() + " for: " + leftover.getDestination().getPhysicalName());
            }
        }
    }
}

