package com.alibaba.datax.core.transport.channel.memory;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.channel.Channel;
import com.alibaba.datax.core.transport.record.TerminateRecord;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import com.alibaba.datax.core.util.container.CoreConstant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:com/alibaba/datax/core/transport/channel/memory/MemoryChannel.class */
/**
 * In-memory {@link Channel} backed by a bounded {@link ArrayBlockingQueue}.
 *
 * <p>Single-record transfers ({@link #doPush} / {@link #doPull}) rely on the queue's own
 * blocking {@code put}/{@code take}. Batch transfers ({@link #doPushAll} / {@link #doPullAll})
 * are coordinated through {@link #lock} and its two conditions, and additionally honour the
 * inherited {@code byteCapacity} limit using the running total kept in {@link #memoryBytes}.
 */
public class MemoryChannel extends Channel {
    /** How long a batch operation sleeps on its condition before re-checking, in milliseconds. */
    private static final long WAIT_INTERVAL_MILLIS = 200L;

    /** Maximum number of records handed out by a single {@link #doPullAll} call. */
    private final int bufferSize;

    /** Sum of {@code getMemorySize()} over the records currently accounted as buffered. */
    private final AtomicInteger memoryBytes = new AtomicInteger(0);

    /** Bounded record buffer; its capacity comes from the inherited {@code getCapacity()}. */
    private final ArrayBlockingQueue<Record> queue;

    /** Guards the batch operations and owns the two conditions below. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Signalled by {@link #doPullAll} after it has removed records from the buffer. */
    private final Condition notInsufficient = this.lock.newCondition();

    /** Signalled by {@link #doPushAll} after it has added records to the buffer. */
    private final Condition notEmpty = this.lock.newCondition();

    /**
     * Creates the channel and sizes its queue and pull batch from the configuration.
     *
     * @param configuration job configuration; must contain
     *        {@code CoreConstant.DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE}
     */
    public MemoryChannel(Configuration configuration) {
        super(configuration);
        this.queue = new ArrayBlockingQueue<>(getCapacity());
        this.bufferSize = configuration.getInt(CoreConstant.DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE).intValue();
    }

