package io.nessus.aries.websocket;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import io.nessus.aries.util.AssertState;
import io.nessus.aries.util.SafeConsumer;
import io.nessus.aries.util.ThreadUtils;
import io.nessus.aries.wallet.WalletRegistry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import okhttp3.Response;
import okhttp3.WebSocket;
import org.hyperledger.aries.api.connection.ConnectionRecord;
import org.hyperledger.aries.api.discover_features.DiscoverFeatureEvent;
import org.hyperledger.aries.api.endorser.EndorseTransactionRecord;
import org.hyperledger.aries.api.issue_credential_v1.V1CredentialExchange;
import org.hyperledger.aries.api.issue_credential_v2.V20CredExRecord;
import org.hyperledger.aries.api.issue_credential_v2.V2IssueIndyCredentialEvent;
import org.hyperledger.aries.api.issue_credential_v2.V2IssueLDCredentialEvent;
import org.hyperledger.aries.api.message.BasicMessage;
import org.hyperledger.aries.api.message.ProblemReport;
import org.hyperledger.aries.api.present_proof.PresentationExchangeRecord;
import org.hyperledger.aries.api.present_proof_v2.V20PresExRecord;
import org.hyperledger.aries.api.revocation.RevocationEvent;
import org.hyperledger.aries.api.revocation.RevocationNotificationEvent;
import org.hyperledger.aries.api.revocation.RevocationNotificationEventV2;
import org.hyperledger.aries.api.revocation.RevocationRegistryState;
import org.hyperledger.aries.api.settings.Settings;
import org.hyperledger.aries.api.trustping.PingEvent;
import org.hyperledger.aries.config.GsonConfig;
import org.hyperledger.aries.webhook.EventParser;
import org.hyperledger.aries.webhook.EventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/nessus/aries/websocket/WebSocketListener.class */
/**
 * OkHttp WebSocket listener for agent events.
 *
 * <p>Each incoming text message is parsed as a JSON envelope with a {@code topic} and an optional
 * {@code wallet_id} and {@code payload}. Messages that pass the wallet id filter are converted to a
 * typed payload, optionally recorded per {@link EventType}, and dispatched to the matching
 * {@code handle*} hook. Recorded events can be queried with the {@code await*} methods.
 *
 * <p>Access to the recorded events is guarded by an internal lock.
 */
public class WebSocketListener extends okhttp3.WebSocketListener {
    static final Logger log = LoggerFactory.getLogger(WebSocketListener.class);
    private static final Gson gson = GsonConfig.defaultConfig();
    private static final Gson pretty = GsonConfig.prettyPrinter();

    /** Pause between two polls of the recorded events while awaiting a match. */
    private static final long POLL_INTERVAL_MILLIS = 200L;

    // Written by the listener callbacks and read through getWebSocketState(); volatile so a reader
    // on another thread observes the latest value.
    private volatile WebSocketState state = WebSocketState.NEW;
    private final Map<EventType, List<WebSocketEvent>> recordedEvents = new HashMap<>();
    private final Lock accessLock = new ReentrantLock();
    private final EventParser parser = new EventParser();
    private final WalletRegistry walletRegistry;
    private final List<String> walletIdFilter;
    private final String label;

    /** A received event: its topic, the wallet id it was addressed to and the parsed payload. */
    public class WebSocketEvent {
        private final String topic;
        private final String walletId;
        private final Object payload;

        WebSocketEvent(String walletId, String topic, Object payload) {
            this.walletId = walletId;
            this.topic = topic;
            this.payload = payload;
        }

        /** @return the listener that created this event */
        public WebSocketListener getWebSocketListener() {
            return WebSocketListener.this;
        }

        /** @return the topic string of the event envelope */
        public String getTopic() {
            return topic;
        }

        /** @return the runtime class of the parsed payload */
        public Class<?> getEventType() {
            return payload.getClass();
        }

        /**
         * @return the wallet name looked up in the wallet registry, or the listener label when the
         *     listener has no registry
         */
        public String getWalletName() {
            if (walletRegistry == null) {
                return label;
            }
            return walletRegistry.getWalletName(walletId);
        }

