/*
 * Decompiled with CFR 0.152.
 */
package com.bigdata.service.ndx;

import com.bigdata.btree.AsynchronousIndexWriteConfiguration;
import com.bigdata.btree.ICounter;
import com.bigdata.btree.ITuple;
import com.bigdata.btree.ITupleIterator;
import com.bigdata.btree.ITupleSerializer;
import com.bigdata.btree.IndexMetadata;
import com.bigdata.btree.keys.KVO;
import com.bigdata.btree.proc.AbstractKeyArrayIndexProcedure;
import com.bigdata.btree.proc.AbstractKeyArrayIndexProcedureConstructor;
import com.bigdata.btree.proc.AbstractKeyRangeIndexProcedure;
import com.bigdata.btree.proc.BatchContains;
import com.bigdata.btree.proc.BatchInsert;
import com.bigdata.btree.proc.BatchLookup;
import com.bigdata.btree.proc.BatchPutIfAbsent;
import com.bigdata.btree.proc.BatchRemove;
import com.bigdata.btree.proc.IKeyArrayIndexProcedure;
import com.bigdata.btree.proc.IKeyRangeIndexProcedure;
import com.bigdata.btree.proc.IParallelizableIndexProcedure;
import com.bigdata.btree.proc.IResultHandler;
import com.bigdata.btree.proc.ISimpleIndexProcedure;
import com.bigdata.btree.proc.LongAggregator;
import com.bigdata.btree.proc.RangeCountProcedure;
import com.bigdata.counters.CounterSet;
import com.bigdata.journal.TimestampUtility;
import com.bigdata.mdi.IMetadataIndex;
import com.bigdata.mdi.IResourceMetadata;
import com.bigdata.mdi.MetadataIndex;
import com.bigdata.mdi.PartitionLocator;
import com.bigdata.relation.accesspath.BlockingBuffer;
import com.bigdata.relation.accesspath.IRunnableBuffer;
import com.bigdata.relation.accesspath.UnsynchronizedArrayBuffer;
import com.bigdata.resources.StaleLocatorException;
import com.bigdata.service.AbstractClient;
import com.bigdata.service.AbstractScaleOutFederation;
import com.bigdata.service.IBigdataClient;
import com.bigdata.service.IDataService;
import com.bigdata.service.IMetadataService;
import com.bigdata.service.Split;
import com.bigdata.service.ndx.AbstractDataServiceProcedureTask;
import com.bigdata.service.ndx.AbstractSplitter;
import com.bigdata.service.ndx.ClientException;
import com.bigdata.service.ndx.IScaleOutClientIndex;
import com.bigdata.service.ndx.ISplitter;
import com.bigdata.service.ndx.IdentityHandler;
import com.bigdata.service.ndx.KeyArrayDataServiceProcedureTask;
import com.bigdata.service.ndx.KeyRangeDataServiceProcedureTask;
import com.bigdata.service.ndx.PartitionedTupleIterator;
import com.bigdata.service.ndx.SimpleDataServiceProcedureTask;
import com.bigdata.service.ndx.pipeline.IDuplicateRemover;
import com.bigdata.service.ndx.pipeline.IndexAsyncWriteStats;
import com.bigdata.service.ndx.pipeline.IndexPartitionWriteStats;
import com.bigdata.service.ndx.pipeline.IndexWriteTask;
import com.bigdata.util.InnerCause;
import com.bigdata.util.concurrent.ExecutionHelper;
import cutthecrap.utils.striterators.ICloseableIterator;
import cutthecrap.utils.striterators.IFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class ClientIndexView
implements IScaleOutClientIndex {
    /** Logger shared by every instance of this class. */
    protected static final transient Logger log = Logger.getLogger(ClientIndexView.class);
    /**
     * True iff the logger's effective level was numerically at or below
     * {@link Level#WARN} when this instance was created (evaluated once per instance).
     */
    protected final boolean WARN = log.getEffectiveLevel().toInt() <= Level.WARN.toInt();
    /** Message used when a read-consistent transaction cannot be started. */
    protected static final transient String ERR_NEW_TX = "Could not start transaction";
    /** Message prefix used when a transaction cannot be aborted; the tx identifier is appended. */
    protected static final transient String ERR_ABORT_TX = "Could not abort transaction: tx=";
    /** The federation through which services, executors and metadata indices are obtained. */
    private final AbstractScaleOutFederation<?> fed;
    /** Timeout, in milliseconds, applied when waiting on submitted tasks (from the client configuration). */
    private final long taskTimeout;
    /** Log message emitted when a single-key (non-batch) method is invoked. */
    protected static final String NON_BATCH_API = "Non-batch API";
    /** When true, use of the non-batch API is logged at ERROR (with a stack trace) rather than WARN. */
    private final boolean batchOnly;
    /** Default range query capacity, used when a caller passes a capacity of zero. */
    private final int capacity;
    /** The timestamp of this view, as supplied to the constructor. */
    private final long timestamp;
    /** The name of the scale-out index. */
    private final String name;
    /** The metadata index supplied to the constructor. */
    private final IMetadataIndex metadataIndex;
    /** Metadata of {@link #metadataIndex}, captured once at construction. */
    private final MetadataIndex.MetadataIndexMetadata metadataIndexMetadata;
    /** Splits key arrays across index partitions using the federation's metadata index. */
    private final ISplitter splitter;
    /** Per-thread counter exposed via {@link #getRecursionDepth()}; starts at zero for each thread. */
    private ThreadLocal<AtomicInteger> recursionDepth = new ThreadLocal<AtomicInteger>(){

        @Override
        protected synchronized AtomicInteger initialValue() {
            return new AtomicInteger();
        }
    };
    /** Whether read-only operations on a read-committed/unisolated view run inside a read-only transaction. */
    private final boolean readConsistent;
    /** Lazily initialized tuple serializer; see {@link #getTupleSerializer()}. */
    private volatile ITupleSerializer tupleSer = null;

    /**
     * Returns the federation this view was created against.
     *
     * @return the federation supplied to the constructor (never null)
     */
    @Override
    public AbstractScaleOutFederation<?> getFederation() {
        return fed;
    }

    /**
     * Returns the federation's executor service viewed as a {@link ThreadPoolExecutor}.
     *
     * @return the federation's executor service
     * @throws ClassCastException if that executor is not a {@link ThreadPoolExecutor}
     */
    protected ThreadPoolExecutor getThreadPool() {
        final Object executor = fed.getExecutorService();
        return (ThreadPoolExecutor) executor;
    }

    /**
     * Returns the timestamp of this view.
     *
     * @return the timestamp supplied to the constructor
     */
    @Override
    public final long getTimestamp() {
        return timestamp;
    }

    /**
     * Returns the name of the scale-out index.
     *
     * @return the index name supplied to the constructor (never null)
     */
    @Override
    public final String getName() {
        return name;
    }

    /**
     * Returns the metadata service as reported by the federation.
     *
     * @return the federation's metadata service
     */
    protected final IMetadataService getMetadataService() {
        return fed.getMetadataService();
    }

    /**
     * Returns the metadata index that was supplied to the constructor.
     *
     * @return the metadata index (never null)
     */
    public final IMetadataIndex getMetadataIndex() {
        return metadataIndex;
    }

    /**
     * Returns the recursion-depth counter belonging to the calling thread.
     *
     * @return the calling thread's counter; a fresh zero-valued counter on first access
     */
    @Override
    public AtomicInteger getRecursionDepth() {
        return recursionDepth.get();
    }

    /**
     * Renders the simple class name together with the index name, the view
     * timestamp and the read-consistent flag.
     *
     * @return a human-readable description of this view
     */
    public String toString() {
        return getClass().getSimpleName()
                + "{ "
                + "name=" + name
                + ", timestamp=" + timestamp
                + ", readConsistent=" + readConsistent
                + "}";
    }

    /**
     * Creates a view onto the named scale-out index.
     *
     * @param fed the federation (required)
     * @param name the index name (required)
     * @param timestamp the timestamp of the view
     * @param metadataIndex the metadata index for the scale-out index (required)
     * @throws IllegalArgumentException if {@code fed}, {@code name} or
     *         {@code metadataIndex} is null
     */
    public ClientIndexView(final AbstractScaleOutFederation<?> fed, final String name, long timestamp, IMetadataIndex metadataIndex) {
        if (fed == null || name == null || metadataIndex == null) {
            throw new IllegalArgumentException();
        }
        this.fed = fed;
        this.name = name;
        this.timestamp = timestamp;
        this.metadataIndex = metadataIndex;
        this.metadataIndexMetadata = metadataIndex.getIndexMetadata();
        // The splitter resolves the metadata index through the federation for
        // whatever timestamp it is asked about, not through this.metadataIndex.
        this.splitter = new AbstractSplitter(){

            @Override
            protected IMetadataIndex getMetadataIndex(long ts) {
                return fed.getMetadataIndex(name, ts);
            }
        };
        // Configuration values are read once from the client.
        final AbstractClient client = (AbstractClient) fed.getClient();
        this.capacity = client.getDefaultRangeQueryCapacity();
        this.batchOnly = client.getBatchApiOnly();
        this.taskTimeout = client.getTaskTimeout();
        this.readConsistent = client.isReadConsistent();
    }

    /**
     * Returns the metadata of the metadata index, as captured at construction.
     *
     * @return the metadata index's own metadata object
     */
    public MetadataIndex.MetadataIndexMetadata getMetadataIndexMetadata() {
        return metadataIndexMetadata;
    }

    /**
     * Returns the metadata for the managed (scale-out) index, obtained from the
     * metadata index's metadata.
     *
     * @return the managed index metadata
     */
    @Override
    public IndexMetadata getIndexMetadata() {
        final MetadataIndex.MetadataIndexMetadata mdmd = metadataIndexMetadata;
        return mdmd.getManagedIndexMetadata();
    }

    /**
     * Not supported by this view.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public ICounter getCounter() {
        throw new UnsupportedOperationException();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the tuple serializer for the index, resolving it from the index
     * metadata on first use and caching it in the volatile {@code tupleSer} field.
     *
     * @return the cached tuple serializer
     */
    protected ITupleSerializer getTupleSerializer() {
        ITupleSerializer ser = tupleSer;
        if (ser != null) {
            return ser;
        }
        synchronized (this) {
            // Re-check under the lock: another thread may have initialized it.
            ser = tupleSer;
            if (ser == null) {
                ser = getIndexMetadata().getTupleSerializer();
                tupleSer = ser;
            }
            return ser;
        }
    }

    /**
     * Tests for the presence of a key given in its application form.
     *
     * @param key the application key; serialized with the index's tuple serializer
     * @return true iff the index contains the serialized key
     */
    @Override
    public boolean contains(Object key) {
        final byte[] serializedKey = (byte[]) getTupleSerializer().serializeKey(key);
        return contains(serializedKey);
    }

    /**
     * Tests for the presence of a single key by submitting a one-element
     * {@link BatchContains} procedure.
     * <p>
     * This is a non-batch call: it is logged at ERROR (with a stack trace) when
     * the client is configured batch-only, otherwise at WARN when enabled.
     *
     * @param key the serialized key
     * @return true iff the key was reported as present
     */
    @Override
    public boolean contains(byte[] key) {
        if (batchOnly) {
            log.error(NON_BATCH_API, new RuntimeException());
        } else if (WARN) {
            log.warn(NON_BATCH_API);
        }
        final IdentityHandler resultHandler = new IdentityHandler();
        submit(0, 1, new byte[][]{key}, null, BatchContains.BatchContainsConstructor.INSTANCE, resultHandler);
        final AbstractKeyArrayIndexProcedure.ResultBitBuffer bits =
                (AbstractKeyArrayIndexProcedure.ResultBitBuffer) resultHandler.getResult();
        return bits.getResult()[0];
    }

    /**
     * Inserts an application-level key/value pair.
     * <p>
     * The key is serialized with {@code serializeKey} and the value with
     * {@code serializeVal}. The write is performed, but the previous value
     * cannot be returned as an object by this method, so it always ends by
     * throwing {@link UnsupportedOperationException}.
     *
     * @param key the application key
     * @param val the application value
     * @return never returns normally
     * @throws UnsupportedOperationException always, after the write has been submitted
     */
    @Override
    public Object insert(Object key, Object val) {
        final ITupleSerializer ser = getTupleSerializer();
        final byte[] serializedKey = (byte[]) ser.serializeKey(key);
        // The value must go through the value serializer, not the key serializer.
        final byte[] serializedVal = (byte[]) ser.serializeVal(val);
        insert(serializedKey, serializedVal);
        // Decoding the old value into an object is not implemented.
        throw new UnsupportedOperationException();
    }

    /**
     * Inserts a single serialized key/value pair by submitting a one-element
     * {@link BatchInsert} procedure that returns old values.
     * <p>
     * This is a non-batch call: it is logged at ERROR (with a stack trace) when
     * the client is configured batch-only, otherwise at WARN when enabled.
     *
     * @param key the serialized key
     * @param value the serialized value
     * @return the first entry of the procedure's result buffer (the old value)
     */
    @Override
    public byte[] insert(byte[] key, byte[] value) {
        if (batchOnly) {
            log.error(NON_BATCH_API, new RuntimeException());
        } else if (WARN) {
            log.warn(NON_BATCH_API);
        }
        final IdentityHandler resultHandler = new IdentityHandler();
        submit(0, 1, new byte[][]{key}, new byte[][]{value}, BatchInsert.BatchInsertConstructor.RETURN_OLD_VALUES, resultHandler);
        final AbstractKeyArrayIndexProcedure.ResultBuffer buffer =
                (AbstractKeyArrayIndexProcedure.ResultBuffer) resultHandler.getResult();
        return buffer.getResult(0);
    }

    /**
     * Submits a one-element {@link BatchPutIfAbsent} procedure (returning old
     * values) for the given serialized key/value pair.
     * <p>
     * This is a non-batch call: it is logged at ERROR (with a stack trace) when
     * the client is configured batch-only, otherwise at WARN when enabled.
     *
     * @param key the serialized key
     * @param value the serialized value
     * @return the first entry of the procedure's result buffer
     */
    @Override
    public byte[] putIfAbsent(byte[] key, byte[] value) {
        if (batchOnly) {
            log.error(NON_BATCH_API, new RuntimeException());
        } else if (WARN) {
            log.warn(NON_BATCH_API);
        }
        final IdentityHandler resultHandler = new IdentityHandler();
        submit(0, 1, new byte[][]{key}, new byte[][]{value}, BatchPutIfAbsent.BatchPutIfAbsentConstructor.RETURN_OLD_VALUES, resultHandler);
        final AbstractKeyArrayIndexProcedure.ResultBuffer buffer =
                (AbstractKeyArrayIndexProcedure.ResultBuffer) resultHandler.getResult();
        return buffer.getResult(0);
    }

    /**
     * Looks up an application-level key.
     * <p>
     * The lookup is performed against the serialized key, but converting the
     * result back into an object is not implemented, so this method always
     * throws after the lookup.
     *
     * @param key the application key
     * @return never returns normally
     * @throws UnsupportedOperationException always, after the lookup has run
     */
    @Override
    public Object lookup(Object key) {
        final byte[] serializedKey = (byte[]) getTupleSerializer().serializeKey(key);
        lookup(serializedKey);
        throw new UnsupportedOperationException();
    }

    /**
     * Looks up a single serialized key by submitting a one-element
     * {@link BatchLookup} procedure.
     * <p>
     * This is a non-batch call: it is logged at ERROR (with a stack trace) when
     * the client is configured batch-only, otherwise at WARN when enabled.
     *
     * @param key the serialized key
     * @return the first entry of the procedure's result buffer
     */
    @Override
    public byte[] lookup(byte[] key) {
        if (batchOnly) {
            log.error(NON_BATCH_API, new RuntimeException());
        } else if (WARN) {
            log.warn(NON_BATCH_API);
        }
        final IdentityHandler resultHandler = new IdentityHandler();
        submit(0, 1, new byte[][]{key}, null, BatchLookup.BatchLookupConstructor.INSTANCE, resultHandler);
        final AbstractKeyArrayIndexProcedure.ResultBuffer buffer =
                (AbstractKeyArrayIndexProcedure.ResultBuffer) resultHandler.getResult();
        return buffer.getResult(0);
    }

    /**
     * Removes an application-level key.
     * <p>
     * The removal is performed against the serialized key, but converting the
     * old value back into an object is not implemented, so this method always
     * throws after the removal.
     *
     * @param key the application key
     * @return never returns normally
     * @throws UnsupportedOperationException always, after the removal has been submitted
     */
    @Override
    public Object remove(Object key) {
        final byte[] serializedKey = (byte[]) getTupleSerializer().serializeKey(key);
        remove(serializedKey);
        throw new UnsupportedOperationException();
    }

    /**
     * Removes a single serialized key by submitting a one-element
     * {@link BatchRemove} procedure that returns old values.
     * <p>
     * This is a non-batch call: it is logged at ERROR (with a stack trace) when
     * the client is configured batch-only, otherwise at WARN when enabled.
     *
     * @param key the serialized key
     * @return the first element of the values returned by the procedure
     */
    @Override
    public byte[] remove(byte[] key) {
        if (batchOnly) {
            log.error(NON_BATCH_API, new RuntimeException());
        } else if (WARN) {
            log.warn(NON_BATCH_API);
        }
        final IdentityHandler resultHandler = new IdentityHandler();
        submit(0, 1, new byte[][]{key}, null, BatchRemove.BatchRemoveConstructor.RETURN_OLD_VALUES, resultHandler);
        final AbstractKeyArrayIndexProcedure.ResultBuffer buffer =
                (AbstractKeyArrayIndexProcedure.ResultBuffer) resultHandler.getResult();
        return buffer.getValues().get(0);
    }

    /**
     * Range count over the whole index (no lower or upper key bound).
     *
     * @return the result of {@link #rangeCount(byte[], byte[])} with both bounds null
     */
    @Override
    public long rangeCount() {
        final byte[] unbounded = null;
        return rangeCount(unbounded, unbounded);
    }

    /**
     * Range count for a key range, aggregated over the index partitions
     * spanned by that range.
     * <p>
     * Submits a {@link RangeCountProcedure} constructed with both boolean
     * arguments false.
     *
     * @param fromKey the lower bound, or null
     * @param toKey the upper bound, or null
     * @return the value accumulated by the {@link LongAggregator}
     */
    @Override
    public long rangeCount(byte[] fromKey, byte[] toKey) {
        final LongAggregator total = new LongAggregator();
        submit(fromKey, toKey, new RangeCountProcedure(false, false, fromKey, toKey), total);
        return total.getResult();
    }

    /**
     * Range count for a key range using a {@link RangeCountProcedure}
     * constructed with {@code (true, false, fromKey, toKey)}.
     *
     * @param fromKey the lower bound, or null
     * @param toKey the upper bound, or null
     * @return the value accumulated by the {@link LongAggregator}
     */
    @Override
    public final long rangeCountExact(byte[] fromKey, byte[] toKey) {
        final LongAggregator total = new LongAggregator();
        submit(fromKey, toKey, new RangeCountProcedure(true, false, fromKey, toKey), total);
        return total.getResult();
    }

    /**
     * Range count for a key range using a {@link RangeCountProcedure}
     * constructed with {@code (true, true, fromKey, toKey)}.
     *
     * @param fromKey the lower bound, or null
     * @param toKey the upper bound, or null
     * @return the value accumulated by the {@link LongAggregator}
     */
    @Override
    public final long rangeCountExactWithDeleted(byte[] fromKey, byte[] toKey) {
        final LongAggregator total = new LongAggregator();
        submit(fromKey, toKey, new RangeCountProcedure(true, true, fromKey, toKey), total);
        return total.getResult();
    }

    /**
     * Iterator over the whole index (no lower or upper key bound).
     *
     * @return the result of {@link #rangeIterator(byte[], byte[])} with both bounds null
     */
    @Override
    public final ITupleIterator rangeIterator() {
        final byte[] unbounded = null;
        return rangeIterator(unbounded, unbounded);
    }

    /**
     * Iterator over a key range using the default capacity, flags value
     * {@code 3} and no filter.
     *
     * @param fromKey the lower bound, or null
     * @param toKey the upper bound, or null
     * @return the iterator
     */
    @Override
    public ITupleIterator rangeIterator(byte[] fromKey, byte[] toKey) {
        // NOTE(review): 3 is presumably the keys|values flag combination - confirm against IRangeQuery.
        final int defaultFlags = 3;
        return rangeIterator(fromKey, toKey, capacity, defaultFlags, null);
    }

    /**
     * Iterator over a key range with explicit capacity, flags and filter.
     * <p>
     * A capacity of zero selects the default range query capacity. When the
     * view timestamp is {@code 0L} and the read-only flag bit ({@code 8}) is
     * set, or when the view timestamp is {@code -1L} and the client is
     * read-consistent, a new transaction is started (via {@code newTx(-1L)})
     * and the iterator reads against it. If the iterator cannot be created,
     * that transaction is aborted before the failure is rethrown.
     *
     * @param fromKey the lower bound, or null
     * @param toKey the upper bound, or null
     * @param capacity the requested capacity, or 0 for the default
     * @param flags bit flags; bit {@code 0x100} selects the parallel iterator
     * @param filter an optional filter, passed through to the iterator
     * @return the iterator
     * @throws IllegalArgumentException if both the read-only bit ({@code 8}) and
     *         bit {@code 0x10} are set
     * @throws RuntimeException if the transaction cannot be started or the
     *         iterator cannot be created
     */
    @Override
    public ITupleIterator rangeIterator(byte[] fromKey, byte[] toKey, int capacity, int flags, IFilter filter) {
        final int effectiveCapacity = (capacity == 0) ? this.capacity : capacity;
        final boolean parallel = (flags & 0x100) != 0;
        final boolean readOnly = (flags & 8) != 0;
        // NOTE(review): 0x10 is presumably the remove-all flag - confirm against IRangeQuery.
        if (readOnly && (flags & 0x10) != 0) {
            throw new IllegalArgumentException();
        }
        final boolean isReadConsistentTx =
                (timestamp == 0L && readOnly) || (timestamp == -1L && readConsistent);
        long ts = timestamp;
        if (isReadConsistentTx) {
            try {
                ts = fed.getTransactionService().newTx(-1L);
            } catch (IOException ex) {
                throw new RuntimeException(ERR_NEW_TX, ex);
            }
        }
        try {
            if (parallel) {
                return parallelRangeIterator(ts, isReadConsistentTx, fromKey, toKey, effectiveCapacity, flags, filter);
            }
            return new PartitionedTupleIterator(this, ts, isReadConsistentTx, fromKey, toKey, effectiveCapacity, flags, filter);
        } catch (Throwable t) {
            if (isReadConsistentTx) {
                // Best effort: a failure to abort is logged, the original error wins.
                try {
                    fed.getTransactionService().abort(ts);
                } catch (Throwable t2) {
                    log.error(t2, t2);
                }
            }
            throw new RuntimeException(t);
        }
    }

    /**
     * Creates an iterator fed by a {@link ParallelRangeIteratorTask} that is
     * submitted to the federation's executor service and writes chunks of
     * tuples onto a {@link BlockingBuffer}.
     *
     * @param ts the timestamp (or transaction identifier) to read against
     * @param isReadConsistentTx whether {@code ts} is a transaction started for this read
     * @param fromKey the lower bound, or null
     * @param toKey the upper bound, or null
     * @param capacity the buffer capacity
     * @param flags the range query flags
     * @param filter an optional filter
     * @return an iterator that un-chunks the buffer's contents
     */
    private ITupleIterator parallelRangeIterator(long ts, boolean isReadConsistentTx, byte[] fromKey, byte[] toKey, int capacity, int flags, IFilter filter) {
        final int minimumChunkSize = 100;
        final long chunkTimeout = 1L;
        final TimeUnit chunkTimeoutUnit = TimeUnit.MILLISECONDS;
        final BlockingBuffer<ITuple<?>[]> queryBuffer =
                new BlockingBuffer<ITuple<?>[]>(capacity, minimumChunkSize, chunkTimeout, chunkTimeoutUnit);
        final FutureTask<Void> ft = new FutureTask<Void>(
                new ParallelRangeIteratorTask(ts, isReadConsistentTx, fromKey, toKey, capacity, flags, filter, queryBuffer));
        // The future is attached to the buffer before the task is started.
        queryBuffer.setFuture(ft);
        fed.getExecutorService().submit(ft);
        return new UnchunkedTupleIterator(queryBuffer.iterator());
    }

    /**
     * Runs a procedure against the index partition that spans {@code key},
     * using this view's timestamp.
     *
     * @param key the key used to locate the index partition
     * @param proc the procedure to run
     * @return the procedure's result
     */
    @Override
    public <T> T submit(byte[] key, ISimpleIndexProcedure<T> proc) {
        final long ts = timestamp;
        return submit(ts, key, proc);
    }

    /**
     * Runs a procedure against the index partition that spans {@code key} as
     * of {@code ts}, waiting up to the task timeout for it to complete.
     *
     * @param ts the timestamp used to resolve the partition locator and run the task
     * @param key the key used to locate the index partition
     * @param proc the procedure to run
     * @return the result captured by the result handler
     * @throws RuntimeException wrapping any exception raised while submitting
     *         or awaiting the task
     */
    private <T> T submit(long ts, byte[] key, ISimpleIndexProcedure<T> proc) {
        final PartitionLocator locator = fed.getMetadataIndex(name, ts).find(key);
        try {
            if (log.isInfoEnabled()) {
                log.info("Submitting " + proc.getClass() + " to partition" + locator);
            }
            final IdentityHandler resultHandler = new IdentityHandler();
            final Split split = new Split(locator, 0, 0);
            final SimpleDataServiceProcedureTask task =
                    new SimpleDataServiceProcedureTask(this, key, ts, split, proc, resultHandler);
            final Future<?> future = getThreadPool().submit(task);
            future.get(taskTimeout, TimeUnit.MILLISECONDS);
            return (T) resultHandler.getResult();
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Returns the partition locators for a key range of this index, delegating
     * to the federation.
     *
     * @param ts the timestamp for the scan
     * @param fromKey the lower bound, or null
     * @param toKey the upper bound, or null
     * @param reverseScan passed through to the federation's locator scan
     * @return an iterator over the partition locators
     */
    @Override
    public Iterator<PartitionLocator> locatorScan(long ts, byte[] fromKey, byte[] toKey, boolean reverseScan) {
        return fed.locatorScan(name, ts, fromKey, toKey, reverseScan);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Maps a key-range procedure across the index partitions spanned by the
     * key range.
     * <p>
     * When the client is read-consistent, the procedure is read-only and this
     * view is read-committed or unisolated, the procedure is run exactly once
     * inside a freshly started transaction, which is aborted afterwards
     * (failure to abort is logged). Otherwise the procedure is run exactly
     * once against this view's timestamp.
     *
     * @param fromKey the lower bound, or null
     * @param toKey the upper bound, or null
     * @param proc the procedure (required)
     * @param resultHandler receives the per-partition results
     * @throws IllegalArgumentException if {@code proc} is null
     * @throws RuntimeException if the transaction cannot be started
     */
    @Override
    public void submit(byte[] fromKey, byte[] toKey, IKeyRangeIndexProcedure proc, IResultHandler resultHandler) {
        if (proc == null) {
            throw new IllegalArgumentException();
        }
        final boolean useReadConsistentTx = readConsistent
                && proc.isReadOnly()
                && TimestampUtility.isReadCommittedOrUnisolated(getTimestamp());
        if (!useReadConsistentTx) {
            submit(timestamp, fromKey, toKey, proc, resultHandler);
            return;
        }
        long tx;
        try {
            tx = fed.getTransactionService().newTx(-1L);
        } catch (IOException ex) {
            throw new RuntimeException(ERR_NEW_TX, ex);
        }
        try {
            // Run once against the transaction; do NOT fall through and run
            // again against this.timestamp, which would aggregate results twice.
            submit(tx, fromKey, toKey, proc, resultHandler);
        } finally {
            try {
                fed.getTransactionService().abort(tx);
            } catch (IOException ex) {
                log.error(ERR_ABORT_TX + tx, ex);
            }
        }
    }

    /**
     * Maps a key-range procedure across the index partitions spanned by the
     * key range as of {@code ts}, processing the partitions in batches.
     * <p>
     * The batch size is the client's maximum parallel tasks per request,
     * further limited by the core pool size when that is non-zero. Batches run
     * in parallel only when the procedure implements
     * {@link IParallelizableIndexProcedure}.
     *
     * @param ts the timestamp (or transaction identifier) to run against
     * @param fromKey the lower bound, or null
     * @param toKey the upper bound, or null
     * @param proc the procedure
     * @param resultHandler receives the per-partition results
     */
    void submit(long ts, byte[] fromKey, byte[] toKey, IKeyRangeIndexProcedure proc, IResultHandler resultHandler) {
        final boolean parallel = proc instanceof IParallelizableIndexProcedure;
        if (log.isInfoEnabled()) {
            log.info("Procedure " + proc.getClass().getName() + " will be mapped across index partitions in " + (parallel ? "parallel" : "sequence"));
        }
        final int poolSize = getThreadPool().getCorePoolSize();
        final int maxTasksPerRequest = ((AbstractClient) fed.getClient()).getMaxParallelTasksPerRequest();
        final int maxTasks = (poolSize == 0) ? maxTasksPerRequest : Math.min(poolSize, maxTasksPerRequest);
        assert (maxTasks > 0) : "maxTasks=" + maxTasks + ", poolSize=" + poolSize + ", maxTasksPerRequest=" + maxTasksPerRequest;
        final Iterator<PartitionLocator> locators = locatorScan(ts, fromKey, toKey, false);
        long nparts = 0L;
        while (locators.hasNext()) {
            // Collect up to maxTasks partitions, then run that batch.
            final ArrayList<AbstractDataServiceProcedureTask> batch =
                    new ArrayList<AbstractDataServiceProcedureTask>(maxTasks);
            while (batch.size() < maxTasks && locators.hasNext()) {
                final Split split = new Split(locators.next(), 0, 0);
                batch.add(new KeyRangeDataServiceProcedureTask(this, fromKey, toKey, ts, split, proc, resultHandler));
                nparts++;
            }
            runTasks(parallel, batch);
        }
        if (log.isInfoEnabled()) {
            log.info("Procedure " + proc.getClass().getName() + " mapped across " + nparts + " index partitions in " + (parallel ? "parallel" : "sequence"));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Splits a key array across index partitions and runs a procedure created
     * by {@code ctor} on each split.
     * <p>
     * When the client is read-consistent, the procedure is read-only and this
     * view is read-committed or unisolated, the work runs inside a freshly
     * started transaction which is aborted afterwards (failure to abort is
     * logged). Otherwise it runs against this view's timestamp.
     *
     * @param fromIndex the first index into {@code keys} (inclusive)
     * @param toIndex the last index into {@code keys} (exclusive)
     * @param keys the keys
     * @param vals the values, or null
     * @param ctor the procedure constructor (required)
     * @param aggregator receives the per-split results
     * @throws IllegalArgumentException if {@code ctor} is null
     * @throws RuntimeException if the transaction cannot be started
     */
    @Override
    public void submit(int fromIndex, int toIndex, byte[][] keys, byte[][] vals, AbstractKeyArrayIndexProcedureConstructor ctor, IResultHandler aggregator) {
        if (ctor == null) {
            throw new IllegalArgumentException();
        }
        // This instance is created only to ask whether the procedure is
        // read-only; typed so that isReadOnly() is actually available.
        final IKeyArrayIndexProcedure proc =
                (IKeyArrayIndexProcedure) ctor.newInstance(this, fromIndex, toIndex, keys, vals);
        final boolean isTx = readConsistent
                && proc.isReadOnly()
                && TimestampUtility.isReadCommittedOrUnisolated(getTimestamp());
        long ts;
        if (isTx) {
            try {
                ts = fed.getTransactionService().newTx(-1L);
            } catch (IOException e) {
                throw new RuntimeException(ERR_NEW_TX, e);
            }
        } else {
            ts = getTimestamp();
        }
        try {
            submit(ts, fromIndex, toIndex, keys, vals, ctor, aggregator);
        } finally {
            if (isTx) {
                try {
                    fed.getTransactionService().abort(ts);
                } catch (IOException e) {
                    // Same message format as the key-range submit path.
                    log.error(ERR_ABORT_TX + ts, e);
                }
            }
        }
    }

    /**
     * Splits the keys across index partitions as of {@code ts}, creates one
     * procedure task per split and runs the tasks.
     * <p>
     * The tasks run in parallel if any created procedure implements
     * {@link IParallelizableIndexProcedure}.
     *
     * @param ts the timestamp (or transaction identifier) to run against
     * @param fromIndex the first index into {@code keys} (inclusive)
     * @param toIndex the last index into {@code keys} (exclusive)
     * @param keys the keys
     * @param vals the values, or null
     * @param ctor the procedure constructor
     * @param aggregator receives the per-split results
     */
    void submit(long ts, int fromIndex, int toIndex, byte[][] keys, byte[][] vals, AbstractKeyArrayIndexProcedureConstructor ctor, IResultHandler aggregator) {
        final LinkedList<Split> splits = splitKeys(ts, fromIndex, toIndex, keys);
        final int nsplits = splits.size();
        final ArrayList<AbstractDataServiceProcedureTask> tasks =
                new ArrayList<AbstractDataServiceProcedureTask>(nsplits);
        boolean parallel = false;
        for (Split split : splits) {
            final Object proc = ctor.newInstance(this, split.fromIndex, split.toIndex, keys, vals);
            parallel |= proc instanceof IParallelizableIndexProcedure;
            tasks.add(new KeyArrayDataServiceProcedureTask(this, keys, vals, ts, split, (IKeyArrayIndexProcedure) proc, aggregator, ctor));
        }
        if (log.isInfoEnabled()) {
            log.info("Procedures created by " + ctor.getClass().getName() + " will run on " + nsplits + " index partitions in " + (parallel ? "parallel" : "sequence"));
        }
        runTasks(parallel, tasks);
    }

    /**
     * Dispatches a list of tasks to the appropriate execution strategy.
     * <p>
     * An empty list is logged and ignored. If the calling thread's recursion
     * depth is positive the tasks run in the caller's thread; otherwise a
     * single task is run on its own, and multiple tasks run in parallel or in
     * sequence according to {@code parallel}.
     *
     * @param parallel whether multiple tasks may run in parallel
     * @param tasks the tasks to run
     */
    protected void runTasks(boolean parallel, ArrayList<AbstractDataServiceProcedureTask> tasks) {
        if (tasks.isEmpty()) {
            log.warn("No tasks to run?", new RuntimeException("No tasks to run?"));
            return;
        }
        if (getRecursionDepth().get() > 0) {
            runInCallersThread(tasks);
            return;
        }
        if (tasks.size() == 1) {
            runOne(tasks.get(0));
            return;
        }
        if (parallel) {
            runParallel(tasks);
        } else {
            runSequence(tasks);
        }
    }

    /**
     * Submits a single task to the thread pool and waits up to the task
     * timeout for it to complete.
     *
     * @param task the task to run
     * @throws ClientException wrapping any exception raised while submitting
     *         or awaiting the task
     */
    protected void runOne(Callable<Void> task) {
        if (log.isInfoEnabled()) {
            final ThreadPoolExecutor pool = getThreadPool();
            log.info("Running one task (#active=" + pool.getActiveCount() + ", queueSize=" + pool.getQueue().size() + ") : " + task.toString());
        }
        try {
            getThreadPool().submit(task).get(taskTimeout, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            if (log.isInfoEnabled()) {
                log.info("Execution failed: task=" + task, e);
            }
            throw new ClientException("Execution failed: " + task, e);
        }
    }

    /**
     * Runs the tasks in parallel on the thread pool, waiting up to the task
     * timeout, and reports all failures together.
     * <p>
     * Futures are returned by {@code invokeAll} in the same order as the task
     * list, so the i-th future is matched to the i-th task when a failure is
     * logged and its causes are collected.
     *
     * @param tasks the tasks to run (must not be empty)
     * @throws ClientException if one or more tasks failed; carries the
     *         collected causes
     * @throws RuntimeException if the calling thread is interrupted while
     *         waiting; the interrupt status is restored and the
     *         {@link InterruptedException} is preserved as the cause
     */
    protected void runParallel(ArrayList<AbstractDataServiceProcedureTask> tasks) {
        final long begin = System.currentTimeMillis();
        if (log.isInfoEnabled()) {
            log.info("Running " + tasks.size() + " tasks in parallel (#active=" + this.getThreadPool().getActiveCount() + ", queueSize=" + this.getThreadPool().getQueue().size() + ") : " + tasks.get(0).toString());
        }
        int nfailed = 0;
        final LinkedList<Throwable> causes = new LinkedList<Throwable>();
        try {
            final List<Future<Void>> futures = this.getThreadPool().invokeAll(tasks, this.taskTimeout, TimeUnit.MILLISECONDS);
            int i = 0;
            for (Future<Void> f : futures) {
                try {
                    f.get();
                } catch (ExecutionException e) {
                    // Attribute the failure to the task at the same position.
                    final AbstractDataServiceProcedureTask task = tasks.get(i);
                    log.error("Execution failed: task=" + task, e);
                    if (task.causes != null) {
                        causes.addAll(task.causes);
                    } else {
                        causes.add(e);
                    }
                    ++nfailed;
                }
                // Advance on success and on failure to stay aligned with 'tasks'.
                ++i;
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted: " + e, e);
        }
        if (nfailed > 0) {
            throw new ClientException("Execution failed: ntasks=" + tasks.size() + ", nfailed=" + nfailed, causes);
        }
        if (log.isInfoEnabled()) {
            log.info("Ran " + tasks.size() + " tasks in parallel: elapsed=" + (System.currentTimeMillis() - begin));
        }
    }

    /**
     * Runs the tasks one after another on the thread pool, waiting up to the
     * task timeout for each, and stops at the first failure.
     *
     * @param tasks the tasks to run (must not be empty)
     * @throws ClientException on the first task that fails; carries that
     *         task's recorded causes
     */
    protected void runSequence(ArrayList<AbstractDataServiceProcedureTask> tasks) {
        if (log.isInfoEnabled()) {
            final ThreadPoolExecutor pool = getThreadPool();
            log.info("Running " + tasks.size() + " tasks in sequence (#active=" + pool.getActiveCount() + ", queueSize=" + pool.getQueue().size() + ") : " + tasks.get(0).toString());
        }
        for (final AbstractDataServiceProcedureTask task : tasks) {
            try {
                getThreadPool().submit(task).get(taskTimeout, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                if (log.isInfoEnabled()) {
                    log.info("Execution failed: task=" + task, e);
                }
                throw new ClientException("Execution failed: " + task, e, task.causes);
            }
        }
    }

    /**
     * Runs the tasks directly in the calling thread, one after another,
     * stopping at the first failure. A warning is logged (when enabled) if more
     * than one task is run this way.
     *
     * @param tasks the tasks to run
     * @throws ClientException on the first task that fails; carries that
     *         task's recorded causes
     */
    protected void runInCallersThread(ArrayList<AbstractDataServiceProcedureTask> tasks) {
        final int ntasks = tasks.size();
        if (WARN && ntasks > 1) {
            final ThreadPoolExecutor pool = getThreadPool();
            log.warn("Running " + ntasks + " tasks in caller's thread: recursionDepth=" + getRecursionDepth().get() + "(#active=" + pool.getActiveCount() + ", queueSize=" + pool.getQueue().size() + ") : " + tasks.get(0).toString());
        }
        for (final AbstractDataServiceProcedureTask task : tasks) {
            try {
                task.call();
            } catch (Exception e) {
                throw new ClientException("Execution failed: recursionDepth=" + getRecursionDepth() + ", task=" + task, e, task.causes);
            }
        }
    }

    /**
     * Splits a range of a key array across index partitions, delegating to
     * the splitter.
     *
     * @param ts the timestamp used to resolve the partitions
     * @param fromIndex the first index into {@code keys}
     * @param toIndex the end index into {@code keys}
     * @param keys the keys
     * @return the splits
     */
    @Override
    public LinkedList<Split> splitKeys(long ts, int fromIndex, int toIndex, byte[][] keys) {
        final ISplitter delegate = splitter;
        return delegate.splitKeys(ts, fromIndex, toIndex, keys);
    }

    /**
     * Splits a range of a {@link KVO} array across index partitions,
     * delegating to the splitter.
     *
     * @param ts the timestamp used to resolve the partitions
     * @param fromIndex the first index into {@code a}
     * @param toIndex the end index into {@code a}
     * @param a the key/value objects
     * @return the splits
     */
    @Override
    public LinkedList<Split> splitKeys(long ts, int fromIndex, int toIndex, KVO[] a) {
        final ISplitter delegate = splitter;
        return delegate.splitKeys(ts, fromIndex, toIndex, a);
    }

    /**
     * Resolves the data service identified by a partition locator through the
     * federation.
     *
     * @param pmd the partition locator
     * @return the data service for the locator's data service UUID
     */
    @Override
    public IDataService getDataService(PartitionLocator pmd) {
        return fed.getDataService(pmd.getDataServiceUUID());
    }

    /**
     * Not supported by this view.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public IResourceMetadata[] getResourceMetadata() {
        throw new UnsupportedOperationException();
    }

    /**
     * Notifies the metadata index for this view's timestamp that a locator is
     * stale.
     *
     * @param ts the timestamp of the failed operation; must be {@code 0L} or {@code -1L}
     * @param locator the stale locator (required)
     * @param cause the exception that reported the stale locator (not used here)
     * @throws IllegalArgumentException if {@code locator} is null
     * @throws RuntimeException if {@code ts} is neither {@code 0L} nor {@code -1L}
     */
    @Override
    public void staleLocator(long ts, PartitionLocator locator, StaleLocatorException cause) {
        if (locator == null) {
            throw new IllegalArgumentException();
        }
        final boolean tsAllowsStaleLocators = (ts == 0L) || (ts == -1L);
        if (!tsAllowsStaleLocators) {
            throw new RuntimeException("Stale locator, but views should be consistent? timestamp=" + TimestampUtility.toString(ts));
        }
        // NOTE(review): the lookup uses this.timestamp rather than ts - confirm this is intended.
        fed.getMetadataIndex(name, timestamp).staleLocator(locator);
    }

    /**
     * Creates a buffer for asynchronous writes on this index and starts the
     * {@link IndexWriteTask} that drains it on the federation's executor
     * service. Queue capacities, chunk sizes and timeouts come from the
     * index's asynchronous write configuration.
     *
     * @param resultHandler receives results from the write procedures
     * @param duplicateRemover passed through to the write task
     * @param ctor the constructor for the write procedures
     * @return the buffer exposed by the write task
     */
    @Override
    public <T extends IKeyArrayIndexProcedure, O, R, A> IRunnableBuffer<KVO<O>[]> newWriteBuffer(IResultHandler<R, A> resultHandler, IDuplicateRemover<O> duplicateRemover, AbstractKeyArrayIndexProcedureConstructor<T> ctor) {
        final AsynchronousIndexWriteConfiguration conf = getIndexMetadata().getAsynchronousIndexWriteConfiguration();
        final BlockingBuffer<KVO<O>[]> writeBuffer = new BlockingBuffer<KVO<O>[]>(
                new LinkedBlockingDeque<KVO<O>[]>(conf.getMasterQueueCapacity()),
                conf.getMasterChunkSize(),
                conf.getMasterChunkTimeoutNanos(),
                TimeUnit.NANOSECONDS,
                true);
        final IndexWriteTask.M<T, O, R, A> task = new IndexWriteTask.M<T, O, R, A>(
                (IScaleOutClientIndex) this,
                conf.getSinkIdleTimeoutNanos(),
                conf.getSinkPollTimeoutNanos(),
                conf.getSinkQueueCapacity(),
                conf.getSinkChunkSize(),
                conf.getSinkChunkTimeoutNanos(),
                duplicateRemover,
                ctor,
                resultHandler,
                fed.getIndexCounters(name).asynchronousStats,
                writeBuffer);
        final FutureTask<IndexAsyncWriteStats<PartitionLocator, IndexPartitionWriteStats>> ft =
                new FutureTask<IndexAsyncWriteStats<PartitionLocator, IndexPartitionWriteStats>>(task);
        // The future is attached to the buffer before the task is started.
        writeBuffer.setFuture(ft);
        fed.getExecutorService().submit(ft);
        return task.getBuffer();
    }

    /**
     * Returns the counters the federation maintains for this index.
     *
     * @return the counter set for the named index
     */
    @Override
    public CounterSet getCounters() {
        return getFederation().getIndexCounters(name).getCounters();
    }

    private class ParallelRangeIteratorTask
    implements Callable<Void> {
        private final long ts;
        private final boolean isReadConsistentTx;
        private final byte[] fromKey;
        private final byte[] toKey;
        private final int capacity;
        private final int flags;
        private final IFilter filter;
        private final BlockingBuffer<ITuple<?>[]> queryBuffer;
        private final int maxTasks;
        private final ExecutionHelper<Void> helper;

        public ParallelRangeIteratorTask(long ts, boolean isReadConsistentTx, byte[] fromKey, byte[] toKey, int capacity, int flags, IFilter filter, BlockingBuffer<ITuple<?>[]> queryBuffer) {
            this.ts = ts;
            this.isReadConsistentTx = isReadConsistentTx;
            this.fromKey = fromKey;
            this.toKey = toKey;
            this.capacity = capacity;
            this.flags = flags;
            this.filter = filter;
            this.queryBuffer = queryBuffer;
            int poolSize = ClientIndexView.this.getThreadPool().getCorePoolSize();
            int maxTasksPerRequest = ((AbstractClient)ClientIndexView.this.fed.getClient()).getMaxParallelTasksPerRequest();
            int n = this.maxTasks = poolSize == 0 ? maxTasksPerRequest : Math.min(poolSize, maxTasksPerRequest);
            assert (this.maxTasks > 0) : "maxTasks=" + this.maxTasks + ", poolSize=" + poolSize + ", maxTasksPerRequest=" + maxTasksPerRequest;
            this.helper = new ExecutionHelper(ClientIndexView.this.fed.getExecutorService(), ((AbstractClient)ClientIndexView.this.fed.getClient()).getTaskTimeout(), TimeUnit.MILLISECONDS);
        }

        @Override
        public Void call() throws Exception {
            try {
                Iterator<PartitionLocator> itr = ClientIndexView.this.locatorScan(this.ts, this.fromKey, this.toKey, false);
                long nparts = 0L;
                while (itr.hasNext()) {
                    ArrayList tasks = new ArrayList(this.maxTasks);
                    for (int i = 0; i < this.maxTasks && itr.hasNext(); ++i) {
                        PartitionLocator locator = itr.next();
                        byte[] _fromKey = AbstractKeyRangeIndexProcedure.constrainFromKey(this.fromKey, locator);
                        byte[] _toKey = AbstractKeyRangeIndexProcedure.constrainToKey(this.toKey, locator);
                        tasks.add(new RobustIteratorTask(_fromKey, _toKey));
                        ++nparts;
                    }
                    this.helper.submitTasks(tasks);
                }
                return null;
            }
            catch (Throwable t) {
                if (this.isReadConsistentTx) {
                    ClientIndexView.this.fed.getTransactionService().abort(this.ts);
                }
                throw new RuntimeException(t);
            }
        }

        private class RobustIteratorTask
        implements Callable<Void> {
            private final PartitionedTupleIterator itr;

            RobustIteratorTask(byte[] fromKey, byte[] toKey) {
                this.itr = new PartitionedTupleIterator(ClientIndexView.this, ParallelRangeIteratorTask.this.ts, ParallelRangeIteratorTask.this.isReadConsistentTx, fromKey, toKey, ParallelRangeIteratorTask.this.capacity, ParallelRangeIteratorTask.this.flags, ParallelRangeIteratorTask.this.filter);
            }

            @Override
            public Void call() throws Exception {
                try {
                    UnsynchronizedArrayBuffer<ITuple> unsyncBuffer = new UnsynchronizedArrayBuffer<ITuple>(ParallelRangeIteratorTask.this.queryBuffer, ITuple.class, ParallelRangeIteratorTask.this.queryBuffer.getMinimumChunkSize());
                    while (this.itr.hasNext() && ParallelRangeIteratorTask.this.queryBuffer.isOpen()) {
                        unsyncBuffer.add((ITuple)this.itr.next());
                    }
                    if (ParallelRangeIteratorTask.this.queryBuffer.isOpen()) {
                        unsyncBuffer.flush();
                    }
                }
                catch (Throwable t) {
                    if (InnerCause.isInnerCause(t, InterruptedException.class)) {
                        ParallelRangeIteratorTask.this.queryBuffer.abort(t);
                    }
                    throw new RuntimeException(t);
                }
                return null;
            }
        }
    }

    /**
     * Presents an iterator over chunks (arrays) of tuples as an iterator over
     * the individual tuples. Empty chunks are skipped. Removal is not
     * supported.
     */
    private static class UnchunkedTupleIterator<E>
    implements ITupleIterator<E> {
        /** The source of tuple chunks. */
        private final ICloseableIterator<ITuple<E>[]> src;
        /** The chunk currently being consumed, or null before the first chunk. */
        private ITuple<E>[] chunk = null;
        /** Index of the next tuple to return from {@link #chunk}. */
        private int index = 0;
        /** Set once the source has reported that it has no more chunks. */
        private boolean exhausted = false;

        /**
         * @param src the chunked source iterator (required)
         * @throws IllegalArgumentException if {@code src} is null
         */
        public UnchunkedTupleIterator(ICloseableIterator<ITuple<E>[]> src) {
            if (src == null) {
                throw new IllegalArgumentException();
            }
            this.src = src;
        }

        /** True iff the current chunk still holds at least one unread tuple. */
        private boolean hasBufferedTuple() {
            return chunk != null && index < chunk.length;
        }

        /**
         * Fetches chunks from the source until one with an unread tuple is
         * available or the source is exhausted.
         */
        @Override
        public boolean hasNext() {
            while (!exhausted && !hasBufferedTuple()) {
                if (src.hasNext()) {
                    chunk = (ITuple[]) src.next();
                    index = 0;
                } else {
                    exhausted = true;
                }
            }
            return !exhausted;
        }

        /**
         * @throws NoSuchElementException if there are no more tuples
         */
        @Override
        public ITuple<E> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            final ITuple<E> tuple = chunk[index];
            index++;
            return tuple;
        }

        /**
         * @throws UnsupportedOperationException always
         */
        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        /** Closes the source iterator when this object is finalized. */
        protected void finalize() throws Exception {
            src.close();
        }
    }
}

