package org.apache.activemq.camel.component;

import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.util.ByteSequence;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/activemq-camel-5.5.1-fuse-00-08.jar:org/apache/activemq/camel/component/JournalEndpoint.class */
public class JournalEndpoint extends DefaultEndpoint {
    private static final transient Logger LOG = LoggerFactory.getLogger(JournalEndpoint.class);
    private final File directory;
    private final AtomicReference<DefaultConsumer> consumer;
    private final Object activationMutex;
    private int referenceCount;
    private AsyncDataManager dataManager;
    private Thread thread;
    private Location lastReadLocation;
    private long idleDelay;
    private boolean syncProduce;
    private boolean syncConsume;

    public JournalEndpoint(String str, JournalComponent journalComponent, File file) {
        super(str, journalComponent.getCamelContext());
        this.consumer = new AtomicReference<>();
        this.activationMutex = new Object();
        this.idleDelay = 1000L;
        this.syncProduce = true;
        this.directory = file;
    }

    public JournalEndpoint(String str, File file) {
        super(str);
        this.consumer = new AtomicReference<>();
        this.activationMutex = new Object();
        this.idleDelay = 1000L;
        this.syncProduce = true;
        this.directory = file;
    }

    @Override // org.apache.camel.IsSingleton
    public boolean isSingleton() {
        return true;
    }

    public File getDirectory() {
        return this.directory;
    }

    @Override // org.apache.camel.Endpoint
    public Consumer createConsumer(Processor processor) throws Exception {
        return new DefaultConsumer(this, processor) { // from class: org.apache.activemq.camel.component.JournalEndpoint.1
            @Override // org.apache.camel.impl.ServiceSupport, org.apache.camel.Service
            public void start() throws Exception {
                super.start();
                JournalEndpoint.this.activateConsumer(this);
            }

            @Override // org.apache.camel.impl.ServiceSupport, org.apache.camel.Service
            public void stop() throws Exception {
                JournalEndpoint.this.deactivateConsumer(this);
                super.stop();
            }
        };
    }

    protected void decrementReference() throws IOException {
        synchronized (this.activationMutex) {
            this.referenceCount--;
            if (this.referenceCount == 0) {
                LOG.debug("Closing data manager: " + this.directory);
                LOG.debug("Last mark at: " + this.lastReadLocation);
                this.dataManager.close();
                this.dataManager = null;
            }
        }
    }

    protected void incrementReference() throws IOException {
        synchronized (this.activationMutex) {
            this.referenceCount++;
            if (this.referenceCount == 1) {
                LOG.debug("Opening data manager: " + this.directory);
                this.dataManager = new AsyncDataManager();
                this.dataManager.setDirectory(this.directory);
                this.dataManager.start();
                this.lastReadLocation = this.dataManager.getMark();
                LOG.debug("Last mark at: " + this.lastReadLocation);
            }
        }
    }

    protected void deactivateConsumer(DefaultConsumer defaultConsumer) throws IOException {
        synchronized (this.activationMutex) {
            if (this.consumer.get() != defaultConsumer) {
                throw new RuntimeCamelException("Consumer was not active.");
            }
            this.consumer.set(null);
            try {
                this.thread.join();
                decrementReference();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }
        }
    }

    protected void activateConsumer(DefaultConsumer defaultConsumer) throws IOException {
        synchronized (this.activationMutex) {
            if (this.consumer.get() != null) {
                throw new RuntimeCamelException("Consumer already active: journal endpoints only support 1 active consumer");
            }
            incrementReference();
            this.consumer.set(defaultConsumer);
            this.thread = new Thread() { // from class: org.apache.activemq.camel.component.JournalEndpoint.2
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    JournalEndpoint.this.dispatchToConsumer();
                }
            };
            this.thread.setName("Dipatch thread: " + getEndpointUri());
            this.thread.setDaemon(true);
            this.thread.start();
        }
    }

    protected void dispatchToConsumer() {
        while (true) {
            try {
                DefaultConsumer defaultConsumer = this.consumer.get();
                if (defaultConsumer == null) {
                    return;
                }
                Location nextLocation = this.dataManager.getNextLocation(this.lastReadLocation);
                if (nextLocation != null) {
                    ByteSequence read = this.dataManager.read(nextLocation);
                    Exchange createExchange = createExchange();
                    createExchange.getIn().setBody(read);
                    createExchange.getIn().setHeader("journal", getEndpointUri());
                    createExchange.getIn().setHeader("location", nextLocation);
                    defaultConsumer.getProcessor().process(createExchange);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Consumed record at: " + nextLocation);
                    }
                    this.dataManager.setMark(nextLocation, this.syncConsume);
                    this.lastReadLocation = nextLocation;
                } else {
                    LOG.debug("Sleeping due to no records being available.");
                    Thread.sleep(this.idleDelay);
                }
            } catch (Throwable th) {
                th.printStackTrace();
                return;
            }
        }
    }

    @Override // org.apache.camel.Endpoint
    public Producer createProducer() throws Exception {
        return new DefaultProducer(this) { // from class: org.apache.activemq.camel.component.JournalEndpoint.3
            @Override // org.apache.camel.Processor
            public void process(Exchange exchange) throws Exception {
                byte[] bArr;
                JournalEndpoint.this.incrementReference();
                try {
                    ByteSequence byteSequence = (ByteSequence) exchange.getIn().getBody(ByteSequence.class);
                    if (byteSequence == null && (bArr = (byte[]) exchange.getIn().getBody(byte[].class)) != null) {
                        byteSequence = new ByteSequence(bArr);
                    }
                    if (byteSequence == null) {
                        throw new CamelExchangeException("In body message could not be converted to a ByteSequence or a byte array.", exchange);
                    }
                    JournalEndpoint.this.dataManager.write(byteSequence, JournalEndpoint.this.syncProduce);
                    JournalEndpoint.this.decrementReference();
                } catch (Throwable th) {
                    JournalEndpoint.this.decrementReference();
                    throw th;
                }
            }
        };
    }

    public boolean isSyncConsume() {
        return this.syncConsume;
    }

    public void setSyncConsume(boolean z) {
        this.syncConsume = z;
    }

    public boolean isSyncProduce() {
        return this.syncProduce;
    }

    public void setSyncProduce(boolean z) {
        this.syncProduce = z;
    }

    boolean isOpen() {
        boolean z;
        synchronized (this.activationMutex) {
            z = this.referenceCount > 0;
        }
        return z;
    }
}
