package org.hornetq.core.server.cluster.impl;

import java.util.HashSet;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.HornetQExceptionType;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.management.CoreNotificationType;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.server.HornetQServerLogger;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.cluster.MessageFlowRecord;
import org.hornetq.core.server.cluster.Transformer;
import org.hornetq.utils.UUID;
import org.hornetq.utils.UUIDGenerator;

/* loaded from: input_file:org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.class */
/**
 * A {@link BridgeImpl} specialisation created on behalf of a {@link ClusterConnection}.
 *
 * <p>On top of the behaviour inherited from {@code BridgeImpl}, this class:
 * <ul>
 *   <li>rewrites the route-to-ids header of every message before it is forwarded
 *       (see {@link #beforeForward(ServerMessage)});</li>
 *   <li>after each successful connect, creates a temporary queue on the management
 *       notification address and hands its messages to the {@link MessageFlowRecord}
 *       (see {@link #afterConnect()});</li>
 *   <li>does not schedule a reconnect when the failure type is
 *       {@link HornetQExceptionType#DISCONNECTED}.</li>
 * </ul>
 */
public class ClusterConnectionBridge extends BridgeImpl {
    private final ClusterConnection clusterConnection;
    private final ClusterManager clusterManager;
    private final MessageFlowRecord flowRecord;
    private final SimpleString managementAddress;
    private final SimpleString managementNotificationAddress;
    /** Consumer on the temporary notification queue; replaced on every (re)connect. */
    private ClientConsumer notifConsumer;
    /** Name of the per-bridge header holding the target queue ids (route-to-ids prefix + bridge name argument). */
    private final SimpleString idsHeaderName;
    /** Stored from the constructor; not read anywhere in this class. */
    private final long targetNodeEventUID;
    /** Locator notified through {@code notifyNodeDown} when this bridge fails permanently. */
    private final ServerLocatorInternal discoveryLocator;

    /**
     * Creates the bridge. Parameter names are decompiler-generated; only the ones this class
     * uses directly are described in detail, the rest are passed unchanged to the
     * {@code BridgeImpl} constructor (their meaning is defined there).
     *
     * @param clusterConnection      owning cluster connection
     * @param clusterManager         manager from which the locator is removed on disconnect
     * @param serverLocatorInternal  locator handed to {@code BridgeImpl}
     * @param serverLocatorInternal2 discovery locator, notified when the target node is permanently down
     * @param i                      forwarded to {@code BridgeImpl}
     * @param i2                     forwarded to {@code BridgeImpl}
     * @param j                      forwarded to {@code BridgeImpl}
     * @param d                      forwarded to {@code BridgeImpl}
     * @param j2                     forwarded to {@code BridgeImpl}
     * @param uuid                   forwarded to {@code BridgeImpl}
     * @param j3                     event UID of the target node (stored only)
     * @param str                    id of the target node
     * @param simpleString           forwarded to {@code BridgeImpl}; also appended to the route-to-ids
     *                               prefix to form this bridge's ids header name
     * @param queue                  forwarded to {@code BridgeImpl}; marked as an internal queue here
     * @param executor               forwarded to {@code BridgeImpl}
     * @param filter                 forwarded to {@code BridgeImpl}
     * @param simpleString2          forwarded to {@code BridgeImpl}
     * @param scheduledExecutorService forwarded to {@code BridgeImpl}
     * @param transformer            forwarded to {@code BridgeImpl}
     * @param z                      forwarded to {@code BridgeImpl}
     * @param str2                   forwarded to {@code BridgeImpl}
     * @param str3                   forwarded to {@code BridgeImpl}
     * @param storageManager         forwarded to {@code BridgeImpl}
     * @param simpleString3          management address the queue-info request is sent to
     * @param simpleString4          management notification address the temporary queue is bound to
     * @param messageFlowRecord      flow record that receives the notifications
     * @param transportConfiguration not used by this constructor
     */
    public ClusterConnectionBridge(ClusterConnection clusterConnection, ClusterManager clusterManager, ServerLocatorInternal serverLocatorInternal, ServerLocatorInternal serverLocatorInternal2, int i, int i2, long j, double d, long j2, UUID uuid, long j3, String str, SimpleString simpleString, Queue queue, Executor executor, Filter filter, SimpleString simpleString2, ScheduledExecutorService scheduledExecutorService, Transformer transformer, boolean z, String str2, String str3, StorageManager storageManager, SimpleString simpleString3, SimpleString simpleString4, MessageFlowRecord messageFlowRecord, TransportConfiguration transportConfiguration) {
        // NOTE(review): the literal 0 is the fourth BridgeImpl argument; its meaning is defined by BridgeImpl.
        super(serverLocatorInternal, i, i2, 0, j, d, j2, uuid, simpleString, queue, executor, filter, simpleString2, scheduledExecutorService, transformer, z, str2, str3, storageManager);
        this.discoveryLocator = serverLocatorInternal2;
        this.idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(simpleString);
        this.clusterConnection = clusterConnection;
        this.clusterManager = clusterManager;
        this.targetNodeEventUID = j3;
        this.targetNodeID = str;
        this.managementAddress = simpleString3;
        this.managementNotificationAddress = simpleString4;
        this.flowRecord = messageFlowRecord;
        queue.setInternalQueue(true);
        if (HornetQServerLogger.LOGGER.isTraceEnabled()) {
            // The exception is only there to capture the creating stack trace in the log.
            HornetQServerLogger.LOGGER.trace("Setting up bridge between " + clusterConnection.getConnector() + " and " + serverLocatorInternal, new Exception("trace"));
        }
    }

