/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.commandhandling.disruptor;

import com.lmax.disruptor.EventHandler;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.Executor;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.RollbackConfiguration;
import org.axonframework.commandhandling.disruptor.AggregateBlacklistedException;
import org.axonframework.commandhandling.disruptor.AggregateStateCorruptedException;
import org.axonframework.commandhandling.disruptor.CommandHandlingEntry;
import org.axonframework.commandhandling.disruptor.DisruptorCommandBus;
import org.axonframework.commandhandling.disruptor.DisruptorUnitOfWork;
import org.axonframework.domain.DomainEventStream;
import org.axonframework.domain.EventMessage;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventsourcing.EventSourcedAggregateRoot;
import org.axonframework.eventstore.EventStore;
import org.axonframework.repository.AggregateNotFoundException;
import org.axonframework.unitofwork.CurrentUnitOfWork;
import org.axonframework.unitofwork.TransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Disruptor event handler that finalizes command handling for the entries assigned to one publisher
 * segment: it runs the publisher interceptor chain, stores and publishes the events collected in the
 * entry's unit of work, and reports the outcome to the entry's callback through the configured
 * executor.
 *
 * <p>Aggregates whose commit or rollback failed are placed on a blacklist. Commands for a blacklisted
 * aggregate are rejected until a recover entry for that aggregate identifier is received.
 *
 * <p>NOTE(review): the blacklist and the failed-create bookkeeping use non-synchronized collections;
 * this presumably relies on the instance being invoked from a single thread - confirm against the
 * disruptor wiring.
 */
