package org.apache.qpid.proton.hawtdispatch.api;

import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.hawtdispatch.impl.Watch;
import org.apache.qpid.proton.hawtdispatch.impl.WatchBase;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.ProtonJMessage;
import org.apache.qpid.proton.message.impl.MessageFactoryImpl;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtdispatch.Task;

/* loaded from: input_file:org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.class */
public abstract class MessageDelivery extends WatchBase {
    private static final MessageFactoryImpl MESSAGE_FACTORY;
    final int initialSize;
    private Message message;
    private Buffer encoded;
    public Delivery delivery;
    private int sizeHint;
    boolean watchingRemoteStateChange;
    static final /* synthetic */ boolean $assertionsDisabled;

    static Buffer encode(Message message, int i) {
        byte[] bArr = new byte[i];
        int encode2 = ((ProtonJMessage) message).encode2(bArr, 0, i);
        if (encode2 > i) {
            bArr = new byte[encode2];
            encode2 = message.encode(bArr, 0, encode2);
        }
        return new Buffer(bArr, 0, encode2);
    }

    static Message decode(Buffer buffer) {
        ProtonJMessage createMessage = MESSAGE_FACTORY.createMessage();
        int i = buffer.offset;
        int i2 = buffer.length;
        while (true) {
            int i3 = i2;
            if (i3 <= 0) {
                return createMessage;
            }
            int decode = createMessage.decode(buffer.data, i, i3);
            if (!$assertionsDisabled && decode <= 0) {
                throw new AssertionError("Make progress decoding the message");
            }
            i += decode;
            i2 = i3 - decode;
        }
    }

    public MessageDelivery(Message message) {
        this(message, encode(message, 32));
    }

    public MessageDelivery(Buffer buffer) {
        this(null, buffer);
    }

    public MessageDelivery(Message message, Buffer buffer) {
        this.sizeHint = 32;
        this.message = message;
        this.encoded = buffer;
        this.sizeHint = this.encoded.length;
        this.initialSize = this.sizeHint;
    }

    public Message getMessage() {
        if (this.message == null) {
            this.message = decode(this.encoded);
        }
        return this.message;
    }

    public Buffer encoded() {
        if (this.encoded == null) {
            this.encoded = encode(this.message, this.sizeHint);
            this.sizeHint = this.encoded.length;
        }
        return this.encoded;
    }

    public boolean isSettled() {
        return this.delivery != null && this.delivery.isSettled();
    }

    public DeliveryState getRemoteState() {
        if (this.delivery == null) {
            return null;
        }
        return this.delivery.getRemoteState();
    }

    public DeliveryState getLocalState() {
        if (this.delivery == null) {
            return null;
        }
        return this.delivery.getLocalState();
    }

    public void onEncoded(final Callback<Void> callback) {
        addWatch(new Watch() { // from class: org.apache.qpid.proton.hawtdispatch.api.MessageDelivery.1
            @Override // org.apache.qpid.proton.hawtdispatch.impl.Watch
            public boolean execute() {
                if (MessageDelivery.this.delivery == null) {
                    return false;
                }
                callback.onSuccess(null);
                return true;
            }
        });
    }

    public DeliveryState getRemoteStateChange() throws Exception {
        AmqpEndpointBase.assertNotOnDispatchQueue();
        return getRemoteStateChangeFuture().await();
    }

    public Future<DeliveryState> getRemoteStateChangeFuture() {
        final Promise promise = new Promise();
        link().queue().execute(new Task() { // from class: org.apache.qpid.proton.hawtdispatch.api.MessageDelivery.2
            public void run() {
                MessageDelivery.this.onRemoteStateChange(promise);
            }
        });
        return promise;
    }

    abstract AmqpLink link();

    public void onRemoteStateChange(final Callback<DeliveryState> callback) {
        this.watchingRemoteStateChange = true;
        final DeliveryState remoteState = this.delivery.getRemoteState();
        addWatch(new Watch() { // from class: org.apache.qpid.proton.hawtdispatch.api.MessageDelivery.3
            @Override // org.apache.qpid.proton.hawtdispatch.impl.Watch
            public boolean execute() {
                if (remoteState == null) {
                    if (MessageDelivery.this.delivery.getRemoteState() == null) {
                        return false;
                    }
                    callback.onSuccess(MessageDelivery.this.delivery.getRemoteState());
                    MessageDelivery.this.watchingRemoteStateChange = false;
                    return true;
                }
                if (remoteState.equals(MessageDelivery.this.delivery.getRemoteState())) {
                    return false;
                }
                callback.onSuccess(MessageDelivery.this.delivery.getRemoteState());
                MessageDelivery.this.watchingRemoteStateChange = false;
                return true;
            }
        });
    }

    public DeliveryState getSettle() throws Exception {
        AmqpEndpointBase.assertNotOnDispatchQueue();
        return getSettleFuture().await();
    }

    public Future<DeliveryState> getSettleFuture() {
        final Promise promise = new Promise();
        link().queue().execute(new Task() { // from class: org.apache.qpid.proton.hawtdispatch.api.MessageDelivery.4
            public void run() {
                MessageDelivery.this.onSettle(promise);
            }
        });
        return promise;
    }

    public void onSettle(final Callback<DeliveryState> callback) {
        addWatch(new Watch() { // from class: org.apache.qpid.proton.hawtdispatch.api.MessageDelivery.5
            @Override // org.apache.qpid.proton.hawtdispatch.impl.Watch
            public boolean execute() {
                if (MessageDelivery.this.delivery == null || !MessageDelivery.this.delivery.isSettled()) {
                    return false;
                }
                callback.onSuccess(MessageDelivery.this.delivery.getRemoteState());
                return true;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.qpid.proton.hawtdispatch.impl.WatchBase
    public void fireWatches() {
        super.fireWatches();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void incrementDeliveryCount() {
        Message message = getMessage();
        message.setDeliveryCount(message.getDeliveryCount() + 1);
        this.encoded = null;
    }

    public void redeliver(boolean z) {
        if (z) {
            incrementDeliveryCount();
        }
    }

    public void settle() {
        if (this.delivery.isSettled()) {
            return;
        }
        this.delivery.settle();
    }

    static {
        $assertionsDisabled = !MessageDelivery.class.desiredAssertionStatus();
        MESSAGE_FACTORY = new MessageFactoryImpl();
    }
}