    /**
     * Closes the channel and enqueues the shared {@link TerminateRecord} instance
     * (presumably the end-of-stream marker for the consumer - confirm against the reader side).
     * If the thread is interrupted while waiting for queue space the marker is not enqueued
     * and the interrupt flag is restored.
     */
    @Override // com.alibaba.datax.core.transport.channel.Channel
    public void close() {
        super.close();
        try {
            this.queue.put(TerminateRecord.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Discards every buffered record and releases the bytes they were accounted for.
     *
     * <p>The records are drained (rather than blindly cleared) so that exactly the removed
     * records are subtracted from {@link #memoryBytes}; otherwise the stale byte total would
     * keep {@link #doPushAll} waiting for space that is already free. Producers blocked in
     * {@link #doPushAll} re-check on their next wait interval.
     */
    @Override // com.alibaba.datax.core.transport.channel.Channel
    public void clear() {
        Collection<Record> discarded = new ArrayList<>();
        this.queue.drainTo(discarded);
        this.memoryBytes.addAndGet(-getRecordBytes(discarded));
    }

    /**
     * Enqueues one record, blocking while the queue is full, and adds the blocked time to the
     * inherited {@code waitWriterTime}.
     *
     * <p>NOTE(review): if the thread is interrupted while waiting, the record is NOT enqueued
     * and no exception is raised; only the interrupt flag is restored.
     *
     * @param record the record to enqueue
     */
    @Override // com.alibaba.datax.core.transport.channel.Channel
    protected void doPush(Record record) {
        try {
            long startNanos = System.nanoTime();
            this.queue.put(record);
            this.waitWriterTime += System.nanoTime() - startNanos;
            this.memoryBytes.addAndGet(record.getMemorySize());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Enqueues a whole batch once both the byte budget and the queue have room for all of it.
     *
     * <p>Waits on {@link #notInsufficient}, re-checking at least every
     * {@value #WAIT_INTERVAL_MILLIS} ms. The time spent acquiring the lock and waiting is
     * added to the inherited {@code waitWriterTime}.
     *
     * @param collection the records to enqueue
     * @throws DataXException with {@code FrameworkErrorCode.RUNTIME_ERROR} if the thread is
     *         interrupted while acquiring the lock or waiting for space
     */
    @Override // com.alibaba.datax.core.transport.channel.Channel
    protected void doPushAll(Collection<Record> collection) {
        long startNanos = System.nanoTime();
        lockOrFail();
        try {
            int recordBytes = getRecordBytes(collection);
            // The sum is computed as long so that it cannot wrap around and wrongly pass the check.
            while ((long) this.memoryBytes.get() + recordBytes > this.byteCapacity
                    || collection.size() > this.queue.remainingCapacity()) {
                this.notInsufficient.await(WAIT_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
            }
            this.queue.addAll(collection);
            this.waitWriterTime += System.nanoTime() - startNanos;
            this.memoryBytes.addAndGet(recordBytes);
            this.notEmpty.signalAll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
        } finally {
            // Reached only after lockOrFail() succeeded, so the lock is always held here.
            this.lock.unlock();
        }
    }

    /**
     * Removes and returns one record, blocking while the queue is empty, and adds the blocked
     * time to the inherited {@code waitReaderTime}.
     *
     * @return the record taken from the head of the queue
     * @throws IllegalStateException if the thread is interrupted while waiting; the interrupt
     *         flag is restored first
     */
    @Override // com.alibaba.datax.core.transport.channel.Channel
    protected Record doPull() {
        try {
            long startNanos = System.nanoTime();
            Record taken = this.queue.take();
            this.waitReaderTime += System.nanoTime() - startNanos;
            this.memoryBytes.addAndGet(-taken.getMemorySize());
            return taken;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /**
     * Empties {@code collection} and refills it with up to {@link #bufferSize} buffered records,
     * waiting until at least one record is available.
     *
     * <p>Waits on {@link #notEmpty}, re-checking at least every {@value #WAIT_INTERVAL_MILLIS}
     * ms. The time spent acquiring the lock and waiting is added to the inherited
     * {@code waitReaderTime}.
     *
     * @param collection destination for the drained records; must not be {@code null}
     * @throws DataXException with {@code FrameworkErrorCode.RUNTIME_ERROR} if the thread is
     *         interrupted while acquiring the lock or waiting for records
     */
    @Override // com.alibaba.datax.core.transport.channel.Channel
    protected void doPullAll(Collection<Record> collection) {
        assert collection != null;
        collection.clear();
        long startNanos = System.nanoTime();
        lockOrFail();
        try {
            while (this.queue.drainTo(collection, this.bufferSize) <= 0) {
                this.notEmpty.await(WAIT_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
            }
            this.waitReaderTime += System.nanoTime() - startNanos;
            this.memoryBytes.addAndGet(-getRecordBytes(collection));
            this.notInsufficient.signalAll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
        } finally {
            // Reached only after lockOrFail() succeeded, so the lock is always held here.
            this.lock.unlock();
        }
    }

    /**
     * Acquires {@link #lock} interruptibly. It is deliberately called outside the
     * {@code try/finally} that releases the lock, so a failed acquisition never reaches
     * {@code unlock()} (which would throw {@link IllegalMonitorStateException} and hide the
     * real cause).
     */
    private void lockOrFail() {
        try {
            this.lock.lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
        }
    }

    /** Returns the sum of {@code getMemorySize()} over all records in {@code collection}. */
    private int getRecordBytes(Collection<Record> collection) {
        int totalBytes = 0;
        for (Record record : collection) {
            totalBytes += record.getMemorySize();
        }
        return totalBytes;
    }

    /** Returns the number of records currently in the queue. */
    @Override // com.alibaba.datax.core.transport.channel.Channel
    public int size() {
        return this.queue.size();
    }

    /** Returns {@code true} when the queue currently holds no records. */
    @Override // com.alibaba.datax.core.transport.channel.Channel
    public boolean isEmpty() {
        return this.queue.isEmpty();
    }
}