        /**
         * Returns the payload typed as {@code T}.
         *
         * @param cls the expected payload type; only used to infer {@code T}
         * @return the payload, unchecked-cast to {@code T}
         */
        @SuppressWarnings("unchecked") // the cast is erased; a type mismatch surfaces at the caller
        public <T> T getPayload(Class<T> cls) {
            return (T) payload;
        }
    }

    /** Lifecycle state of the WebSocket as seen by this listener. */
    public enum WebSocketState {
        NEW,
        OPEN,
        CLOSING,
        CLOSED
    }

    /**
     * Creates a listener.
     *
     * @param label name used as prefix in log output and as wallet name fallback
     * @param walletRegistry registry used to resolve wallet names; may be {@code null}
     * @param walletIdFilter wallet ids whose events are processed; {@code null} accepts all events.
     *     The list is copied.
     */
    public WebSocketListener(String label, WalletRegistry walletRegistry, List<String> walletIdFilter) {
        this.label = label;
        this.walletRegistry = walletRegistry;
        this.walletIdFilter = walletIdFilter != null ? new ArrayList<>(walletIdFilter) : null;
    }

    /** @return the current lifecycle state */
    public WebSocketState getWebSocketState() {
        return state;
    }

    /** Marks the socket as {@link WebSocketState#OPEN}. */
    @Override
    public void onOpen(WebSocket webSocket, Response response) {
        log.info("{}: WebSocket Open: {}", label, response);
        state = WebSocketState.OPEN;
    }

    /** Marks the socket as {@link WebSocketState#CLOSING}. */
    @Override
    public void onClosing(WebSocket webSocket, int code, String reason) {
        log.info("{}: WebSocket Closing: {} {}", label, code, reason);
        state = WebSocketState.CLOSING;
    }

    /** Drops all recorded events (which also ends all recordings) and marks the socket as closed. */
    @Override
    public void onClosed(WebSocket webSocket, int code, String reason) {
        log.info("{}: WebSocket Closed: {} {}", label, code, reason);
        accessLock.lock();
        try {
            recordedEvents.clear();
            state = WebSocketState.CLOSED;
        } finally {
            accessLock.unlock();
        }
    }

    /**
     * Logs the failure, using the response message when a response is available and the throwable
     * message otherwise. A failure whose message is exactly "Socket closed" is not logged.
     */
    @Override
    public void onFailure(WebSocket webSocket, Throwable th, Response response) {
        String message = response != null ? response.message() : th.getMessage();
        if ("Socket closed".equals(message)) {
            return;
        }
        // A trailing Throwable without a placeholder is logged as the exception.
        log.error("[{}] Failure: {}", label, message, th);
    }

    /**
     * Parses the event envelope and dispatches it unless it is a WebSocket ping or addressed to a
     * wallet id that is not in the filter. A malformed envelope is logged and dropped so that no
     * exception escapes this callback.
     */
    @Override
    public void onMessage(WebSocket webSocket, String text) {
        log.trace("{} Event: {}", label, text);
        try {
            JsonObject envelope = gson.fromJson(text, JsonObject.class);
            // fromJson yields null for empty input; topic is required to dispatch the event.
            if (envelope == null || !hasValue(envelope, "topic")) {
                log.warn("{}: Ignoring event without topic: {}", label, text);
                return;
            }
            String walletId = hasValue(envelope, "wallet_id") ? envelope.get("wallet_id").getAsString() : null;
            String payload = envelope.has("payload") ? envelope.get("payload").toString() : "{}";
            String topic = envelope.get("topic").getAsString();
            if (notWsPing(topic, payload) && isForWalletId(walletId)) {
                handleEvent(walletId, topic, payload);
                log.debug("{}", pretty.toJson(envelope));
            }
        } catch (JsonSyntaxException e) {
            log.error("JsonSyntaxException", e);
        } catch (RuntimeException e) {
            // e.g. topic or wallet_id is not a JSON primitive
            log.error("{}: Malformed event envelope", label, e);
        }
    }

    /** @return true if the member exists and is not JSON null */
    private static boolean hasValue(JsonObject json, String member) {
        return json.has(member) && !json.get(member).isJsonNull();
    }

