/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.proton.hawtdispatch.api;

import java.util.LinkedList;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.impl.DeliveryImpl;
import org.apache.qpid.proton.engine.impl.ReceiverImpl;
import org.apache.qpid.proton.hawtdispatch.api.AmqpDeliveryListener;
import org.apache.qpid.proton.hawtdispatch.api.AmqpLink;
import org.apache.qpid.proton.hawtdispatch.api.AmqpSession;
import org.apache.qpid.proton.hawtdispatch.api.MessageDelivery;
import org.apache.qpid.proton.hawtdispatch.api.QoS;
import org.apache.qpid.proton.hawtdispatch.impl.Defer;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.ByteArrayOutputStream;

public class AmqpReceiver
extends AmqpLink {
    final AmqpSession parent;
    final ReceiverImpl receiver;
    ByteArrayOutputStream current = new ByteArrayOutputStream();
    LinkedList<MessageDelivery> inbound = new LinkedList();
    Defer deferedDrain = new Defer(){

        public void run() {
            AmqpReceiver.this.drainInbound();
        }
    };
    int resumed = 0;
    AmqpDeliveryListener deliveryListener;

    public AmqpReceiver(AmqpSession parent, ReceiverImpl receiver, QoS qos) {
        this.parent = parent;
        this.receiver = receiver;
        this.attach();
    }

    protected ReceiverImpl getEndpoint() {
        return this.receiver;
    }

    @Override
    protected AmqpSession getParent() {
        return this.parent;
    }

    @Override
    protected void processDelivery(Delivery delivery) {
        int count;
        if (!delivery.isReadable()) {
            System.out.println("it was not readable!");
            return;
        }
        if (this.current == null) {
            this.current = new ByteArrayOutputStream();
        }
        byte[] data = new byte[4096];
        while ((count = this.receiver.recv(data, 0, data.length)) > 0) {
            this.current.write(data, 0, count);
        }
        if (count == 0) {
            return;
        }
        this.receiver.advance();
        Buffer buffer = this.current.toBuffer();
        this.current = null;
        this.onMessage(delivery, buffer);
    }

    protected void onMessage(Delivery delivery, Buffer buffer) {
        MessageDelivery md = new MessageDelivery(buffer){

            @Override
            AmqpLink link() {
                return AmqpReceiver.this;
            }

            @Override
            public void settle() {
                if (!this.delivery.isSettled()) {
                    this.delivery.disposition((DeliveryState)new Accepted());
                    this.delivery.settle();
                }
                AmqpReceiver.this.drain();
            }
        };
        md.delivery = (DeliveryImpl)delivery;
        delivery.setContext((Object)md);
        this.inbound.add(md);
        this.drainInbound();
    }

    public void drain() {
        this.defer(this.deferedDrain);
    }

    public void resume() {
        ++this.resumed;
    }

    public void suspend() {
        --this.resumed;
    }

    private void drainInbound() {
        while (this.deliveryListener != null && !this.inbound.isEmpty() && this.resumed > 0) {
            this.deliveryListener.onMessageDelivery(this.inbound.removeFirst());
            this.receiver.flow(1);
        }
    }

    public AmqpDeliveryListener getDeliveryListener() {
        return this.deliveryListener;
    }

    public void setDeliveryListener(AmqpDeliveryListener deliveryListener) {
        this.deliveryListener = deliveryListener;
        this.drainInbound();
    }
}

