package org.proton.plug.context.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.ProtonJMessage;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.proton.plug.AMQPClientReceiverContext;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.context.AbstractConnectionContext;
import org.proton.plug.context.AbstractProtonReceiverContext;
import org.proton.plug.context.AbstractProtonSessionContext;
import org.proton.plug.exceptions.ActiveMQAMQPException;
import org.proton.plug.util.DeliveryUtil;

/* loaded from: input_file:org/proton/plug/context/client/ProtonClientReceiverContext.class */
public class ProtonClientReceiverContext extends AbstractProtonReceiverContext implements AMQPClientReceiverContext {
    LinkedBlockingDeque<MessageImpl> queues;

    public ProtonClientReceiverContext(AMQPSessionCallback aMQPSessionCallback, AbstractConnectionContext abstractConnectionContext, AbstractProtonSessionContext abstractProtonSessionContext, Receiver receiver) {
        super(aMQPSessionCallback, abstractConnectionContext, abstractProtonSessionContext, receiver);
        this.queues = new LinkedBlockingDeque<>();
    }

    @Override // org.proton.plug.context.ProtonDeliveryHandler
    public void onFlow(int i, boolean z) {
    }

    @Override // org.proton.plug.context.ProtonDeliveryHandler
    public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
        ByteBuf heapBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
        try {
            synchronized (this.connection.getLock()) {
                DeliveryUtil.readDelivery(this.receiver, heapBuffer);
                MessageImpl decodeMessageImpl = DeliveryUtil.decodeMessageImpl(heapBuffer);
                this.receiver.advance();
                delivery.disposition(Accepted.getInstance());
                this.queues.add(decodeMessageImpl);
            }
        } finally {
            heapBuffer.release();
        }
    }

    @Override // org.proton.plug.AMQPClientReceiverContext
    public ProtonJMessage receiveMessage(int i, TimeUnit timeUnit) throws Exception {
        return this.queues.poll(i, timeUnit);
    }
}
