/*
 * Decompiled with CFR 0.152.
 */
package com.bigdata.relation.rule.eval.pipeline;

import com.bigdata.bop.IBindingSet;
import com.bigdata.bop.IPredicate;
import com.bigdata.journal.TimestampUtility;
import com.bigdata.mdi.PartitionLocator;
import com.bigdata.relation.accesspath.IBuffer;
import com.bigdata.relation.accesspath.ThickAsynchronousIterator;
import com.bigdata.relation.rule.IRule;
import com.bigdata.relation.rule.eval.IJoinNexus;
import com.bigdata.relation.rule.eval.ISolution;
import com.bigdata.relation.rule.eval.pipeline.IJoinMaster;
import com.bigdata.relation.rule.eval.pipeline.JoinMasterTask;
import com.bigdata.relation.rule.eval.pipeline.JoinTaskFactoryTask;
import com.bigdata.service.AbstractDistributedFederation;
import com.bigdata.service.AbstractScaleOutFederation;
import com.bigdata.service.IBigdataFederation;
import com.bigdata.service.IDataService;
import com.bigdata.util.concurrent.ExecutionExceptions;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Join master for a scale-out federation. It submits one
 * {@link JoinTaskFactoryTask} per index partition located for the first
 * predicate in the evaluation order and then collects the join task futures
 * returned by those factory tasks.
 * <p>
 * Fields such as {@code joinNexus}, {@code rule}, {@code order},
 * {@code ruleState}, {@code solutionBuffer}, {@code masterUUID},
 * {@code joinNexusFactory} and {@code log} are not declared here; they are
 * inherited from {@link JoinMasterTask} (or one of its ancestors).
 */
