package com.alibaba.datax.core.job;

import com.alibaba.datax.common.constant.PluginType;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.AbstractJobPlugin;
import com.alibaba.datax.common.plugin.JobPluginCollector;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.statistics.PerfTrace;
import com.alibaba.datax.common.statistics.VMInfo;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.StrUtil;
import com.alibaba.datax.core.AbstractContainer;
import com.alibaba.datax.core.Engine;
import com.alibaba.datax.core.container.util.HookInvoker;
import com.alibaba.datax.core.container.util.JobAssignUtil;
import com.alibaba.datax.core.job.scheduler.AbstractScheduler;
import com.alibaba.datax.core.job.scheduler.processinner.StandAloneScheduler;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
import com.alibaba.datax.core.statistics.container.communicator.job.StandAloneJobContainerCommunicator;
import com.alibaba.datax.core.statistics.plugin.DefaultJobPluginCollector;
import com.alibaba.datax.core.util.ErrorRecordChecker;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import com.alibaba.datax.core.util.container.ClassLoaderSwapper;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.datax.core.util.container.LoadUtil;
import com.alibaba.datax.dataxservice.face.domain.enums.ExecuteMode;
import com.alibaba.fastjson.JSON;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/datax/core/job/JobContainer.class */
public class JobContainer extends AbstractContainer {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(JobContainer.class);
    /**
     * Formatter for the start/end times printed by {@code logStatistics()}.
     * NOTE(review): SimpleDateFormat is not thread-safe and this instance is shared statically —
     * confirm that no two threads ever format with it concurrently.
     */
    private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    /** Switches the current thread's class loader to a plugin's loader around plugin calls and back again. */
    private ClassLoaderSwapper classLoaderSwapper;
    /** Job id read from the configuration; forced to 0 when the configured value is negative or absent. */
    private long jobId;
    /** Reader plugin name, read from the job content configuration. */
    private String readerPluginName;
    /** Writer plugin name, read from the job content configuration. */
    private String writerPluginName;
    /** Job-level reader plugin instance; cleared by {@code destroy()}. */
    private Reader.Job jobReader;
    /** Job-level writer plugin instance; cleared by {@code destroy()}. */
    private Writer.Job jobWriter;
    /** Clone of the configuration taken at the beginning of a non-dry-run {@code start()}; not read elsewhere in this class. */
    private Configuration userConf;
    /** Wall-clock millis captured when {@code start()} begins. */
    private long startTimeStamp;
    /** Wall-clock millis captured after the plugins were destroyed at the end of {@code start()}. */
    private long endTimeStamp;
    /** Wall-clock millis captured in {@code schedule()} right before the scheduler runs. */
    private long startTransferTimeStamp;
    /** Wall-clock millis captured in {@code schedule()} when the scheduler returns or fails. */
    private long endTransferTimeStamp;
    /** Number of channels the job wants, computed by {@code adjustChannelNumber()} and capped in {@code schedule()}. */
    private int needChannelNumber;
    /** Number of tasks produced by {@code split()}; starts at 1. */
    private int totalStage;
    /** Checker applied to the collected communication in {@code checkLimit()}. */
    private ErrorRecordChecker errorLimit;

    /**
     * Creates a job container for the given job configuration.
     *
     * @param configuration job configuration; passed to the parent container and to the
     *                      error-record checker
     */
    public JobContainer(Configuration configuration) {
        super(configuration);
        classLoaderSwapper = ClassLoaderSwapper.newCurrentThreadClassLoaderSwapper();
        totalStage = 1;
        errorLimit = new ErrorRecordChecker(configuration);
    }

