/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.commandhandling.disruptor;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandDispatchInterceptor;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.CommandHandlerInterceptor;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandTargetResolver;
import org.axonframework.commandhandling.disruptor.BlacklistDetectingCallback;
import org.axonframework.commandhandling.disruptor.CommandHandlerInvoker;
import org.axonframework.commandhandling.disruptor.CommandHandlingEntry;
import org.axonframework.commandhandling.disruptor.DisruptorConfiguration;
import org.axonframework.commandhandling.disruptor.EventPublisher;
import org.axonframework.commandhandling.disruptor.SerializerHandler;
import org.axonframework.commandhandling.interceptors.SerializationOptimizingInterceptor;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.domain.DomainEventStream;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventsourcing.AggregateFactory;
import org.axonframework.eventsourcing.EventSourcedAggregateRoot;
import org.axonframework.eventsourcing.EventStreamDecorator;
import org.axonframework.eventstore.EventStore;
import org.axonframework.repository.Repository;
import org.axonframework.serializer.Serializer;
import org.axonframework.unitofwork.TransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DisruptorCommandBus
implements CommandBus {
    private static final Logger logger = LoggerFactory.getLogger(DisruptorCommandBus.class);
    private static final ThreadGroup DISRUPTOR_THREAD_GROUP = new ThreadGroup("DisruptorCommandBus");
    private final ConcurrentMap<String, CommandHandler<?>> commandHandlers = new ConcurrentHashMap();
    private final Disruptor<CommandHandlingEntry> disruptor;
    private final CommandHandlerInvoker[] commandHandlerInvokers;
    private final List<CommandDispatchInterceptor> dispatchInterceptors;
    private final List<CommandHandlerInterceptor> invokerInterceptors;
    private final List<CommandHandlerInterceptor> publisherInterceptors;
    private final ExecutorService executorService;
    private final boolean rescheduleOnCorruptState;
    private volatile boolean started = true;
    private volatile boolean disruptorShutDown = false;
    private final long coolingDownPeriod;
    private final CommandTargetResolver commandTargetResolver;
    private final int publisherCount;
    private final int serializerCount;
    private final CommandCallback<Object> failureLoggingCallback = new FailureLoggingCommandCallback();

    public DisruptorCommandBus(EventStore eventStore, EventBus eventBus) {
        this(eventStore, eventBus, new DisruptorConfiguration());
    }

    public DisruptorCommandBus(EventStore eventStore, EventBus eventBus, DisruptorConfiguration configuration) {
        Assert.notNull(eventStore, "eventStore may not be null");
        Assert.notNull(eventBus, "eventBus may not be null");
        Assert.notNull(configuration, "configuration may not be null");
        Executor executor = configuration.getExecutor();
        if (executor == null) {
            this.executorService = Executors.newCachedThreadPool(new AxonThreadFactory(DISRUPTOR_THREAD_GROUP));
            executor = this.executorService;
        } else {
            this.executorService = null;
        }
        this.rescheduleOnCorruptState = configuration.getRescheduleCommandsOnCorruptState();
        this.invokerInterceptors = new ArrayList<CommandHandlerInterceptor>(configuration.getInvokerInterceptors());
        this.publisherInterceptors = new ArrayList<CommandHandlerInterceptor>(configuration.getPublisherInterceptors());
        this.dispatchInterceptors = new ArrayList<CommandDispatchInterceptor>(configuration.getDispatchInterceptors());
        TransactionManager transactionManager = configuration.getTransactionManager();
        this.disruptor = new Disruptor((EventFactory)new CommandHandlingEntry.Factory(configuration.getTransactionManager() != null), configuration.getBufferSize(), executor, configuration.getProducerType(), configuration.getWaitStrategy());
        this.commandTargetResolver = configuration.getCommandTargetResolver();
        this.commandHandlerInvokers = this.initializeInvokerThreads(eventStore, configuration);
        EventHandler[] serializerThreads = this.initializeSerializerThreads(configuration);
        this.serializerCount = serializerThreads.length;
        EventHandler[] publishers = this.initializePublisherThreads(eventStore, eventBus, configuration, executor, transactionManager);
        this.publisherCount = publishers.length;
        this.disruptor.handleExceptionsWith((com.lmax.disruptor.ExceptionHandler)new ExceptionHandler());
        EventHandlerGroup eventHandlerGroup = this.disruptor.handleEventsWith((EventHandler[])this.commandHandlerInvokers);
        if (serializerThreads.length > 0) {
            eventHandlerGroup = eventHandlerGroup.then(serializerThreads);
            this.invokerInterceptors.add(new SerializationOptimizingInterceptor());
        }
        eventHandlerGroup.then(publishers);
        this.coolingDownPeriod = configuration.getCoolingDownPeriod();
        this.disruptor.start();
    }

    private EventPublisher[] initializePublisherThreads(EventStore eventStore, EventBus eventBus, DisruptorConfiguration configuration, Executor executor, TransactionManager transactionManager) {
        EventPublisher[] publishers = new EventPublisher[configuration.getPublisherThreadCount()];
        for (int t = 0; t < publishers.length; ++t) {
            publishers[t] = new EventPublisher(eventStore, eventBus, executor, transactionManager, configuration.getRollbackConfiguration(), t);
        }
        return publishers;
    }

    private SerializerHandler[] initializeSerializerThreads(DisruptorConfiguration configuration) {
        if (!configuration.isPreSerializationConfigured()) {
            return new SerializerHandler[0];
        }
        Serializer serializer = configuration.getSerializer();
        SerializerHandler[] serializerThreads = new SerializerHandler[configuration.getSerializerThreadCount()];
        for (int t = 0; t < serializerThreads.length; ++t) {
            serializerThreads[t] = new SerializerHandler(serializer, t, configuration.getSerializedRepresentation());
        }
        return serializerThreads;
    }

    private CommandHandlerInvoker[] initializeInvokerThreads(EventStore eventStore, DisruptorConfiguration configuration) {
        CommandHandlerInvoker[] invokers = new CommandHandlerInvoker[configuration.getInvokerThreadCount()];
        for (int t = 0; t < invokers.length; ++t) {
            invokers[t] = new CommandHandlerInvoker(eventStore, configuration.getCache(), t);
        }
        return invokers;
    }

    @Override
    public void dispatch(CommandMessage<?> command) {
        this.dispatch(command, this.failureLoggingCallback);
    }

    @Override
    public <R> void dispatch(CommandMessage<?> command, CommandCallback<R> callback) {
        Assert.state(this.started, "CommandBus has been shut down. It is not accepting any Commands");
        CommandMessage<?> commandToDispatch = command;
        for (CommandDispatchInterceptor interceptor : this.dispatchInterceptors) {
            commandToDispatch = interceptor.handle(commandToDispatch);
        }
        this.doDispatch(commandToDispatch, callback);
    }

    public <R> void doDispatch(CommandMessage command, CommandCallback<R> callback) {
        Object aggregateIdentifier;
        Assert.state(!this.disruptorShutDown, "Disruptor has been shut down. Cannot dispatch or re-dispatch commands");
        RingBuffer ringBuffer = this.disruptor.getRingBuffer();
        int invokerSegment = 0;
        int publisherSegment = 0;
        int serializerSegment = 0;
        if ((this.commandHandlerInvokers.length > 1 || this.publisherCount > 1 || this.serializerCount > 1) && (aggregateIdentifier = this.commandTargetResolver.resolveTarget(command).getIdentifier()) != null) {
            int idHash = aggregateIdentifier.hashCode() & Integer.MAX_VALUE;
            if (this.commandHandlerInvokers.length > 1) {
                invokerSegment = idHash % this.commandHandlerInvokers.length;
            }
            if (this.serializerCount > 1) {
                serializerSegment = idHash % this.serializerCount;
            }
            if (this.publisherCount > 1) {
                publisherSegment = idHash % this.publisherCount;
            }
        }
        long sequence = ringBuffer.next();
        CommandHandlingEntry event = (CommandHandlingEntry)ringBuffer.get(sequence);
        event.reset(command, (CommandHandler)this.commandHandlers.get(command.getCommandName()), invokerSegment, publisherSegment, serializerSegment, new BlacklistDetectingCallback<R>(callback, command, (RingBuffer<CommandHandlingEntry>)this.disruptor.getRingBuffer(), this, this.rescheduleOnCorruptState), this.invokerInterceptors, this.publisherInterceptors);
        ringBuffer.publish(sequence);
    }

    public <T extends EventSourcedAggregateRoot> Repository<T> createRepository(AggregateFactory<T> aggregateFactory) {
        return this.createRepository(aggregateFactory, NoOpEventStreamDecorator.INSTANCE);
    }

    public <T extends EventSourcedAggregateRoot> Repository<T> createRepository(AggregateFactory<T> aggregateFactory, EventStreamDecorator decorator) {
        for (CommandHandlerInvoker invoker : this.commandHandlerInvokers) {
            invoker.createRepository(aggregateFactory, decorator);
        }
        return new DisruptorRepository(aggregateFactory.getTypeIdentifier());
    }

    @Override
    public <C> void subscribe(String commandName, CommandHandler<? super C> handler) {
        this.commandHandlers.put(commandName, handler);
    }

    @Override
    public <C> boolean unsubscribe(String commandName, CommandHandler<? super C> handler) {
        return this.commandHandlers.remove(commandName, handler);
    }

    public void stop() {
        if (!this.started) {
            return;
        }
        this.started = false;
        long lastChangeDetected = System.currentTimeMillis();
        long lastKnownCursor = this.disruptor.getRingBuffer().getCursor();
        while (System.currentTimeMillis() - lastChangeDetected < this.coolingDownPeriod && !Thread.interrupted()) {
            if (this.disruptor.getRingBuffer().getCursor() == lastKnownCursor) continue;
            lastChangeDetected = System.currentTimeMillis();
            lastKnownCursor = this.disruptor.getRingBuffer().getCursor();
        }
        this.disruptorShutDown = true;
        this.disruptor.shutdown();
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
    }

    private static class NoOpEventStreamDecorator
    implements EventStreamDecorator {
        public static final EventStreamDecorator INSTANCE = new NoOpEventStreamDecorator();

        private NoOpEventStreamDecorator() {
        }

        @Override
        public DomainEventStream decorateForRead(String aggregateType, Object aggregateIdentifier, DomainEventStream eventStream) {
            return eventStream;
        }

        @Override
        public DomainEventStream decorateForAppend(String aggregateType, EventSourcedAggregateRoot aggregate, DomainEventStream eventStream) {
            return eventStream;
        }
    }

    private static class DisruptorRepository<T extends EventSourcedAggregateRoot>
    implements Repository<T> {
        private final String typeIdentifier;

        public DisruptorRepository(String typeIdentifier) {
            this.typeIdentifier = typeIdentifier;
        }

        @Override
        public T load(Object aggregateIdentifier, Long expectedVersion) {
            return (T)CommandHandlerInvoker.getRepository(this.typeIdentifier).load(aggregateIdentifier, expectedVersion);
        }

        @Override
        public T load(Object aggregateIdentifier) {
            return (T)CommandHandlerInvoker.getRepository(this.typeIdentifier).load(aggregateIdentifier);
        }

        @Override
        public void add(T aggregate) {
            CommandHandlerInvoker.getRepository(this.typeIdentifier).add(aggregate);
        }
    }

    private class ExceptionHandler
    implements com.lmax.disruptor.ExceptionHandler {
        private ExceptionHandler() {
        }

        public void handleEventException(Throwable ex, long sequence, Object event) {
            logger.error("Exception occurred while processing a {}.", (Object)((CommandHandlingEntry)event).getCommand().getPayloadType().getSimpleName(), (Object)ex);
        }

        public void handleOnStartException(Throwable ex) {
            logger.error("Failed to start the DisruptorCommandBus.", ex);
            DisruptorCommandBus.this.disruptor.shutdown();
        }

        public void handleOnShutdownException(Throwable ex) {
            logger.error("Error while shutting down the DisruptorCommandBus", ex);
        }
    }

    private static class FailureLoggingCommandCallback
    implements CommandCallback<Object> {
        private FailureLoggingCommandCallback() {
        }

        @Override
        public void onSuccess(Object result) {
        }

        @Override
        public void onFailure(Throwable cause) {
            logger.info("An error occurred while handling a command.", cause);
        }
    }
}

