/*
 * Decompiled with CFR 0.152.
 */
package org.apache.servicemix.eip.patterns;

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import org.apache.servicemix.eip.support.resequence.DefaultComparator;
import org.apache.servicemix.eip.support.resequence.ResequencerBase;
import org.apache.servicemix.eip.support.resequence.ResequencerEngine;
import org.apache.servicemix.eip.support.resequence.SequenceElementComparator;
import org.apache.servicemix.eip.support.resequence.SequenceReader;
import org.apache.servicemix.eip.support.resequence.SequenceSender;
import org.apache.servicemix.executors.Executor;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class Resequencer
extends ResequencerBase
implements SequenceSender {
    private ResequencerEngine<MessageExchange> reseq;
    private SequenceReader reader = new SequenceReader(this);
    private Executor executor;
    private int capacity;
    private long timeout;
    private SequenceElementComparator<MessageExchange> comparator = new DefaultComparator();

    public void setCapacity(int capacity) {
        this.capacity = capacity;
    }

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    public void setComparator(SequenceElementComparator<MessageExchange> comparator) {
        this.comparator = comparator;
    }

    @Override
    public void start() throws Exception {
        super.start();
        if (this.executor == null) {
            this.executor = this.getServiceUnit().getComponent().getExecutor();
        }
        LinkedBlockingQueue<MessageExchange> queue = new LinkedBlockingQueue<MessageExchange>();
        this.reseq = new ResequencerEngine<MessageExchange>(this.comparator, this.capacity);
        this.reseq.setTimeout(this.timeout);
        this.reseq.setOutQueue(queue);
        this.reader.setQueue(queue);
        this.reader.start(this.executor);
    }

    @Override
    public void stop() throws Exception {
        this.reseq.stop();
        this.reader.stop();
        super.stop();
    }

    @Override
    public void sendSync(MessageExchange exchange) throws MessagingException {
        super.sendSync(exchange);
    }

    @Override
    public void sendSync(List<MessageExchange> exchanges) throws MessagingException {
        for (MessageExchange exchange : exchanges) {
            this.sendSync(exchange);
        }
    }

    @Override
    protected void processSync(MessageExchange exchange) throws Exception {
        this.fail(exchange, new UnsupportedOperationException("synchronous resequencing not supported"));
    }

    @Override
    protected void processAsync(MessageExchange exchange) throws Exception {
        this.validateMessageExchange(exchange);
        if (exchange.getStatus() == ExchangeStatus.DONE) {
            return;
        }
        if (exchange.getStatus() == ExchangeStatus.ERROR) {
            return;
        }
        if (exchange.getFault() != null) {
            this.done(exchange);
            return;
        }
        this.processMessage(exchange);
        this.done(exchange);
    }

    private void processMessage(MessageExchange sourceExchange) throws MessagingException, InterruptedException {
        NormalizedMessage source = sourceExchange.getMessage("in");
        NormalizedMessage copy = this.getMessageCopier().copy(source);
        MessageExchange targetExchange = this.createTargetExchange(copy, sourceExchange.getPattern());
        this.reseq.put(targetExchange);
    }
}

