package org.hornetq.core.protocol.proton;

import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.concurrent.Executor;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transaction.Discharge;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointError;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.apache.qpid.proton.jms.EncodedMessage;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.protocol.proton.ProtonUtils;
import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPException;
import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPIllegalStateException;
import org.hornetq.core.server.HornetQMessageBundle;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationListener;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.utils.UUIDGenerator;

/* loaded from: input_file:org/hornetq/core/protocol/proton/ProtonProtocolManager.class */
/**
 * {@link ProtocolManager} implementation that bridges Proton (AMQP) connections, links and
 * deliveries onto a {@link HornetQServer}.
 *
 * <p>It creates the per-connection and per-session objects, moves inbound message bytes from a
 * Proton {@link Receiver} into server messages, pushes outbound bytes through a Proton
 * {@link Sender}, and handles transaction coordinator frames ({@link Declare} /
 * {@link Discharge}).
 *
 * <p>Message transfer methods synchronise on the owning connection's delivery lock
 * ({@link ProtonRemotingConnection#getDeliveryLock()}).
 */
public class ProtonProtocolManager implements ProtocolManager, NotificationListener {
    // The public sets below are mutable EnumSet instances kept for API compatibility;
    // callers must treat them as read-only.

    /** Contains only {@link EndpointState#UNINITIALIZED}. */
    public static final EnumSet<EndpointState> UNINITIALIZED = EnumSet.of(EndpointState.UNINITIALIZED);

    /** Every endpoint state except {@link EndpointState#UNINITIALIZED}. */
    public static final EnumSet<EndpointState> INITIALIZED = EnumSet.complementOf(UNINITIALIZED);

    /** Contains only {@link EndpointState#ACTIVE}. */
    public static final EnumSet<EndpointState> ACTIVE = EnumSet.of(EndpointState.ACTIVE);

    /** Contains only {@link EndpointState#CLOSED}. */
    public static final EnumSet<EndpointState> CLOSED = EnumSet.of(EndpointState.CLOSED);

    /** Contains the CLOSED, ACTIVE and UNINITIALIZED states. */
    public static final EnumSet<EndpointState> ANY_ENDPOINT_STATE = EnumSet.of(EndpointState.CLOSED, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);

    /** Size of the scratch array used when pulling bytes out of a Proton receiver. */
    private static final int RECEIVE_CHUNK_SIZE = 1024;

    /** Credit granted via {@code Receiver.flow} when a producer link is attached. */
    private static final int INITIAL_PRODUCER_CREDIT = 100;

    /** Credit handed back or offered one delivery at a time. */
    private static final int SINGLE_CREDIT = 1;

    private final HornetQServer server;

    /**
     * @param hornetQServer server whose storage manager and executor factory back this manager
     */
    public ProtonProtocolManager(HornetQServer hornetQServer) {
        this.server = hornetQServer;
    }

    /**
     * Wraps the transport connection in a new {@link ProtonRemotingConnection} and returns the
     * entry describing it. No executor is supplied and the entry is stamped with the current time.
     */
    @Override // org.hornetq.spi.core.protocol.ProtocolManager
    public ConnectionEntry createConnectionEntry(Acceptor acceptor, Connection connection) {
        ProtonRemotingConnection remotingConnection = new ProtonRemotingConnection(acceptor, connection, this);
        // NOTE(review): 60000L is presumably the connection TTL in milliseconds - confirm against ConnectionEntry.
        return new ConnectionEntry(remotingConnection, (Executor) null, System.currentTimeMillis(), 60000L);
    }

    /** Intentionally a no-op: this protocol manager keeps no named handlers. */
    @Override // org.hornetq.spi.core.protocol.ProtocolManager
    public void removeHandler(String str) {
    }

    /** Always returns {@code -1}; the buffer is not inspected. */
    @Override // org.hornetq.spi.core.protocol.ProtocolManager
    public int isReadyToHandle(HornetQBuffer hornetQBuffer) {
        return -1;
    }

    /**
     * Copies the unread bytes of {@code hornetQBuffer} into a fresh array and hands them to the
     * connection as one frame.
     *
     * @param remotingConnection must be a {@link ProtonRemotingConnection}
     * @param hornetQBuffer      buffer holding the bytes received from the transport
     */
    @Override // org.hornetq.spi.core.protocol.ProtocolManager
    public void handleBuffer(RemotingConnection remotingConnection, HornetQBuffer hornetQBuffer) {
        ProtonRemotingConnection protonConnection = (ProtonRemotingConnection) remotingConnection;
        protonConnection.setDataReceived();
        // Size by readableBytes(), not capacity(): readBytes(byte[]) must fill the whole array,
        // so an array larger than the unread region would over-read the buffer.
        byte[] frame = new byte[hornetQBuffer.readableBytes()];
        hornetQBuffer.readBytes(frame);
        protonConnection.handleFrame(frame);
    }

    /** Intentionally a no-op: notifications are ignored. */
    public void onNotification(Notification notification) {
    }