    /** @return false only for a ping topic with an empty ("{}") payload */
    private boolean notWsPing(String topic, String payload) {
        return !(EventType.PING.topicEquals(topic) && "{}".equals(payload));
    }

    /** @return true if no filter is configured or the filter contains the given wallet id */
    private boolean isForWalletId(String walletId) {
        return walletIdFilter == null || walletIdFilter.contains(walletId);
    }

    /**
     * Parses the payload into the type that belongs to the topic, records the event if its type is
     * being recorded and invokes the matching {@code handle*} hook. Unsupported topics are logged
     * and ignored; any error raised while parsing or handling is logged and not propagated.
     */
    private void handleEvent(String walletId, String topic, String payloadJson) {
        try {
            Object payload;
            SafeConsumer<WebSocketEvent> handler;
            if (EventType.CONNECTIONS.topicEquals(topic)) {
                payload = parser.parseValueSave(payloadJson, ConnectionRecord.class).orElseThrow();
                handler = this::handleConnection;
            } else if (EventType.PRESENT_PROOF.topicEquals(topic)) {
                payload = parser.parseValueSave(payloadJson, PresentationExchangeRecord.class).orElseThrow();
                handler = this::handlePresentProofV1;
            } else if (EventType.PRESENT_PROOF_V2.topicEquals(topic)) {
                payload = parser.parseValueSave(payloadJson, V20PresExRecord.class).orElseThrow();
                handler = this::handlePresentProofV2;
            } else if (EventType.ISSUE_CREDENTIAL.topicEquals(topic)) {
                payload = parser.parseValueSave(payloadJson, V1CredentialExchange.class).orElseThrow();
                handler = this::handleIssueCredentialV1;
            } else if (EventType.ISSUE_CREDENTIAL_V2.topicEquals(topic)) {
                payload = parser.parseValueSave(payloadJson, V20CredExRecord.class).orElseThrow();
                handler = this::handleIssueCredentialV2;
            } else if (EventType.ISSUE_CREDENTIAL_V2_INDY.topicEquals(topic)) {
                payload = parser.parseValueSave(payloadJson, V2IssueIndyCredentialEvent.class).orElseThrow();
                handler = this::handleIssueCredentialV2Indy;
            } else if (EventType.ISSUE_CREDENTIAL_V2_LD_PROOF.topicEquals(topic)) {
                payload = parser.parseValueSave(payloadJson, V2IssueLDCredentialEvent.class).orElseThrow();
                handler = this::handleIssueCredentialV2LD;
            } else if (EventType.BASIC_MESSAGES.topicEquals(topic)) {
                payload = parser.parseValueSave(payloadJson, BasicMessage.class).orElseThrow();
                handler = this::handleBasicMessage;
            } else if (EventType.PING.topicEquals(topic)) {
                payload = parser.parseValueSave(payloadJson, PingEvent.class).orElseThrow();
                handler = this::handleTrustPing;
            } else if (EventType.ISSUER_CRED_REV.topicEquals(topic)) {
                payload = parser.parseValueSave(payloadJson, RevocationEvent.class).orElseThrow();
                handler = this::handleIssuerRevocation;
            } else if (EventType.ENDORSE_TRANSACTION.topicEquals(topic)) {
                payload = parser.parseValueSave(payloadJson, EndorseTransactionRecord.class).orElseThrow();
                handler = this::handleEndorseTransaction;
            } else if (EventType.PROBLEM_REPORT.topicEquals(topic)) {
                payload = parser.parseValueSave(payloadJson, ProblemReport.class).orElseThrow();
                handler = this::handleProblemReport;
            } else if (EventType.DISCOVER_FEATURE.topicEquals(topic)) {
                payload = parser.parseValueSave(payloadJson, DiscoverFeatureEvent.class).orElseThrow();
                handler = this::handleDiscoverFeature;
            } else if (EventType.REVOCATION_NOTIFICATION.topicEquals(topic)) {
                payload = parser.parseValueSave(payloadJson, RevocationNotificationEvent.class).orElseThrow();
                handler = this::handleRevocationNotificationV1;
            } else if (EventType.REVOCATION_NOTIFICATION_V2.topicEquals(topic)) {
                payload = parser.parseValueSave(payloadJson, RevocationNotificationEventV2.class).orElseThrow();
                handler = this::handleRevocationNotificationV2;
            } else if (EventType.SETTINGS.topicEquals(topic)) {
                payload = parser.parseValueSave(payloadJson, Settings.class).orElseThrow();
                handler = this::handleSettings;
            } else {
                log.warn("Unsupported event topic: {}", topic);
                return;
            }
            handler.accept(recordEvent(new WebSocketEvent(walletId, topic, payload)));
        } catch (Throwable th) {
            // Boundary of the listener callback: log and drop, never propagate.
            log.error("Error in webhook event handler:", th);
        }
    }

