/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.endpoint;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import org.aopalliance.aop.Advice;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.integration.channel.MessagePublishingErrorHandler;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
import org.springframework.integration.transaction.IntegrationResourceHolder;
import org.springframework.integration.transaction.IntegrationResourceHolderSynchronization;
import org.springframework.integration.transaction.TransactionSynchronizationFactory;
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ErrorHandler;

/**
 * Base class for endpoints that obtain messages by polling.
 *
 * <p>On start, a {@code Poller} runnable is scheduled on the endpoint's task scheduler using the
 * configured {@link Trigger}. Each scheduled run hands a loop to the configured {@link Executor};
 * the loop repeatedly calls {@link #receiveMessage()} and {@link #handleMessage(Message)} until no
 * message is received, the {@code maxMessagesPerPoll} limit is reached, or the endpoint is stopped.
 */
public abstract class AbstractPollingEndpoint extends AbstractEndpoint implements BeanClassLoaderAware {

    private volatile Executor taskExecutor = new SyncTaskExecutor();

    private volatile ErrorHandler errorHandler;

    private volatile Trigger trigger = new PeriodicTrigger(10L);

    private volatile List<Advice> adviceChain;

    private volatile ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();

    private volatile ScheduledFuture<?> runningTask;

    private volatile Runnable poller;

    private volatile boolean initialized;

    /** Values {@code <= 0} mean "no limit" (see the loop condition in {@code Poller}). */
    private volatile long maxMessagesPerPoll = -1L;

    private final Object initializationMonitor = new Object();

    private volatile TransactionSynchronizationFactory transactionSynchronizationFactory;

    /**
     * Creates the endpoint and sets its phase to {@link Integer#MAX_VALUE}.
     */
    public AbstractPollingEndpoint() {
        this.setPhase(Integer.MAX_VALUE);
    }

    /**
     * Sets the executor on which each poll loop runs.
     *
     * @param taskExecutor the executor to use; {@code null} selects a new {@link SyncTaskExecutor}
     */
    public void setTaskExecutor(Executor taskExecutor) {
        this.taskExecutor = taskExecutor != null ? taskExecutor : new SyncTaskExecutor();
    }

    /**
     * Sets the trigger used when scheduling the poller.
     *
     * @param trigger the trigger to use; {@code null} selects {@code new PeriodicTrigger(10L)}
     */
    public void setTrigger(Trigger trigger) {
        this.trigger = trigger != null ? trigger : new PeriodicTrigger(10L);
    }

    /**
     * Sets the advice chain applied around each individual poll. It is read when the poller is
     * created during initialization.
     *
     * @param adviceChain the advices to apply; may be {@code null} or empty for none
     */
    public void setAdviceChain(List<Advice> adviceChain) {
        this.adviceChain = adviceChain;
    }

    /**
     * Sets the maximum number of messages processed in a single scheduled run.
     *
     * @param maxMessagesPerPoll the limit; a value {@code <= 0} means no limit
     */
    public void setMaxMessagesPerPoll(long maxMessagesPerPoll) {
        this.maxMessagesPerPoll = maxMessagesPerPoll;
    }

