/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.saga.annotation;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.Subscribable;
import org.axonframework.common.annotation.ClasspathParameterResolverFactory;
import org.axonframework.common.annotation.ParameterResolverFactory;
import org.axonframework.correlation.CorrelationDataProvider;
import org.axonframework.correlation.MultiCorrelationDataProvider;
import org.axonframework.correlation.SimpleCorrelationDataProvider;
import org.axonframework.domain.EventMessage;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.EventProcessingMonitor;
import org.axonframework.eventhandling.EventProcessingMonitorCollection;
import org.axonframework.eventhandling.EventProcessingMonitorSupport;
import org.axonframework.saga.GenericSagaFactory;
import org.axonframework.saga.SagaCreationPolicy;
import org.axonframework.saga.SagaFactory;
import org.axonframework.saga.SagaManager;
import org.axonframework.saga.SagaRepository;
import org.axonframework.saga.annotation.AbstractAnnotatedSaga;
import org.axonframework.saga.annotation.AsyncSagaEventProcessor;
import org.axonframework.saga.annotation.AsyncSagaProcessingEvent;
import org.axonframework.saga.annotation.SagaMethodMessageHandler;
import org.axonframework.saga.annotation.SagaMethodMessageHandlerInspector;
import org.axonframework.saga.repository.inmemory.InMemorySagaRepository;
import org.axonframework.unitofwork.DefaultUnitOfWorkFactory;
import org.axonframework.unitofwork.TransactionManager;
import org.axonframework.unitofwork.UnitOfWorkFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncAnnotatedSagaManager
implements SagaManager,
Subscribable,
EventProcessingMonitorSupport {
    // ---- Defaults ---------------------------------------------------------------------------

    /** Wait strategy used until {@link #setWaitStrategy(WaitStrategy)} replaces it. */
    private static final WaitStrategy DEFAULT_WAIT_STRATEGY = new BlockingWaitStrategy();
    /** Initial ring buffer size; {@link #setBufferSize(int)} only accepts powers of two. */
    private static final int DEFAULT_BUFFER_SIZE = 512;
    /** Initial number of processors requested from {@code AsyncSagaEventProcessor.createInstances}. */
    private static final int DEFAULT_PROCESSOR_COUNT = 1;

    // ---- Disruptor configuration (setters guarded by "not started" checks) --------------------

    private WaitStrategy waitStrategy = DEFAULT_WAIT_STRATEGY;
    private int bufferSize = DEFAULT_BUFFER_SIZE;
    private int processorCount = DEFAULT_PROCESSOR_COUNT;

    // ---- Fixed at construction ----------------------------------------------------------------

    /** Event bus to (un)subscribe on; {@code null} when built through a constructor without an event bus. */
    private final EventBus eventBus;
    private final Class<? extends AbstractAnnotatedSaga>[] sagaTypes;
    private final ParameterResolverFactory parameterResolverFactory;
    /** Running flag; the same instance is handed to the event processors in {@link #start()}. */
    private final SagaManagerStatus sagaManagerStatus = new SagaManagerStatus();
    private final EventProcessingMonitorCollection processingMonitors = new EventProcessingMonitorCollection();

    // ---- Runtime state and replaceable collaborators ------------------------------------------

    /** Non-null only between {@link #start()} and {@link #stop()}; read by {@link #handle(EventMessage)} without a lock. */
    private volatile Disruptor<AsyncSagaProcessingEvent> disruptor;
    /** Stays {@code true} until {@link #setExecutor(Executor)} supplies an executor. */
    private boolean shutdownExecutorOnStop = true;
    private Executor executor = Executors.newCachedThreadPool();
    private SagaRepository sagaRepository = new InMemorySagaRepository();
    /** Volatile because {@link #handle(EventMessage)} reads it outside the monitor lock. */
    private volatile SagaFactory sagaFactory = new GenericSagaFactory();
    private UnitOfWorkFactory unitOfWorkFactory = new DefaultUnitOfWorkFactory();
    /** Milliseconds the validating executor waits for each submitted task to begin running. */
    private long startTimeout = 5000L;
    private CorrelationDataProvider<? super EventMessage> correlationDataProvider = new SimpleCorrelationDataProvider(new String[0]);

    @Deprecated
    public AsyncAnnotatedSagaManager(EventBus eventBus, Class<? extends AbstractAnnotatedSaga> ... sagaTypes) {
        Assert.notNull(eventBus, "eventBus may not be null");
        this.eventBus = eventBus;
        this.sagaTypes = Arrays.copyOf(sagaTypes, sagaTypes.length);
        this.parameterResolverFactory = ClasspathParameterResolverFactory.forClass(sagaTypes.length == 0 ? AsyncAnnotatedSagaManager.class : sagaTypes[0]);
    }

    public AsyncAnnotatedSagaManager(Class<? extends AbstractAnnotatedSaga> ... sagaTypes) {
        this(ClasspathParameterResolverFactory.forClass(sagaTypes.length == 0 ? AsyncAnnotatedSagaManager.class : sagaTypes[0]), sagaTypes);
    }

    /**
     * Creates a manager for the given saga types without an event bus; {@link #subscribe()} and
     * {@link #unsubscribe()} then do nothing.
     *
     * @param parameterResolverFactory the factory passed to the handler inspector and the event processors
     * @param sagaTypes                the saga types managed by this instance; the array is copied
     */
    public AsyncAnnotatedSagaManager(ParameterResolverFactory parameterResolverFactory, Class<? extends AbstractAnnotatedSaga> ... sagaTypes) {
        // Defensive copy so later changes to the caller's array do not leak in.
        this.sagaTypes = sagaTypes.clone();
        this.eventBus = null;
        this.parameterResolverFactory = parameterResolverFactory;
    }

    /**
     * Starts this manager. When no Disruptor is active, a new one is built from the current
     * configuration, wired with the saga event processors followed by the monitor notifier, and
     * started. Afterwards the manager subscribes to the event bus, if it has one.
     * <p>
     * The new Disruptor is assigned to the {@code disruptor} field only after it has started
     * successfully. If building or starting it fails (for instance because the validating
     * executor reports that no thread was made available), the running flag is reset, the
     * partially started Disruptor is halted and the failure is rethrown. The manager then stays
     * in its "not started" state, so it can be reconfigured and started again.
     */
    public synchronized void start() {
        if (this.disruptor == null) {
            this.sagaManagerStatus.setStatus(true);
            Disruptor<AsyncSagaProcessingEvent> candidate = null;
            boolean started = false;
            try {
                candidate = new Disruptor<AsyncSagaProcessingEvent>(new AsyncSagaProcessingEvent.Factory(), this.bufferSize, new ValidatingExecutor(this.executor, this.startTimeout), ProducerType.MULTI, this.waitStrategy);
                candidate.handleExceptionsWith(new LoggingExceptionHandler());
                candidate.handleEventsWith(AsyncSagaEventProcessor.createInstances(this.sagaRepository, this.parameterResolverFactory, this.unitOfWorkFactory, this.processorCount, candidate.getRingBuffer(), this.sagaManagerStatus, this.correlationDataProvider)).then(new MonitorNotifier(this.processingMonitors));
                candidate.start();
                started = true;
            } finally {
                if (!started) {
                    // Roll back so a failed start does not look like a running manager.
                    this.sagaManagerStatus.setStatus(false);
                    if (candidate != null) {
                        // Stop any processors that did get a thread before the failure.
                        candidate.halt();
                    }
                }
            }
            // Publish only a fully configured and started Disruptor to handle().
            this.disruptor = candidate;
        }
        this.subscribe();
    }

    /**
     * Stops this manager: clears the running flag, unsubscribes from the event bus (if any) and,
     * when a Disruptor is active, shuts it down. The executor is shut down as well, but only when
     * it is an {@link ExecutorService} and was not supplied through {@link #setExecutor(Executor)}.
     */
    public synchronized void stop() {
        this.sagaManagerStatus.setStatus(false);
        this.unsubscribe();
        final Disruptor<AsyncSagaProcessingEvent> active = this.disruptor;
        if (active != null) {
            active.shutdown();
            final boolean shutDownExecutor = this.shutdownExecutorOnStop && this.executor instanceof ExecutorService;
            if (shutDownExecutor) {
                final ExecutorService service = (ExecutorService)this.executor;
                service.shutdown();
            }
        }
        this.disruptor = null;
    }

    /**
     * Unsubscribes this manager from the event bus given at construction; does nothing when the
     * manager was created without one.
     */
    @Override
    public void unsubscribe() {
        final EventBus bus = this.eventBus;
        if (bus == null) {
            return;
        }
        bus.unsubscribe(this);
    }

    /**
     * Subscribes this manager to the event bus given at construction; does nothing when the
     * manager was created without one.
     */
    @Override
    public void subscribe() {
        final EventBus bus = this.eventBus;
        if (bus == null) {
            return;
        }
        bus.subscribe(this);
    }

    /**
     * Hands the given event to the Disruptor, once per saga type that has at least one matching
     * handler. Events arriving while no Disruptor is active are ignored.
     * <p>
     * For each saga type with matching handlers, a new saga instance is created through the saga
     * factory when any handler has a creation policy other than {@link SagaCreationPolicy#NONE}.
     * That instance (or {@code null}) is published together with the event and its handlers.
     *
     * @param event the event to dispatch to the managed sagas
     */
    @Override
    public void handle(EventMessage event) {
        // Read the volatile field exactly once: this method is not synchronized, so a concurrent
        // stop() may null the field between a null check and a later use.
        final Disruptor<AsyncSagaProcessingEvent> activeDisruptor = this.disruptor;
        if (activeDisruptor == null) {
            return;
        }
        for (Class<? extends AbstractAnnotatedSaga> sagaType : this.sagaTypes) {
            SagaMethodMessageHandlerInspector<? extends AbstractAnnotatedSaga> inspector = SagaMethodMessageHandlerInspector.getInstance(sagaType, this.parameterResolverFactory);
            List<SagaMethodMessageHandler> handlers = inspector.getMessageHandlers(event);
            if (handlers.isEmpty()) {
                continue;
            }
            AbstractAnnotatedSaga newSagaInstance = null;
            for (SagaMethodMessageHandler handler : handlers) {
                // Create at most one instance, on the first handler that allows saga creation.
                if (newSagaInstance == null && handler.getCreationPolicy() != SagaCreationPolicy.NONE) {
                    newSagaInstance = this.sagaFactory.createSaga(inspector.getSagaType());
                }
            }
            activeDisruptor.publishEvent(new SagaProcessingEventTranslator(event, inspector, handlers, newSagaInstance));
        }
    }

    /**
     * Returns the first configured saga type, or {@code void.class} when this manager was created
     * without any saga types.
     *
     * @return the first saga type or {@link Void#TYPE}
     */
    @Override
    public Class<?> getTargetType() {
        if (this.sagaTypes.length > 0) {
            return this.sagaTypes[0];
        }
        return Void.TYPE;
    }

    /**
     * Registers the given monitor with this manager's monitor collection.
     *
     * @param monitor the monitor to register
     */
    @Override
    public void subscribeEventProcessingMonitor(EventProcessingMonitor monitor) {
        processingMonitors.subscribeEventProcessingMonitor(monitor);
    }

    /**
     * Removes the given monitor from this manager's monitor collection.
     *
     * @param monitor the monitor to remove
     */
    @Override
    public void unsubscribeEventProcessingMonitor(EventProcessingMonitor monitor) {
        processingMonitors.unsubscribeEventProcessingMonitor(monitor);
    }

    /**
     * Sets the executor that provides the threads for the Disruptor. An executor supplied here
     * is not shut down by {@link #stop()}. May only be called while the manager is not started.
     *
     * @param executor the executor to use
     */
    public synchronized void setExecutor(Executor executor) {
        final boolean notStarted = this.disruptor == null;
        Assert.state(notStarted, "Cannot set executor after SagaManager has started");
        this.executor = executor;
        // The caller owns this executor, so stop() must leave it alone.
        this.shutdownExecutorOnStop = false;
    }

    /**
     * Sets the repository handed to the event processors on {@link #start()}. May only be called
     * while the manager is not started.
     *
     * @param sagaRepository the saga repository to use
     */
    public synchronized void setSagaRepository(SagaRepository sagaRepository) {
        final boolean notStarted = this.disruptor == null;
        Assert.state(notStarted, "Cannot set sagaRepository when SagaManager has started");
        this.sagaRepository = sagaRepository;
    }

    /**
     * Sets the factory used by {@link #handle(EventMessage)} to create new saga instances. May
     * only be called while the manager is not started.
     *
     * @param sagaFactory the saga factory to use
     */
    public synchronized void setSagaFactory(SagaFactory sagaFactory) {
        final boolean notStarted = this.disruptor == null;
        Assert.state(notStarted, "Cannot set sagaFactory when SagaManager has started");
        this.sagaFactory = sagaFactory;
    }

    /**
     * Replaces the unit of work factory with a {@link DefaultUnitOfWorkFactory} built around the
     * given transaction manager. May only be called while the manager is not started.
     *
     * @param transactionManager the transaction manager to wrap
     */
    public synchronized void setTransactionManager(TransactionManager transactionManager) {
        final boolean notStarted = this.disruptor == null;
        Assert.state(notStarted, "Cannot set transactionManager when SagaManager has started");
        final UnitOfWorkFactory transactionalFactory = new DefaultUnitOfWorkFactory(transactionManager);
        this.unitOfWorkFactory = transactionalFactory;
    }

    /**
     * Sets the number of processors requested when the event processors are created on
     * {@link #start()}. May only be called while the manager is not started.
     *
     * @param processorCount the number of processors to create
     */
    public synchronized void setProcessorCount(int processorCount) {
        final boolean notStarted = this.disruptor == null;
        Assert.state(notStarted, "Cannot set processorCount when SagaManager has started");
        this.processorCount = processorCount;
    }

    /**
     * Sets how long, in milliseconds, the manager waits for the executor to begin running each
     * submitted task. The value is read when the Disruptor is built in {@link #start()}; unlike
     * most other setters, no "not started" check is performed here.
     *
     * @param startTimeout the timeout in milliseconds
     */
    public synchronized void setStartTimeout(long startTimeout) {
        this.startTimeout = startTimeout;
    }

    /**
     * Sets the size of the Disruptor's ring buffer. The size must be a positive power of two and
     * may only be changed while the manager is not started.
     *
     * @param bufferSize the ring buffer size; a positive power of 2
     */
    public synchronized void setBufferSize(int bufferSize) {
        // bitCount alone is not enough: Integer.MIN_VALUE has exactly one bit set but is negative.
        Assert.isTrue(bufferSize > 0 && Integer.bitCount(bufferSize) == 1, "The buffer size must be a power of 2");
        Assert.state(this.disruptor == null, "Cannot set bufferSize when SagaManager has started");
        this.bufferSize = bufferSize;
    }

    /**
     * Sets the wait strategy passed to the Disruptor on {@link #start()}. May only be called
     * while the manager is not started.
     *
     * @param waitStrategy the wait strategy to use
     */
    public synchronized void setWaitStrategy(WaitStrategy waitStrategy) {
        final boolean notStarted = this.disruptor == null;
        Assert.state(notStarted, "Cannot set waitStrategy when SagaManager has started");
        this.waitStrategy = waitStrategy;
    }

    /**
     * Sets the correlation data provider handed to the event processors on {@link #start()}.
     * No "not started" check is performed here.
     *
     * @param correlationDataProvider the provider to use
     */
    public synchronized void setCorrelationDataProvider(CorrelationDataProvider<? super EventMessage> correlationDataProvider) {
        this.correlationDataProvider = correlationDataProvider;
    }

    /**
     * Combines the given providers into a single {@link MultiCorrelationDataProvider} and uses
     * that as this manager's correlation data provider. No "not started" check is performed here.
     *
     * @param correlationDataProviders the providers to combine
     */
    public synchronized void setCorrelationDataProviders(List<? extends CorrelationDataProvider<? super EventMessage>> correlationDataProviders) {
        final MultiCorrelationDataProvider<EventMessage> combined = new MultiCorrelationDataProvider<EventMessage>(correlationDataProviders);
        this.correlationDataProvider = combined;
    }

    /**
     * Disruptor event handler that collects the published event messages of a batch and reports
     * them to an {@link EventProcessingMonitor} when the batch ends.
     */
    private static class MonitorNotifier
    implements EventHandler<AsyncSagaProcessingEvent> {
        private final EventProcessingMonitor monitor;
        /** Messages seen since the last end-of-batch notification. */
        private final List<EventMessage> processedMessages = new ArrayList<EventMessage>();

        /**
         * @param monitor the monitor to notify at the end of each batch
         */
        public MonitorNotifier(EventProcessingMonitor monitor) {
            this.monitor = monitor;
        }

        /**
         * Records the message carried by {@code event} and, at the end of a batch, reports all
         * recorded messages to the monitor.
         * <p>
         * The monitor receives its own copy of the batch, and the internal buffer is emptied
         * before the monitor is called. A monitor that keeps the list therefore never sees it
         * change, and a monitor that throws does not cause the same messages to be reported
         * again with the next batch.
         *
         * @param event      the ring buffer entry that was processed
         * @param sequence   the sequence of the entry (unused)
         * @param endOfBatch whether this entry is the last one of the current batch
         */
        public void onEvent(AsyncSagaProcessingEvent event, long sequence, boolean endOfBatch) throws Exception {
            this.processedMessages.add(event.getPublishedEvent());
            if (endOfBatch) {
                final List<EventMessage> batch = new ArrayList<EventMessage>(this.processedMessages);
                this.processedMessages.clear();
                this.monitor.onEventProcessingCompleted(batch);
            }
        }
    }

    /**
     * Runnable wrapper that signals, through a latch, the moment its {@link #run()} method is
     * entered, and then runs the wrapped command.
     */
    private static class StartDetectingRunnable
    implements Runnable {
        private final CountDownLatch startSignal = new CountDownLatch(1);
        private final Runnable wrapped;

        /**
         * @param command the runnable to invoke once this wrapper is run
         */
        public StartDetectingRunnable(Runnable command) {
            this.wrapped = command;
        }

        /**
         * Releases any thread waiting in {@link #awaitStarted(long, TimeUnit)} and then runs the
         * wrapped command on the calling thread.
         */
        @Override
        public void run() {
            // Signal first: the wrapped command may run for a long time.
            startSignal.countDown();
            wrapped.run();
        }

        /**
         * Waits until {@link #run()} has been entered.
         *
         * @param timeout the maximum time to wait
         * @param unit    the unit of {@code timeout}
         * @return {@code true} if {@link #run()} was entered within the timeout, {@code false} otherwise
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public boolean awaitStarted(long timeout, TimeUnit unit) throws InterruptedException {
            final boolean entered = startSignal.await(timeout, unit);
            return entered;
        }
    }

    /**
     * Executor decorator that checks that every submitted task actually begins running on the
     * delegate within a timeout, and reports a configuration error when it does not.
     */
    private static class ValidatingExecutor
    implements Executor {
        private final Executor target;
        private final long startTimeoutMillis;

        /**
         * @param executor      the executor that runs the tasks
         * @param timeoutMillis how long, in milliseconds, to wait for each task to begin running
         */
        public ValidatingExecutor(Executor executor, long timeoutMillis) {
            this.target = executor;
            this.startTimeoutMillis = timeoutMillis;
        }

        /**
         * Submits the command to the delegate and blocks until it has begun running or the
         * timeout elapses. If the calling thread is interrupted while waiting, its interrupt
         * flag is restored and the method returns without further checks.
         *
         * @param command the task to run
         * @throws AxonConfigurationException if the task did not begin running within the timeout
         */
        @Override
        public void execute(Runnable command) {
            final StartDetectingRunnable startAware = new StartDetectingRunnable(command);
            target.execute(startAware);
            final boolean started;
            try {
                started = startAware.awaitStarted(startTimeoutMillis, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (!started) {
                throw new AxonConfigurationException("It seems that the given Executor is not providing a thread for the AsyncSagaManager. Ensure that the corePoolSize is larger than the processor count.");
            }
        }
    }

    /**
     * Holder for the manager's running flag. The flag is volatile, so a change made by the
     * manager is visible to other threads that read it through {@link #isRunning()}.
     */
    static class SagaManagerStatus {
        /** {@code false} until the enclosing manager marks itself as running. */
        private volatile boolean running;

        SagaManagerStatus() {
        }

        /**
         * Updates the flag; private, so only code in the enclosing class can change it.
         *
         * @param running the new state
         */
        private void setStatus(boolean running) {
            this.running = running;
        }

        /**
         * @return {@code true} while the manager is marked as running
         */
        public boolean isRunning() {
            return running;
        }
    }

    /**
     * Disruptor exception handler that logs every reported exception at WARN level and does
     * nothing else.
     */
    private static final class LoggingExceptionHandler
    implements ExceptionHandler {
        private static final Logger LOG = LoggerFactory.getLogger(LoggingExceptionHandler.class);

        private LoggingExceptionHandler() {
        }

        /**
         * Logs an exception raised while an event was being processed.
         *
         * @param ex       the exception that was raised
         * @param sequence the sequence of the failing entry (not logged)
         * @param event    the failing entry (not logged)
         */
        public void handleEventException(Throwable ex, long sequence, Object event) {
            LOG.warn("A fatal exception occurred while processing an Event for a Saga. Processing will continue with the next Event", ex);
        }

        /**
         * Logs an exception raised during start-up.
         *
         * @param ex the exception that was raised
         */
        public void handleOnStartException(Throwable ex) {
            LOG.warn("An exception occurred while starting the AsyncAnnotatedSagaManager.", ex);
        }

        /**
         * Logs an exception raised during shutdown.
         *
         * @param ex the exception that was raised
         */
        public void handleOnShutdownException(Throwable ex) {
            LOG.warn("An exception occurred while shutting down the AsyncAnnotatedSagaManager.", ex);
        }
    }

    /**
     * Event translator that copies one event message, together with its saga type, matching
     * handlers and optional new saga instance, into a ring buffer entry.
     */
    private static final class SagaProcessingEventTranslator
    implements EventTranslator<AsyncSagaProcessingEvent> {
        private final EventMessage publishedEvent;
        private final SagaMethodMessageHandlerInspector inspector;
        private final List<SagaMethodMessageHandler> matchingHandlers;
        /** May be {@code null} when no handler allows a new saga to be created. */
        private final AbstractAnnotatedSaga candidateSaga;

        /**
         * @param event               the event message to publish
         * @param annotationInspector the inspector that supplies the saga type
         * @param handlers            the handlers matching the event
         * @param newSagaInstance     a freshly created saga instance, or {@code null}
         */
        private SagaProcessingEventTranslator(EventMessage event, SagaMethodMessageHandlerInspector annotationInspector, List<SagaMethodMessageHandler> handlers, AbstractAnnotatedSaga newSagaInstance) {
            this.publishedEvent = event;
            this.inspector = annotationInspector;
            this.matchingHandlers = handlers;
            this.candidateSaga = newSagaInstance;
        }

        /**
         * Resets the given ring buffer entry with the data captured at construction.
         *
         * @param entry    the ring buffer entry to fill
         * @param sequence the sequence of the entry (unused)
         */
        public void translateTo(AsyncSagaProcessingEvent entry, long sequence) {
            entry.reset(publishedEvent, inspector.getSagaType(), matchingHandlers, candidateSaga);
        }
    }
}

