/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.commandhandling.disruptor;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.LifecycleAware;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.axonframework.cache.Cache;
import org.axonframework.commandhandling.disruptor.CommandHandlingEntry;
import org.axonframework.commandhandling.disruptor.DisruptorUnitOfWork;
import org.axonframework.common.Assert;
import org.axonframework.common.io.IOUtils;
import org.axonframework.domain.DomainEventStream;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventsourcing.AggregateFactory;
import org.axonframework.eventsourcing.EventSourcedAggregateRoot;
import org.axonframework.eventsourcing.EventStreamDecorator;
import org.axonframework.eventstore.EventStore;
import org.axonframework.eventstore.EventStreamNotFoundException;
import org.axonframework.repository.AggregateNotFoundException;
import org.axonframework.repository.ConflictingAggregateVersionException;
import org.axonframework.repository.Repository;
import org.axonframework.unitofwork.CurrentUnitOfWork;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Disruptor event handler that runs the command handler for each {@link CommandHandlingEntry}
 * assigned to this invoker's segment, and that owns the repositories used while handling.
 * <p>
 * The invoker registers itself in a thread-local in {@link #onStart()} and clears it in
 * {@link #onShutdown()}, so {@link #getRepository(String)} only works on a thread on which
 * {@code onStart()} has been called.
 */
public class CommandHandlerInvoker
implements EventHandler<CommandHandlingEntry>,
LifecycleAware {
    private static final Logger logger = LoggerFactory.getLogger(CommandHandlerInvoker.class);
    /** The invoker bound to the current thread between {@link #onStart()} and {@link #onShutdown()}. */
    private static final ThreadLocal<CommandHandlerInvoker> CURRENT_INVOKER = new ThreadLocal<CommandHandlerInvoker>();
    /** Dummy value; the first level cache only uses the (weakly referenced) keys of its map. */
    private static final Object PLACEHOLDER_VALUE = new Object();
    /** Repositories created by this invoker, keyed by aggregate type identifier. */
    private final ConcurrentMap<String, DisruptorRepository> repositories = new ConcurrentHashMap<String, DisruptorRepository>();
    private final Cache cache;
    private final int segmentId;
    private final EventStore eventStore;

    /**
     * Creates an invoker.
     *
     * @param eventStore the event store aggregates are read from
     * @param cache      the cache shared by all repositories created by this invoker
     * @param segmentId  the id of this invoker; only entries whose invoker id equals this value are handled
     */
    public CommandHandlerInvoker(EventStore eventStore, Cache cache, int segmentId) {
        this.eventStore = eventStore;
        this.cache = cache;
        this.segmentId = segmentId;
    }

    /**
     * Returns the repository registered under the given type identifier on the invoker bound to the
     * current thread.
     *
     * @param typeIdentifier the aggregate type identifier the repository was created for
     * @param <T>            the aggregate type the caller expects; not verified at runtime
     * @return the matching repository, or {@code null} if none was created for that identifier
     * @throws IllegalStateException presumably, via {@code Assert.state}, when no invoker is bound to
     *                               the current thread (exact exception type depends on {@code Assert})
     */
    @SuppressWarnings("unchecked") // the map stores raw repositories; T is the caller's responsibility
    public static <T extends EventSourcedAggregateRoot> DisruptorRepository<T> getRepository(String typeIdentifier) {
        CommandHandlerInvoker invoker = CURRENT_INVOKER.get();
        Assert.state(invoker != null, "The repositories of a DisruptorCommandBus are only available in the invoker thread");
        return (DisruptorRepository<T>) invoker.repositories.get(typeIdentifier);
    }

    /**
     * Handles one ring buffer entry.
     * <p>
     * A recover entry evicts the entry's aggregate from all caches. Any other entry is processed only
     * when its invoker id equals this invoker's segment id: the unit of work is started, the command is
     * passed through the invocation interceptor chain, and the unit of work is committed. Any
     * {@link Throwable} is stored on the entry as the exception result and the unit of work is rolled
     * back; it is not rethrown.
     *
     * @param entry      the entry to process
     * @param sequence   the ring buffer sequence of the entry (unused)
     * @param endOfBatch whether this is the last entry of the batch (unused)
     */
    @Override
    public void onEvent(CommandHandlingEntry entry, long sequence, boolean endOfBatch) throws Exception {
        if (entry.isRecoverEntry()) {
            this.removeEntry(entry.getAggregateIdentifier());
            return;
        }
        if (entry.getInvokerId() != this.segmentId) {
            // entry belongs to another invoker segment
            return;
        }
        DisruptorUnitOfWork unitOfWork = entry.getUnitOfWork();
        unitOfWork.start();
        try {
            Object result = entry.getInvocationInterceptorChain().proceed(entry.getCommand());
            entry.setResult(result);
            unitOfWork.commit();
        }
        catch (Throwable throwable) {
            entry.setExceptionResult(throwable);
            unitOfWork.rollback(throwable);
        }
    }

    /**
     * Returns the repository for the aggregate type produced by the given factory, creating and
     * registering one if none is registered yet under the factory's type identifier.
     * <p>
     * If a repository already exists for that identifier it is returned as is; the given
     * {@code decorator} is then not applied to it.
     *
     * @param aggregateFactory the factory used to instantiate aggregates
     * @param decorator        the decorator applied to event streams read by a newly created repository
     * @param <T>              the aggregate type
     * @return the repository registered for the factory's type identifier
     */
    @SuppressWarnings("unchecked") // repositories are registered per type identifier, which determines T
    public <T extends EventSourcedAggregateRoot> Repository<T> createRepository(AggregateFactory<T> aggregateFactory, EventStreamDecorator decorator) {
        String typeIdentifier = aggregateFactory.getTypeIdentifier();
        DisruptorRepository repository = this.repositories.get(typeIdentifier);
        if (repository == null) {
            DisruptorRepository<T> created = new DisruptorRepository<T>(aggregateFactory, this.cache, this.eventStore, decorator);
            DisruptorRepository raced = this.repositories.putIfAbsent(typeIdentifier, created);
            // putIfAbsent returns the previously registered instance when another caller won the race
            repository = raced != null ? raced : created;
        }
        return (Repository<T>) repository;
    }

    /**
     * Evicts the aggregate with the given identifier from every repository's first level cache and
     * from the shared cache.
     */
    private void removeEntry(Object aggregateIdentifier) {
        for (DisruptorRepository repository : this.repositories.values()) {
            repository.removeFromCache(aggregateIdentifier);
        }
        this.cache.remove(aggregateIdentifier);
    }

    /** Binds this invoker to the calling thread so {@link #getRepository(String)} can find it. */
    @Override
    public void onStart() {
        CURRENT_INVOKER.set(this);
    }

    /** Unbinds the invoker from the calling thread. */
    @Override
    public void onShutdown() {
        CURRENT_INVOKER.remove();
    }

    /**
     * Repository that resolves aggregates from a weak first level cache, then the shared
     * {@link Cache}, and finally the {@link EventStore}, and registers them with the current
     * {@link DisruptorUnitOfWork}.
     * <p>
     * NOTE(review): the first level cache is an unsynchronized {@link WeakHashMap}; presumably it is
     * only touched from the invoker thread - confirm against callers.
     *
     * @param <T> the aggregate type
     */
    static final class DisruptorRepository<T extends EventSourcedAggregateRoot>
    implements Repository<T> {
        private final EventStore eventStore;
        private final EventStreamDecorator decorator;
        private final AggregateFactory<T> aggregateFactory;
        /** Aggregates seen by this repository; weak keys, values are always {@code PLACEHOLDER_VALUE}. */
        private final Map<T, Object> firstLevelCache = new WeakHashMap<T, Object>();
        private final String typeIdentifier;
        private final Cache cache;

        private DisruptorRepository(AggregateFactory<T> aggregateFactory, Cache cache, EventStore eventStore, EventStreamDecorator decorator) {
            this.aggregateFactory = aggregateFactory;
            this.cache = cache;
            this.eventStore = eventStore;
            this.decorator = decorator;
            this.typeIdentifier = this.aggregateFactory.getTypeIdentifier();
        }

        /**
         * Loads the aggregate and verifies it is not newer than the expected version.
         *
         * @param aggregateIdentifier the identifier of the aggregate to load
         * @param expectedVersion     the highest acceptable version, or {@code null} to skip the check
         * @return the aggregate, or {@code null} if its event stream turned out to be empty
         * @throws ConflictingAggregateVersionException if the loaded version exceeds {@code expectedVersion}
         */
        @Override
        public T load(Object aggregateIdentifier, Long expectedVersion) {
            T aggregate = this.load(aggregateIdentifier);
            // load(Object) yields null for an empty event stream; there is no version to compare then
            if (aggregate != null && expectedVersion != null && aggregate.getVersion() > expectedVersion) {
                throw new ConflictingAggregateVersionException(aggregateIdentifier, expectedVersion, aggregate.getVersion());
            }
            return aggregate;
        }

        /**
         * Loads the aggregate with the given identifier and registers it with the current unit of work.
         * <p>
         * Lookup order: first level cache, shared cache (only if the cached item is an instance of the
         * factory's aggregate type), event store. An aggregate read from the event store is added to
         * both caches.
         *
         * @param aggregateIdentifier the identifier of the aggregate to load
         * @return the aggregate, or {@code null} if the event store returned an empty stream
         * @throws AggregateNotFoundException if the event store has no stream for the identifier
         */
        @Override
        public T load(Object aggregateIdentifier) {
            T aggregateRoot = null;
            for (T cachedAggregate : this.firstLevelCache.keySet()) {
                if (aggregateIdentifier.equals(cachedAggregate.getIdentifier())) {
                    logger.debug("Aggregate {} found in first level cache", aggregateIdentifier);
                    aggregateRoot = cachedAggregate;
                }
            }
            if (aggregateRoot == null) {
                Object cachedItem = this.cache.get(aggregateIdentifier);
                if (cachedItem != null && this.aggregateFactory.getAggregateType().isInstance(cachedItem)) {
                    aggregateRoot = this.aggregateFactory.getAggregateType().cast(cachedItem);
                }
            }
            if (aggregateRoot == null) {
                logger.debug("Aggregate {} not in first level cache, loading fresh one from Event Store", aggregateIdentifier);
                aggregateRoot = this.readFromEventStore(aggregateIdentifier);
                // Never cache null: WeakHashMap accepts a null key, and the identifier scans in
                // load/removeFromCache would then fail with a NullPointerException on every call.
                if (aggregateRoot != null) {
                    this.firstLevelCache.put(aggregateRoot, PLACEHOLDER_VALUE);
                    this.cache.put(aggregateIdentifier, aggregateRoot);
                }
            }
            if (aggregateRoot != null) {
                DisruptorUnitOfWork unitOfWork = (DisruptorUnitOfWork)CurrentUnitOfWork.get();
                unitOfWork.setAggregateType(this.typeIdentifier);
                unitOfWork.setEventStreamDecorator(this.decorator);
                unitOfWork.registerAggregate(aggregateRoot, null, null);
            }
            return aggregateRoot;
        }

        /**
         * Reads the decorated event stream for the identifier and rebuilds the aggregate from it. The
         * stream is closed (if closeable) on every exit path.
         *
         * @return the rebuilt aggregate, or {@code null} if the stream contains no events
         * @throws AggregateNotFoundException if the event store reports that no stream exists
         */
        private T readFromEventStore(Object aggregateIdentifier) {
            DomainEventStream events = null;
            try {
                events = this.decorator.decorateForRead(this.typeIdentifier, aggregateIdentifier, this.eventStore.readEvents(this.typeIdentifier, aggregateIdentifier));
                if (!events.hasNext()) {
                    return null;
                }
                T aggregateRoot = this.aggregateFactory.createAggregate(aggregateIdentifier, events.peek());
                aggregateRoot.initializeState(events);
                return aggregateRoot;
            }
            catch (EventStreamNotFoundException e) {
                throw new AggregateNotFoundException(aggregateIdentifier, "Aggregate not found. Possibly involves an aggregate being created, or a command that was executed against an aggregate that did not yet finish the creation process. It will be rescheduled for publication when it attempts to load an aggregate", e);
            }
            finally {
                IOUtils.closeQuietlyIfCloseable(events);
            }
        }

        /**
         * Registers a new aggregate with the current unit of work and stores it in both the first level
         * cache and the shared cache.
         *
         * @param aggregate the aggregate to add
         */
        @Override
        public void add(T aggregate) {
            DisruptorUnitOfWork unitOfWork = (DisruptorUnitOfWork)CurrentUnitOfWork.get();
            unitOfWork.setEventStreamDecorator(this.decorator);
            unitOfWork.setAggregateType(this.typeIdentifier);
            unitOfWork.registerAggregate(aggregate, (EventBus)null, null);
            this.firstLevelCache.put(aggregate, PLACEHOLDER_VALUE);
            this.cache.put(aggregate.getIdentifier(), aggregate);
        }

        /**
         * Removes the first aggregate with the given identifier from the first level cache, if present.
         */
        private void removeFromCache(Object aggregateIdentifier) {
            for (T cachedAggregate : this.firstLevelCache.keySet()) {
                if (!aggregateIdentifier.equals(cachedAggregate.getIdentifier())) {
                    continue;
                }
                this.firstLevelCache.remove(cachedAggregate);
                logger.debug("Aggregate {} removed from first level cache for recovery purposes.", aggregateIdentifier);
                // return right away: the key set must not be iterated further after the removal
                return;
            }
        }
    }
}

