/*
 * Decompiled with CFR 0.152.
 */
package com.bigdata.journal;

import com.bigdata.concurrent.NonBlockingLockManagerWithNewDesign;
import com.bigdata.counters.CounterSet;
import com.bigdata.counters.Instrument;
import com.bigdata.journal.AbstractJournal;
import com.bigdata.journal.AbstractTask;
import com.bigdata.journal.IConcurrencyManager;
import com.bigdata.journal.ILocalTransactionManager;
import com.bigdata.journal.IResourceManager;
import com.bigdata.journal.WriteExecutorService;
import com.bigdata.resources.StoreManager;
import com.bigdata.service.AbstractDistributedFederation;
import com.bigdata.service.IBigdataClient;
import com.bigdata.service.IServiceShutdown;
import com.bigdata.util.DaemonThreadFactory;
import com.bigdata.util.concurrent.TaskCounters;
import com.bigdata.util.concurrent.ThreadPoolExecutorStatisticsTask;
import com.bigdata.util.concurrent.WriteTaskCounters;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;

public class ConcurrencyManager
implements IConcurrencyManager {
    /** Logger for this class. */
    private static final Logger log = Logger.getLogger(ConcurrencyManager.class);
    /** The configuration properties supplied to the constructor. */
    private final Properties properties;
    /** The transaction manager supplied to the constructor. */
    private final ILocalTransactionManager transactionManager;
    /** The resource manager supplied to the constructor; also handed to the write service. */
    private final IResourceManager resourceManager;
    /** Wall-clock time (ms) at which this instance was created; basis of the "elapsed" counter. */
    private final long serviceStartTime = System.currentTimeMillis();
    /** {@code true} until {@link #shutdown()} or {@link #shutdownNow()} is invoked. */
    private volatile boolean open = true;
    /** Executor to which {@link #submit(AbstractTask)} routes tasks flagged {@code isReadWriteTx}. */
    private final ThreadPoolExecutor txWriteService;
    /** Executor to which {@link #submit(AbstractTask)} routes tasks flagged {@code readOnly}. */
    private final ThreadPoolExecutor readService;
    /** Executor for all remaining tasks; submissions go through its lock manager. */
    private final WriteExecutorService writeService;
    /** Whether the queue statistics tasks and {@link #sampleService} were created. */
    private final boolean collectQueueStatistics;
    /** Runs the statistics tasks; {@code null} when {@link #collectQueueStatistics} is {@code false}. */
    private final ScheduledExecutorService sampleService;
    /** Shutdown timeout in milliseconds; zero is treated as "wait forever" by {@link #shutdown()}. */
    private final long shutdownTimeout;
    /** Counters updated for tasks submitted to the write service. */
    protected final WriteTaskCounters countersUN = new WriteTaskCounters();
    /** Counters updated for tasks submitted to the transaction write service. */
    protected final TaskCounters countersTX = new TaskCounters();
    /** Counters updated for tasks submitted to the read service. */
    protected final TaskCounters countersHR = new TaskCounters();
    /** Statistics task for the write service, or {@code null} when statistics are disabled. */
    private final ThreadPoolExecutorStatisticsTask writeServiceQueueStatisticsTask;
    /** Statistics task for the transaction write service, or {@code null} when statistics are disabled. */
    private final ThreadPoolExecutorStatisticsTask txWriteServiceQueueStatisticsTask;
    /** Statistics task for the read service, or {@code null} when statistics are disabled. */
    private final ThreadPoolExecutorStatisticsTask readServiceQueueStatisticsTask;
    /** NOTE(review): not referenced by any method visible in this class; presumably a disabled feature switch. */
    private static final boolean backoff = false;

    /**
     * Guard used by the public entry points: does nothing while this manager
     * is open.
     *
     * @throws IllegalStateException if this manager has been shut down.
     */
    private void assertOpen() {
        if (open) {
            return;
        }
        throw new IllegalStateException();
    }

    /**
     * Returns the write executor service.
     *
     * @return the {@link WriteExecutorService} created by the constructor.
     * @throws IllegalStateException if this manager has been shut down.
     */
    @Override
    public WriteExecutorService getWriteService() {
        assertOpen();
        return writeService;
    }

    /**
     * Returns the transaction manager supplied at construction time.
     *
     * @return the local transaction manager.
     * @throws IllegalStateException if this manager has been shut down.
     */
    @Override
    public ILocalTransactionManager getTransactionManager() {
        assertOpen();
        return transactionManager;
    }

    /**
     * Returns the resource manager supplied at construction time.
     *
     * @return the resource manager.
     * @throws IllegalStateException if this manager has been shut down.
     */
    @Override
    public IResourceManager getResourceManager() {
        assertOpen();
        return resourceManager;
    }

    /**
     * Reports whether this manager still accepts work.
     *
     * @return {@code true} until {@link #shutdown()} or {@link #shutdownNow()}
     *         has been invoked.
     */
    @Override
    public boolean isOpen() {
        return open;
    }

    /**
     * Orderly shutdown: stops accepting new tasks on every service and then
     * waits, in turn, for the transaction, read and write services to
     * terminate.
     * <p>
     * The configured shutdown timeout (milliseconds) is a single budget shared
     * by the three waits; a configured value of zero means "wait forever".
     * Calling this method on a manager that is already closed is a no-op.
     * <p>
     * If the calling thread is interrupted while waiting, the interruption is
     * logged, the remaining services are still awaited, and the thread's
     * interrupt status is restored before this method returns so that the
     * caller can observe it.
     */
    @Override
    public synchronized void shutdown() {
        if (!isOpen()) {
            return;
        }
        open = false;
        if (log.isInfoEnabled()) {
            log.info("begin");
        }
        final long begin = System.currentTimeMillis();
        // Zero is approximated as "forever".
        final long timeoutMillis = shutdownTimeout == 0L ? Long.MAX_VALUE : shutdownTimeout;
        final TimeUnit unit = TimeUnit.MILLISECONDS;
        // Set when any wait is interrupted; the status is re-asserted at the
        // end rather than immediately so the later waits are not cut short.
        boolean interrupted = false;
        txWriteService.shutdown();
        readService.shutdown();
        writeService.shutdown();
        if (sampleService != null) {
            sampleService.shutdown();
        }
        try {
            if (log.isInfoEnabled()) {
                log.info("Awaiting transaction service termination");
            }
            final long elapsed = System.currentTimeMillis() - begin;
            if (!txWriteService.awaitTermination(timeoutMillis - elapsed, unit)) {
                log.warn("Transaction service termination: timeout");
            }
        }
        catch (InterruptedException ex) {
            interrupted = true;
            log.warn("Interrupted awaiting transaction service termination.", ex);
        }
        try {
            if (log.isInfoEnabled()) {
                log.info("Awaiting read service termination");
            }
            final long elapsed = System.currentTimeMillis() - begin;
            if (!readService.awaitTermination(timeoutMillis - elapsed, unit)) {
                log.warn("Read service termination: timeout");
            }
        }
        catch (InterruptedException ex) {
            interrupted = true;
            log.warn("Interrupted awaiting read service termination.", ex);
        }
        try {
            final long elapsed = System.currentTimeMillis() - begin;
            final long timeout = timeoutMillis - elapsed;
            if (log.isInfoEnabled()) {
                log.info("Awaiting write service termination: will wait " + timeout + "ms");
            }
            if (!writeService.awaitTermination(timeout, unit)) {
                log.warn("Write service termination : timeout");
            }
        }
        catch (InterruptedException ex) {
            interrupted = true;
            log.warn("Interrupted awaiting write service termination.", ex);
        }
        final long totalElapsed = System.currentTimeMillis() - begin;
        if (log.isInfoEnabled()) {
            log.info("Done: elapsed=" + totalElapsed + "ms");
        }
        if (interrupted) {
            // Do not swallow the interrupt: hand it back to the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Immediate shutdown: invokes {@code shutdownNow()} on every service
     * without waiting for termination. Calling this method on a manager that
     * is already closed is a no-op.
     */
    @Override
    public void shutdownNow() {
        if (!isOpen()) {
            return;
        }
        open = false;
        final boolean info = log.isInfoEnabled();
        if (info) {
            log.info("begin");
        }
        final long startMillis = System.currentTimeMillis();
        txWriteService.shutdownNow();
        readService.shutdownNow();
        writeService.shutdownNow();
        if (sampleService != null) {
            sampleService.shutdownNow();
        }
        final long tookMillis = System.currentTimeMillis() - startMillis;
        if (log.isInfoEnabled()) {
            log.info("Done: elapsed=" + tookMillis + "ms");
        }
    }

    /**
     * Reads the configuration and creates the executor services.
     * <p>
     * The transaction write service and the read service are each a cached
     * thread pool when their configured core pool size is zero and a fixed
     * thread pool of that size otherwise. The write service is a
     * {@link WriteExecutorService} whose work queue depends on the configured
     * queue capacity: zero selects a {@link SynchronousQueue} (and an
     * effectively unbounded maximum pool size), {@link Integer#MAX_VALUE}
     * selects a {@link LinkedBlockingQueue}, and any other value selects an
     * {@link ArrayBlockingQueue}. When queue statistics collection is enabled
     * a single-threaded scheduled executor runs the statistics tasks with a
     * fixed delay of 1000ms.
     *
     * @param properties the configuration; see {@link Options}.
     * @param transactionManager the local transaction manager.
     * @param resourceManager the resource manager.
     * @throws IllegalArgumentException if any argument is {@code null}.
     * @throws RuntimeException if a configured value is outside its legal range.
     */
    public ConcurrencyManager(Properties properties, ILocalTransactionManager transactionManager, IResourceManager resourceManager) {
        if (properties == null) {
            throw new IllegalArgumentException();
        }
        if (transactionManager == null) {
            throw new IllegalArgumentException();
        }
        if (resourceManager == null) {
            throw new IllegalArgumentException();
        }
        this.properties = properties;
        this.transactionManager = transactionManager;
        this.resourceManager = resourceManager;

        // Prefix used to name the threads of every service.
        final String threadNamePrefix = getClass().getName();

        // Transaction service pool size.
        final int txCorePoolSize = Integer.parseInt(
                properties.getProperty(Options.TX_SERVICE_CORE_POOL_SIZE, Options.DEFAULT_TX_SERVICE_CORE_POOL_SIZE));
        if (txCorePoolSize < 0) {
            throw new RuntimeException("The '" + Options.TX_SERVICE_CORE_POOL_SIZE + "' must be non-negative.");
        }
        if (log.isInfoEnabled()) {
            log.info(Options.TX_SERVICE_CORE_POOL_SIZE + "=" + txCorePoolSize);
        }

        // Read service pool size.
        final int readCorePoolSize = Integer.parseInt(
                properties.getProperty(Options.READ_SERVICE_CORE_POOL_SIZE, Options.DEFAULT_READ_SERVICE_CORE_POOL_SIZE));
        if (readCorePoolSize < 0) {
            throw new RuntimeException("The '" + Options.READ_SERVICE_CORE_POOL_SIZE + "' must be non-negative.");
        }
        if (log.isInfoEnabled()) {
            log.info(Options.READ_SERVICE_CORE_POOL_SIZE + "=" + readCorePoolSize);
        }

        // Shutdown timeout.
        this.shutdownTimeout = Long.parseLong(properties.getProperty(Options.SHUTDOWN_TIMEOUT, "0"));
        if (this.shutdownTimeout < 0L) {
            throw new RuntimeException("The '" + Options.SHUTDOWN_TIMEOUT + "' must be non-negative.");
        }
        if (log.isInfoEnabled()) {
            log.info(Options.SHUTDOWN_TIMEOUT + "=" + this.shutdownTimeout);
        }

        // A pool size of zero selects a cached pool, otherwise a fixed pool.
        if (txCorePoolSize == 0) {
            this.txWriteService = (ThreadPoolExecutor) Executors.newCachedThreadPool(
                    new DaemonThreadFactory(threadNamePrefix + ".txWriteService"));
        } else {
            this.txWriteService = (ThreadPoolExecutor) Executors.newFixedThreadPool(txCorePoolSize,
                    new DaemonThreadFactory(threadNamePrefix + ".txWriteService"));
        }
        if (readCorePoolSize == 0) {
            this.readService = (ThreadPoolExecutor) Executors.newCachedThreadPool(
                    new DaemonThreadFactory(threadNamePrefix + ".readService"));
        } else {
            this.readService = (ThreadPoolExecutor) Executors.newFixedThreadPool(readCorePoolSize,
                    new DaemonThreadFactory(threadNamePrefix + ".readService"));
        }

        // Write service configuration.
        final int writeCorePoolSize = Integer.parseInt(
                properties.getProperty(Options.WRITE_SERVICE_CORE_POOL_SIZE, Options.DEFAULT_WRITE_SERVICE_CORE_POOL_SIZE));
        if (writeCorePoolSize < 0) {
            throw new RuntimeException("The '" + Options.WRITE_SERVICE_CORE_POOL_SIZE + "' must be non-negative.");
        }
        if (log.isInfoEnabled()) {
            log.info(Options.WRITE_SERVICE_CORE_POOL_SIZE + "=" + writeCorePoolSize);
        }

        final int writeMaximumPoolSize = Integer.parseInt(
                properties.getProperty(Options.WRITE_SERVICE_MAXIMUM_POOL_SIZE, Options.DEFAULT_WRITE_SERVICE_MAXIMUM_POOL_SIZE));
        if (writeMaximumPoolSize < writeCorePoolSize) {
            throw new RuntimeException("The '" + Options.WRITE_SERVICE_MAXIMUM_POOL_SIZE + "' must be greater than the core pool size.");
        }
        if (log.isInfoEnabled()) {
            log.info(Options.WRITE_SERVICE_MAXIMUM_POOL_SIZE + "=" + writeMaximumPoolSize);
        }

        final int writeQueueCapacity = Integer.parseInt(
                properties.getProperty(Options.WRITE_SERVICE_QUEUE_CAPACITY, Options.DEFAULT_WRITE_SERVICE_QUEUE_CAPACITY));
        if (writeQueueCapacity < 0) {
            throw new RuntimeException("The '" + Options.WRITE_SERVICE_QUEUE_CAPACITY + "' must be non-negative.");
        }
        if (log.isInfoEnabled()) {
            log.info(Options.WRITE_SERVICE_QUEUE_CAPACITY + "=" + writeQueueCapacity);
        }

        final boolean prestartCoreThreads = Boolean.parseBoolean(
                properties.getProperty(Options.WRITE_SERVICE_PRESTART_ALL_CORE_THREADS, Options.DEFAULT_WRITE_SERVICE_PRESTART_ALL_CORE_THREADS));
        if (log.isInfoEnabled()) {
            log.info(Options.WRITE_SERVICE_PRESTART_ALL_CORE_THREADS + "=" + prestartCoreThreads);
        }

        final long groupCommitTimeout = Long.parseLong(
                properties.getProperty(Options.WRITE_SERVICE_GROUP_COMMIT_TIMEOUT, Options.DEFAULT_WRITE_SERVICE_GROUP_COMMIT_TIMEOUT));
        if (log.isInfoEnabled()) {
            log.info(Options.WRITE_SERVICE_GROUP_COMMIT_TIMEOUT + "=" + groupCommitTimeout);
        }

        final long overflowLockRequestTimeout = Long.parseLong(
                properties.getProperty(Options.WRITE_SERVICE_OVERFLOW_LOCK_REQUEST_TIMEOUT, Options.DEFAULT_WRITE_SERVICE_OVERFLOW_LOCK_REQUEST_TIMEOUT));
        if (log.isInfoEnabled()) {
            log.info(Options.WRITE_SERVICE_OVERFLOW_LOCK_REQUEST_TIMEOUT + "=" + overflowLockRequestTimeout);
        }

        final long keepAliveTime = Long.parseLong(
                properties.getProperty(Options.WRITE_SERVICE_KEEP_ALIVE_TIME, Options.DEFAULT_WRITE_SERVICE_KEEP_ALIVE_TIME));
        if (log.isInfoEnabled()) {
            log.info(Options.WRITE_SERVICE_KEEP_ALIVE_TIME + "=" + keepAliveTime);
        }

        // Pick the work queue from the configured capacity.
        final boolean useSynchronousQueue = writeQueueCapacity == 0;
        final BlockingQueue<Runnable> workQueue;
        if (useSynchronousQueue) {
            workQueue = new SynchronousQueue<Runnable>();
        } else if (writeQueueCapacity == Integer.MAX_VALUE) {
            workQueue = new LinkedBlockingQueue<Runnable>(writeQueueCapacity);
        } else {
            workQueue = new ArrayBlockingQueue<Runnable>(writeQueueCapacity);
        }
        // With a synchronous hand-off queue the maximum pool size is not capped.
        final int effectiveMaximumPoolSize = useSynchronousQueue ? Integer.MAX_VALUE : writeMaximumPoolSize;
        this.writeService = new WriteExecutorService(resourceManager, writeCorePoolSize, effectiveMaximumPoolSize,
                keepAliveTime, TimeUnit.MILLISECONDS, workQueue,
                new DaemonThreadFactory(threadNamePrefix + ".writeService"),
                groupCommitTimeout, overflowLockRequestTimeout);
        if (prestartCoreThreads) {
            getWriteService().prestartAllCoreThreads();
        }

        // Optional sampling of the service queues.
        this.collectQueueStatistics = Boolean.parseBoolean(
                properties.getProperty(IBigdataClient.Options.COLLECT_QUEUE_STATISTICS, "true"));
        if (log.isInfoEnabled()) {
            log.info(IBigdataClient.Options.COLLECT_QUEUE_STATISTICS + "=" + this.collectQueueStatistics);
        }
        if (!this.collectQueueStatistics) {
            this.writeServiceQueueStatisticsTask = null;
            this.txWriteServiceQueueStatisticsTask = null;
            this.readServiceQueueStatisticsTask = null;
            this.sampleService = null;
        } else {
            final double weight = 0.2;
            final long initialDelay = 0L;
            final long delay = 1000L;
            final TimeUnit sampleUnit = TimeUnit.MILLISECONDS;
            this.writeServiceQueueStatisticsTask = new ThreadPoolExecutorStatisticsTask("writeService", getWriteService(), this.countersUN, weight);
            this.txWriteServiceQueueStatisticsTask = new ThreadPoolExecutorStatisticsTask("txWriteService", this.txWriteService, this.countersTX, weight);
            this.readServiceQueueStatisticsTask = new ThreadPoolExecutorStatisticsTask("readService", this.readService, this.countersHR, weight);
            this.sampleService = Executors.newSingleThreadScheduledExecutor(
                    new DaemonThreadFactory(threadNamePrefix + ".sampleService"));
            this.sampleService.scheduleWithFixedDelay(this.writeServiceQueueStatisticsTask, initialDelay, delay, sampleUnit);
            this.sampleService.scheduleWithFixedDelay(getWriteService().getLockManager().statisticsTask, initialDelay, delay, sampleUnit);
            this.sampleService.scheduleWithFixedDelay(this.txWriteServiceQueueStatisticsTask, initialDelay, delay, sampleUnit);
            this.sampleService.scheduleWithFixedDelay(this.readServiceQueueStatisticsTask, initialDelay, delay, sampleUnit);
        }
    }

    /**
     * Builds a fresh {@link CounterSet} describing this manager.
     * <p>
     * The set always carries an "elapsed" counter. When queue statistics
     * collection is enabled it also attaches the counters of the read,
     * transaction write and unisolated write services and of the write
     * service's lock manager. In that case {@link #getWriteService()} is
     * consulted, so the call fails once this manager has been shut down.
     *
     * @return a new counter set.
     * @throws IllegalStateException if statistics are enabled and this manager
     *         has been shut down.
     */
    @Override
    public CounterSet getCounters() {
        final CounterSet root = new CounterSet();
        root.addCounter("elapsed", new ServiceElapsedTimeInstrument(serviceStartTime));
        if (!collectQueueStatistics) {
            return root;
        }
        root.makePath(IConcurrencyManagerCounters.ReadService)
                .attach(readServiceQueueStatisticsTask.getCounters());
        root.makePath(IConcurrencyManagerCounters.TXWriteService)
                .attach(txWriteServiceQueueStatisticsTask.getCounters());
        root.makePath(IConcurrencyManagerCounters.writeService)
                .attach(writeServiceQueueStatisticsTask.getCounters());
        root.makePath(IConcurrencyManagerCounters.writeService + "/" + IConcurrencyManagerCounters.LockManager)
                .attach(getWriteService().getLockManager().getCounters());
        return root;
    }

    /**
     * Submits a task for asynchronous execution on the service that matches
     * its flags: read-only tasks go to the read service, read-write
     * transaction tasks to the transaction write service, and everything else
     * to the write service. The submit time is stamped on the task first.
     *
     * @param task the task to run.
     * @return the future for the task.
     * @throws IllegalStateException if this manager has been shut down.
     */
    @Override
    public <T> FutureTask<T> submit(AbstractTask<T> task) {
        assertOpen();
        // The submit time is recorded here, before routing.
        task.nanoTime_submitTask = System.nanoTime();
        final FutureTask<T> future;
        if (task.readOnly) {
            if (log.isInfoEnabled()) {
                log.info("Submitted to the read service: " + task.getClass().getName() + ", timestamp=" + task.timestamp);
            }
            future = submitWithDynamicLatency(task, readService, countersHR);
        } else if (task.isReadWriteTx) {
            if (log.isInfoEnabled()) {
                log.info("Submitted to the transaction service: " + task.getClass().getName() + ", timestamp=" + task.timestamp);
            }
            future = submitWithDynamicLatency(task, txWriteService, countersTX);
        } else {
            if (log.isInfoEnabled()) {
                log.info("Submitted to the write service: " + task.getClass().getName() + ", timestamp=" + task.timestamp);
            }
            future = submitWithDynamicLatency(task, getWriteService(), countersUN);
        }
        return future;
    }

    /**
     * Logs an error naming the task when {@link #getJournalOverextended()}
     * reports a ratio of at least 2.
     *
     * @param task the task being submitted; only used in the log message.
     */
    private void journalOverextended(AbstractTask<?> task) {
        final double ratio = getJournalOverextended();
        // Written as a negated ">=" so that a NaN ratio is also ignored.
        if (!(ratio >= 2.0)) {
            return;
        }
        log.error("overextended=" + (int)ratio + "x : " + task.toString());
    }

    /**
     * Returns the ratio of the live journal's size to its maximum extent.
     * <p>
     * Zero is returned when overflow is disabled, when the federation is not
     * an {@link AbstractDistributedFederation}, or when the resource manager
     * does not support {@code getFederation()}.
     *
     * @return the size / maximum-extent ratio, or {@code 0.0} as described.
     */
    public double getJournalOverextended() {
        if (!resourceManager.isOverflowEnabled()) {
            return 0.0;
        }
        final boolean distributed;
        try {
            distributed = resourceManager.getFederation() instanceof AbstractDistributedFederation;
        }
        catch (UnsupportedOperationException ex) {
            // No federation available from this resource manager.
            return 0.0;
        }
        if (!distributed) {
            return 0.0;
        }
        final AbstractJournal liveJournal = resourceManager.getLiveJournal();
        final double size = (double)liveJournal.size();
        return size / (double)liveJournal.getMaximumExtent();
    }

    /**
     * Records arrival statistics for the task and hands it to the given
     * service.
     * <p>
     * Tasks for a {@link WriteExecutorService} are submitted through that
     * service's lock manager together with the resources they declare; tasks
     * for any other service are wrapped in a {@link FutureTask} and submitted
     * directly.
     *
     * @param task the task to run.
     * @param service the service that will run the task.
     * @param taskCounters the counters to update for this submission.
     * @return the future for the task.
     * @throws RejectedExecutionException if the resource manager is a
     *         {@link StoreManager} whose {@code awaitRunning()} returns
     *         {@code false}.
     */
    private <T> FutureTask<T> submitWithDynamicLatency(AbstractTask<T> task, ExecutorService service, TaskCounters taskCounters) {
        // Track the cumulative inter-arrival time; the lock keeps the
        // read-compute-write of the last arrival time atomic.
        synchronized (taskCounters.lastArrivalNanoTime) {
            final long lastArrivalNanoTime = taskCounters.lastArrivalNanoTime.get();
            final long now = System.nanoTime();
            taskCounters.interArrivalNanoTime.addAndGet(now - lastArrivalNanoTime);
            taskCounters.lastArrivalNanoTime.set(now);
        }
        taskCounters.taskSubmitCount.incrementAndGet();
        // Block the caller until the store manager reports that it is running.
        if (resourceManager instanceof StoreManager && !((StoreManager)resourceManager).awaitRunning()) {
            throw new RejectedExecutionException("StoreManager is not available");
        }
        // The over-extension check applies to the write service: it is write
        // tasks that grow the live journal. NOTE(review): this was previously
        // gated on the read service, which looks like an identifier mix-up;
        // confirm against the upstream source.
        if (service == writeService) {
            journalOverextended(task);
        }
        final FutureTask<T> ft;
        if (service instanceof WriteExecutorService) {
            final NonBlockingLockManagerWithNewDesign<String> lockManager = ((WriteExecutorService)service).getLockManager();
            ft = lockManager.submit(task.getResource(), task);
        } else {
            // The other services do not use the lock manager.
            ft = new FutureTask<T>(task);
            service.submit(ft);
        }
        return ft;
    }

    /**
     * Currently a no-op: the method body is empty.
     */
    void abortAllTx() {
    }

    /**
     * Submits every task and waits until each one has completed, failed or
     * been cancelled. Task failures are not thrown here; they are reported by
     * the returned futures.
     * <p>
     * If this method exits abnormally (for example because the caller is
     * interrupted), every future that has not yet completed is cancelled.
     *
     * @param tasks the tasks to run.
     * @return the futures, in the iteration order of {@code tasks}.
     * @throws InterruptedException if interrupted while waiting.
     * @throws IllegalStateException if this manager has been shut down.
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends AbstractTask<T>> tasks) throws InterruptedException {
        assertOpen();
        final LinkedList<Future<T>> submitted = new LinkedList<Future<T>>();
        boolean completed = false;
        try {
            for (final AbstractTask<T> task : tasks) {
                submitted.add(submit(task));
            }
            for (final Future<T> pending : submitted) {
                if (!pending.isDone()) {
                    try {
                        pending.get();
                    }
                    catch (ExecutionException ignored) {
                        // The failure is reported through the future itself.
                    }
                    catch (CancellationException ignored) {
                        // The cancellation is reported through the future itself.
                    }
                }
            }
            completed = true;
            return submitted;
        }
        finally {
            if (!completed) {
                for (final Future<T> pending : submitted) {
                    if (!pending.isDone()) {
                        pending.cancel(true);
                    }
                }
            }
        }
    }

    /**
     * Submits the tasks and waits for them to finish, giving up once the
     * timeout has elapsed.
     * <p>
     * The timeout is one budget for the whole call: time spent submitting
     * tasks and time spent waiting on each future are both charged against
     * it. When the budget runs out, tasks not yet submitted are skipped and
     * every submitted task that has not completed is cancelled. Task failures
     * are not thrown here; they are reported by the returned futures.
     *
     * @param tasks the tasks to run.
     * @param timeout the maximum time to wait.
     * @param unit the unit of {@code timeout}.
     * @return the futures of the tasks that were submitted, in submission
     *         order.
     * @throws InterruptedException if interrupted while waiting; incomplete
     *         tasks are cancelled.
     * @throws IllegalStateException if this manager has been shut down.
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends AbstractTask<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        assertOpen();
        final LinkedList<Future<T>> futures = new LinkedList<Future<T>>();
        boolean done = false;
        long nanos = unit.toNanos(timeout);
        long lastTime = System.nanoTime();
        try {
            for (final AbstractTask<T> task : tasks) {
                // Charge the time elapsed since the last checkpoint against
                // the remaining budget BEFORE moving the checkpoint forward.
                final long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                if (nanos <= 0L) {
                    // Timed out while submitting; "finally" cancels the rest.
                    return futures;
                }
                futures.add(submit(task));
            }
            for (final Future<T> future : futures) {
                if (future.isDone()) {
                    continue;
                }
                if (nanos <= 0L) {
                    return futures;
                }
                try {
                    future.get(nanos, TimeUnit.NANOSECONDS);
                }
                catch (TimeoutException ex) {
                    if (log.isInfoEnabled()) {
                        log.info("Task Timeout");
                    }
                    // "done" is still false, so "finally" cancels whatever
                    // has not completed.
                    return futures;
                }
                catch (ExecutionException ignored) {
                    // The failure is reported through the future itself.
                }
                catch (CancellationException ignored) {
                    // The cancellation is reported through the future itself.
                }
                final long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
            }
            done = true;
            return futures;
        }
        finally {
            if (!done) {
                for (final Future<T> future : futures) {
                    if (!future.isDone()) {
                        future.cancel(true);
                    }
                }
            }
        }
    }

    /**
     * Instrument whose sampled value is the number of milliseconds between
     * the recorded start time and the moment of sampling.
     */
    private static class ServiceElapsedTimeInstrument
    extends Instrument<Long> {
        /** Start time in milliseconds, as given to the constructor. */
        final long serviceStartTime;

        /**
         * @param serviceStartTime the start time in milliseconds.
         */
        public ServiceElapsedTimeInstrument(long serviceStartTime) {
            this.serviceStartTime = serviceStartTime;
        }

        /** Sets the value to the current time minus the start time. */
        @Override
        public void sample() {
            final long elapsedMillis = System.currentTimeMillis() - serviceStartTime;
            setValue(elapsedMillis);
        }
    }

    /**
     * Names of the counter paths under which the per-service counters are
     * reported. Fields of an interface are implicitly
     * {@code public static final}.
     */
    public static interface IConcurrencyManagerCounters {
        /** Path of the read service counters. */
        String ReadService = "Read Service";
        /** Path of the transaction write service counters. */
        String TXWriteService = "Transaction Write Service";
        /** Path of the unisolated write service counters. */
        String writeService = "Unisolated Write Service";
        /** Name of the lock manager counters. */
        String LockManager = "LockManager";
    }

    /**
     * Configuration property names and their default values. Every property
     * name is prefixed with the fully qualified name of
     * {@link ConcurrencyManager}. Fields of an interface are implicitly
     * {@code public static final}.
     */
    public static interface Options
    extends IServiceShutdown.Options {
        /** Core pool size of the transaction service; zero selects a cached thread pool. */
        String TX_SERVICE_CORE_POOL_SIZE = ConcurrencyManager.class.getName() + ".txService.corePoolSize";
        /** Default for {@link #TX_SERVICE_CORE_POOL_SIZE}. */
        String DEFAULT_TX_SERVICE_CORE_POOL_SIZE = "0";

        /** Core pool size of the read service; zero selects a cached thread pool. */
        String READ_SERVICE_CORE_POOL_SIZE = ConcurrencyManager.class.getName() + ".readService.corePoolSize";
        /** Default for {@link #READ_SERVICE_CORE_POOL_SIZE}. */
        String DEFAULT_READ_SERVICE_CORE_POOL_SIZE = "0";

        /** Core pool size of the write service; must be non-negative. */
        String WRITE_SERVICE_CORE_POOL_SIZE = ConcurrencyManager.class.getName() + ".writeService.corePoolSize";
        /** Default for {@link #WRITE_SERVICE_CORE_POOL_SIZE}. */
        String DEFAULT_WRITE_SERVICE_CORE_POOL_SIZE = "10";

        /** Maximum pool size of the write service; must not be less than the core pool size. */
        String WRITE_SERVICE_MAXIMUM_POOL_SIZE = ConcurrencyManager.class.getName() + ".writeService.maximumPoolSize";
        /** Default for {@link #WRITE_SERVICE_MAXIMUM_POOL_SIZE}. */
        String DEFAULT_WRITE_SERVICE_MAXIMUM_POOL_SIZE = "50";

        /** Keep-alive time of the write service threads, in milliseconds. */
        String WRITE_SERVICE_KEEP_ALIVE_TIME = ConcurrencyManager.class.getName() + ".writeService.keepAliveTime";
        /** Default for {@link #WRITE_SERVICE_KEEP_ALIVE_TIME}. */
        String DEFAULT_WRITE_SERVICE_KEEP_ALIVE_TIME = "60000";

        /** Whether the core threads of the write service are started eagerly. */
        String WRITE_SERVICE_PRESTART_ALL_CORE_THREADS = ConcurrencyManager.class.getName() + ".writeService.prestartAllCoreThreads";
        /** Default for {@link #WRITE_SERVICE_PRESTART_ALL_CORE_THREADS}. */
        String DEFAULT_WRITE_SERVICE_PRESTART_ALL_CORE_THREADS = "false";

        /**
         * Capacity of the write service work queue: zero selects a synchronous
         * hand-off queue, {@link Integer#MAX_VALUE} a linked queue, and any
         * other value an array-backed queue of that capacity.
         */
        String WRITE_SERVICE_QUEUE_CAPACITY = ConcurrencyManager.class.getName() + ".writeService.queueCapacity";
        /** Default for {@link #WRITE_SERVICE_QUEUE_CAPACITY}. */
        String DEFAULT_WRITE_SERVICE_QUEUE_CAPACITY = "0";

        /** Group commit timeout handed to the write service. */
        String WRITE_SERVICE_GROUP_COMMIT_TIMEOUT = ConcurrencyManager.class.getName() + ".writeService.groupCommitTimeout";
        /** Default for {@link #WRITE_SERVICE_GROUP_COMMIT_TIMEOUT}. */
        String DEFAULT_WRITE_SERVICE_GROUP_COMMIT_TIMEOUT = "100";

        /** Overflow lock request timeout handed to the write service. */
        String WRITE_SERVICE_OVERFLOW_LOCK_REQUEST_TIMEOUT = ConcurrencyManager.class.getName() + ".writeService.overflowLockRequestTimeout";
        /** Default for {@link #WRITE_SERVICE_OVERFLOW_LOCK_REQUEST_TIMEOUT}. */
        String DEFAULT_WRITE_SERVICE_OVERFLOW_LOCK_REQUEST_TIMEOUT = "120000";
    }
}

