/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.seda;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.WaitForTaskToComplete;
import org.apache.camel.component.seda.SedaEndpoint;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.ExchangeHelper;

/**
 * Producer that places copies of incoming exchanges onto the {@link BlockingQueue}
 * supplied at construction time.
 *
 * <p>Depending on the effective {@link WaitForTaskToComplete} setting the producer either
 * enqueues the copy and returns straight away, or enqueues it and blocks the calling thread
 * until the copy has completed (optionally bounded by a timeout in milliseconds).
 */
public class SedaProducer extends DefaultAsyncProducer {
    /** Queue that receives the exchange copies. */
    protected final BlockingQueue<Exchange> queue;
    private final SedaEndpoint endpoint;
    /** Default wait strategy; can be overridden per exchange via the "CamelAsyncWait" property. */
    private final WaitForTaskToComplete waitForTaskToComplete;
    /** Maximum wait in milliseconds; a value of zero or less means wait without a time limit. */
    private final long timeout;
    /** When true a full queue blocks the caller ({@code put}); otherwise {@code add} is used. */
    private final boolean blockWhenFull;

    /**
     * Creates a producer that does not block when the queue is full.
     *
     * @param endpoint the endpoint this producer belongs to
     * @param queue the queue exchanges are added to
     * @param waitForTaskToComplete the default wait strategy
     * @param timeout maximum time in milliseconds to wait for completion; zero or less waits indefinitely
     * @deprecated use {@link #SedaProducer(SedaEndpoint, BlockingQueue, WaitForTaskToComplete, long, boolean)}
     */
    @Deprecated
    public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete, long timeout) {
        this(endpoint, queue, waitForTaskToComplete, timeout, false);
    }

    /**
     * Creates a producer.
     *
     * @param endpoint the endpoint this producer belongs to
     * @param queue the queue exchanges are added to
     * @param waitForTaskToComplete the default wait strategy
     * @param timeout maximum time in milliseconds to wait for completion; zero or less waits indefinitely
     * @param blockWhenFull whether to block the caller while the queue is full instead of using {@code add}
     */
    public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete, long timeout, boolean blockWhenFull) {
        super(endpoint);
        this.queue = queue;
        this.endpoint = endpoint;
        this.waitForTaskToComplete = waitForTaskToComplete;
        this.timeout = timeout;
        this.blockWhenFull = blockWhenFull;
    }

    /**
     * Enqueues a copy of the exchange and, if the effective wait strategy requires it, blocks
     * until that copy has completed.
     *
     * <p>The wait strategy is the one given at construction time unless the exchange carries a
     * non-null {@code "CamelAsyncWait"} property, which then takes precedence. Waiting happens
     * for {@code Always}, and for {@code IfReplyExpected} when
     * {@link ExchangeHelper#isOutCapable(Exchange)} is true. While waiting, the results of the
     * completed copy are copied back onto {@code exchange}. If the timeout elapses, or the wait
     * is interrupted while a timeout is configured, an {@link ExchangeTimedOutException} is set
     * on {@code exchange} and the copy is removed from the queue.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is restored
     * before this method returns.
     *
     * @param exchange the exchange to send
     * @param callback notified with {@code done(true)} once this method has finished its work
     * @return always {@code true}; the work is completed on the calling thread
     */
    @Override
    public boolean process(final Exchange exchange, AsyncCallback callback) {
        WaitForTaskToComplete waitStrategy = waitForTaskToComplete;
        if (exchange.getProperty("CamelAsyncWait") != null) {
            waitStrategy = exchange.getProperty("CamelAsyncWait", WaitForTaskToComplete.class);
        }

        boolean mustWait = waitStrategy == WaitForTaskToComplete.Always
                || waitStrategy == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange);

        if (mustWait) {
            // No handover: completions stay on the original exchange, the copy only reports back.
            Exchange copy = prepareCopy(exchange, false);
            final CountDownLatch latch = new CountDownLatch(1);
            copy.addOnCompletion(new SynchronizationAdapter() {

                @Override
                public void onDone(Exchange response) {
                    // The waiting side counts the latch down itself on timeout, so a count of
                    // zero here means nobody is waiting for this response any more.
                    if (latch.getCount() == 0L) {
                        if (SedaProducer.this.log.isTraceEnabled()) {
                            SedaProducer.this.log.trace("{}. Timeout occurred so response will be ignored: {}", this, response.hasOut() ? response.getOut() : response.getIn());
                        }
                        return;
                    }
                    if (SedaProducer.this.log.isTraceEnabled()) {
                        SedaProducer.this.log.trace("{} with response: {}", this, response.hasOut() ? response.getOut() : response.getIn());
                    }
                    try {
                        ExchangeHelper.copyResults(exchange, response);
                    } finally {
                        // Always release the waiting thread, even if copying the results failed.
                        latch.countDown();
                    }
                }

                @Override
                public boolean allowHandover() {
                    return false;
                }

                @Override
                public String toString() {
                    return "onDone at [" + SedaProducer.this.endpoint.getEndpointUri() + "]";
                }
            });

            log.trace("Adding Exchange to queue: {}", copy);
            addToQueue(copy);

            if (timeout > 0L) {
                if (log.isTraceEnabled()) {
                    log.trace("Waiting for task to complete using timeout (ms): {} at [{}]", timeout, endpoint.getEndpointUri());
                }
                boolean done = false;
                try {
                    done = latch.await(timeout, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to callers; 'done' stays false so the
                    // exchange is failed and cleaned up below.
                    Thread.currentThread().interrupt();
                }
                if (!done) {
                    exchange.setException(new ExchangeTimedOutException(exchange, timeout));
                    // Mark the latch as released so a late onDone ignores its response.
                    latch.countDown();
                    // Best effort: drop the copy if it has not been taken from the queue yet.
                    queue.remove(copy);
                }
            } else {
                if (log.isTraceEnabled()) {
                    log.trace("Waiting for task to complete (blocking) at [{}]", endpoint.getEndpointUri());
                }
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    // Stop waiting, but keep the interrupt visible to callers.
                    Thread.currentThread().interrupt();
                }
            }
        } else {
            // Fire and forget: hand completions over to the copy and do not wait for it.
            Exchange copy = prepareCopy(exchange, true);
            log.trace("Adding Exchange to queue: {}", copy);
            addToQueue(copy);
        }

        callback.done(true);
        return true;
    }

    /**
     * Creates the correlated copy that is placed on the queue and marks it as coming from this
     * producer's endpoint.
     *
     * @param exchange the exchange to copy
     * @param handover passed through to {@link ExchangeHelper#createCorrelatedCopy(Exchange, boolean)}
     * @return the copy to enqueue
     */
    protected Exchange prepareCopy(Exchange exchange, boolean handover) {
        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover);
        copy.setFromEndpoint(endpoint);
        return copy;
    }

    /** Starts the producer and then notifies the endpoint that it has started. */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        endpoint.onStarted(this);
    }

    /** Notifies the endpoint that this producer has stopped and then stops the producer. */
    @Override
    protected void doStop() throws Exception {
        endpoint.onStopped(this);
        super.doStop();
    }

    /**
     * Adds the exchange to the queue.
     *
     * <p>With {@code blockWhenFull} enabled this uses {@link BlockingQueue#put(Object)}, which
     * waits for free capacity. If that wait is interrupted the exchange is not enqueued, the
     * event is logged at debug level and the thread's interrupt status is restored so that
     * callers can react to it. Otherwise {@link BlockingQueue#add(Object)} is used, which per
     * its contract throws {@link IllegalStateException} when no capacity is available.
     *
     * @param exchange the exchange to enqueue
     */
    protected void addToQueue(Exchange exchange) {
        if (!blockWhenFull) {
            queue.add(exchange);
            return;
        }
        try {
            queue.put(exchange);
        } catch (InterruptedException e) {
            log.debug("Put interrupted, are we stopping? {}", isStopping() || isStopped());
            // Restore the flag after logging so the interruption is not silently lost.
            Thread.currentThread().interrupt();
        }
    }
}