public class DistributedJoinMasterTask
extends JoinMasterTask
implements Serializable {
    private static final long serialVersionUID = 7096223893807015958L;

    /** Reference to this master that is handed to the factory tasks (a federation proxy or {@code this}). */
    private final IJoinMaster masterProxy;

    /**
     * Buffer returned by {@link #getSolutionBuffer()}. It is {@code null} when
     * the action is a mutation and the federation is an
     * {@link AbstractDistributedFederation}.
     */
    private final IBuffer<ISolution[]> solutionBufferProxy;

    /**
     * Serialization hook which refuses to serialize the master unless the
     * action is a mutation.
     *
     * @param out the stream the object is written to.
     * @throws IOException if the default serialization fails.
     * @throws UnsupportedOperationException if the action is not a mutation.
     */
    private void writeObject(ObjectOutputStream out) throws IOException {
        if (!this.joinNexus.getAction().isMutation()) {
            throw new UnsupportedOperationException("Join master may not be executed remotely for query.");
        }
        out.defaultWriteObject();
    }

    /**
     * Validates the execution context and sets up the references that are
     * passed to the join tasks.
     *
     * @param rule the rule to be evaluated.
     * @param joinNexus the join nexus; its index manager must be a scale-out
     *        {@link IBigdataFederation}.
     * @param buffer the buffer passed through to the super class constructor.
     * @throws UnsupportedOperationException if the index manager is not a
     *         scale-out federation, or if the action is a mutation and the
     *         read timestamp is not read-only.
     */
    public DistributedJoinMasterTask(IRule rule, IJoinNexus joinNexus, IBuffer<ISolution[]> buffer) {
        super(rule, joinNexus, buffer);
        if (!(joinNexus.getIndexManager() instanceof IBigdataFederation) || !((IBigdataFederation)joinNexus.getIndexManager()).isScaleOut()) {
            throw new UnsupportedOperationException();
        }
        if (joinNexus.getAction().isMutation()) {
            // Mutation requires a read-only view for the read timestamp.
            if (!TimestampUtility.isReadOnly(joinNexus.getReadTimestamp())) {
                throw new UnsupportedOperationException();
            }
        } else if (joinNexus.getReadTimestamp() == 0L) {
            // Query against timestamp 0L is allowed but flagged.
            log.warn("Unisolated scale-out query");
        }
        if (joinNexus.getIndexManager() instanceof AbstractDistributedFederation) {
            // Distributed federation: obtain proxies from the federation.
            AbstractDistributedFederation fed = (AbstractDistributedFederation)joinNexus.getIndexManager();
            this.masterProxy = fed.getProxy(this, true);
            // No solution buffer proxy is created for mutation.
            this.solutionBufferProxy = joinNexus.getAction().isMutation() ? null : fed.getProxy(this.solutionBuffer);
        } else {
            // Any other scale-out federation: use the local references directly.
            this.masterProxy = this;
            this.solutionBufferProxy = this.solutionBuffer;
        }
    }

    /**
     * Returns the solution buffer (or its proxy) for query.
     *
     * @return the buffer selected by the constructor.
     * @throws UnsupportedOperationException if the action is a mutation.
     */
    @Override
    public IBuffer<ISolution[]> getSolutionBuffer() throws IOException {
        if (this.joinNexus.getAction().isMutation()) {
            throw new UnsupportedOperationException();
        }
        return this.solutionBufferProxy;
    }

    /**
     * Maps a new binding set for the rule across the index partitions and
     * waits for the factory tasks to hand back the join task futures.
     *
     * @return the futures returned by the factory tasks.
     * @throws Exception if a factory task cannot be submitted or fails.
     */
    @Override
    protected final List<Future<Void>> start() throws Exception {
        IBindingSet initialBindingSet = this.joinNexus.newBindingSet(this.rule);
        List<Future> factoryTaskFutures = this.mapBindingSet(initialBindingSet);
        return this.awaitFactoryFutures(factoryTaskFutures);
    }

    /**
     * Submits one {@link JoinTaskFactoryTask} for each partition locator
     * reported by {@code joinNexus.locatorScan(...)} for the first predicate
     * in the evaluation order, as bound by the given binding set.
     *
     * @param bindingSet the binding set applied to the first predicate and
     *        used to create the source iterator for every factory task.
     * @return the futures of the submitted factory tasks, in locator order.
     * @throws ExecutionException if a factory task could not be submitted.
     * @throws Exception if anything else goes wrong.
     */
    protected List<Future> mapBindingSet(IBindingSet bindingSet) throws Exception {
        IPredicate predicate = this.rule.getTail(this.order[0]).asBound(bindingSet);
        AbstractScaleOutFederation fed = (AbstractScaleOutFederation)this.joinNexus.getIndexManager();
        // Index name is <relation name>.<key order of the first predicate>.
        String scaleOutIndexName = predicate.getOnlyRelationName() + "." + this.ruleState.getKeyOrder()[this.order[0]];
        Iterator<PartitionLocator> itr = this.joinNexus.locatorScan(fed, predicate);
        LinkedList<Future> futures = new LinkedList<Future>();
        while (itr.hasNext()) {
            Future<? extends Object> f;
            PartitionLocator locator = itr.next();
            int partitionId = locator.getPartitionId();
            if (log.isDebugEnabled()) {
                log.debug("Will submit JoinTask: partitionId=" + partitionId);
            }
            // Each factory task gets its own source iterator over the binding set.
            ThickAsynchronousIterator<IBindingSet[]> sourceItr = this.newBindingSetIterator(bindingSet);
            JoinTaskFactoryTask factoryTask = new JoinTaskFactoryTask(scaleOutIndexName, this.rule, this.joinNexusFactory, this.order, 0, partitionId, this.masterProxy, this.masterUUID, sourceItr, this.ruleState.getKeyOrder(), this.ruleState.getRequiredVars());
            IDataService dataService = fed.getDataService(locator.getDataServiceUUID());
            try {
                f = dataService.submit(factoryTask);
            }
            catch (Exception ex) {
                throw new ExecutionException("Could not submit: task=" + factoryTask, ex);
            }
            futures.add(f);
        }
        return futures;
    }

    /**
     * Waits for each factory task and collects the join task future that it
     * returns.
     * <p>
     * Once any factory task has failed, the remaining factory tasks are
     * cancelled, any join task future obtained afterwards is cancelled, and
     * finally every join task future collected before the failure is
     * cancelled as well before the failures are reported.
     *
     * @param factoryTaskFutures the futures of the factory tasks; each one is
     *        expected to yield a {@link Future} for the join task.
     * @return the join task futures, in the order of the factory task futures.
     * @throws InterruptedException if interrupted while waiting.
     * @throws ExecutionExceptions if one or more factory tasks failed.
     * @throws CancellationException if a factory task was cancelled while no
     *         failure had been recorded by this method.
     */
    protected List<Future<Void>> awaitFactoryFutures(List<Future> factoryTaskFutures) throws InterruptedException, ExecutionExceptions {
        int size = factoryTaskFutures.size();
        if (log.isDebugEnabled()) {
            log.debug("#futures=" + size);
        }
        int ndone = 0;
        ArrayList<Future<Void>> joinTaskFutures = new ArrayList<Future<Void>>(size);
        Iterator<Future> itr = factoryTaskFutures.iterator();
        // Failures of the factory tasks; non-empty means that we are aborting.
        LinkedList<ExecutionException> causes = new LinkedList<ExecutionException>();
        while (itr.hasNext()) {
            Future future;
            Future factoryTaskFuture = itr.next();
            if (log.isDebugEnabled()) {
                log.debug("Waiting for factoryTask");
            }
            boolean aborting = !causes.isEmpty();
            try {
                if (aborting) {
                    // Cancel the factory task in case it is still running, but
                    // still try to get its result in case it already finished.
                    factoryTaskFuture.cancel(true);
                }
                future = (Future)factoryTaskFuture.get();
            }
            catch (CancellationException ex) {
                if (!aborting) {
                    // Not cancelled by this method: propagate unchanged.
                    throw ex;
                }
                // We cancelled the factory task ourselves, so there is no join
                // task future to obtain. Keep going so that the remaining
                // tasks are cancelled and the recorded causes are reported.
                continue;
            }
            catch (ExecutionException ex) {
                causes.add(ex);
                log.error(ex, ex);
                continue;
            }
            if (causes.isEmpty()) {
                joinTaskFutures.add(future);
            } else {
                // Aborting: do not let a join task that was created anyway run on.
                future.cancel(true);
            }
            ++ndone;
            if (log.isDebugEnabled()) {
                log.debug("ndone=" + ndone + " of " + size);
            }
        }
        if (!causes.isEmpty()) {
            // Cancel the join tasks that were collected before the first failure.
            for (Future joinTaskFuture : joinTaskFutures) {
                joinTaskFuture.cancel(true);
            }
            throw new ExecutionExceptions(causes);
        }
        if (log.isDebugEnabled()) {
            log.debug("All factory tasks done: #futures=" + size);
        }
        return joinTaskFutures;
    }
}

