package org.apache.camel.component.zookeeper;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.zookeeper.operations.AnyOfOperations;
import org.apache.camel.component.zookeeper.operations.ChildrenChangedOperation;
import org.apache.camel.component.zookeeper.operations.DataChangedOperation;
import org.apache.camel.component.zookeeper.operations.ExistenceChangedOperation;
import org.apache.camel.component.zookeeper.operations.ExistsOperation;
import org.apache.camel.component.zookeeper.operations.GetChildrenOperation;
import org.apache.camel.component.zookeeper.operations.GetDataOperation;
import org.apache.camel.component.zookeeper.operations.OperationResult;
import org.apache.camel.component.zookeeper.operations.ZooKeeperOperation;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;

/* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-371-04.zip:modules/system/layers/fuse/org/apache/camel/component/zookeeper/main/camel-zookeeper-2.17.0.redhat-630371-04.jar:org/apache/camel/component/zookeeper/ZooKeeperConsumer.class */
public class ZooKeeperConsumer extends DefaultConsumer {
    private final ZooKeeperConnectionManager zkm;
    private ZooKeeper connection;
    private ZooKeeperConfiguration configuration;
    private LinkedBlockingQueue<ZooKeeperOperation> operations;
    private ExecutorService executor;
    private volatile boolean shuttingDown;

    /* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-371-04.zip:modules/system/layers/fuse/org/apache/camel/component/zookeeper/main/camel-zookeeper-2.17.0.redhat-630371-04.jar:org/apache/camel/component/zookeeper/ZooKeeperConsumer$OperationsExecutor.class */
    private class OperationsExecutor implements Runnable {
        private ZooKeeperOperation current;
        private WatchedEvent watchedEvent;

        private OperationsExecutor() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (ZooKeeperConsumer.this.isRunAllowed()) {
                try {
                    this.current = (ZooKeeperOperation) ZooKeeperConsumer.this.operations.take();
                    if (ZooKeeperConsumer.this.log.isTraceEnabled()) {
                        ZooKeeperConsumer.this.log.trace(String.format("Processing '%s' operation", this.current.getClass().getSimpleName()));
                    }
                    String node = this.current.getNode();
                    try {
                        try {
                            OperationResult operationResult = this.current.get();
                            if (ZooKeeperUtils.hasWatchedEvent(this.current)) {
                                this.watchedEvent = ZooKeeperUtils.getWatchedEvent(this.current);
                            }
                            if (operationResult != null && this.current.shouldProduceExchange()) {
                                ZooKeeperConsumer.this.getProcessor().process(ZooKeeperConsumer.this.createExchange(node, operationResult, this.watchedEvent));
                                this.watchedEvent = null;
                            }
                            if (ZooKeeperConsumer.this.configuration.isRepeat()) {
                                try {
                                    ZooKeeperConsumer.this.operations.offer(this.current.createCopy());
                                } catch (Exception e) {
                                    backoffAndThenRestart();
                                }
                            }
                        } catch (Exception e2) {
                            ZooKeeperConsumer.this.handleException(e2);
                            backoffAndThenRestart();
                            if (ZooKeeperConsumer.this.configuration.isRepeat()) {
                                try {
                                    ZooKeeperConsumer.this.operations.offer(this.current.createCopy());
                                } catch (Exception e3) {
                                    backoffAndThenRestart();
                                }
                            }
                        }
                    } catch (Throwable th) {
                        if (ZooKeeperConsumer.this.configuration.isRepeat()) {
                            try {
                                ZooKeeperConsumer.this.operations.offer(this.current.createCopy());
                            } catch (Exception e4) {
                                backoffAndThenRestart();
                            }
                        }
                        throw th;
                    }
                } catch (InterruptedException e5) {
                }
            }
        }

        private void backoffAndThenRestart() {
            try {
                if (ZooKeeperConsumer.this.isRunAllowed()) {
                    Thread.sleep(ZooKeeperConsumer.this.configuration.getBackoff());
                    ZooKeeperConsumer.this.initializeConsumer();
                }
            } catch (Exception e) {
            }
        }
    }

    public ZooKeeperConsumer(ZooKeeperEndpoint zooKeeperEndpoint, Processor processor) {
        super(zooKeeperEndpoint, processor);
        this.operations = new LinkedBlockingQueue<>();
        this.zkm = zooKeeperEndpoint.getConnectionManager();
        this.configuration = zooKeeperEndpoint.getConfiguration();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.impl.DefaultConsumer, org.apache.camel.support.ServiceSupport
    public void doStart() throws Exception {
        super.doStart();
        this.connection = this.zkm.getConnection();
        if (this.log.isDebugEnabled()) {
            this.log.debug(String.format("Connected to Zookeeper cluster %s", this.configuration.getConnectString()));
        }
        initializeConsumer();
        this.executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "Camel-Zookeeper OperationsExecutor", 1);
        this.executor.submit(new OperationsExecutor());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.impl.DefaultConsumer, org.apache.camel.support.ServiceSupport
    public void doStop() throws Exception {
        super.doStop();
        this.shuttingDown = true;
        if (this.log.isTraceEnabled()) {
            this.log.trace(String.format("Shutting down zookeeper consumer of '%s'", this.configuration.getPath()));
        }
        getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(this.executor);
        this.zkm.shutdown();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void initializeConsumer() {
        String path = this.configuration.getPath();
        if (this.configuration.isListChildren()) {
            initializeChildListingConsumer(path);
        } else {
            initializeDataConsumer(path);
        }
    }

    private void initializeDataConsumer(String str) {
        if (this.shuttingDown) {
            return;
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug(String.format("Initializing consumption of data on node '%s'", str));
        }
        addBasicDataConsumeSequence(str);
    }

    private void initializeChildListingConsumer(String str) {
        if (this.shuttingDown) {
            return;
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug(String.format("Initializing child listing of node '%s'", str));
        }
        addBasicChildListingSequence(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Exchange createExchange(String str, OperationResult operationResult, WatchedEvent watchedEvent) {
        Exchange createExchange = getEndpoint().createExchange();
        ZooKeeperMessage zooKeeperMessage = new ZooKeeperMessage(str, operationResult.getStatistics(), watchedEvent);
        createExchange.setIn(zooKeeperMessage);
        if (operationResult.isOk()) {
            zooKeeperMessage.setBody(operationResult.getResult());
        } else {
            createExchange.setException(operationResult.getException());
        }
        return createExchange;
    }

    private void addBasicDataConsumeSequence(String str) {
        this.operations.clear();
        this.operations.add(new AnyOfOperations(str, new ExistsOperation(this.connection, str), new ExistenceChangedOperation(this.connection, str)));
        this.operations.add(new GetDataOperation(this.connection, str));
        this.operations.add(new DataChangedOperation(this.connection, str, false, this.configuration.isSendEmptyMessageOnDelete()));
    }

    private void addBasicChildListingSequence(String str) {
        this.operations.clear();
        this.operations.add(new AnyOfOperations(str, new ExistsOperation(this.connection, str), new ExistenceChangedOperation(this.connection, str)));
        this.operations.add(new GetChildrenOperation(this.connection, str));
        this.operations.add(new ChildrenChangedOperation(this.connection, str, false));
    }
}