    /** Hook for basic message events; logs and returns the payload. Subclasses may override. */
    protected BasicMessage handleBasicMessage(WebSocketEvent ev) {
        BasicMessage payload = ev.getPayload(BasicMessage.class);
        log.info("{}: [@{}] {}", label, ev.getWalletName(), payload);
        return payload;
    }

    /** Hook for connection events; logs role, state and payload and returns the payload. */
    protected ConnectionRecord handleConnection(WebSocketEvent ev) throws Exception {
        ConnectionRecord payload = ev.getPayload(ConnectionRecord.class);
        log.info("{}: [@{}] {} {} {}", label, ev.getWalletName(), payload.getTheirRole(), payload.getState(), payload);
        return payload;
    }

    /** Hook for discover feature events; logs and returns the payload. */
    protected DiscoverFeatureEvent handleDiscoverFeature(WebSocketEvent ev) {
        DiscoverFeatureEvent payload = ev.getPayload(DiscoverFeatureEvent.class);
        log.info("{}: [@{}] {}", label, ev.getWalletName(), payload);
        return payload;
    }

    /** Hook for endorse transaction events; logs and returns the payload. */
    protected EndorseTransactionRecord handleEndorseTransaction(WebSocketEvent ev) {
        EndorseTransactionRecord payload = ev.getPayload(EndorseTransactionRecord.class);
        log.info("{}: [@{}] {}", label, ev.getWalletName(), payload);
        return payload;
    }

    /** Hook for issue credential v1 events; logs role, state and payload and returns the payload. */
    protected V1CredentialExchange handleIssueCredentialV1(WebSocketEvent ev) throws Exception {
        V1CredentialExchange payload = ev.getPayload(V1CredentialExchange.class);
        log.info("{}: [@{}] {} {} {}", label, ev.getWalletName(), payload.getRole(), payload.getState(), payload);
        return payload;
    }

    /** Hook for issue credential v2 events; logs and returns the payload. */
    protected V20CredExRecord handleIssueCredentialV2(WebSocketEvent ev) {
        V20CredExRecord payload = ev.getPayload(V20CredExRecord.class);
        log.info("{}: [@{}] {}", label, ev.getWalletName(), payload);
        return payload;
    }

    /** Hook for issue credential v2 indy events; logs and returns the payload. */
    protected V2IssueIndyCredentialEvent handleIssueCredentialV2Indy(WebSocketEvent ev) {
        V2IssueIndyCredentialEvent payload = ev.getPayload(V2IssueIndyCredentialEvent.class);
        log.info("{}: [@{}] {}", label, ev.getWalletName(), payload);
        return payload;
    }

    /** Hook for issue credential v2 LD proof events; logs and returns the payload. */
    protected V2IssueLDCredentialEvent handleIssueCredentialV2LD(WebSocketEvent ev) {
        V2IssueLDCredentialEvent payload = ev.getPayload(V2IssueLDCredentialEvent.class);
        log.info("{}: [@{}] {}", label, ev.getWalletName(), payload);
        return payload;
    }

    /** Hook for issuer revocation events; logs state and payload and returns the payload. */
    protected RevocationEvent handleIssuerRevocation(WebSocketEvent ev) throws Exception {
        RevocationEvent payload = ev.getPayload(RevocationEvent.class);
        log.info("{}: [@{}] {} {}", label, ev.getWalletName(), payload.getState(), payload);
        return payload;
    }

    /** Hook for present proof v1 events; logs role, state and payload and returns the payload. */
    protected PresentationExchangeRecord handlePresentProofV1(WebSocketEvent ev) throws Exception {
        PresentationExchangeRecord payload = ev.getPayload(PresentationExchangeRecord.class);
        log.info("{}: [@{}] {} {} {}", label, ev.getWalletName(), payload.getRole(), payload.getState(), payload);
        return payload;
    }

