package com.alibaba.datax.core.transport.channel;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
import com.alibaba.datax.core.transport.record.TerminateRecord;
import com.alibaba.datax.core.util.container.CoreConstant;
import java.util.Collection;
import org.apache.commons.lang.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/datax/core/transport/channel/Channel.class */
/**
 * Base class for a record channel: one side pushes {@link Record}s in, the other side pulls
 * them out. Concrete subclasses supply the actual storage through the {@code doPush},
 * {@code doPushAll}, {@code doPull} and {@code doPullAll} hooks.
 *
 * <p>This class layers two things on top of those hooks:
 * <ul>
 *   <li>statistics: every push/pull updates counters on the {@link Communication} installed
 *       via {@link #setCommunication(Communication)};</li>
 *   <li>flow control: when a byte or record speed limit is configured (value &gt; 0), the
 *       pushing thread is put to sleep whenever the measured push rate exceeds the limit.</li>
 * </ul>
 *
 * <p>{@link #setCommunication(Communication)} must be called before any push or pull; the
 * statistics code dereferences the installed communication without a null check.
 */
public abstract class Channel {
    private static final Logger LOG = LoggerFactory.getLogger(Channel.class);

    /**
     * Guards {@link #isFirstPrint}. A dedicated, never-reassigned lock object is used because
     * synchronizing on the flag itself would lock on a different object after the flag is flipped.
     */
    private static final Object FIRST_PRINT_LOCK = new Object();

    /** True until the speed limits have been logged once for this class. */
    private static boolean isFirstPrint = true;

    protected int taskGroupId;
    protected int capacity;
    protected int byteCapacity;
    /** Byte speed limit; values {@code <= 0} disable byte-based flow control. */
    protected long byteSpeed;
    /** Record speed limit; values {@code <= 0} disable record-based flow control. */
    protected long recordSpeed;
    /** Minimum elapsed time, in milliseconds, between two flow-control checks. */
    protected long flowControlInterval;
    protected Configuration configuration;

    protected volatile boolean isClosed = false;
    protected volatile long waitReaderTime = 0;
    protected volatile long waitWriterTime = 0;

    /** Live counters, installed by {@link #setCommunication(Communication)}. */
    private Communication currentCommunication;
    /** Snapshot of the read counters and timestamp taken at the last flow-control check. */
    private Communication lastCommunication = new Communication();

