package org.infinispan.stream.impl;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.infinispan.Cache;
import org.infinispan.CacheStream;
import org.infinispan.commons.CacheException;
import org.infinispan.container.entries.CacheEntry;

/* loaded from: input_file:WEB-INF/lib/infinispan-core-9.4.16.Final.jar:org/infinispan/stream/impl/TxLockedStreamImpl.class */
/**
 * {@link LockedStreamImpl} variant that is aware of JTA transactions.
 *
 * <p>Before delegating {@link #forEach(BiConsumer)} or {@link #invokeAll(BiFunction)} to the
 * superclass, it asks the {@link TransactionManager} for the current transaction and, if there is
 * one, suspends it. The suspended transaction is resumed once the delegated call has finished,
 * whether it completed normally or threw.
 *
 * @param <K> key type of the cache entries
 * @param <V> value type of the cache entries
 */
public class TxLockedStreamImpl<K, V> extends LockedStreamImpl<K, V> {
    /** Transaction manager used to look up, suspend and resume the current transaction. */
    final TransactionManager tm;

    /**
     * Creates a transaction-aware locked stream.
     *
     * @param transactionManager manager used to suspend/resume the current transaction
     * @param cacheStream        underlying stream, forwarded unchanged to the superclass
     * @param j                  time amount expressed in {@code timeUnit}, forwarded unchanged to
     *                           the superclass (presumably the lock timeout - confirm against
     *                           {@code LockedStreamImpl})
     * @param timeUnit           unit of {@code j}
     * @throws NullPointerException if {@code transactionManager} is {@code null}
     */
    public TxLockedStreamImpl(TransactionManager transactionManager, CacheStream<CacheEntry<K, V>> cacheStream, long j, TimeUnit timeUnit) {
        super(cacheStream, j, timeUnit);
        this.tm = Objects.requireNonNull(transactionManager);
    }

    /**
     * Package-private variant that additionally forwards a predicate to the superclass.
     * Unlike the public constructor, no null check is performed on {@code transactionManager};
     * within this class it is only invoked from {@code newStream} with the already stored manager.
     */
    TxLockedStreamImpl(TransactionManager transactionManager, CacheStream<CacheEntry<K, V>> cacheStream, Predicate<? super CacheEntry<K, V>> predicate, long j, TimeUnit timeUnit) {
        super(cacheStream, predicate, j, timeUnit);
        this.tm = transactionManager;
    }

    /**
     * Runs the superclass {@code forEach} with the current transaction (if any) suspended, then
     * resumes that transaction.
     *
     * @param biConsumer action forwarded unchanged to the superclass
     * @throws CacheException if the current transaction cannot be obtained or suspended, or if it
     *                        cannot be resumed after the operation completed normally
     */
    @Override
    public void forEach(BiConsumer<Cache<K, V>, ? super CacheEntry<K, V>> biConsumer) {
        // A failure to suspend propagates from here; nothing has been suspended, so nothing to resume.
        final Transaction suspended = suspendOngoingTransactionIfExists();
        try {
            super.forEach(biConsumer);
        } catch (Throwable failure) {
            resumeAfterFailure(suspended, failure);
            throw failure;
        }
        // Success path: resume exactly once, outside the try, so a resume failure is not retried.
        resumePreviousOngoingTransaction(suspended);
    }

    /**
     * Runs the superclass {@code invokeAll} with the current transaction (if any) suspended, then
     * resumes that transaction.
     *
     * @param biFunction function forwarded unchanged to the superclass
     * @param <R>        result type produced by {@code biFunction}
     * @return the map returned by the superclass {@code invokeAll}
     * @throws CacheException if the current transaction cannot be obtained or suspended, or if it
     *                        cannot be resumed after the operation completed normally
     */
    @Override
    public <R> Map<K, R> invokeAll(BiFunction<Cache<K, V>, ? super CacheEntry<K, V>, R> biFunction) {
        // A failure to suspend propagates from here; nothing has been suspended, so nothing to resume.
        final Transaction suspended = suspendOngoingTransactionIfExists();
        final Map<K, R> results;
        try {
            results = super.invokeAll(biFunction);
        } catch (Throwable failure) {
            resumeAfterFailure(suspended, failure);
            throw failure;
        }
        // Success path: resume exactly once, outside the try, so a resume failure is not retried.
        resumePreviousOngoingTransaction(suspended);
        return results;
    }

    /**
     * Resumes {@code suspended} after the stream operation threw {@code failure}. A failure to
     * resume is attached to {@code failure} as a suppressed exception instead of replacing it,
     * so the root cause of the operation failure is not lost.
     */
    private void resumeAfterFailure(Transaction suspended, Throwable failure) {
        try {
            resumePreviousOngoingTransaction(suspended);
        } catch (CacheException resumeFailure) {
            failure.addSuppressed(resumeFailure);
        }
    }

    /**
     * Suspends the current transaction when the transaction manager reports one.
     *
     * @return the transaction that was current before the call, or {@code null} if there was none
     * @throws CacheException if the transaction cannot be obtained or suspended
     */
    private Transaction suspendOngoingTransactionIfExists() {
        final Transaction ongoing = getOngoingTransaction();
        if (ongoing == null) {
            return null;
        }
        try {
            this.tm.suspend();
        } catch (SystemException e) {
            throw new CacheException("Unable to suspend transaction.", e);
        }
        return ongoing;
    }

    /**
     * Returns the transaction manager's current transaction, translating the checked
     * {@link SystemException} into a {@link CacheException}.
     */
    private Transaction getOngoingTransaction() {
        try {
            return this.tm.getTransaction();
        } catch (SystemException e) {
            throw new CacheException("Unable to get transaction", e);
        }
    }

    /**
     * Resumes {@code transaction} on the transaction manager; does nothing when it is
     * {@code null} (i.e. nothing was suspended).
     *
     * @throws CacheException wrapping any exception raised while resuming
     */
    private void resumePreviousOngoingTransaction(Transaction transaction) {
        if (transaction == null) {
            return;
        }
        try {
            this.tm.resume(transaction);
        } catch (Exception e) {
            throw new CacheException("Had problems trying to resume a transaction after locked stream forEach()", e);
        }
    }

    /**
     * Creates the derived stream as another {@code TxLockedStreamImpl} sharing this instance's
     * transaction manager, so derived streams keep the suspend/resume behaviour.
     */
    @Override
    LockedStreamImpl<K, V> newStream(CacheStream<CacheEntry<K, V>> cacheStream, Predicate<? super CacheEntry<K, V>> predicate, long j, TimeUnit timeUnit) {
        return new TxLockedStreamImpl<>(this.tm, cacheStream, predicate, j, timeUnit);
    }
}