    /**
     * Sets the error handler used when the task executor is wrapped during initialization.
     * It is not consulted if the configured executor already is an
     * {@link ErrorHandlingTaskExecutor}.
     *
     * @param errorHandler the handler; if {@code null}, a {@link MessagePublishingErrorHandler}
     *        is created during initialization
     */
    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    /**
     * Sets the class loader passed to the {@link ProxyFactory} when an advice chain is configured.
     *
     * @param classLoader the bean class loader
     */
    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.beanClassLoader = classLoader;
    }

    /**
     * Sets the factory used to create a transaction synchronization for each poll.
     *
     * @param transactionSynchronizationFactory the factory; {@code null} disables synchronization
     */
    public void setTransactionSynchronizationFactory(
            TransactionSynchronizationFactory transactionSynchronizationFactory) {
        this.transactionSynchronizationFactory = transactionSynchronizationFactory;
    }

    /**
     * Initializes the endpoint: wraps the task executor in an {@link ErrorHandlingTaskExecutor}
     * (unless it already is one) and creates the poller. Does nothing if already initialized.
     *
     * @throws MessagingException if the poller cannot be created
     * @throws IllegalArgumentException if no trigger is set, or if an error handler must be
     *         created and no bean factory is available
     */
    @Override
    protected void onInit() {
        synchronized (this.initializationMonitor) {
            if (this.initialized) {
                return;
            }
            Assert.notNull(this.trigger, "Trigger is required");
            Executor executor = this.taskExecutor;
            if (executor != null && !(executor instanceof ErrorHandlingTaskExecutor)) {
                if (this.errorHandler == null) {
                    Assert.notNull(this.getBeanFactory(), "BeanFactory is required");
                    this.errorHandler = new MessagePublishingErrorHandler(
                            new BeanFactoryChannelResolver(this.getBeanFactory()));
                }
                this.taskExecutor = new ErrorHandlingTaskExecutor(executor, this.errorHandler);
            }
            try {
                this.poller = this.createPoller();
                this.initialized = true;
            }
            catch (Exception e) {
                throw new MessagingException("Failed to create Poller", e);
            }
        }
    }

    /**
     * Builds the poller: a callable around {@link #doPoll()}, proxied with the configured advice
     * chain when one is present, wrapped in a {@code Poller}.
     */
    private Runnable createPoller() throws Exception {
        Callable<Boolean> pollingTask = new Callable<Boolean>() {

            @Override
            public Boolean call() throws Exception {
                return AbstractPollingEndpoint.this.doPoll();
            }
        };
        // Snapshot the volatile field so the emptiness check and the iteration see the same list.
        List<Advice> advices = this.adviceChain;
        if (!CollectionUtils.isEmpty(advices)) {
            ProxyFactory proxyFactory = new ProxyFactory(pollingTask);
            for (Advice advice : advices) {
                proxyFactory.addAdvice(advice);
            }
            // The proxy target is a Callable<Boolean>, so the proxy is cast back to that type.
            @SuppressWarnings("unchecked")
            Callable<Boolean> advised = (Callable<Boolean>) proxyFactory.getProxy(this.beanClassLoader);
            pollingTask = advised;
        }
        return new Poller(pollingTask);
    }

    /**
     * Initializes the endpoint if necessary and schedules the poller with the configured trigger.
     *
     * @throws IllegalStateException if no task scheduler is available
     */
    @Override
    protected void doStart() {
        if (!this.initialized) {
            this.onInit();
        }
        Assert.state(this.getTaskScheduler() != null, "unable to start polling, no taskScheduler available");
        this.runningTask = this.getTaskScheduler().schedule(this.poller, this.trigger);
    }

    /**
     * Cancels the scheduled poller (interrupting it if running) and marks the endpoint as
     * uninitialized, which also ends any poll loop that is still iterating.
     */
    @Override
    protected void doStop() {
        ScheduledFuture<?> task = this.runningTask;
        if (task != null) {
            task.cancel(true);
        }
        this.runningTask = null;
        this.initialized = false;
    }

    /**
     * Performs a single poll: receives one message and, if present, hands it to
     * {@link #handleMessage(Message)}.
     *
     * @return {@code true} if a message was received and handled; {@code false} if none was
     *         received or the receive was interrupted
     */
    private boolean doPoll() {
        IntegrationResourceHolder holder =
                this.bindResourceHolderIfNecessary(this.getResourceKey(), this.getResourceToBind());
        Message<?> message;
        try {
            message = this.receiveMessage();
        }
        catch (Exception e) {
            // Thread.interrupted() also clears the thread's interrupt flag.
            if (Thread.interrupted()) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Poll interrupted - during stop()? : " + e.getMessage());
                }
                return false;
            }
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            }
            // receiveMessage() declares no checked exceptions, but guard against one anyway so
            // the real cause is reported instead of a ClassCastException from a blind cast.
            throw new MessagingException("Failed to receive Message", e);
        }
        if (message == null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Received no Message during the poll, returning 'false'");
            }
            return false;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Poll resulted in Message: " + message);
        }
        if (holder != null) {
            holder.setMessage(message);
        }
        this.handleMessage(message);
        return true;
    }

    /**
     * Obtains the next message.
     *
     * @return the received message, or {@code null} if none is available
     */
    protected abstract Message<?> receiveMessage();

    /**
     * Processes a message returned by {@link #receiveMessage()}.
     *
     * @param message the message to handle; never {@code null} when called by this class
     */
    protected abstract void handleMessage(Message<?> message);

    /**
     * Returns the resource handed to the {@link TransactionSynchronizationFactory} for each poll.
     * This implementation returns {@code null}, which disables transaction synchronization.
     *
     * @return the resource to bind, or {@code null}
     */
    protected Object getResourceToBind() {
        return null;
    }

    /**
     * Returns the attribute key under which the bound resource is added to the resource holder.
     * This implementation returns {@code null}, in which case no attribute is added.
     *
     * @return the attribute key, or {@code null}
     */
    protected String getResourceKey() {
        return null;
    }

    /**
     * Registers a transaction synchronization for the given resource when a factory is
     * configured, the resource is non-null and a transaction is active.
     *
     * @param key attribute key for the resource; may be {@code null}
     * @param resource the resource to bind; may be {@code null}
     * @return the synchronization's resource holder if the created synchronization is an
     *         {@link IntegrationResourceHolderSynchronization}; otherwise {@code null}
     */
    private IntegrationResourceHolder bindResourceHolderIfNecessary(String key, Object resource) {
        TransactionSynchronizationFactory factory = this.transactionSynchronizationFactory;
        if (factory == null || resource == null
                || !TransactionSynchronizationManager.isActualTransactionActive()) {
            return null;
        }
        TransactionSynchronization synchronization = factory.create(resource);
        TransactionSynchronizationManager.registerSynchronization(synchronization);
        if (!(synchronization instanceof IntegrationResourceHolderSynchronization)) {
            return null;
        }
        IntegrationResourceHolder holder =
                ((IntegrationResourceHolderSynchronization) synchronization).getResourceHolder();
        if (key != null) {
            holder.addAttribute(key, resource);
        }
        return holder;
    }

    /**
     * The runnable that is scheduled with the trigger. Each run submits a loop to the task
     * executor that invokes the polling task until it returns {@code false}, the per-poll
     * message limit is reached, or the endpoint is no longer initialized.
     */
    private class Poller implements Runnable {

        private final Callable<Boolean> pollingTask;

        public Poller(Callable<Boolean> pollingTask) {
            this.pollingTask = pollingTask;
        }

        @Override
        public void run() {
            AbstractPollingEndpoint.this.taskExecutor.execute(new Runnable() {

                @Override
                public void run() {
                    // long, to match maxMessagesPerPoll; an int counter would wrap before
                    // reaching limits above Integer.MAX_VALUE.
                    long count = 0L;
                    while (AbstractPollingEndpoint.this.initialized
                            && (AbstractPollingEndpoint.this.maxMessagesPerPoll <= 0L
                                    || count < AbstractPollingEndpoint.this.maxMessagesPerPoll)) {
                        try {
                            if (!Poller.this.pollingTask.call().booleanValue()) {
                                break;
                            }
                            ++count;
                        }
                        catch (Exception e) {
                            if (e instanceof RuntimeException) {
                                throw (RuntimeException) e;
                            }
                            throw new MessageHandlingException(new ErrorMessage(e), e);
                        }
                    }
                }
            });
        }
    }
}

