/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.async;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.axonframework.domain.EventMessage;
import org.axonframework.eventhandling.EventListener;
import org.axonframework.eventhandling.MultiplexingEventProcessingMonitor;
import org.axonframework.eventhandling.async.ErrorHandler;
import org.axonframework.eventhandling.async.RetryPolicy;
import org.axonframework.unitofwork.UnitOfWork;
import org.axonframework.unitofwork.UnitOfWorkFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventProcessor
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(EventProcessor.class);
    private final ShutdownCallback shutDownCallback;
    private final UnitOfWorkFactory unitOfWorkFactory;
    private final MultiplexingEventProcessingMonitor eventProcessingMonitor;
    private final Executor executor;
    private final ErrorHandler errorHandler;
    private final Deque<EventMessage<?>> eventQueue;
    private boolean isScheduled = false;
    private volatile boolean cleanedUp;
    private final Set<EventListener> listeners;
    private volatile long retryAfter = 0L;
    private final List<EventMessage> processedEvents = new ArrayList<EventMessage>();
    private final Object runnerMonitor = new Object();

    public EventProcessor(Executor executor, ShutdownCallback shutDownCallback, ErrorHandler errorHandler, UnitOfWorkFactory unitOfWorkFactory, Set<EventListener> eventListeners, MultiplexingEventProcessingMonitor eventProcessingMonitor) {
        this.unitOfWorkFactory = unitOfWorkFactory;
        this.eventProcessingMonitor = eventProcessingMonitor;
        this.eventQueue = new LinkedList();
        this.shutDownCallback = shutDownCallback;
        this.executor = executor;
        this.errorHandler = errorHandler;
        this.listeners = eventListeners;
    }

    public synchronized boolean scheduleEvent(EventMessage<?> event) {
        if (this.cleanedUp) {
            return false;
        }
        this.eventQueue.add(event);
        if (!this.isScheduled) {
            this.isScheduled = true;
            this.executor.execute(this);
        }
        return true;
    }

    private synchronized EventMessage<?> nextEvent() {
        return this.eventQueue.poll();
    }

    private synchronized boolean yield() {
        this.notifyProcessingHandlers();
        if (this.eventQueue.isEmpty()) {
            this.cleanUp();
        } else {
            try {
                if (this.retryAfter <= System.currentTimeMillis()) {
                    this.executor.execute(this);
                    logger.debug("Processing of event listener yielded.");
                } else {
                    long waitTimeRemaining = this.retryAfter - System.currentTimeMillis();
                    boolean executionScheduled = this.scheduleDelayedExecution(waitTimeRemaining);
                    if (!executionScheduled) {
                        logger.warn("The provided executor does not seem to support delayed execution. Scheduling for immediate processing and expecting processing to wait if scheduled to soon.");
                        this.executor.execute(this);
                    }
                }
            }
            catch (RejectedExecutionException e) {
                logger.info("Processing of event listener could not yield. Executor refused the task.");
                return false;
            }
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void waitUntilAllowedStartingTime() {
        long waitTimeRemaining = this.retryAfter - System.currentTimeMillis();
        if (waitTimeRemaining > 0L) {
            try {
                logger.warn("Event processing started before delay expired. Forcing thread to sleep for {} millis.", (Object)waitTimeRemaining);
                Thread.sleep(waitTimeRemaining);
            }
            catch (InterruptedException e) {
                logger.warn("Thread was interrupted while waiting for retry. Scheduling for immediate retry.");
                Thread.currentThread().interrupt();
            }
            finally {
                this.retryAfter = 0L;
            }
        }
    }

    private boolean scheduleDelayedExecution(long waitTimeRemaining) {
        if (this.executor instanceof ScheduledExecutorService) {
            logger.debug("Executor supports delayed executing. Rescheduling for processing in {} millis", (Object)waitTimeRemaining);
            ((ScheduledExecutorService)this.executor).schedule(this, waitTimeRemaining, TimeUnit.MILLISECONDS);
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        Object object = this.runnerMonitor;
        synchronized (object) {
            boolean mayContinue = true;
            this.waitUntilAllowedStartingTime();
            int itemsAtStart = this.eventQueue.size();
            int processedItems = 0;
            while (mayContinue) {
                RetryPolicy result = this.processNextEntry();
                mayContinue = ++processedItems < itemsAtStart && !this.eventQueue.isEmpty() && !result.requiresRescheduleEvent() || !this.yield();
            }
            this.notifyProcessingHandlers();
        }
    }

    private void notifyProcessingHandlers() {
        if (!this.processedEvents.isEmpty()) {
            this.eventProcessingMonitor.onEventProcessingCompleted(this.processedEvents);
        }
        this.processedEvents.clear();
    }

    private RetryPolicy processNextEntry() {
        ProcessingResult processingResult;
        block11: {
            EventMessage<?> event = this.nextEvent();
            processingResult = ProcessingResult.REGULAR;
            if (event != null) {
                UnitOfWork uow = null;
                try {
                    uow = this.unitOfWorkFactory.createUnitOfWork();
                    processingResult = this.doHandle(event);
                    if (processingResult.requiresRollback()) {
                        uow.rollback();
                    } else {
                        uow.commit();
                    }
                    if (processingResult.requiresRescheduleEvent()) {
                        this.eventQueue.addFirst(event);
                    } else if (processingResult.isFailure()) {
                        this.notifyProcessingHandlers();
                        this.eventProcessingMonitor.onEventProcessingFailed(Arrays.asList(event), processingResult.getError());
                    } else {
                        this.processedEvents.add(event);
                    }
                    this.retryAfter = System.currentTimeMillis() + processingResult.waitTime();
                }
                catch (RuntimeException e) {
                    processingResult = new ProcessingResult(this.errorHandler.handleError(e, event, null), e);
                    if (processingResult.requiresRescheduleEvent()) {
                        this.eventQueue.addFirst(event);
                        this.retryAfter = System.currentTimeMillis() + processingResult.waitTime();
                    }
                    if (uow != null && uow.isStarted()) {
                        uow.rollback();
                    }
                    if (processingResult.requiresRescheduleEvent()) break block11;
                    this.notifyProcessingHandlers();
                    this.eventProcessingMonitor.onEventProcessingFailed(Collections.singletonList(event), e);
                }
            }
        }
        return processingResult;
    }

    protected ProcessingResult doHandle(EventMessage<?> event) {
        RuntimeException failure = null;
        this.eventProcessingMonitor.prepare(event);
        for (EventListener member : this.listeners) {
            try {
                this.eventProcessingMonitor.prepareForInvocation(event, member);
                member.handle(event);
            }
            catch (RuntimeException e) {
                RetryPolicy policy = this.errorHandler.handleError(e, event, member);
                if (policy.requiresRescheduleEvent() || policy.requiresRollback()) {
                    return new ProcessingResult(policy, e);
                }
                failure = e;
            }
        }
        return new ProcessingResult(RetryPolicy.proceed(), failure);
    }

    private synchronized void cleanUp() {
        this.isScheduled = false;
        this.cleanedUp = true;
        this.shutDownCallback.afterShutdown(this);
    }

    protected static class ProcessingResult
    extends RetryPolicy {
        public static final ProcessingResult REGULAR = new ProcessingResult(RetryPolicy.proceed(), null);
        private final RetryPolicy retryPolicy;
        private final Throwable error;

        public ProcessingResult(RetryPolicy retryPolicy, Throwable error) {
            this.retryPolicy = retryPolicy;
            this.error = error;
        }

        public boolean isFailure() {
            return this.error != null;
        }

        public Throwable getError() {
            return this.error;
        }

        @Override
        public long waitTime() {
            return this.retryPolicy.waitTime();
        }

        @Override
        public boolean requiresRescheduleEvent() {
            return this.retryPolicy.requiresRescheduleEvent();
        }

        @Override
        public boolean requiresRollback() {
            return this.retryPolicy.requiresRollback();
        }
    }

    public static interface ShutdownCallback {
        public void afterShutdown(EventProcessor var1);
    }
}

