package com.alibaba.datax.core.transport.exchanger;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.CommonErrorCode;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.transport.channel.Channel;
import com.alibaba.datax.core.transport.record.TerminateRecord;
import com.alibaba.datax.core.transport.transformer.TransformerExecution;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import com.alibaba.datax.core.util.container.CoreConstant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.Validate;

/* loaded from: input_file:com/alibaba/datax/core/transport/exchanger/BufferedRecordTransformerExchanger.class */
public class BufferedRecordTransformerExchanger extends TransformerExchanger implements RecordSender, RecordReceiver {
    private final Channel channel;
    private final Configuration configuration;
    private final List<Record> buffer;
    private int bufferSize;
    protected final int byteCapacity;
    private final AtomicInteger memoryBytes;
    private int bufferIndex;
    private static Class<? extends Record> RECORD_CLASS;
    private volatile boolean shutdown;
    static final /* synthetic */ boolean $assertionsDisabled;

    public BufferedRecordTransformerExchanger(int i, int i2, Channel channel, Communication communication, TaskPluginCollector taskPluginCollector, List<TransformerExecution> list) {
        super(i, i2, communication, list, taskPluginCollector);
        this.memoryBytes = new AtomicInteger(0);
        this.bufferIndex = 0;
        this.shutdown = false;
        if (!$assertionsDisabled && null == channel) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && null == channel.getConfiguration()) {
            throw new AssertionError();
        }
        this.channel = channel;
        this.configuration = channel.getConfiguration();
        this.bufferSize = this.configuration.getInt(CoreConstant.DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE).intValue();
        this.buffer = new ArrayList(this.bufferSize);
        this.byteCapacity = this.configuration.getInt(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8388608).intValue();
        try {
            RECORD_CLASS = Class.forName(this.configuration.getString(CoreConstant.DATAX_CORE_TRANSPORT_RECORD_CLASS, "com.alibaba.datax.core.transport.record.DefaultRecord"));
        } catch (Exception e) {
            throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, e);
        }
    }

    public Record createRecord() {
        try {
            return RECORD_CLASS.newInstance();
        } catch (Exception e) {
            throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, e);
        }
    }

    public void sendToWriter(Record record) {
        if (this.shutdown) {
            throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        Validate.notNull(record, "record不能为空.");
        Record doTransformer = doTransformer(record);
        if (doTransformer == null) {
            return;
        }
        if (doTransformer.getMemorySize() > this.byteCapacity) {
            this.pluginCollector.collectDirtyRecord(doTransformer, new Exception(String.format("单条记录超过大小限制，当前限制为:%s", Integer.valueOf(this.byteCapacity))));
            return;
        }
        if (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + doTransformer.getMemorySize() > this.byteCapacity) {
            flush();
        }
        this.buffer.add(doTransformer);
        this.bufferIndex++;
        this.memoryBytes.addAndGet(doTransformer.getMemorySize());
    }

    public void flush() {
        if (this.shutdown) {
            throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        this.channel.pushAll(this.buffer);
        doStat();
        this.buffer.clear();
        this.bufferIndex = 0;
        this.memoryBytes.set(0);
    }

    public void terminate() {
        if (this.shutdown) {
            throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        flush();
        this.channel.pushTerminate(TerminateRecord.get());
    }

    public Record getFromReader() {
        if (this.shutdown) {
            throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        if (this.bufferIndex >= this.buffer.size()) {
            receive();
        }
        List<Record> list = this.buffer;
        int i = this.bufferIndex;
        this.bufferIndex = i + 1;
        Record record = list.get(i);
        if (record instanceof TerminateRecord) {
            record = null;
        }
        return record;
    }

    public void shutdown() {
        this.shutdown = true;
        try {
            this.buffer.clear();
            this.channel.clear();
        } catch (Throwable th) {
            th.printStackTrace();
        }
    }

    private void receive() {
        this.channel.pullAll(this.buffer);
        this.bufferIndex = 0;
        this.bufferSize = this.buffer.size();
    }

    static {
        $assertionsDisabled = !BufferedRecordTransformerExchanger.class.desiredAssertionStatus();
    }
}
