package org.apache.activemq.transport.peer;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.MessageIdList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/peer/PeerTransportTest.class */
/**
 * Sends messages between a small group of connections opened through
 * {@code peer://} URIs and checks that every connection's consumer receives
 * every message produced by any member of the group.
 *
 * <p>{@link #setUp()} opens {@link #NUMBER_IN_CLUSTER} connections, attaches one
 * producer and one listening consumer to each, and then blocks until every
 * connection has observed a consumer advisory for each member of the group.
 */
public class PeerTransportTest extends TestCase {
    /** Number of messages each producer sends in {@link #testSendReceive()}. */
    protected static final int MESSAGE_COUNT = 50;
    /** Number of connections (and therefore producers and consumers) in the group. */
    protected static final int NUMBER_IN_CLUSTER = 3;
    private static final Logger LOG = LoggerFactory.getLogger(PeerTransportTest.class);

    /** Maximum time, in milliseconds, to wait for a single consumer advisory. */
    private static final long ADVISORY_TIMEOUT_MILLIS = 1000L;
    /**
     * Data structure type that is counted as "a consumer was seen" while waiting
     * for the group to connect.
     * NOTE(review): 5 appears to be the ConsumerInfo command type - confirm against CommandTypes.
     */
    private static final int CONSUMER_ADVISORY_DATA_TYPE = 5;

    /** Destination used by all producers and consumers; assigned in {@link #setUp()}. */
    protected ActiveMQDestination destination;
    /** When {@code true} the destination is a topic, otherwise a queue. */
    protected boolean topic = true;
    /** JMS delivery mode applied to every producer (1 is javax.jms.DeliveryMode.NON_PERSISTENT). */
    protected int deliveryMode = 1;
    protected MessageProducer[] producers;
    protected Connection[] connections;
    protected MessageIdList[] messageIdList;

    /**
     * Opens the connections, wires one producer and one listening consumer to
     * each, and waits until the whole group is connected.
     *
     * <p>JUnit 3 does not invoke {@code tearDown()} when {@code setUp()} throws,
     * so any connections already opened are closed here if set-up fails.
     *
     * @throws Exception if a connection cannot be created or the group does not
     *                   become fully connected in time
     */
    protected void setUp() throws Exception {
        this.connections = new Connection[NUMBER_IN_CLUSTER];
        this.producers = new MessageProducer[NUMBER_IN_CLUSTER];
        this.messageIdList = new MessageIdList[NUMBER_IN_CLUSTER];
        this.destination = createDestination();

        boolean clusterReady = false;
        try {
            for (int node = 0; node < NUMBER_IN_CLUSTER; node++) {
                Connection connection = createConnection(node);
                // Store the connection first so it is cleaned up even if a later step fails.
                this.connections[node] = connection;
                connection.setClientID("ClusterTest" + node);
                connection.start();

                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageProducer producer = session.createProducer(this.destination);
                producer.setDeliveryMode(this.deliveryMode);
                this.producers[node] = producer;

                MessageConsumer consumer = createMessageConsumer(session, this.destination);
                this.messageIdList[node] = new MessageIdList();
                consumer.setMessageListener(this.messageIdList[node]);
            }
            awaitClusterConnected(this.destination);
            clusterReady = true;
        } finally {
            if (!clusterReady) {
                JMSException closeFailure = closeConnections();
                if (closeFailure != null) {
                    // Only log: the exception that aborted set-up is the one worth reporting.
                    LOG.warn("Failed to close a connection after set-up failure", closeFailure);
                }
            }
        }
    }

    /**
     * Closes every connection opened by {@link #setUp()}.
     *
     * @throws Exception the first failure raised while closing, after all
     *                   connections have been attempted
     */
    protected void tearDown() throws Exception {
        JMSException closeFailure = closeConnections();
        if (closeFailure != null) {
            throw closeFailure;
        }
    }

