/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.repositories.s3.async;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.s3.GenericStatsMetricPublisher;
import org.opensearch.repositories.s3.S3TransferRejectedException;

public class SizeBasedBlockingQ
extends AbstractLifecycleComponent {
    private static final Logger log = LogManager.getLogger(SizeBasedBlockingQ.class);
    protected final LinkedBlockingQueue<Item> queue = new LinkedBlockingQueue();
    protected final Lock lock = new ReentrantLock();
    protected final Condition notEmpty = this.lock.newCondition();
    protected final AtomicLong currentSize = new AtomicLong();
    protected final ByteSizeValue capacity;
    protected final AtomicBoolean closed;
    protected final ExecutorService executorService;
    protected final int consumers;
    private final GenericStatsMetricPublisher genericStatsMetricPublisher;
    private final QueueEventType queueEventType;

    public SizeBasedBlockingQ(ByteSizeValue capacity, ExecutorService executorService, int consumers, GenericStatsMetricPublisher genericStatsMetricPublisher, QueueEventType queueEventType) {
        this.capacity = capacity;
        this.closed = new AtomicBoolean();
        this.executorService = executorService;
        this.consumers = consumers;
        this.genericStatsMetricPublisher = genericStatsMetricPublisher;
        this.queueEventType = queueEventType;
    }

    protected void doStart() {
        for (int worker = 0; worker < this.consumers; ++worker) {
            Consumer consumer = new Consumer(this.queue, this.currentSize, this.lock, this.notEmpty, this.closed, this.genericStatsMetricPublisher, this.queueEventType);
            this.executorService.submit(consumer);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void produce(Item item) throws InterruptedException {
        if (item == null || item.size <= 0L) {
            throw new IllegalStateException("Invalid item input to produce.");
        }
        log.debug(() -> "Transfer queue event received of size: " + item.size + ". Current queue utilisation: " + this.currentSize.get());
        if (this.currentSize.get() + item.size >= this.capacity.getBytes()) {
            throw new S3TransferRejectedException("S3 Transfer queue capacity reached");
        }
        Lock lock = this.lock;
        AtomicLong currentSize = this.currentSize;
        lock.lock();
        try {
            if (currentSize.get() + item.size >= this.capacity.getBytes()) {
                throw new S3TransferRejectedException("S3 Transfer queue capacity reached");
            }
            if (this.closed.get()) {
                throw new AlreadyClosedException("Transfer queue is already closed.");
            }
            this.queue.put(item);
            currentSize.addAndGet(item.size);
            this.notEmpty.signalAll();
            SizeBasedBlockingQ.updateStats(item.size, this.queueEventType, this.genericStatsMetricPublisher);
        }
        finally {
            lock.unlock();
        }
    }

    private static void updateStats(long itemSize, QueueEventType queueEventType, GenericStatsMetricPublisher genericStatsMetricPublisher) {
        if (queueEventType == QueueEventType.NORMAL) {
            genericStatsMetricPublisher.updateNormalPriorityQSize(itemSize);
        } else if (queueEventType == QueueEventType.LOW) {
            genericStatsMetricPublisher.updateLowPriorityQSize(itemSize);
        }
    }

    public int getSize() {
        return this.queue.size();
    }

    public boolean isMaxCapacityBelowContentLength(long contentLength) {
        return contentLength < this.capacity.getBytes();
    }

    protected void doStop() {
        this.doClose();
    }

    protected void doClose() {
        this.lock.lock();
        try {
            if (this.closed.get()) {
                return;
            }
            this.closed.set(true);
            this.notEmpty.signalAll();
        }
        finally {
            this.lock.unlock();
        }
    }

    public static enum QueueEventType {
        NORMAL,
        LOW;

    }

    protected static class Consumer
    extends Thread {
        private final LinkedBlockingQueue<Item> queue;
        private final Lock lock;
        private final Condition notEmpty;
        private final AtomicLong currentSize;
        private final AtomicBoolean closed;
        private final GenericStatsMetricPublisher genericStatsMetricPublisher;
        private final QueueEventType queueEventType;

        public Consumer(LinkedBlockingQueue<Item> queue, AtomicLong currentSize, Lock lock, Condition notEmpty, AtomicBoolean closed, GenericStatsMetricPublisher genericStatsMetricPublisher, QueueEventType queueEventType) {
            this.queue = queue;
            this.lock = lock;
            this.notEmpty = notEmpty;
            this.currentSize = currentSize;
            this.closed = closed;
            this.genericStatsMetricPublisher = genericStatsMetricPublisher;
            this.queueEventType = queueEventType;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    while (true) {
                        this.consume();
                    }
                }
                catch (AlreadyClosedException ex) {
                    return;
                }
                catch (Exception ex) {
                    log.error("Failed to consume transfer event", (Throwable)ex);
                    continue;
                }
                break;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void consume() throws InterruptedException {
            Item item;
            Lock lock = this.lock;
            AtomicLong currentSize = this.currentSize;
            lock.lock();
            try {
                if (this.closed.get()) {
                    throw new AlreadyClosedException("transfer queue closed");
                }
                while (currentSize.get() == 0L) {
                    this.notEmpty.await();
                    if (!this.closed.get()) continue;
                    throw new AlreadyClosedException("transfer queue closed");
                }
                item = this.queue.take();
                currentSize.addAndGet(-item.size);
                SizeBasedBlockingQ.updateStats(-item.size, this.queueEventType, this.genericStatsMetricPublisher);
            }
            finally {
                lock.unlock();
            }
            try {
                item.consumable.run();
            }
            catch (Exception ex) {
                log.error("Exception on executing item consumable", (Throwable)ex);
            }
        }
    }

    public static class Item {
        private final long size;
        private final Runnable consumable;

        public Item(long size, Runnable consumable) {
            this.size = size;
            this.consumable = consumable;
        }
    }
}

