/*
 * Decompiled with CFR 0.152.
 */
package lia.util.net.copy;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.attribute.PosixFileAttributes;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import lia.util.net.common.Config;
import lia.util.net.common.MonitoringUtils;
import lia.util.net.common.Utils;
import lia.util.net.copy.Accountable;
import lia.util.net.copy.FDT;
import lia.util.net.copy.FDTSessionManager;
import lia.util.net.copy.FileSession;
import lia.util.net.copy.IOSession;
import lia.util.net.copy.monitoring.FDTSessionMonitoringTask;
import lia.util.net.copy.monitoring.lisa.LisaCtrlNotifier;
import lia.util.net.copy.transport.ControlChannel;
import lia.util.net.copy.transport.ControlChannelNotifier;
import lia.util.net.copy.transport.CtrlMsg;
import lia.util.net.copy.transport.FDTListFilesMsg;
import lia.util.net.copy.transport.FDTProcolException;
import lia.util.net.copy.transport.FDTSessionConfigMsg;
import lia.util.net.copy.transport.TCPTransportProvider;

public abstract class FDTSession
extends IOSession
implements ControlChannelNotifier,
Comparable<FDTSession>,
Accountable,
LisaCtrlNotifier {
    public static final short SERVER = 0;
    public static final short CLIENT = 1;
    public static final short COORDINATOR = 2;
    public static final int UNINITIALIZED = 0;
    public static final int STARTED = 1;
    public static final int INIT_CONF_SENT = 2;
    public static final int INIT_CONF_RCV = 4;
    public static final int FINAL_CONF_SENT = 8;
    public static final int FINAL_CONF_RCV = 16;
    public static final int START_SENT = 32;
    public static final int START_RCV = 64;
    public static final int TRANSFERING = 128;
    public static final int END_SENT = 256;
    public static final int END_RCV = 256;
    public static final int COORDINATOR_MSG_RCVD = 512;
    public static final int LIST_FILES_MSG_RCVD = 1024;
    public static final int MISSING_FILE = 2048;
    protected static final String[] FDT_SESION_STATES = new String[]{"UNINITIALIZED", "STARTED", "INIT_CONF_SENT", "INIT_CONF_RCV", "FINAL_CONF_SENT", "FINAL_CONF_RCV", "START_SENT", "START_RCV", "TRANSFERING", "END_SENT", "END_RCV"};
    private static final Logger logger = Logger.getLogger(FDTSession.class.getName());
    private static final String LISA_RATE_LIMIT_CMD = "limit";
    private static final Config config = Config.getInstance();
    protected final short role;
    protected final Object protocolLock = new Object();
    protected final Map<UUID, FileSession> fileSessions = new LinkedHashMap<UUID, FileSession>();
    protected final Map<UUID, byte[]> md5Sums = new LinkedHashMap<UUID, byte[]>();
    protected final boolean isNetTest;
    protected final Object ctrlNotifLock = new Object();
    protected final boolean customLog;
    final FDTSessionMonitoringTask monitoringTask;
    final ScheduledFuture<?> monitoringTaskFuture;
    private final Object lock = new Object();
    protected AtomicLong totalProcessedBytes;
    protected AtomicLong totalUtilBytes;
    protected String monID;
    protected short currentStatus;
    protected ServerSocketChannel ssc;
    protected ServerSocket ss;
    protected Selector sel;
    protected SocketChannel sc;
    protected Socket s;
    protected Map<Integer, LinkedList<FileSession>> partitionsMap;
    protected ControlChannel controlChannel;
    protected Set<UUID> finishedSessions = new TreeSet<UUID>();
    protected TCPTransportProvider transportProvider;
    protected AtomicBoolean postProcessingDone = new AtomicBoolean(false);
    protected boolean useFixedBlockSize = config.useFixedBlocks();
    protected boolean localLoop = config.localLoop();
    protected int transferPort;
    protected boolean isLoop = config.loop();
    protected String writeMode = config.getWriteMode();
    protected AtomicLong rateLimit = new AtomicLong(-1L);
    protected AtomicLong rateLimitDelay = new AtomicLong(300L);
    ExecutorService executor;
    AtomicBoolean ctrlThreadStarted = new AtomicBoolean(false);
    private volatile int historyState;
    private volatile int currentState;

    public FDTSession(short role, int transferPort) throws Exception {
        this.transferPort = transferPort;
        this.customLog = Utils.isCustomLog();
        this.currentStatus = 0;
        this.totalProcessedBytes = new AtomicLong(0L);
        this.totalUtilBytes = new AtomicLong(0L);
        this.setCurrentState(1);
        this.role = role;
        if (this.role == 1 || this.role == 2) {
            this.controlChannel = new ControlChannel(config.getHostName(), transferPort, this.sessionID(), (ControlChannelNotifier)this);
        }
        this.syncFDTConfig(this.controlChannel.remoteConf);
        this.rateLimit.set(config.getRateLimit());
        long remoteRateLimit = Utils.getLongValue(this.controlChannel.remoteConf, "-limit", -1L);
        this.rateLimitDelay.set(config.getRateLimitDelay());
        this.setNewRateLimit(remoteRateLimit, false);
        if (this.rateLimit.get() > 0L) {
            logger.log(Level.INFO, "Adding rate limit " + this.rateLimit + " bytes to the FDT session " + this.sessionID);
        }
        this.useFixedBlockSize = this.useFixedBlockSize || this.controlChannel.remoteConf.get("-fbs") != null;
        this.localLoop = this.localLoop || this.controlChannel.remoteConf.get("-ll") != null;
        this.isLoop = this.isLoop || this.controlChannel.remoteConf.get("-loop") != null;
        boolean isRemoteNetTest = this.controlChannel.remoteConf.get("-nettest") != null;
        boolean isLocalNetTest = config.isNetTest();
        boolean bl = this.isNetTest = isLocalNetTest || isRemoteNetTest;
        if (this.isNetTest) {
            logger.log(Level.INFO, "\n\n FDT started with " + (isLocalNetTest ? "local" : "remote") + " -nettest flag. Only network benchmark will be performed. The source and destination are *ignored*!\n");
        }
        if (this.writeMode == null) {
            this.writeMode = (String)this.controlChannel.remoteConf.get("-writeMode");
        }
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "\n --> Fixed size blocks: " + this.useFixedBlockSize + " localLoop: " + this.localLoop + " for fdtSession: " + this.sessionID + " <---\n");
        }
        this.monitoringTask = new FDTSessionMonitoringTask(this);
        ScheduledThreadPoolExecutor monitoringService = Utils.getMonitoringExecService();
        this.monitoringTaskFuture = monitoringService.scheduleWithFixedDelay(this.monitoringTask, 1L, 5L, TimeUnit.SECONDS);
        this.monitoringTask.startSession();
    }

    private void syncFDTConfig(Map<String, Object> remoteConf) {
        if (remoteConf.get("-opentsdb") != null) {
            if (config.getFDTTag() != remoteConf.get("-fdtTAG") && remoteConf.get("-fdtTAG") != null) {
                config.setFDTTag((String)remoteConf.get("-fdtTAG"));
            }
            if (config.getOpentsdb() != remoteConf.get("-opentsdb") && remoteConf.get("-opentsdb") != null) {
                config.setOpentsdb((String)remoteConf.get("-opentsdb"));
            }
            try {
                FDT.initOpenTSDB(config);
            }
            catch (Exception e) {
                logger.log(Level.WARNING, "Failed to initOpenTSDB monitor task", e);
            }
        }
    }

    public int getTransferPort() {
        return this.transferPort;
    }

    public FDTSession(ControlChannel controlChannel, short role) throws Exception {
        super(controlChannel.fdtSessionID());
        this.customLog = Utils.isCustomLog();
        this.currentStatus = 0;
        this.setCurrentState(1);
        this.controlChannel = controlChannel;
        this.role = role;
        this.totalProcessedBytes = new AtomicLong(0L);
        this.totalUtilBytes = new AtomicLong(0L);
        this.rateLimit.set(config.getRateLimit());
        this.rateLimitDelay.set(config.getRateLimitDelay());
        long remoteRateLimit = Utils.getLongValue(controlChannel.remoteConf, "-limit", -1L);
        this.setNewRateLimit(remoteRateLimit, false);
        if (this.rateLimit.get() > 0L) {
            logger.log(Level.INFO, "Adding rate limit " + this.rateLimit + " bytes to the FDT session " + this.sessionID);
        }
        this.useFixedBlockSize = this.useFixedBlockSize || this.controlChannel.remoteConf.get("-fbs") != null;
        this.localLoop = this.localLoop || this.controlChannel.remoteConf.get("-ll") != null;
        this.isLoop = this.isLoop || this.controlChannel.remoteConf.get("-loop") != null;
        boolean isRemoteNetTest = controlChannel.remoteConf.get("-nettest") != null;
        boolean isLocalNetTest = config.isNetTest();
        boolean bl = this.isNetTest = isLocalNetTest || isRemoteNetTest;
        if (this.isNetTest) {
            logger.log(Level.INFO, "\n\n FDT started with " + (isLocalNetTest ? "local" : "remote") + " -nettest flag. Only network benchmark will be performed. The source and destination are *ignored*!\n");
        }
        if (this.writeMode == null) {
            this.writeMode = (String)this.controlChannel.remoteConf.get("-writeMode");
        }
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "\n --> Fixed size blocks: " + this.useFixedBlockSize + " localLoop: " + this.localLoop + " for fdtSession: " + this.sessionID + " <---\n");
        }
        this.monitoringTask = new FDTSessionMonitoringTask(this);
        ScheduledThreadPoolExecutor monitoringService = Utils.getMonitoringExecService();
        this.monitoringTaskFuture = monitoringService.scheduleWithFixedDelay(this.monitoringTask, 1L, 5L, TimeUnit.SECONDS);
        this.monitoringTask.startSession();
    }

    public static List<String> getListOfFiles() {
        logger.log(Level.FINEST, " [ getListOfFiles ] ");
        File[] filesList = new File(config.getListFilesFrom()).listFiles();
        ArrayList<String> listOfFiles = new ArrayList<String>();
        if (filesList != null) {
            for (File fileInDir : filesList) {
                if (!fileInDir.canRead()) continue;
                listOfFiles.add(FDTSession.getFileListEntry(fileInDir));
            }
        }
        logger.log(Level.FINEST, " [ getListOfFiles ] file list collected from directory: " + config.getListFilesFrom());
        return listOfFiles;
    }

    private static String getFileListEntry(File fileInDir) {
        StringBuilder sb = new StringBuilder();
        try {
            PosixFileAttributes fa = Files.readAttributes(fileInDir.toPath(), PosixFileAttributes.class, LinkOption.NOFOLLOW_LINKS);
            sb.append(fa.isDirectory() ? "d" : (fa.isSymbolicLink() ? "l" : (fa.isRegularFile() ? "f" : "-")));
            sb.append(fileInDir.canRead() ? "r" : "-");
            sb.append(fileInDir.canWrite() ? "w" : "-");
            sb.append(fileInDir.canExecute() ? "x" : "-");
            sb.append("\t");
            sb.append(fa.owner());
            sb.append(fa.owner().getName().length() < 4 ? "\t\t" : "\t");
            sb.append(fa.group());
            sb.append(fa.group().getName().length() < 4 ? "\t\t" : "\t");
            sb.append(fa.size());
            sb.append(String.valueOf(fa.size()).length() < 4 ? "\t\t" : "\t");
            sb.append(fa.lastModifiedTime().toString());
            sb.append("\t");
            sb.append(fa.isDirectory() ? fileInDir.getName() + "/" : fileInDir.getName());
        }
        catch (IOException e) {
            logger.log(Level.WARNING, "Failed to get file attributes", e);
        }
        logger.info(sb.toString());
        return sb.toString();
    }

    final void startControlThread() {
        if (this.ctrlThreadStarted.compareAndSet(false, true)) {
            new Thread((Runnable)this.controlChannel, "Control channel for [ " + config.getHostName() + ":" + this.transferPort + " ]").start();
        }
    }

    public String getMonID() {
        return this.monID;
    }

    public FDTSessionMonitoringTask getMonitoringTask() {
        return this.monitoringTask;
    }

    public final void setNewRateLimit(long newRate, boolean ctrlSet) {
        long cLimit = this.rateLimit.get();
        if (ctrlSet) {
            cLimit = newRate;
        } else if (newRate != cLimit && newRate > 0L) {
            cLimit = newRate;
        }
        if (cLimit > 0L && cLimit < (long)Config.NETWORK_BUFF_LEN_SIZE) {
            cLimit = Config.NETWORK_BUFF_LEN_SIZE;
            logger.log(Level.WARNING, " The rate limit is too small. It will be set to " + this.rateLimit.get() + " Bytes/s");
        }
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "[ FDTSession ] [ setNewRateLimit ( " + newRate + " ) ] prevRateLimit: " + this.rateLimit.get() + " newRateLimit: " + cLimit);
        }
        this.rateLimit.set(cLimit);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected final void setCurrentState(int newState) {
        Object object = this.ctrlNotifLock;
        synchronized (object) {
            try {
                if (this.currentState == 256) {
                    return;
                }
                this.currentState = newState;
                this.historyState |= newState;
            }
            finally {
                this.ctrlNotifLock.notifyAll();
            }
        }
    }

    public final int currentState() {
        return this.currentState;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setMD5Sum(UUID fileSessionID, byte[] md5Sum) {
        Map<UUID, byte[]> map = this.md5Sums;
        synchronized (map) {
            this.md5Sums.put(fileSessionID, md5Sum);
        }
    }

    public short getCurrentStatus() {
        return this.currentStatus;
    }

    protected final int historyState() {
        return this.historyState;
    }

    public TCPTransportProvider getTransportProvider() {
        return this.transportProvider;
    }

    public InetAddress getRemoteAddress() {
        return this.controlChannel.remoteAddress;
    }

    public int getRemotePort() {
        return this.controlChannel.remotePort;
    }

    public long getRateLimit() {
        return this.rateLimit.get();
    }

    public long getRateLimitDelay() {
        return this.rateLimitDelay.get();
    }

    public int getLocalPort() {
        return this.controlChannel.localPort;
    }

    public String toString() {
        return "FDTSession ( " + this.sessionID + " ) / " + (this.controlChannel != null ? this.controlChannel.toString() : "null");
    }

    public FileSession getFileSession(UUID fileSessionID) {
        return this.fileSessions.get(fileSessionID);
    }

    public abstract void handleInitFDTSessionConf(CtrlMsg var1) throws Exception;

    public abstract void handleFinalFDTSessionConf(CtrlMsg var1) throws Exception;

    public abstract void handleStartFDTSession(CtrlMsg var1) throws Exception;

    public abstract void handleEndFDTSession(CtrlMsg var1) throws Exception;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public final void notifyCtrlMsg(ControlChannel controlChannel, Object o) throws FDTProcolException {
        block18: {
            if (o == null) {
                FDTProcolException fpe = new FDTProcolException("Null control message");
                fpe.fillInStackTrace();
                this.close("FileProtocolException", fpe);
                throw fpe;
            }
            try {
                if (o instanceof CtrlMsg) {
                    CtrlMsg ctrlMsg = (CtrlMsg)o;
                    if (logger.isLoggable(Level.FINEST)) {
                        logger.log(Level.FINEST, " Got CtrlMessage for " + controlChannel + ":\n" + ctrlMsg);
                    }
                    Object object = this.protocolLock;
                    synchronized (object) {
                        switch (ctrlMsg.tag) {
                            case 6: {
                                this.setCurrentState(4);
                                this.handleInitFDTSessionConf(ctrlMsg);
                                break;
                            }
                            case 7: {
                                this.setCurrentState(16);
                                this.handleFinalFDTSessionConf(ctrlMsg);
                                break;
                            }
                            case 9: {
                                this.setCurrentState(64);
                                this.handleStartFDTSession(ctrlMsg);
                                break;
                            }
                            case 10: {
                                this.setCurrentState(256);
                                this.handleEndFDTSession(ctrlMsg);
                                break;
                            }
                            case 13: {
                                this.setCurrentState(512);
                                this.handleCoordinatorMessage(ctrlMsg);
                                break;
                            }
                            case 14: {
                                this.setCurrentState(1024);
                                this.handleListFilesMessage(ctrlMsg);
                                break;
                            }
                            case 15: {
                                this.setCurrentState(512);
                                this.handleGetRemoteTransferPortMessage(ctrlMsg);
                                break;
                            }
                            case 16: {
                                this.setCurrentState(2048);
                                this.handleFileNotFound(ctrlMsg);
                                break;
                            }
                            default: {
                                FDTProcolException fpe = new FDTProcolException("Illegal CtrlMsg tag [ " + ctrlMsg.tag + " ]");
                                fpe.fillInStackTrace();
                                this.close("FileProtocolException", fpe);
                                throw fpe;
                            }
                        }
                        break block18;
                    }
                }
                logger.log(Level.WARNING, " Got unknown message on control channel", o);
            }
            catch (Throwable t) {
                this.close("Got exception trying to process", t);
            }
        }
    }

    private void handleFileNotFound(CtrlMsg ctrlMsg) {
        logger.log(Level.WARNING, "[ FDTSession ] [ handleFileNotFound File not found:  ( " + ctrlMsg.message.toString() + " )");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleCoordinatorMessage(CtrlMsg ctrlMsg) {
        logger.log(Level.INFO, "[ FDTSession ] [ handleCoordinatorMessage ( " + ctrlMsg.message.toString() + " )");
        Map<String, Object> oldConfig = config.getConfigMap();
        try {
            FDTSessionConfigMsg sessionConfig = (FDTSessionConfigMsg)ctrlMsg.message;
            config.setDestinationDir(sessionConfig.destinationDir);
            config.setCoordinatorMode(false);
            config.setDestinationIP(sessionConfig.destinationIP);
            config.setHostName(sessionConfig.destinationIP);
            config.setFileList(sessionConfig.fileLists);
            config.setPortNo(sessionConfig.destinationPort);
            ControlChannel ctrlChann = this.controlChannel;
            int remoteTransferPort = this.getFDTTransferPort(sessionConfig.destinationPort);
            if (remoteTransferPort > 0) {
                FDTSession session = FDTSessionManager.getInstance().addFDTClientSession(remoteTransferPort);
                ctrlChann.sendSessionIDToCoordinator(new CtrlMsg(13, session.controlChannel.fdtSessionID().toString()));
            } else {
                ctrlChann.sendSessionIDToCoordinator(new CtrlMsg(13, "-1"));
            }
        }
        catch (Exception ex) {
            logger.log(Level.WARNING, "Exception while handling coordinator message", ex);
        }
        finally {
            this.setClosed(true);
            config.setConfigMap(oldConfig);
        }
    }

    public int getFDTTransferPort(int destinationMsgPort) throws Exception {
        ControlChannel cc = new ControlChannel(config.getHostName(), destinationMsgPort, UUID.randomUUID(), (ControlChannelNotifier)FDTSessionManager.getInstance());
        int transferPort = cc.sendTransferPortMessage(new CtrlMsg(15, "rtp"));
        logger.log(Level.INFO, "Got transfer port: " + config.getHostName() + ":" + transferPort);
        return transferPort;
    }

    private void handleListFilesMessage(CtrlMsg ctrlMsg) {
        logger.log(Level.INFO, "[ FDTSession ] [ handleListFilesMessage ( " + ctrlMsg.message.toString() + " )");
        try {
            FDTListFilesMsg lsMsg = (FDTListFilesMsg)ctrlMsg.message;
            config.setListFilesFrom(lsMsg.listFilesFrom);
            lsMsg.filesInDir = FDTSession.getListOfFiles();
            logger.log(Level.FINEST, "[ FDTSession ] [ handleListFilesMessage ] collected " + lsMsg.filesInDir.size());
            this.controlChannel.sendCtrlMessage(new CtrlMsg(14, lsMsg));
            this.controlChannel.emptyMsgQueue();
        }
        catch (Exception ex) {
            logger.log(Level.WARNING, "Exception while handling 'list files in dir' message", ex);
        }
        finally {
            this.setClosed(true);
        }
    }

    private void handleGetRemoteTransferPortMessage(CtrlMsg ctrlMsg) {
        logger.log(Level.INFO, "[ FDTSession ] [ handleGetRemoteTransferPortMessage ( " + ctrlMsg.message.toString() + " )");
        try {
            int newTransferPort = config.getNewRemoteTransferPort();
            if (newTransferPort > 0) {
                this.openSocketForTransferPort(newTransferPort);
                this.controlChannel.sendCtrlMessage(new CtrlMsg(15, newTransferPort));
                this.controlChannel.emptyMsgQueue();
                this.internalClose();
                logger.log(Level.INFO, "[ FDTSession ] [ handleGetRemoteTransferPortMessage ( closing session )");
                Utils.waitAndWork(this.executor, this.ss, this.sel, config);
            } else {
                this.controlChannel.sendCtrlMessage(new CtrlMsg(15, -1));
                this.controlChannel.emptyMsgQueue();
                logger.warning("There are no free transfer ports at this moment, please try again later");
            }
        }
        catch (Exception ex) {
            logger.log(Level.WARNING, "Exception while handling 'get remote transfer port' message", ex);
        }
        finally {
            this.setClosed(true);
        }
    }

    private void openSocketForTransferPort(int port) throws IOException {
        this.executor = Utils.getStandardExecService("[ Acceptable ServersThreadPool ] ", 2, 10, new ArrayBlockingQueue<Runnable>(65500), 5);
        this.ssc = ServerSocketChannel.open();
        this.ssc.configureBlocking(false);
        this.ss = this.ssc.socket();
        String listenIP = config.getListenAddress();
        if (listenIP == null) {
            this.ss.bind(new InetSocketAddress(port));
        } else {
            this.ss.bind(new InetSocketAddress(InetAddress.getByName(listenIP), port));
        }
        this.sel = Selector.open();
        this.ssc.register(this.sel, 16);
        this.sc = this.ssc.accept();
        config.setSessionSocket(this.ssc, this.ss, this.sc, this.s, port);
    }

    protected void buildPartitionMap() {
        if (logger.isLoggable(Level.FINEST)) {
            logger.log(Level.FINEST, " Building PMap for " + this.fileSessions);
        }
        this.partitionsMap = new HashMap<Integer, LinkedList<FileSession>>();
        for (FileSession fs : this.fileSessions.values()) {
            if (this.finishedSessions.contains(fs.sessionID)) continue;
            LinkedList<FileSession> ll = this.partitionsMap.get(fs.partitionID);
            if (ll == null) {
                ll = new LinkedList();
                this.partitionsMap.put(fs.partitionID, ll);
            }
            ll.add(fs);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void finishFileSession(UUID sessionID, Throwable downCause) {
        FileSession fs = null;
        boolean bFinest = logger.isLoggable(Level.FINEST);
        boolean bFiner = bFinest || logger.isLoggable(Level.FINER);
        boolean bFine = bFiner || logger.isLoggable(Level.FINE);
        Object object = this.lock;
        synchronized (object) {
            fs = this.fileSessions.get(sessionID);
            if (fs != null) {
                if (!this.isLoop) {
                    if (!this.finishedSessions.add(sessionID)) {
                        if (bFine) {
                            logger.log(Level.FINE, " [ FDTSession ] [ HANDLED ] The fileSession [ " + sessionID + " ] is already in the finised sessions list");
                        }
                        if (bFinest) {
                            Thread.dumpStack();
                        }
                    } else {
                        if (downCause == null) {
                            logger.log(Level.INFO, fs.fileName + " STATUS: OK");
                        }
                        if (logger.isLoggable(Level.FINE)) {
                            logger.log(Level.FINE, " [ FDTSession ] [ HANDLED ] The fileSession [ " + sessionID + " ] added to finised sessions list");
                        }
                    }
                } else if (bFiner) {
                    logger.log(Level.FINER, " I was supposed to finish ( " + sessionID + " ], but runnig in loop mode");
                }
            }
            if (downCause != null) {
                logger.log(Level.WARNING, fs.fileName + " STATUS: FAILED");
                this.close("the file session: " + sessionID + " / " + fs.fileName + " finished with errors: " + downCause.getMessage(), downCause);
            }
        }
        try {
            if (fs != null) {
                fs.close(null, downCause);
            } else {
                logger.log(Level.WARNING, " The session [ " + sessionID + " ] is not in my session list");
            }
        }
        catch (Throwable t) {
            logger.log(Level.WARNING, " Got exception closing file session " + fs, t);
        }
    }

    public boolean useFixedBlockSize() {
        return this.useFixedBlockSize;
    }

    public boolean localLoop() {
        return this.localLoop;
    }

    public boolean loop() {
        return this.isLoop;
    }

    public boolean equals(Object obj) {
        if (!(obj instanceof FDTSession)) {
            return false;
        }
        return this.sessionID.equals(((FDTSession)obj).sessionID);
    }

    public int hashCode() {
        return this.sessionID.hashCode();
    }

    @Override
    public int compareTo(FDTSession fdtSession) {
        return this.sessionID.compareTo(fdtSession.sessionID);
    }

    @Override
    public long getUtilBytes() {
        return this.totalUtilBytes.get();
    }

    @Override
    public long getTotalBytes() {
        return this.totalProcessedBytes.get();
    }

    @Override
    public long addAndGetUtilBytes(long delta) {
        return this.totalUtilBytes.addAndGet(delta);
    }

    @Override
    public long addAndGetTotalBytes(long delta) {
        return this.totalProcessedBytes.addAndGet(delta);
    }

    @Override
    public abstract long getSize();

    @Override
    protected void internalClose() throws Exception {
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "FDTSession " + this.sessionID + " finished. Internal close called.");
        }
        this.currentStatus = this.downCause() != null && this.downMessage() != null ? (short)1 : 0;
        if (this.monitoringTaskFuture != null) {
            this.monitoringTaskFuture.cancel(false);
        }
        if (this.monitoringTask != null) {
            ScheduledThreadPoolExecutor monitoringService = Utils.getMonitoringExecService();
            monitoringService.remove(this.monitoringTask);
            monitoringService.purge();
            this.monitoringTask.finishSession();
        }
        if (config.getMonitor().equals("OPENTSDB")) {
            MonitoringUtils monUtils = new MonitoringUtils(config, this);
            monUtils.monitorFinish(System.currentTimeMillis(), this.role == 1 ? "Readers" : "Writers");
        }
    }

    @Override
    public void notifyCtrlSessionDown(ControlChannel controlChannel, Throwable cause) {
        this.close("ControlChannel is down", cause);
    }

    @Override
    public void notifyLisaCtrlMsg(String lisaCtrlMsg) {
        if (logger.isLoggable(Level.FINE)) {
            logger.log(Level.FINE, "FDT Session [ " + this.sessionID + " / " + this.monID + " ] received remote ctrl cmd: " + lisaCtrlMsg);
        }
        if (lisaCtrlMsg.indexOf(LISA_RATE_LIMIT_CMD) >= 0) {
            long newLimit = -1L;
            try {
                newLimit = Long.parseLong(lisaCtrlMsg.split("(\\s)+")[1]);
            }
            catch (Throwable t) {
                logger.log(Level.INFO, "FDT Session [ " + this.sessionID + " / " + this.monID + " ] unable to set new rate limit", t);
            }
            long oldRateLimit = this.rateLimit.get();
            if (newLimit > 0L) {
                this.setNewRateLimit(newLimit, true);
            }
            if (oldRateLimit != this.rateLimit.get()) {
                logger.log(Level.INFO, "FDT Session [ " + this.sessionID + " / " + this.monID + " ] oldrate: " + oldRateLimit + " / newrate: " + this.rateLimit.get());
            }
        }
    }

    public boolean isNetTest() {
        return this.isNetTest;
    }
}

