/*
 * Decompiled with CFR 0.152.
 */
package org.tikv.common;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.options.GetOption;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.DefaultHostMapping;
import org.tikv.common.HostMapping;
import org.tikv.common.PDChecker;
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.apiversion.RequestKeyCodec;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.operation.NoopHandler;
import org.tikv.common.operation.PDErrorHandler;
import org.tikv.common.pd.PDError;
import org.tikv.common.pd.PDUtils;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.HistogramUtils;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.PDGrpc;
import org.tikv.kvproto.Pdpb;
import org.tikv.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.tikv.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.tikv.shade.com.fasterxml.jackson.databind.json.JsonMapper;
import org.tikv.shade.com.google.common.annotations.VisibleForTesting;
import org.tikv.shade.com.google.common.base.Preconditions;
import org.tikv.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.tikv.shade.com.google.protobuf.ByteString;
import org.tikv.shade.io.grpc.ManagedChannel;
import org.tikv.shade.io.grpc.Metadata;
import org.tikv.shade.io.grpc.stub.MetadataUtils;
import org.tikv.shade.io.prometheus.client.Histogram;

public class PDClient
extends AbstractGRPCClient<PDGrpc.PDBlockingStub, PDGrpc.PDFutureStub>
implements ReadOnlyPDClient {
    /** Key prefix scanned in etcd by {@link #updateTiFlashReplicaStatus()}; table ids are parsed from what follows it. */
    private static final String TIFLASH_TABLE_SYNC_PROGRESS_PATH = "/tiflash/table/sync";
    // NOTE(review): the three numeric constants below are never referenced by name in this
    // decompiled file. The same values appear as literals at their apparent use sites
    // (50L in updateLeaderOrForwardFollower, 300 and 60L in keepPauseChecker), presumably
    // because the compiler inlined them -- confirm against the original source.
    private static final long MIN_TRY_UPDATE_DURATION = 50L;
    private static final int PAUSE_CHECKER_TIMEOUT = 300;
    private static final int KEEP_CHECKER_PAUSE_PERIOD = 60;
    private static final Logger logger = LoggerFactory.getLogger(PDClient.class);
    /** Codec used to encode keys sent to PD and to decode the regions PD returns. */
    private final RequestKeyCodec codec;
    /** Request header carrying the cluster id; built once in initCluster(). */
    private Pdpb.RequestHeader header;
    /** Pre-built TSO request (count = 1) handed out by getTimestamp(). */
    private Pdpb.TsoRequest tsoReq;
    /** Stubs currently in use; replaced by createLeaderClientWrapper / createFollowerClientWrapper. */
    private volatile PDClientWrapper pdClientWrapper;
    /** Single-thread scheduler that runs tryUpdateLeader() every 10 seconds. */
    private ScheduledExecutorService service;
    /** Single-thread scheduler for updateTiFlashReplicaStatus(); only created when TiFlash is enabled in the configuration. */
    private ScheduledExecutorService tiflashReplicaService;
    /** One scheduler per checker that is being kept paused; read and written from synchronized methods. */
    private final HashMap<PDChecker, ScheduledExecutorService> pauseCheckerService = new HashMap();
    /** Known PD addresses; seeded (shuffled) from the configuration, then replaced by the member list PD reports. */
    private List<URI> pdAddrs;
    /** etcd client built against the PD addresses; used for TiFlash progress and the default host mapping. */
    private Client etcdClient;
    /** Table id -> replica sync progress, swapped wholesale on every successful refresh. */
    private ConcurrentMap<Long, Double> tiflashReplicaMap;
    /** Host mapping passed to the channel factory whenever a channel is requested. */
    private HostMapping hostMapping;
    /** System.currentTimeMillis() of the last leader-update attempt; throttles updateLeaderOrForwardFollower(). */
    private long lastUpdateLeaderTime;
    /** Latency histogram for getRegionByKey(), labelled by cluster id. */
    public static final Histogram PD_GET_REGION_BY_KEY_REQUEST_LATENCY = (Histogram)((Histogram.Builder)((Histogram.Builder)((Histogram.Builder)HistogramUtils.buildDuration().name("client_java_pd_get_region_by_requests_latency")).help("pd getRegionByKey request latency.")).labelNames("cluster")).register();

    /**
     * Creates a client and immediately connects it to the PD cluster.
     *
     * @param conf client configuration (PD addresses, timeouts, feature switches)
     * @param codec codec used for keys and regions exchanged with PD
     * @param channelFactory factory providing the gRPC channels
     */
    private PDClient(TiConfiguration conf, RequestKeyCodec codec, ChannelFactory channelFactory) {
        super(conf, channelFactory);
        // Cluster discovery runs first: the stub getters below throw until a client wrapper exists.
        initCluster();
        this.codec = codec;
        blockingStub = getBlockingStub();
        asyncStub = getAsyncStub();
    }

    /**
     * Creates a PD client exposed through its read-only interface.
     *
     * @param conf client configuration
     * @param codec codec used for keys and regions exchanged with PD
     * @param channelFactory factory providing the gRPC channels
     * @return the newly created client
     */
    public static ReadOnlyPDClient create(TiConfiguration conf, RequestKeyCodec codec, ChannelFactory channelFactory) {
        ReadOnlyPDClient client = createRaw(conf, codec, channelFactory);
        return client;
    }

    /**
     * Creates a PD client and returns it as the concrete {@code PDClient} type.
     *
     * @param conf client configuration
     * @param codec codec used for keys and regions exchanged with PD
     * @param channelFactory factory providing the gRPC channels
     * @return the newly created client
     */
    static PDClient createRaw(TiConfiguration conf, RequestKeyCodec codec, ChannelFactory channelFactory) {
        PDClient client = new PDClient(conf, codec, channelFactory);
        return client;
    }

    /** Returns the host mapping this client passes to the channel factory. */
    @Override
    public HostMapping getHostMapping() {
        return hostMapping;
    }

    /**
     * Requests a timestamp from PD using the pre-built TSO request.
     *
     * @param backOffer back-off policy handed to the retrying call
     * @return timestamp built from the physical and logical parts of PD's answer
     */
    @Override
    public TiTimestamp getTimestamp(BackOffer backOffer) {
        Supplier<Pdpb.TsoRequest> tsoRequest = () -> tsoReq;
        PDErrorHandler<Pdpb.TsoResponse> errorHandler = new PDErrorHandler<Pdpb.TsoResponse>(tsoResp -> {
            // Surface the error carried in the response header, if there is one.
            if (tsoResp.getHeader().hasError()) {
                return PDError.buildFromPdpbError(tsoResp.getHeader().getError());
            }
            return null;
        }, this);
        Pdpb.TsoResponse tsoResponse = callWithRetry(backOffer, PDGrpc.getTsoMethod(), tsoRequest, errorHandler);
        Pdpb.Timestamp ts = tsoResponse.getTimestamp();
        return new TiTimestamp(ts.getPhysical(), ts.getLogical());
    }

    /**
     * Starts a background task that keeps the given PD checker paused.
     *
     * <p>The task posts a pause with a delay of 300 immediately and then every 60 seconds.
     * Calling this again for a checker that already has a task is a no-op.
     *
     * @param checker the checker to keep paused
     */
    public synchronized void keepPauseChecker(PDChecker checker) {
        if (pauseCheckerService.containsKey(checker)) {
            return;
        }
        String threadNameFormat = String.format("PDClient-pause-%s-pool-%%d", checker.name());
        ScheduledExecutorService pauser = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat(threadNameFormat).setDaemon(true).build());
        pauser.scheduleAtFixedRate(() -> pauseChecker(checker, 300), 0L, 60L, TimeUnit.SECONDS);
        pauseCheckerService.put(checker, pauser);
    }

    /**
     * Stops the background task started by {@link #keepPauseChecker(PDChecker)}, if any.
     *
     * <p>This only stops re-issuing the pause; it does not send a resume request.
     *
     * @param checker the checker whose keep-paused task should stop
     */
    public synchronized void stopKeepPauseChecker(PDChecker checker) {
        // The map never stores null values, so a null result means "no task registered".
        ScheduledExecutorService pauser = pauseCheckerService.remove(checker);
        if (pauser != null) {
            pauser.shutdown();
        }
    }

    /**
     * Resumes the given PD checker by posting a pause request with a delay of zero.
     *
     * @param checker the checker to resume
     */
    public void resumeChecker(PDChecker checker) {
        final int noDelay = 0;
        pauseChecker(checker, noDelay);
    }

    /**
     * Posts a pause (or, with {@code timeout == 0}, resume) request for a checker to the
     * first known PD address over HTTP.
     *
     * <p>Failures are logged and never propagated, so this is safe to run from a scheduled task.
     *
     * @param checker the checker to pause or resume
     * @param timeout value sent as the {@code delay} field of the JSON body; 0 means resume
     */
    private void pauseChecker(PDChecker checker, int timeout) {
        String verb = timeout == 0 ? "resume" : "pause";
        URI url = this.pdAddrs.get(0);
        String api = url.toString() + "/pd/api/v1/checker/" + checker.apiName();
        HashMap<String, Integer> arguments = new HashMap<String, Integer>();
        arguments.put("delay", timeout);
        try (CloseableHttpClient client = HttpClients.createDefault()) {
            JsonMapper jsonMapper = new JsonMapper();
            byte[] body = jsonMapper.writeValueAsBytes(arguments);
            HttpPost post = new HttpPost(api);
            post.setEntity(new ByteArrayEntity(body));
            try (CloseableHttpResponse resp = client.execute(post)) {
                if (resp.getStatusLine().getStatusCode() != 200) {
                    logger.error("failed to {} checker.", (Object)verb);
                } else {
                    // Only report success when PD actually accepted the request; previously
                    // this line was also logged right after the failure message above.
                    logger.info("checker {} {}d", (Object)checker.apiName(), (Object)verb);
                }
            }
        }
        catch (Exception e) {
            logger.error(String.format("failed to %s checker.", verb), (Throwable)e);
        }
    }

    /**
     * Asks the first known PD address whether the given checker is paused.
     *
     * @param checker the checker to query
     * @return the value of the {@code paused} field in PD's JSON answer, or {@code null}
     *     when the request or parsing fails (the failure is logged)
     */
    public Boolean isCheckerPaused(PDChecker checker) {
        String api = pdAddrs.get(0).toString() + "/pd/api/v1/checker/" + checker.apiName();
        Boolean paused;
        try {
            HashMap<String, Boolean> checkerStatus =
                    new ObjectMapper().readValue(new URL(api), new TypeReference<HashMap<String, Boolean>>(){});
            paused = checkerStatus.get("paused");
        }
        catch (Exception e) {
            logger.error(String.format("failed to get %s checker status.", checker.apiName()), (Throwable)e);
            paused = null;
        }
        return paused;
    }

    /**
     * Asks PD to scatter the given region.
     *
     * @param region region to scatter; only its id is sent
     * @param backOffer back-off policy handed to the retrying call
     * @throws TiClientInternalException if the final response still carries a header error
     */
    void scatterRegion(Metapb.Region region, BackOffer backOffer) {
        Supplier<Pdpb.ScatterRegionRequest> scatterRequest = () -> {
            return Pdpb.ScatterRegionRequest.newBuilder().setHeader(header).setRegionId(region.getId()).build();
        };
        PDErrorHandler<Pdpb.ScatterRegionResponse> errorHandler = new PDErrorHandler<Pdpb.ScatterRegionResponse>(scatterResp -> {
            if (scatterResp.getHeader().hasError()) {
                return PDError.buildFromPdpbError(scatterResp.getHeader().getError());
            }
            return null;
        }, this);
        Pdpb.ScatterRegionResponse scatterResponse = callWithRetry(backOffer, PDGrpc.getScatterRegionMethod(), scatterRequest, errorHandler);
        boolean failed = scatterResponse.hasHeader() && scatterResponse.getHeader().hasError();
        if (!failed) {
            return;
        }
        throw new TiClientInternalException(String.format("failed to scatter region because %s", scatterResponse.getHeader().getError()));
    }

    /**
     * Polls PD until the scatter operator on the given region is no longer running.
     *
     * <p>Every unsuccessful poll goes through {@code backOffer}. This includes a poll that
     * yields no response at all, which previously retried immediately and could spin
     * without any delay or bound.
     *
     * @param region region being scattered; only its id is used
     * @param backOffer back-off policy applied between polls
     */
    void waitScatterRegionFinish(Metapb.Region region, BackOffer backOffer) {
        while (true) {
            Pdpb.GetOperatorResponse resp = this.getOperator(region.getId());
            if (resp == null) {
                // No answer from PD: wait before asking again instead of busy-looping.
                backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException("waiting scatter region"));
                continue;
            }
            if (this.isScatterRegionFinish(resp)) {
                logger.info(String.format("wait scatter region on %d is finished", region.getId()));
                return;
            }
            backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException("waiting scatter region"));
            logger.info(String.format("wait scatter region %d at key %s is %s", region.getId(), KeyUtils.formatBytes(resp.getDesc().toByteArray()), resp.getStatus()));
        }
    }

    /**
     * Fetches the operator PD currently has for a region.
     *
     * @param regionId id of the region to query
     * @return PD's response as produced by the retrying call (callers check it for {@code null})
     */
    private Pdpb.GetOperatorResponse getOperator(long regionId) {
        Supplier<Pdpb.GetOperatorRequest> operatorRequest = () -> {
            return Pdpb.GetOperatorRequest.newBuilder().setHeader(header).setRegionId(regionId).build();
        };
        Pdpb.GetOperatorResponse operatorResponse = callWithRetry(
                ConcreteBackOffer.newCustomBackOff(0, getClusterId()), PDGrpc.getGetOperatorMethod(), operatorRequest, new NoopHandler());
        return operatorResponse;
    }

    /**
     * Decides whether a scatter operation is over, judging by an operator response.
     *
     * @param resp operator response for the region
     * @return {@code true} when PD reports the region as not found, when the operator is not
     *     a {@code scatter-region} operator, or when its status is anything but RUNNING
     */
    private boolean isScatterRegionFinish(Pdpb.GetOperatorResponse resp) {
        if (resp.hasHeader()) {
            Pdpb.ResponseHeader respHeader = resp.getHeader();
            // A region PD no longer knows about cannot be waited on any further.
            if (respHeader.hasError() && respHeader.getError().getType() == Pdpb.ErrorType.REGION_NOT_FOUND) {
                return true;
            }
        }
        boolean isScatterOperator = resp.getDesc().equals(ByteString.copyFromUtf8("scatter-region"));
        if (!isScatterOperator) {
            return true;
        }
        return resp.getStatus() != Pdpb.OperatorStatus.RUNNING;
    }

    /**
     * Looks up the region containing a key, together with its leader peer.
     *
     * <p>The duration of the call, successful or not, is recorded in
     * {@link #PD_GET_REGION_BY_KEY_REQUEST_LATENCY} under this cluster's id.
     *
     * @param backOffer back-off policy handed to the retrying call
     * @param key key to locate; it is encoded with the codec before being sent
     * @return the decoded region paired with the leader PD reported
     */
    @Override
    public Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key) {
        Histogram.Timer latencyTimer = ((Histogram.Child)PD_GET_REGION_BY_KEY_REQUEST_LATENCY.labels(getClusterId().toString())).startTimer();
        try {
            Supplier<Pdpb.GetRegionRequest> regionRequest = () -> {
                return Pdpb.GetRegionRequest.newBuilder().setHeader(header).setRegionKey(codec.encodePdQuery(key)).build();
            };
            PDErrorHandler<Pdpb.GetRegionResponse> errorHandler =
                    new PDErrorHandler<Pdpb.GetRegionResponse>(PDErrorHandler.getRegionResponseErrorExtractor, this);
            Pdpb.GetRegionResponse regionResponse = callWithRetry(backOffer, PDGrpc.getGetRegionMethod(), regionRequest, errorHandler);
            return new Pair<Metapb.Region, Metapb.Peer>(codec.decodeRegion(regionResponse.getRegion()), regionResponse.getLeader());
        }
        finally {
            latencyTimer.observeDuration();
        }
    }

    /**
     * Looks up a region by id, together with its leader peer.
     *
     * @param backOffer back-off policy handed to the retrying call
     * @param id region id
     * @return the decoded region paired with the leader PD reported
     */
    @Override
    public Pair<Metapb.Region, Metapb.Peer> getRegionByID(BackOffer backOffer, long id) {
        Supplier<Pdpb.GetRegionByIDRequest> regionRequest = () -> {
            return Pdpb.GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
        };
        PDErrorHandler<Pdpb.GetRegionResponse> errorHandler =
                new PDErrorHandler<Pdpb.GetRegionResponse>(PDErrorHandler.getRegionResponseErrorExtractor, this);
        Pdpb.GetRegionResponse regionResponse = callWithRetry(backOffer, PDGrpc.getGetRegionByIDMethod(), regionRequest, errorHandler);
        Metapb.Region decodedRegion = codec.decodeRegion(regionResponse.getRegion());
        return new Pair<Metapb.Region, Metapb.Peer>(decodedRegion, regionResponse.getLeader());
    }

    /**
     * Scans regions in a key range with a single, non-retried PD call.
     *
     * <p>The call uses the configured warm-up timeout as its deadline rather than the
     * regular request timeout.
     *
     * @param backOffer not used by this implementation
     * @param startKey start of the range, encoded with the codec before sending
     * @param endKey end of the range, encoded with the codec before sending
     * @param limit limit forwarded to PD
     * @return the decoded regions, or {@code null} if the stub returned no response
     */
    @Override
    public List<Pdpb.Region> scanRegions(BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) {
        PDGrpc.PDBlockingStub warmUpStub = (PDGrpc.PDBlockingStub)getBlockingStub().withDeadlineAfter(conf.getWarmUpTimeout(), TimeUnit.MILLISECONDS);
        Pair<ByteString, ByteString> encodedRange = codec.encodePdQueryRange(startKey, endKey);
        Pdpb.ScanRegionsRequest scanRequest = Pdpb.ScanRegionsRequest.newBuilder()
                .setHeader(header)
                .setStartKey((ByteString)encodedRange.first)
                .setEndKey((ByteString)encodedRange.second)
                .setLimit(limit)
                .build();
        Pdpb.ScanRegionsResponse scanResponse = warmUpStub.scanRegions(scanRequest);
        return scanResponse == null ? null : codec.decodePdRegions(scanResponse.getRegionsList());
    }

    /**
     * Returns a supplier that builds a fresh get-store request on every call.
     *
     * @param storeId id of the store to ask for
     */
    private Supplier<Pdpb.GetStoreRequest> buildGetStoreReq(long storeId) {
        return () -> {
            Pdpb.GetStoreRequest storeRequest = Pdpb.GetStoreRequest.newBuilder().setHeader(header).setStoreId(storeId).build();
            return storeRequest;
        };
    }

    /** Returns a supplier that builds a fresh get-all-stores request on every call. */
    private Supplier<Pdpb.GetAllStoresRequest> buildGetAllStoresReq() {
        return () -> {
            Pdpb.GetAllStoresRequest allStoresRequest = Pdpb.GetAllStoresRequest.newBuilder().setHeader(header).build();
            return allStoresRequest;
        };
    }

    /**
     * Builds the error handler used for get-store calls; it turns a header error into a
     * {@code PDError} and reports no error otherwise.
     *
     * @param <T> unused type parameter kept from the original signature
     */
    private <T> PDErrorHandler<Pdpb.GetStoreResponse> buildPDErrorHandler() {
        return new PDErrorHandler<Pdpb.GetStoreResponse>(storeResp -> {
            if (storeResp.getHeader().hasError()) {
                return PDError.buildFromPdpbError(storeResp.getHeader().getError());
            }
            return null;
        }, this);
    }

    /**
     * Fetches a store's metadata from PD.
     *
     * @param backOffer back-off policy handed to the retrying call
     * @param storeId id of the store
     * @return the store from PD's response, or {@code null} if there was no response
     */
    @Override
    public Metapb.Store getStore(BackOffer backOffer, long storeId) {
        Pdpb.GetStoreResponse storeResponse = callWithRetry(backOffer, PDGrpc.getGetStoreMethod(), buildGetStoreReq(storeId), this.buildPDErrorHandler());
        return storeResponse == null ? null : storeResponse.getStore();
    }

    /**
     * Fetches the metadata of every store known to PD.
     *
     * @param backOffer back-off policy handed to the retrying call
     * @return the store list from PD's response
     */
    @Override
    public List<Metapb.Store> getAllStores(BackOffer backOffer) {
        PDErrorHandler<Pdpb.GetAllStoresResponse> errorHandler = new PDErrorHandler<Pdpb.GetAllStoresResponse>(storesResp -> {
            if (storesResp.getHeader().hasError()) {
                return PDError.buildFromPdpbError(storesResp.getHeader().getError());
            }
            return null;
        }, this);
        Pdpb.GetAllStoresResponse storesResponse = callWithRetry(backOffer, PDGrpc.getGetAllStoresMethod(), buildGetAllStoresReq(), errorHandler);
        return storesResponse.getStoresList();
    }

    /** Returns the replica-read mode taken from the client configuration. */
    @Override
    public TiConfiguration.ReplicaRead getReplicaRead() {
        TiConfiguration.ReplicaRead replicaRead = conf.getReplicaRead();
        return replicaRead;
    }

    /**
     * Releases everything this client owns: the etcd client, the background schedulers
     * (leader update, TiFlash progress, and any keep-paused checker tasks) and the channel
     * factory.
     *
     * <p>The schedulers and the channel factory are released even when closing the etcd
     * client fails. Stopping a keep-paused task does not send a resume request to PD; use
     * {@link #resumeChecker(PDChecker)} for that.
     *
     * @throws InterruptedException declared by the overridden method; propagated from the
     *     channel factory if it throws while closing
     */
    @Override
    public void close() throws InterruptedException {
        try {
            if (this.etcdClient != null) {
                this.etcdClient.close();
            }
        }
        finally {
            if (this.service != null) {
                this.service.shutdownNow();
            }
            if (this.tiflashReplicaService != null) {
                this.tiflashReplicaService.shutdownNow();
            }
            // The map is guarded by this object's monitor in keepPauseChecker/stopKeepPauseChecker.
            synchronized (this) {
                for (ScheduledExecutorService pauser : this.pauseCheckerService.values()) {
                    pauser.shutdownNow();
                }
                this.pauseCheckerService.clear();
            }
            if (this.channelFactory != null) {
                this.channelFactory.close();
            }
        }
    }

    /** Returns the request header (carrying the cluster id) attached to PD requests; exposed for tests. */
    @VisibleForTesting
    Pdpb.RequestHeader getHeader() {
        return header;
    }

    /** Returns the wrapper holding the stubs currently in use; exposed for tests. */
    @VisibleForTesting
    PDClientWrapper getPdClientWrapper() {
        return pdClientWrapper;
    }

    /**
     * Asks one PD address for the cluster member list, retrying on failure.
     *
     * <p>Each failed attempt is logged and handed to {@code backOffer}. The loop has no exit
     * of its own besides a successful call, so it only ends on failure when
     * {@code doBackOff} throws (presumably once its budget is used up -- the caller,
     * {@link #getMembers(URI)}, catches that).
     *
     * @param backOffer back-off policy applied after every failed attempt
     * @param uri PD address to query
     * @return the member response, or {@code null} when the response names no leader
     *     (leader member id is 0)
     */
    private Pdpb.GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) {
        // The decompiled loop ended with a `break` after a try that always returns and a catch
        // that always continues; that statement is unreachable and does not compile.
        while (true) {
            try {
                ManagedChannel probChan = this.channelFactory.getChannel(PDUtils.uriToAddr(uri), this.hostMapping);
                PDGrpc.PDBlockingStub stub = (PDGrpc.PDBlockingStub)PDGrpc.newBlockingStub(probChan).withDeadlineAfter(this.getTimeout(), TimeUnit.MILLISECONDS);
                Pdpb.GetMembersRequest request = Pdpb.GetMembersRequest.newBuilder().setHeader(Pdpb.RequestHeader.getDefaultInstance()).build();
                Pdpb.GetMembersResponse resp = stub.getMembers(request);
                // A response without a valid leader is of no use to callers.
                if (resp != null && resp.getLeader().getMemberId() == 0L) {
                    return null;
                }
                return resp;
            }
            catch (Exception e) {
                logger.warn("failed to get member from pd server.", (Throwable)e);
                backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e);
            }
        }
    }

    /**
     * Best-effort member lookup against a single PD address.
     *
     * @param uri PD address to query
     * @return the member response, or {@code null} if the lookup failed or named no leader
     */
    private Pdpb.GetMembersResponse getMembers(URI uri) {
        ConcreteBackOffer memberBackOffer = ConcreteBackOffer.newCustomBackOff(5000);
        Pdpb.GetMembersResponse members;
        try {
            members = doGetMembers(memberBackOffer, uri);
        }
        catch (Exception ignored) {
            // Deliberately swallowed: callers treat null as "this address is unusable"
            // and move on to the next one. Individual failures are logged by doGetMembers.
            members = null;
        }
        return members;
    }

    /**
     * Points the client at the given leader unless it is already connected to it directly.
     *
     * @param leaderUrlStr address of the leader
     * @return {@code true} if the client already talks to that leader directly, otherwise
     *     the result of creating a new leader wrapper
     */
    synchronized boolean trySwitchLeader(String leaderUrlStr) {
        PDClientWrapper current = pdClientWrapper;
        // "Directly" means both the recorded leader and the connected address match.
        boolean alreadyOnLeader = current != null
                && leaderUrlStr.equals(current.getLeaderInfo())
                && leaderUrlStr.equals(current.getStoreAddress());
        return alreadyOnLeader || createLeaderClientWrapper(leaderUrlStr);
    }

    /**
     * Replaces the current wrapper with one connected directly to the leader and switches
     * the request timeout to the regular configured timeout.
     *
     * @param leaderUrlStr address of the leader
     * @return {@code false} if an {@link IllegalArgumentException} was raised while creating
     *     the channel or wrapper, {@code true} otherwise
     */
    private synchronized boolean createLeaderClientWrapper(String leaderUrlStr) {
        try {
            ManagedChannel leaderChannel = channelFactory.getChannel(leaderUrlStr, hostMapping);
            PDClientWrapper leaderWrapper = new PDClientWrapper(leaderUrlStr, leaderUrlStr, leaderChannel, System.nanoTime());
            pdClientWrapper = leaderWrapper;
            timeout = conf.getTimeout();
        }
        catch (IllegalArgumentException ignored) {
            // Reported to the caller through the return value only.
            return false;
        }
        logger.info(String.format("Switched to new leader: %s", pdClientWrapper));
        return true;
    }

    /**
     * Replaces the current wrapper with one connected to a follower that forwards requests
     * to the leader, and switches the request timeout to the configured forward timeout.
     *
     * @param followerUrlStr address of the follower to connect to
     * @param leaderUrls address of the leader the follower should forward to
     * @return {@code false} if the follower fails the health check or an
     *     {@link IllegalArgumentException} was raised, {@code true} otherwise
     */
    synchronized boolean createFollowerClientWrapper(String followerUrlStr, String leaderUrls) {
        try {
            boolean healthy = checkHealth(followerUrlStr, hostMapping);
            if (!healthy) {
                return false;
            }
            ManagedChannel followerChannel = channelFactory.getChannel(followerUrlStr, hostMapping);
            PDClientWrapper forwardingWrapper = new PDClientWrapper(leaderUrls, followerUrlStr, followerChannel, System.nanoTime());
            pdClientWrapper = forwardingWrapper;
            timeout = conf.getForwardTimeout();
        }
        catch (IllegalArgumentException ignored) {
            // Reported to the caller through the return value only.
            return false;
        }
        logger.info(String.format("Switched to new leader by follower forward: %s", pdClientWrapper));
        return true;
    }

    /**
     * Re-resolves the PD leader and connects to it; if the leader cannot be used directly
     * and gRPC forwarding is enabled in the configuration, connects to a follower that
     * forwards to the leader instead.
     *
     * <p>Calls made less than 50 ms after the previous recorded attempt return immediately.
     *
     * @throws TiClientInternalException if every known address was tried and the client
     *     still has no wrapper at all
     */
    public synchronized void updateLeaderOrForwardFollower() {
        // Throttle: skip if the last recorded attempt was under 50 ms ago.
        if (System.currentTimeMillis() - this.lastUpdateLeaderTime < 50L) {
            return;
        }
        for (URI url : this.pdAddrs) {
            Pdpb.GetMembersResponse resp = this.getMembers(url);
            // Skip addresses that did not answer or whose leader advertises no client URL.
            if (resp == null || resp.getLeader().getClientUrlsList().isEmpty()) continue;
            String leaderUrlStr = (String)resp.getLeader().getClientUrlsList().get(0);
            // Preferred outcome: the leader is healthy and a direct wrapper could be created.
            if (this.checkHealth(leaderUrlStr = PDUtils.uriToAddr(PDUtils.addrToUri(leaderUrlStr)), this.hostMapping) && this.createLeaderClientWrapper(leaderUrlStr)) {
                this.lastUpdateLeaderTime = System.currentTimeMillis();
                return;
            }
            if (!this.conf.getEnableGrpcForward()) continue;
            logger.info(String.format("can not switch to new leader, try follower forward", new Object[0]));
            List<Pdpb.Member> members = resp.getMembersList();
            // True from the start only when the current wrapper is connected to the leader
            // address itself (i.e. no follower is in use yet), so the first follower found is
            // tried. Otherwise it becomes true once the loop passes the follower currently in
            // use, so the follower after it is tried.
            boolean hasReachNextMember = this.pdClientWrapper != null && this.pdClientWrapper.getStoreAddress().equals(leaderUrlStr);
            // Walk the member list twice so the search can wrap around past the current follower.
            for (int i = 0; i < members.size() * 2; ++i) {
                Pdpb.Member member = members.get(i % members.size());
                // The leader itself is never a forwarding candidate.
                if (member.getMemberId() == resp.getLeader().getMemberId()) continue;
                String followerUrlStr = (String)member.getClientUrlsList().get(0);
                followerUrlStr = PDUtils.uriToAddr(PDUtils.addrToUri(followerUrlStr));
                if (this.pdClientWrapper != null && this.pdClientWrapper.getStoreAddress().equals(followerUrlStr)) {
                    hasReachNextMember = true;
                    continue;
                }
                if (!hasReachNextMember || !this.createFollowerClientWrapper(followerUrlStr, leaderUrlStr)) continue;
                logger.warn(String.format("forward request to pd [%s] by pd [%s]", leaderUrlStr, followerUrlStr));
                // NOTE(review): this success path returns without updating lastUpdateLeaderTime,
                // unlike the direct-leader path above -- confirm whether that is intended.
                return;
            }
        }
        this.lastUpdateLeaderTime = System.currentTimeMillis();
        if (this.pdClientWrapper == null) {
            throw new TiClientInternalException("already tried all address on file, but not leader found yet.");
        }
    }

    /**
     * Refreshes the leader connection and, on success, the list of known PD addresses.
     *
     * <p>Each known address is asked for the member list in turn; the first one whose
     * reported leader is healthy and can be switched to wins. Addresses that do not answer,
     * or whose leader advertises no client URL, are skipped so the remaining addresses are
     * still tried (the same guard {@link #updateLeaderOrForwardFollower()} applies).
     *
     * @throws TiClientInternalException if no address yielded a usable leader and the client
     *     has no wrapper at all
     */
    public void tryUpdateLeader() {
        for (URI url : this.pdAddrs) {
            Pdpb.GetMembersResponse resp = this.getMembers(url);
            // Without this emptiness check, get(0) below would throw and abort the whole scan.
            if (resp == null || resp.getLeader().getClientUrlsList().isEmpty()) {
                continue;
            }
            List<URI> urls = resp.getMembersList().stream().map(mem -> PDUtils.addrToUri(mem.getClientUrls(0))).collect(Collectors.toList());
            String leaderUrlStr = (String)resp.getLeader().getClientUrlsList().get(0);
            leaderUrlStr = PDUtils.uriToAddr(PDUtils.addrToUri(leaderUrlStr));
            if (!this.checkHealth(leaderUrlStr, this.hostMapping) || !this.trySwitchLeader(leaderUrlStr)) {
                continue;
            }
            // Adopt the member list reported by PD when it differs from what we have.
            if (!urls.equals(this.pdAddrs)) {
                this.tryUpdateMembers(urls);
            }
            return;
        }
        this.lastUpdateLeaderTime = System.currentTimeMillis();
        if (this.pdClientWrapper == null) {
            throw new TiClientInternalException("already tried all address on file, but not leader found yet.");
        }
    }

    /**
     * Replaces the list of known PD addresses.
     *
     * @param members the new address list; stored as-is, not copied
     */
    private synchronized void tryUpdateMembers(List<URI> members) {
        pdAddrs = members;
    }

    /**
     * Reloads the TiFlash replica sync progress of all tables from etcd.
     *
     * <p>Up to five attempts are made to issue the etcd read. On the first read that
     * completes, entries under {@code TIFLASH_TABLE_SYNC_PROGRESS_PATH} are parsed into a
     * new table-id-to-progress map that replaces the current one; entries whose key or value
     * cannot be parsed are logged and skipped.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is restored
     * and the method returns without touching the current map. Previously the loop went on
     * and sent further etcd requests whose {@code get()} failed straight away on the
     * still-set interrupt status.
     *
     * @throws GrpcException if the etcd read completes exceptionally
     */
    public void updateTiFlashReplicaStatus() {
        ByteSequence prefix = ByteSequence.from(TIFLASH_TABLE_SYNC_PROGRESS_PATH, StandardCharsets.UTF_8);
        for (int i = 0; i < 5; ++i) {
            CompletableFuture<GetResponse> resp;
            try {
                resp = this.etcdClient.getKVClient().get(prefix, GetOption.newBuilder().withPrefix(prefix).build());
            }
            catch (Exception e) {
                logger.info("get tiflash table replica sync progress failed, continue checking.", (Throwable)e);
                continue;
            }
            GetResponse getResp;
            try {
                getResp = resp.get();
            }
            catch (InterruptedException e) {
                // Retrying is pointless while the interrupt status is set; hand it back and stop.
                Thread.currentThread().interrupt();
                return;
            }
            catch (ExecutionException e) {
                throw new GrpcException("failed to update tiflash replica", e);
            }
            ConcurrentHashMap<Long, Double> progressMap = new ConcurrentHashMap<Long, Double>();
            for (KeyValue kv : getResp.getKvs()) {
                long tableId;
                try {
                    tableId = Long.parseLong(kv.getKey().toString().substring(TIFLASH_TABLE_SYNC_PROGRESS_PATH.length()));
                }
                catch (Exception e) {
                    logger.info("invalid tiflash table replica sync progress key. key = " + kv.getKey().toString());
                    continue;
                }
                double progress;
                try {
                    progress = Double.parseDouble(kv.getValue().toString());
                }
                catch (Exception e) {
                    logger.info("invalid tiflash table replica sync progress value. value = " + kv.getValue().toString());
                    continue;
                }
                progressMap.put(tableId, progress);
            }
            // Publish the fresh snapshot in one step; readers never see a half-filled map.
            this.tiflashReplicaMap = progressMap;
            return;
        }
    }

    /**
     * Returns the last loaded TiFlash replica sync progress of a table.
     *
     * @param tableId id of the table
     * @return the recorded progress, or {@code 0.0} when nothing is recorded for the table
     */
    public double getTiFlashReplicaProgress(long tableId) {
        Double recorded = tiflashReplicaMap.getOrDefault(tableId, 0.0);
        return recorded;
    }

    /**
     * Returns the current blocking stub with a deadline of the current request timeout.
     *
     * @throws GrpcException if no client wrapper has been created yet
     */
    @Override
    protected PDGrpc.PDBlockingStub getBlockingStub() {
        if (pdClientWrapper != null) {
            return (PDGrpc.PDBlockingStub)pdClientWrapper.getBlockingStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
        }
        throw new GrpcException("PDClient may not be initialized");
    }

    /**
     * Returns the current future stub with a deadline of the current request timeout.
     *
     * @throws GrpcException if no client wrapper has been created yet
     */
    @Override
    protected PDGrpc.PDFutureStub getAsyncStub() {
        if (pdClientWrapper != null) {
            return (PDGrpc.PDFutureStub)pdClientWrapper.getAsyncStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
        }
        throw new GrpcException("PDClient may not be initialized");
    }

    /**
     * Connects this client to the PD cluster.
     *
     * <p>Steps, in order: shuffle the configured PD addresses; build the etcd client and the
     * host mapping; ask the addresses one by one for the member list (using the dedicated
     * first-get-member timeout); derive the request header and TSO request from the cluster
     * id; adopt the reported member list; connect to the leader; start the periodic
     * leader-update task and, when TiFlash is enabled, the periodic replica-progress task.
     *
     * <p>Both periodic tasks catch and log their own failures. An exception that escapes a
     * task scheduled with {@code scheduleAtFixedRate} cancels all of its future runs, and
     * {@link #updateTiFlashReplicaStatus()} can throw, so it is guarded the same way the
     * leader-update task already was.
     *
     * @throws NullPointerException (from {@code Preconditions.checkNotNull}) if no address
     *     returned a usable member list
     */
    private void initCluster() {
        logger.info("init cluster: start");
        Pdpb.GetMembersResponse resp = null;
        ArrayList<URI> pdAddrs = new ArrayList<URI>(this.getConf().getPdAddrs());
        // Randomize the probing order of the configured addresses.
        Collections.shuffle(pdAddrs);
        this.pdAddrs = pdAddrs;
        this.etcdClient = Client.builder().endpoints(pdAddrs).executorService(Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("etcd-conn-manager-pool-%d").setDaemon(true).build())).build();
        logger.info("init host mapping: start");
        // A host mapping from the configuration wins; otherwise fall back to the etcd-backed default.
        this.hostMapping = Optional.ofNullable(this.getConf().getHostMapping()).orElseGet(() -> new DefaultHostMapping(this.etcdClient, this.conf.getNetworkMappingName()));
        logger.info("init host mapping: end");
        // Use the dedicated timeout for the very first member lookup, then restore the original.
        long originTimeout = this.timeout;
        this.timeout = this.conf.getPdFirstGetMemberTimeout();
        for (URI u : pdAddrs) {
            logger.info("get members with pd " + u + ": start");
            resp = this.getMembers(u);
            logger.info("get members with pd " + u + ": end");
            if (resp != null) {
                break;
            }
        }
        if (resp == null) {
            logger.error("Could not get leader member with: " + pdAddrs);
        }
        this.timeout = originTimeout;
        Preconditions.checkNotNull(resp, "Failed to init client for PD cluster.");
        long clusterId = resp.getHeader().getClusterId();
        this.header = Pdpb.RequestHeader.newBuilder().setClusterId(clusterId).build();
        this.tsoReq = Pdpb.TsoRequest.newBuilder().setHeader(this.header).setCount(1).build();
        this.tiflashReplicaMap = new ConcurrentHashMap<Long, Double>();
        // From here on use the member list PD reported rather than the configured one.
        this.pdAddrs = resp.getMembersList().stream().map(mem -> PDUtils.addrToUri(mem.getClientUrls(0))).collect(Collectors.toList());
        logger.info("init cluster with address: " + this.pdAddrs);
        String leaderUrlStr = resp.getLeader().getClientUrls(0);
        leaderUrlStr = PDUtils.uriToAddr(PDUtils.addrToUri(leaderUrlStr));
        logger.info("createLeaderClientWrapper with leader " + leaderUrlStr + ": start");
        this.createLeaderClientWrapper(leaderUrlStr);
        logger.info("createLeaderClientWrapper with leader " + leaderUrlStr + ": end");
        this.service = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("PDClient-update-leader-pool-%d").setDaemon(true).build());
        this.service.scheduleAtFixedRate(() -> {
            try {
                this.tryUpdateLeader();
            }
            catch (Exception e) {
                logger.warn("Update leader failed", (Throwable)e);
            }
        }, 10L, 10L, TimeUnit.SECONDS);
        if (this.conf.isTiFlashEnabled()) {
            this.tiflashReplicaService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("PDClient-tiflash-replica-pool-%d").setDaemon(true).build());
            this.tiflashReplicaService.scheduleAtFixedRate(() -> {
                try {
                    this.updateTiFlashReplicaStatus();
                }
                catch (Exception e) {
                    // Keep the periodic refresh alive; an escaping exception would cancel it for good.
                    logger.warn("Update tiflash replica status failed", (Throwable)e);
                }
            }, 10L, 10L, TimeUnit.SECONDS);
        }
        logger.info("init cluster: finish");
    }

    /** Returns the cluster id stored in the request header. */
    @Override
    public Long getClusterId() {
        return Long.valueOf(header.getClusterId());
    }

    /** Returns the list of currently known PD addresses (the internal list itself, not a copy). */
    public List<URI> getPdAddrs() {
        return pdAddrs;
    }

    /** Returns the key codec this client was created with. */
    @Override
    public RequestKeyCodec getCodec() {
        return codec;
    }

    /**
     * Immutable bundle of the gRPC stubs in use, together with the leader they ultimately
     * address and the PD node they are actually connected to.
     *
     * <p>When the two addresses differ, every call made through the stubs carries a
     * forwarding header naming the leader.
     */
    static class PDClientWrapper {
        private final String leaderInfo;
        private final String storeAddress;
        private final long createTime;
        private final PDGrpc.PDBlockingStub blockingStub;
        private final PDGrpc.PDFutureStub asyncStub;

        /**
         * @param leaderInfo address of the leader
         * @param storeAddress address of the PD node the channel is connected to
         * @param clientChannel channel the stubs are created on
         * @param createTime creation time supplied by the caller
         */
        PDClientWrapper(String leaderInfo, String storeAddress, ManagedChannel clientChannel, long createTime) {
            boolean forwarded = !storeAddress.equals(leaderInfo);
            Metadata forwardHeader = null;
            if (forwarded) {
                forwardHeader = new Metadata();
                forwardHeader.put(TiConfiguration.PD_FORWARD_META_DATA_KEY, PDUtils.addrToUri(leaderInfo).toString());
            }
            PDGrpc.PDBlockingStub blocking = PDGrpc.newBlockingStub(clientChannel);
            PDGrpc.PDFutureStub future = PDGrpc.newFutureStub(clientChannel);
            if (forwarded) {
                // Connected to a non-leader node: tag every call with the leader to forward to.
                blocking = MetadataUtils.attachHeaders(blocking, forwardHeader);
                future = MetadataUtils.attachHeaders(future, forwardHeader);
            }
            this.blockingStub = blocking;
            this.asyncStub = future;
            this.leaderInfo = leaderInfo;
            this.storeAddress = storeAddress;
            this.createTime = createTime;
        }

        /** Returns the address of the leader. */
        String getLeaderInfo() {
            return leaderInfo;
        }

        /** Returns the address of the PD node the stubs are connected to. */
        String getStoreAddress() {
            return storeAddress;
        }

        /** Returns the blocking stub (without any deadline applied). */
        PDGrpc.PDBlockingStub getBlockingStub() {
            return blockingStub;
        }

        /** Returns the future stub (without any deadline applied). */
        PDGrpc.PDFutureStub getAsyncStub() {
            return asyncStub;
        }

        /** Returns the creation time passed to the constructor. */
        long getCreateTime() {
            return createTime;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append("[leaderInfo: ").append(leaderInfo);
            text.append(", storeAddress: ").append(storeAddress);
            text.append("]");
            return text.toString();
        }
    }
}

