/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.demo.distexec.mapreduce;

import com.martiansoftware.jsap.FlaggedOption;
import com.martiansoftware.jsap.JSAP;
import com.martiansoftware.jsap.JSAPResult;
import com.martiansoftware.jsap.Parameter;
import com.martiansoftware.jsap.SimpleJSAP;
import com.martiansoftware.jsap.StringParser;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.Buffer;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import org.infinispan.Cache;
import org.infinispan.demo.distexec.CacheBuilder;
import org.infinispan.distexec.mapreduce.Collator;
import org.infinispan.distexec.mapreduce.Collector;
import org.infinispan.distexec.mapreduce.MapReduceTask;
import org.infinispan.distexec.mapreduce.Mapper;
import org.infinispan.distexec.mapreduce.Reducer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.util.FileLookup;

public class WordCountDemo {
    private static final String DEFAULT_CONFIG_FILE = "jgroups-s3_ping-aws.xml";

    /**
     * Starts an Infinispan node, optionally loads a text file into the cache in
     * fixed-size chunks and, on the master node, runs a MapReduce word count and
     * prints the k-th most frequent word.
     *
     * <p>Recognised options (parsed with JSAP):
     * <ul>
     *   <li>{@code -c/--configFile} transport config file, defaults to {@code DEFAULT_CONFIG_FILE}</li>
     *   <li>{@code -t/--nodeType} required; the node runs the task only when this is {@code master}</li>
     *   <li>{@code -f/--textFile} optional input text file to put into the cache</li>
     *   <li>{@code -i/--ispnConfigFile} optional Infinispan cache configuration file</li>
     *   <li>{@code -k/--kthWord} rank of the word to report, defaults to 15</li>
     * </ul>
     *
     * @param args command line arguments
     * @throws Exception if argument parsing, file reading, cache start-up or the task itself fails
     */
    public static void main(String[] args) throws Exception {
        SimpleJSAP jsap = new SimpleJSAP("WordCountDemo", "Count words in Infinispan cache usin MapReduceTask ", new Parameter[]{
            new FlaggedOption("configFile", JSAP.STRING_PARSER, DEFAULT_CONFIG_FILE, false, 'c', "configFile", "Infinispan transport config file"),
            new FlaggedOption("nodeType", JSAP.STRING_PARSER, "slave", true, 't', "nodeType", "Node type as either master or slave"),
            new FlaggedOption("textFile", JSAP.STRING_PARSER, null, false, 'f', "textFile", "Input text file to distribute onto grid"),
            new FlaggedOption("ispnConfigFile", JSAP.STRING_PARSER, null, false, 'i', "ispnConfigFile", "Infinispan cache configuration file"),
            new FlaggedOption("kthWord", JSAP.INTEGER_PARSER, "15", false, 'k', "kthWord", "Kth most frequent word")
        });
        JSAPResult config = jsap.parse(args);
        if (!config.success() || jsap.messagePrinted()) {
            Iterator<?> messageIterator = config.getErrorMessageIterator();
            while (messageIterator.hasNext()) {
                System.err.println(messageIterator.next());
            }
            System.err.println(jsap.getHelp());
            return;
        }

        boolean isMaster = "master".equals(config.getString("nodeType"));
        String transportConfig = config.getString("configFile");
        String ispnConfigFile = config.getString("ispnConfigFile");
        int kthWord = config.getInt("kthWord");
        System.out.println("Starting Infinispan node using transport config file " + transportConfig);
        if (ispnConfigFile != null) {
            System.out.println("Starting Infinispan node using Infinispan config file " + ispnConfigFile);
        }

        // Resolve the input file before starting the node so a missing file fails fast.
        String textFile = config.getString("textFile");
        InputStream textStream = null;
        if (textFile != null) {
            textStream = new FileLookup().lookupFile(textFile);
            if (textStream == null) {
                System.err.println("Intended input text file " + textFile + " not found. Make sure it is on classpath");
                return;
            }
        }

        EmbeddedCacheManager cacheManager;
        Cache<String, String> cache;
        try {
            CacheBuilder cb = new CacheBuilder(ispnConfigFile, transportConfig);
            cacheManager = cb.getCacheManager();
            cache = cacheManager.getCache();
            if (textStream != null) {
                // Read from the stream FileLookup resolved rather than re-opening the
                // name as a plain file: the lookup may have found it somewhere a
                // FileReader on the bare name cannot reach.
                BufferedReader reader = new BufferedReader(new InputStreamReader(textStream));
                int chunkSize = 10;
                int chunkId = 0;
                CharBuffer cbuf = CharBuffer.allocate(1024 * chunkSize);
                while (reader.read(cbuf) >= 0) {
                    cbuf.flip();
                    // Each chunk is stored under the key "<textFile><chunkId>".
                    cache.put(textFile + chunkId++, cbuf.toString());
                    cbuf.clear();
                }
            }
        } finally {
            // Closing the underlying stream releases the file handle even if
            // cache start-up or reading failed.
            if (textStream != null) {
                textStream.close();
            }
        }

        Transport transport = cache.getAdvancedCache().getRpcManager().getTransport();
        int numServers = transport.getMembers().size();
        if (isMaster) {
            try {
                System.out.println("Member " + transport.getAddress() + " joined as master and its view is " + transport.getMembers() + ", starting MapReduceTask across " + numServers + " machines");
                long start = System.currentTimeMillis();
                MapReduceTask<String, String, String, Integer> t = new MapReduceTask<String, String, String, Integer>(cache);
                t.mappedWith(new WordCountMapper()).reducedWith(new WordCountReducer());
                Map.Entry<String, Integer> kthMostFrequentEntry = t.execute(new KFrequentWordCollator(kthWord));
                if (kthMostFrequentEntry != null) {
                    System.out.println("Kth(where k=" + kthWord + ") most frequent word is " + kthMostFrequentEntry.getKey() + " occurring " + kthMostFrequentEntry.getValue() + " times. Found in " + (System.currentTimeMillis() - start) + " ms");
                } else {
                    System.out.println("Kth(where k=" + kthWord + ") most frequent word is too large for this data set. Try smaller k");
                }
            } finally {
                // Stop the cache manager even when the task fails.
                cacheManager.stop();
            }
        } else {
            // Non-master nodes do not stop the cache manager here.
            System.out.println("Member " + transport.getAddress() + " joined as slave and its view is " + transport.getMembers() + ", waiting....");
        }
    }