    /** Hook for present proof v2 events; logs and returns the payload. */
    protected V20PresExRecord handlePresentProofV2(WebSocketEvent ev) {
        V20PresExRecord payload = ev.getPayload(V20PresExRecord.class);
        log.info("{}: [@{}] {}", label, ev.getWalletName(), payload);
        return payload;
    }

    /** Hook for problem report events; logs and returns the payload. */
    protected ProblemReport handleProblemReport(WebSocketEvent ev) {
        ProblemReport payload = ev.getPayload(ProblemReport.class);
        log.info("{}: [@{}] {}", label, ev.getWalletName(), payload);
        return payload;
    }

    /** Hook for revocation notification v1 events; logs and returns the payload. */
    protected RevocationNotificationEvent handleRevocationNotificationV1(WebSocketEvent ev) {
        RevocationNotificationEvent payload = ev.getPayload(RevocationNotificationEvent.class);
        log.info("{}: [@{}] {}", label, ev.getWalletName(), payload);
        return payload;
    }

    /** Hook for revocation notification v2 events; logs and returns the payload. */
    protected RevocationNotificationEventV2 handleRevocationNotificationV2(WebSocketEvent ev) {
        RevocationNotificationEventV2 payload = ev.getPayload(RevocationNotificationEventV2.class);
        log.info("{}: [@{}] {}", label, ev.getWalletName(), payload);
        return payload;
    }

    /** Hook for settings events; logs and returns the payload. */
    protected Settings handleSettings(WebSocketEvent ev) throws Exception {
        Settings payload = ev.getPayload(Settings.class);
        log.info("{}: {}", ev.getWalletName(), payload);
        return payload;
    }

    /** Hook for trust ping events; logs and returns the payload. */
    protected PingEvent handleTrustPing(WebSocketEvent ev) {
        PingEvent payload = ev.getPayload(PingEvent.class);
        log.info("{}: [@{}] {}", label, ev.getWalletName(), payload);
        return payload;
    }

    /** @return true if events of the given type are currently being recorded */
    public boolean isRecording(EventType eventType) {
        accessLock.lock();
        try {
            return recordedEvents.containsKey(eventType);
        } finally {
            accessLock.unlock();
        }
    }

    /** Discards what was recorded for the given types and starts recording them afresh. */
    public void restartRecording(EventType... eventTypes) {
        AssertState.notNull(eventTypes, "Not evtypes");
        accessLock.lock();
        try {
            for (EventType eventType : eventTypes) {
                stopRecording(eventType);
                startRecording(eventType);
            }
        } finally {
            accessLock.unlock();
        }
    }

    /** Starts recording the given types; types that are already recorded keep their events. */
    public void startRecording(EventType... eventTypes) {
        AssertState.notNull(eventTypes, "Not evtypes");
        accessLock.lock();
        try {
            for (EventType eventType : eventTypes) {
                if (!isRecording(eventType)) {
                    recordedEvents.put(eventType, new ArrayList<>());
                }
            }
        } finally {
            accessLock.unlock();
        }
    }

    /** Stops recording the given types and discards their recorded events. */
    public void stopRecording(EventType... eventTypes) {
        AssertState.notNull(eventTypes, "Not evtypes");
        accessLock.lock();
        try {
            for (EventType eventType : eventTypes) {
                recordedEvents.remove(eventType);
            }
        } finally {
            accessLock.unlock();
        }
    }

    /**
     * Appends the event to the recording of its type, if there is one.
     *
     * @return the given event, to allow chaining
     */
    private WebSocketEvent recordEvent(WebSocketEvent event) {
        EventType eventType = EventType.fromTopic(event.topic).orElse(null);
        if (eventType == null) {
            // No event type for this topic, hence nothing it could be recorded under.
            return event;
        }
        accessLock.lock();
        try {
            List<WebSocketEvent> recorded = recordedEvents.get(eventType);
            if (recorded != null) {
                recorded.add(event);
            }
            return event;
        } finally {
            accessLock.unlock();
        }
    }

