package org.apache.qpid;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.qpid.transport.Acquired;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.ExchangeBind;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquire;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageFlow;
import org.apache.qpid.transport.MessageFlush;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageRejectCode;
import org.apache.qpid.transport.MessageSubscribe;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.QueueDeclare;
import org.apache.qpid.transport.QueueQuery;
import org.apache.qpid.transport.QueueQueryResult;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.ServerDelegate;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDelegate;
import org.apache.qpid.transport.network.mina.MinaHandler;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/qpid/ToyBroker.class */
public class ToyBroker extends SessionDelegate {
    private ToyExchange exchange;
    private Map<String, Consumer> consumers = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/qpid/ToyBroker$Consumer.class */
    public class Consumer {
        long _credit;
        String _queueName;

        private Consumer() {
        }
    }

    public ToyBroker(ToyExchange toyExchange) {
        this.exchange = toyExchange;
    }

    @Override // org.apache.qpid.transport.MethodDelegate
    public void messageAcquire(Session session, MessageAcquire messageAcquire) {
        System.out.println("\n==================> messageAcquire ");
        session.executionResult(messageAcquire.getId(), new Acquired(messageAcquire.getTransfers()), new Option[0]);
    }

    @Override // org.apache.qpid.transport.MethodDelegate
    public void queueDeclare(Session session, QueueDeclare queueDeclare) {
        this.exchange.createQueue(queueDeclare.getQueue());
        System.out.println("\n==================> declared queue: " + queueDeclare.getQueue() + "\n");
    }

    @Override // org.apache.qpid.transport.MethodDelegate
    public void exchangeBind(Session session, ExchangeBind exchangeBind) {
        this.exchange.bindQueue(exchangeBind.getExchange(), exchangeBind.getBindingKey(), exchangeBind.getQueue());
        System.out.println("\n==================> bound queue: " + exchangeBind.getQueue() + " with binding key " + exchangeBind.getBindingKey() + "\n");
    }

    @Override // org.apache.qpid.transport.MethodDelegate
    public void queueQuery(Session session, QueueQuery queueQuery) {
        session.executionResult(queueQuery.getId(), new QueueQueryResult().queue(queueQuery.getQueue()), new Option[0]);
    }

    @Override // org.apache.qpid.transport.MethodDelegate
    public void messageSubscribe(Session session, MessageSubscribe messageSubscribe) {
        Consumer consumer = new Consumer();
        consumer._queueName = messageSubscribe.getQueue();
        this.consumers.put(messageSubscribe.getDestination(), consumer);
        System.out.println("\n==================> message subscribe : " + messageSubscribe.getDestination() + " queue: " + messageSubscribe.getQueue() + "\n");
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.qpid.transport.SessionDelegate, org.apache.qpid.transport.MethodDelegate
    public void messageFlow(Session session, MessageFlow messageFlow) {
        this.consumers.get(messageFlow.getDestination())._credit = messageFlow.getValue();
        System.out.println("\n==================> message flow : " + messageFlow.getDestination() + " credit: " + messageFlow.getValue() + "\n");
    }

    @Override // org.apache.qpid.transport.MethodDelegate
    public void messageFlush(Session session, MessageFlush messageFlush) {
        System.out.println("\n==================> message flush for consumer : " + messageFlush.getDestination() + "\n");
        checkAndSendMessagesToConsumer(session, messageFlush.getDestination());
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.qpid.transport.SessionDelegate, org.apache.qpid.transport.MethodDelegate
    public void messageTransfer(Session session, MessageTransfer messageTransfer) {
        String destination = messageTransfer.getDestination();
        System.out.println("received transfer " + destination);
        Header header = messageTransfer.getHeader();
        DeliveryProperties deliveryProperties = (DeliveryProperties) header.get(DeliveryProperties.class);
        if (deliveryProperties != null) {
            System.out.println("received headers routing_key " + deliveryProperties.getRoutingKey());
        }
        MessageProperties messageProperties = (MessageProperties) header.get(MessageProperties.class);
        System.out.println("MP: " + messageProperties);
        if (messageProperties != null) {
            System.out.println(messageProperties.getApplicationHeaders());
        }
        if (this.exchange.route(destination, deliveryProperties.getRoutingKey(), messageTransfer)) {
            System.out.println("queued " + messageTransfer);
            dispatchMessages(session);
        } else if (deliveryProperties == null || !deliveryProperties.getDiscardUnroutable()) {
            RangeSet rangeSet = new RangeSet();
            rangeSet.add(messageTransfer.getId());
            session.messageReject(rangeSet, MessageRejectCode.UNROUTABLE, "no such destination", new Option[0]);
        }
        session.processed(messageTransfer);
    }

    private void transferMessageToPeer(Session session, String str, MessageTransfer messageTransfer) {
        System.out.println("\n==================> Transfering message to: " + str + "\n");
        session.messageTransfer(messageTransfer.getDestination(), MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED, messageTransfer.getHeader(), messageTransfer.getBody(), new Option[0]);
    }

    private void dispatchMessages(Session session) {
        Iterator<String> it = this.consumers.keySet().iterator();
        while (it.hasNext()) {
            checkAndSendMessagesToConsumer(session, it.next());
        }
    }

    private void checkAndSendMessagesToConsumer(Session session, String str) {
        Consumer consumer = this.consumers.get(str);
        LinkedBlockingQueue<MessageTransfer> queue = this.exchange.getQueue(consumer._queueName);
        MessageTransfer poll = queue.poll();
        while (true) {
            MessageTransfer messageTransfer = poll;
            if (messageTransfer == null || consumer._credit <= 0) {
                return;
            }
            transferMessageToPeer(session, str, messageTransfer);
            consumer._credit--;
            poll = queue.poll();
        }
    }

    public static final void main(String[] strArr) throws IOException {
        final ToyExchange toyExchange = new ToyExchange();
        MinaHandler.accept("0.0.0.0", 5672, new ServerDelegate() { // from class: org.apache.qpid.ToyBroker.1
            public SessionDelegate getSessionDelegate() {
                return new ToyBroker(ToyExchange.this);
            }
        });
    }
}
