/*
 * Decompiled with CFR 0.152.
 */
package com.bigdata.rdf.rio;

import com.bigdata.btree.AsynchronousIndexWriteConfiguration;
import com.bigdata.btree.keys.IKeyBuilder;
import com.bigdata.btree.keys.KVO;
import com.bigdata.btree.keys.KeyBuilder;
import com.bigdata.btree.proc.IAsyncResultHandler;
import com.bigdata.btree.proc.LongAggregator;
import com.bigdata.counters.CounterSet;
import com.bigdata.counters.Instrument;
import com.bigdata.counters.OneShotInstrument;
import com.bigdata.io.ByteArrayBuffer;
import com.bigdata.io.DataOutputBuffer;
import com.bigdata.rdf.internal.IV;
import com.bigdata.rdf.internal.VTE;
import com.bigdata.rdf.internal.impl.BlobIV;
import com.bigdata.rdf.lexicon.AssignTermId;
import com.bigdata.rdf.lexicon.BigdataValueCentricFullTextIndex;
import com.bigdata.rdf.lexicon.BlobsIndexHelper;
import com.bigdata.rdf.lexicon.BlobsWriteProc;
import com.bigdata.rdf.lexicon.Id2TermWriteProc;
import com.bigdata.rdf.lexicon.LexiconKeyBuilder;
import com.bigdata.rdf.lexicon.LexiconKeyOrder;
import com.bigdata.rdf.lexicon.LexiconRelation;
import com.bigdata.rdf.lexicon.Term2IdTupleSerializer;
import com.bigdata.rdf.lexicon.Term2IdWriteProc;
import com.bigdata.rdf.model.BigdataBNode;
import com.bigdata.rdf.model.BigdataBNodeImpl;
import com.bigdata.rdf.model.BigdataLiteral;
import com.bigdata.rdf.model.BigdataResource;
import com.bigdata.rdf.model.BigdataStatement;
import com.bigdata.rdf.model.BigdataURI;
import com.bigdata.rdf.model.BigdataValue;
import com.bigdata.rdf.model.BigdataValueFactory;
import com.bigdata.rdf.model.BigdataValueImpl;
import com.bigdata.rdf.model.BigdataValueSerializer;
import com.bigdata.rdf.model.StatementEnum;
import com.bigdata.rdf.rio.IAsynchronousWriteStatementBufferFactory;
import com.bigdata.rdf.rio.IStatementBuffer;
import com.bigdata.rdf.rio.PresortRioLoader;
import com.bigdata.rdf.rio.RDFParserOptions;
import com.bigdata.rdf.spo.ISPO;
import com.bigdata.rdf.spo.SPOIndexWriteProc;
import com.bigdata.rdf.spo.SPOKeyOrder;
import com.bigdata.rdf.spo.SPORelation;
import com.bigdata.rdf.spo.SPOTupleSerializer;
import com.bigdata.rdf.store.AbstractTripleStore;
import com.bigdata.rdf.store.ScaleOutTripleStore;
import com.bigdata.relation.accesspath.IRunnableBuffer;
import com.bigdata.relation.accesspath.UnsynchronizedUnboundedChunkBuffer;
import com.bigdata.search.TextIndexWriteProc;
import com.bigdata.service.AbstractFederation;
import com.bigdata.service.Split;
import com.bigdata.service.ndx.IScaleOutClientIndex;
import com.bigdata.service.ndx.pipeline.DefaultDuplicateRemover;
import com.bigdata.service.ndx.pipeline.KVOC;
import com.bigdata.service.ndx.pipeline.KVOLatch;
import com.bigdata.service.ndx.pipeline.KVOList;
import com.bigdata.striterator.ChunkedWrappedIterator;
import com.bigdata.striterator.IChunkedIterator;
import com.bigdata.striterator.IChunkedOrderedIterator;
import com.bigdata.striterator.IKeyOrder;
import com.bigdata.util.DaemonThreadFactory;
import com.bigdata.util.concurrent.Latch;
import com.bigdata.util.concurrent.ShutdownHelper;
import com.bigdata.util.concurrent.ThreadPoolExecutorBaseStatisticsTask;
import cutthecrap.utils.striterators.Filter;
import cutthecrap.utils.striterators.Striterator;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipInputStream;
import org.apache.log4j.Logger;
import org.openrdf.model.BNode;
import org.openrdf.model.Resource;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
import org.openrdf.rio.RDFFormat;