    /** Awaits recorded basic messages that match the predicate. */
    public Stream<BasicMessage> awaitBasicMessage(Predicate<BasicMessage> predicate, long timeout, TimeUnit unit) {
        return getPayloadStream(EventType.BASIC_MESSAGES, BasicMessage.class, predicate, timeout, unit);
    }

    /** Awaits recorded connection records that match the predicate. */
    public Stream<ConnectionRecord> awaitConnection(Predicate<ConnectionRecord> predicate, long timeout, TimeUnit unit) {
        return getPayloadStream(EventType.CONNECTIONS, ConnectionRecord.class, predicate, timeout, unit);
    }

    /** Awaits recorded discover feature events that match the predicate. */
    public Stream<DiscoverFeatureEvent> awaitDiscoveredFeature(Predicate<DiscoverFeatureEvent> predicate, long timeout, TimeUnit unit) {
        return getPayloadStream(EventType.DISCOVER_FEATURE, DiscoverFeatureEvent.class, predicate, timeout, unit);
    }

    /** Awaits recorded endorse transaction records that match the predicate. */
    public Stream<EndorseTransactionRecord> awaitEndorseTransaction(Predicate<EndorseTransactionRecord> predicate, long timeout, TimeUnit unit) {
        return getPayloadStream(EventType.ENDORSE_TRANSACTION, EndorseTransactionRecord.class, predicate, timeout, unit);
    }

    /** Awaits recorded issue credential v1 exchanges that match the predicate. */
    public Stream<V1CredentialExchange> awaitIssueCredentialV1(Predicate<V1CredentialExchange> predicate, long timeout, TimeUnit unit) {
        return getPayloadStream(EventType.ISSUE_CREDENTIAL, V1CredentialExchange.class, predicate, timeout, unit);
    }

    /** Awaits recorded issue credential v2 records that match the predicate. */
    public Stream<V20CredExRecord> awaitIssueCredentialV2(Predicate<V20CredExRecord> predicate, long timeout, TimeUnit unit) {
        return getPayloadStream(EventType.ISSUE_CREDENTIAL_V2, V20CredExRecord.class, predicate, timeout, unit);
    }

    /** Awaits recorded issue credential v2 indy events that match the predicate. */
    public Stream<V2IssueIndyCredentialEvent> awaitIssueCredentialV2Indy(Predicate<V2IssueIndyCredentialEvent> predicate, long timeout, TimeUnit unit) {
        return getPayloadStream(EventType.ISSUE_CREDENTIAL_V2_INDY, V2IssueIndyCredentialEvent.class, predicate, timeout, unit);
    }

    /** Awaits recorded issue credential v2 LD proof events that match the predicate. */
    public Stream<V2IssueLDCredentialEvent> awaitIssueCredentialV2LD(Predicate<V2IssueLDCredentialEvent> predicate, long timeout, TimeUnit unit) {
        return getPayloadStream(EventType.ISSUE_CREDENTIAL_V2_LD_PROOF, V2IssueLDCredentialEvent.class, predicate, timeout, unit);
    }

    /** Awaits recorded issuer revocation events that match the predicate. */
    public Stream<RevocationEvent> awaitIssuerRevocation(Predicate<RevocationEvent> predicate, long timeout, TimeUnit unit) {
        return getPayloadStream(EventType.ISSUER_CRED_REV, RevocationEvent.class, predicate, timeout, unit);
    }

    /** Awaits recorded present proof v1 records that match the predicate. */
    public Stream<PresentationExchangeRecord> awaitPresentProofV1(Predicate<PresentationExchangeRecord> predicate, long timeout, TimeUnit unit) {
        return getPayloadStream(EventType.PRESENT_PROOF, PresentationExchangeRecord.class, predicate, timeout, unit);
    }

    /** Awaits recorded present proof v2 records that match the predicate. */
    public Stream<V20PresExRecord> awaitPresentProofV2(Predicate<V20PresExRecord> predicate, long timeout, TimeUnit unit) {
        return getPayloadStream(EventType.PRESENT_PROOF_V2, V20PresExRecord.class, predicate, timeout, unit);
    }