    /**
     * Creates an empty server message with a freshly generated unique id.
     *
     * @return the new message
     */
    public ServerMessageImpl createServerMessage() {
        long messageId = this.server.getStorageManager().generateUniqueID();
        // NOTE(review): 512 is presumably the initial body buffer size - confirm against ServerMessageImpl.
        return new ServerMessageImpl(messageId, 512);
    }

    /**
     * Reads an inbound delivery from {@code receiver}, converts it to a server message and sends
     * it through the session. Once the storage manager reports completion, one credit is returned
     * to the receiver and the delivery is settled.
     *
     * <p>If the receiver has no complete delivery yet, the bytes read so far stay in
     * {@code hornetQBuffer} and the method returns without sending anything.
     *
     * @param receiver                 link the delivery arrived on
     * @param hornetQBuffer            accumulation buffer shared across calls for this delivery
     * @param delivery                 delivery being received
     * @param protonRemotingConnection owning connection; its delivery lock is held throughout
     * @param protonSession            session whose server session receives the message
     * @param str                      address to force onto the message, or {@code null} to keep
     *                                 whatever the transformed message carries
     * @throws Exception if the transformation or the send fails
     */
    public void handleMessage(final Receiver receiver, HornetQBuffer hornetQBuffer, final Delivery delivery, final ProtonRemotingConnection protonRemotingConnection, ProtonSession protonSession, String str) throws Exception {
        synchronized (protonRemotingConnection.getDeliveryLock()) {
            byte[] encoded = readCompleteDelivery(receiver, hornetQBuffer);
            if (encoded == null) {
                return;
            }
            EncodedMessage encodedMessage = new EncodedMessage(delivery.getMessageFormat(), encoded, 0, encoded.length);
            ServerMessageImpl message = ProtonUtils.INBOUND.transform(protonRemotingConnection, encodedMessage);
            if (str != null) {
                message.setAddress(new SimpleString(str));
            }
            protonSession.getServerSession().send(message, true);
            this.server.getStorageManager().afterCompleteOperations(new IOAsyncTask() {
                public void done() {
                    synchronized (protonRemotingConnection.getDeliveryLock()) {
                        receiver.flow(SINGLE_CREDIT);
                        delivery.settle();
                    }
                }

                public void onError(int i, String str2) {
                    // NOTE(review): unlike done(), this touches the receiver without the delivery lock.
                    receiver.setLocalError(new EndpointError(Integer.toString(i), str2));
                }
            });
        }
    }

    /**
     * Creates a delivery on {@code sender} for an outbound message, writes the encoded bytes and,
     * once the storage manager reports completion, either settles the delivery (pre-settled mode)
     * or advances the sender. The connection is asked to write after the lock is released.
     *
     * @param sender                   link to send on
     * @param bArr                     delivery tag
     * @param encodedMessage           encoded message bytes to transmit
     * @param serverMessage            message stored as the delivery's context
     * @param protonRemotingConnection owning connection; its delivery lock guards the send
     * @param z                        {@code true} to settle immediately and add one credit,
     *                                 {@code false} to advance the sender instead
     */
    public void handleDelivery(final Sender sender, byte[] bArr, EncodedMessage encodedMessage, ServerMessage serverMessage, ProtonRemotingConnection protonRemotingConnection, final boolean z) {
        synchronized (protonRemotingConnection.getDeliveryLock()) {
            final Delivery delivery = sender.delivery(bArr, 0, bArr.length);
            delivery.setContext(serverMessage);
            sender.send(encodedMessage.getArray(), 0, encodedMessage.getLength());
            this.server.getStorageManager().afterCompleteOperations(new IOAsyncTask() {
                public void done() {
                    // NOTE(review): runs without re-acquiring the delivery lock if invoked later.
                    if (z) {
                        delivery.settle();
                        sender.addCredit(SINGLE_CREDIT);
                    } else {
                        sender.advance();
                    }
                }

                public void onError(int i, String str) {
                    sender.setLocalError(new EndpointError(Integer.toString(i), str));
                }
            });
        }
        protonRemotingConnection.write();
    }

    /**
     * Accepts a newly attached link: mirrors the remote source/target locally and registers the
     * link with the session as a consumer (sender link), a transaction handler (receiver link
     * whose remote target is a {@link Coordinator}) or a producer (any other receiver link).
     *
     * <p>Decompiler note: access was originally package-private.
     *
     * @throws HornetQAMQPException if the session cannot be initialised or the link registered
     */
    public void handleNewLink(Link link, ProtonSession protonSession) throws HornetQAMQPException {
        link.setSource(link.getRemoteSource());
        link.setTarget(link.getRemoteTarget());
        if (link instanceof Receiver) {
            Receiver receiver = (Receiver) link;
            if (link.getRemoteTarget() instanceof Coordinator) {
                protonSession.initialise(true);
                protonSession.addTransactionHandler((Coordinator) link.getRemoteTarget(), receiver);
            } else {
                protonSession.initialise(false);
                protonSession.addProducer(receiver);
                receiver.flow(INITIAL_PRODUCER_CREDIT);
            }
            return;
        }
        protonSession.initialise(false);
        Sender sender = (Sender) link;
        protonSession.addConsumer(sender);
        sender.offer(SINGLE_CREDIT);
    }