    /**
     * Creates a session factory for the target node and registers it with the bridge.
     *
     * <p>The result of the lookup is passed to {@code setSessionFactory} even when it is
     * {@code null}. A non-null factory gets its reconnect attempts set to 0 and a failure
     * listener that, on a {@link HornetQExceptionType#DISCONNECTED} failure, tells the flow
     * record the server disconnected and removes the locator from the cluster manager.
     *
     * @return the new factory, or {@code null} (after logging) when none could be created
     * @throws Exception if the locator fails to create the factory
     */
    @Override // org.hornetq.core.server.cluster.impl.BridgeImpl
    protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
        ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) this.serverLocator.createSessionFactory(this.targetNodeID);
        setSessionFactory(factory);
        if (factory == null) {
            HornetQServerLogger.LOGGER.nodeNotAvailable(this.targetNodeID);
            return null;
        }
        factory.setReconnectAttempts(0);
        factory.getConnection().addFailureListener(new FailureListener() {
            @Override // org.hornetq.core.remoting.FailureListener
            public void connectionFailed(HornetQException hornetQException, boolean z) {
                if (hornetQException.getType() != HornetQExceptionType.DISCONNECTED) {
                    return;
                }
                ClusterConnectionBridge.this.flowRecord.serverDisconnected();
                ClusterConnectionBridge.this.clusterManager.removeClusterLocator(ClusterConnectionBridge.this.serverLocator);
            }
        });
        return factory;
    }

    /**
     * Prepares a copy of the message for forwarding.
     *
     * <p>The queue ids stored under this bridge's own ids header are read from the original
     * message, every property whose name starts with the route-to-ids prefix is removed from
     * the copy, and the ids are then stored on the copy under the plain route-to-ids header.
     * The copy is finally passed to {@code super.beforeForward}.
     *
     * @param serverMessage the message about to be forwarded; it is not modified here
     * @return whatever {@code super.beforeForward} returns for the rewritten copy
     * @throws IllegalStateException if the message carries no ids under this bridge's header
     */
    @Override // org.hornetq.core.server.cluster.impl.BridgeImpl
    public ServerMessage beforeForward(ServerMessage serverMessage) {
        ServerMessage copy = serverMessage.copy();
        if (HornetQServerLogger.LOGGER.isTraceEnabled()) {
            HornetQServerLogger.LOGGER.trace("Clustered bridge  copied message " + serverMessage + " as " + copy + " before delivery");
        }
        // Snapshot the names first: properties are removed from the copy while iterating.
        HashSet<SimpleString> propertyNames = new HashSet<SimpleString>(copy.getPropertyNames());
        byte[] queueIds = serverMessage.getBytesProperty(this.idsHeaderName);
        if (queueIds == null) {
            HornetQServerLogger.LOGGER.noQueueIdDefined(serverMessage, copy, this.idsHeaderName);
            throw new IllegalStateException("no queueIDs defined");
        }
        for (SimpleString propertyName : propertyNames) {
            if (propertyName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS)) {
                copy.removeProperty(propertyName);
            }
        }
        copy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
        return super.beforeForward(copy);
    }

    /**
     * (Re)creates the notification consumer for this bridge.
     *
     * <p>Does nothing when there is no flow record. Otherwise the flow record is reset, any
     * previous consumer is closed, a uniquely named temporary queue is created on the
     * management notification address with the filter from {@link #buildNotificationFilter()},
     * the flow record is installed as message handler, the session is started and a
     * {@code sendQueueInfoToQueue} management request is sent to the management address.
     *
     * @throws Exception if any session operation fails
     */
    private void setupNotificationConsumer() throws Exception {
        // Guard first: the debug message below dereferences the flow record.
        if (this.flowRecord == null) {
            return;
        }
        if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
            HornetQServerLogger.LOGGER.debug("Setting up notificationConsumer between " + this.clusterConnection.getConnector() + " and " + this.flowRecord.getBridge().getForwardingConnection() + " clusterConnection = " + this.clusterConnection.getName() + " on server " + this.clusterConnection.getServer());
        }
        this.flowRecord.reset();
        closeNotificationConsumer();

        SimpleString notifQueueName = new SimpleString("notif." + UUIDGenerator.getInstance().generateStringUUID() + "." + this.clusterConnection.getServer());
        SimpleString notifFilter = buildNotificationFilter();
        this.session.createTemporaryQueue(this.managementNotificationAddress, notifQueueName, notifFilter);
        this.notifConsumer = this.session.createConsumer(notifQueueName);
        this.notifConsumer.setMessageHandler(this.flowRecord);
        this.session.start();

        ClientMessage request = this.session.createMessage(false);
        if (HornetQServerLogger.LOGGER.isTraceEnabled()) {
            // The exception is only there to capture the requesting stack trace in the log.
            HornetQServerLogger.LOGGER.trace("Requesting sendQueueInfoToQueue through " + this, new Exception("trace"));
        }
        ManagementHelper.putOperationInvocation(request, ResourceNames.CORE_SERVER, "sendQueueInfoToQueue", notifQueueName.toString(), this.flowRecord.getAddress());
        ClientProducer managementProducer = this.session.createProducer(this.managementAddress);
        if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
            HornetQServerLogger.LOGGER.debug("Cluster connetion bridge on " + this.clusterConnection + " requesting information on queues");
        }
        managementProducer.send(request);
    }

    /**
     * Closes the current notification consumer, if any. A {@link HornetQException} raised by
     * the close is logged and not propagated; the field is cleared in every case so a
     * consumer whose close failed is never kept around.
     */
    private void closeNotificationConsumer() {
        if (this.notifConsumer == null) {
            return;
        }
        try {
            if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
                HornetQServerLogger.LOGGER.debug("Closing notification Consumer for reopening " + this.notifConsumer + " on bridge " + getName());
            }
            this.notifConsumer.close();
        } catch (HornetQException e) {
            HornetQServerLogger.LOGGER.errorClosingConsumer(e);
        } finally {
            this.notifConsumer = null;
        }
    }

    /**
     * Builds the filter for the temporary notification queue: binding type different from
     * DIVERT, notification type among the binding/consumer/proposal types listed below,
     * distance below the flow record's max hops, and address starting with the flow record's
     * address.
     *
     * @return the filter expression as a {@link SimpleString}
     */
    private SimpleString buildNotificationFilter() {
        String notificationTypes = "'" + CoreNotificationType.BINDING_ADDED
            + "','" + CoreNotificationType.BINDING_REMOVED
            + "','" + CoreNotificationType.CONSUMER_CREATED
            + "','" + CoreNotificationType.CONSUMER_CLOSED
            + "','" + CoreNotificationType.PROPOSAL
            + "','" + CoreNotificationType.PROPOSAL_RESPONSE
            + "','" + CoreNotificationType.UNPROPOSAL + "'";
        return new SimpleString(ManagementHelper.HDR_BINDING_TYPE + "<>" + BindingType.DIVERT.toInt()
            + " AND " + ManagementHelper.HDR_NOTIFICATION_TYPE + " IN (" + notificationTypes + ")"
            + " AND " + ManagementHelper.HDR_DISTANCE + "<" + this.flowRecord.getMaxHops()
            + " AND (" + ManagementHelper.HDR_ADDRESS + " LIKE '" + this.flowRecord.getAddress() + "%')");
    }

    /**
     * Runs the inherited post-connect logic and then sets up the notification consumer.
     *
     * @throws Exception if either step fails
     */
    @Override // org.hornetq.core.server.cluster.impl.BridgeImpl
    public void afterConnect() throws Exception {
        super.afterConnect();
        setupNotificationConsumer();
    }

    /**
     * Schedules a reconnect for every failure type except
     * {@link HornetQExceptionType#DISCONNECTED}.
     *
     * @param hornetQExceptionType type of the failure that broke the connection
     */
    @Override // org.hornetq.core.server.cluster.impl.BridgeImpl
    protected void tryScheduleRetryReconnect(HornetQExceptionType hornetQExceptionType) {
        if (hornetQExceptionType == HornetQExceptionType.DISCONNECTED) {
            return;
        }
        scheduleRetryConnect();
    }

    /**
     * Fails the bridge through {@code super.fail} and, for a permanent failure, additionally
     * notifies the discovery locator that the target node is down.
     *
     * @param z {@code true} when the failure is permanent
     */
    @Override // org.hornetq.core.server.cluster.impl.BridgeImpl
    public void fail(boolean z) {
        boolean debugEnabled = HornetQServerLogger.LOGGER.isDebugEnabled();
        if (debugEnabled) {
            HornetQServerLogger.LOGGER.debug("Cluster Bridge " + getName() + " failed, permanently=" + z);
        }
        super.fail(z);
        if (!z) {
            return;
        }
        if (debugEnabled) {
            HornetQServerLogger.LOGGER.debug("cluster node for bridge " + getName() + " is permanently down");
        }
        this.discoveryLocator.notifyNodeDown(System.currentTimeMillis(), this.targetNodeID);
    }

    /**
     * @return always {@code false}
     */
    @Override // org.hornetq.core.server.cluster.impl.BridgeImpl
    protected boolean isPlainCoreBridge() {
        return false;
    }
}
