/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.saga.annotation;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.RingBuffer;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.TreeMap;
import org.axonframework.common.AxonNonTransientException;
import org.axonframework.common.annotation.ParameterResolverFactory;
import org.axonframework.correlation.CorrelationDataHolder;
import org.axonframework.correlation.CorrelationDataProvider;
import org.axonframework.domain.EventMessage;
import org.axonframework.saga.AssociationValue;
import org.axonframework.saga.AssociationValues;
import org.axonframework.saga.Saga;
import org.axonframework.saga.SagaRepository;
import org.axonframework.saga.annotation.AbstractAnnotatedSaga;
import org.axonframework.saga.annotation.AsyncAnnotatedSagaManager;
import org.axonframework.saga.annotation.AsyncSagaProcessingEvent;
import org.axonframework.unitofwork.UnitOfWork;
import org.axonframework.unitofwork.UnitOfWorkFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Disruptor {@link EventHandler} that applies published events to Saga instances and persists the
 * resulting Saga state at the end of each batch.
 * <p>
 * Several instances are created together (see {@code createInstances}). Each instance only deals with
 * the Sagas whose identifier hash maps onto its own {@code processorId}, so that a given Saga is always
 * handled by the same processor. Sagas that have been touched but not yet persisted are kept in
 * {@code processedSagas} until they have been written to the {@link SagaRepository}.
 */
