/*
 * Decompiled with CFR 0.152.
 */
package com.bigdata.relation.rule.eval.pipeline;

import com.bigdata.bop.IBindingSet;
import com.bigdata.bop.IPredicate;
import com.bigdata.bop.IVariable;
import com.bigdata.mdi.PartitionLocator;
import com.bigdata.relation.IMutableRelation;
import com.bigdata.relation.accesspath.AbstractUnsynchronizedArrayBuffer;
import com.bigdata.relation.accesspath.IAsynchronousIterator;
import com.bigdata.relation.accesspath.IBuffer;
import com.bigdata.relation.rule.IRule;
import com.bigdata.relation.rule.eval.ActionEnum;
import com.bigdata.relation.rule.eval.IJoinNexus;
import com.bigdata.relation.rule.eval.ISolution;
import com.bigdata.relation.rule.eval.pipeline.IJoinMaster;
import com.bigdata.relation.rule.eval.pipeline.JoinTask;
import com.bigdata.relation.rule.eval.pipeline.JoinTaskFactoryTask;
import com.bigdata.relation.rule.eval.pipeline.JoinTaskSink;
import com.bigdata.relation.rule.eval.pipeline.UnsyncDistributedOutputBuffer;
import com.bigdata.relation.rule.eval.pipeline.UnsynchronizedSolutionBuffer;
import com.bigdata.service.AbstractDistributedFederation;
import com.bigdata.service.AbstractScaleOutFederation;
import com.bigdata.service.DataService;
import com.bigdata.service.IBigdataFederation;
import com.bigdata.service.IDataService;
import com.bigdata.striterator.IKeyOrder;
import com.bigdata.util.concurrent.Computable;
import com.bigdata.util.concurrent.Memoizer;
import cutthecrap.utils.striterators.Filter;
import cutthecrap.utils.striterators.Resolver;
import cutthecrap.utils.striterators.Striterator;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class DistributedJoinTask
extends JoinTask {
    // Compile-time trace switch; it is not referenced anywhere in the visible code.
    private static final boolean trace = false;
    // The scale-out federation; validated as non-null by the constructor. Used to
    // resolve data services, obtain the executor service and create sinks.
    protected final AbstractScaleOutFederation<?> fed;
    // A join nexus created from the caller's join nexus factory against the
    // federation (see constructor), as opposed to the joinNexus passed to super().
    protected final IJoinNexus fedJoinNexus;
    // NOTE(review): never assigned in this class as shown; presumably set by
    // whatever creates/exports the task — confirm against the factory.
    protected Future<Void> futureProxy;
    // Key orders indexed by tail predicate index; used to derive the name of the
    // next scale-out index and forwarded to downstream join task factories.
    private final IKeyOrder<?>[] keyOrders;
    // "<relation namespace>.<keyOrder>" for the next join dimension, or null when
    // this task is the last join.
    private final String nextScaleOutIndexName;
    // The source iterators feeding this task. Appended to by addSource(); entries
    // are removed by nextChunk() as they become exhausted.
    private final Vector<IAsynchronousIterator<IBindingSet[]>> sources = new Vector();
    // Set to true (while holding the lock) once all sources are exhausted or
    // closed; after that addSource() rejects new sources.
    private boolean sourcesExhausted = false;
    // The data service passed to the constructor; used to remove this task from
    // the service's session and to report hostname/service name when logging errors.
    private final DataService dataService;
    // Buffer onto which solutions are written; assigned only when this is the last
    // join, otherwise null.
    private final IBuffer<ISolution[]> solutionBuffer;
    // Guards the sourcesExhausted transition together with source registration and
    // session removal.
    private final ReentrantLock lock = new ReentrantLock();
    /**
     * Stateless computation handed to the {@link SinkMemoizer}: it creates the
     * sink for a request by delegating to the requesting task's
     * {@code _getSink(PartitionLocator)}. An {@link ExecutionException} raised
     * while creating the sink is rethrown wrapped in a {@link RuntimeException}.
     */
    private static final Computable<SinkRequest, JoinTaskSink> getSink = new Computable<SinkRequest, JoinTaskSink>(){

        @Override
        public JoinTaskSink compute(final SinkRequest request) throws InterruptedException {
            final DistributedJoinTask owner = request.joinTask;
            try {
                final JoinTaskSink created = owner._getSink(request.locator);
                return created;
            }
            catch (ExecutionException cause) {
                throw new RuntimeException(cause);
            }
        }
    };
    // Memoizes the sink created for each SinkRequest (task + partition locator).
    // The constructor leaves this null when the task is the last join.
    private final SinkMemoizer memo;

    /**
     * Creates a join task for one join dimension of a rule on one index partition.
     * <p>
     * For the last join the solution buffer is resolved: an insert or delete
     * buffer on the head relation when the action is a mutation, otherwise the
     * buffer obtained from the master proxy. For any other join the name of the
     * next scale-out index is derived and a sink memoizer is created instead.
     * Finally the initial source is registered via {@link #addSource}.
     *
     * @param rule the rule being evaluated
     * @param joinNexus the join nexus; must not be an {@link IBigdataFederation}
     * @param order the evaluation order of the tail predicates
     * @param orderIndex the index into {@code order} handled by this task
     * @param partitionId the index partition identifier
     * @param fed the federation (required)
     * @param master the join master
     * @param masterUUID the identifier of the master
     * @param src the initial source of binding set chunks (required)
     * @param keyOrders key orders indexed by tail predicate index
     * @param dataService the data service (required)
     * @param requiredVars passed through to the super class
     * @throws IllegalArgumentException if {@code fed}, {@code src} or
     *         {@code dataService} is null, or if {@code joinNexus} is an
     *         {@link IBigdataFederation}
     * @throws RuntimeException wrapping an {@link IOException} raised while
     *         fetching the solution buffer from the master proxy
     */
    public DistributedJoinTask(IRule rule, IJoinNexus joinNexus, int[] order, int orderIndex, int partitionId, AbstractScaleOutFederation<?> fed, IJoinMaster master, UUID masterUUID, IAsynchronousIterator<IBindingSet[]> src, IKeyOrder[] keyOrders, DataService dataService, IVariable[][] requiredVars) {
        super(rule, joinNexus, order, orderIndex, partitionId, master, masterUUID, requiredVars);
        if (fed == null || src == null || dataService == null || joinNexus instanceof IBigdataFederation) {
            throw new IllegalArgumentException();
        }
        this.fed = fed;
        this.keyOrders = keyOrders;
        this.dataService = dataService;
        this.fedJoinNexus = joinNexus.getJoinNexusFactory().newInstance(fed);

        // Resolve the three mode-dependent fields into locals, then assign each final field once.
        final SinkMemoizer sinks;
        final String nextIndexName;
        final IBuffer<ISolution[]> buffer;
        if (!this.lastJoin) {
            // Intermediate join: output is routed to join tasks on the next index.
            final int nextTailIndex = order[orderIndex + 1];
            final IPredicate nextPredicate = rule.getTail(nextTailIndex);
            final String namespace = nextPredicate.getOnlyRelationName();
            nextIndexName = namespace + "." + keyOrders[nextTailIndex];
            buffer = null;
            sinks = new SinkMemoizer(getSink);
        } else {
            // Last join: there are no sinks, solutions go to a buffer.
            sinks = null;
            nextIndexName = null;
            final ActionEnum action = this.fedJoinNexus.getAction();
            if (!action.isMutation()) {
                try {
                    buffer = this.masterProxy.getSolutionBuffer();
                }
                catch (IOException ex) {
                    throw new RuntimeException(ex);
                }
            } else {
                final IMutableRelation relation = (IMutableRelation)this.fedJoinNexus.getHeadRelationView(rule.getHead());
                switch (action) {
                    case Insert: {
                        buffer = this.fedJoinNexus.newInsertBuffer(relation);
                        break;
                    }
                    case Delete: {
                        buffer = this.fedJoinNexus.newDeleteBuffer(relation);
                        break;
                    }
                    default: {
                        throw new AssertionError();
                    }
                }
            }
        }
        this.memo = sinks;
        this.nextScaleOutIndexName = nextIndexName;
        this.solutionBuffer = buffer;
        this.addSource(src);
    }

    /**
     * Registers another source of binding set chunks with this join task.
     * <p>
     * The registration happens while holding the lock so that it cannot race
     * with the transition to "sources exhausted".
     *
     * @param source the source iterator (required)
     * @return {@code true} if the source was accepted, {@code false} if the
     *         task had already marked its sources as exhausted
     * @throws IllegalArgumentException if {@code source} is null
     */
    public boolean addSource(IAsynchronousIterator<IBindingSet[]> source) {
        if (source == null) {
            throw new IllegalArgumentException();
        }
        boolean accepted;
        this.lock.lock();
        try {
            accepted = !this.sourcesExhausted;
            if (accepted) {
                this.sources.add(source);
                ++this.stats.fanIn;
            } else if (INFO) {
                log.info("source rejected: orderIndex=" + this.orderIndex + ", partitionId=" + this.partitionId);
            }
        }
        finally {
            this.lock.unlock();
        }
        if (!accepted) {
            return false;
        }
        if (DEBUG) {
            log.debug("orderIndex=" + this.orderIndex + ", partitionId=" + this.partitionId + ", fanIn=" + this.stats.fanIn + ", fanOut=" + this.stats.fanOut);
        }
        return true;
    }

    /**
     * Returns the buffer resolved by the constructor.
     *
     * @return the solution buffer, which is {@code null} unless this task is
     *         the last join
     */
    @Override
    protected final IBuffer<ISolution[]> getSolutionBuffer() {
        return solutionBuffer;
    }

    /**
     * Marks the sources as exhausted, closes every currently registered source
     * iterator and removes this task from the data service session. All of this
     * is done while holding the lock.
     */
    @Override
    protected void closeSources() {
        if (INFO) {
            log.info(this.toString());
        }
        this.lock.lock();
        try {
            this.sourcesExhausted = true;
            // Close a snapshot of the registered sources.
            final IAsynchronousIterator[] snapshot = this.sources.toArray(new IAsynchronousIterator[0]);
            for (int i = 0; i < snapshot.length; ++i) {
                snapshot[i].close();
            }
            this.removeFromSession();
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Removes this task from the data service session, using the key derived
     * from the master UUID, the order index and the partition identifier. The
     * lock is acquired for the duration of the removal.
     */
    private void removeFromSession() {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            final String sessionKey = JoinTaskFactoryTask.getJoinTaskNamespace(this.masterUUID, this.orderIndex, this.partitionId);
            this.dataService.getSession().remove(sessionKey, this);
        }
        finally {
            guard.unlock();
        }
    }

    /**
     * Reads binding sets from the registered sources and returns them as a
     * single combined chunk.
     * <p>
     * Each pass polls a snapshot of the sources in turn (with short timeouts)
     * and collects whatever chunks are available until roughly
     * {@code chunkCapacity} binding sets have been gathered. Sources that
     * report themselves exhausted are dropped. When every source in a pass was
     * exhausted and no new source has been registered in the meantime, the task
     * marks its sources as exhausted and removes itself from the session.
     *
     * @return the combined chunk, or {@code null} once the sources are exhausted
     *         and nothing was read
     * @throws InterruptedException if interrupted while polling a source
     * @throws RuntimeException wrapping the first cause if the task was halted
     */
    @Override
    protected IBindingSet[] nextChunk() throws InterruptedException {
        if (this.sourcesExhausted) {
            return null;
        }
        if (DEBUG) {
            log.debug("Reading chunk of bindings from source(s): orderIndex=" + this.orderIndex + ", partitionId=" + this.partitionId);
        }
        // Target number of binding sets to gather before handing back a chunk.
        final int chunkCapacity = 100;
        final LinkedList<IBindingSet[]> pending = new LinkedList<IBindingSet[]>();
        int nbindingSets = 0;
        while (!this.sourcesExhausted) {
            while (!this.halt && !this.sources.isEmpty() && nbindingSets < chunkCapacity) {
                // Work on a snapshot since sources may be added concurrently.
                final IAsynchronousIterator[] snapshot = this.sources.toArray(new IAsynchronousIterator[0]);
                int exhaustedInPass = 0;
                for (int i = 0; i < snapshot.length && nbindingSets < chunkCapacity; ++i) {
                    final IAsynchronousIterator src = snapshot[i];
                    if (src.hasNext(1L, TimeUnit.MILLISECONDS)) {
                        final IBindingSet[] chunk = (IBindingSet[])src.next(10L, TimeUnit.MILLISECONDS);
                        assert (chunk != null);
                        pending.add(chunk);
                        nbindingSets += chunk.length;
                        if (DEBUG) {
                            log.debug("Read chunk from source: sources[" + i + "], chunkSize=" + chunk.length + ", orderIndex=" + this.orderIndex + ", partitionId=" + this.partitionId);
                        }
                    } else if (src.isExhausted()) {
                        ++exhaustedInPass;
                        if (DEBUG) {
                            log.debug("Source is exhausted: nexhausted=" + exhaustedInPass);
                        }
                        if (!this.sources.remove(src)) {
                            throw new AssertionError((Object)("Could not find source: " + src));
                        }
                    }
                }
                if (exhaustedInPass == snapshot.length) {
                    // Every source seen in this pass is done. Under the lock, decide
                    // whether a new source slipped in before declaring exhaustion.
                    this.lock.lock();
                    try {
                        if (this.sources.isEmpty()) {
                            if (INFO) {
                                log.info("Sources are exhausted: orderIndex=" + this.orderIndex + ", partitionId=" + this.partitionId);
                            }
                            this.sourcesExhausted = true;
                            this.removeFromSession();
                        }
                    }
                    finally {
                        this.lock.unlock();
                    }
                    break;
                }
            }
            if (this.halt) {
                throw new RuntimeException((Throwable)this.firstCause.get());
            }
            if (!pending.isEmpty()) {
                return this.combineChunks(pending, nbindingSets);
            }
        }
        if (DEBUG) {
            log.debug("Sources are exhausted: orderIndex=" + this.orderIndex + ", partitionId=" + this.partitionId);
        }
        return null;
    }

    /**
     * Combines the given chunks into a single chunk and updates the input
     * statistics.
     * <p>
     * A single chunk is returned as-is; otherwise the chunks are copied, in
     * order, into a new array of {@code bindingSetCount} elements.
     *
     * @param chunks the chunks read from the sources (at least one)
     * @param bindingSetCount the total number of binding sets across
     *        {@code chunks}
     * @return the combined chunk
     * @throws RuntimeException wrapping the first cause if the task was halted
     */
    protected IBindingSet[] combineChunks(List<IBindingSet[]> chunks, int bindingSetCount) {
        final int nchunks = chunks.size();
        assert (nchunks > 0);
        assert (bindingSetCount > 0);
        final IBindingSet[] combined;
        if (nchunks != 1) {
            combined = new IBindingSet[bindingSetCount];
            int writeAt = 0;
            for (IBindingSet[] part : chunks) {
                System.arraycopy(part, 0, combined, writeAt, part.length);
                writeAt += part.length;
            }
        } else {
            // Nothing to merge: hand back the only chunk without copying.
            combined = chunks.get(0);
        }
        if (this.halt) {
            throw new RuntimeException((Throwable)this.firstCause.get());
        }
        if (DEBUG) {
            log.debug("Read chunk(s): nchunks=" + nchunks + ", #bindingSets=" + combined.length + ", orderIndex=" + this.orderIndex + ", partitionId=" + this.partitionId);
        }
        this.stats.bindingSetChunksIn += (long)nchunks;
        this.stats.bindingSetsIn += (long)bindingSetCount;
        return combined;
    }

    /**
     * Creates the unsynchronized output buffer for this task, sized to the
     * chunk capacity reported by the federation join nexus.
     *
     * @return an {@link UnsynchronizedSolutionBuffer} when this is the last
     *         join, otherwise an {@link UnsyncDistributedOutputBuffer}
     */
    @Override
    protected AbstractUnsynchronizedArrayBuffer<IBindingSet> newUnsyncOutputBuffer() {
        final int capacity = this.fedJoinNexus.getChunkCapacity();
        final AbstractUnsynchronizedArrayBuffer outputBuffer;
        if (this.lastJoin) {
            outputBuffer = new UnsynchronizedSolutionBuffer(this, this.fedJoinNexus, capacity);
        } else {
            outputBuffer = new UnsyncDistributedOutputBuffer(this.fed, this, capacity);
        }
        return outputBuffer;
    }

    /**
     * Flushes this task's output and waits for downstream work to finish.
     * <p>
     * For the last join the solution buffer is flushed and, when the action is
     * a mutation, the flush result is added to the mutation counter. For any
     * other join each sink's unsynchronized buffer is flushed and its blocking
     * buffer closed (in parallel on the federation's executor service), after
     * which this method waits on the future of each sink.
     *
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException if a flush task or a sink future failed
     * @throws RuntimeException wrapping the first cause if the task was halted
     */
    @Override
    protected void flushAndCloseBuffersAndAwaitSinks() throws InterruptedException, ExecutionException {
        if (DEBUG) {
            log.debug("orderIndex=" + this.orderIndex + ", partitionId=" + this.partitionId + (this.lastJoin ? ", lastJoin" : ", sinkCount=" + this.memo.size()));
        }
        if (this.lastJoin) {
            assert (this.memo == null);
            if (DEBUG) {
                log.debug("\nWill flush buffer containing " + this.getSolutionBuffer().size() + " solutions.");
            }
            final long counter = this.getSolutionBuffer().flush();
            if (DEBUG) {
                log.debug("\nFlushed buffer: mutationCount=" + counter);
            }
            if (this.joinNexus.getAction().isMutation()) {
                this.stats.mutationCount.addAndGet(counter);
            }
        } else {
            if (this.halt) {
                throw new RuntimeException((Throwable)this.firstCause.get());
            }
            // Flush and close every sink buffer in parallel.
            final List<FlushAndCloseSinkBufferTask> tasks = new LinkedList<FlushAndCloseSinkBufferTask>();
            final Iterator<JoinTaskSink> itr = this.memo.getSinks();
            while (itr.hasNext()) {
                tasks.add(new FlushAndCloseSinkBufferTask(itr.next()));
            }
            final List<Future<Void>> futures = this.fed.getExecutorService().invokeAll(tasks);
            for (Future<Void> f : futures) {
                // Surfaces any failure from the flush/close task.
                f.get();
            }
            // Wait for the downstream join tasks fed by the sinks.
            final Iterator<JoinTaskSink> itr2 = this.memo.getSinks();
            while (itr2.hasNext()) {
                if (this.halt) {
                    throw new RuntimeException((Throwable)this.firstCause.get());
                }
                final JoinTaskSink sink = itr2.next();
                final Future<?> f = sink.getFuture();
                if (DEBUG) {
                    log.debug("Waiting for Future: sink=" + sink);
                }
                f.get();
            }
        }
        if (DEBUG) {
            // The "lastJoin" marker carries the same ", " separator as the opening message.
            log.debug("Done: orderIndex=" + this.orderIndex + ", partitionId=" + this.partitionId + (this.lastJoin ? ", lastJoin" : ", sinkCount=" + this.memo.size()));
        }
    }

    /**
     * Cancels all sinks of this task. Does nothing when this is the last join.
     * <p>
     * For each sink the unsynchronized buffer is reset, the blocking buffer is
     * reset and closed, and the sink's future is cancelled with interruption.
     */
    @Override
    protected void cancelSinks() {
        if (!this.lastJoin) {
            if (DEBUG) {
                log.debug("orderIndex=" + this.orderIndex + ", partitionId=" + this.partitionId + ", sinkCount=" + this.memo.size());
            }
            for (Iterator<JoinTaskSink> sinks = this.memo.getSinks(); sinks.hasNext(); ) {
                final JoinTaskSink target = sinks.next();
                target.unsyncBuffer.reset();
                target.blockingBuffer.reset();
                target.blockingBuffer.close();
                target.getFuture().cancel(true);
            }
            if (DEBUG) {
                log.debug("Done: orderIndex=" + this.orderIndex + ", partitionId=" + this.partitionId + ", sinkCount=" + this.memo.size());
            }
        }
    }

    /**
     * Returns the sink for the given partition locator, going through the
     * memoizer with a request keyed on this task and the locator.
     *
     * @param locator the locator of the target index partition
     * @return the sink for that partition
     * @throws InterruptedException if interrupted while the sink is computed
     */
    protected JoinTaskSink getSink(PartitionLocator locator) throws InterruptedException, RuntimeException {
        final SinkRequest request = new SinkRequest(this, locator);
        return (JoinTaskSink)this.memo.compute(request);
    }

    /**
     * Creates a new sink for the given index partition and starts the
     * downstream join task that consumes it.
     * <p>
     * The target data service is resolved from the locator (the federation's
     * own service when the UUIDs match), a {@link JoinTaskFactoryTask} is
     * submitted to it with the sink's source iterator (proxied when the
     * federation is distributed), and the future returned by that factory task
     * is set on the sink. The fan-out counter is incremented on success.
     *
     * @param locator the locator of the target index partition
     * @return the newly created sink
     * @throws InterruptedException if interrupted while awaiting the factory task
     * @throws ExecutionException if the factory task failed
     * @throws RuntimeException wrapping an {@link IOException} raised while
     *         submitting the factory task
     */
    private JoinTaskSink _getSink(PartitionLocator locator) throws InterruptedException, ExecutionException {
        final int nextOrderIndex = this.orderIndex + 1;
        if (DEBUG) {
            log.debug("Creating join task: nextOrderIndex=" + nextOrderIndex + ", indexName=" + this.nextScaleOutIndexName + ", partitionId=" + locator.getPartitionId());
        }

        // Resolve the data service hosting the target partition.
        final UUID sinkUUID = locator.getDataServiceUUID();
        final IDataService targetService;
        if (sinkUUID.equals(this.fed.getServiceUUID())) {
            targetService = (IDataService)this.fed.getService();
        } else {
            targetService = this.fed.getDataService(sinkUUID);
        }

        final JoinTaskSink sink = new JoinTaskSink(this.fed, locator, this);

        // The downstream task reads from the sink's blocking buffer, via a proxy if distributed.
        final IAsynchronousIterator<IBindingSet[]> sourceItrProxy;
        if (this.fed.isDistributed()) {
            sourceItrProxy = ((AbstractDistributedFederation)this.fed).getProxy(sink.blockingBuffer.iterator(), this.joinNexus.getBindingSetSerializer(), this.joinNexus.getChunkOfChunksCapacity());
        } else {
            sourceItrProxy = sink.blockingBuffer.iterator();
        }

        Future<? extends Object> factoryFuture;
        try {
            final JoinTaskFactoryTask factoryTask = new JoinTaskFactoryTask(this.nextScaleOutIndexName, this.rule, this.joinNexus.getJoinNexusFactory(), this.order, nextOrderIndex, locator.getPartitionId(), this.masterProxy, this.masterUUID, sourceItrProxy, this.keyOrders, this.requiredVars);
            factoryFuture = targetService.submit(factoryTask);
        }
        catch (IOException ex) {
            throw new RuntimeException(ex);
        }
        sink.setFuture((Future)factoryFuture.get());
        ++this.stats.fanOut;
        return sink;
    }

    /**
     * Logs an error raised by this task, including the data service hostname
     * and service name, this task and the rule.
     *
     * @param t the error to log
     */
    @Override
    protected void logCallError(Throwable t) {
        final String message = "hostname=" + this.dataService.getHostname()
                + ", serviceName=" + this.dataService.getServiceName()
                + ", joinTask=" + this.toString()
                + ", rule=" + this.rule;
        log.error(message, t);
    }

    /**
     * A {@link Memoizer} of sinks keyed by {@link SinkRequest} which also
     * exposes the number of cache entries and an iterator over the sinks that
     * have been created successfully.
     */
    private static class SinkMemoizer
    extends Memoizer<SinkRequest, JoinTaskSink> {
        public SinkMemoizer(Computable<SinkRequest, JoinTaskSink> c) {
            super(c);
        }

        /**
         * @return the number of entries in the memoizer cache
         */
        int size() {
            return this.cache.size();
        }

        /**
         * Visits the sinks whose cached futures are done and completed without
         * an {@link ExecutionException} or {@link InterruptedException}; all
         * other cache entries are skipped.
         *
         * @return an iterator over the successfully created sinks
         */
        Iterator<JoinTaskSink> getSinks() {
            // Accepts only futures that are done and whose result can be obtained.
            final Filter completedOnly = new Filter(){
                private static final long serialVersionUID = 1L;

                @Override
                public boolean isValid(Object e) {
                    final Future candidate = (Future)e;
                    if (candidate.isDone()) {
                        try {
                            candidate.get();
                            return true;
                        }
                        catch (ExecutionException ex) {
                            return false;
                        }
                        catch (InterruptedException ex) {
                            return false;
                        }
                    }
                    return false;
                }
            };
            // Unwraps an accepted future into its result.
            final Resolver unwrap = new Resolver(){
                private static final long serialVersionUID = 1L;

                @Override
                protected Object resolve(Object arg0) {
                    final Future completed = (Future)arg0;
                    try {
                        return completed.get();
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    catch (ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
            return new Striterator(this.cache.values().iterator()).addFilter(completedOnly).addFilter(unwrap);
        }
    }

    /**
     * Key used with the sink memoizer: the requesting join task together with
     * the locator of the target index partition. Two requests are equal when
     * they refer to the same task instance and their locators are equal; the
     * hash code is that of the locator.
     */
    private static class SinkRequest {
        final DistributedJoinTask joinTask;
        final PartitionLocator locator;

        /**
         * @param joinTask the join task requesting the sink
         * @param locator the locator of the target index partition
         */
        public SinkRequest(DistributedJoinTask joinTask, PartitionLocator locator) {
            this.joinTask = joinTask;
            this.locator = locator;
        }

        @Override
        public boolean equals(Object o) {
            if (o instanceof SinkRequest) {
                final SinkRequest other = (SinkRequest)o;
                if (this.joinTask != other.joinTask) {
                    return false;
                }
                return this.locator.equals(other.locator);
            }
            return false;
        }

        @Override
        public int hashCode() {
            return this.locator.hashCode();
        }
    }

    /**
     * Task which flushes the unsynchronized buffer of one sink and then closes
     * the sink's blocking buffer. Fails immediately if the enclosing join task
     * has been halted.
     */
    private class FlushAndCloseSinkBufferTask
    implements Callable<Void> {
        private final JoinTaskSink sink;

        /**
         * @param sink the sink to flush and close (required)
         * @throws IllegalArgumentException if {@code sink} is null
         */
        public FlushAndCloseSinkBufferTask(JoinTaskSink sink) {
            if (sink == null) {
                throw new IllegalArgumentException();
            }
            this.sink = sink;
        }

        @Override
        public Void call() throws Exception {
            final DistributedJoinTask owner = DistributedJoinTask.this;
            if (owner.halt) {
                throw new RuntimeException((Throwable)owner.firstCause.get());
            }
            final JoinTaskSink target = this.sink;
            if (JoinTask.DEBUG) {
                JoinTask.log.debug("Closing sink: sink=" + target + ", unsyncBufferSize=" + target.unsyncBuffer.size() + ", blockingBufferSize=" + target.blockingBuffer.size());
            }
            // Flush first so buffered binding sets reach the blocking buffer before it is closed.
            target.unsyncBuffer.flush();
            target.blockingBuffer.close();
            return null;
        }
    }
}

