package io.fabric8.partition.internal;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.fabric8.groups.Group;
import io.fabric8.groups.GroupListener;
import io.fabric8.groups.internal.ZooKeeperGroup;
import io.fabric8.partition.BalancingPolicy;
import io.fabric8.partition.Partition;
import io.fabric8.partition.PartitionListener;
import io.fabric8.partition.TaskManager;
import io.fabric8.partition.WorkerNode;
import io.fabric8.utils.Closeables;
import io.fabric8.utils.SystemProperties;
import io.fabric8.zookeeper.ZkPath;
import io.fabric8.zookeeper.utils.ZooKeeperUtils;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fabric8/partition/internal/DefaultTaskManager.class */
public class DefaultTaskManager implements TaskManager, GroupListener<WorkerNode>, PathChildrenCacheListener, NodeCacheListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTaskManager.class);
    private final Group<WorkerNode> group;
    private final String taskId;
    private final String taskDefinition;
    private final String partitionPath;
    private final CuratorFramework curator;
    private final PartitionListener partitionListener;
    private final BalancingPolicy balancingPolicy;
    private volatile PathChildrenCache partitionCache;
    private volatile NodeCache workerCache;
    private WorkerNode node;
    private final String name = System.getProperty(SystemProperties.KARAF_NAME);
    private final ExecutorService executorService = Executors.newFixedThreadPool(2);
    private final ObjectMapper mapper = new ObjectMapper();
    private final TypeReference<HashMap<String, String>> partitionTypeRef = new TypeReference<HashMap<String, String>>() { // from class: io.fabric8.partition.internal.DefaultTaskManager.1
    };
    private final Set<Partition> assignedPartitions = new LinkedHashSet();

    /* renamed from: io.fabric8.partition.internal.DefaultTaskManager$3, reason: invalid class name */
    /* loaded from: input_file:io/fabric8/partition/internal/DefaultTaskManager$3.class */
    static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$io$fabric8$groups$GroupListener$GroupEvent;
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type = new int[PathChildrenCacheEvent.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.INITIALIZED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_ADDED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_REMOVED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            $SwitchMap$io$fabric8$groups$GroupListener$GroupEvent = new int[GroupListener.GroupEvent.values().length];
            try {
                $SwitchMap$io$fabric8$groups$GroupListener$GroupEvent[GroupListener.GroupEvent.CONNECTED.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$fabric8$groups$GroupListener$GroupEvent[GroupListener.GroupEvent.CHANGED.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$fabric8$groups$GroupListener$GroupEvent[GroupListener.GroupEvent.DISCONNECTED.ordinal()] = 3;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* loaded from: input_file:io/fabric8/partition/internal/DefaultTaskManager$RebalanceTask.class */
    private class RebalanceTask implements Runnable {
        private RebalanceTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            DefaultTaskManager.this.rebalance();
        }
    }

    public DefaultTaskManager(CuratorFramework curatorFramework, String str, String str2, String str3, PartitionListener partitionListener, BalancingPolicy balancingPolicy) {
        this.curator = (CuratorFramework) Preconditions.checkNotNull(curatorFramework, "curator");
        this.taskId = (String) Preconditions.checkNotNull(str);
        this.taskDefinition = (String) Preconditions.checkNotNull(str2);
        this.partitionPath = (String) Preconditions.checkNotNull(str3);
        this.partitionListener = partitionListener;
        this.balancingPolicy = balancingPolicy;
        this.group = new ZooKeeperGroup(curatorFramework, ZkPath.TASK.getPath(new String[]{str}), WorkerNode.class);
        this.group.add(this);
    }

    @Override // io.fabric8.partition.TaskManager
    public void start() {
        this.node = createNode();
        try {
            this.workerCache = createWorkerCache();
            this.workerCache.getListenable().addListener(this);
            this.workerCache.start(true);
            ZooKeeperUtils.createDefault(this.curator, this.partitionPath, (String) null);
            this.group.update(createNode());
            this.group.start();
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    @Override // io.fabric8.partition.TaskManager
    public void stop() {
        Closeables.closeQuitely(this.partitionCache);
        Closeables.closeQuitely(this.workerCache);
        Closeables.closeQuitely(this.group);
        this.executorService.shutdown();
        this.partitionListener.stop(this.taskId, this.taskDefinition, this.assignedPartitions);
        this.assignedPartitions.clear();
        this.node = null;
    }

    private synchronized void startWatchingForTasks() {
        try {
            if (this.partitionCache == null) {
                this.partitionCache = new PathChildrenCache(this.curator, this.partitionPath, true, false, this.executorService);
                this.partitionCache.getListenable().addListener(this);
                this.partitionCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
                this.partitionCache.rebuild();
                this.workerCache.rebuild();
            }
        } catch (Exception e) {
            LOGGER.error("Failed to start partition cache.", e);
        }
    }

    private synchronized void stopWatchingForTasks() {
        if (this.partitionCache != null) {
            try {
                this.partitionCache.close();
                this.partitionCache = null;
            } catch (IOException e) {
                LOGGER.error("Failed to stop partition cache.", e);
            }
        }
    }

    public void groupEvent(Group<WorkerNode> group, GroupListener.GroupEvent groupEvent) {
        switch (AnonymousClass3.$SwitchMap$io$fabric8$groups$GroupListener$GroupEvent[groupEvent.ordinal()]) {
            case 1:
            case 2:
                WorkerNode createNode = createNode();
                if (!group.isMaster()) {
                    group.update(createNode);
                    stopWatchingForTasks();
                    return;
                } else {
                    startWatchingForTasks();
                    createNode.setDefinition(this.taskDefinition);
                    group.update(createNode);
                    this.executorService.submit(new RebalanceTask());
                    return;
                }
            case 3:
                stopWatchingForTasks();
                return;
            default:
                return;
        }
    }

    WorkerNode createNode() {
        return new WorkerNode(this.taskId);
    }

    NodeCache createWorkerCache() throws Exception {
        String path = ZkPath.TASK_MEMBER_PARTITIONS.getPath(new String[]{this.name, this.taskId});
        ZooKeeperUtils.createDefault(this.curator, path, (String) null);
        return new NodeCache(this.curator, path);
    }

    WorkerNode readWorkerNode() {
        try {
            byte[] bArr = (byte[]) this.curator.getData().forPath(ZkPath.TASK_MEMBER_PARTITIONS.getPath(new String[]{this.name, this.taskId}));
            return bArr != null ? (WorkerNode) this.mapper.readValue(bArr, WorkerNode.class) : createNode();
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    @Override // io.fabric8.partition.TaskManager
    public PartitionListener getPartitionListener() {
        return this.partitionListener;
    }

    @Override // io.fabric8.partition.TaskManager
    public BalancingPolicy getBalancingPolicy() {
        return this.balancingPolicy;
    }

    @Override // io.fabric8.partition.TaskManager
    public void rebalance() {
        List currentData = this.partitionCache.getCurrentData();
        String[] strArr = (String[]) Lists.transform(currentData, ChildDataToPath.INSTANCE).toArray(new String[currentData.size()]);
        Map members = this.group.members();
        this.balancingPolicy.rebalance(this.taskId, strArr, (String[]) members.keySet().toArray(new String[members.size()]));
    }

    @Override // io.fabric8.partition.TaskManager
    public synchronized void updated(WorkerNode workerNode) {
        Set<Partition> listNodePartitions = listNodePartitions(workerNode);
        LinkedHashSet linkedHashSet = new LinkedHashSet((Collection) Sets.difference(listNodePartitions, this.assignedPartitions));
        LinkedHashSet linkedHashSet2 = new LinkedHashSet((Collection) Sets.difference(this.assignedPartitions, listNodePartitions));
        this.assignedPartitions.addAll(linkedHashSet);
        this.assignedPartitions.removeAll(linkedHashSet2);
        this.partitionListener.stop(this.taskId, this.taskDefinition, linkedHashSet2);
        this.partitionListener.start(this.taskId, this.taskDefinition, linkedHashSet);
    }

    private Set<Partition> listNodePartitions(WorkerNode workerNode) {
        return Sets.newHashSet(Iterables.transform(workerNode.getPartitions() != null ? Sets.newHashSet(workerNode.getPartitions()) : Sets.newHashSet(), new Function<String, Partition>() { // from class: io.fabric8.partition.internal.DefaultTaskManager.2
            public Partition apply(String str) {
                try {
                    return new PartitionImpl(str, (Map) DefaultTaskManager.this.mapper.readValue((byte[]) DefaultTaskManager.this.curator.getData().forPath(str), DefaultTaskManager.this.partitionTypeRef));
                } catch (Exception e) {
                    DefaultTaskManager.LOGGER.warn("Failed to read partition data, using empty configuration instead.");
                    return new PartitionImpl(str, Maps.newHashMap());
                }
            }
        }));
    }

    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
        switch (AnonymousClass3.$SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[pathChildrenCacheEvent.getType().ordinal()]) {
            case 1:
            case 2:
            case 3:
                this.executorService.submit(new RebalanceTask());
                return;
            default:
                return;
        }
    }

    public synchronized void nodeChanged() throws Exception {
        this.node = readWorkerNode();
        updated(this.node);
    }
}
