/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.replay;

import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.axonframework.common.DirectExecutor;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.EventMessage;
import org.axonframework.eventhandling.Cluster;
import org.axonframework.eventhandling.ClusterMetaData;
import org.axonframework.eventhandling.EventListener;
import org.axonframework.eventhandling.EventProcessingMonitor;
import org.axonframework.eventhandling.replay.IncomingMessageHandler;
import org.axonframework.eventhandling.replay.ReplayAware;
import org.axonframework.eventhandling.replay.ReplayFailedException;
import org.axonframework.eventstore.EventVisitor;
import org.axonframework.eventstore.management.Criteria;
import org.axonframework.eventstore.management.CriteriaBuilder;
import org.axonframework.eventstore.management.EventStoreManagement;
import org.axonframework.unitofwork.TransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReplayingCluster
implements Cluster {
    private static final Logger logger = LoggerFactory.getLogger(ReplayingCluster.class);
    /**
     * Name of the cluster meta data property that {@link #getAfterReplayTimeout()} reads to determine how long
     * (in milliseconds) a replay waits for the last replayed event to be reported as processed.
     */
    public static final String AFTER_REPLAY_TIMEOUT = "afterReplayTimeout";
    // Cluster that does the actual event handling; most Cluster calls are forwarded to it.
    private final Cluster delegate;
    // Event store whose events are visited during a replay.
    private final EventStoreManagement replayingEventStore;
    // Starts, commits and rolls back the transaction(s) in which a replay runs.
    private final TransactionManager transactionManager;
    // When greater than zero, the replay transaction is committed intermediately while visiting events.
    private final int commitThreshold;
    // Receives events that are published to this cluster while it is not in LIVE mode.
    private final IncomingMessageHandler incomingMessageHandler;
    // Subscribed listeners that also implement ReplayAware; they get before/after/failed replay callbacks.
    private final Set<ReplayAware> replayAwareListeners = new CopyOnWriteArraySet<ReplayAware>();
    // Current mode of the cluster; volatile because it is written by the replay task and read by publishers.
    private volatile Status status = Status.LIVE;
    // Dispatches processing notifications to the monitors subscribed to this cluster.
    private final EventProcessingListeners eventHandlingListeners = new EventProcessingListeners();

    /**
     * Creates a cluster that wraps the given {@code delegate} and adds event replay support to it.
     * <p>
     * The internal monitor dispatcher is subscribed to the delegate immediately, so processing notifications
     * emitted by the delegate can be relayed to the monitors subscribed to this cluster.
     *
     * @param delegate               the cluster that performs the actual event handling
     * @param eventStore             the event store whose events are visited during a replay
     * @param transactionManager     manages the transaction(s) a replay runs in
     * @param commitThreshold        when greater than zero, enables intermediate commits while replaying
     * @param incomingMessageHandler handles events published to this cluster while a replay is in progress
     */
    public ReplayingCluster(Cluster delegate,
                            EventStoreManagement eventStore,
                            TransactionManager transactionManager,
                            int commitThreshold,
                            IncomingMessageHandler incomingMessageHandler) {
        this.incomingMessageHandler = incomingMessageHandler;
        this.commitThreshold = commitThreshold;
        this.transactionManager = transactionManager;
        this.replayingEventStore = eventStore;
        this.delegate = delegate;
        delegate.subscribeEventProcessingMonitor(this.eventHandlingListeners);
    }

    /**
     * Returns a new criteria builder obtained from the event store used for replays. Criteria built with it
     * can be passed to {@link #startReplay(Criteria)} or {@link #startReplay(Executor, Criteria)}.
     *
     * @return a criteria builder created by the replaying event store
     */
    public CriteriaBuilder newCriteriaBuilder() {
        final EventStoreManagement eventStore = replayingEventStore;
        return eventStore.newCriteriaBuilder();
    }

    /**
     * Replays all events in the calling thread, without restricting them by criteria. Returns once the
     * replay has finished.
     *
     * @throws ReplayFailedException when the replay fails or is interrupted
     */
    public void startReplay() {
        // The typed null selects the Criteria overload rather than the Executor overload.
        final Criteria noCriteria = null;
        startReplay(noCriteria);
    }

    /**
     * Replays the events matching the given {@code criteria} using {@link DirectExecutor#INSTANCE} and waits
     * for the replay to finish.
     *
     * @param criteria the criteria selecting the events to replay, or {@code null} to replay all events
     * @throws ReplayFailedException when waiting is interrupted (the interrupt flag is restored) or when the
     *                               replay task fails; a {@code ReplayFailedException} raised by the task is
     *                               rethrown as-is, any other cause is wrapped
     */
    public void startReplay(Criteria criteria) {
        final Future<Void> replay = startReplay(DirectExecutor.INSTANCE, criteria);
        try {
            replay.get();
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ReplayFailedException("Replay failed because it was interrupted", interrupted);
        }
        catch (ExecutionException failure) {
            final Throwable cause = failure.getCause();
            if (cause instanceof ReplayFailedException) {
                throw (ReplayFailedException) cause;
            }
            throw new ReplayFailedException("Replay failed due to an exception.", cause);
        }
    }

    /**
     * Starts a replay of all events (no criteria) on the given {@code executor}.
     *
     * @param executor the executor that runs the replay task
     * @return a future that completes when the replay task has finished
     */
    public Future<Void> startReplay(Executor executor) {
        final Criteria noCriteria = null;
        return startReplay(executor, noCriteria);
    }

    /**
     * Starts a replay of the events matching the given {@code criteria} on the given {@code executor}.
     * <p>
     * The replay is wrapped in a {@link FutureTask}; when the replay task throws, the exception is reported
     * through {@link Future#get()} as the cause of an {@link ExecutionException}.
     *
     * @param executor the executor that runs the replay task
     * @param criteria the criteria selecting the events to replay, or {@code null} to replay all events
     * @return a future that completes (with a {@code null} result) when the replay task has finished
     */
    public Future<Void> startReplay(Executor executor, Criteria criteria) {
        // Must be FutureTask<Void>: a FutureTask<Object> is not assignable to the declared Future<Void>.
        FutureTask<Void> task = new FutureTask<Void>(new ReplayEventsTask(criteria), null);
        executor.execute(task);
        return task;
    }

    /**
     * Indicates whether this cluster is currently in any mode other than live, i.e. replaying events or
     * processing the backlog after a replay.
     *
     * @return {@code true} when the cluster is not in live mode, {@code false} otherwise
     */
    public boolean isInReplayMode() {
        final Status current = status;
        return Status.LIVE != current;
    }

    /**
     * Returns the name of the wrapped cluster.
     *
     * @return the name reported by the delegate cluster
     */
    @Override
    public String getName() {
        final String delegateName = delegate.getName();
        return delegateName;
    }

    /**
     * Publishes the given events. In live mode they are forwarded straight to the delegate cluster. In any
     * other mode they are handed to the incoming message handler; messages that the handler returns are
     * reported to the subscribed monitors as processed.
     *
     * @param events the events to publish
     */
    @Override
    public void publish(EventMessage ... events) {
        if (Status.LIVE == status) {
            delegate.publish(events);
            return;
        }
        logger.debug("Cluster is in replaying: sending message to process backlog");
        final List<EventMessage> acknowledged = incomingMessageHandler.onIncomingMessages(delegate, events);
        if (acknowledged == null || acknowledged.isEmpty()) {
            return;
        }
        eventHandlingListeners.onEventProcessingCompleted(acknowledged);
    }

    /**
     * Subscribes the listener to the delegate cluster and, when it also implements {@link ReplayAware},
     * remembers it so that it receives replay lifecycle callbacks.
     *
     * @param eventListener the listener to subscribe
     */
    @Override
    public void subscribe(EventListener eventListener) {
        delegate.subscribe(eventListener);
        if (!(eventListener instanceof ReplayAware)) {
            return;
        }
        replayAwareListeners.add((ReplayAware) eventListener);
    }

    /**
     * Removes the listener from the set of replay aware listeners (when it implements {@link ReplayAware})
     * and then unsubscribes it from the delegate cluster.
     *
     * @param eventListener the listener to unsubscribe
     */
    @Override
    public void unsubscribe(EventListener eventListener) {
        final boolean replayAware = eventListener instanceof ReplayAware;
        if (replayAware) {
            replayAwareListeners.remove(eventListener);
        }
        delegate.unsubscribe(eventListener);
    }

    /**
     * Returns the members of the wrapped cluster.
     *
     * @return the event listeners reported by the delegate cluster
     */
    @Override
    public Set<EventListener> getMembers() {
        final Set<EventListener> members = delegate.getMembers();
        return members;
    }

    /**
     * Returns the meta data of the wrapped cluster.
     *
     * @return the meta data reported by the delegate cluster
     */
    @Override
    public ClusterMetaData getMetaData() {
        final ClusterMetaData metaData = delegate.getMetaData();
        return metaData;
    }

    /**
     * Registers the monitor with this cluster's internal dispatcher rather than with the delegate directly,
     * so that notifications pass through the dispatcher before reaching the monitor.
     *
     * @param monitor the monitor to subscribe
     */
    @Override
    public void subscribeEventProcessingMonitor(EventProcessingMonitor monitor) {
        final Set<EventProcessingMonitor> monitors = eventHandlingListeners.delegates;
        monitors.add(monitor);
    }

    /**
     * Removes the monitor from this cluster's internal dispatcher.
     *
     * @param monitor the monitor to unsubscribe
     */
    @Override
    public void unsubscribeEventProcessingMonitor(EventProcessingMonitor monitor) {
        final Set<EventProcessingMonitor> monitors = eventHandlingListeners.delegates;
        monitors.remove(monitor);
    }

    /**
     * Returns the time, in milliseconds, that a replay waits for the last replayed event to be reported as
     * processed. The value is read from the {@link #AFTER_REPLAY_TIMEOUT} meta data property; when the
     * property is not set or is not a valid number, 5000 is returned.
     *
     * @return the configured timeout in milliseconds, or 5000 as fallback
     */
    public long getAfterReplayTimeout() {
        final long fallback = 5000L;
        if (!getMetaData().isPropertySet(AFTER_REPLAY_TIMEOUT)) {
            return fallback;
        }
        final String configured = getMetaData().getProperty(AFTER_REPLAY_TIMEOUT).toString();
        try {
            return Long.parseLong(configured);
        }
        catch (NumberFormatException notANumber) {
            // An unparsable value is reported and the fallback is used instead.
            logger.error("Not a number: '{}'. Reverting to default timeout of 5 seconds.", configured);
            return fallback;
        }
    }

    private static enum Status {
        LIVE,
        REPLAYING,
        PROCESSING_BACKLOG;

    }

    /**
     * Monitor that relays processing notifications to all monitors subscribed to the enclosing cluster,
     * except while the cluster is in {@code REPLAYING} mode, in which case notifications are dropped.
     */
    private final class EventProcessingListeners implements EventProcessingMonitor {
        private final Set<EventProcessingMonitor> delegates = new CopyOnWriteArraySet<EventProcessingMonitor>();

        private EventProcessingListeners() {
        }

        /**
         * Relays a "processing completed" notification unless a replay is in progress.
         *
         * @param eventMessages the messages that were processed
         */
        @Override
        public void onEventProcessingCompleted(List<? extends EventMessage> eventMessages) {
            if (Status.REPLAYING == status) {
                return;
            }
            for (EventProcessingMonitor monitor : delegates) {
                monitor.onEventProcessingCompleted(eventMessages);
            }
        }

        /**
         * Relays a "processing failed" notification unless a replay is in progress.
         *
         * @param eventMessages the messages whose processing failed
         * @param cause         the reason processing failed
         */
        @Override
        public void onEventProcessingFailed(List<? extends EventMessage> eventMessages, Throwable cause) {
            if (Status.REPLAYING == status) {
                return;
            }
            for (EventProcessingMonitor monitor : delegates) {
                monitor.onEventProcessingFailed(eventMessages, cause);
            }
        }
    }

    /**
     * Monitor that allows a thread to block until the last message it provided has been reported as
     * processed (successfully or not). All state is guarded by {@code lock}.
     */
    private static class LastEventMonitor implements EventProcessingMonitor {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition condition = this.lock.newCondition();
        // The message the waiting thread is interested in; null until waitForLastMessageProcessed is called.
        private EventMessage lastProvidedMessage;
        // The last message of the most recent processing notification; null until one arrives.
        private EventMessage lastProcessedMessage;

        private LastEventMonitor() {
        }

        /**
         * Records the last message of the given batch as processed and wakes up waiting threads when it is
         * the message they are waiting for. An empty batch carries no information and is ignored.
         *
         * @param eventMessages the messages that were processed
         */
        @Override
        public void onEventProcessingCompleted(List<? extends EventMessage> eventMessages) {
            if (eventMessages.isEmpty()) {
                return;
            }
            this.lock.lock();
            try {
                this.lastProcessedMessage = eventMessages.get(eventMessages.size() - 1);
                // The comparison lives inside the try block so that a failure in it can never skip unlock().
                if (this.isLastProvidedMessageProcessed()) {
                    this.condition.signalAll();
                }
            }
            finally {
                this.lock.unlock();
            }
        }

        /**
         * Treats a failed batch the same as a completed one: in both cases processing of the batch is over.
         *
         * @param eventMessages the messages whose processing failed
         * @param cause         the reason processing failed (not used)
         */
        @Override
        public void onEventProcessingFailed(List<? extends EventMessage> eventMessages, Throwable cause) {
            this.onEventProcessingCompleted(eventMessages);
        }

        /**
         * Blocks until the given message has been reported as processed, the timeout elapses, or the calling
         * thread is interrupted (in which case the interrupt flag is restored and the method returns).
         *
         * @param lastProvidedMessage the message to wait for
         * @param timeout             the maximum time to wait, in milliseconds; zero or negative does not wait
         */
        public void waitForLastMessageProcessed(EventMessage lastProvidedMessage, long timeout) {
            this.lock.lock();
            try {
                this.lastProvidedMessage = lastProvidedMessage;
                long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeout);
                // Re-check the predicate after every wakeup: Condition waits may return spuriously.
                while (!this.isLastProvidedMessageProcessed() && remainingNanos > 0L) {
                    remainingNanos = this.condition.awaitNanos(remainingNanos);
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finally {
                this.lock.unlock();
            }
        }

        // Tells whether the most recently processed message is the one being waited for. Caller holds the lock.
        private boolean isLastProvidedMessageProcessed() {
            return this.lastProvidedMessage != null
                    && this.lastProcessedMessage != null
                    && this.lastProcessedMessage.getIdentifier().equals(this.lastProvidedMessage.getIdentifier());
        }
    }

    private class ReplayEventsTask
    implements Runnable {
        private final Criteria criteria;

        public ReplayEventsTask(Criteria criteria) {
            this.criteria = criteria;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            ReplayingCluster.this.incomingMessageHandler.prepareForReplay(ReplayingCluster.this.delegate);
            ReplayingCluster.this.status = Status.REPLAYING;
            logger.trace("Cluster set to replay mode");
            Object tx = ReplayingCluster.this.transactionManager.startTransaction();
            logger.trace("Started new transaction for event replay");
            LastEventMonitor monitor = new LastEventMonitor();
            ReplayingEventVisitor visitor = new ReplayingEventVisitor(tx);
            try {
                logger.trace("Notifying replay aware listeners 'beforeReplay'");
                for (ReplayAware replayAwareEventListener : ReplayingCluster.this.replayAwareListeners) {
                    replayAwareEventListener.beforeReplay();
                }
                ReplayingCluster.this.delegate.subscribeEventProcessingMonitor(monitor);
                if (this.criteria != null) {
                    logger.trace("Starting visiting events using criteria");
                    ReplayingCluster.this.replayingEventStore.visitEvents(this.criteria, visitor);
                } else {
                    logger.trace("Starting visiting events without criteria");
                    ReplayingCluster.this.replayingEventStore.visitEvents(visitor);
                }
                EventMessage lastMessage = visitor.getLastMessage();
                if (lastMessage != null) {
                    monitor.waitForLastMessageProcessed(lastMessage, ReplayingCluster.this.getAfterReplayTimeout());
                }
                logger.trace("Notifying replay aware listeners 'afterReplay'");
                for (ReplayAware replayAwareEventListener : ReplayingCluster.this.replayAwareListeners) {
                    replayAwareEventListener.afterReplay();
                }
                ReplayingCluster.this.status = Status.PROCESSING_BACKLOG;
                logger.trace("Processing backlog of messages");
                ReplayingCluster.this.incomingMessageHandler.processBacklog(ReplayingCluster.this.delegate);
                logger.trace("Committing transaction");
                ReplayingCluster.this.transactionManager.commitTransaction(visitor.getTransaction());
            }
            catch (Throwable t) {
                try {
                    logger.error("Replay failed due to an exception.", t);
                    ReplayingCluster.this.incomingMessageHandler.onReplayFailed(ReplayingCluster.this.delegate, t);
                    logger.trace("Notifying replay aware listeners 'replayFailed'");
                    for (ReplayAware replayAwareEventListener : ReplayingCluster.this.replayAwareListeners) {
                        replayAwareEventListener.onReplayFailed(t);
                    }
                }
                finally {
                    logger.trace("Rolling back replay transaction");
                    ReplayingCluster.this.transactionManager.rollbackTransaction(visitor.getTransaction());
                }
                throw new ReplayFailedException("Replay failed due to an exception.", t);
            }
            finally {
                logger.info("Replay ended. Switching back to live mode");
                ReplayingCluster.this.status = Status.LIVE;
            }
        }

        private class ReplayingEventVisitor
        implements EventVisitor {
            private int eventCounter = 0;
            private Object currentTransaction;
            private EventMessage lastMessage;

            public ReplayingEventVisitor(Object tx) {
                this.currentTransaction = tx;
            }

            @Override
            public void doWithEvent(DomainEventMessage domainEvent) {
                if (ReplayingCluster.this.commitThreshold > 0 && ++this.eventCounter > ReplayingCluster.this.commitThreshold) {
                    this.eventCounter = 0;
                    logger.trace("Replay batch size reached; committing Replay Transaction");
                    ReplayingCluster.this.transactionManager.commitTransaction(this.currentTransaction);
                    logger.trace("Starting new Replay Transaction for next batch");
                    this.currentTransaction = ReplayingCluster.this.transactionManager.startTransaction();
                }
                ReplayingCluster.this.delegate.publish(domainEvent);
                List<EventMessage> releasedMessages = ReplayingCluster.this.incomingMessageHandler.releaseMessage(ReplayingCluster.this.delegate, domainEvent);
                if (releasedMessages != null && !releasedMessages.isEmpty()) {
                    ReplayingCluster.this.eventHandlingListeners.onEventProcessingCompleted(releasedMessages);
                }
                this.lastMessage = domainEvent;
            }

            public EventMessage getLastMessage() {
                return this.lastMessage;
            }

            public Object getTransaction() {
                return this.currentTransaction;
            }
        }
    }
}