public final class AsyncSagaEventProcessor
        implements EventHandler<AsyncSagaProcessingEvent>, LifecycleAware {

    private static final Logger logger = LoggerFactory.getLogger(AsyncSagaEventProcessor.class);

    private final UnitOfWorkFactory unitOfWorkFactory;
    private final SagaRepository sagaRepository;
    // Sagas (by identifier) that were loaded or created by this processor and still await persistence.
    private final Map<String, Saga> processedSagas = new TreeMap<String, Saga>();
    // Subset of processedSagas that must be added to, rather than committed in, the repository.
    private final Map<String, Saga> newlyCreatedSagas = new TreeMap<String, Saga>();
    private final ParameterResolverFactory parameterResolverFactory;
    private final int processorCount;
    private final int processorId;
    private final RingBuffer<AsyncSagaProcessingEvent> ringBuffer;
    private final AsyncAnnotatedSagaManager.SagaManagerStatus status;
    private final CorrelationDataProvider<? super EventMessage> correlationDataProvider;
    // Unit of Work spanning the current batch; null when none is open.
    private UnitOfWork unitOfWork;

    private AsyncSagaEventProcessor(SagaRepository sagaRepository,
                                    ParameterResolverFactory parameterResolverFactory,
                                    int processorCount, int processorId,
                                    UnitOfWorkFactory unitOfWorkFactory,
                                    RingBuffer<AsyncSagaProcessingEvent> ringBuffer,
                                    AsyncAnnotatedSagaManager.SagaManagerStatus status,
                                    CorrelationDataProvider<? super EventMessage> correlationDataProvider) {
        this.sagaRepository = sagaRepository;
        this.parameterResolverFactory = parameterResolverFactory;
        this.processorCount = processorCount;
        this.processorId = processorId;
        this.unitOfWorkFactory = unitOfWorkFactory;
        this.ringBuffer = ringBuffer;
        this.status = status;
        this.correlationDataProvider = correlationDataProvider;
    }

    /**
     * Creates {@code processorCount} processors, numbered {@code 0} to {@code processorCount - 1},
     * that all share the given collaborators.
     *
     * @param sagaRepository           repository used to find, load, add and commit Sagas
     * @param parameterResolverFactory factory registered on each Saga before it is used; may be
     *                                 {@code null}, in which case nothing is registered
     * @param unitOfWorkFactory        factory for the Unit of Work wrapping each batch
     * @param processorCount           number of processors to create
     * @param ringBuffer               ring buffer the processors read from; used to detect whether
     *                                 newer entries are waiting
     * @param status                   status object consulted to stop retrying when no longer running
     * @param correlationDataProvider  provides the correlation data attached while handling an event
     * @return the created processors
     */
    static EventHandler<AsyncSagaProcessingEvent>[] createInstances(
            SagaRepository sagaRepository, ParameterResolverFactory parameterResolverFactory,
            UnitOfWorkFactory unitOfWorkFactory, int processorCount,
            RingBuffer<AsyncSagaProcessingEvent> ringBuffer,
            AsyncAnnotatedSagaManager.SagaManagerStatus status,
            CorrelationDataProvider<? super EventMessage> correlationDataProvider) {
        AsyncSagaEventProcessor[] processors = new AsyncSagaEventProcessor[processorCount];
        for (int processorId = 0; processorId < processorCount; processorId++) {
            processors[processorId] = new AsyncSagaEventProcessor(sagaRepository, parameterResolverFactory,
                                                                  processorCount, processorId,
                                                                  unitOfWorkFactory, ringBuffer, status,
                                                                  correlationDataProvider);
        }
        return processors;
    }

    /**
     * Handles a single ring buffer entry. The correlation data for the published event is attached
     * to the {@link CorrelationDataHolder} for the duration of the processing and always cleared
     * afterwards, also when processing fails.
     *
     * @param entry      the entry to process
     * @param sequence   the ring buffer sequence of the entry
     * @param endOfBatch whether this is the last entry of the current batch
     * @throws Exception when processing or persisting fails in a way that is not retried
     */
    @Override
    public void onEvent(AsyncSagaProcessingEvent entry, long sequence, boolean endOfBatch) throws Exception {
        Map<String, ?> correlationData = this.correlationDataProvider.correlationDataFor(entry.getPublishedEvent());
        CorrelationDataHolder.setCorrelationData(correlationData);
        try {
            doProcessEvent(entry, sequence, endOfBatch);
        } finally {
            CorrelationDataHolder.clear();
        }
    }

    /**
     * Invokes the existing Sagas for the entry, creates a new Saga when the creation policy asks for
     * it, and persists all touched Sagas when the entry closes a batch.
     */
    private void doProcessEvent(AsyncSagaProcessingEvent entry, long sequence, boolean endOfBatch)
            throws Exception {
        boolean sagaInvoked = invokeExistingSagas(entry);
        switch (entry.getCreationHandler().getCreationPolicy()) {
            case ALWAYS: {
                AssociationValue associationValue = entry.getInitialAssociationValue();
                if (associationValue != null
                        && ownedByCurrentProcessor(entry.getNewSaga().getSagaIdentifier())) {
                    processNewSagaInstance(entry, associationValue);
                }
                break;
            }
            case IF_NONE_FOUND: {
                AssociationValue associationValue = entry.getInitialAssociationValue();
                // The vote is only cast when there is an association value to create the Saga with.
                boolean shouldCreate = associationValue != null
                        && entry.waitForSagaCreationVote(
                                sagaInvoked, this.processorCount,
                                ownedByCurrentProcessor(entry.getNewSaga().getSagaIdentifier()));
                if (shouldCreate) {
                    processNewSagaInstance(entry, associationValue);
                }
                break;
            }
        }
        if (endOfBatch) {
            persistProcessedSagasWithRetry(sequence);
        }
    }

    /**
     * Persists the processed Sagas, retrying for as long as persisting fails, no newer entry is
     * waiting in the ring buffer and the manager is still running. The first retry is immediate,
     * the next three are preceded by a 100ms pause and later ones by a pause of up to 2000ms.
     * When a newer entry arrives the retrying stops; the Sagas stay in {@code processedSagas} and
     * are persisted with a later batch.
     */
    private void persistProcessedSagasWithRetry(long sequence) throws Exception {
        int attempts = 0;
        // Only the first failure is logged with its exception, to avoid flooding the log on retries.
        while (!persistProcessedSagas(attempts == 0) && isLastInBacklog(sequence) && this.status.isRunning()) {
            if (attempts == 0) {
                logger.warn("Error committing Saga state to the repository. Starting retry procedure...");
            }
            attempts++;
            if (attempts > 1 && attempts < 5) {
                logger.info("Waiting 100ms for next attempt");
                Thread.sleep(100L);
            } else if (attempts >= 5) {
                logger.info("Waiting 2000ms for next attempt");
                long timeToStop = System.currentTimeMillis() + 2000L;
                // Sleep in small steps so that new entries or a shutdown are noticed quickly.
                while (inFuture(timeToStop) && isLastInBacklog(sequence) && this.status.isRunning()) {
                    Thread.sleep(100L);
                }
            }
        }
    }

    /** Returns whether the given timestamp (epoch millis) has not been reached yet. */
    private boolean inFuture(long timestamp) {
        return System.currentTimeMillis() < timestamp;
    }

    /**
     * Loads the Sagas associated with the entry that belong to this processor and lets every
     * matching, active Saga handle the published event.
     *
     * @return {@code true} if at least one Saga was asked to handle the event (even when that Saga
     *         threw an exception), {@code false} otherwise
     */
    private boolean invokeExistingSagas(AsyncSagaProcessingEvent entry) {
        Class<? extends Saga> sagaType = entry.getSagaType();
        Collection<String> sagaIds = new HashSet<String>();
        for (AssociationValue associationValue : entry.getAssociationValues()) {
            sagaIds.addAll(this.sagaRepository.find(sagaType, associationValue));
        }
        for (String sagaId : sagaIds) {
            if (!ownedByCurrentProcessor(sagaId) || this.processedSagas.containsKey(sagaId)) {
                continue;
            }
            ensureActiveUnitOfWork();
            Saga saga = this.sagaRepository.load(sagaId);
            if (saga == null) {
                // A Saga may have ceased to exist between find() and load(); the repository reports
                // that by returning null. Storing null would break persistProcessedSagas later on.
                logger.debug("Saga [{}] was found by association but could not be loaded; skipping it", sagaId);
                continue;
            }
            if (this.parameterResolverFactory != null) {
                ((AbstractAnnotatedSaga) saga).registerParameterResolverFactory(this.parameterResolverFactory);
            }
            this.processedSagas.put(sagaId, saga);
        }
        boolean sagaInvoked = false;
        // Iterate over all Sagas held by this processor, not just the ones loaded above: Sagas that
        // were processed or created earlier in the batch are not persisted yet.
        for (Saga saga : this.processedSagas.values()) {
            boolean applicable = sagaType.isInstance(saga)
                    && saga.isActive()
                    && containsAny(saga.getAssociationValues(), entry.getAssociationValues());
            if (!applicable) {
                continue;
            }
            try {
                ensureActiveUnitOfWork();
                saga.handle(entry.getPublishedEvent());
            } catch (Exception e) {
                logger.error("Saga threw an exception while handling an Event. Ignoring and moving on...", e);
            }
            sagaInvoked = true;
        }
        return sagaInvoked;
    }

    /** Returns whether {@code associationValues} contains at least one of the values in {@code toFind}. */
    private boolean containsAny(AssociationValues associationValues, Collection<AssociationValue> toFind) {
        for (AssociationValue valueToFind : toFind) {
            if (associationValues.contains(valueToFind)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Writes all processed Sagas to the repository (adding new ones, committing existing ones) and
     * commits the current Unit of Work, if any.
     *
     * @param logExceptions whether a transient failure should be logged
     * @return {@code true} when everything was persisted, {@code false} when a transient failure
     *         occurred and the Sagas remain pending
     * @throws Exception when the failure is caused by an {@link AxonNonTransientException}
     */
    private boolean persistProcessedSagas(boolean logExceptions) throws Exception {
        try {
            Collection<String> committedSagas = new HashSet<String>();
            if (!this.processedSagas.isEmpty()) {
                ensureActiveUnitOfWork();
                for (Saga saga : this.processedSagas.values()) {
                    String sagaId = saga.getSagaIdentifier();
                    if (this.newlyCreatedSagas.containsKey(sagaId)) {
                        this.sagaRepository.add(saga);
                    } else {
                        this.sagaRepository.commit(saga);
                    }
                    committedSagas.add(saga.getSagaIdentifier());
                }
            }
            if (this.unitOfWork != null) {
                this.unitOfWork.commit();
                this.unitOfWork = null;
            }
            // Only forget the Sagas once the Unit of Work has been committed successfully.
            this.processedSagas.keySet().removeAll(committedSagas);
            this.newlyCreatedSagas.keySet().removeAll(committedSagas);
            return true;
        } catch (Exception e) {
            if (AxonNonTransientException.isCauseOf(e)) {
                // Retrying cannot help for non-transient failures; let the caller deal with it.
                throw e;
            }
            if (logExceptions) {
                logger.warn("Exception while attempting to persist Sagas", e);
            }
            return false;
        }
    }

    /** Returns whether the ring buffer cursor has not moved beyond the given sequence. */
    private boolean isLastInBacklog(long sequence) {
        return this.ringBuffer.getCursor() <= sequence;
    }

    /**
     * Prepares the new Saga carried by the entry, lets it handle the published event and registers
     * it as both processed and newly created, so that it is added to the repository on persist.
     */
    private void processNewSagaInstance(AsyncSagaProcessingEvent entry, AssociationValue associationValue) {
        ensureActiveUnitOfWork();
        AbstractAnnotatedSaga newSaga = entry.getNewSaga();
        if (this.parameterResolverFactory != null) {
            newSaga.registerParameterResolverFactory(this.parameterResolverFactory);
        }
        newSaga.associateWith(associationValue);
        newSaga.handle(entry.getPublishedEvent());
        this.processedSagas.put(newSaga.getSagaIdentifier(), newSaga);
        this.newlyCreatedSagas.put(newSaga.getSagaIdentifier(), newSaga);
    }

    /** Creates a new Unit of Work when there is none, or when the current one is not started. */
    private void ensureActiveUnitOfWork() {
        if (this.unitOfWork == null || !this.unitOfWork.isStarted()) {
            this.unitOfWork = this.unitOfWorkFactory.createUnitOfWork();
        }
    }

    /**
     * Returns whether the Saga with the given identifier is handled by this processor: either it is
     * already held in {@code processedSagas}, or its identifier hash maps onto this processor's id.
     */
    private boolean ownedByCurrentProcessor(String sagaIdentifier) {
        if (this.processedSagas.containsKey(sagaIdentifier)) {
            return true;
        }
        // Masking the sign bit keeps the value non-negative, also for Integer.MIN_VALUE hash codes.
        int nonNegativeHash = sagaIdentifier.hashCode() & Integer.MAX_VALUE;
        return nonNegativeHash % this.processorCount == this.processorId;
    }

    /** No action is taken when the processor starts. */
    @Override
    public void onStart() {
    }

    /**
     * Makes a final attempt to persist pending Saga state when the processor shuts down. Failures
     * are logged and never propagated.
     */
    @Override
    public void onShutdown() {
        try {
            if (!persistProcessedSagas(true)) {
                logger.error("The processor was shut down while some Saga instances could not be persisted. As a result,persisted Saga state may not properly reflect the activity of those Sagas.");
            }
        } catch (Exception e) {
            logger.error("A fatal, non-transient exception occurred while attempting to persist Saga state", e);
        }
    }
}

