/*
 * Decompiled with CFR 0.152.
 */
package com.bigdata.relation.rule.eval.pipeline;

import com.bigdata.bop.IBindingSet;
import com.bigdata.bop.IVariable;
import com.bigdata.relation.accesspath.AbstractUnsynchronizedArrayBuffer;
import com.bigdata.relation.accesspath.BlockingBuffer;
import com.bigdata.relation.accesspath.IAsynchronousIterator;
import com.bigdata.relation.accesspath.IBuffer;
import com.bigdata.relation.rule.IRule;
import com.bigdata.relation.rule.eval.IJoinNexus;
import com.bigdata.relation.rule.eval.ISolution;
import com.bigdata.relation.rule.eval.pipeline.IJoinMaster;
import com.bigdata.relation.rule.eval.pipeline.JoinTask;
import com.bigdata.relation.rule.eval.pipeline.UnsyncLocalOutputBuffer;
import com.bigdata.relation.rule.eval.pipeline.UnsynchronizedSolutionBuffer;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * A {@link JoinTask} that reads chunks of binding sets from a local
 * {@link IAsynchronousIterator} and writes its output either onto a solution
 * buffer (when this is the last join) or onto a {@link BlockingBuffer} that is
 * drained by a sink whose {@link Future} is registered via
 * {@link #setSinkFuture(Future)}.
 */
public class LocalJoinTask extends JoinTask {

    /** Source of binding set chunks; read by {@link #nextChunk()} and closed by {@link #closeSources()}. */
    private final IAsynchronousIterator<IBindingSet[]> source;

    /** Target for solutions; non-null only when {@code lastJoin} is true. */
    private final IBuffer<ISolution[]> solutionBuffer;

    /** Output buffer handed to the unsynchronized output buffers; non-null only when {@code lastJoin} is false. */
    protected final BlockingBuffer<IBindingSet[]> syncBuffer;

    /** Future of the sink; {@code null} until {@link #setSinkFuture(Future)} has been called. */
    private volatile Future<? extends Object> sinkFuture;

    /**
     * Creates the task.
     *
     * @param rule           passed through to the {@link JoinTask} constructor
     * @param joinNexus      passed through to the {@link JoinTask} constructor; also supplies the
     *                       capacity used to construct {@link #syncBuffer}
     * @param order          passed through to the {@link JoinTask} constructor
     * @param orderIndex     passed through to the {@link JoinTask} constructor
     * @param masterProxy    passed through to the {@link JoinTask} constructor
     * @param masterUUID     passed through to the {@link JoinTask} constructor
     * @param source         the iterator from which binding set chunks are read; must not be null
     * @param solutionBuffer the buffer that receives solutions; must not be null when this is the
     *                       last join, and is ignored otherwise
     * @param requiredVars   passed through to the {@link JoinTask} constructor
     * @throws IllegalArgumentException if {@code source} is null, or if this is the last join and
     *                                  {@code solutionBuffer} is null
     */
    public LocalJoinTask(IRule rule, IJoinNexus joinNexus, int[] order, int orderIndex, IJoinMaster masterProxy, UUID masterUUID, IAsynchronousIterator<IBindingSet[]> source, IBuffer<ISolution[]> solutionBuffer, IVariable[][] requiredVars) {
        // NOTE(review): -1 is presumably the partition identifier for a local (unpartitioned)
        // task -- confirm against the JoinTask constructor.
        super(rule, joinNexus, order, orderIndex, -1, masterProxy, masterUUID, requiredVars);
        if (source == null) {
            throw new IllegalArgumentException("source must not be null");
        }
        if (this.lastJoin && solutionBuffer == null) {
            throw new IllegalArgumentException("solutionBuffer must not be null for the last join");
        }
        this.source = source;
        this.stats.fanIn = 1;
        if (this.lastJoin) {
            // The last join writes solutions directly; there is no downstream sink.
            this.syncBuffer = null;
            this.solutionBuffer = solutionBuffer;
        } else {
            this.syncBuffer = new BlockingBuffer<IBindingSet[]>(joinNexus.getChunkOfChunksCapacity());
            this.solutionBuffer = null;
            this.stats.fanOut = 1;
        }
    }

    /**
     * Returns the solution buffer supplied to the constructor.
     *
     * @return the solution buffer
     * @throws IllegalStateException if this is not the last join
     */
    @Override
    protected IBuffer<ISolution[]> getSolutionBuffer() {
        if (!this.lastJoin) {
            throw new IllegalStateException("solution buffer is only available on the last join");
        }
        return this.solutionBuffer;
    }

    /** Closes the {@link #source} iterator. */
    @Override
    protected void closeSources() {
        if (INFO) {
            log.info(this.toString());
        }
        this.source.close();
    }

    /**
     * Creates a new unsynchronized output buffer: one that targets the solution buffer when this
     * is the last join, otherwise one that targets {@link #syncBuffer}.
     *
     * @return the new buffer, sized by {@code joinNexus.getChunkCapacity()}
     */
    @Override
    protected final AbstractUnsynchronizedArrayBuffer<IBindingSet> newUnsyncOutputBuffer() {
        if (this.lastJoin) {
            return new UnsynchronizedSolutionBuffer<IBindingSet>(this, this.joinNexus, this.joinNexus.getChunkCapacity());
        }
        return new UnsyncLocalOutputBuffer<IBindingSet>(this.stats, this.joinNexus.getChunkCapacity(), this.syncBuffer);
    }

    /**
     * Registers the {@link Future} of the sink. May be called at most once.
     *
     * @param f the sink's future; must not be null
     * @throws IllegalArgumentException if {@code f} is null
     * @throws IllegalStateException    if a future has already been registered
     */
    protected final void setSinkFuture(Future<? extends Object> f) {
        if (f == null) {
            throw new IllegalArgumentException("sink future must not be null");
        }
        if (this.sinkFuture != null) {
            throw new IllegalStateException("sink future has already been set");
        }
        this.sinkFuture = f;
    }

    /**
     * Flushes the output of this task. For the last join the solution buffer is flushed and, for
     * mutation actions, the flushed count is added to {@code stats.mutationCount}. Otherwise
     * {@link #syncBuffer} is closed and the sink future is awaited; any failure while awaiting is
     * reported through {@code halt(Throwable)}.
     *
     * @throws RuntimeException if the task has already been halted (wrapping the first cause)
     */
    @Override
    protected void flushAndCloseBuffersAndAwaitSinks() throws InterruptedException, ExecutionException {
        if (DEBUG) {
            log.debug("orderIndex=" + this.orderIndex + ", partitionId=" + this.partitionId);
        }
        if (this.halt) {
            throw new RuntimeException((Throwable)this.firstCause.get());
        }
        if (this.lastJoin) {
            final long counter = this.getSolutionBuffer().flush();
            if (this.joinNexus.getAction().isMutation()) {
                this.stats.mutationCount.addAndGet(counter);
            }
            return;
        }
        this.syncBuffer.close();
        assert (!this.syncBuffer.isOpen());
        // Re-check: the task may have been halted while the buffer was being closed.
        if (this.halt) {
            throw new RuntimeException((Throwable)this.firstCause.get());
        }
        // Read the volatile field once so the null check and the call see the same reference.
        final Future<? extends Object> sink = this.sinkFuture;
        if (sink == null) {
            // Report a descriptive cause instead of an anonymous NullPointerException.
            this.halt(new IllegalStateException("sink future was never set: orderIndex=" + this.orderIndex));
            return;
        }
        try {
            sink.get();
        }
        catch (Throwable t) {
            // NOTE(review): this also absorbs InterruptedException without restoring the
            // interrupt flag; left as-is because halt(Throwable) owns the shutdown path.
            this.halt(t);
        }
    }

    /**
     * Cancels the downstream sink, if any: resets {@link #syncBuffer} and cancels the sink future
     * with interruption. Does nothing for the last join. Safe to call before a sink future has
     * been registered.
     */
    @Override
    protected void cancelSinks() {
        if (DEBUG) {
            log.debug("orderIndex=" + this.orderIndex + ", partitionId=" + this.partitionId);
        }
        if (this.lastJoin) {
            return;
        }
        this.syncBuffer.reset();
        // The future may not have been registered yet when cancellation happens early; a
        // NullPointerException here would be thrown from the cleanup path.
        final Future<? extends Object> sink = this.sinkFuture;
        if (sink != null) {
            sink.cancel(true);
        }
    }

    /**
     * Returns the next chunk of binding sets from the {@link #source}, updating the
     * {@code bindingSetChunksIn} and {@code bindingSetsIn} statistics.
     *
     * @return the next chunk, or {@code null} once the source is exhausted
     * @throws RuntimeException if the task is halted while waiting (wrapping the first cause)
     */
    @Override
    protected IBindingSet[] nextChunk() throws InterruptedException {
        if (DEBUG) {
            log.debug("orderIndex=" + this.orderIndex);
        }
        while (!this.source.isExhausted()) {
            if (this.halt) {
                throw new RuntimeException((Throwable)this.firstCause.get());
            }
            // Wait with a short timeout so the halt flag is re-checked on every pass.
            if (this.source.hasNext(10L, TimeUnit.MILLISECONDS)) {
                final IBindingSet[] chunk = this.source.next();
                ++this.stats.bindingSetChunksIn;
                this.stats.bindingSetsIn += (long)chunk.length;
                if (DEBUG) {
                    log.debug("Read chunk from source: chunkSize=" + chunk.length + ", orderIndex=" + this.orderIndex);
                }
                return chunk;
            }
        }
        if (DEBUG) {
            log.debug("Source exhausted: orderIndex=" + this.orderIndex);
        }
        return null;
    }
}