public class AsynchronousStatementBufferFactory<S extends BigdataStatement, R>
implements IAsynchronousWriteStatementBufferFactory<S> {
    /** Logger for this class. */
    private static final transient Logger log = Logger.getLogger(AsynchronousStatementBufferFactory.class);

    /* Target store and its relations (the relations are obtained from the store in the constructor). */
    private final ScaleOutTripleStore tripleStore;
    private final LexiconRelation lexiconRelation;
    private final SPORelation spoRelation;

    /* Configuration captured from the constructor arguments. */
    private final int valuesInitialCapacity;
    private final int bnodesInitialCapacity;
    private final int producerChunkSize;
    private final RDFFormat defaultFormat;
    private final String defaultGraph;
    private final RDFParserOptions parserOptions;
    private final boolean deleteAfter;
    /** Read from the lexicon properties when a text index exists; otherwise {@code false}. */
    private final boolean indexDatatypeLiterals;

    /* Asynchronous write buffers, one per index, created in the constructor. */
    private final IRunnableBuffer<KVO<BigdataValue>[]> buffer_t2id;
    private final IRunnableBuffer<KVO<BigdataValue>[]> buffer_id2t;
    private final IRunnableBuffer<KVO<BigdataValue>[]> buffer_blobs;
    /** {@code null} when the lexicon has no text index. */
    private final IRunnableBuffer<KVO<BigdataValue>[]> buffer_text;
    /** One write buffer per statement index key order. */
    private final Map<SPOKeyOrder, IRunnableBuffer<KVO<ISPO>[]>> buffer_stmts;

    /** Result handler attached to the primary statement index buffer; backs {@link #getStatementCount()}. */
    private final LongAggregator statementResultHandler = new LongAggregator();
    /** Result handler attached to the text index buffer. */
    private final LongAggregator textResultHandler = new LongAggregator();

    /** Set by {@link #notifyStart()} the first time it runs; {@code 0} until then. */
    private volatile long startTime;
    /**
     * Set by {@link #notifyEnd()}; {@code 0} while running. Declared
     * {@code volatile} (like {@link #startTime}) because
     * {@link #getElapsedMillis()} reads it without holding {@link #lock}.
     */
    private volatile long endTime;

    /* Progress counters. */
    private final AtomicLong documentsParsedCount = new AtomicLong(0L);
    private final AtomicLong documentTIDsReadyCount = new AtomicLong(0L);
    private final AtomicLong documentTIDsWaitingCount = new AtomicLong(0L);
    private final AtomicLong toldTriplesRestartSafeCount = new AtomicLong();
    private final AtomicLong documentRestartSafeCount = new AtomicLong();
    private final AtomicLong documentErrorCount = new AtomicLong();

    /** Lock shared by every latch below and required by the workflow methods. */
    private final ReentrantLock lock = new ReentrantLock();
    /* Workflow latches: assertSumOfLatchs() checks parser + bufferTids + bufferOther == document. */
    private final Latch workflowLatch_document = new Latch("document", this.lock);
    private final Latch workflowLatch_parser = new Latch("parser", this.lock);
    private final Latch workflowLatch_bufferTids = new Latch("bufferTids", this.lock);
    private final Latch workflowLatch_bufferOther = new Latch("bufferOther", this.lock);
    /* Guard latches awaited by close() before the corresponding buffers/services are shut down. */
    private final Latch guardLatch_term2Id = new Latch("guard_term2Id", this.lock);
    private final Latch guardLatch_other = new Latch("guard_other", this.lock);
    private final Latch guardLatch_notify = new Latch("guard_notify", this.lock);

    /* Parser pool pause bookkeeping. */
    private final long pauseParserPoolStatementThreshold;
    private final AtomicLong unbufferedStatementCount = new AtomicLong();
    private final AtomicLong outstandingStatementCount = new AtomicLong();
    private Condition unpaused = this.lock.newCondition();
    private AtomicLong pausedThreadCount = new AtomicLong();
    private AtomicLong poolPausedCount = new AtomicLong();

    /* Executor services, created in the constructor and shut down by notifyEnd()/close(). */
    private final ParserThreadPoolExecutor parserService;
    private final ThreadPoolExecutor tidsWriterService;
    private final ThreadPoolExecutor otherWriterService;
    private final ThreadPoolExecutor notifyService;
    /** {@code null} when the index manager is not an {@code AbstractFederation}. */
    private final ServiceStatisticsTask serviceStatisticsTask;

    /**
     * Reports the {@code deleteAfter} setting this factory was constructed with.
     *
     * @return the configured {@code deleteAfter} flag.
     */
    protected boolean isDeleteAfter() {
        return deleteAfter;
    }

    /**
     * Returns the default RDF format given to the constructor.
     *
     * @return the default format; may be {@code null} (the constructor does not
     *         reject a {@code null} format).
     */
    protected RDFFormat getDefaultRDFFormat() {
        return defaultFormat;
    }

    /**
     * Records the start time the first time it is called; later calls are no-ops.
     * The caller must hold {@link #lock}.
     *
     * @throws IllegalMonitorStateException if the current thread does not hold the lock.
     */
    protected void notifyStart() {
        if (!lock.isHeldByCurrentThread()) {
            throw new IllegalMonitorStateException();
        }
        if (startTime != 0L) {
            // Already started.
            return;
        }
        endTime = 0L;
        startTime = System.currentTimeMillis();
    }

    /**
     * Records the end time, then issues {@code shutdownNow()} on each of the
     * four executor services and cancels the statistics task when one exists.
     */
    protected void notifyEnd() {
        endTime = System.currentTimeMillis();

        parserService.shutdownNow();
        tidsWriterService.shutdownNow();
        otherWriterService.shutdownNow();
        notifyService.shutdownNow();

        if (serviceStatisticsTask == null) {
            return;
        }
        serviceStatisticsTask.cancel();
    }

    /**
     * Elapsed wall-clock time for this factory.
     *
     * @return {@code 0} if {@link #notifyStart()} has not recorded a start
     *         time; otherwise the time from start until the recorded end
     *         time, or until now when no end time has been recorded.
     */
    public long getElapsedMillis() {
        final long begin = startTime;
        if (begin == 0L) {
            return 0L;
        }
        final long end = endTime;
        final long until = (end == 0L) ? System.currentTimeMillis() : end;
        return until - begin;
    }

    /**
     * Checks the workflow invariant: the parser, bufferTids and bufferOther
     * latch counts must add up to the document latch count. The caller must
     * hold {@link #lock}.
     *
     * @throws IllegalMonitorStateException if the current thread does not hold the lock.
     * @throws AssertionError if the counts disagree.
     */
    private void assertSumOfLatchs() {
        if (!lock.isHeldByCurrentThread()) {
            throw new IllegalMonitorStateException();
        }
        final long stageSum = workflowLatch_parser.get() + workflowLatch_bufferTids.get() + workflowLatch_bufferOther.get();
        final long unfinished = workflowLatch_document.get();
        if (stageSum == unfinished) {
            return;
        }
        throw new AssertionError((Object)("Sum of Latches=" + stageSum + ", but unfinished=" + unfinished + " : " + getCounters().toString()));
    }

    /**
     * Returns the current result of the statement result handler (the handler
     * attached to the primary statement index write buffer).
     *
     * @return the aggregated value reported by that handler.
     */
    public long getStatementCount() {
        return statementResultHandler.getResult();
    }

    /**
     * Returns the number of documents reported through
     * {@code documentError(...)}.
     *
     * @return the current value of the document error counter.
     */
    public long getDocumentErrorCount() {
        return documentErrorCount.get();
    }

    /**
     * Returns the current value of the restart-safe document counter.
     *
     * @return the value of {@code documentRestartSafeCount}.
     */
    public long getDocumentDoneCount() {
        return documentRestartSafeCount.get();
    }

    /**
     * Creates a statement buffer that is not associated with any resource
     * (delegates to {@code newStatementBuffer(null)}).
     *
     * @return the new statement buffer.
     */
    @Override
    public IStatementBuffer<S> newStatementBuffer() {
        return newStatementBuffer(null);
    }

    /**
     * Creates a statement buffer for the given resource.
     *
     * @param resource the resource handed to the buffer; the no-argument
     *        overload passes {@code null}.
     * @return a new {@code AsynchronousStatementBufferImpl}.
     */
    protected AsynchronousStatementBufferImpl newStatementBuffer(R resource) {
        final AsynchronousStatementBufferImpl statementBuffer = new AsynchronousStatementBufferImpl(resource);
        return statementBuffer;
    }

    /**
     * Creates a parser task for the resource and submits it to the parser
     * service, holding {@link #lock} for the duration.
     *
     * @param resource the resource to be parsed; must not be {@code null}.
     * @throws IllegalArgumentException if {@code resource} is {@code null}
     *         (consistent with the other {@code submitOne} overloads).
     * @throws java.util.concurrent.RejectedExecutionException if the parser
     *         service rejects the task.
     * @throws Exception if the parser task cannot be created.
     */
    public void submitOne(R resource) throws Exception {
        // Fail fast with the same exception type the sibling overloads use,
        // rather than a NullPointerException from newParserTask().
        if (resource == null) {
            throw new IllegalArgumentException();
        }
        lock.lock();
        try {
            final Callable<?> task = newParserTask(resource);
            submitOne(resource, task);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Submits an already-constructed parser task. Under {@link #lock} it
     * records the start time, increments the document and parser workflow
     * latches, and hands the task to the parser service. If the service
     * rejects the task, both latches are decremented again before the
     * rejection is rethrown.
     *
     * @param resource the resource the task will parse; must not be {@code null}.
     * @param task the parser task; must not be {@code null}.
     * @throws IllegalArgumentException if either argument is {@code null}.
     * @throws RejectedExecutionException if the parser service rejects the task.
     */
    private void submitOne(R resource, Callable<?> task) throws Exception {
        if (resource == null) {
            throw new IllegalArgumentException();
        }
        if (task == null) {
            throw new IllegalArgumentException();
        }
        lock.lock();
        try {
            assertSumOfLatchs();
            notifyStart();
            workflowLatch_document.inc();
            workflowLatch_parser.inc();
            assertSumOfLatchs();
            try {
                parserService.submit(task);
            }
            catch (RejectedExecutionException rejected) {
                // Roll back the latch increments made for this document.
                assertSumOfLatchs();
                workflowLatch_document.dec();
                workflowLatch_parser.dec();
                assertSumOfLatchs();
                throw rejected;
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Submits a resource for parsing, retrying while the parser service
     * rejects the task.
     *
     * @param resource the resource to be parsed; must not be {@code null}.
     * @param retryMillis how long to sleep between attempts after a rejection;
     *        {@code 0} means a rejection is rethrown without retrying.
     * @throws IllegalArgumentException if {@code resource} is {@code null} or
     *         {@code retryMillis} is negative.
     * @throws RejectedExecutionException if the task is rejected and either the
     *         parser service is shut down or {@code retryMillis} is zero.
     * @throws InterruptedException if interrupted while sleeping between attempts.
     * @throws Exception if the parser task cannot be created.
     */
    public void submitOne(R resource, long retryMillis) throws InterruptedException, Exception {
        if (resource == null) {
            throw new IllegalArgumentException();
        }
        if (retryMillis < 0L) {
            throw new IllegalArgumentException();
        }
        int retryCount = 0;
        final long begin = System.currentTimeMillis();
        long lastLogTime = begin;
        final Callable<?> task = newParserTask(resource);
        for (;;) {
            try {
                submitOne(resource, task);
                return;
            }
            catch (RejectedExecutionException rejected) {
                if (parserService.isShutdown()) {
                    throw rejected;
                }
                if (retryMillis == 0L) {
                    throw rejected;
                }
                Thread.sleep(retryMillis);
                ++retryCount;
                if (log.isInfoEnabled()) {
                    final long now = System.currentTimeMillis();
                    // Log at most once every five seconds while blocked.
                    if (now - lastLogTime > 5000L) {
                        final long elapsed = now - begin;
                        lastLogTime = now;
                        log.info("Parser pool blocking: retryCount=" + retryCount + ", elapsed=" + elapsed + "ms, resource=" + resource);
                    }
                }
            }
            catch (InterruptedException interrupted) {
                throw interrupted;
            }
            catch (Exception other) {
                // NOTE(review): any other exception is logged and the submit is
                // retried immediately, with no sleep - confirm this is intended.
                log.error(resource, other);
            }
        }
    }

    /**
     * Runs a {@code RunnableFileSystemLoader} over the given file or directory
     * in the calling thread and returns its result.
     *
     * @param fileOrDir the file or directory passed to the loader.
     * @param filter the filename filter passed to the loader.
     * @param retryMillis the retry interval passed to the loader.
     * @return the value returned by the loader's {@code call()}.
     * @throws Exception whatever the loader throws.
     */
    public int submitAll(File fileOrDir, FilenameFilter filter, long retryMillis) throws Exception {
        final RunnableFileSystemLoader loader = new RunnableFileSystemLoader(fileOrDir, filter, retryMillis);
        return loader.call();
    }

    /**
     * Opens an input stream for the resource.
     * <p>
     * A {@link File} is opened with a {@link FileInputStream}, wrapped in a
     * {@link GZIPInputStream} when the file name ends with {@code .gz} or in a
     * {@link ZipInputStream} when it ends with {@code .zip}. A {@link URL} is
     * opened with {@link URL#openStream()}.
     *
     * @param resource the resource to open.
     * @return the opened stream; the caller is responsible for closing it.
     * @throws IOException if the stream cannot be opened or wrapped.
     * @throws UnsupportedOperationException if the resource is neither a
     *         {@link File} nor a {@link URL}.
     */
    protected InputStream getInputStream(R resource) throws IOException {
        if (resource instanceof File) {
            final File file = (File)resource;
            final InputStream raw = new FileInputStream(file);
            final String name = file.getName();
            boolean opened = false;
            try {
                final InputStream is;
                if (name.endsWith(".gz")) {
                    // The GZIP constructor reads the header and can throw.
                    is = new GZIPInputStream(raw);
                } else if (name.endsWith(".zip")) {
                    // NOTE(review): a ZipInputStream yields no data until
                    // getNextEntry() is called - confirm the consumer
                    // positions the stream on an entry.
                    is = new ZipInputStream(raw);
                } else {
                    is = raw;
                }
                opened = true;
                return is;
            }
            finally {
                if (!opened) {
                    // Wrapping failed: release the file handle instead of leaking it.
                    try {
                        raw.close();
                    }
                    catch (IOException ignored) {
                        // The original failure is the one that propagates.
                    }
                }
            }
        }
        if (resource instanceof URL) {
            return ((URL)resource).openStream();
        }
        throw new UnsupportedOperationException();
    }

    /**
     * Builds the task that parses one resource.
     * <p>
     * The RDF format is derived from the resource's string form via
     * {@code RDFFormat.forFileName}, falling back to
     * {@link #getDefaultRDFFormat()} when one is configured. The base URI is
     * the URL of the matching classpath resource if there is one, otherwise
     * the URI of a {@link File} built from the resource's string form.
     *
     * @param resource the resource to be parsed.
     * @return a new {@code ParserTask} for the resource.
     * @throws RuntimeException if no RDF format can be determined.
     * @throws Exception if the base URI cannot be constructed.
     */
    protected Callable<?> newParserTask(R resource) throws Exception {
        final String resourceStr = resource.toString();
        if (log.isInfoEnabled()) {
            log.info("resource=" + resourceStr);
        }
        final RDFFormat fallbackFormat = getDefaultRDFFormat();
        final RDFFormat rdfFormat = fallbackFormat == null
                ? RDFFormat.forFileName(resourceStr)
                : RDFFormat.forFileName(resourceStr, fallbackFormat);
        if (rdfFormat == null) {
            final String msg = "Could not determine interchange syntax - skipping : file=" + resource;
            log.error(msg);
            throw new RuntimeException(msg);
        }
        // Look the classpath resource up once and reuse the result.
        final URL classpathUrl = getClass().getResource(resourceStr);
        final String baseURI = classpathUrl != null
                ? classpathUrl.toURI().toString()
                : new File(resourceStr).toURI().toString();
        return new ParserTask(resource, baseURI, rdfFormat);
    }

    /**
     * Returns the default {@code Object} representation followed by
     * {@code "::"} and the current counters.
     */
    @Override
    public String toString() {
        final String identity = super.toString();
        return identity + "::" + getCounters();
    }

    /**
     * Creates the factory: validates the arguments, creates one asynchronous
     * write buffer per lexicon/statement index, and creates the executor
     * services used for parsing, index writes and notification.
     *
     * @param tripleStore the target store; must not be {@code null} and must
     *        not use statement identifiers.
     * @param producerChunkSize must be positive.
     * @param valuesInitialCapacity must be positive.
     * @param bnodesInitialCapacity must be positive.
     * @param defaultFormat the default RDF format (not validated here).
     * @param defaultGraph the default graph (not validated here).
     * @param parserOptions the parser options; must not be {@code null}.
     * @param deleteAfter the value later reported by {@link #isDeleteAfter()}.
     * @param parserPoolSize the maximum pool size of the parser service (core size is 1).
     * @param parserQueueCapacity the capacity of the parser service work queue.
     * @param term2IdWriterPoolSize the fixed pool size of the term2Id writer service.
     * @param otherWriterPoolSize the fixed pool size of the other-index writer service.
     * @param notifyPoolSize the fixed pool size of the notify service.
     * @param pauseParsedPoolStatementThreshold must be non-negative.
     * @throws IllegalArgumentException if an argument violates the constraints above.
     * @throws UnsupportedOperationException if the store uses statement identifiers.
     */
    public AsynchronousStatementBufferFactory(ScaleOutTripleStore tripleStore, int producerChunkSize, int valuesInitialCapacity, int bnodesInitialCapacity, RDFFormat defaultFormat, String defaultGraph, RDFParserOptions parserOptions, boolean deleteAfter, int parserPoolSize, int parserQueueCapacity, int term2IdWriterPoolSize, int otherWriterPoolSize, int notifyPoolSize, long pauseParsedPoolStatementThreshold) {
        if (tripleStore == null) {
            throw new IllegalArgumentException();
        }
        if (parserOptions == null) {
            throw new IllegalArgumentException();
        }
        if (producerChunkSize <= 0) {
            throw new IllegalArgumentException();
        }
        if (valuesInitialCapacity <= 0) {
            throw new IllegalArgumentException();
        }
        if (bnodesInitialCapacity <= 0) {
            throw new IllegalArgumentException();
        }
        if (pauseParsedPoolStatementThreshold < 0L) {
            throw new IllegalArgumentException();
        }
        this.tripleStore = tripleStore;
        this.lexiconRelation = tripleStore.getLexiconRelation();
        this.spoRelation = tripleStore.getSPORelation();
        this.producerChunkSize = producerChunkSize;
        this.valuesInitialCapacity = valuesInitialCapacity;
        this.bnodesInitialCapacity = bnodesInitialCapacity;
        this.defaultFormat = defaultFormat;
        this.defaultGraph = defaultGraph;
        this.parserOptions = parserOptions;
        this.deleteAfter = deleteAfter;
        this.pauseParserPoolStatementThreshold = pauseParsedPoolStatementThreshold;
        if (tripleStore.isStatementIdentifiers()) {
            throw new UnsupportedOperationException("SIDs not supported");
        }
        // The write buffers are created while holding the lock.
        this.lock.lock();
        try {
            // TERM2ID: warn about an over-long sink idle timeout, then create the buffer.
            AsynchronousIndexWriteConfiguration config = tripleStore.getLexiconRelation().getTerm2IdIndex().getIndexMetadata().getAsynchronousIndexWriteConfiguration();
            AsynchronousStatementBufferFactory.assertLiveness(this.lexiconRelation.getTerm2IdIndex().getIndexMetadata().getName(), config);
            this.buffer_t2id = ((IScaleOutClientIndex)this.lexiconRelation.getTerm2IdIndex()).newWriteBuffer(new Term2IdWriteProcAsyncResultHandler(false), new DefaultDuplicateRemover(true), new Term2IdWriteProc.Term2IdWriteProcConstructor(false, this.lexiconRelation.isStoreBlankNodes(), this.lexiconRelation.getTermIdBitsToReverse()));
            // ID2TERM: no result handler.
            this.buffer_id2t = ((IScaleOutClientIndex)this.lexiconRelation.getId2TermIndex()).newWriteBuffer(null, new DefaultDuplicateRemover(true), Id2TermWriteProc.Id2TermWriteProcConstructor.INSTANCE);
            // BLOBS: same liveness check as TERM2ID.
            config = tripleStore.getLexiconRelation().getBlobsIndex().getIndexMetadata().getAsynchronousIndexWriteConfiguration();
            AsynchronousStatementBufferFactory.assertLiveness(this.lexiconRelation.getBlobsIndex().getIndexMetadata().getName(), config);
            this.buffer_blobs = ((IScaleOutClientIndex)this.lexiconRelation.getBlobsIndex()).newWriteBuffer(new BlobsWriteProcAsyncResultHandler(false), new DefaultDuplicateRemover(true), new BlobsWriteProc.BlobsWriteProcConstructor(false, this.lexiconRelation.isStoreBlankNodes()));
            // Full text index: only when the lexicon has one.
            if (this.lexiconRelation.isTextIndex()) {
                BigdataValueCentricFullTextIndex tmp = (BigdataValueCentricFullTextIndex)this.lexiconRelation.getSearchEngine();
                this.buffer_text = ((IScaleOutClientIndex)tmp.getIndex()).newWriteBuffer(this.textResultHandler, new DefaultDuplicateRemover(true), TextIndexWriteProc.IndexWriteProcConstructor.NO_OVERWRITE);
                this.indexDatatypeLiterals = Boolean.parseBoolean(this.lexiconRelation.getProperties().getProperty(AbstractTripleStore.Options.TEXT_INDEX_DATATYPE_LITERALS, "true"));
            } else {
                this.buffer_text = null;
                this.indexDatatypeLiterals = false;
            }
            // Statement indices: one buffer per key order; only the primary
            // index reports into statementResultHandler.
            this.buffer_stmts = new LinkedHashMap<SPOKeyOrder, IRunnableBuffer<KVO<ISPO>[]>>(tripleStore.isQuads() ? 6 : 3);
            Iterator<SPOKeyOrder> itr = tripleStore.getSPORelation().statementKeyOrderIterator();
            while (itr.hasNext()) {
                SPOKeyOrder keyOrder = itr.next();
                // Element type is ISPO, matching the buffer_stmts map (the
                // decompiler had emitted an undeclared type name here).
                IRunnableBuffer<KVO<ISPO>[]> buffer = ((IScaleOutClientIndex)this.spoRelation.getIndex(keyOrder)).newWriteBuffer(keyOrder.isPrimaryIndex() ? this.statementResultHandler : null, new DefaultDuplicateRemover(true), SPOIndexWriteProc.IndexWriteProcConstructor.INSTANCE);
                this.buffer_stmts.put(keyOrder, buffer);
            }
        }
        finally {
            this.lock.unlock();
        }
        // Statistics are only collected when the index manager is a federation.
        AbstractFederation fed = tripleStore.getIndexManager() instanceof AbstractFederation ? (AbstractFederation)tripleStore.getIndexManager() : null;
        this.parserService = new ParserThreadPoolExecutor(1, parserPoolSize, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(parserQueueCapacity), new DaemonThreadFactory(this.getClass().getName() + "_parserService"));
        this.tidsWriterService = new ThreadPoolExecutor(term2IdWriterPoolSize, term2IdWriterPoolSize, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory(this.getClass().getName() + "_term2IdWriteService"));
        this.otherWriterService = new ThreadPoolExecutor(otherWriterPoolSize, otherWriterPoolSize, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory(this.getClass().getName() + "_otherWriteService"));
        this.notifyService = new ThreadPoolExecutor(notifyPoolSize, notifyPoolSize, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory(this.getClass().getName() + "_notifyService"));
        this.serviceStatisticsTask = fed == null ? null : new ServiceStatisticsTask(fed.getScheduledExecutorService());
    }

    /**
     * Logs an error when the sink idle timeout configured for an index
     * exceeds 60 seconds. Despite the name, nothing is thrown.
     *
     * @param name the index name, used only in the log message.
     * @param config the asynchronous write configuration of that index.
     */
    private static void assertLiveness(String name, AsynchronousIndexWriteConfiguration config) {
        final long limitNanos = TimeUnit.SECONDS.toNanos(60L);
        if (config.getSinkIdleTimeoutNanos() <= limitNanos) {
            return;
        }
        log.error("Large idle timeout will not preserve liveness: index=" + name + ", config=" + config);
    }

    /**
     * Reports whether any part of the pipeline has finished: true if the
     * future of any write buffer is done or any of the executor services has
     * terminated. Evaluated while holding {@link #lock}.
     *
     * @return {@code true} if at least one buffer future is done or one
     *         service is terminated.
     */
    @Override
    public boolean isAnyDone() {
        lock.lock();
        try {
            if (buffer_blobs != null && buffer_blobs.getFuture().isDone()) {
                return true;
            }
            if (buffer_t2id != null && buffer_t2id.getFuture().isDone()) {
                return true;
            }
            if (buffer_id2t.getFuture().isDone()) {
                return true;
            }
            if (buffer_text != null && buffer_text.getFuture().isDone()) {
                return true;
            }
            for (IRunnableBuffer<KVO<ISPO>[]> sink : buffer_stmts.values()) {
                if (sink != null && sink.getFuture().isDone()) {
                    return true;
                }
            }
            return parserService.isTerminated()
                    || tidsWriterService.isTerminated()
                    || otherWriterService.isTerminated()
                    || (notifyService != null && notifyService.isTerminated());
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Cancels the future of every write buffer and then calls
     * {@link #notifyEnd()}.
     *
     * @param mayInterruptIfRunning passed through to each {@code Future.cancel}.
     */
    @Override
    public void cancelAll(boolean mayInterruptIfRunning) {
        if (log.isInfoEnabled()) {
            log.info("Cancelling futures.");
        }
        if (buffer_blobs != null) {
            buffer_blobs.getFuture().cancel(mayInterruptIfRunning);
        }
        if (buffer_t2id != null) {
            buffer_t2id.getFuture().cancel(mayInterruptIfRunning);
        }
        buffer_id2t.getFuture().cancel(mayInterruptIfRunning);
        if (buffer_text != null) {
            buffer_text.getFuture().cancel(mayInterruptIfRunning);
        }
        for (IRunnableBuffer<KVO<ISPO>[]> sink : buffer_stmts.values()) {
            if (sink != null) {
                sink.getFuture().cancel(mayInterruptIfRunning);
            }
        }
        notifyEnd();
    }

    /**
     * Drains the workflow in stage order and shuts the services down.
     * <p>
     * Under {@link #lock} (acquired interruptibly) this method: awaits the
     * parser latch and the term2Id guard latch, closes the TERM2ID and BLOBS
     * buffers, awaits the bufferTids latch and shuts down the term2Id writer
     * service; awaits the other guard latch, closes the ID2TERM, text and
     * statement buffers, awaits the bufferOther latch and shuts down the
     * other writer service; awaits the document latch; and finally awaits the
     * notify guard latch and shuts down the notify service. Whether or not
     * that sequence completes, the lock is released and {@link #notifyEnd()}
     * is called.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if
     *         the thread is interrupted; the thread's interrupt status is
     *         restored before the exception is thrown.
     */
    @Override
    public void close() {
        log.info("");
        try {
            this.lock.lockInterruptibly();
            try {
                // Stage 1: wait for the parsers, then flush the TID-assigning buffers.
                this.assertSumOfLatchs();
                this.workflowLatch_parser.await();
                this.assertSumOfLatchs();
                this.guardLatch_term2Id.await();
                if (this.buffer_t2id != null) {
                    if (log.isInfoEnabled()) {
                        log.info("Closing TERM2ID buffer.");
                    }
                    this.buffer_t2id.close();
                }
                if (this.buffer_blobs != null) {
                    if (log.isInfoEnabled()) {
                        log.info("Closing BLOBS buffer.");
                    }
                    this.buffer_blobs.close();
                }
                this.workflowLatch_bufferTids.await();
                this.tidsWriterService.shutdown();
                // The helper is constructed for its effect; the instance is not retained.
                new ShutdownHelper(this.tidsWriterService, 10L, TimeUnit.SECONDS){

                    @Override
                    protected void logTimeout() {
                        log.warn("Waiting for term2Id write service shutdown.");
                    }
                };
                // Stage 2: flush the remaining index buffers.
                this.assertSumOfLatchs();
                this.guardLatch_other.await();
                if (log.isInfoEnabled()) {
                    log.info("Closing remaining buffers.");
                }
                this.buffer_id2t.close();
                if (this.buffer_text != null) {
                    this.buffer_text.close();
                }
                for (IRunnableBuffer<KVO<ISPO>[]> buffer : this.buffer_stmts.values()) {
                    if (buffer == null) continue;
                    buffer.close();
                }
                this.workflowLatch_bufferOther.await();
                this.otherWriterService.shutdown();
                new ShutdownHelper(this.otherWriterService, 10L, TimeUnit.SECONDS){

                    @Override
                    protected void logTimeout() {
                        log.warn("Waiting for other write service shutdown.");
                    }
                };
                // Stage 3: all documents accounted for, then drain notifications.
                this.assertSumOfLatchs();
                this.workflowLatch_document.await();
                this.assertSumOfLatchs();
                if (this.notifyService != null) {
                    this.guardLatch_notify.await();
                    this.notifyService.shutdown();
                    new ShutdownHelper(this.notifyService, 10L, TimeUnit.SECONDS){

                        @Override
                        protected void logTimeout() {
                            log.warn("Waiting for delete service shutdown.");
                        }
                    };
                }
            }
            finally {
                this.lock.unlock();
                this.notifyEnd();
            }
        }
        catch (InterruptedException ex) {
            // Preserve the interrupt for callers further up the stack; the
            // checked exception is about to be hidden inside a RuntimeException.
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
    }

    /**
     * Calls {@link #close()} and then blocks on the future of every write
     * buffer in turn.
     *
     * @throws InterruptedException if interrupted while waiting on a future.
     * @throws ExecutionException if any buffer's future completed with an error.
     */
    @Override
    public void awaitAll() throws InterruptedException, ExecutionException {
        if (log.isInfoEnabled()) {
            log.info("Start");
        }
        close();
        if (log.isInfoEnabled()) {
            log.info("Awaiting futures.");
        }
        if (buffer_blobs != null) {
            buffer_blobs.getFuture().get();
        }
        if (buffer_t2id != null) {
            buffer_t2id.getFuture().get();
        }
        buffer_id2t.getFuture().get();
        if (buffer_text != null) {
            buffer_text.getFuture().get();
        }
        for (IRunnableBuffer<KVO<ISPO>[]> sink : buffer_stmts.values()) {
            if (sink != null) {
                sink.getFuture().get();
            }
        }
        if (log.isInfoEnabled()) {
            log.info("Done.");
        }
    }

    /**
     * Schedules the success notification for a document. The caller must hold
     * {@link #lock}.
     * <p>
     * If {@link #newSuccessTask(Object)} returns a task, the notify guard
     * latch is incremented and a wrapper is submitted to the notify service;
     * the wrapper runs the task and then decrements the latch under the lock.
     * If the service rejects the wrapper the latch is decremented right away.
     * Any {@link Throwable} raised here is logged and not propagated.
     *
     * @param resource the resource that was processed.
     * @throws IllegalMonitorStateException if the current thread does not hold the lock.
     */
    protected final void documentDone(R resource) {
        if (!lock.isHeldByCurrentThread()) {
            throw new IllegalMonitorStateException();
        }
        try {
            final Runnable task = newSuccessTask(resource);
            if (task != null) {
                guardLatch_notify.inc();
                try {
                    notifyService.submit(new Runnable(){

                        @Override
                        public void run() {
                            try {
                                task.run();
                            }
                            finally {
                                // Release the guard whether or not the task failed.
                                AsynchronousStatementBufferFactory.this.lock.lock();
                                try {
                                    AsynchronousStatementBufferFactory.this.guardLatch_notify.dec();
                                }
                                finally {
                                    AsynchronousStatementBufferFactory.this.lock.unlock();
                                }
                            }
                        }
                    });
                }
                catch (RejectedExecutionException rejected) {
                    guardLatch_notify.dec();
                    throw rejected;
                }
            }
        }
        catch (Throwable t) {
            log.error(t, t);
        }
    }

    /**
     * Records a failed document and schedules the failure notification. The
     * caller must hold {@link #lock}.
     * <p>
     * The error counter is incremented and the document workflow latch is
     * decremented. Then, if {@link #newFailureTask(Object, Throwable)} returns
     * a task, the notify guard latch is incremented and a wrapper is submitted
     * to the notify service; the wrapper runs the task and then decrements the
     * latch under the lock. If the service rejects the wrapper the latch is
     * decremented right away. Any {@link Throwable} raised while scheduling is
     * logged and not propagated.
     *
     * @param resource the resource that failed.
     * @param t the cause of the failure.
     * @throws IllegalMonitorStateException if the current thread does not hold the lock.
     */
    protected final void documentError(R resource, Throwable t) {
        if (!lock.isHeldByCurrentThread()) {
            throw new IllegalMonitorStateException();
        }
        documentErrorCount.incrementAndGet();
        workflowLatch_document.dec();
        try {
            final Runnable task = newFailureTask(resource, t);
            if (task != null) {
                guardLatch_notify.inc();
                try {
                    notifyService.submit(new Runnable(){

                        @Override
                        public void run() {
                            try {
                                task.run();
                            }
                            finally {
                                // Release the guard whether or not the task failed.
                                AsynchronousStatementBufferFactory.this.lock.lock();
                                try {
                                    AsynchronousStatementBufferFactory.this.guardLatch_notify.dec();
                                }
                                finally {
                                    AsynchronousStatementBufferFactory.this.lock.unlock();
                                }
                            }
                        }
                    });
                }
                catch (RejectedExecutionException rejected) {
                    guardLatch_notify.dec();
                    throw rejected;
                }
            }
        }
        catch (Throwable ex) {
            log.error(ex, ex);
        }
    }

    /**
     * Creates the task to run after a resource has been processed
     * successfully.
     *
     * @param resource the processed resource.
     * @return a {@code DeleteTask} for the resource when {@code deleteAfter}
     *         is set, otherwise {@code null}.
     */
    protected Runnable newSuccessTask(R resource) {
        if (log.isInfoEnabled()) {
            log.info("resource=" + resource);
        }
        if (!deleteAfter) {
            return null;
        }
        return new DeleteTask(resource);
    }

    /**
     * Creates the task to run after a resource has failed. The returned task
     * logs the resource and its cause at error level.
     *
     * @param resource the resource that failed.
     * @param cause the cause of the failure.
     * @return the logging task (never {@code null} in this implementation).
     */
    protected Runnable newFailureTask(final R resource, final Throwable cause) {
        final Runnable reporter = new Runnable(){

            @Override
            public void run() {
                log.error(resource, cause);
            }
        };
        return reporter;
    }

    /**
     * Deletes the resource when it is a {@link File}, logging a warning if
     * the delete fails. Other resource types are ignored.
     *
     * @param resource the resource to delete.
     */
    protected void deleteResource(R resource) {
        if (!(resource instanceof File)) {
            return;
        }
        final boolean deleted = ((File)resource).delete();
        if (!deleted) {
            log.warn("Could not delete: " + resource);
        }
    }

    /**
     * Builds a fresh {@link CounterSet} whose instruments sample this factory's state on demand.
     * <p>
     * Layout of the returned set:
     * <ul>
     * <li>top level: elapsed time plus the document and told-triple counters;</li>
     * <li>{@code workflowLatch}: the parser, bufferTids, bufferOther and document latches;</li>
     * <li>{@code bufferGuard}: the term2Id, other and notify guard latches;</li>
     * <li>{@code pause}: the statement counts and the threshold used to pause the parser pool;</li>
     * <li>{@code services}: the counters reported by {@code serviceStatisticsTask}.</li>
     * </ul>
     * The {@code toldTriplesRestartSafePerSec} counter reports zero while the elapsed time is not
     * positive instead of dividing by it.
     *
     * @return a new counter set; a new instance is built on every call
     */
    @Override
    public CounterSet getCounters() {
        CounterSet counterSet = new CounterSet();

        // ---- top-level progress counters ----
        counterSet.addCounter("elapsedMillis", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.getElapsedMillis());
            }
        });
        counterSet.addCounter("documentsParsedCount", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.documentsParsedCount.get());
            }
        });
        counterSet.addCounter("documentTIDsWaitingCount", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.documentTIDsWaitingCount.get());
            }
        });
        counterSet.addCounter("documentTIDsReadyCount", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.documentTIDsReadyCount.get());
            }
        });
        counterSet.addCounter("fullTextTupleWriteCount", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(Long.valueOf(AsynchronousStatementBufferFactory.this.textResultHandler.getResult()));
            }
        });
        counterSet.addCounter("toldTriplesWriteCount", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.getStatementCount());
            }
        });
        counterSet.addCounter("toldTriplesRestartSafeCount", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.toldTriplesRestartSafeCount.get());
            }
        });
        counterSet.addCounter("toldTriplesRestartSafePerSec", new Instrument<Long>(){

            @Override
            protected void sample() {
                long elapsed = AsynchronousStatementBufferFactory.this.getElapsedMillis();
                if (elapsed <= 0L) {
                    // Dividing by a zero elapsed time yields Infinity (or NaN), and the
                    // narrowing cast would turn Infinity into Long.MAX_VALUE. Report a
                    // rate of zero until there is a measurable interval.
                    this.setValue(0L);
                    return;
                }
                double triplesPerMilli = (double)AsynchronousStatementBufferFactory.this.toldTriplesRestartSafeCount.get() / (double)elapsed;
                // Scale from per-millisecond to per-second and truncate to a whole number.
                this.setValue((long)(triplesPerMilli * 1000.0));
            }
        });
        counterSet.addCounter("documentRestartSafeCount", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.documentRestartSafeCount.get());
            }
        });
        counterSet.addCounter("documentErrorCount", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.documentErrorCount.get());
            }
        });

        // ---- workflow latches ----
        CounterSet workflowLatchSet = counterSet.makePath("workflowLatch");
        workflowLatchSet.addCounter("parser", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.workflowLatch_parser.get());
            }
        });
        workflowLatchSet.addCounter("bufferTids", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.workflowLatch_bufferTids.get());
            }
        });
        workflowLatchSet.addCounter("bufferOther", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.workflowLatch_bufferOther.get());
            }
        });
        workflowLatchSet.addCounter("document", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.workflowLatch_document.get());
            }
        });

        // ---- guard latches ----
        CounterSet bufferGuardSet = counterSet.makePath("bufferGuard");
        bufferGuardSet.addCounter("guardTerm2Id", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.guardLatch_term2Id.get());
            }
        });
        bufferGuardSet.addCounter("guardOther", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.guardLatch_other.get());
            }
        });
        bufferGuardSet.addCounter("guardNotify", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.guardLatch_notify.get());
            }
        });

        // ---- parser pool pause state ----
        CounterSet pauseSet = counterSet.makePath("pause");
        pauseSet.addCounter("outstandingStatementCount", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.outstandingStatementCount.get());
            }
        });
        pauseSet.addCounter("unbufferedStatementCount", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.unbufferedStatementCount.get());
            }
        });
        // The threshold is captured once when the counter set is built.
        pauseSet.addCounter("pauseParserPoolStatementThreshold", new OneShotInstrument<Long>(this.pauseParserPoolStatementThreshold));
        pauseSet.addCounter("pausedThreadCount", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.pausedThreadCount.get());
            }
        });
        pauseSet.addCounter("poolPausedCount", new Instrument<Long>(){

            @Override
            protected void sample() {
                this.setValue(AsynchronousStatementBufferFactory.this.poolPausedCount.get());
            }
        });

        // ---- counters contributed by the service statistics task ----
        counterSet.makePath("services").attach(this.serviceStatisticsTask.getCounters());
        return counterSet;
    }

    /**
     * Wraps {@code itr} in a chunked iterator that visits only those values for which
     * {@code r.getInlineIV(...)} returns {@code null}.
     *
     * @param r the lexicon relation consulted for each visited value
     * @param itr the source values
     * @param chunkSize the chunk size handed to the chunked wrapper
     * @return the filtered, chunked view of {@code itr}
     */
    static <V extends BigdataValue> IChunkedIterator<V> newValuesIterator(final LexiconRelation r, Iterator<V> itr, int chunkSize) {
        final Filter notInlined = new Filter(){
            private static final long serialVersionUID = 1L;

            @Override
            public boolean isValid(Object obj) {
                // Accept the value only when the relation produces no inline IV for it.
                final boolean hasInlineIV = r.getInlineIV((Value)obj) != null;
                return !hasInlineIV;
            }
        };
        return new ChunkedWrappedIterator<BigdataValue>(new Striterator(itr).addFilter(notInlined), chunkSize, BigdataValue.class);
    }

    /**
     * Wraps {@code itr} in a chunked iterator that skips blank nodes and any value for which
     * {@code r.isBlob(...)} is {@code true}.
     *
     * @param r the lexicon relation used for the blob test
     * @param itr the source values
     * @param chunkSize the chunk size handed to the chunked wrapper
     * @return the filtered, chunked view of {@code itr}
     */
    private static <V extends BigdataValue> IChunkedIterator<V> newId2TIterator(final LexiconRelation r, Iterator<V> itr, int chunkSize) {
        final Filter neitherBNodeNorBlob = new Filter(){
            private static final long serialVersionUID = 1L;

            @Override
            public boolean isValid(Object obj) {
                final BigdataValue candidate = (BigdataValue)obj;
                // The blob test is only evaluated for values that are not blank nodes.
                return !(candidate instanceof BNode) && !r.isBlob(candidate);
            }
        };
        return new ChunkedWrappedIterator<BigdataValue>(new Striterator(itr).addFilter(neitherBNodeNorBlob), chunkSize, BigdataValue.class);
    }

    /**
     * Wraps {@code itr} in a chunked iterator that visits only literals.
     * <p>
     * When {@code indexDatatypeLiterals} is {@code false}, literals whose
     * {@code getDatatype()} is non-null are skipped as well. The {@code r} argument is not
     * consulted by this method.
     *
     * @param r the lexicon relation (unused here)
     * @param itr the source values
     * @param chunkSize the chunk size handed to the chunked wrapper
     * @param indexDatatypeLiterals whether literals carrying a datatype are accepted
     * @return the filtered, chunked view of {@code itr}
     */
    private static <V extends BigdataValue> IChunkedIterator<V> newTextIterator(LexiconRelation r, Iterator<V> itr, int chunkSize, final boolean indexDatatypeLiterals) {
        final Filter literalsOnly = new Filter(){
            private static final long serialVersionUID = 1L;

            @Override
            public boolean isValid(Object obj) {
                if (obj instanceof BigdataLiteral) {
                    return indexDatatypeLiterals || ((BigdataLiteral)obj).getDatatype() == null;
                }
                return false;
            }
        };
        return new ChunkedWrappedIterator<BigdataValue>(new Striterator(itr).addFilter(literalsOnly), chunkSize, BigdataValue.class);
    }

    /**
     * Thread pool whose workers block in {@link #beforeExecute(Thread, Runnable)} while the
     * factory's {@code unbufferedStatementCount} exceeds
     * {@code pauseParserPoolStatementThreshold}.
     */
    private class ParserThreadPoolExecutor
    extends ThreadPoolExecutor {
        /**
         * Passes all arguments through to the {@link ThreadPoolExecutor} constructor.
         */
        public ParserThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }

        /**
         * @return {@code true} while the number of unbuffered statements is above the pause
         *         threshold
         */
        private boolean isPaused() {
            return AsynchronousStatementBufferFactory.this.unbufferedStatementCount.get() > AsynchronousStatementBufferFactory.this.pauseParserPoolStatementThreshold;
        }

        /**
         * Blocks the worker on the factory's {@code unpaused} condition for as long as
         * {@link #isPaused()} holds, then delegates to the superclass hook.
         * <p>
         * {@code pausedThreadCount} is incremented when the worker starts waiting and is
         * always decremented when it stops waiting, including when the wait is interrupted, so
         * the counter cannot drift upwards. If the wait is interrupted, the interrupt status
         * of {@code t} is set and the task is still handed to the superclass hook.
         *
         * @param t the thread that will run the task
         * @param r the task about to be executed
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            AsynchronousStatementBufferFactory.this.lock.lock();
            try {
                if (this.isPaused()) {
                    AsynchronousStatementBufferFactory.this.pausedThreadCount.incrementAndGet();
                    AsynchronousStatementBufferFactory.this.poolPausedCount.incrementAndGet();
                    if (log.isInfoEnabled()) {
                        log.info("PAUSE : " + AsynchronousStatementBufferFactory.this.toString());
                    }
                    try {
                        // Re-check in a loop: await() may return without the condition holding.
                        while (this.isPaused()) {
                            AsynchronousStatementBufferFactory.this.unpaused.await();
                        }
                    }
                    finally {
                        // Balance the increment above even when await() is interrupted.
                        AsynchronousStatementBufferFactory.this.pausedThreadCount.decrementAndGet();
                    }
                    if (log.isInfoEnabled()) {
                        log.info("RESUME: " + AsynchronousStatementBufferFactory.this.toString());
                    }
                }
            }
            catch (InterruptedException ie) {
                // Preserve the interrupt for the task that is about to run.
                t.interrupt();
            }
            finally {
                AsynchronousStatementBufferFactory.this.lock.unlock();
            }
            super.beforeExecute(t, r);
        }
    }

    /**
     * Task that runs {@code bufferOtherWrites()} on one statement buffer and keeps the factory's
     * latches and counters consistent around that call.
     */
    private class BufferOtherWritesTask
    implements Callable<Void> {
        private final AsynchronousStatementBufferImpl buffer;

        /**
         * @param buffer the statement buffer to process
         * @throws IllegalArgumentException if {@code buffer} is {@code null}
         */
        public BufferOtherWritesTask(AsynchronousStatementBufferImpl buffer) {
            if (buffer == null) {
                throw new IllegalArgumentException();
            }
            this.buffer = buffer;
        }

        /**
         * Moves the document from the bufferTids workflow latch to the bufferOther one, runs
         * {@code bufferOtherWrites()} and then releases {@code guardLatch_other}.
         * <p>
         * If anything in that sequence throws, the guard and workflow latches are decremented,
         * the error is reported through {@code documentError}, the statement counters are
         * reduced by the buffer's statement count, paused parser threads are signalled when the
         * unbuffered count is at or below the pause threshold, and the failure is rethrown
         * wrapped in an {@link Exception}.
         *
         * @return always {@code null}
         * @throws Exception wrapping the original failure
         */
        @Override
        public Void call() throws Exception {
            this.enterBufferOtherPhase();
            try {
                this.buffer.bufferOtherWrites();
                this.releaseGuard();
                return null;
            }
            catch (Throwable cause) {
                AsynchronousStatementBufferFactory.this.lock.lock();
                try {
                    AsynchronousStatementBufferFactory.this.guardLatch_other.dec();
                    AsynchronousStatementBufferFactory.this.workflowLatch_bufferOther.dec();
                    AsynchronousStatementBufferFactory.this.documentError(this.buffer.getDocumentIdentifier(), cause);
                    AsynchronousStatementBufferFactory.this.outstandingStatementCount.addAndGet(-this.buffer.statementCount);
                    if (AsynchronousStatementBufferFactory.this.unbufferedStatementCount.addAndGet(-this.buffer.statementCount) <= AsynchronousStatementBufferFactory.this.pauseParserPoolStatementThreshold) {
                        AsynchronousStatementBufferFactory.this.unpaused.signalAll();
                    }
                    throw new Exception(cause);
                }
                finally {
                    // Runs for the wrapped failure as well as for anything thrown above.
                    AsynchronousStatementBufferFactory.this.lock.unlock();
                }
            }
        }

        /** Under the factory lock: takes the guard latch and switches workflow latches. */
        private void enterBufferOtherPhase() {
            AsynchronousStatementBufferFactory.this.lock.lock();
            try {
                AsynchronousStatementBufferFactory.this.guardLatch_other.inc();
                AsynchronousStatementBufferFactory.this.workflowLatch_bufferTids.dec();
                AsynchronousStatementBufferFactory.this.workflowLatch_bufferOther.inc();
                AsynchronousStatementBufferFactory.this.assertSumOfLatchs();
            }
            finally {
                AsynchronousStatementBufferFactory.this.lock.unlock();
            }
        }

        /** Under the factory lock: releases the guard latch on the success path. */
        private void releaseGuard() {
            AsynchronousStatementBufferFactory.this.lock.lock();
            try {
                AsynchronousStatementBufferFactory.this.guardLatch_other.dec();
            }
            finally {
                AsynchronousStatementBufferFactory.this.lock.unlock();
            }
        }
    }

    /**
     * Task that runs {@code bufferTidWrites()} on one statement buffer and keeps the factory's
     * latches and counters consistent around that call.
     */
    private class BufferTidWrites
    implements Callable<Void> {
        private final AsynchronousStatementBufferImpl buffer;

        /**
         * @param buffer the statement buffer to process
         * @throws IllegalArgumentException if {@code buffer} is {@code null}
         */
        public BufferTidWrites(AsynchronousStatementBufferImpl buffer) {
            if (buffer == null) {
                throw new IllegalArgumentException();
            }
            this.buffer = buffer;
        }

        /**
         * Runs {@code bufferTidWrites()} and then releases {@code guardLatch_term2Id}.
         * <p>
         * If anything in that sequence throws, the guard latch, the bufferTids workflow latch
         * and {@code documentTIDsWaitingCount} are decremented, the error is reported through
         * {@code documentError}, the statement counters are reduced by the buffer's statement
         * count, paused parser threads are signalled when the unbuffered count is at or below
         * the pause threshold, and the failure is rethrown wrapped in an {@link Exception}.
         *
         * @return always {@code null}
         * @throws Exception wrapping the original failure
         */
        @Override
        public Void call() throws Exception {
            try {
                this.buffer.bufferTidWrites();
                this.releaseGuard();
                return null;
            }
            catch (Throwable cause) {
                AsynchronousStatementBufferFactory.this.lock.lock();
                try {
                    AsynchronousStatementBufferFactory.this.guardLatch_term2Id.dec();
                    AsynchronousStatementBufferFactory.this.workflowLatch_bufferTids.dec();
                    AsynchronousStatementBufferFactory.this.documentTIDsWaitingCount.decrementAndGet();
                    AsynchronousStatementBufferFactory.this.documentError(this.buffer.getDocumentIdentifier(), cause);
                    AsynchronousStatementBufferFactory.this.outstandingStatementCount.addAndGet(-this.buffer.statementCount);
                    if (AsynchronousStatementBufferFactory.this.unbufferedStatementCount.addAndGet(-this.buffer.statementCount) <= AsynchronousStatementBufferFactory.this.pauseParserPoolStatementThreshold) {
                        AsynchronousStatementBufferFactory.this.unpaused.signalAll();
                    }
                    throw new Exception(cause);
                }
                finally {
                    // Runs for the wrapped failure as well as for anything thrown above.
                    AsynchronousStatementBufferFactory.this.lock.unlock();
                }
            }
        }

        /** Under the factory lock: releases the term2Id guard latch on the success path. */
        private void releaseGuard() {
            AsynchronousStatementBufferFactory.this.lock.lock();
            try {
                AsynchronousStatementBufferFactory.this.guardLatch_term2Id.dec();
            }
            finally {
                AsynchronousStatementBufferFactory.this.lock.unlock();
            }
        }
    }

    /**
     * Statement buffer bound to a single resource.
     * <p>
     * Statements passed to the {@code add} methods are converted with the value factory,
     * canonicalized against a per-buffer value map (and blank-node map) and appended to an
     * in-memory chunk buffer. The buffered data is handed to the asynchronous index write
     * buffers by {@code bufferTidWrites()} and {@code bufferOtherWrites()}.
     */
    protected class AsynchronousStatementBufferImpl
    implements IStatementBuffer<S> {
        /** Identifier of the resource this buffer was created for. */
        private final R resource;
        /** The outer factory's triple store, captured at construction. */
        private final AbstractTripleStore database;
        /** Value factory obtained from {@link #database}. */
        private final BigdataValueFactory valueFactory;
        /** Canonicalizing map of the distinct values seen so far; created lazily. */
        private LinkedHashMap<Value, BigdataValue> values;
        /** Blank nodes by ID; set at most once between resets, either explicitly or lazily. */
        private final AtomicReference<Map<String, BigdataBNode>> bnodes = new AtomicReference();
        /** Number of statements added since construction or the last {@link #reset()}. */
        private int statementCount;
        /** The buffered statements; created lazily when the first statement is added. */
        private UnsynchronizedUnboundedChunkBuffer<S> statements;

        /** @return the triple store captured at construction */
        @Override
        public final AbstractTripleStore getDatabase() {
            return this.database;
        }

        /** @return always {@code null} */
        @Override
        public AbstractTripleStore getStatementStore() {
            return null;
        }

        /** @return {@code true} when no statement is currently counted */
        @Override
        public boolean isEmpty() {
            return this.statementCount == 0;
        }

        /** @return the current statement count */
        @Override
        public int size() {
            return this.statementCount;
        }

        /** @return the resource identifier this buffer was created with */
        public R getDocumentIdentifier() {
            return this.resource;
        }

        /**
         * @param resource the identifier of the resource whose statements will be buffered
         */
        protected AsynchronousStatementBufferImpl(R resource) {
            this.resource = resource;
            this.database = AsynchronousStatementBufferFactory.this.tripleStore;
            this.valueFactory = this.database.getValueFactory();
        }

        /**
         * Performs no work in this implementation.
         *
         * @return always {@code 0}
         */
        @Override
        public long flush() {
            return 0L;
        }

        /**
         * Discards all buffered state: the blank-node map reference, the value map, the
         * statement buffer and the statement count.
         */
        @Override
        public void reset() {
            if (log.isInfoEnabled()) {
                log.info("resource=" + this.getDocumentIdentifier());
            }
            this.bnodes.set(null);
            this.values = null;
            this.statements = null;
            this.statementCount = 0;
        }

        /**
         * Installs the blank-node map. This succeeds only while no map is installed.
         *
         * @param bnodes the map to use
         * @throws IllegalArgumentException if {@code bnodes} is {@code null}
         * @throws IllegalStateException if a map is already installed
         */
        @Override
        public void setBNodeMap(Map<String, BigdataBNode> bnodes) {
            if (bnodes == null) {
                throw new IllegalArgumentException();
            }
            if (!this.bnodes.compareAndSet(null, bnodes)) {
                throw new IllegalStateException();
            }
        }

        /** Adds an explicit statement with a {@code null} context. */
        @Override
        public void add(Resource s, URI p, Value o) {
            this.add(s, p, o, null, StatementEnum.Explicit);
        }

        /** Adds an explicit statement with the given context. */
        @Override
        public void add(Resource s, URI p, Value o, Resource c) {
            this.add(s, p, o, c, StatementEnum.Explicit);
        }

        /** Adds a statement with the given context and statement type. */
        @Override
        public void add(Resource s, URI p, Value o, Resource c, StatementEnum type) {
            this.handleStatement(s, p, o, c, type);
        }

        /**
         * Adds the components of {@code e}. The statement type is taken from {@code e} when it
         * is a {@code BigdataStatement}; otherwise {@code null} is passed as the type.
         */
        @Override
        public void add(S e) {
            this.add(e.getSubject(), e.getPredicate(), e.getObject(), e.getContext(), e instanceof BigdataStatement ? e.getStatementType() : null);
        }

        /**
         * Returns the canonical blank node for the ID of {@code bnode}: the instance already
         * stored under that ID if there is one, otherwise {@code bnode} itself after storing it.
         * <p>
         * When no blank-node map is installed yet, a {@link HashMap} sized with the factory's
         * {@code bnodesInitialCapacity} is installed first.
         *
         * @param bnode the blank node to canonicalize
         * @return the canonical instance for that ID
         */
        private BigdataBNode getCanonicalBNode(BigdataBNodeImpl bnode) {
            String id = bnode.getID();
            Map<String, BigdataBNode> bnodes = this.bnodes.get();
            if (bnodes == null) {
                this.setBNodeMap(new HashMap<String, BigdataBNode>(AsynchronousStatementBufferFactory.this.bnodesInitialCapacity));
                bnodes = this.bnodes.get();
                if (bnodes == null) {
                    throw new AssertionError();
                }
            }
            // A ConcurrentHashMap offers an atomic putIfAbsent, so no monitor is needed.
            if (bnodes instanceof ConcurrentHashMap) {
                BigdataBNode tmp = ((ConcurrentHashMap)bnodes).putIfAbsent(id, bnode);
                if (tmp != null) {
                    return tmp;
                }
                if (log.isTraceEnabled()) {
                    log.trace("added: " + bnode);
                }
                return bnode;
            }
            // Any other map type: make the get-then-put sequence atomic on the map's monitor.
            Map<String, BigdataBNode> map = bnodes;
            synchronized (map) {
                BigdataBNode tmp = bnodes.get(id);
                if (tmp != null) {
                    return tmp;
                }
                bnodes.put(id, bnode);
                if (log.isTraceEnabled()) {
                    log.trace("added: " + bnode);
                }
                return bnode;
            }
        }

        /**
         * Returns the canonical instance of {@code term0} within this buffer.
         * <p>
         * Blank nodes are first canonicalized by ID through
         * {@link #getCanonicalBNode(BigdataBNodeImpl)}. The value map is created lazily with
         * the factory's {@code valuesInitialCapacity}.
         *
         * @param term0 the value to canonicalize; may be {@code null}
         * @return {@code null} for a {@code null} argument; otherwise the instance already held
         *         in the value map for an equal key, or the (possibly blank-node-canonicalized)
         *         term after inserting it
         */
        private BigdataValue getCanonicalValue(BigdataValue term0) {
            BigdataValue tmp;
            if (term0 == null) {
                return term0;
            }
            BigdataValue term = term0 instanceof BNode ? this.getCanonicalBNode((BigdataBNodeImpl)term0) : term0;
            if (this.values == null) {
                this.values = new LinkedHashMap(AsynchronousStatementBufferFactory.this.valuesInitialCapacity);
            }
            if ((tmp = this.values.get(term)) != null) {
                return tmp;
            }
            // The lookup above missed, so put() must not report a previous mapping.
            if (this.values.put(term, term) != null) {
                throw new AssertionError();
            }
            if (log.isTraceEnabled()) {
                log.trace("n=" + this.values.size() + ", added: " + term);
            }
            return term;
        }

        /**
         * Converts each component with {@code valueFactory.asValue(...)}, canonicalizes it and
         * forwards the result to {@link #_handleStatement}.
         */
        private void handleStatement(Resource s, URI p, Value o, Resource c, StatementEnum type) {
            this._handleStatement((Resource)((Object)this.getCanonicalValue(this.valueFactory.asValue(s))), (URI)((Object)this.getCanonicalValue(this.valueFactory.asValue(p))), this.getCanonicalValue(this.valueFactory.asValue(o)), (Resource)((Object)this.getCanonicalValue(this.valueFactory.asValue(c))), type);
        }

        /**
         * Creates a statement from the already canonicalized components, appends it to the
         * statement buffer (created lazily with the factory's {@code producerChunkSize}) and
         * increments the statement count.
         */
        private void _handleStatement(Resource s, URI p, Value o, Resource c, StatementEnum type) {
            BigdataStatement stmt = this.valueFactory.createStatement((BigdataResource)s, (BigdataURI)p, (BigdataValue)o, (BigdataResource)c, type);
            if (this.statements == null) {
                this.statements = new UnsynchronizedUnboundedChunkBuffer<BigdataStatement>(AsynchronousStatementBufferFactory.this.producerChunkSize, BigdataStatement.class);
            }
            this.statements.add(stmt);
            ++this.statementCount;
            if (log.isTraceEnabled()) {
                log.trace("n=" + this.statementCount + ", added: " + stmt);
            }
        }

        /**
         * Runs an {@code AsyncTerm2IdIndexWriteTask} over the buffered values, targeting the
         * factory's {@code buffer_t2id} and {@code buffer_blobs}.
         * <p>
         * The task shares a latch whose {@code signal()} moves the document from the
         * "TIDs waiting" to the "TIDs ready" counter and submits a
         * {@code BufferOtherWritesTask} for this buffer to {@code otherWriterService}.
         * <p>
         * NOTE(review): {@code this.values} is dereferenced here and is only created when a
         * value has been added; confirm callers never invoke this on an empty buffer.
         *
         * @throws RuntimeException if {@code isAnyDone()} reports {@code true}
         * @throws Exception if the write task fails
         */
        private void bufferTidWrites() throws Exception {
            if (log.isInfoEnabled()) {
                Map<String, BigdataBNode> bnodes = this.bnodes.get();
                int bnodeCount = bnodes == null ? 0 : bnodes.size();
                log.info("bnodeCount=" + bnodeCount + ", values=" + this.values.size() + ", statementCount=" + this.statementCount);
            }
            if (AsynchronousStatementBufferFactory.this.isAnyDone()) {
                throw new RuntimeException("Factory closed?");
            }
            KVOLatch tidsLatch = new KVOLatch(){

                @Override
                public String toString() {
                    return super.toString() + " : tidsLatch";
                }

                // Presumably invoked when the latch count returns to zero -- confirm against KVOLatch.
                @Override
                protected void signal() throws InterruptedException {
                    super.signal();
                    AsynchronousStatementBufferFactory.this.documentTIDsWaitingCount.decrementAndGet();
                    AsynchronousStatementBufferFactory.this.documentTIDsReadyCount.incrementAndGet();
                    AsynchronousStatementBufferFactory.this.otherWriterService.submit(new BufferOtherWritesTask(AsynchronousStatementBufferImpl.this));
                }
            };
            // Hold one count on the latch for the duration of the synchronous call below.
            tidsLatch.inc();
            try {
                AsyncTerm2IdIndexWriteTask task1 = new AsyncTerm2IdIndexWriteTask(tidsLatch, AsynchronousStatementBufferFactory.this.lexiconRelation, AsynchronousStatementBufferFactory.newValuesIterator(AsynchronousStatementBufferFactory.this.lexiconRelation, this.values.values().iterator(), AsynchronousStatementBufferFactory.this.producerChunkSize), AsynchronousStatementBufferFactory.this.buffer_t2id, AsynchronousStatementBufferFactory.this.buffer_blobs);
                task1.call();
            }
            finally {
                tidsLatch.dec();
            }
        }

        /**
         * Submits the remaining index writes for this buffer and waits for the submitting
         * tasks to finish.
         * <p>
         * One task is created for the id2term buffer, one for the text buffer when
         * {@code buffer_text} is non-null, and one per entry of {@code buffer_stmts}. All of
         * them share a latch whose {@code signal()}, under the factory lock, decrements the
         * bufferOther and document workflow latches, updates the restart-safe counters and
         * calls {@code documentDone} for this resource.
         * <p>
         * After the futures have been consumed (successfully or not) the buffer is
         * {@link #reset()}, {@code unbufferedStatementCount} is reduced by the number of
         * statements this buffer held, and paused parser threads are signalled when that count
         * is at or below the pause threshold.
         * <p>
         * NOTE(review): {@code this.values} and {@code this.statements} are dereferenced here
         * and are only created when data has been added; confirm callers never invoke this on
         * an empty buffer.
         *
         * @throws InterruptedException if interrupted while submitting or waiting
         * @throws ExecutionException if one of the tasks failed
         */
        private void bufferOtherWrites() throws InterruptedException, ExecutionException {
            List futures;
            if (log.isDebugEnabled()) {
                log.debug("Writing on remaining indices.");
            }
            LinkedList<Callable<Void>> tasks = new LinkedList<Callable<Void>>();
            // Captured up front because reset() zeroes statementCount before it is last needed.
            final int toldTriplesThisDocument = this.statementCount;
            KVOLatch documentRestartSafeLatch = new KVOLatch(){

                @Override
                public String toString() {
                    return super.toString() + " : documentRestartSafeLatch";
                }

                // Presumably invoked when the latch count returns to zero -- confirm against KVOLatch.
                @Override
                protected void signal() throws InterruptedException {
                    super.signal();
                    AsynchronousStatementBufferFactory.this.lock.lock();
                    try {
                        AsynchronousStatementBufferFactory.this.workflowLatch_bufferOther.dec();
                        AsynchronousStatementBufferFactory.this.workflowLatch_document.dec();
                        AsynchronousStatementBufferFactory.this.assertSumOfLatchs();
                        AsynchronousStatementBufferFactory.this.documentRestartSafeCount.incrementAndGet();
                        AsynchronousStatementBufferFactory.this.toldTriplesRestartSafeCount.addAndGet(toldTriplesThisDocument);
                        AsynchronousStatementBufferFactory.this.outstandingStatementCount.addAndGet(-toldTriplesThisDocument);
                        AsynchronousStatementBufferFactory.this.documentDone(AsynchronousStatementBufferImpl.this.getDocumentIdentifier());
                    }
                    finally {
                        AsynchronousStatementBufferFactory.this.lock.unlock();
                    }
                }
            };
            tasks.add(new AsyncId2TermIndexWriteTask(documentRestartSafeLatch, this.valueFactory, AsynchronousStatementBufferFactory.newId2TIterator(AsynchronousStatementBufferFactory.this.lexiconRelation, this.values.values().iterator(), AsynchronousStatementBufferFactory.this.producerChunkSize), AsynchronousStatementBufferFactory.this.buffer_id2t));
            // The text index task is only added when a text buffer exists.
            if (AsynchronousStatementBufferFactory.this.buffer_text != null) {
                tasks.add(new AsyncTextIndexWriteTask(documentRestartSafeLatch, (BigdataValueCentricFullTextIndex)AsynchronousStatementBufferFactory.this.lexiconRelation.getSearchEngine(), AsynchronousStatementBufferFactory.newTextIterator(AsynchronousStatementBufferFactory.this.lexiconRelation, this.values.values().iterator(), AsynchronousStatementBufferFactory.this.producerChunkSize, AsynchronousStatementBufferFactory.this.indexDatatypeLiterals), AsynchronousStatementBufferFactory.this.buffer_text));
            }
            // One statement index write task per (key order, write buffer) pair.
            for (Map.Entry e : AsynchronousStatementBufferFactory.this.buffer_stmts.entrySet()) {
                SPOKeyOrder keyOrder = (SPOKeyOrder)e.getKey();
                IRunnableBuffer buffer = (IRunnableBuffer)e.getValue();
                tasks.add(new AsyncSPOIndexWriteTask(documentRestartSafeLatch, keyOrder, AsynchronousStatementBufferFactory.this.spoRelation, this.statements.iterator(), buffer));
            }
            // Hold one count on the latch while the tasks are being run.
            documentRestartSafeLatch.inc();
            try {
                futures = AsynchronousStatementBufferFactory.this.tripleStore.getExecutorService().invokeAll(tasks);
            }
            finally {
                documentRestartSafeLatch.dec();
            }
            try {
                // Surface the first task failure, if any, as an ExecutionException.
                for (Future f : futures) {
                    f.get();
                }
            }
            finally {
                this.reset();
                AsynchronousStatementBufferFactory.this.lock.lock();
                try {
                    if (AsynchronousStatementBufferFactory.this.unbufferedStatementCount.addAndGet(-toldTriplesThisDocument) <= AsynchronousStatementBufferFactory.this.pauseParserPoolStatementThreshold) {
                        AsynchronousStatementBufferFactory.this.unpaused.signalAll();
                    }
                }
                finally {
                    AsynchronousStatementBufferFactory.this.lock.unlock();
                }
            }
        }
    }

    /**
     * Task that serializes the statements of a source iterator for one statement index and
     * adds them, chunk by chunk, to an asynchronous write buffer.
     * <p>
     * Each output element carries the shared {@code KVOLatch}. The latch is additionally
     * incremented for the duration of {@link #call()} and decremented when it finishes,
     * whether or not it succeeded.
     */
    static class AsyncSPOIndexWriteTask
    implements Callable<Void> {
        protected static final transient Logger log = Logger.getLogger(AsyncSPOIndexWriteTask.class);
        private final KVOLatch latch;
        private final IKeyOrder<ISPO> keyOrder;
        private final IChunkedOrderedIterator src;
        private final IRunnableBuffer<KVO<ISPO>[]> writeBuffer;
        private final SPOTupleSerializer tupleSer;

        /**
         * @param latch the latch attached to every element written
         * @param keyOrder the key order of the target index
         * @param spoRelation the relation whose index metadata supplies the tuple serializer
         * @param src the statements to write
         * @param writeBuffer the buffer receiving the sorted chunks
         * @throws IllegalArgumentException if {@code latch}, {@code keyOrder}, {@code src} or
         *         {@code writeBuffer} is {@code null}
         */
        public AsyncSPOIndexWriteTask(KVOLatch latch, IKeyOrder<ISPO> keyOrder, SPORelation spoRelation, IChunkedOrderedIterator src, IRunnableBuffer<KVO<ISPO>[]> writeBuffer) {
            if (latch == null) {
                throw new IllegalArgumentException();
            }
            if (keyOrder == null) {
                throw new IllegalArgumentException();
            }
            if (src == null) {
                // Fail fast here, like the sibling write tasks, rather than with a
                // NullPointerException once call() runs on a worker thread.
                throw new IllegalArgumentException();
            }
            if (writeBuffer == null) {
                throw new IllegalArgumentException();
            }
            this.latch = latch;
            this.keyOrder = keyOrder;
            this.src = src;
            this.writeBuffer = writeBuffer;
            this.tupleSer = (SPOTupleSerializer)spoRelation.getIndex((IKeyOrder<? extends ISPO>)keyOrder).getIndexMetadata().getTupleSerializer();
        }

        /**
         * Drains the source iterator: every chunk is serialized into key/value elements,
         * sorted and added to the write buffer.
         *
         * @return always {@code null}
         * @throws IllegalArgumentException if a chunk contains a {@code null} statement or a
         *         statement that is not fully bound
         */
        @Override
        public Void call() throws Exception {
            long chunksOut = 0L;
            long elementsOut = 0L;
            this.latch.inc();
            try {
                while (this.src.hasNext()) {
                    ISPO[] chunk = this.src.nextChunk(this.keyOrder);
                    Object[] a = new KVOC[chunk.length];
                    for (int i = 0; i < chunk.length; ++i) {
                        ISPO spo = chunk[i];
                        if (spo == null) {
                            throw new IllegalArgumentException();
                        }
                        if (!spo.isFullyBound()) {
                            throw new IllegalArgumentException("Not fully bound: " + spo.toString());
                        }
                        byte[] key = this.tupleSer.serializeKey(spo);
                        byte[] val = this.tupleSer.serializeVal(spo);
                        a[i] = new KVOC<Object>(key, val, null, this.latch);
                    }
                    // Elements are sorted before the chunk is handed to the write buffer.
                    Arrays.sort(a);
                    this.writeBuffer.add((KVO<ISPO>[])a);
                    ++chunksOut;
                    elementsOut += (long)a.length;
                    if (log.isDebugEnabled()) {
                        log.debug("Wrote chunk: index=" + this.keyOrder + ", chunksOut=" + chunksOut + ", elementsOut=" + elementsOut + ", chunkSize=" + a.length);
                    }
                    if (log.isTraceEnabled()) {
                        log.trace("Wrote: index=" + this.keyOrder + ", chunk=" + Arrays.toString(a));
                    }
                }
            }
            finally {
                // Balance the inc() above even when serialization or the add fails.
                this.latch.dec();
            }
            if (log.isDebugEnabled()) {
                log.debug("Done: index=" + this.keyOrder + ", chunksOut=" + chunksOut + ", elementsOut=" + elementsOut);
            }
            return null;
        }
    }

    /**
     * Task that feeds a source iterator of values to the full text index while holding a count
     * on the shared {@code KVOLatch}.
     */
    static class AsyncTextIndexWriteTask
    implements Callable<Void> {
        protected static final transient Logger log = Logger.getLogger(AsyncTextIndexWriteTask.class);
        private final KVOLatch latch;
        private final BigdataValueCentricFullTextIndex textIndex;
        private final IChunkedIterator<BigdataValue> src;
        private final IRunnableBuffer<KVO<BigdataValue>[]> buffer;

        /**
         * @param latch the latch held while {@link #call()} runs
         * @param textIndex the full text index to write on
         * @param src the values to index
         * @param buffer the write buffer; stored but not used by {@link #call()} directly
         * @throws IllegalArgumentException if any argument is {@code null}
         */
        public AsyncTextIndexWriteTask(KVOLatch latch, BigdataValueCentricFullTextIndex textIndex, IChunkedIterator<BigdataValue> src, IRunnableBuffer<KVO<BigdataValue>[]> buffer) {
            if (latch == null || textIndex == null || src == null || buffer == null) {
                throw new IllegalArgumentException();
            }
            this.latch = latch;
            this.textIndex = textIndex;
            this.src = src;
            this.buffer = buffer;
        }

        /**
         * Invokes {@code textIndex.index(...)} with a capacity of 100000 over the source
         * iterator. The latch is incremented beforehand and decremented afterwards, whether or
         * not indexing succeeded.
         *
         * @return always {@code null}
         */
        @Override
        public Void call() throws Exception {
            final int capacity = 100000;
            this.latch.inc();
            try {
                this.textIndex.index(capacity, this.src);
            }
            finally {
                this.latch.dec();
            }
            return null;
        }
    }

    /**
     * Task that converts chunks of {@link BigdataValue}s into sorted key/value
     * tuples and adds each sorted chunk to the supplied buffer.
     * <p>
     * For every value that is written, the key is the encoded form of the
     * value's {@link IV} and the value is the serialized {@link BigdataValue}.
     * Blank nodes and values whose IV reports {@code isInline()} are skipped.
     * A value without an IV is an error.
     */
    static class AsyncId2TermIndexWriteTask implements Callable<Void> {
        protected static final transient Logger log = Logger.getLogger(AsyncId2TermIndexWriteTask.class);

        private final KVOLatch latch;
        private final BigdataValueFactory valueFactory;
        private final IChunkedIterator<BigdataValue> src;
        private final IRunnableBuffer<KVO<BigdataValue>[]> buffer;

        /**
         * @param latch        latch held while the task runs and attached to
         *                     every tuple it produces.
         * @param valueFactory supplies the value serializer.
         * @param src          source of the values.
         * @param buffer       receives the sorted tuple chunks.
         *
         * @throws IllegalArgumentException if any argument is {@code null}.
         */
        public AsyncId2TermIndexWriteTask(KVOLatch latch, BigdataValueFactory valueFactory, IChunkedIterator<BigdataValue> src, IRunnableBuffer<KVO<BigdataValue>[]> buffer) {
            if (latch == null || valueFactory == null || src == null || buffer == null) {
                throw new IllegalArgumentException();
            }
            this.latch = latch;
            this.valueFactory = valueFactory;
            this.src = src;
            this.buffer = buffer;
        }

        /**
         * Drains the source, emitting one sorted chunk of tuples per input
         * chunk. The latch is incremented up front and decremented in a
         * {@code finally} block.
         *
         * @return always {@code null}.
         *
         * @throws RuntimeException if a non-blank-node value has no IV.
         */
        @Override
        public Void call() throws Exception {
            final BigdataValueSerializer<BigdataValue> serializer = this.valueFactory.getValueSerializer();
            // Scratch objects, reset and reused for every value.
            final IKeyBuilder keyBuilder = KeyBuilder.newInstance(8);
            final DataOutputBuffer valueOut = new DataOutputBuffer();
            final ByteArrayBuffer scratch = new ByteArrayBuffer();
            this.latch.inc();
            try {
                while (this.src.hasNext()) {
                    final BigdataValue[] values = this.src.nextChunk();
                    // Sized for the worst case; only the first 'used' slots are filled.
                    final KVOC[] tuples = new KVOC[values.length];
                    int used = 0;
                    for (int k = 0; k < values.length; k++) {
                        final BigdataValue value = values[k];
                        assert (value != null);
                        if (value instanceof BNode) {
                            // Blank nodes are not written by this task.
                            continue;
                        }
                        if (value.getIV() == null) {
                            throw new RuntimeException("No TID: " + value);
                        }
                        if (!value.getIV().isInline()) {
                            final byte[] key = value.getIV().encode(keyBuilder.reset()).getKey();
                            final byte[] val = serializer.serialize((BigdataValueImpl)value, valueOut.reset(), scratch);
                            tuples[used++] = new KVOC<Object>(key, val, null, this.latch);
                        }
                    }
                    // Trim to the populated prefix, then sort before handing off.
                    final Object[] sorted = KVO.dense(tuples, used);
                    Arrays.sort(sorted);
                    this.buffer.add((KVO<BigdataValue>[])sorted);
                }
            } finally {
                this.latch.dec();
            }
            return null;
        }
    }

    /**
     * Task that routes each {@link BigdataValue} read from the source either
     * to the BLOBS buffer or to the TERM2ID buffer, as sorted chunks of
     * tuples.
     * <p>
     * A value goes to the BLOBS buffer when that buffer was supplied and
     * {@link LexiconRelation#isBlob} reports {@code true} for the value; every
     * other value goes to the TERM2ID buffer. At least one of the two buffers
     * must be supplied.
     */
    static class AsyncTerm2IdIndexWriteTask implements Callable<Void> {
        protected static final transient Logger log = Logger.getLogger(AsyncTerm2IdIndexWriteTask.class);

        private final KVOLatch latch;
        private final IChunkedIterator<BigdataValue> src;
        private final LexiconRelation lexiconRelation;
        /** {@code null} when no TERM2ID buffer was supplied. */
        private final Term2IdTupleSerializer tupleSerTerm2Id;
        private final IRunnableBuffer<KVO<BigdataValue>[]> bufferTerm2Id;
        private final IRunnableBuffer<KVO<BigdataValue>[]> bufferBlobs;

        /**
         * @param latch         latch held while the task runs and attached to
         *                      every tuple it produces.
         * @param r             the lexicon relation.
         * @param src           source of the values.
         * @param bufferTerm2Id receives TERM2ID tuples (may be {@code null}).
         * @param bufferBlobs   receives BLOBS tuples (may be {@code null}).
         *
         * @throws IllegalArgumentException if {@code latch}, {@code r} or
         *         {@code src} is {@code null}, or if both buffers are
         *         {@code null}.
         */
        public AsyncTerm2IdIndexWriteTask(KVOLatch latch, LexiconRelation r, IChunkedIterator<BigdataValue> src, IRunnableBuffer<KVO<BigdataValue>[]> bufferTerm2Id, IRunnableBuffer<KVO<BigdataValue>[]> bufferBlobs) {
            if (latch == null || r == null || src == null) {
                throw new IllegalArgumentException();
            }
            if (bufferTerm2Id == null && bufferBlobs == null) {
                throw new IllegalArgumentException();
            }
            this.latch = latch;
            this.lexiconRelation = r;
            // The TERM2ID tuple serializer is only looked up when TERM2ID writes are enabled.
            this.tupleSerTerm2Id = bufferTerm2Id == null ? null : (Term2IdTupleSerializer)r.getIndex(LexiconKeyOrder.TERM2ID).getIndexMetadata().getTupleSerializer();
            this.src = src;
            this.bufferTerm2Id = bufferTerm2Id;
            this.bufferBlobs = bufferBlobs;
        }

        /** Delegates to {@link LexiconRelation#isBlob}. */
        private boolean isBlob(BigdataValue v) {
            return this.lexiconRelation.isBlob(v);
        }

        /**
         * Drains the source. For each input chunk the pending TERM2ID tuples
         * (if any) are sorted and added to their buffer first, then the
         * pending BLOBS tuples. The latch is incremented up front and
         * decremented in a {@code finally} block.
         *
         * @return always {@code null}.
         */
        @Override
        public Void call() throws Exception {
            final LexiconKeyBuilder term2IdKeys = this.bufferTerm2Id == null ? null : this.tupleSerTerm2Id.getLexiconKeyBuilder();
            final BigdataValueSerializer<BigdataValue> serializer = this.lexiconRelation.getValueFactory().getValueSerializer();
            final BlobsIndexHelper blobsHelper = new BlobsIndexHelper();
            // Scratch objects, reset and reused for every blob.
            final IKeyBuilder blobKeys = blobsHelper.newKeyBuilder();
            final DataOutputBuffer valueOut = new DataOutputBuffer(512);
            final ByteArrayBuffer scratch = new ByteArrayBuffer(512);
            this.latch.inc();
            try {
                // Both lists are created lazily and reused (cleared) across chunks.
                ArrayList<KVOC<BigdataValue>> terms = null;
                ArrayList<KVOC<BigdataValue>> blobs = null;
                while (this.src.hasNext()) {
                    final BigdataValue[] values = this.src.nextChunk();
                    for (final BigdataValue value : values) {
                        if (this.bufferBlobs != null && this.isBlob(value)) {
                            final byte[] key = blobsHelper.makePrefixKey(blobKeys.reset(), value);
                            final byte[] val = serializer.serialize(value, valueOut.reset(), scratch);
                            if (blobs == null) {
                                blobs = new ArrayList<KVOC<BigdataValue>>();
                            }
                            blobs.add(new KVOC<BigdataValue>(key, val, value, this.latch));
                        } else {
                            if (terms == null) {
                                terms = new ArrayList<KVOC<BigdataValue>>(values.length);
                            }
                            terms.add(new KVOC<BigdataValue>(term2IdKeys.value2Key(value), null, value, this.latch));
                        }
                    }
                    if (terms != null && !terms.isEmpty()) {
                        final Object[] termChunk = terms.toArray(new KVOC[terms.size()]);
                        Arrays.sort(termChunk);
                        if (log.isInfoEnabled()) {
                            log.info("Adding chunk to TERM2ID master: chunkSize=" + termChunk.length);
                        }
                        this.bufferTerm2Id.add((KVO<BigdataValue>[])termChunk);
                        terms.clear();
                    }
                    if (blobs != null && !blobs.isEmpty()) {
                        final Object[] blobChunk = blobs.toArray(new KVOC[blobs.size()]);
                        Arrays.sort(blobChunk);
                        if (log.isInfoEnabled()) {
                            log.info("Adding chunk to BLOBS master: chunkSize=" + blobChunk.length);
                        }
                        this.bufferBlobs.add((KVO<BigdataValue>[])blobChunk);
                        blobs.clear();
                    }
                }
            } finally {
                this.latch.dec();
            }
            return null;
        }
    }

    /**
     * Result handler for {@link BlobsWriteProc} that assigns a {@link BlobIV}
     * to each value in a chunk from the per-tuple counters in the result.
     * <p>
     * The synchronous {@link #aggregate} callback is a no-op and
     * {@link #getResult()} always returns {@code null}; all work happens in
     * {@link #aggregateAsync}.
     */
    private static class BlobsWriteProcAsyncResultHandler implements IAsyncResultHandler<BlobsWriteProc.Result, Void, BigdataValue, KVO<BigdataValue>> {
        /** When {@code true}, a counter of {@link Integer#MIN_VALUE} is tolerated. */
        private final boolean readOnly;

        public BlobsWriteProcAsyncResultHandler(boolean readOnly) {
            this.readOnly = readOnly;
        }

        /** Does nothing. */
        @Override
        public void aggregate(BlobsWriteProc.Result result, Split split) {
        }

        /**
         * Sets the IV on the value of each tuple in the chunk, and on any
         * duplicates chained to that tuple.
         *
         * @param chunk  the tuples that were written.
         * @param result the procedure result; {@code result.counters[i]}
         *               corresponds to {@code chunk[i]}.
         * @param split  not used.
         *
         * @throws AssertionError if a counter is {@link Integer#MIN_VALUE}
         *         and this handler is not read-only.
         */
        public void aggregateAsync(KVO<BigdataValue>[] chunk, BlobsWriteProc.Result result, Split split) {
            for (int idx = 0; idx < chunk.length; idx++) {
                final int counter = result.counters[idx];
                if (counter == Integer.MIN_VALUE) {
                    // NOTE(review): presumably the "not found" marker -- confirm
                    // against BlobsWriteProc. Only tolerated for read-only requests.
                    if (this.readOnly) {
                        continue;
                    }
                    throw new AssertionError();
                }
                final KVO<BigdataValue> tuple = chunk[idx];
                final BigdataValue value = (BigdataValue)tuple.obj;
                final BlobIV iv = new BlobIV(VTE.valueOf(value), value.hashCode(), (short)counter);
                value.setIV(iv);
                if (tuple instanceof KVOList) {
                    // Propagate the same IV to the duplicates, if there are any.
                    final KVOList duplicates = (KVOList)tuple;
                    if (!duplicates.isDuplicateListEmpty()) {
                        duplicates.map(new AssignTermId(iv));
                    }
                }
                if (log.isDebugEnabled()) {
                    log.debug("termId=" + iv + ", term=" + tuple.obj);
                }
            }
        }

        /** @return always {@code null}. */
        @Override
        public Void getResult() {
            return null;
        }
    }

    /**
     * Result handler for {@link Term2IdWriteProc} that copies the {@link IV}
     * reported for each tuple onto the corresponding value.
     * <p>
     * The synchronous {@link #aggregate} callback is a no-op and
     * {@link #getResult()} always returns {@code null}; all work happens in
     * {@link #aggregateAsync}.
     */
    private static class Term2IdWriteProcAsyncResultHandler implements IAsyncResultHandler<Term2IdWriteProc.Result, Void, BigdataValue, KVO<BigdataValue>> {
        /** When {@code true}, a {@code null} IV in the result is tolerated. */
        private final boolean readOnly;

        public Term2IdWriteProcAsyncResultHandler(boolean readOnly) {
            this.readOnly = readOnly;
        }

        /** Does nothing. */
        @Override
        public void aggregate(Term2IdWriteProc.Result result, Split split) {
        }

        /**
         * Sets the IV on the value of each tuple in the chunk, and on any
         * duplicates chained to that tuple.
         *
         * @param chunk  the tuples that were written.
         * @param result the procedure result; {@code result.ivs[i]}
         *               corresponds to {@code chunk[i]}.
         * @param split  not used.
         *
         * @throws AssertionError if an IV is {@code null} and this handler is
         *         not read-only.
         */
        public void aggregateAsync(KVO<BigdataValue>[] chunk, Term2IdWriteProc.Result result, Split split) {
            for (int idx = 0; idx < chunk.length; idx++) {
                final IV iv = result.ivs[idx];
                if (iv == null) {
                    // A missing IV is only acceptable for read-only requests.
                    if (this.readOnly) {
                        continue;
                    }
                    throw new AssertionError();
                }
                final KVO<BigdataValue> tuple = chunk[idx];
                ((BigdataValue)tuple.obj).setIV(iv);
                if (tuple instanceof KVOList) {
                    // Propagate the same IV to the duplicates, if there are any.
                    final KVOList duplicates = (KVOList)tuple;
                    if (!duplicates.isDuplicateListEmpty()) {
                        duplicates.map(new AssignTermId(iv));
                    }
                }
                if (log.isDebugEnabled()) {
                    log.debug("termId=" + iv + ", term=" + tuple.obj);
                }
            }
        }

        /** @return always {@code null}. */
        @Override
        public Void getResult() {
            return null;
        }
    }

    private class RunnableFileSystemLoader
    implements Callable<Integer> {
        private int count = 0;
        final File fileOrDir;
        final FilenameFilter filter;
        final long retryMillis;

        public RunnableFileSystemLoader(File fileOrDir, FilenameFilter filter, long retryMillis) {
            if (fileOrDir == null) {
                throw new IllegalArgumentException();
            }
            if (retryMillis < 0L) {
                throw new IllegalArgumentException();
            }
            this.fileOrDir = fileOrDir;
            this.filter = filter;
            this.retryMillis = retryMillis;
        }

        @Override
        public Integer call() throws Exception {
            this.process2(this.fileOrDir);
            return this.count;
        }

        private void process2(File file) throws InterruptedException {
            if (file.isHidden()) {
                return;
            }
            if (file.isDirectory()) {
                File[] files;
                if (log.isInfoEnabled()) {
                    log.info("Scanning directory: " + file);
                }
                for (File f : files = this.filter == null ? file.listFiles() : file.listFiles(this.filter)) {
                    this.process2(f);
                }
            } else {
                if (log.isInfoEnabled()) {
                    log.info("Will load: " + file);
                }
                try {
                    AsynchronousStatementBufferFactory.this.submitOne(file, this.retryMillis);
                    ++this.count;
                    return;
                }
                catch (InterruptedException ex) {
                    throw ex;
                }
                catch (Exception ex) {
                    log.error(file, ex);
                }
            }
        }
    }

    /**
     * Runnable that asks the enclosing factory to delete a single resource by
     * calling {@code deleteResource}.
     */
    protected class DeleteTask implements Runnable {
        /** The resource to delete; never {@code null}. */
        private final R resource;

        /**
         * @param resource the resource to delete.
         *
         * @throws IllegalArgumentException if {@code resource} is {@code null}.
         */
        public DeleteTask(R resource) {
            if (null == resource) {
                throw new IllegalArgumentException();
            }
            this.resource = resource;
        }

        /** Delegates to the enclosing factory's {@code deleteResource}. */
        @Override
        public void run() {
            final R target = this.resource;
            AsynchronousStatementBufferFactory.this.deleteResource(target);
        }
    }

    private class ServiceStatisticsTask
    implements Runnable {
        private final Map<String, ThreadPoolExecutorBaseStatisticsTask> tasks = new LinkedHashMap<String, ThreadPoolExecutorBaseStatisticsTask>();
        private final ScheduledFuture<?> serviceStatisticsFuture;

        public ServiceStatisticsTask(ScheduledExecutorService scheduledService) {
            this.tasks.put("parserService", new ThreadPoolExecutorBaseStatisticsTask(AsynchronousStatementBufferFactory.this.parserService));
            this.tasks.put("term2IdWriterService", new ThreadPoolExecutorBaseStatisticsTask(AsynchronousStatementBufferFactory.this.tidsWriterService));
            this.tasks.put("otherWriterService", new ThreadPoolExecutorBaseStatisticsTask(AsynchronousStatementBufferFactory.this.otherWriterService));
            this.tasks.put("notifyService", new ThreadPoolExecutorBaseStatisticsTask(AsynchronousStatementBufferFactory.this.notifyService));
            this.serviceStatisticsFuture = scheduledService.scheduleWithFixedDelay(this, 0L, 1000L, TimeUnit.MILLISECONDS);
        }

        protected void finalize() throws Exception {
            this.cancel();
        }

        public void cancel() {
            this.serviceStatisticsFuture.cancel(true);
        }

        @Override
        public void run() {
            for (Runnable runnable : this.tasks.values()) {
                try {
                    runnable.run();
                }
                catch (Throwable t) {
                    log.error(runnable, t);
                }
            }
        }

        public CounterSet getCounters() {
            CounterSet counterSet = new CounterSet();
            for (Map.Entry<String, ThreadPoolExecutorBaseStatisticsTask> e : this.tasks.entrySet()) {
                counterSet.makePath(e.getKey()).attach(e.getValue().getCounters());
            }
            return counterSet;
        }
    }

    /**
     * Task that parses one resource into a new statement buffer and, on
     * success, hands that buffer to the {@code tidsWriterService}.
     * <p>
     * All counter and latch updates, on both the success path and the error
     * path, are made while holding the enclosing factory's {@code lock}.
     */
    protected class ParserTask implements Callable<Void> {
        private final R resource;
        private final String baseURL;
        /** Not null-checked by the constructor. */
        private final RDFFormat rdfFormat;

        /**
         * @param resource  the resource to parse.
         * @param baseURL   the base URL passed to the parser.
         * @param rdfFormat the format passed to the parser.
         *
         * @throws IllegalArgumentException if {@code resource} or
         *         {@code baseURL} is {@code null}.
         */
        public ParserTask(R resource, String baseURL, RDFFormat rdfFormat) {
            if (resource == null || baseURL == null) {
                throw new IllegalArgumentException();
            }
            this.resource = resource;
            this.baseURL = baseURL;
            this.rdfFormat = rdfFormat;
        }

        /**
         * Parses the resource and queues the resulting buffer.
         *
         * @return always {@code null}.
         *
         * @throws Exception wrapping whatever was thrown while parsing or
         *         queueing; before it is thrown the parser workflow latch is
         *         decremented and {@code documentError} is invoked.
         */
        @Override
        public Void call() throws Exception {
            final AsynchronousStatementBufferImpl buffer = AsynchronousStatementBufferFactory.this.newStatementBuffer(this.resource);
            try {
                // Both the stream and the reader are closed when parsing ends.
                try (InputStream rdfStream = AsynchronousStatementBufferFactory.this.getInputStream(this.resource);
                     BufferedReader reader = new BufferedReader(new InputStreamReader(rdfStream))) {
                    final PresortRioLoader loader = new PresortRioLoader(buffer);
                    // The base URL doubles as the default graph when the factory has none.
                    loader.loadRdf(reader, this.baseURL, this.rdfFormat, AsynchronousStatementBufferFactory.this.defaultGraph == null ? this.baseURL : AsynchronousStatementBufferFactory.this.defaultGraph, AsynchronousStatementBufferFactory.this.parserOptions);
                }
                // Success: move the document from the parser stage to the TIDs stage.
                AsynchronousStatementBufferFactory.this.lock.lock();
                try {
                    AsynchronousStatementBufferFactory.this.documentsParsedCount.incrementAndGet();
                    AsynchronousStatementBufferFactory.this.guardLatch_term2Id.inc();
                    AsynchronousStatementBufferFactory.this.workflowLatch_parser.dec();
                    AsynchronousStatementBufferFactory.this.workflowLatch_bufferTids.inc();
                    AsynchronousStatementBufferFactory.this.documentTIDsWaitingCount.incrementAndGet();
                    AsynchronousStatementBufferFactory.this.assertSumOfLatchs();
                    AsynchronousStatementBufferFactory.this.tidsWriterService.submit(new BufferTidWrites(buffer));
                    AsynchronousStatementBufferFactory.this.outstandingStatementCount.addAndGet(buffer.statementCount);
                    AsynchronousStatementBufferFactory.this.unbufferedStatementCount.addAndGet(buffer.statementCount);
                } finally {
                    AsynchronousStatementBufferFactory.this.lock.unlock();
                }
            } catch (Throwable ex) {
                // Failure: release the parser latch, report the error, then rethrow wrapped.
                AsynchronousStatementBufferFactory.this.lock.lock();
                try {
                    AsynchronousStatementBufferFactory.this.workflowLatch_parser.dec();
                    AsynchronousStatementBufferFactory.this.documentError(this.resource, ex);
                    throw new Exception(ex);
                } finally {
                    AsynchronousStatementBufferFactory.this.lock.unlock();
                }
            }
            if (log.isInfoEnabled()) {
                log.info("resource=" + this.resource + " : " + this);
            }
            return null;
        }
    }
}