    /** Awaits recorded problem reports that match the predicate. */
    public Stream<ProblemReport> awaitProblemReport(Predicate<ProblemReport> predicate, long timeout, TimeUnit unit) {
        return getPayloadStream(EventType.PROBLEM_REPORT, ProblemReport.class, predicate, timeout, unit);
    }

    /** Awaits recorded revocation notification v1 events that match the predicate. */
    public Stream<RevocationNotificationEvent> awaitRevocationNotificationV1(Predicate<RevocationNotificationEvent> predicate, long timeout, TimeUnit unit) {
        return getPayloadStream(EventType.REVOCATION_NOTIFICATION, RevocationNotificationEvent.class, predicate, timeout, unit);
    }

    /** Awaits recorded revocation notification v2 events that match the predicate. */
    public Stream<RevocationNotificationEventV2> awaitRevocationNotificationV2(Predicate<RevocationNotificationEventV2> predicate, long timeout, TimeUnit unit) {
        return getPayloadStream(EventType.REVOCATION_NOTIFICATION_V2, RevocationNotificationEventV2.class, predicate, timeout, unit);
    }

    /** Awaits recorded revocation registry states that match the predicate. */
    public Stream<RevocationRegistryState> awaitRevocationRegistry(Predicate<RevocationRegistryState> predicate, long timeout, TimeUnit unit) {
        return getPayloadStream(EventType.REVOCATION_REGISTRY, RevocationRegistryState.class, predicate, timeout, unit);
    }

    /** Awaits recorded settings events that match the predicate. */
    public Stream<Settings> awaitSettings(Predicate<Settings> predicate, long timeout, TimeUnit unit) {
        return getPayloadStream(EventType.SETTINGS, Settings.class, predicate, timeout, unit);
    }

    /** Awaits recorded trust ping events that match the predicate. */
    public Stream<PingEvent> awaitTrustPing(Predicate<PingEvent> predicate, long timeout, TimeUnit unit) {
        return getPayloadStream(EventType.PING, PingEvent.class, predicate, timeout, unit);
    }

    /**
     * Polls the recorded events of the given type until at least one payload matches the predicate
     * or the timeout elapses.
     *
     * @param eventType the recorded event type to look at
     * @param type the payload type of that event type
     * @param predicate selects the payloads of interest
     * @param timeout maximum time to wait; no poll happens when it is not positive
     * @param unit unit of {@code timeout}
     * @return the matching payloads; empty if the type is not being recorded, recording stops while
     *     waiting, or nothing matched in time
     */
    private <T> Stream<T> getPayloadStream(EventType eventType, Class<T> type, Predicate<T> predicate, long timeout, TimeUnit unit) {
        if (!isRecording(eventType)) {
            return Stream.empty();
        }
        long now = System.currentTimeMillis();
        long deadline = now + unit.toMillis(timeout);
        if (timeout > 0 && deadline < now) {
            // now + timeout overflowed: wait "forever" rather than not at all
            deadline = Long.MAX_VALUE;
        }
        while (now < deadline) {
            List<WebSocketEvent> snapshot = snapshotRecordedEvents(eventType);
            if (snapshot == null) {
                // Recording was stopped (or the socket closed) while waiting.
                break;
            }
            // The caller's predicate runs on the snapshot, outside the lock.
            List<T> matches = snapshot.stream()
                    .map(event -> event.getPayload(type))
                    .filter(payload -> predicate.test(payload))
                    .collect(Collectors.toList());
            if (!matches.isEmpty()) {
                return matches.stream();
            }
            now = System.currentTimeMillis();
            if (now < deadline) {
                ThreadUtils.sleepWell(Math.min(POLL_INTERVAL_MILLIS, deadline - now));
                now = System.currentTimeMillis();
            }
        }
        return Stream.empty();
    }

    /**
     * Copies the events recorded for the given type while holding the lock.
     *
     * @return a copy of the recorded events, or {@code null} if the type is not being recorded
     */
    private List<WebSocketEvent> snapshotRecordedEvents(EventType eventType) {
        accessLock.lock();
        try {
            List<WebSocketEvent> recorded = recordedEvents.get(eventType);
            return recorded != null ? new ArrayList<>(recorded) : null;
        } finally {
            accessLock.unlock();
        }
    }
}
