/*
 * Decompiled with CFR 0.152.
 */
package lia.util.net.copy.transport;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import lia.util.net.common.AbstractFDTIOEntity;
import lia.util.net.common.Config;
import lia.util.net.common.DirectByteBufferPool;
import lia.util.net.common.Utils;
import lia.util.net.copy.FDTSession;
import lia.util.net.copy.monitoring.NetSessionMonitoringTask;
import lia.util.net.copy.transport.FDTKeyAttachement;
import lia.util.net.copy.transport.FDTProcolException;
import lia.util.net.copy.transport.SocketTask;
import lia.util.net.copy.transport.SpeedLimitManager;
import lia.util.net.copy.transport.SpeedLimiter;
import lia.util.net.copy.transport.internal.FDTSelectionKey;
import lia.util.net.copy.transport.internal.SelectionHandler;
import lia.util.net.copy.transport.internal.SelectionManager;

public abstract class TCPTransportProvider
extends AbstractFDTIOEntity
implements SelectionHandler,
SpeedLimiter {
    protected static final SelectionManager selectionManager = SelectionManager.getInstance();
    private static final Logger logger = Logger.getLogger(TCPTransportProvider.class.getName());
    private static final Config config = Config.getInstance();
    protected final Lock speedLimitLock = new ReentrantLock(true);
    protected final Condition isAvailable = this.speedLimitLock.newCondition();
    protected final HashMap<SocketChannel, FDTSelectionKey> channels = new HashMap();
    protected final FDTSession fdtSession;
    protected final ExecutorService executor;
    protected final ArrayList<SocketTask> socketTasks = new ArrayList();
    protected final BlockingQueue<FDTSelectionKey> selectionQueue;
    public NetSessionMonitoringTask monitoringTask;
    protected long availableBytes;
    protected InetAddress endPointAddress;
    protected int port;
    protected int numberOfStreams;
    ScheduledFuture<?> monitoringTaskFuture;
    ScheduledFuture<?> limiterTask;

    public TCPTransportProvider(FDTSession fdtSession) throws Exception {
        this(fdtSession, new LinkedBlockingQueue<FDTSelectionKey>());
    }

    public TCPTransportProvider(FDTSession fdtSession, BlockingQueue<FDTSelectionKey> selectionQueue) throws Exception {
        this.fdtSession = fdtSession;
        this.selectionQueue = selectionQueue;
        this.executor = Utils.getStandardExecService(" TCPTransportProvider task executor for FDTSession ( " + fdtSession.sessionID() + " )", Utils.availableProcessors(), 4096, 8);
        if (!this.isClosed()) {
            this.limiterTask = SpeedLimitManager.getInstance().addLimiter(this);
        }
    }

    public TCPTransportProvider(FDTSession fdtSession, InetAddress endPointAddress, int port, int numberOfStreams) throws Exception {
        this(fdtSession, endPointAddress, port, numberOfStreams, new LinkedBlockingQueue<FDTSelectionKey>());
    }

    public TCPTransportProvider(FDTSession fdtSession, InetAddress endPointAddress, int port, int numberOfStreams, BlockingQueue<FDTSelectionKey> selectionQueue) throws Exception {
        this(fdtSession, selectionQueue);
        this.endPointAddress = endPointAddress;
        this.port = port;
        this.numberOfStreams = numberOfStreams;
    }

    private static final List<SocketChannel> tryToConnect(InetSocketAddress addr, int numberOfStreams, ByteBuffer connectCookie, boolean sendCookie) throws Exception {
        if (addr == null) {
            throw new NullPointerException("Address cannot be null");
        }
        if (numberOfStreams <= 0) {
            throw new IllegalArgumentException("Number of streams must be > 0 ( " + numberOfStreams + ")");
        }
        ArrayList<SocketChannel> tmpChannels = new ArrayList<SocketChannel>();
        Selector tmpSelector = null;
        ArrayList<SocketChannel> connectedChannels = new ArrayList<SocketChannel>();
        try {
            int i;
            int windowSize = config.getSockBufSize();
            boolean noDelay = config.isNagleEnabled();
            int usedWindowSize = -1;
            tmpSelector = Selector.open();
            int bSockConn = Config.getBulkSockConnect();
            long bSockConnWait = Config.getBulkSockConnectWait();
            logger.log(Level.FINER, " bSockConn: " + bSockConn + " bSockConnWait: " + bSockConnWait);
            int cCounter = 0;
            for (i = 0; i < numberOfStreams; ++i) {
                String sdpConfFlag;
                boolean bSDP;
                if (bSockConn > 0 && cCounter > bSockConn) {
                    try {
                        long sTime = bSockConnWait > 100L ? bSockConnWait : 1000L;
                        logger.log(Level.INFO, " connected " + i + " sockets. sleeping " + sTime + " ms");
                        Thread.sleep(sTime);
                    }
                    catch (Throwable t) {
                        logger.log(Level.WARNING, " Exception trying to wait for bulk connections", t);
                    }
                    cCounter = 0;
                }
                ++cCounter;
                SocketChannel sc = SocketChannel.open();
                sc.configureBlocking(config.isBlocking());
                sc.connect(addr);
                tmpChannels.add(sc);
                Socket s = sc.socket();
                if (windowSize > 0) {
                    s.setSendBufferSize(windowSize);
                }
                boolean bl = bSDP = (sdpConfFlag = System.getProperty("com.sun.sdp.conf")) != null && !sdpConfFlag.isEmpty();
                if (!bSDP) {
                    try {
                        s.setKeepAlive(true);
                    }
                    catch (Throwable t) {
                        logger.log(Level.WARNING, "[ FDTServer ] [ AcceptableTask ] Cannot set KEEP_ALIVE for " + sc + ". Will ignore the error. Contact your sys admin.", t);
                    }
                    try {
                        s.setTrafficClass(28);
                    }
                    catch (Throwable t) {
                        logger.log(Level.WARNING, "[ FDTServer ] [ AcceptableTask ] Cannot set traffic class for " + sc + "[ IPTOS_RELIABILITY (0x04) | IPTOS_THROUGHPUT (0x08) | IPTOS_LOWDELAY (0x10) ] Will ignore the error. Contact your sys admin.", t);
                    }
                }
                if (!sc.isBlocking()) {
                    sc.register(tmpSelector, 12);
                    continue;
                }
                sc.finishConnect();
                usedWindowSize = sc.socket().getSendBufferSize();
                if (!sendCookie) {
                    connectedChannels.add(sc);
                    continue;
                }
                if (connectCookie == null) continue;
                if (sc.write(connectCookie) < 0 || connectCookie.hasRemaining()) {
                    throw new IOException("Cannot connect");
                }
                connectedChannels.add(sc);
                connectCookie.flip();
            }
            i = 0;
            while (tmpChannels.size() != connectedChannels.size()) {
                int n = tmpSelector.select();
                if (n == 0) continue;
                Set<SelectionKey> selectedKeys = tmpSelector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey ssk = it.next();
                    it.remove();
                    SocketChannel sc = (SocketChannel)ssk.channel();
                    if (ssk.isConnectable()) {
                        ssk.interestOps(ssk.interestOps() & 0xFFFFFFF7);
                        while (!sc.finishConnect()) {
                            System.out.println("Socket not yet connected!!!");
                            Thread.yield();
                        }
                        continue;
                    }
                    try {
                        if (noDelay) {
                            sc.socket().setTcpNoDelay(true);
                        }
                    }
                    catch (Throwable t) {
                        logger.log(Level.WARNING, " Cannot enable/disable Nagle's alg", t);
                    }
                    try {
                        sc.socket().setKeepAlive(true);
                    }
                    catch (Throwable t) {
                        logger.log(Level.WARNING, " Cannot set KEEP_ALIVE", t);
                    }
                    usedWindowSize = sc.socket().getSendBufferSize();
                    ssk.interestOps(ssk.interestOps() & 0xFFFFFFFB);
                    if (sendCookie && connectCookie != null) {
                        while (sc.write(connectCookie) >= 0 && connectCookie.hasRemaining()) {
                            Thread.yield();
                        }
                        connectCookie.flip();
                    }
                    ++i;
                    if (logger.isLoggable(Level.FINE)) {
                        logger.log(Level.FINE, "Connection ( " + i + " ) established [ " + sc.socket().getLocalAddress() + ":" + sc.socket().getLocalPort() + " -> " + sc.socket().getInetAddress() + ":" + sc.socket().getPort() + " ]");
                    }
                    connectedChannels.add(sc);
                }
            }
            logger.log(Level.INFO, "Requested window size " + windowSize + ". Using window size: " + usedWindowSize);
        }
        catch (Throwable t) {
            logger.log(Level.WARNING, "Unable to connect to " + addr.toString(), t);
            for (SocketChannel sc : tmpChannels) {
                try {
                    sc.close();
                }
                catch (Throwable throwable) {}
            }
            throw new Exception(t);
        }
        finally {
            if (tmpSelector != null) {
                try {
                    tmpSelector.close();
                }
                catch (Throwable windowSize) {}
            }
        }
        return tmpChannels;
    }

    public final boolean useFixedBlockSize() {
        return this.fdtSession.useFixedBlockSize();
    }

    public final boolean localLoop() {
        return this.fdtSession.localLoop();
    }

    public final boolean isNetTest() {
        return this.fdtSession.isNetTest();
    }

    @Override
    public long getSize() {
        return -1L;
    }

    @Override
    public long getNotifyDelay() {
        return this.fdtSession.getRateLimitDelay();
    }

    @Override
    public void notifyAvailableBytes(long available) {
    }

    @Override
    public final long getRateLimit() {
        return this.fdtSession.getRateLimit();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int getNumberOfStreams() {
        Object object = this.closeLock;
        synchronized (object) {
            return this.channels.size();
        }
    }

    public InetAddress getRemoteEndPointAddress() {
        return null;
    }

    public FDTSession getSession() {
        return this.fdtSession;
    }

    private final void clearSelectionQueue() {
        try {
            FDTSelectionKey fsk = (FDTSelectionKey)this.selectionQueue.poll();
            while (fsk != null) {
                FDTKeyAttachement attachment = fsk.attachment();
                if (attachment != null) {
                    attachment.recycleBuffers();
                }
                fsk = (FDTSelectionKey)this.selectionQueue.poll();
            }
        }
        catch (Throwable t) {
            logger.log(Level.WARNING, "Got exception", t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void internalClose() {
        if (this.limiterTask != null) {
            this.limiterTask.cancel(true);
        }
        if (this.monitoringTaskFuture != null) {
            this.monitoringTaskFuture.cancel(false);
        }
        if (this.monitoringTask != null) {
            ScheduledThreadPoolExecutor monitoringService = Utils.getMonitoringExecService();
            monitoringService.remove(this.monitoringTask);
            monitoringService.purge();
        }
        if (this.executor != null) {
            this.executor.shutdown();
        }
        if (this.fdtSession != null) {
            this.fdtSession.close(this.downMessage(), this.downCause());
        }
        for (Map.Entry<SocketChannel, FDTSelectionKey> entry : this.channels.entrySet()) {
            SocketChannel sc = entry.getKey();
            FDTSelectionKey fdtSelKey = entry.getValue();
            try {
                FDTKeyAttachement attach = fdtSelKey.attachment();
                if (attach != null) {
                    attach.recycleBuffers();
                }
            }
            catch (Throwable throwable) {
                // empty catch block
            }
            try {
                sc.close();
            }
            catch (Throwable throwable) {
                // empty catch block
            }
            try {
                fdtSelKey.cancel();
            }
            catch (Throwable throwable) {}
        }
        this.channels.clear();
        ArrayList<SocketTask> arrayList = this.socketTasks;
        synchronized (arrayList) {
            for (SocketTask st : this.socketTasks) {
                try {
                    st.close(this.downMessage(), this.downCause());
                }
                catch (Throwable throwable) {}
            }
            this.socketTasks.clear();
        }
        this.clearSelectionQueue();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean addSocketTask(SocketTask socketTask) {
        ArrayList<SocketTask> arrayList = this.socketTasks;
        synchronized (arrayList) {
            if (this.isClosed()) {
                socketTask.close(this.downMessage(), this.downCause());
                return false;
            }
            return this.socketTasks.add(socketTask);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void startTransport(boolean sendCookie) throws Exception {
        if (this.endPointAddress != null) {
            InetSocketAddress addr = new InetSocketAddress(this.endPointAddress, this.port);
            ByteBuffer connectCookie = null;
            DirectByteBufferPool instance = DirectByteBufferPool.getInstance();
            try {
                connectCookie = instance.take();
                connectCookie.limit(17);
                connectCookie.put((byte)1).putLong(this.fdtSession.sessionID().getMostSignificantBits()).putLong(this.fdtSession.sessionID().getLeastSignificantBits());
                connectCookie.flip();
                this.addChannels(TCPTransportProvider.tryToConnect(addr, this.numberOfStreams, connectCookie, sendCookie), sendCookie);
            }
            finally {
                if (connectCookie != null) {
                    instance.put(connectCookie);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addChannels(List<SocketChannel> channels, boolean sentCookie) throws Exception {
        if (this.isClosed()) {
            throw new FDTProcolException("The transport provider is down");
        }
        Object object = this.closeLock;
        synchronized (object) {
            for (SocketChannel sc : channels) {
                this.addWorkerStream(sc, sentCookie);
            }
        }
    }

    @Override
    public void handleSelection(FDTSelectionKey fdtSelectionKey) {
        if (logger.isLoggable(Level.FINEST)) {
            logger.log(Level.FINEST, " [ TCPTransportProvider ]  [ SELECTION ] [ NBIO ] handle selection for " + Utils.toStringSelectionKey(fdtSelectionKey));
        }
        this.selectionQueue.add(fdtSelectionKey);
    }

    @Override
    public void canceled(FDTSelectionKey fdtSelectionKey) {
        SocketChannel sc;
        block5: {
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, "  [ SELECTION ] [ NBIO ] Canceled event for " + fdtSelectionKey);
            }
            sc = fdtSelectionKey.channel();
            try {
                sc.close();
            }
            catch (Throwable ignore) {
                if (!logger.isLoggable(Level.FINE)) break block5;
                logger.log(Level.FINE, " Got exception closing socket " + sc, ignore);
            }
        }
        FDTKeyAttachement attachement = fdtSelectionKey.attachment();
        if (attachement != null) {
            attachement.recycleBuffers();
        } else {
            logger.log(Level.WARNING, " Null attachement for fdtSelectionKey: " + fdtSelectionKey + " sc: " + sc);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addWorkerStream(SocketChannel channel, boolean sentCookie) throws Exception {
        ScheduledThreadPoolExecutor monitoringService = Utils.getMonitoringExecService();
        Object object = this.closeLock;
        synchronized (object) {
            if (this.monitoringTaskFuture == null && !this.closed) {
                this.monitoringTask = new NetSessionMonitoringTask(this);
                this.monitoringTaskFuture = monitoringService.scheduleWithFixedDelay(this.monitoringTask, 1L, 1L, TimeUnit.SECONDS);
            }
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, " TCPTransportProvider add working stream for channel: " + channel);
            }
            this.channels.put(channel, null);
        }
    }
}

