package org.fusesource.fabric.stream.log;

import java.io.ByteArrayOutputStream;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/fusesource/fabric/stream/log/InputBatcher.class */
public class InputBatcher extends DefaultComponent {
    /** Sentinel object placed on the consumer queue to mark the end of standard input. */
    static final Object EOF = new Object();

    /** Logger shared by the component and its consumer threads. */
    private static final transient Logger LOG = LoggerFactory.getLogger(InputBatcherConsumer.class);

    /** Time budget, in milliseconds, for collecting further chunks into a batch after its first chunk. */
    public long batchTimeout = 1000L;

    /** Accumulated byte count at which a batch stops growing and is forwarded (256 KiB by default). */
    public int batchSize = 256 * 1024;

    /* loaded from: input_file:org/fusesource/fabric/stream/log/InputBatcher$InputBatcherConsumer.class */
    class InputBatcherConsumer extends DefaultConsumer {
        final ArrayBlockingQueue<Object> queue;
        final ExecutorService inputReader;
        final ExecutorService batchReader;
        private AsyncProcessor processor;

        InputBatcherConsumer(Endpoint endpoint, Processor processor) {
            super(endpoint, processor);
            this.queue = new ArrayBlockingQueue<>(1024);
            this.inputReader = Executors.newSingleThreadExecutor();
            this.batchReader = Executors.newSingleThreadExecutor();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.camel.impl.DefaultConsumer, org.apache.camel.impl.ServiceSupport
        public void doStart() throws Exception {
            super.doStart();
            this.inputReader.execute(new Runnable() { // from class: org.fusesource.fabric.stream.log.InputBatcher.InputBatcherConsumer.1
                @Override // java.lang.Runnable
                public void run() {
                    byte[] bArr;
                    int i;
                    try {
                        bArr = new byte[4096];
                        i = 0;
                    } catch (Exception e) {
                        e.printStackTrace();
                        return;
                    }
                    while (InputBatcherConsumer.this.isRunAllowed()) {
                        if (InputBatcherConsumer.this.isSuspending() || InputBatcherConsumer.this.isSuspended()) {
                            InputBatcher.LOG.trace("Consumer is suspended so skip polling");
                            try {
                                Thread.sleep(1000L);
                            } catch (InterruptedException e2) {
                                InputBatcher.LOG.debug("Sleep interrupted, are we stopping? {}", Boolean.valueOf(InputBatcherConsumer.this.isStopping() || InputBatcherConsumer.this.isStopped()));
                            }
                        } else {
                            int read = System.in.read(bArr, i, bArr.length - i);
                            if (read < 0) {
                                if (i > 0) {
                                    byte[] bArr2 = new byte[i];
                                    System.arraycopy(bArr, 0, bArr2, 0, i);
                                    InputBatcherConsumer.this.queue.put(bArr2);
                                }
                                InputBatcherConsumer.this.queue.put(InputBatcher.EOF);
                                return;
                            }
                            i += read;
                            int lastnlposition = InputBatcher.lastnlposition(bArr, i);
                            if (lastnlposition >= 0) {
                                int i2 = lastnlposition + 1;
                                byte[] bArr3 = new byte[i2];
                                System.arraycopy(bArr, 0, bArr3, 0, i2);
                                int i3 = i - i2;
                                System.arraycopy(bArr, i2, bArr, 0, i3);
                                i = i3;
                                InputBatcherConsumer.this.queue.put(bArr3);
                            } else if (i == bArr.length) {
                                InputBatcherConsumer.this.queue.put(bArr);
                                bArr = new byte[4096];
                                i = 0;
                            }
                        }
                        e.printStackTrace();
                        return;
                    }
                }
            });
            this.batchReader.execute(new Runnable() { // from class: org.fusesource.fabric.stream.log.InputBatcher.InputBatcherConsumer.2
                @Override // java.lang.Runnable
                public void run() {
                    Object poll;
                    boolean z = false;
                    while (InputBatcherConsumer.this.isRunAllowed() && !z) {
                        if (InputBatcherConsumer.this.isSuspending() || InputBatcherConsumer.this.isSuspended()) {
                            InputBatcher.LOG.trace("Consumer is suspended so skip polling");
                            try {
                                Thread.sleep(1000L);
                            } catch (InterruptedException e) {
                                InputBatcher.LOG.debug("Sleep interrupted, are we stopping? {}", Boolean.valueOf(InputBatcherConsumer.this.isStopping() || InputBatcherConsumer.this.isStopped()));
                            }
                        } else {
                            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream((int) (InputBatcher.this.batchSize * 1.5d));
                            try {
                                Object poll2 = InputBatcherConsumer.this.queue.poll(1000L, TimeUnit.MILLISECONDS);
                                if (poll2 != null) {
                                    if (poll2 == InputBatcher.EOF) {
                                        z = true;
                                    } else {
                                        long currentTimeMillis = System.currentTimeMillis() + InputBatcher.this.batchTimeout;
                                        try {
                                            byteArrayOutputStream.write((byte[]) poll2);
                                            while (byteArrayOutputStream.size() < InputBatcher.this.batchSize && !z) {
                                                Object poll3 = InputBatcherConsumer.this.queue.poll();
                                                if (poll3 == null) {
                                                    long currentTimeMillis2 = currentTimeMillis - System.currentTimeMillis();
                                                    if (currentTimeMillis2 <= 0 || (poll = InputBatcherConsumer.this.queue.poll(currentTimeMillis2, TimeUnit.MILLISECONDS)) == null) {
                                                        break;
                                                    } else if (poll == InputBatcher.EOF) {
                                                        z = true;
                                                    } else {
                                                        byteArrayOutputStream.write((byte[]) poll);
                                                    }
                                                } else if (poll3 == InputBatcher.EOF) {
                                                    z = true;
                                                } else {
                                                    byteArrayOutputStream.write((byte[]) poll3);
                                                }
                                            }
                                            if (byteArrayOutputStream.size() > 0) {
                                                byte[] byteArray = byteArrayOutputStream.toByteArray();
                                                byteArrayOutputStream.reset();
                                                Exchange createExchange = InputBatcherConsumer.this.getEndpoint().createExchange();
                                                DefaultMessage defaultMessage = new DefaultMessage();
                                                defaultMessage.setBody(byteArray);
                                                createExchange.setIn(defaultMessage);
                                                try {
                                                    InputBatcherConsumer.this.getProcessor().process(createExchange);
                                                    if (createExchange.getException() != null) {
                                                        InputBatcherConsumer.this.getExceptionHandler().handleException("Error processing exchange", createExchange, createExchange.getException());
                                                    }
                                                } catch (Exception e2) {
                                                    InputBatcherConsumer.this.getExceptionHandler().handleException("Error processing exchange", createExchange, e2);
                                                }
                                            }
                                        } catch (Exception e3) {
                                            InputBatcher.LOG.info("Error processing exchange.", (Throwable) e3);
                                        }
                                    }
                                }
                            } catch (InterruptedException e4) {
                                InputBatcher.LOG.debug("Sleep interrupted, are we stopping? {}", Boolean.valueOf(InputBatcherConsumer.this.isStopping() || InputBatcherConsumer.this.isStopped()));
                            }
                        }
                    }
                    if (z && InputBatcherConsumer.this.isRunAllowed()) {
                        Exchange createExchange2 = InputBatcherConsumer.this.getEndpoint().createExchange();
                        DefaultMessage defaultMessage2 = new DefaultMessage();
                        defaultMessage2.setHeader("EOF", "true");
                        defaultMessage2.setBody(new byte[0]);
                        createExchange2.setIn(defaultMessage2);
                        try {
                            InputBatcherConsumer.this.getProcessor().process(createExchange2);
                            if (createExchange2.getException() != null) {
                                InputBatcherConsumer.this.getExceptionHandler().handleException("Error processing exchange", createExchange2, createExchange2.getException());
                            }
                        } catch (Exception e5) {
                            InputBatcherConsumer.this.getExceptionHandler().handleException("Error processing exchange", createExchange2, e5);
                        }
                        System.exit(0);
                    }
                }
            });
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.camel.impl.DefaultConsumer, org.apache.camel.impl.ServiceSupport
        public void doStop() throws Exception {
            this.inputReader.shutdown();
            this.batchReader.shutdown();
            super.doStop();
        }
    }

    /**
     * Consumer-only endpoint for the input batcher: it hands out
     * {@link InputBatcherConsumer} instances and rejects producer creation.
     */
    class InputBatcherEndpoint extends DefaultEndpoint {
        InputBatcherEndpoint(String str, Component component) {
            super(str, component);
        }

        /**
         * Creates a consumer bound to this endpoint that feeds the given processor.
         */
        @Override
        public Consumer createConsumer(Processor processor) throws Exception {
            final Consumer consumer = new InputBatcherConsumer(this, processor);
            return consumer;
        }

        /**
         * Always fails: this endpoint cannot be used as a producer.
         *
         * @throws UnsupportedOperationException on every call
         */
        @Override
        public Producer createProducer() throws Exception {
            throw new UnsupportedOperationException("Producer not supported!");
        }

        /** Reports this endpoint as a singleton. */
        @Override
        public boolean isSingleton() {
            return true;
        }
    }

    /**
     * Creates the endpoint for this component. Only {@code str} is used, being
     * passed to the endpoint's constructor; {@code str2} and {@code map} are
     * ignored.
     */
    @Override
    protected Endpoint createEndpoint(String str, String str2, Map<String, Object> map) throws Exception {
        final InputBatcherEndpoint endpoint = new InputBatcherEndpoint(str, this);
        return endpoint;
    }

    /**
     * Finds the last newline byte within the first {@code i} bytes of {@code bArr}.
     *
     * @param bArr the buffer to search
     * @param i    the number of leading bytes to consider
     * @return the index of the last {@code '\n'} (byte value 10) below {@code i},
     *         or {@code -1} if that range contains none
     */
    public static int lastnlposition(byte[] bArr, int i) {
        for (int index = i - 1; index >= 0; index--) {
            if (bArr[index] == '\n') {
                return index;
            }
        }
        return -1;
    }

    /**
     * Returns the batch size.
     *
     * @return the current value of {@code batchSize}
     */
    public int getBatchSize() {
        return batchSize;
    }

    /**
     * Sets the batch size.
     *
     * @param i the new value for {@code batchSize}
     */
    public void setBatchSize(int i) {
        batchSize = i;
    }

    /**
     * Returns the batch timeout.
     *
     * @return the current value of {@code batchTimeout}
     */
    public long getBatchTimeout() {
        return batchTimeout;
    }

    /**
     * Sets the batch timeout.
     *
     * @param j the new value for {@code batchTimeout}
     */
    public void setBatchTimeout(long j) {
        batchTimeout = j;
    }
}
