/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.proton.hawtdispatch.api;

import java.io.UnsupportedEncodingException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.impl.DeliveryImpl;
import org.apache.qpid.proton.engine.impl.SenderImpl;
import org.apache.qpid.proton.hawtdispatch.api.AmqpLink;
import org.apache.qpid.proton.hawtdispatch.api.AmqpSession;
import org.apache.qpid.proton.hawtdispatch.api.Callback;
import org.apache.qpid.proton.hawtdispatch.api.MessageDelivery;
import org.apache.qpid.proton.hawtdispatch.api.QoS;
import org.apache.qpid.proton.hawtdispatch.impl.Defer;
import org.apache.qpid.proton.hawtdispatch.impl.Watch;
import org.apache.qpid.proton.message.Message;
import org.fusesource.hawtbuf.Buffer;

/**
 * Sending side of an AMQP link.
 *
 * <p>Messages handed to {@link #send(Message)} are queued in {@link #outbound}
 * and written to the underlying {@link SenderImpl} by {@link #pumpDeliveries()}
 * while the sender reports credit. {@link #outboundBufferSize} tracks the sum of
 * {@code initialSize} over the queued messages so that callers can be notified
 * through {@link #onOverflowBufferDrained(Callback)} once it reaches zero.
 */
public class AmqpSender
extends AmqpLink {
    /** Delivery tag used for {@link QoS#AT_MOST_ONCE} deliveries. */
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
    /** Upper bound on the number of delivery tags kept for reuse. */
    private static final int MAX_TAG_CACHE_SIZE = 1024;

    long nextTagId = 0L;
    /**
     * Pool of delivery tags available for reuse. Arrays hash by identity, so
     * this set behaves as a bag of array instances rather than a set of
     * distinct tag values.
     */
    HashSet<byte[]> tagCache = new HashSet<byte[]>();
    final AmqpSession parent;
    private final QoS qos;
    final SenderImpl sender;
    /** Messages waiting to be turned into deliveries, in send order. */
    final LinkedList<MessageDelivery> outbound = new LinkedList<MessageDelivery>();
    /** Sum of {@code initialSize} of every message currently in {@link #outbound}. */
    long outboundBufferSize;
    /** Remaining encoded bytes of the message currently being written, or {@code null}. */
    Buffer currentBuffer;
    /** Delivery that {@link #currentBuffer} is being written to, or {@code null}. */
    DeliveryImpl currentDelivery;
    /** Deferred task that runs {@link #pumpDeliveries()}. */
    Defer deferedPumpDeliveries = new Defer(){

        public void run() {
            AmqpSender.this.pumpDeliveries();
        }
    };

    /**
     * Creates the sender, attaches it and registers it in the connection's
     * {@code senders} collection.
     *
     * @param parent session that owns this link
     * @param sender underlying engine sender
     * @param qos    quality of service; decides tagging and settlement in
     *               {@link #pumpDeliveries()}
     */
    public AmqpSender(AmqpSession parent, SenderImpl sender, QoS qos) {
        this.parent = parent;
        this.sender = sender;
        this.qos = qos;
        this.attach();
        this.getConnection().senders.add(this);
    }

    /** Closes the link and removes this sender from the connection's {@code senders} collection. */
    @Override
    public void close() {
        super.close();
        this.getConnection().senders.remove(this);
    }

    /** @return the underlying engine sender */
    protected SenderImpl getEndpoint() {
        return this.sender;
    }

    /** @return the session this sender was created with */
    @Override
    protected AmqpSession getParent() {
        return this.parent;
    }

    /**
     * Queues a message for sending and immediately tries to write it out.
     *
     * @param message the message to send
     * @return the {@link MessageDelivery} tracking the message; its
     *         {@code redeliver} re-queues it on this sender
     */
    public MessageDelivery send(Message message) {
        this.assertExecuting();
        MessageDelivery rc = new MessageDelivery(message){

            @Override
            AmqpLink link() {
                return AmqpSender.this;
            }

            @Override
            public void redeliver(boolean incrementDeliveryCounter) {
                super.redeliver(incrementDeliveryCounter);
                AmqpSender.this.requeue(this);
            }
        };
        this.enqueue(rc);
        this.pumpDeliveries();
        this.pumpOut();
        return rc;
    }

    /** @return the current value of {@link #outboundBufferSize} */
    public long getOverflowBufferSize() {
        return this.outboundBufferSize;
    }

    /**
     * Writes queued messages to the engine sender.
     *
     * <p>Loops until either the current message cannot be written (no credit,
     * or the sender accepted no bytes) or the outbound queue is empty. Link
     * watches are fired on every exit path.
     */
    protected void pumpDeliveries() {
        this.assertExecuting();
        try {
            while (true) {
                if (this.currentBuffer != null) {
                    if (this.sender.getCredit() <= 0) {
                        return;
                    }
                    int sent = this.sender.send(this.currentBuffer.data, this.currentBuffer.offset, this.currentBuffer.length);
                    this.currentBuffer.moveHead(sent);
                    if (this.currentBuffer.length != 0) {
                        if (sent <= 0) {
                            // No progress was made; bail out instead of spinning on the same buffer.
                            return;
                        }
                        continue;
                    }
                    // The whole message has been handed to the sender.
                    DeliveryImpl finished = this.currentDelivery;
                    MessageDelivery finishedMessage = (MessageDelivery)finished.getContext();
                    this.currentBuffer = null;
                    this.currentDelivery = null;
                    if (this.qos == QoS.AT_MOST_ONCE) {
                        finished.settle();
                    } else {
                        this.sender.advance();
                    }
                    finishedMessage.fireWatches();
                    continue;
                }
                if (this.outbound.isEmpty()) {
                    return;
                }
                // Start the next queued message.
                MessageDelivery next = this.outbound.removeFirst();
                this.outboundBufferSize -= (long)next.initialSize;
                this.currentBuffer = next.encoded();
                if (this.qos == QoS.AT_MOST_ONCE) {
                    this.currentDelivery = this.sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
                } else {
                    byte[] tag = this.nextTag();
                    this.currentDelivery = this.sender.delivery(tag, 0, tag.length);
                }
                next.delivery = this.currentDelivery;
                this.currentDelivery.setContext((Object)next);
            }
        }
        finally {
            this.fireWatches();
        }
    }

    /**
     * Handles a delivery update.
     *
     * <p>When the delivery is remotely settled its tag is returned to the tag
     * cache, the remote state is applied and the delivery is settled locally:
     * <ul>
     *   <li>{@link Rejected}: delivery count is incremented and the message is re-queued;</li>
     *   <li>{@link Released}: the message is re-queued without touching the count;</li>
     *   <li>{@link Modified} with {@code deliveryFailed == true}: delivery count is incremented;</li>
     *   <li>any other state (including none): nothing beyond settling.</li>
     * </ul>
     * The message's watches are fired in every case.
     *
     * @param delivery the delivery whose context is the {@link MessageDelivery}
     *                 set by {@link #pumpDeliveries()}
     */
    @Override
    protected void processDelivery(Delivery delivery) {
        MessageDelivery md = (MessageDelivery)delivery.getContext();
        if (delivery.remotelySettled()) {
            byte[] tag = delivery.getTag();
            if (tag.length > 0) {
                this.checkinTag(tag);
            }
            DeliveryState state = delivery.getRemoteState();
            if (state instanceof Rejected) {
                md.delivery = null;
                md.incrementDeliveryCount();
                this.requeue(md);
            } else if (state instanceof Released) {
                md.delivery = null;
                this.requeue(md);
            } else if (state instanceof Modified) {
                // getDeliveryFailed() returns a Boolean; avoid unboxing a possible null.
                if (Boolean.TRUE.equals(((Modified)state).getDeliveryFailed())) {
                    md.incrementDeliveryCount();
                }
            }
            delivery.settle();
        }
        md.fireWatches();
    }

    /**
     * Returns a delivery tag: a cached one when available, otherwise the
     * UTF-8 bytes of the hexadecimal form of the next tag id.
     *
     * @return a delivery tag
     */
    byte[] nextTag() {
        byte[] rc;
        if (this.tagCache != null && !this.tagCache.isEmpty()) {
            Iterator<byte[]> iterator = this.tagCache.iterator();
            rc = iterator.next();
            iterator.remove();
        } else {
            try {
                rc = Long.toHexString(this.nextTagId++).getBytes("UTF-8");
            }
            catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
            }
        }
        return rc;
    }

    /**
     * Offers a tag for reuse; it is dropped once the cache already holds
     * {@value #MAX_TAG_CACHE_SIZE} entries.
     *
     * @param data the tag to return to the cache
     */
    void checkinTag(byte[] data) {
        if (this.tagCache.size() < MAX_TAG_CACHE_SIZE) {
            this.tagCache.add(data);
        }
    }

    /**
     * Registers a watch that calls {@code cb.onSuccess(null)} once
     * {@link #outboundBufferSize} is zero.
     *
     * @param cb callback to notify
     */
    public void onOverflowBufferDrained(final Callback<Void> cb) {
        this.addWatch(new Watch(){

            @Override
            public boolean execute() {
                if (AmqpSender.this.outboundBufferSize == 0L) {
                    cb.onSuccess(null);
                    return true;
                }
                return false;
            }
        });
    }

    /** Appends a message to the outbound queue and accounts for its size. */
    private void enqueue(MessageDelivery md) {
        this.outbound.add(md);
        this.outboundBufferSize += (long)md.initialSize;
    }

    /**
     * Puts a message back on the outbound queue, keeping
     * {@link #outboundBufferSize} consistent with the later decrement in
     * {@link #pumpDeliveries()}, and schedules a deferred pump.
     */
    private void requeue(MessageDelivery md) {
        this.enqueue(md);
        this.defer(this.deferedPumpDeliveries);
    }
}

