/*
 * Decompiled with CFR 0.152.
 */
package lia.util.net.copy.transport;

import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.channels.SocketChannel;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import lia.util.net.common.Config;
import lia.util.net.common.Utils;
import lia.util.net.copy.FDTReaderSession;
import lia.util.net.copy.transport.FDTWriterKeyAttachement;
import lia.util.net.copy.transport.FDTWriterKeyAttachementComparator;
import lia.util.net.copy.transport.SocketWriterTask;
import lia.util.net.copy.transport.TCPTransportProvider;
import lia.util.net.copy.transport.internal.FDTSelectionKey;
import lia.util.net.copy.transport.internal.SelectionManager;

public class TCPSessionWriter
extends TCPTransportProvider {
    private static final Logger logger = Logger.getLogger("lia.util.net.copy.transport.TCPSessionWriter");
    private static final SelectionManager selectionManager = SelectionManager.getInstance();
    private static final Config config = Config.getInstance();

    public TCPSessionWriter(FDTReaderSession fdtSession) throws Exception {
        super(fdtSession, new PriorityBlockingQueue<FDTSelectionKey>(10, new FDTWriterKeyAttachementComparator()));
    }

    public TCPSessionWriter(FDTReaderSession fdtSession, InetAddress endPointAddress, int port, int numberOfStreams) throws Exception {
        super(fdtSession, endPointAddress, port, numberOfStreams, new PriorityBlockingQueue<FDTSelectionKey>(10, new FDTWriterKeyAttachementComparator()));
    }

    @Override
    public void notifyAvailableBytes(long available) {
        this.speedLimitLock.lock();
        try {
            this.availableBytes = available;
            this.isAvailable.signalAll();
        }
        finally {
            this.speedLimitLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public long awaitSend(long bytesNo) throws InterruptedException {
        long avForWrite = 0L;
        this.speedLimitLock.lock();
        try {
            while (avForWrite == 0L) {
                if (this.isClosed()) return avForWrite;
                if (this.availableBytes > 0L) {
                    long remainingBytes = this.availableBytes - bytesNo;
                    if (remainingBytes >= 0L) {
                        this.availableBytes = remainingBytes;
                        avForWrite = bytesNo;
                        return avForWrite;
                    }
                    avForWrite = this.availableBytes;
                    this.availableBytes = 0L;
                    continue;
                }
                if (this.isAvailable.await(2L, TimeUnit.SECONDS) && avForWrite > 0L) return avForWrite;
            }
            return avForWrite;
        }
        finally {
            this.speedLimitLock.unlock();
        }
    }

    private int getMSS(SocketChannel sc) {
        int retMSS;
        block3: {
            retMSS = Config.NETWORK_BUFF_LEN_SIZE;
            try {
                InetAddress ia = sc.socket().getLocalAddress();
                NetworkInterface ni = NetworkInterface.getByInetAddress(ia);
                int mss = ni.getMTU() - 40;
                if (mss > 1000) {
                    retMSS = mss;
                }
            }
            catch (Throwable t) {
                if (!logger.isLoggable(Level.FINE)) break block3;
                logger.log(Level.FINE, "Cannot determine MTU for socket channel: " + sc);
            }
        }
        return retMSS;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addWorkerStream(SocketChannel sc, boolean sentCookie) throws Exception {
        Object object = this.closeLock;
        synchronized (object) {
            super.addWorkerStream(sc, sentCookie);
            FDTSelectionKey fsk = null;
            if (config.isBlocking()) {
                fsk = new FDTSelectionKey(this.fdtSession.sessionID(), sc, 4, this, null);
                fsk.attach(new FDTWriterKeyAttachement(fsk, this.fdtSession.useFixedBlockSize(), sentCookie));
                this.executor.submit(new SocketWriterTask(this.selectionQueue, (FDTReaderSession)this.fdtSession, this));
                if (logger.isLoggable(Level.FINEST)) {
                    logger.log(Level.FINEST, " BIO Mode. Adding SocketChannel " + sc + " to the selection queue");
                }
                this.selectionQueue.add(fsk);
            } else {
                if (logger.isLoggable(Level.FINEST)) {
                    logger.log(Level.FINEST, " NBIO Mode. Adding SocketChannel " + sc + " to the SelectionManager! ");
                }
                fsk = selectionManager.register(this.fdtSession.sessionID(), sc, 4, this);
                fsk.attach(new FDTWriterKeyAttachement(fsk, this.fdtSession.useFixedBlockSize(), sentCookie));
                if (!fsk.registerInterest()) {
                    logger.log(Level.WARNING, " \n\n Smth went terrible wrong ?? \n\n fsk.registerInterest() returned false \n\n");
                }
            }
            int mss = this.getMSS(sc);
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, " Setting MSS for: " + sc + " to: " + mss);
            }
            fsk.setMSS(mss);
            this.channels.put(sc, fsk);
        }
    }

    @Override
    public void startTransport(boolean sendCookie) throws Exception {
        super.startTransport(sendCookie);
        if (!config.isBlocking()) {
            for (int i = 0; i <= Utils.availableProcessors() * 2; ++i) {
                this.executor.submit(new SocketWriterTask(this.selectionQueue, (FDTReaderSession)this.fdtSession, this));
            }
        }
    }

    public void workerDown(FDTSelectionKey fdtSelectionKey, Throwable downCause) {
        if (downCause != null) {
            logger.log(Level.WARNING, " [ TCPSessionReader ] for fdtSession [ " + this.fdtSession + " ] got an error on a worker", downCause);
        }
        this.close("Worker down", downCause);
        if (this.fdtSession != null) {
            try {
                ((FDTReaderSession)this.fdtSession).transportWorkerDown();
            }
            catch (Throwable throwable) {
                // empty catch block
            }
        }
    }
}

