package org.apache.activemq.partition;

import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.partition.dto.Partitioning;
import org.apache.activemq.partition.dto.Target;
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.util.LRUCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/partition/PartitionBroker.class */
/**
 * Broker filter that steers client connections towards the broker(s) named by the
 * partitioning configuration supplied by the {@link PartitionBrokerPlugin}.
 *
 * <p>Every connection added through this filter gets a {@link ConnectionMonitor}. A background
 * task started by {@link #start()} periodically re-evaluates each monitored connection and, when
 * the preferred target does not include this broker, dispatches a {@link ConnectionControl}
 * command asking the client to rebalance onto the target brokers.
 */
public class PartitionBroker extends BrokerFilter {
    protected static final Logger LOG = LoggerFactory.getLogger(PartitionBroker.class);

    /** Name given to the executor thread while it runs the partition monitor loop. */
    private static final String MONITOR_THREAD_NAME = "Partition Monitor";

    /** Maximum time, in milliseconds, the monitor loop sleeps between two passes. */
    private static final long MONITOR_POLL_MILLIS = 1000L;

    protected final PartitionBrokerPlugin plugin;

    /** When {@code true}, {@link #reloadConfiguration()} is invoked on every monitor pass. */
    protected boolean reloadConfigOnPoll;

    /** Monitors of the currently registered connections, keyed by connection id. */
    protected final ConcurrentHashMap<ConnectionId, ConnectionMonitor> monitors;

    /**
     * Per-connection bookkeeping: the connection context plus the traffic the connection has
     * produced for each destination. Access to the traffic statistics is synchronized on the
     * monitor instance.
     */
    public static class ConnectionMonitor {
        final ConnectionContext context;
        LRUCache<ActiveMQDestination, Traffic> trafficPerDestination = new LRUCache<>();

        public ConnectionMonitor(ConnectionContext connectionContext) {
            this.context = connectionContext;
        }

        /**
         * Finds the target of the destination this connection has sent the most bytes to.
         *
         * <p>Only destinations that have received at least
         * {@code plugin.getMinTransferCount()} messages and that actually map to a target are
         * considered, so an unmapped high-traffic destination can no longer hide a mapped one.
         *
         * @param partitionBroker broker used to read the threshold and resolve targets
         * @return the target of the busiest mapped destination, or {@code null} if none qualifies
         */
        public synchronized Target findBestProducerTarget(PartitionBroker partitionBroker) {
            Target best = null;
            long bestBytes = 0;
            for (Map.Entry<ActiveMQDestination, Traffic> entry : this.trafficPerDestination.entrySet()) {
                Traffic traffic = entry.getValue();
                if (traffic.messages < partitionBroker.plugin.getMinTransferCount() || traffic.bytes <= bestBytes) {
                    continue;
                }
                Target candidate = partitionBroker.getTarget(entry.getKey());
                if (candidate != null) {
                    // Raise the bar only for destinations that can actually be routed somewhere.
                    best = candidate;
                    bestBytes = traffic.bytes;
                }
            }
            return best;
        }

        /**
         * Adds one message and its size to the traffic counters of the message's destination.
         *
         * @param producerBrokerExchange exchange the message was sent on (not used here)
         * @param message                the message being sent
         */
        public synchronized void onSend(ProducerBrokerExchange producerBrokerExchange, Message message) {
            ActiveMQDestination destination = message.getDestination();
            Traffic traffic = this.trafficPerDestination.get(destination);
            if (traffic == null) {
                traffic = new Traffic();
                this.trafficPerDestination.put(destination, traffic);
            }
            traffic.messages++;
            traffic.bytes += message.getSize();
        }
    }

    /** Mutable counter of how many consumer destinations map to a given target. */
    public static class Score {
        int value;

        private Score() {
        }
    }

    /** Message and byte counters for a single destination. */
    public static class Traffic {
        long messages;
        long bytes;

        Traffic() {
        }
    }

