/*
 * Decompiled with CFR 0.152.
 */
package com.bigdata.relation.rule.eval.pipeline;

import com.bigdata.bop.IBindingSet;
import com.bigdata.bop.IPredicate;
import com.bigdata.mdi.PartitionLocator;
import com.bigdata.relation.accesspath.AbstractUnsynchronizedArrayBuffer;
import com.bigdata.relation.rule.eval.IJoinNexus;
import com.bigdata.relation.rule.eval.pipeline.DistributedJoinTask;
import com.bigdata.relation.rule.eval.pipeline.JoinStats;
import com.bigdata.relation.rule.eval.pipeline.JoinTaskSink;
import com.bigdata.service.AbstractScaleOutFederation;
import com.bigdata.service.IBigdataFederation;
import java.util.Iterator;
import org.apache.log4j.Logger;

/**
 * Unsynchronized output buffer used by a {@link DistributedJoinTask}. When a
 * chunk of binding sets is handled, each binding set is added to the
 * unsynchronized buffer of the {@link JoinTaskSink} obtained for every
 * {@link PartitionLocator} reported by a locator scan of the next predicate
 * (as bound by that binding set).
 *
 * @param <E> the type of binding set held by this buffer
 */
class UnsyncDistributedOutputBuffer<E extends IBindingSet>
extends AbstractUnsynchronizedArrayBuffer<E> {
    private static final Logger log = Logger.getLogger(UnsyncDistributedOutputBuffer.class);

    /** The join task whose output is routed by this buffer. */
    private final DistributedJoinTask joinTask;

    /** The join task's {@code orderIndex} plus one. */
    private final int nextOrderIndex;

    /** The tail index that the join task reports for {@link #nextOrderIndex}. */
    final int nextTailIndex;

    /** The federation supplied to the constructor. */
    final IBigdataFederation<?> fed;

    /**
     * Creates a buffer that routes the output of the given join task.
     *
     * @param fed      the federation; must not be {@code null}
     * @param joinTask the join task producing the binding sets; must not be
     *                 {@code null}
     * @param capacity the capacity passed to the superclass buffer
     * @throws IllegalArgumentException if {@code fed} or {@code joinTask} is
     *                                  {@code null}
     */
    public UnsyncDistributedOutputBuffer(AbstractScaleOutFederation<?> fed, DistributedJoinTask joinTask, int capacity) {
        super(capacity, IBindingSet.class);
        if (fed == null) {
            throw new IllegalArgumentException("fed must not be null");
        }
        if (joinTask == null) {
            throw new IllegalArgumentException("joinTask must not be null");
        }
        this.fed = fed;
        this.joinTask = joinTask;
        this.nextOrderIndex = joinTask.orderIndex + 1;
        this.nextTailIndex = joinTask.getTailIndex(this.nextOrderIndex);
    }

    /**
     * Routes every binding set in the chunk to the sink of each partition
     * located for the next predicate, and updates the join task statistics.
     *
     * @param chunk the binding sets to distribute
     * @throws RuntimeException wrapping an {@link InterruptedException} if the
     *                          thread is interrupted while obtaining a sink;
     *                          the thread's interrupt status is restored
     *                          before the exception is thrown
     */
    @Override
    protected void handleChunk(E[] chunk) {
        if (log.isDebugEnabled()) {
            log.debug("chunkSize=" + chunk.length);
        }
        int bindingSetsOut = 0;
        IPredicate nextPred = this.joinTask.rule.getTail(this.nextTailIndex);
        IJoinNexus joinNexus = this.joinTask.joinNexus;
        JoinStats stats = this.joinTask.stats;
        for (E bindingSet : chunk) {
            // One binding set may map onto several partitions; it is added to
            // the sink of each partition the scan reports.
            Iterator<PartitionLocator> itr = joinNexus.locatorScan(this.joinTask.fed, nextPred.asBound(bindingSet));
            while (itr.hasNext()) {
                PartitionLocator locator = itr.next();
                if (log.isTraceEnabled()) {
                    log.trace("adding bindingSet to buffer: nextOrderIndex=" + this.nextOrderIndex + ", partitionId=" + locator.getPartitionId() + ", bindingSet=" + bindingSet);
                }
                JoinTaskSink sink = sinkFor(locator);
                // The chunk counter is bumped only when add2() returns true.
                if (sink.unsyncBuffer.add2(bindingSet)) {
                    ++stats.bindingSetChunksOut;
                }
                ++bindingSetsOut;
            }
        }
        // Folded into the shared stats once per chunk rather than per element.
        stats.bindingSetsOut += (long)bindingSetsOut;
    }

    /**
     * Obtains the sink for the given locator from the join task, converting an
     * interruption into an unchecked exception.
     */
    private JoinTaskSink sinkFor(PartitionLocator locator) {
        try {
            return this.joinTask.getSink(locator);
        }
        catch (InterruptedException ex) {
            // Restore the interrupt status so callers further up the stack can
            // still observe that this thread was interrupted.
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
    }
}