    /**
     * Reducer that adds together all the counts supplied for one word.
     */
    static class WordCountReducer
    implements Reducer<String, Integer> {
        private static final long serialVersionUID = 1901016598354633256L;

        WordCountReducer() {
        }

        /**
         * Sums every value produced by the iterator.
         *
         * @param key  the word being reduced (not used in the computation)
         * @param iter the counts to add up
         * @return the sum of all counts, or 0 when the iterator is empty
         */
        public Integer reduce(String key, Iterator<Integer> iter) {
            int total = 0;
            for (Iterator<Integer> counts = iter; counts.hasNext(); ) {
                total += counts.next().intValue();
            }
            return Integer.valueOf(total);
        }
    }

    /**
     * Mapper that splits a text chunk into whitespace-separated tokens and emits
     * each token with a count of 1.
     */
    static class WordCountMapper
    implements Mapper<String, String, String, Integer> {
        private static final long serialVersionUID = -5943370243108735560L;

        WordCountMapper() {
        }

        /**
         * Tokenizes {@code value} with the default {@link StringTokenizer}
         * delimiters and emits {@code (token, 1)} for every token found.
         *
         * @param key   cache key of the chunk (not used)
         * @param value text chunk to tokenize
         * @param c     collector receiving one entry per token
         */
        public void map(String key, String value, Collector<String, Integer> c) {
            StringTokenizer tokens = new StringTokenizer(value);
            while (tokens.hasMoreTokens()) {
                // Pass typed arguments: the collector is declared with String keys
                // and Integer values, so Object-typed arguments would not compile.
                c.emit(tokens.nextToken(), 1);
            }
        }
    }

    /**
     * Collator that picks the k-th most frequent word out of the reduced
     * word counts. Ranks are 1-based: {@code k = 1} is the most frequent word.
     */
    static class KFrequentWordCollator
    implements Collator<String, Integer, Map.Entry<String, Integer>> {
        private final int kthFrequentWord;

        /**
         * @param kthFrequentWord 1-based rank of the word to select; 0 is accepted
         *                        for backward compatibility but never matches an entry
         * @throws IllegalArgumentException if {@code kthFrequentWord} is negative
         */
        public KFrequentWordCollator(int kthFrequentWord) {
            if (kthFrequentWord < 0) {
                throw new IllegalArgumentException("kthFrequentWord can not be less than 0");
            }
            this.kthFrequentWord = kthFrequentWord;
        }

        /**
         * Sorts the entries by descending count and returns the one at rank
         * {@code kthFrequentWord}.
         *
         * @param reducedResults word to count mapping produced by the reduce phase
         * @return the entry at the requested rank, or {@code null} when the rank is
         *         0 or exceeds the number of distinct words
         */
        public Map.Entry<String, Integer> collate(Map<String, Integer> reducedResults) {
            ArrayList<Map.Entry<String, Integer>> ranked =
                    new ArrayList<Map.Entry<String, Integer>>(reducedResults.entrySet());
            Collections.sort(ranked, new Comparator<Map.Entry<String, Integer>>(){

                @Override
                public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
                    // Reversed operands give descending order by count.
                    return o2.getValue().compareTo(o1.getValue());
                }
            });
            // Rank k lives at index k - 1, so valid ranks are 1..size inclusive.
            if (this.kthFrequentWord >= 1 && this.kthFrequentWord <= ranked.size()) {
                return ranked.get(this.kthFrequentWord - 1);
            }
            return null;
        }
    }
}