    /**
     * Builds a {@link ProtonSession} with a generated UUID name and a new storage operation
     * context backed by an executor from the server's executor factory.
     *
     * @throws HornetQAMQPException if the session cannot be created
     */
    public ProtonSession createSession(ProtonRemotingConnection protonRemotingConnection, TransportImpl transportImpl) throws HornetQAMQPException {
        String sessionName = UUIDGenerator.getInstance().generateStringUUID();
        return new ProtonSession(sessionName, protonRemotingConnection, this, this.server.getStorageManager().newContext(this.server.getExecutorFactory().getExecutor()), this.server, transportImpl);
    }

    /**
     * Re-mirrors the remote source/target on an active link and asks the link's
     * {@link ProtonDeliveryHandler} context to check its state.
     *
     * <p>Decompiler note: access was originally package-private.
     *
     * @throws HornetQAMQPException propagated from the handler's state check
     */
    public void handleActiveLink(Link link) throws HornetQAMQPException {
        link.setSource(link.getRemoteSource());
        link.setTarget(link.getRemoteTarget());
        ProtonDeliveryHandler handler = (ProtonDeliveryHandler) link.getContext();
        handler.checkState();
    }

    /**
     * Processes a delivery received on a transaction coordinator link.
     *
     * <ul>
     *   <li>{@link Declare}: replies with a {@link Declared} disposition carrying the id of the
     *       session's current transaction, then settles.</li>
     *   <li>{@link Discharge}: rolls back when the fail flag is {@code true}, otherwise commits
     *       (an absent flag is treated as {@code false}), then settles.</li>
     *   <li>Any other body is ignored and the delivery is left unsettled.</li>
     * </ul>
     *
     * <p>If the receiver has no complete delivery yet, the bytes read so far stay in
     * {@code hornetQBuffer} and the method returns.
     *
     * @throws HornetQAMQPIllegalStateException if commit or rollback fails; the underlying
     *                                          exception is attached as the cause when possible
     */
    public void handleTransaction(Receiver receiver, HornetQBuffer hornetQBuffer, Delivery delivery, ProtonSession protonSession) throws HornetQAMQPIllegalStateException {
        byte[] encoded = readCompleteDelivery(receiver, hornetQBuffer);
        if (encoded == null) {
            return;
        }
        MessageImpl amqpMessage = new MessageImpl();
        amqpMessage.decode(encoded, 0, encoded.length);
        Object body = amqpMessage.getBody().getValue();

        if (body instanceof Declare) {
            // NOTE(review): assumes the server session always has a current transaction here - confirm.
            Transaction currentTransaction = protonSession.getServerSession().getCurrentTransaction();
            Declared declared = new Declared();
            declared.setTxnId(new Binary(longToBytes(currentTransaction.getID())));
            delivery.disposition(declared);
            delivery.settle();
            return;
        }

        if (body instanceof Discharge) {
            // Boolean.TRUE.equals avoids a NullPointerException when the optional fail field is absent.
            boolean rollbackRequested = Boolean.TRUE.equals(((Discharge) body).getFail());
            if (rollbackRequested) {
                try {
                    protonSession.getServerSession().rollback(false);
                } catch (Exception e) {
                    throw withCause(HornetQMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage()), e);
                }
            } else {
                try {
                    protonSession.getServerSession().commit();
                } catch (Exception e) {
                    throw withCause(HornetQMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage()), e);
                }
            }
            delivery.settle();
        }
    }

    /**
     * Encodes {@code j} as 8 bytes in {@link ByteBuffer}'s default (big-endian) order.
     *
     * @param j value to encode
     * @return a new 8-byte array
     */
    public byte[] longToBytes(long j) {
        return ByteBuffer.allocate(8).putLong(j).array();
    }

    /**
     * Appends everything currently readable from {@code receiver} to {@code buffer}. If the final
     * {@code recv} result is negative, advances the receiver and returns (and clears) the
     * accumulated bytes.
     *
     * @return the accumulated bytes, or {@code null} when the last {@code recv} returned 0, in
     *         which case the buffer keeps what was read and the receiver is not advanced
     */
    private static byte[] readCompleteDelivery(Receiver receiver, HornetQBuffer buffer) {
        byte[] chunk = new byte[RECEIVE_CHUNK_SIZE];
        int read;
        while ((read = receiver.recv(chunk, 0, chunk.length)) > 0) {
            buffer.writeBytes(chunk, 0, read);
        }
        if (read == 0) {
            // Presumably "no more data yet" for a partial delivery - wait for the next call.
            return null;
        }
        receiver.advance();
        byte[] encoded = new byte[buffer.readableBytes()];
        buffer.readBytes(encoded);
        buffer.clear();
        return encoded;
    }

    /**
     * Attaches {@code cause} to {@code failure} so the original stack trace is not lost.
     *
     * @return {@code failure}, for use directly in a {@code throw} statement
     */
    private static <T extends Throwable> T withCause(T failure, Throwable cause) {
        try {
            failure.initCause(cause);
        } catch (IllegalStateException ignored) {
            // The exception was built with its cause already fixed; keep it as produced.
        }
        return failure;
    }
}