    /**
     * Creates the filter.
     *
     * @param broker                the next broker in the chain
     * @param partitionBrokerPlugin plugin that provides the partitioning configuration
     */
    public PartitionBroker(Broker broker, PartitionBrokerPlugin partitionBrokerPlugin) {
        super(broker);
        this.reloadConfigOnPoll = true;
        this.monitors = new ConcurrentHashMap<>();
        this.plugin = partitionBrokerPlugin;
    }

    /**
     * Starts the underlying broker and submits the partition monitor loop to the executor.
     *
     * <p>{@link #onMonitorStop()} is invoked whenever the loop ends, whether it returned
     * normally or failed, and a failure is logged instead of being discarded.
     *
     * @throws Exception if starting the underlying broker fails
     */
    public void start() throws Exception {
        super.start();
        getExecutor().execute(new Runnable() {
            @Override
            public void run() {
                Thread thread = Thread.currentThread();
                String previousName = thread.getName();
                thread.setName(MONITOR_THREAD_NAME);
                try {
                    PartitionBroker.this.onMonitorStart();
                    try {
                        PartitionBroker.this.runPartitionMonitor();
                    } catch (Exception e) {
                        LOG.warn("Partition monitor terminated unexpectedly", e);
                    } finally {
                        PartitionBroker.this.onMonitorStop();
                    }
                } finally {
                    // Restore the name so the executor thread is not mislabelled after the loop ends.
                    thread.setName(previousName);
                }
            }
        });
    }

    /** Hook invoked on the monitor thread before the monitor loop starts; no-op by default. */
    protected void onMonitorStart() {
    }

    /** Hook invoked on the monitor thread after the monitor loop has ended; no-op by default. */
    protected void onMonitorStop() {
    }

