package org.fusesource.fabric.stream.log;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/fusesource/fabric/stream/log/LogStreamer.class */
public class LogStreamer {
    private static final transient Logger LOG;
    static final Object EOF;
    public InputStream is;
    public boolean exitOnEOF;
    public Processor processor;
    static final /* synthetic */ boolean $assertionsDisabled;
    final ExecutorService inputReader = Executors.newSingleThreadExecutor();
    final ExecutorService batchReader = Executors.newSingleThreadExecutor();
    private final AtomicBoolean runAllowed = new AtomicBoolean(false);
    final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(1024);
    public String logFilePattern = null;
    public File positionFile = null;
    public int batchSize = 262144;
    public long batchTimeout = 1000;
    public long tailRetry = 500;
    public Semaphore sendSemaphore = new Semaphore(10);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/fusesource/fabric/stream/log/LogStreamer$QueueEntry.class */
    public static class QueueEntry {
        private final byte[] data;
        private final long file;
        private final long offset;
        private final int size;

        QueueEntry(byte[] bArr, long j, long j2, int i) {
            this.data = bArr;
            this.file = j;
            this.offset = j2;
            this.size = i;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateLogPosition(long j, long j2) {
        if (this.positionFile != null) {
            try {
                Support.writeText(this.positionFile, String.format("%d:%d\n", Long.valueOf(j), Long.valueOf(j2)));
            } catch (IOException e) {
                new RuntimeException(e);
            }
        }
    }

    private boolean logFileExists(long j) {
        return new File(String.format(this.logFilePattern, Long.valueOf(j))).exists();
    }

    private boolean isRunAllowed() {
        return this.runAllowed.get();
    }

    public void start() {
        if (this.runAllowed.compareAndSet(false, true)) {
            try {
                this.processor.start();
                this.inputReader.execute(new Runnable() { // from class: org.fusesource.fabric.stream.log.LogStreamer.1
                    @Override // java.lang.Runnable
                    public void run() {
                        LogStreamer.this.readInput();
                    }
                });
                this.batchReader.execute(new Runnable() { // from class: org.fusesource.fabric.stream.log.LogStreamer.2
                    @Override // java.lang.Runnable
                    public void run() {
                        LogStreamer.this.drainBatchQueue();
                    }
                });
            } catch (Exception e) {
                this.runAllowed.set(false);
            }
        }
    }

    public void stop() {
        if (this.runAllowed.compareAndSet(true, false)) {
            this.inputReader.shutdown();
            this.batchReader.shutdown();
            this.processor.stop();
        }
        this.runAllowed.set(false);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void readInput() {
        if (this.logFilePattern == null) {
            if (this.is == null) {
                this.is = System.in;
            }
            try {
                process(this.is, 0L, 0L);
                return;
            } catch (Exception e) {
                e.printStackTrace();
                return;
            }
        }
        try {
            if (!this.positionFile.exists()) {
                Support.writeText(this.positionFile, "0:0");
            }
            String[] split = Support.readText(this.positionFile).trim().split(":");
            long parseLong = Long.parseLong(split[0]);
            long parseLong2 = Long.parseLong(split[1]);
            while (this.runAllowed.get()) {
                FileInputStream fileInputStream = new FileInputStream(new File(String.format(this.logFilePattern, Long.valueOf(parseLong))));
                try {
                    process(fileInputStream, parseLong, parseLong2);
                    parseLong++;
                    parseLong2 = 0;
                    fileInputStream.close();
                } catch (Throwable th) {
                    fileInputStream.close();
                    throw th;
                }
            }
        } catch (Exception e2) {
            e2.printStackTrace();
        }
    }

    private boolean process(InputStream inputStream, long j, long j2) throws IOException, InterruptedException {
        if (j2 > 0) {
            inputStream.skip(j2);
        }
        int i = 0;
        byte[] bArr = new byte[4096];
        boolean z = false;
        while (isRunAllowed()) {
            int read = inputStream.read(bArr, i, bArr.length - i);
            if (read >= 0) {
                z = false;
                i += read;
                int lastnlposition = Support.lastnlposition(bArr, i);
                if (lastnlposition >= 0) {
                    int i2 = lastnlposition + 1;
                    byte[] copyOf = Arrays.copyOf(bArr, i2);
                    int i3 = i - i2;
                    System.arraycopy(bArr, i2, bArr, 0, i3);
                    i = i3;
                    this.queue.put(new QueueEntry(copyOf, j, j2, i2));
                }
                if (i == bArr.length) {
                    this.queue.put(new QueueEntry(bArr, j, j2, i));
                    bArr = new byte[bArr.length];
                    i = 0;
                }
            } else {
                if (this.logFilePattern == null || z) {
                    if (i > 0) {
                        this.queue.put(new QueueEntry(Arrays.copyOf(bArr, i), j, j2, i));
                    }
                    this.queue.put(EOF);
                    return true;
                }
                if (logFileExists(j + 1)) {
                    z = true;
                } else {
                    z = false;
                    Thread.sleep(this.tailRetry);
                }
            }
            j2 += read;
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void drainBatchQueue() {
        Object poll;
        loop0: while (isRunAllowed()) {
            boolean z = false;
            while (isRunAllowed() && !z) {
                QueueEntry queueEntry = null;
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream((int) (this.batchSize * 1.5d));
                try {
                    Object poll2 = this.queue.poll(1000L, TimeUnit.MILLISECONDS);
                    if (poll2 != null) {
                        if (poll2 == EOF) {
                            z = true;
                        } else {
                            long currentTimeMillis = System.currentTimeMillis() + this.batchTimeout;
                            QueueEntry queueEntry2 = (QueueEntry) poll2;
                            if (0 == 0) {
                                queueEntry = queueEntry2;
                            }
                            byteArrayOutputStream.write(queueEntry2.data);
                            while (byteArrayOutputStream.size() < this.batchSize && !z) {
                                Object poll3 = this.queue.poll();
                                if (poll3 == null) {
                                    long currentTimeMillis2 = currentTimeMillis - System.currentTimeMillis();
                                    if (currentTimeMillis2 <= 0 || (poll = this.queue.poll(currentTimeMillis2, TimeUnit.MILLISECONDS)) == null) {
                                        break;
                                    }
                                    if (poll == EOF) {
                                        z = true;
                                    } else {
                                        queueEntry2 = (QueueEntry) poll;
                                        byteArrayOutputStream.write(queueEntry2.data);
                                    }
                                } else if (poll3 == EOF) {
                                    z = true;
                                } else {
                                    queueEntry2 = (QueueEntry) poll3;
                                    byteArrayOutputStream.write(queueEntry2.data);
                                }
                            }
                            if (byteArrayOutputStream.size() > 0) {
                                byte[] byteArray = byteArrayOutputStream.toByteArray();
                                byteArrayOutputStream.reset();
                                if (!$assertionsDisabled && queueEntry.file != queueEntry2.file) {
                                    throw new AssertionError();
                                    break loop0;
                                }
                                HashMap<String, String> hashMap = new HashMap<>();
                                hashMap.put("at", String.format("%d:%d", Long.valueOf(queueEntry.file), Long.valueOf(queueEntry.offset)));
                                final QueueEntry queueEntry3 = queueEntry2;
                                send(hashMap, queueEntry2, byteArray, new Runnable() { // from class: org.fusesource.fabric.stream.log.LogStreamer.3
                                    @Override // java.lang.Runnable
                                    public void run() {
                                        LogStreamer.this.updateLogPosition(queueEntry3.file, queueEntry3.offset + queueEntry3.size);
                                    }
                                });
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    stop();
                }
            }
            if (z && isRunAllowed()) {
                HashMap<String, String> hashMap2 = new HashMap<>();
                hashMap2.put("EOF", "true");
                send(hashMap2, null, new byte[0], new Runnable() { // from class: org.fusesource.fabric.stream.log.LogStreamer.4
                    @Override // java.lang.Runnable
                    public void run() {
                        if (LogStreamer.this.exitOnEOF) {
                            System.exit(0);
                        }
                    }
                });
            }
        }
    }

    private void send(HashMap<String, String> hashMap, QueueEntry queueEntry, byte[] bArr, final Runnable runnable) {
        try {
            this.sendSemaphore.acquire();
            this.processor.send(hashMap, bArr, new Callback() { // from class: org.fusesource.fabric.stream.log.LogStreamer.5
                @Override // org.fusesource.fabric.stream.log.Callback
                public void onSuccess() {
                    try {
                        LogStreamer.this.sendSemaphore.release();
                        if (runnable != null) {
                            runnable.run();
                        }
                    } catch (Throwable th) {
                        onFailure(th);
                    }
                }

                @Override // org.fusesource.fabric.stream.log.Callback
                public void onFailure(Throwable th) {
                    th.printStackTrace();
                    LogStreamer.this.stop();
                }
            });
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public void setBatchSize(int i) {
        this.batchSize = i;
    }

    public long getBatchTimeout() {
        return this.batchTimeout;
    }

    public void setBatchTimeout(long j) {
        this.batchTimeout = j;
    }

    public InputStream getIs() {
        return this.is;
    }

    public void setIs(InputStream inputStream) {
        this.is = inputStream;
    }

    public boolean isExitOnEOF() {
        return this.exitOnEOF;
    }

    public void setExitOnEOF(boolean z) {
        this.exitOnEOF = z;
    }

    public String getLogFilePattern() {
        return this.logFilePattern;
    }

    public void setLogFilePattern(String str) {
        this.logFilePattern = str;
    }

    public File getPositionFile() {
        return this.positionFile;
    }

    public void setPositionFile(File file) {
        this.positionFile = file;
    }

    public Processor getProcessor() {
        return this.processor;
    }

    public void setProcessor(Processor processor) {
        this.processor = processor;
    }

    public long getTailRetry() {
        return this.tailRetry;
    }

    public void setTailRetry(long j) {
        this.tailRetry = j;
    }

    static {
        $assertionsDisabled = !LogStreamer.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(LogStreamer.class);
        EOF = new Object();
    }
}