public class EventPublisher
implements EventHandler<CommandHandlingEntry> {
    private static final Logger logger = LoggerFactory.getLogger(DisruptorCommandBus.class);
    /** Non-null marker stored as the value in {@link #failedCreateCommands}; only its presence matters. */
    private static final Object FAILED_ONCE = new Object();

    private final EventStore eventStore;
    private final EventBus eventBus;
    private final Executor executor;
    private final RollbackConfiguration rollbackConfiguration;
    private final int segmentId;
    private final Set<Object> blackListedAggregates = new HashSet<Object>();
    /** Commands that already failed once with an AggregateNotFoundException; weak keys let them be collected. */
    private final Map<CommandMessage, Object> failedCreateCommands = new WeakHashMap<CommandMessage, Object>();
    private final TransactionManager transactionManager;

    /**
     * Creates a publisher for a single segment.
     *
     * @param eventStore            store that receives the events of the unit of work
     * @param eventBus              bus on which events are published; may be {@code null}, in which case
     *                              nothing is published
     * @param executor              executor used to invoke command callbacks
     * @param transactionManager    optional transaction manager; when {@code null} no transaction is started
     * @param rollbackConfiguration decides which exceptions cause a rollback
     * @param segmentId             identifier of the segment this publisher handles; entries with a
     *                              different publisher id are ignored
     */
    public EventPublisher(EventStore eventStore, EventBus eventBus, Executor executor, TransactionManager transactionManager, RollbackConfiguration rollbackConfiguration, int segmentId) {
        this.eventStore = eventStore;
        this.eventBus = eventBus;
        this.executor = executor;
        this.transactionManager = transactionManager;
        this.rollbackConfiguration = rollbackConfiguration;
        this.segmentId = segmentId;
    }

    /**
     * Handles one entry from the ring buffer.
     *
     * <ul>
     *   <li>Recover entries remove the aggregate from the blacklist.</li>
     *   <li>Entries for another publisher segment are ignored.</li>
     *   <li>The first time a command fails with {@link AggregateNotFoundException} it is reported for
     *       rescheduling; if the same command fails that way again it is processed normally.</li>
     *   <li>Otherwise the entry's unit of work is made current and the entry is either rejected
     *       (blacklisted aggregate) or published.</li>
     * </ul>
     *
     * @param entry      the entry to process
     * @param sequence   ring buffer sequence of the entry (unused)
     * @param endOfBatch whether this is the last entry of the batch (unused)
     * @throws Exception if processing the entry fails unexpectedly
     */
    public void onEvent(CommandHandlingEntry entry, long sequence, boolean endOfBatch) throws Exception {
        if (entry.isRecoverEntry()) {
            recoverAggregate(entry);
            return;
        }
        if (entry.getPublisherId() != segmentId) {
            return;
        }
        // remove(..) == null means this command has not failed this way before.
        if (entry.getExceptionResult() instanceof AggregateNotFoundException
                && failedCreateCommands.remove(entry.getCommand()) == null) {
            reschedule(entry);
            return;
        }

        DisruptorUnitOfWork unitOfWork = entry.getUnitOfWork();
        CurrentUnitOfWork.set(unitOfWork);
        try {
            EventSourcedAggregateRoot aggregate = unitOfWork.getAggregate();
            if (aggregate != null && blackListedAggregates.contains(aggregate.getIdentifier())) {
                rejectExecution(entry, entry.getAggregateIdentifier());
            } else {
                processPublication(entry, unitOfWork, aggregate);
            }
        } finally {
            // Always detach the unit of work from the current thread, even when publication fails.
            CurrentUnitOfWork.clear(unitOfWork);
        }
    }

    /**
     * Remembers that the command failed once and reports an {@link AggregateStateCorruptedException}
     * to the callback. NOTE(review): the callback is presumably responsible for re-dispatching the
     * command - confirm against the callback implementation.
     */
    private void reschedule(CommandHandlingEntry entry) {
        failedCreateCommands.put(entry.getCommand(), FAILED_ONCE);
        executor.execute(new ReportResultTask<Object>(entry.getCallback(), null, new AggregateStateCorruptedException(entry.getAggregateIdentifier(), "Rescheduling command for execution. It was executed against a potentially recently created command")));
    }

    /** Removes the entry's aggregate from the blacklist, logging when it was actually blacklisted. */
    private void recoverAggregate(CommandHandlingEntry entry) {
        if (blackListedAggregates.remove(entry.getAggregateIdentifier())) {
            logger.info("Reset notification for {} received. The aggregate is removed from the blacklist", entry.getAggregateIdentifier());
        }
    }

    /**
     * Reports to the callback that the command was rejected because its aggregate is blacklisted.
     * The exception carries the aggregate identifier (not the aggregate instance), matching how
     * {@link #reschedule(CommandHandlingEntry)} constructs the same exception type.
     */
    private void rejectExecution(CommandHandlingEntry entry, Object aggregateIdentifier) {
        executor.execute(new ReportResultTask<Object>(entry.getCallback(), null, new AggregateStateCorruptedException(aggregateIdentifier, String.format("Aggregate %s has been blacklisted and will be ignored until its state has been recovered.", aggregateIdentifier))));
    }

    /**
     * Runs the publisher interceptor chain, then commits or rolls back the unit of work and reports
     * the outcome. The callback is notified when there is a failure or when it has a delegate.
     */
    private void processPublication(CommandHandlingEntry entry, DisruptorUnitOfWork unitOfWork, EventSourcedAggregateRoot aggregate) {
        invokeInterceptorChain(entry);
        Throwable exceptionResult = entry.getExceptionResult();
        try {
            if (exceptionResult != null && rollbackConfiguration.rollBackOn(exceptionResult)) {
                exceptionResult = performRollback(unitOfWork, entry.getAggregateIdentifier(), exceptionResult);
            } else {
                exceptionResult = performCommit(unitOfWork, aggregate, exceptionResult);
            }
        } finally {
            unitOfWork.onCleanup();
        }
        if (exceptionResult != null || entry.getCallback().hasDelegate()) {
            executor.execute(new ReportResultTask<Object>(entry.getCallback(), entry.getResult(), exceptionResult));
        }
    }

    /** Proceeds the publisher interceptor chain, recording its result or any thrown failure on the entry. */
    private void invokeInterceptorChain(CommandHandlingEntry entry) {
        try {
            entry.setResult(entry.getPublisherInterceptorChain().proceed(entry.getCommand()));
        } catch (Throwable throwable) {
            // Deliberately broad: any interceptor failure becomes the command's exception result.
            entry.setExceptionResult(throwable);
        }
    }

    /**
     * Rolls back the unit of work and, when an aggregate identifier is known, blacklists the aggregate.
     *
     * @return the exception to report: the blacklist exception when the aggregate was blacklisted,
     *         otherwise the original exception
     */
    private Throwable performRollback(DisruptorUnitOfWork unitOfWork, Object aggregateIdentifier, Throwable exceptionResult) {
        unitOfWork.onRollback(exceptionResult);
        if (aggregateIdentifier != null) {
            exceptionResult = notifyBlacklisted(unitOfWork, aggregateIdentifier, exceptionResult);
        }
        return exceptionResult;
    }

    /**
     * Commits the unit of work: stores and publishes its events, inside a transaction when a
     * transaction manager is configured.
     *
     * <p>When the commit fails, the failure is logged, an open transaction is rolled back, and the
     * aggregate is blacklisted. If no aggregate is attached to the unit of work there is nothing to
     * blacklist, so the unit of work is rolled back and the original failure is returned instead.
     *
     * @param aggregate the aggregate attached to the unit of work; may be {@code null}
     * @return the exception to report to the callback, or {@code null} on success
     */
    private Throwable performCommit(DisruptorUnitOfWork unitOfWork, EventSourcedAggregateRoot aggregate, Throwable exceptionResult) {
        unitOfWork.onPrepareCommit();
        Object transaction = null;
        try {
            if (exceptionResult != null && rollbackConfiguration.rollBackOn(exceptionResult)) {
                unitOfWork.rollback(exceptionResult);
            } else {
                if (transactionManager != null) {
                    transaction = transactionManager.startTransaction();
                }
                storeAndPublish(unitOfWork);
                if (transaction != null) {
                    unitOfWork.onPrepareTransactionCommit(transaction);
                    transactionManager.commitTransaction(transaction);
                }
                unitOfWork.onAfterCommit();
            }
        } catch (Exception e) {
            logger.error("Failed to commit the unit of work; rolling back", e);
            if (transaction != null) {
                transactionManager.rollbackTransaction(transaction);
            }
            if (aggregate != null) {
                exceptionResult = notifyBlacklisted(unitOfWork, aggregate.getIdentifier(), e);
            } else {
                // No aggregate to blacklist: roll back and surface the real failure instead of an NPE.
                unitOfWork.onRollback(e);
                exceptionResult = e;
            }
        }
        return exceptionResult;
    }

    /** Appends pending events to the event store and publishes pending messages on the event bus, if any. */
    private void storeAndPublish(DisruptorUnitOfWork unitOfWork) {
        DomainEventStream eventsToStore = unitOfWork.getEventsToStore();
        if (eventsToStore.hasNext()) {
            eventStore.appendEvents(unitOfWork.getAggregateType(), eventsToStore);
        }
        List<EventMessage> eventMessages = unitOfWork.getEventsToPublish();
        EventMessage[] eventsToPublish = eventMessages.toArray(new EventMessage[eventMessages.size()]);
        if (eventBus != null && eventsToPublish.length > 0) {
            eventBus.publish(eventsToPublish);
        }
    }

    /**
     * Adds the aggregate to the blacklist and notifies the unit of work of the rollback.
     *
     * @return an {@link AggregateBlacklistedException} wrapping {@code cause}
     */
    private Throwable notifyBlacklisted(DisruptorUnitOfWork unitOfWork, Object aggregateIdentifier, Throwable cause) {
        blackListedAggregates.add(aggregateIdentifier);
        AggregateBlacklistedException exceptionResult = new AggregateBlacklistedException(aggregateIdentifier, String.format("Aggregate %s state corrupted. Blacklisting the aggregate until a reset message has been received", aggregateIdentifier), cause);
        unitOfWork.onRollback(exceptionResult);
        return exceptionResult;
    }

    /**
     * Task that reports a command outcome to a callback: {@code onFailure} when an exception is
     * present, {@code onSuccess} with the result otherwise.
     *
     * @param <R> type of the command result
     */
    private static class ReportResultTask<R>
    implements Runnable {
        private final CommandCallback<R> callback;
        private final R result;
        private final Throwable exceptionResult;

        public ReportResultTask(CommandCallback<R> callback, R result, Throwable exceptionResult) {
            this.callback = callback;
            this.result = result;
            this.exceptionResult = exceptionResult;
        }

        @Override
        public void run() {
            if (exceptionResult != null) {
                callback.onFailure(exceptionResult);
            } else {
                callback.onSuccess(result);
            }
        }
    }
}