    /**
     * Monitor loop: until the broker is stopped, waits for the next poll, optionally reloads the
     * configuration and then re-checks the target of every monitored connection.
     *
     * <p>An interrupt ends the loop and leaves the thread's interrupt flag set. A failed
     * configuration reload is logged and the pass continues with the current configuration.
     */
    protected void runPartitionMonitor() {
        while (!isStopped()) {
            try {
                monitorWait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (this.reloadConfigOnPoll) {
                try {
                    reloadConfiguration();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    LOG.warn("Failed to reload the partition configuration", e);
                }
            }
            for (ConnectionMonitor monitor : this.monitors.values()) {
                checkTarget(monitor);
            }
        }
    }

    /**
     * Blocks until {@link #monitorWakeup()} is called or the poll interval elapses.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    protected void monitorWait() throws InterruptedException {
        synchronized (this) {
            wait(MONITOR_POLL_MILLIS);
        }
    }

    /** Wakes up a monitor loop that is currently blocked in {@link #monitorWait()}. */
    public void monitorWakeup() {
        synchronized (this) {
            notifyAll();
        }
    }

    /**
     * Hook invoked from the monitor loop to refresh the configuration; no-op by default.
     *
     * @throws Exception if the configuration cannot be reloaded
     */
    protected void reloadConfiguration() throws Exception {
    }

    /**
     * Re-evaluates where the given connection should be attached and, if the best target does
     * not contain this broker, asks the client to rebalance onto the target brokers.
     *
     * @param connectionMonitor monitor of the connection to check
     */
    protected void checkTarget(ConnectionMonitor connectionMonitor) {
        Target best = pickBestBroker(connectionMonitor);
        if (best == null || best.ids == null) {
            LOG.debug("No partition target found for connection: {}", connectionMonitor.context.getConnectionId());
            return;
        }
        if (best.ids.contains(getBrokerName())) {
            LOG.debug("We are a partition target for connection: {}", connectionMonitor.context.getConnectionId());
            return;
        }
        String connectionString = getConnectionString(best.ids);
        if (connectionString == null || connectionString.isEmpty()) {
            // Without at least one broker address there is nothing to redirect the client to.
            LOG.debug("Could not convert to partition targets to connection string: {}", best.ids);
            return;
        }
        LOG.info("Redirecting connection to: {}", connectionString);
        TransportConnection connection = connectionMonitor.context.getConnection();
        ConnectionControl connectionControl = new ConnectionControl();
        connectionControl.setConnectedBrokers(connectionString);
        connectionControl.setRebalanceConnection(true);
        connection.dispatchAsync(connectionControl);
    }

    /**
     * Translates broker ids into a comma-separated list of the addresses configured for them.
     * Ids without a configured address are skipped.
     *
     * @param hashSet ids of the target brokers
     * @return the joined addresses (empty if none of the ids is configured), or {@code null}
     *         when no configuration or no broker addresses are available
     */
    protected String getConnectionString(HashSet<String> hashSet) {
        Partitioning config = getConfig();
        if (config == null || config.brokers == null || config.brokers.isEmpty()) {
            return null;
        }
        StringBuilder joined = new StringBuilder();
        for (String brokerId : hashSet) {
            String address = config.brokers.get(brokerId);
            if (address != null) {
                if (joined.length() != 0) {
                    joined.append(',');
                }
                joined.append(address);
            }
        }
        return joined.toString();
    }

    /**
     * Chooses the target a connection should be attached to.
     *
     * <p>Rules are tried in this order and the first match wins: source IP, user name, client
     * id. If none matches and queue or topic rules exist, the target that the largest number
     * of the connection's consumer destinations map to is chosen. If no consumer destination
     * maps to a target, the producer traffic recorded by the monitor decides.
     *
     * @param connectionMonitor monitor of the connection to place
     * @return the chosen target, or {@code null} if no rule applies
     */
    protected Target pickBestBroker(ConnectionMonitor connectionMonitor) {
        // Read the configuration once so every rule is evaluated against the same snapshot.
        Partitioning config = getConfig();
        if (config == null) {
            return null;
        }

        if (config.bySourceIp != null && !config.bySourceIp.isEmpty()) {
            Socket socket = (Socket) connectionMonitor.context.getConnection().getTransport().narrow(Socket.class);
            if (socket != null) {
                SocketAddress remote = socket.getRemoteSocketAddress();
                if (remote instanceof InetSocketAddress && ((InetSocketAddress) remote).getAddress() != null) {
                    Target byIp = config.bySourceIp.get(((InetSocketAddress) remote).getAddress().getHostAddress());
                    if (byIp != null) {
                        return byIp;
                    }
                }
            }
        }

        if (config.byUserName != null && !config.byUserName.isEmpty()) {
            String userName = connectionMonitor.context.getUserName();
            if (userName != null) {
                Target byUser = config.byUserName.get(userName);
                if (byUser != null) {
                    return byUser;
                }
            }
        }

        if (config.byClientId != null && !config.byClientId.isEmpty()) {
            String clientId = connectionMonitor.context.getClientId();
            if (clientId != null) {
                Target byClient = config.byClientId.get(clientId);
                if (byClient != null) {
                    return byClient;
                }
            }
        }

        boolean hasQueueRules = config.byQueue != null && !config.byQueue.isEmpty();
        boolean hasTopicRules = config.byTopic != null && !config.byTopic.isEmpty();
        if (!hasQueueRules && !hasTopicRules) {
            return null;
        }

        // Collect the distinct destinations this connection consumes from, expanding composites.
        HashSet<ActiveMQDestination> destinations = new HashSet<>();
        for (SessionState session : connectionMonitor.context.getConnectionState().getSessionStates()) {
            for (ConsumerState consumer : session.getConsumerStates()) {
                ActiveMQDestination destination = consumer.getInfo().getDestination();
                if (destination.isComposite()) {
                    Collections.addAll(destinations, destination.getCompositeDestinations());
                } else {
                    destinations.add(destination);
                }
            }
        }

        // Count how many of those destinations map to each target.
        Map<Target, Score> scores = new HashMap<>();
        for (ActiveMQDestination destination : destinations) {
            Target target = getTarget(destination);
            if (target != null) {
                Score score = scores.get(target);
                if (score == null) {
                    score = new Score();
                    scores.put(target, score);
                }
                score.value++;
            }
        }

        if (scores.isEmpty()) {
            return connectionMonitor.findBestProducerTarget(this);
        }

        Target best = null;
        int bestScore = 0;
        for (Map.Entry<Target, Score> entry : scores.entrySet()) {
            int score = entry.getValue().value;
            if (score > bestScore) {
                best = entry.getKey();
                bestScore = score;
            }
        }
        return best;
    }

    /**
     * Looks up the target configured for a destination by its physical name, using the queue
     * rules for queues and the topic rules for topics.
     *
     * @param activeMQDestination destination to resolve
     * @return the configured target, or {@code null} if there is no configuration or no rule
     */
    protected Target getTarget(ActiveMQDestination activeMQDestination) {
        Partitioning config = getConfig();
        if (config == null) {
            return null;
        }
        if (activeMQDestination.isQueue() && config.byQueue != null && !config.byQueue.isEmpty()) {
            return config.byQueue.get(activeMQDestination.getPhysicalName());
        }
        if (!activeMQDestination.isTopic() || config.byTopic == null || config.byTopic.isEmpty()) {
            return null;
        }
        return config.byTopic.get(activeMQDestination.getPhysicalName());
    }

    /**
     * Registers a monitor for the new connection, forwards the call down the broker chain and
     * immediately checks whether the connection should be redirected. The monitor is removed
     * again if the rest of the chain rejects the connection.
     *
     * @param connectionContext context of the new connection
     * @param connectionInfo    description of the new connection
     * @throws Exception if the rest of the broker chain fails to add the connection
     */
    public void addConnection(ConnectionContext connectionContext, ConnectionInfo connectionInfo) throws Exception {
        ConnectionMonitor connectionMonitor = new ConnectionMonitor(connectionContext);
        this.monitors.put(connectionInfo.getConnectionId(), connectionMonitor);
        try {
            super.addConnection(connectionContext, connectionInfo);
        } catch (Exception e) {
            // Remove only the monitor registered above, not one stored under the same id since.
            this.monitors.remove(connectionInfo.getConnectionId(), connectionMonitor);
            throw e;
        }
        checkTarget(connectionMonitor);
    }

    /**
     * Forwards the removal down the broker chain and drops the connection's monitor, even when
     * the rest of the chain throws.
     *
     * @param connectionContext context of the connection being removed
     * @param connectionInfo    description of the connection being removed
     * @param th                error that caused the removal, if any
     * @throws Exception if the rest of the broker chain fails to remove the connection
     */
    public void removeConnection(ConnectionContext connectionContext, ConnectionInfo connectionInfo, Throwable th) throws Exception {
        try {
            super.removeConnection(connectionContext, connectionInfo, th);
        } finally {
            this.monitors.remove(connectionInfo.getConnectionId());
        }
    }

    /**
     * Records the message in the sending connection's traffic statistics and then forwards it
     * down the broker chain.
     *
     * @param producerBrokerExchange exchange the message is sent on
     * @param message                the message being sent
     * @throws Exception if the rest of the broker chain fails to process the message
     */
    public void send(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception {
        ConnectionMonitor connectionMonitor = this.monitors.get(producerBrokerExchange.getConnectionContext().getConnectionId());
        if (connectionMonitor != null) {
            connectionMonitor.onSend(producerBrokerExchange, message);
        }
        // Without this delegation the message would never reach the rest of the broker chain.
        super.send(producerBrokerExchange, message);
    }

    /**
     * @return the partitioning configuration currently held by the plugin
     */
    protected Partitioning getConfig() {
        return this.plugin.getConfig();
    }
}
