package com.alibaba.datax.core.job.scheduler;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
import com.alibaba.datax.core.statistics.container.communicator.AbstractContainerCommunicator;
import com.alibaba.datax.core.util.ErrorRecordChecker;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.datax.dataxservice.face.domain.enums.State;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/datax/core/job/scheduler/AbstractScheduler.class */
public abstract class AbstractScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractScheduler.class);
    private ErrorRecordChecker errorLimit;
    private AbstractContainerCommunicator containerCommunicator;
    private Long jobId;

    public Long getJobId() {
        return this.jobId;
    }

    public AbstractScheduler(AbstractContainerCommunicator abstractContainerCommunicator) {
        this.containerCommunicator = abstractContainerCommunicator;
    }

    public void schedule(List<Configuration> list) {
        Validate.notNull(list, "scheduler配置不能为空");
        int intValue = list.get(0).getInt(CoreConstant.DATAX_CORE_CONTAINER_JOB_REPORTINTERVAL, 30000).intValue();
        int intValue2 = list.get(0).getInt(CoreConstant.DATAX_CORE_CONTAINER_JOB_SLEEPINTERVAL, 10000).intValue();
        this.jobId = list.get(0).getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
        this.errorLimit = new ErrorRecordChecker(list.get(0));
        this.containerCommunicator.registerCommunication(list);
        int calculateTaskCount = calculateTaskCount(list);
        startAllTaskGroup(list);
        Communication communication = new Communication();
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                Communication collect = this.containerCommunicator.collect();
                collect.setTimestamp(System.currentTimeMillis());
                LOG.debug(collect.toString());
                long currentTimeMillis2 = System.currentTimeMillis();
                if (currentTimeMillis2 - currentTimeMillis > intValue) {
                    this.containerCommunicator.report(CommunicationTool.getReportCommunication(collect, communication, calculateTaskCount));
                    currentTimeMillis = currentTimeMillis2;
                    communication = collect;
                }
                this.errorLimit.checkRecordLimit(collect);
                if (collect.getState() == State.SUCCEEDED) {
                    LOG.info("Scheduler accomplished all tasks.");
                    return;
                }
                if (isJobKilling(getJobId())) {
                    dealKillingStat(this.containerCommunicator, calculateTaskCount);
                } else if (collect.getState() == State.FAILED) {
                    dealFailedStat(this.containerCommunicator, collect.getThrowable());
                }
                Thread.sleep(intValue2);
            } catch (InterruptedException e) {
                LOG.error("捕获到InterruptedException异常!", e);
                throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
            }
        }
    }

    protected abstract void startAllTaskGroup(List<Configuration> list);

    protected abstract void dealFailedStat(AbstractContainerCommunicator abstractContainerCommunicator, Throwable th);

    protected abstract void dealKillingStat(AbstractContainerCommunicator abstractContainerCommunicator, int i);

    private int calculateTaskCount(List<Configuration> list) {
        int i = 0;
        Iterator<Configuration> it = list.iterator();
        while (it.hasNext()) {
            i += it.next().getListConfiguration(CoreConstant.DATAX_JOB_CONTENT).size();
        }
        return i;
    }

    protected abstract boolean isJobKilling(Long l);
}