    /**
     * Runs the job from start to finish.
     *
     * <p>In dry-run mode only {@code preCheck()} is executed. Otherwise the phases run in order:
     * preHandle, init, prepare, split, schedule, post, postHandle, and finally the hooks.
     *
     * <p>On any failure the error is logged, attached to the collected communication, reported
     * through the container communicator (one is created if none exists yet) and rethrown as a
     * {@link DataXException} with {@code FrameworkErrorCode.RUNTIME_ERROR}.
     *
     * <p>Cleanup: unless this is a dry run, the plugins are always destroyed and the end timestamp
     * recorded. The VM / PerfTrace / statistics summary is printed only when the job succeeded.
     */
    @Override // com.alibaba.datax.core.AbstractContainer
    public void start() {
        LOG.info("DataX jobContainer starts job.");
        boolean hasException = false;
        boolean isDryRun = false;
        try {
            this.startTimeStamp = System.currentTimeMillis();
            isDryRun = this.configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false).booleanValue();
            if (isDryRun) {
                LOG.info("jobContainer starts to do preCheck ...");
                preCheck();
            } else {
                this.userConf = this.configuration.clone();
                LOG.debug("jobContainer starts to do preHandle ...");
                preHandle();
                LOG.debug("jobContainer starts to do init ...");
                init();
                LOG.info("jobContainer starts to do prepare ...");
                prepare();
                LOG.info("jobContainer starts to do split ...");
                this.totalStage = split();
                LOG.info("jobContainer starts to do schedule ...");
                schedule();
                LOG.debug("jobContainer starts to do post ...");
                post();
                LOG.debug("jobContainer starts to do postHandle ...");
                postHandle();
                LOG.info("DataX jobId [{}] completed successfully.", Long.valueOf(this.jobId));
                invokeHooks();
            }
        } catch (Throwable th) {
            LOG.error("Exception when job run", th);
            hasException = true;
            if (th instanceof OutOfMemoryError) {
                // Release plugin resources as early as possible before trying to report.
                destroy();
                System.gc();
            }
            // The communicator is normally created in schedule(); a failure before that point
            // leaves it null, so create one here to be able to report the error.
            if (super.getContainerCommunicator() == null) {
                super.setContainerCommunicator(new StandAloneJobContainerCommunicator(this.configuration));
            }
            Communication collected = super.getContainerCommunicator().collect();
            collected.setThrowable(th);
            // NOTE(review): endTimeStamp has not been assigned yet on this path (it is set in the
            // finally block), so the value reported here is still its initial value — confirm intended.
            collected.setTimestamp(this.endTimeStamp);
            Communication transferStart = new Communication();
            transferStart.setTimestamp(this.startTransferTimeStamp);
            super.getContainerCommunicator().report(CommunicationTool.getReportCommunication(collected, transferStart, this.totalStage));
            throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, th);
        } finally {
            if (!isDryRun) {
                destroy();
                this.endTimeStamp = System.currentTimeMillis();
                if (!hasException) {
                    // Success-only summary: VM resource deltas, PerfTrace summary and job statistics.
                    VMInfo vmInfo = VMInfo.getVmInfo();
                    if (vmInfo != null) {
                        vmInfo.getDelta(false);
                        LOG.info(vmInfo.totalString());
                    }
                    LOG.info(PerfTrace.getInstance().summarizeNoException());
                    logStatistics();
                }
            }
        }
    }

    /**
     * Dry-run entry point: loads both plugins in dry-run mode, resolves the channel count
     * (at least 1) and asks the reader and then the writer to run their pre-checks.
     */
    private void preCheck() {
        this.preCheckInit();
        this.adjustChannelNumber();
        // Never go below a single channel.
        this.needChannelNumber = Math.max(this.needChannelNumber, 1);
        this.preCheckReader();
        this.preCheckWriter();
        LOG.info("PreCheck通过");
    }

    /**
     * Dry-run initialisation: resolves the job id (falling back to 0 and writing that back to the
     * configuration when the configured id is negative), names the current thread after the job
     * and loads the reader and writer job plugins for pre-checking.
     */
    private void preCheckInit() {
        long configuredJobId = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1L).longValue();
        if (configuredJobId >= 0) {
            this.jobId = configuredJobId;
        } else {
            LOG.info("Set jobId = 0");
            this.jobId = 0L;
            this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, Long.valueOf(this.jobId));
        }
        Thread.currentThread().setName("job-" + this.jobId);

        // Both plugins share one collector bound to the container communicator.
        JobPluginCollector collector = new DefaultJobPluginCollector(getContainerCommunicator());
        this.jobReader = preCheckReaderInit(collector);
        this.jobWriter = preCheckWriterInit(collector);
    }

    /**
     * Loads the reader job plugin for a dry run.
     *
     * <p>Marks the reader parameters with {@code dryRun = true}, hands the plugin its own
     * parameters and the writer's parameters as peer configuration (matching
     * {@code initJobReader}), and attaches the collector. {@code init()} is not called here.
     *
     * @param jobPluginCollector collector the plugin reports to
     * @return the loaded, configured reader job plugin
     */
    private Reader.Job preCheckReaderInit(JobPluginCollector jobPluginCollector) {
        this.readerPluginName = this.configuration.getString(CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));
        try {
            Reader.Job reader = LoadUtil.loadJobPlugin(PluginType.READER, this.readerPluginName);
            this.configuration.set("job.content[0].reader.parameter.dryRun", true);
            reader.setPluginJobConf(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
            // The reader's peer is the writer; previously the reader parameters were passed here.
            reader.setPeerPluginJobConf(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));
            reader.setJobPluginCollector(jobPluginCollector);
            return reader;
        } finally {
            // Always hand the thread its previous class loader back, even if loading failed.
            this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        }
    }

    /**
     * Loads the writer job plugin for a dry run.
     *
     * <p>Marks the writer parameters with {@code dryRun = true}, hands the plugin its own
     * parameters, the reader's parameters and name as peer information, and attaches the
     * collector. {@code init()} is not called here.
     *
     * @param jobPluginCollector collector the plugin reports to
     * @return the loaded, configured writer job plugin
     */
    private Writer.Job preCheckWriterInit(JobPluginCollector jobPluginCollector) {
        this.writerPluginName = this.configuration.getString(CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));
        try {
            Writer.Job writer = LoadUtil.loadJobPlugin(PluginType.WRITER, this.writerPluginName);
            this.configuration.set("job.content[0].writer.parameter.dryRun", true);
            writer.setPluginJobConf(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));
            writer.setPeerPluginJobConf(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
            writer.setPeerPluginName(this.readerPluginName);
            writer.setJobPluginCollector(jobPluginCollector);
            return writer;
        } finally {
            // Always hand the thread its previous class loader back, even if loading failed.
            this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        }
    }

    /**
     * Runs the reader job plugin's {@code preCheck()} under the reader's class loader.
     * The thread's previous class loader is restored even when the pre-check throws.
     */
    private void preCheckReader() {
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));
        try {
            LOG.info("DataX Reader.Job [{}] do preCheck work .", this.readerPluginName);
            this.jobReader.preCheck();
        } finally {
            this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        }
    }

    /**
     * Runs the writer job plugin's {@code preCheck()} under the writer's class loader.
     * The thread's previous class loader is restored even when the pre-check throws.
     */
    private void preCheckWriter() {
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));
        try {
            LOG.info("DataX Writer.Job [{}] do preCheck work .", this.writerPluginName);
            this.jobWriter.preCheck();
        } finally {
            this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        }
    }

    /**
     * Regular (non-dry-run) initialisation: resolves the job id (falling back to 0 and writing
     * that back to the configuration when the configured id is negative), names the current
     * thread after the job and loads and initialises the reader and writer job plugins.
     */
    private void init() {
        long configuredJobId = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1L).longValue();
        if (configuredJobId >= 0) {
            this.jobId = configuredJobId;
        } else {
            LOG.info("Set jobId = 0");
            this.jobId = 0L;
            this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, Long.valueOf(this.jobId));
        }
        Thread.currentThread().setName("job-" + this.jobId);

        // Both plugins share one collector bound to the container communicator.
        JobPluginCollector collector = new DefaultJobPluginCollector(getContainerCommunicator());
        this.jobReader = initJobReader(collector);
        this.jobWriter = initJobWriter(collector);
    }

    /** Runs the job-level prepare step of the reader first, then of the writer. */
    private void prepare() {
        this.prepareJobReader();
        this.prepareJobWriter();
    }

    /**
     * Runs the optional pre-handler plugin.
     *
     * <p>Does nothing when no pre-handler plugin type is configured. Otherwise the configured
     * type is upper-cased and resolved to a {@link PluginType}, the named plugin is loaded under
     * its own class loader and its {@code preHandler} is invoked with the job configuration. The
     * resulting configuration is then logged.
     *
     * @throws DataXException with {@code CONFIG_ERROR} when the configured plugin type is not a
     *                        valid {@link PluginType} name
     */
    private void preHandle() {
        String handlerPluginTypeStr = this.configuration.getString(CoreConstant.DATAX_JOB_PREHANDLER_PLUGINTYPE);
        if (!StringUtils.isNotEmpty(handlerPluginTypeStr)) {
            return;
        }

        // Only the enum lookup is guarded, so an IllegalArgumentException raised by the plugin
        // itself is not misreported as a plugin-type configuration error.
        PluginType handlerPluginType;
        try {
            handlerPluginType = PluginType.valueOf(handlerPluginTypeStr.toUpperCase());
        } catch (IllegalArgumentException e) {
            throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, String.format("Job preHandler's pluginType(%s) set error, reason(%s)", handlerPluginTypeStr.toUpperCase(), e.getMessage()));
        }

        String handlerPluginName = this.configuration.getString(CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(handlerPluginType, handlerPluginName));
        try {
            AbstractJobPlugin handler = LoadUtil.loadJobPlugin(handlerPluginType, handlerPluginName);
            handler.setJobPluginCollector(new DefaultJobPluginCollector(getContainerCommunicator()));
            handler.preHandler(this.configuration);
        } finally {
            // Restore the thread's previous class loader even when the handler fails.
            this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        }
        LOG.info("After PreHandler: \n" + Engine.filterJobConfiguration(this.configuration) + "\n");
    }

    /**
     * Runs the optional post-handler plugin.
     *
     * <p>Does nothing when no post-handler plugin type is configured. Otherwise the configured
     * type is upper-cased and resolved to a {@link PluginType}, the named plugin is loaded under
     * its own class loader and its {@code postHandler} is invoked with the job configuration.
     *
     * @throws DataXException with {@code CONFIG_ERROR} when the configured plugin type is not a
     *                        valid {@link PluginType} name
     */
    private void postHandle() {
        String handlerPluginTypeStr = this.configuration.getString(CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINTYPE);
        if (!StringUtils.isNotEmpty(handlerPluginTypeStr)) {
            return;
        }

        // Only the enum lookup is guarded, so an IllegalArgumentException raised by the plugin
        // itself is not misreported as a plugin-type configuration error.
        PluginType handlerPluginType;
        try {
            handlerPluginType = PluginType.valueOf(handlerPluginTypeStr.toUpperCase());
        } catch (IllegalArgumentException e) {
            throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, String.format("Job postHandler's pluginType(%s) set error, reason(%s)", handlerPluginTypeStr.toUpperCase(), e.getMessage()));
        }

        String handlerPluginName = this.configuration.getString(CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(handlerPluginType, handlerPluginName));
        try {
            AbstractJobPlugin handler = LoadUtil.loadJobPlugin(handlerPluginType, handlerPluginName);
            handler.setJobPluginCollector(new DefaultJobPluginCollector(getContainerCommunicator()));
            handler.postHandler(this.configuration);
        } finally {
            // Restore the thread's previous class loader even when the handler fails.
            this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        }
    }

    /**
     * Splits the job into tasks.
     *
     * <p>Resolves the channel count (at least 1), asks the reader to split into that many pieces
     * and the writer to split into as many pieces as the reader produced, pairs them up together
     * with the configured transformers and stores the result as the job content.
     *
     * @return the number of task configurations produced
     */
    private int split() {
        adjustChannelNumber();
        // Never go below a single channel.
        this.needChannelNumber = Math.max(this.needChannelNumber, 1);

        List<Configuration> readerTasks = doReaderSplit(this.needChannelNumber);
        List<Configuration> writerTasks = doWriterSplit(readerTasks.size());

        List<Configuration> transformers = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);
        LOG.debug("transformer configuration: " + JSON.toJSONString(transformers));

        List<Configuration> taskConfigs = mergeReaderAndWriterTaskConfigs(readerTasks, writerTasks, transformers);
        LOG.debug("contentConfig configuration: " + JSON.toJSONString(taskConfigs));

        this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, taskConfigs);
        return taskConfigs.size();
    }

    /**
     * Determines {@code needChannelNumber} from the speed settings.
     *
     * <p>When a job-wide byte limit and/or record limit is configured, each yields a channel
     * count of {@code jobLimit / perChannelLimit} (at least 1) and the smaller of the two wins.
     * When neither limit is configured, the explicit channel setting is used instead.
     *
     * @throws DataXException with {@code CONFIG_ERROR} when a job-wide limit is set but the
     *                        matching per-channel limit is missing or not positive, or when no
     *                        limit and no positive channel count is configured at all
     */
    private void adjustChannelNumber() {
        int channelsByByteLimit = Integer.MAX_VALUE;
        int channelsByRecordLimit = Integer.MAX_VALUE;

        boolean byteLimited = this.configuration.getInt(CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 0).intValue() > 0;
        if (byteLimited) {
            long jobByteSpeed = this.configuration.getInt(CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10485760).intValue();
            Long channelByteSpeed = this.configuration.getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
            if (channelByteSpeed == null || channelByteSpeed.longValue() <= 0) {
                throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "在有总bps限速条件下，单个channel的bps值不能为空，也不能为非正数");
            }
            channelsByByteLimit = Math.max((int) (jobByteSpeed / channelByteSpeed.longValue()), 1);
            LOG.info("Job set Max-Byte-Speed to " + jobByteSpeed + " bytes.");
        }

        boolean recordLimited = this.configuration.getInt(CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 0).intValue() > 0;
        if (recordLimited) {
            long jobRecordSpeed = this.configuration.getInt(CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000).intValue();
            Long channelRecordSpeed = this.configuration.getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
            if (channelRecordSpeed == null || channelRecordSpeed.longValue() <= 0) {
                throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "在有总tps限速条件下，单个channel的tps值不能为空，也不能为非正数");
            }
            channelsByRecordLimit = Math.max((int) (jobRecordSpeed / channelRecordSpeed.longValue()), 1);
            LOG.info("Job set Max-Record-Speed to " + jobRecordSpeed + " records.");
        }

        // The stricter of the two limits decides.
        this.needChannelNumber = Math.min(channelsByByteLimit, channelsByRecordLimit);
        if (this.needChannelNumber != Integer.MAX_VALUE) {
            return;
        }

        // No speed limit configured: fall back to the explicit channel count.
        if (this.configuration.getInt(CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0).intValue() <= 0) {
            throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "Job运行速度必须设置");
        }
        this.needChannelNumber = this.configuration.getInt(CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL).intValue();
        LOG.info("Job set Channel-Number to " + this.needChannelNumber + " channels.");
    }

    /**
     * Assigns the tasks to task groups and runs them with the stand-alone scheduler.
     *
     * <p>The channel count is capped at the number of tasks, the tasks are distributed with
     * {@code JobAssignUtil.assignFairly}, every task group is tagged with the execute mode, and
     * the scheduler is run. Transfer start/end timestamps are recorded around the scheduler call
     * and the error limits are checked afterwards.
     *
     * @throws DataXException with {@code RUNTIME_ERROR} wrapping any exception raised while
     *                        scheduling or checking the error limits
     */
    private void schedule() {
        int channelsPerTaskGroup = this.configuration.getInt(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5).intValue();
        int taskNumber = this.configuration.getList(CoreConstant.DATAX_JOB_CONTENT).size();
        // More channels than tasks would be idle.
        this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
        PerfTrace.getInstance().setChannelNumber(this.needChannelNumber);

        List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration, this.needChannelNumber, channelsPerTaskGroup);
        LOG.info("Scheduler starts [{}] taskGroups.", Integer.valueOf(taskGroupConfigs.size()));

        // Declared outside the try block so the failure log can name the mode.
        ExecuteMode executeMode = null;
        try {
            executeMode = ExecuteMode.STANDALONE;
            AbstractScheduler scheduler = initStandaloneScheduler(this.configuration);
            for (Configuration taskGroupConfig : taskGroupConfigs) {
                taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
            }
            if ((executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) && this.jobId <= 0) {
                throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, "在[ local | distribute ]模式下必须设置jobId，并且其值 > 0 .");
            }
            LOG.info("Running by {} Mode.", executeMode);
            this.startTransferTimeStamp = System.currentTimeMillis();
            scheduler.schedule(taskGroupConfigs);
            this.endTransferTimeStamp = System.currentTimeMillis();
            checkLimit();
        } catch (Exception e) {
            LOG.error("运行scheduler 模式[{}]出错.", executeMode);
            this.endTransferTimeStamp = System.currentTimeMillis();
            throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
        }
    }

    /**
     * Creates the stand-alone communicator, installs it as this container's communicator and
     * builds a stand-alone scheduler on top of it.
     *
     * @param configuration configuration the communicator is created from
     * @return a new stand-alone scheduler sharing the installed communicator
     */
    private AbstractScheduler initStandaloneScheduler(Configuration configuration) {
        StandAloneJobContainerCommunicator communicator = new StandAloneJobContainerCommunicator(configuration);
        super.setContainerCommunicator(communicator);
        AbstractScheduler scheduler = new StandAloneScheduler(communicator);
        return scheduler;
    }

    /** Runs the job-level post step of the writer first, then of the reader. */
    private void post() {
        this.postJobWriter();
        this.postJobReader();
    }

    /**
     * Destroys the writer and then the reader job plugin, if present, and clears both references.
     *
     * <p>The references are cleared before the plugins are destroyed, so a repeated call (for
     * example {@code start()} calling this on {@link OutOfMemoryError} and again during cleanup)
     * never destroys the same plugin twice. The reader is destroyed even when the writer's
     * {@code destroy()} throws.
     */
    private void destroy() {
        Writer.Job writer = this.jobWriter;
        Reader.Job reader = this.jobReader;
        this.jobWriter = null;
        this.jobReader = null;
        try {
            if (writer != null) {
                writer.destroy();
            }
        } finally {
            if (reader != null) {
                reader.destroy();
            }
        }
    }

    /**
     * Reports and logs the final job statistics.
     *
     * <p>Does nothing when no container communicator is installed. Otherwise the collected
     * communication is turned into a report carrying the average byte and record speed over the
     * transfer duration (treated as one second when it rounds down to zero), the report is sent
     * through the communicator and a human-readable summary is logged. Transformer counters are
     * logged in addition when any of them is positive.
     */
    private void logStatistics() {
        if (super.getContainerCommunicator() == null) {
            return;
        }

        long totalCostSeconds = (this.endTimeStamp - this.startTimeStamp) / 1000;
        long transferCostSeconds = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
        if (transferCostSeconds == 0) {
            // Avoid dividing by zero for sub-second transfers.
            transferCostSeconds = 1;
        }

        Communication snapshot = super.getContainerCommunicator().collect();
        snapshot.setTimestamp(this.endTimeStamp);
        Communication transferStart = new Communication();
        transferStart.setTimestamp(this.startTransferTimeStamp);
        Communication report = CommunicationTool.getReportCommunication(snapshot, transferStart, this.totalStage);

        long byteSpeed = snapshot.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES).longValue() / transferCostSeconds;
        long recordSpeed = snapshot.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS).longValue() / transferCostSeconds;
        report.setLongCounter(CommunicationTool.BYTE_SPEED, byteSpeed);
        report.setLongCounter(CommunicationTool.RECORD_SPEED, recordSpeed);
        super.getContainerCommunicator().report(report);

        String summary = String.format(
                "\n%-26s: %-18s\n%-26s: %-18s\n%-26s: %19s\n%-26s: %19s\n%-26s: %19s\n%-26s: %19s\n%-26s: %19s\n",
                "任务启动时刻", dateFormat.format(Long.valueOf(this.startTimeStamp)),
                "任务结束时刻", dateFormat.format(Long.valueOf(this.endTimeStamp)),
                "任务总计耗时", totalCostSeconds + "s",
                "任务平均流量", StrUtil.stringify(byteSpeed) + "/s",
                "记录写入速度", recordSpeed + "rec/s",
                "读出记录总数", String.valueOf(CommunicationTool.getTotalReadRecords(snapshot)),
                "读写失败总数", String.valueOf(CommunicationTool.getTotalErrorRecords(snapshot)));
        LOG.info(summary);

        long transformerSucceeded = snapshot.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS).longValue();
        long transformerFailed = snapshot.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS).longValue();
        long transformerFiltered = snapshot.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS).longValue();
        boolean transformerUsed = transformerSucceeded > 0 || transformerFailed > 0 || transformerFiltered > 0;
        if (transformerUsed) {
            LOG.info(String.format(
                    "\n%-26s: %19s\n%-26s: %19s\n%-26s: %19s\n",
                    "Transformer成功记录总数", Long.valueOf(transformerSucceeded),
                    "Transformer失败记录总数", Long.valueOf(transformerFailed),
                    "Transformer过滤记录总数", Long.valueOf(transformerFiltered)));
        }
    }

    /**
     * Loads, configures and initialises the reader job plugin under the reader's class loader.
     *
     * <p>The plugin receives its own parameters, the writer's parameters as peer configuration
     * and the given collector before {@code init()} is called. The thread's previous class
     * loader is restored even when loading or initialisation throws.
     *
     * @param jobPluginCollector collector the plugin reports to
     * @return the initialised reader job plugin
     */
    private Reader.Job initJobReader(JobPluginCollector jobPluginCollector) {
        this.readerPluginName = this.configuration.getString(CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));
        try {
            Reader.Job reader = LoadUtil.loadJobPlugin(PluginType.READER, this.readerPluginName);
            reader.setPluginJobConf(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
            reader.setPeerPluginJobConf(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));
            reader.setJobPluginCollector(jobPluginCollector);
            reader.init();
            return reader;
        } finally {
            this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        }
    }

    /**
     * Loads, configures and initialises the writer job plugin under the writer's class loader.
     *
     * <p>The plugin receives its own parameters, the reader's parameters and name as peer
     * information and the given collector before {@code init()} is called. The thread's previous
     * class loader is restored even when loading or initialisation throws.
     *
     * @param jobPluginCollector collector the plugin reports to
     * @return the initialised writer job plugin
     */
    private Writer.Job initJobWriter(JobPluginCollector jobPluginCollector) {
        this.writerPluginName = this.configuration.getString(CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));
        try {
            Writer.Job writer = LoadUtil.loadJobPlugin(PluginType.WRITER, this.writerPluginName);
            writer.setPluginJobConf(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));
            writer.setPeerPluginJobConf(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
            writer.setPeerPluginName(this.readerPluginName);
            writer.setJobPluginCollector(jobPluginCollector);
            writer.init();
            return writer;
        } finally {
            this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        }
    }

    /**
     * Runs the reader job plugin's {@code prepare()} under the reader's class loader.
     * The thread's previous class loader is restored even when the plugin throws.
     */
    private void prepareJobReader() {
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));
        try {
            LOG.info("DataX Reader.Job [{}] do prepare work .", this.readerPluginName);
            this.jobReader.prepare();
        } finally {
            this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        }
    }

    /**
     * Runs the writer job plugin's {@code prepare()} under the writer's class loader.
     * The thread's previous class loader is restored even when the plugin throws.
     */
    private void prepareJobWriter() {
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));
        try {
            LOG.info("DataX Writer.Job [{}] do prepare work .", this.writerPluginName);
            this.jobWriter.prepare();
        } finally {
            this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        }
    }

    /**
     * Asks the reader job plugin to split itself, under the reader's class loader.
     *
     * @param i the advised number of pieces passed to the plugin's {@code split}
     * @return the non-empty list of reader task configurations
     * @throws DataXException with {@code PLUGIN_SPLIT_ERROR} when the plugin returns
     *                        {@code null} or an empty list
     */
    private List<Configuration> doReaderSplit(int i) {
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));
        try {
            List<Configuration> readerTasks = this.jobReader.split(i);
            if (readerTasks == null || readerTasks.isEmpty()) {
                throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_SPLIT_ERROR, "reader切分的task数目不能小于等于0");
            }
            LOG.info("DataX Reader.Job [{}] splits to [{}] tasks.", this.readerPluginName, Integer.valueOf(readerTasks.size()));
            return readerTasks;
        } finally {
            // Also runs on the empty-split error above, which previously left the plugin loader installed.
            this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        }
    }

    /**
     * Asks the writer job plugin to split itself, under the writer's class loader.
     *
     * @param i the number of pieces passed to the plugin's {@code split}
     * @return the non-empty list of writer task configurations
     * @throws DataXException with {@code PLUGIN_SPLIT_ERROR} when the plugin returns
     *                        {@code null} or an empty list
     */
    private List<Configuration> doWriterSplit(int i) {
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));
        try {
            List<Configuration> writerTasks = this.jobWriter.split(i);
            if (writerTasks == null || writerTasks.isEmpty()) {
                throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_SPLIT_ERROR, "writer切分的task不能小于等于0");
            }
            LOG.info("DataX Writer.Job [{}] splits to [{}] tasks.", this.writerPluginName, Integer.valueOf(writerTasks.size()));
            return writerTasks;
        } finally {
            // Also runs on the empty-split error above, which previously left the plugin loader installed.
            this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        }
    }

    /**
     * Pairs reader and writer task configurations without any transformer configuration.
     *
     * @param list  reader task configurations
     * @param list2 writer task configurations
     * @return one merged task configuration per reader/writer pair
     */
    private List<Configuration> mergeReaderAndWriterTaskConfigs(List<Configuration> list, List<Configuration> list2) {
        List<Configuration> merged = mergeReaderAndWriterTaskConfigs(list, list2, null);
        return merged;
    }

    /**
     * Pairs the i-th reader task with the i-th writer task into one task configuration.
     *
     * <p>Each resulting configuration carries the reader name and parameters, the writer name and
     * parameters, the transformer list (only when it is non-empty) and a task id equal to the
     * pair's index.
     *
     * @param list  reader task configurations
     * @param list2 writer task configurations; must have the same size as {@code list}
     * @param list3 transformer configurations shared by every task; may be {@code null} or empty
     * @return the merged task configurations, in input order
     * @throws DataXException with {@code PLUGIN_SPLIT_ERROR} when the reader and writer lists
     *                        differ in size
     */
    private List<Configuration> mergeReaderAndWriterTaskConfigs(List<Configuration> list, List<Configuration> list2, List<Configuration> list3) {
        int taskCount = list.size();
        if (taskCount != list2.size()) {
            throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_SPLIT_ERROR, String.format("reader切分的task数目[%d]不等于writer切分的task数目[%d].", Integer.valueOf(list.size()), Integer.valueOf(list2.size())));
        }
        boolean hasTransformers = list3 != null && !list3.isEmpty();
        List<Configuration> taskConfigs = new ArrayList<Configuration>(taskCount);
        for (int taskId = 0; taskId < taskCount; taskId++) {
            Configuration taskConfig = Configuration.newDefault();
            taskConfig.set(CoreConstant.JOB_READER_NAME, this.readerPluginName);
            taskConfig.set(CoreConstant.JOB_READER_PARAMETER, list.get(taskId));
            taskConfig.set(CoreConstant.JOB_WRITER_NAME, this.writerPluginName);
            taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER, list2.get(taskId));
            if (hasTransformers) {
                taskConfig.set(CoreConstant.JOB_TRANSFORMER, list3);
            }
            taskConfig.set(CoreConstant.TASK_ID, Integer.valueOf(taskId));
            taskConfigs.add(taskConfig);
        }
        return taskConfigs;
    }

    /**
     * Distributes the job's task configurations over task-group configurations.
     *
     * <p>NOTE(review): judging by the validation message, the parameters are presumably
     * {@code i} = average tasks per channel, {@code i2} = channel number and
     * {@code i3} = channels per task group — confirm against the original source.
     * No caller of this method is visible in this file.
     *
     * @param i  must be positive; number of tasks taken per leftover channel for the first group
     * @param i2 must be positive; total channel count
     * @param i3 must be positive; channel count assigned to every regular task group
     * @return one configuration per task group, each carrying its share of the job content,
     *         its channel count and its task-group id
     * @throws IllegalArgumentException (via {@code Validate.isTrue}) when any argument is not positive
     */
    private List<Configuration> distributeTasksToTaskGroup(int i, int i2, int i3) {
        Validate.isTrue(i > 0 && i2 > 0 && i3 > 0, "每个channel的平均task数[averTaskPerChannel]，channel数目[channelNumber]，每个taskGroup的平均channel数[channelsPerTaskGroup]都应该为正数");
        // All task configurations of the job, in order.
        List listConfiguration = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
        // i4 = number of task groups = ceil(i2 / i3); i5 = channels left over after full groups.
        int i4 = i2 / i3;
        int i5 = i2 % i3;
        if (i5 > 0) {
            i4++;
        }
        // Single group: it takes the whole job content and all i2 channels, with id 0.
        if (i4 == 1) {
            final Configuration clone = this.configuration.clone();
            clone.set(CoreConstant.DATAX_JOB_CONTENT, this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT));
            clone.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, Integer.valueOf(i2));
            clone.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, 0);
            return new ArrayList<Configuration>() { // from class: com.alibaba.datax.core.job.JobContainer.1
                {
                    add(clone);
                }
            };
        }
        // Several groups: start every group as a clone of the job configuration with empty content.
        ArrayList arrayList = new ArrayList();
        for (int i6 = 0; i6 < i4; i6++) {
            Configuration clone2 = this.configuration.clone();
            List listConfiguration2 = clone2.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
            listConfiguration2.clear();
            clone2.set(CoreConstant.DATAX_JOB_CONTENT, listConfiguration2);
            arrayList.add(clone2);
        }
        // i7 = index of the next task to hand out; i8 = index of the first regular (full) group.
        int i7 = 0;
        int i8 = 0;
        if (i5 > 0) {
            // The first group hosts the leftover channels: it receives i5 * i tasks, i5 channels and id 0.
            // NOTE(review): listConfiguration.get(i11) is not bounds-checked here; this assumes the
            // job has at least i5 * i tasks — confirm.
            Configuration configuration = (Configuration) arrayList.get(0);
            for (int i9 = 0; i9 < i5; i9++) {
                for (int i10 = 0; i10 < i; i10++) {
                    List listConfiguration3 = configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
                    int i11 = i7;
                    i7++;
                    listConfiguration3.add(listConfiguration.get(i11));
                    configuration.set(CoreConstant.DATAX_JOB_CONTENT, listConfiguration3);
                }
            }
            configuration.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, Integer.valueOf(i5));
            i8 = 0 + 1;
            configuration.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, 0);
        }
        // Deal the remaining tasks one at a time to the regular groups, cycling through them
        // until no task is left.
        int i12 = i8;
        while (i7 < listConfiguration.size() && i12 < arrayList.size()) {
            for (int i13 = i12; i13 < arrayList.size() && i7 < listConfiguration.size(); i13++) {
                Configuration configuration2 = (Configuration) arrayList.get(i13);
                List listConfiguration4 = configuration2.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
                int i14 = i7;
                i7++;
                listConfiguration4.add(listConfiguration.get(i14));
                configuration2.set(CoreConstant.DATAX_JOB_CONTENT, listConfiguration4);
            }
        }
        // Every regular group gets i3 channels and its list index as task-group id.
        int i15 = i12;
        while (i15 < arrayList.size()) {
            Configuration configuration3 = (Configuration) arrayList.get(i15);
            configuration3.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, Integer.valueOf(i3));
            int i16 = i15;
            i15++;
            configuration3.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, Integer.valueOf(i16));
        }
        return arrayList;
    }

    /**
     * Runs the reader job plugin's {@code post()} under the reader's class loader.
     * The thread's previous class loader is restored even when the plugin throws.
     */
    private void postJobReader() {
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));
        try {
            LOG.info("DataX Reader.Job [{}] do post work.", this.readerPluginName);
            this.jobReader.post();
        } finally {
            this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        }
    }

    /**
     * Runs the writer job plugin's {@code post()} under the writer's class loader.
     * The thread's previous class loader is restored even when the plugin throws.
     */
    private void postJobWriter() {
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));
        try {
            LOG.info("DataX Writer.Job [{}] do post work.", this.writerPluginName);
            this.jobWriter.post();
        } finally {
            this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        }
    }

    /**
     * Collects the current communication once and checks it against the record-count limit and
     * then the percentage limit of the error-record checker.
     */
    private void checkLimit() {
        Communication snapshot = super.getContainerCommunicator().collect();
        this.errorLimit.checkRecordLimit(snapshot);
        this.errorLimit.checkPercentageLimit(snapshot);
    }

    /**
     * Invokes all hooks found under {@code <DATAX_HOME>/hook}, passing them the job configuration
     * and the counters of the currently collected communication.
     */
    private void invokeHooks() {
        String hookDirectory = CoreConstant.DATAX_HOME + "/hook";
        Communication snapshot = super.getContainerCommunicator().collect();
        HookInvoker invoker = new HookInvoker(hookDirectory, this.configuration, snapshot.getCounter());
        invoker.invokeAll();
    }
}