    /**
     * Reads the channel settings from {@code configuration}.
     *
     * <p>Defaults applied when a key is absent: capacity 2048, byte speed 1048576,
     * record speed 10000, flow-control interval 1000, byte capacity 8388608.
     * The task-group id has no default.
     *
     * @param configuration source of the channel settings
     * @throws IllegalArgumentException if the configured capacity is not positive
     */
    public Channel(Configuration configuration) {
        int channelCapacity = configuration.getInt(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY, 2048);
        long byteSpeedLimit = configuration.getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE, 1048576L);
        long recordSpeedLimit = configuration.getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD, 10000L);
        if (channelCapacity <= 0) {
            throw new IllegalArgumentException(String.format("通道容量[%d]必须大于0.", channelCapacity));
        }

        // Log the limits only once, no matter how many channels are constructed.
        synchronized (FIRST_PRINT_LOCK) {
            if (isFirstPrint) {
                LOG.info("Channel set byte_speed_limit to {}{}",
                        byteSpeedLimit, byteSpeedLimit <= 0 ? ", No bps activated." : ".");
                LOG.info("Channel set record_speed_limit to {}{}",
                        recordSpeedLimit, recordSpeedLimit <= 0 ? ", No tps activated." : ".");
                isFirstPrint = false;
            }
        }

        this.taskGroupId = configuration.getInt(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
        this.capacity = channelCapacity;
        this.byteSpeed = byteSpeedLimit;
        this.recordSpeed = recordSpeedLimit;
        this.flowControlInterval =
                configuration.getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_FLOWCONTROLINTERVAL, 1000L);
        this.byteCapacity = configuration.getInt(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8388608);
        this.configuration = configuration;
    }

    /** Marks this channel as closed. */
    public void close() {
        this.isClosed = true;
    }

    /** Marks this channel as open. */
    public void open() {
        this.isClosed = false;
    }

    /** @return {@code true} if {@link #close()} was called more recently than {@link #open()} */
    public boolean isClosed() {
        return this.isClosed;
    }

    /** @return the task-group id read from the configuration */
    public int getTaskGroupId() {
        return this.taskGroupId;
    }

    /** @return the configured channel capacity */
    public int getCapacity() {
        return this.capacity;
    }

    /** @return the configured byte speed limit ({@code <= 0} means no limit) */
    public long getByteSpeed() {
        return this.byteSpeed;
    }

    /** @return the configuration this channel was built from */
    public Configuration getConfiguration() {
        return this.configuration;
    }

    /**
     * Installs the communication that receives this channel's statistics and resets the
     * flow-control snapshot.
     *
     * @param communication the counters to update on every push and pull
     */
    public void setCommunication(Communication communication) {
        this.currentCommunication = communication;
        this.lastCommunication.reset();
    }

    /**
     * Pushes one record and accounts for it in the read statistics (and flow control).
     *
     * @param record the record to push; must not be {@code null}
     */
    public void push(Record record) {
        Validate.notNull(record, "record不能为空.");
        doPush(record);
        statPush(1L, record.getByteSize());
    }

    /**
     * Pushes a terminate marker. Unlike {@link #push(Record)}, this does not touch statistics.
     *
     * @param terminateRecord the marker to push; must not be {@code null}
     */
    public void pushTerminate(TerminateRecord terminateRecord) {
        Validate.notNull(terminateRecord, "record不能为空.");
        doPush(terminateRecord);
    }

    /**
     * Pushes a batch of records and accounts for them in the read statistics (and flow control).
     *
     * @param collection the records to push; neither the collection nor any element may be {@code null}
     */
    public void pushAll(Collection<Record> collection) {
        Validate.notNull(collection);
        Validate.noNullElements(collection);
        doPushAll(collection);
        statPush(collection.size(), getByteSize(collection));
    }

    /**
     * Pulls one record and accounts for it in the write-received statistics.
     *
     * @return the record returned by {@link #doPull()}
     */
    public Record pull() {
        Record pulled = doPull();
        statPull(1L, pulled.getByteSize());
        return pulled;
    }

    /**
     * Pulls a batch of records into {@code collection} and accounts for the collection's
     * resulting contents in the write-received statistics.
     *
     * @param collection receives the pulled records; must not be {@code null}
     */
    public void pullAll(Collection<Record> collection) {
        Validate.notNull(collection);
        doPullAll(collection);
        statPull(collection.size(), getByteSize(collection));
    }

    /** Stores a single record in the underlying channel implementation. */
    protected abstract void doPush(Record record);

    /** Stores a batch of records in the underlying channel implementation. */
    protected abstract void doPushAll(Collection<Record> collection);

    /** Retrieves a single record from the underlying channel implementation. */
    protected abstract Record doPull();

    /** Retrieves records from the underlying channel implementation into {@code collection}. */
    protected abstract void doPullAll(Collection<Record> collection);

    public abstract int size();

    public abstract boolean isEmpty();

    public abstract void clear();

    /**
     * Sums {@link Record#getByteSize()} over all records in the collection.
     *
     * @param collection the records to measure
     * @return the total byte size, or 0 for an empty collection
     */
    private long getByteSize(Collection<Record> collection) {
        long totalBytes = 0;
        for (Record record : collection) {
            totalBytes += record.getByteSize();
        }
        return totalBytes;
    }

    /**
     * Updates the read counters after a push and, when a speed limit is active, throttles the
     * calling thread.
     *
     * <p>Throttling is evaluated at most once per {@link #flowControlInterval} milliseconds.
     * For each active limit the rate since the last check is computed as
     * {@code delta * 1000 / interval}; if it exceeds the limit, the required sleep is
     * {@code rate * interval / limit - interval} milliseconds. The thread sleeps for the
     * larger of the byte-based and record-based values, then the snapshot is refreshed.
     *
     * @param recordCount number of records just pushed
     * @param byteCount   total byte size of the records just pushed
     */
    private void statPush(long recordCount, long byteCount) {
        this.currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS, recordCount);
        this.currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES, byteCount);
        this.currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, this.waitReaderTime);
        this.currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, this.waitWriterTime);

        boolean byteLimited = this.byteSpeed > 0;
        boolean recordLimited = this.recordSpeed > 0;
        if (!byteLimited && !recordLimited) {
            return;
        }

        long lastTimestamp = this.lastCommunication.getTimestamp();
        long nowTimestamp = System.currentTimeMillis();
        long interval = nowTimestamp - lastTimestamp;
        if (interval - this.flowControlInterval < 0) {
            return;
        }
        // The interval is used as a divisor below. It can only be non-positive here when
        // flowControlInterval is configured as <= 0; skip the check instead of dividing by zero.
        if (interval <= 0) {
            return;
        }

        long byteLimitSleepTime = 0;
        if (byteLimited) {
            long currentByteSpeed = ((CommunicationTool.getTotalReadBytes(this.currentCommunication)
                    - CommunicationTool.getTotalReadBytes(this.lastCommunication)) * 1000) / interval;
            if (currentByteSpeed > this.byteSpeed) {
                byteLimitSleepTime = ((currentByteSpeed * interval) / this.byteSpeed) - interval;
            }
        }

        long recordLimitSleepTime = 0;
        if (recordLimited) {
            long currentRecordSpeed = ((CommunicationTool.getTotalReadRecords(this.currentCommunication)
                    - CommunicationTool.getTotalReadRecords(this.lastCommunication)) * 1000) / interval;
            if (currentRecordSpeed > this.recordSpeed) {
                recordLimitSleepTime = ((currentRecordSpeed * interval) / this.recordSpeed) - interval;
            }
        }

        // Honour whichever limit demands the longer pause.
        long sleepTime = Math.max(byteLimitSleepTime, recordLimitSleepTime);
        if (sleepTime > 0) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
            }
        }

        // Refresh the snapshot so the next check measures only what is pushed from now on.
        this.lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,
                this.currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES).longValue());
        this.lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,
                this.currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES).longValue());
        this.lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,
                this.currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS).longValue());
        this.lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,
                this.currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS).longValue());
        this.lastCommunication.setTimestamp(nowTimestamp);
    }

    /**
     * Updates the write-received counters after a pull.
     *
     * @param recordCount number of records just pulled
     * @param byteCount   total byte size of the records just pulled
     */
    private void statPull(long recordCount, long byteCount) {
        this.currentCommunication.increaseCounter(CommunicationTool.WRITE_RECEIVED_RECORDS, recordCount);
        this.currentCommunication.increaseCounter(CommunicationTool.WRITE_RECEIVED_BYTES, byteCount);
    }
}