    /**
     * Creates the consumer used both for test messages and for advisories.
     *
     * @param session     session that owns the consumer
     * @param destination destination to consume from
     * @return a new consumer on {@code destination}
     * @throws JMSException if the consumer cannot be created
     */
    protected MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException {
        return session.createConsumer(destination);
    }

    /**
     * Opens a connection for one member of the group.
     *
     * @param i index of the member; used as the node name in the {@code peer://} URI
     * @return a new, not yet started connection
     * @throws JMSException if the connection cannot be created
     */
    protected Connection createConnection(int i) throws JMSException {
        LOG.info("creating connection ....");
        String brokerUrl = "peer://" + getClass().getName() + "/node" + i;
        return new ActiveMQConnectionFactory(brokerUrl).createConnection();
    }

    /**
     * @return the destination for this test, named after the concrete test class
     */
    protected ActiveMQDestination createDestination() {
        return createDestination(getClass().getName());
    }

    /**
     * @param str physical name of the destination
     * @return a topic when {@link #topic} is set, otherwise a queue
     */
    protected ActiveMQDestination createDestination(String str) {
        return this.topic ? new ActiveMQTopic(str) : new ActiveMQQueue(str);
    }

    /**
     * Sends {@link #MESSAGE_COUNT} text messages from every producer and asserts
     * that each consumer received {@link #expectedReceiveCount()} messages.
     *
     * @throws Exception if sending fails
     */
    public void testSendReceive() throws Exception {
        for (int messageNo = 0; messageNo < MESSAGE_COUNT; messageNo++) {
            for (int node = 0; node < this.producers.length; node++) {
                ActiveMQTextMessage message = new ActiveMQTextMessage();
                message.setText("MSG-NO: " + messageNo + " in cluster: " + node);
                this.producers[node].send(message);
            }
        }
        for (int node = 0; node < this.messageIdList.length; node++) {
            this.messageIdList[node].assertMessagesReceived(expectedReceiveCount());
        }
    }

    /**
     * @return the number of messages each consumer must receive: one copy of
     *         every message sent by every producer
     */
    protected int expectedReceiveCount() {
        return MESSAGE_COUNT * NUMBER_IN_CLUSTER;
    }

    /**
     * Blocks until every connection has received {@link #NUMBER_IN_CLUSTER}
     * consumer advisories for the watched destination, failing the test if an
     * advisory does not arrive within {@link #ADVISORY_TIMEOUT_MILLIS}.
     */
    private void awaitClusterConnected(ActiveMQDestination watched) throws JMSException {
        LOG.info("Waiting for cluster to be fully connected");
        ActiveMQTopic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(watched);
        for (int node = 0; node < NUMBER_IN_CLUSTER; node++) {
            Session advisorySession = this.connections[node].createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer advisoryConsumer = createMessageConsumer(advisorySession, advisoryTopic);
            int consumersSeen = 0;
            while (consumersSeen < NUMBER_IN_CLUSTER) {
                // receive() is declared to return javax.jms.Message, so the cast is required.
                ActiveMQMessage advisory = (ActiveMQMessage) advisoryConsumer.receive(ADVISORY_TIMEOUT_MILLIS);
                if (advisory == null) {
                    fail("Connection " + node + " saw " + consumersSeen + " consumers, expected: " + NUMBER_IN_CLUSTER);
                }
                if (advisory.getDataStructure() != null
                        && advisory.getDataStructure().getDataStructureType() == CONSUMER_ADVISORY_DATA_TYPE) {
                    consumersSeen++;
                }
            }
            advisorySession.close();
        }
        LOG.info("Cluster is online.");
    }

    /**
     * Closes every non-null connection, continuing past individual failures.
     *
     * @return the first exception raised by a {@code close()} call, or
     *         {@code null} if all connections closed cleanly
     */
    private JMSException closeConnections() {
        if (this.connections == null) {
            return null;
        }
        JMSException firstFailure = null;
        for (Connection connection : this.connections) {
            if (connection == null) {
                // Slot was never filled because set-up stopped early.
                continue;
            }
            try {
                connection.close();
            } catch (JMSException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        return firstFailure;
    }
}
