/*
 * Decompiled with CFR 0.152.
 */
package com.bigdata.service.ndx.pipeline;

import com.bigdata.relation.accesspath.BlockingBuffer;
import com.bigdata.relation.accesspath.BufferClosedException;
import com.bigdata.relation.accesspath.IAsynchronousIterator;
import com.bigdata.resources.StaleLocatorException;
import com.bigdata.service.ndx.pipeline.AbstractMasterStats;
import com.bigdata.service.ndx.pipeline.AbstractSubtask;
import com.bigdata.service.ndx.pipeline.AbstractSubtaskStats;
import com.bigdata.service.ndx.pipeline.IMasterTask;
import com.bigdata.service.ndx.pipeline.IdleTimeoutException;
import com.bigdata.service.ndx.pipeline.MasterExhaustedException;
import com.bigdata.service.ndx.pipeline.SubtaskOp;
import com.bigdata.util.concurrent.AbstractHaltableProcess;
import java.lang.reflect.Array;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;

/**
 * Abstract master task which consumes chunks of elements from an input
 * {@link BlockingBuffer} and hands each chunk to {@code handleChunk}. Output
 * sinks (subtasks) are created on demand by {@code getSink}, keyed by a locator,
 * and each one runs as a {@link FutureTask} submitted through
 * {@code submitSubtask}. Chunks which could not be delivered to a sink because of
 * a stale locator are placed on an internal redirect queue and are re-handled by
 * the master.
 *
 * @param <H> the type of the statistics object for the master
 * @param <E> the type of the elements in the chunks
 * @param <S> the type of the subtask (sink)
 * @param <L> the type of the locator used as the key for a sink
 */
public abstract class AbstractMasterTask<H extends AbstractMasterStats<L, ? extends AbstractSubtaskStats>, E, S extends AbstractSubtask, L>
extends AbstractHaltableProcess
implements Callable<H>,
IMasterTask<E, H> {
    /** Logger shared with subclasses. */
    protected static final transient Logger log = Logger.getLogger(AbstractMasterTask.class);
    /** The master's input buffer (supplied to the constructor). */
    protected final BlockingBuffer<E[]> buffer;
    /** Chunks handed back to the master by {@code redirectChunk}; polled by {@code call} and {@code awaitAll}. */
    private final BlockingQueue<E[]> redirectQueue = new LinkedBlockingQueue<E[]>();
    /** Iterator draining {@link #buffer}; obtained in the constructor. */
    protected final IAsynchronousIterator<E[]> src;
    /** The sinks which have been created and not yet moved to {@link #finishedSubtaskQueue}, keyed by locator. */
    private final ConcurrentHashMap<L, S> sinks;
    /** Lock held while sinks are created or retired, while chunks are redirected, and while awaiting completion. */
    private final ReentrantLock lock = new ReentrantLock();
    /** Signalled by {@code notifySubtaskDone}; awaited (with a timeout) by {@code awaitAll}. */
    private final Condition subtaskDone = this.lock.newCondition();
    /** Sinks removed from {@link #sinks} whose futures have not yet been checked by {@code drainFutures}. */
    private final BlockingQueue<S> finishedSubtaskQueue = new LinkedBlockingQueue<S>();
    /** Statistics for this master (supplied to the constructor). */
    public final H stats;
    /** Sink idle timeout in nanoseconds; the constructor requires a strictly positive value. */
    protected final long sinkIdleTimeoutNanos;
    /** Sink poll timeout in nanoseconds; not read in this class (presumably consumed by the subtasks - confirm). */
    protected final long sinkPollTimeoutNanos;
    /** Timeout (5000ms, in nanoseconds) for a single offer to a sink buffer, after which a "Sink is slow" warning is logged. */
    private static final long offerWarningTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(5000L);

    /**
     * Reports how many chunks are currently waiting on the redirect queue.
     *
     * @return the current size of the redirect queue
     */
    public final int getRedirectQueueSize() {
        final int pending = redirectQueue.size();
        return pending;
    }

    /**
     * Places a chunk onto the redirect queue so that the master will handle it
     * again. The master's lock is held while the chunk is enqueued.
     *
     * @param chunk the chunk to be re-handled by the master
     * @throws InterruptedException if interrupted while acquiring the lock or
     *         while adding the chunk to the queue
     */
    protected final void redirectChunk(final E[] chunk) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            redirectQueue.put(chunk);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the input buffer that was supplied to the constructor.
     *
     * @return the master's input buffer
     */
    @Override
    public BlockingBuffer<E[]> getBuffer() {
        return buffer;
    }

    /**
     * Applies an operation to each sink currently registered with this master,
     * in the iteration order of the sink map.
     *
     * @param op the operation to invoke on each sink
     * @throws InterruptedException declared for callers; not thrown directly here
     * @throws ExecutionException wrapping any {@link Exception} thrown while
     *         fetching the next sink or invoking the operation; the remaining
     *         sinks are not visited
     */
    public void mapOperationOverSubtasks(final SubtaskOp<S> op) throws InterruptedException, ExecutionException {
        for (final Iterator<S> sinkItr = sinks.values().iterator(); sinkItr.hasNext();) {
            try {
                op.call(sinkItr.next());
            } catch (Exception cause) {
                throw new ExecutionException(cause);
            }
        }
    }

    /**
     * Notifies the master that a subtask has finished. While holding the
     * master's lock, the subtask is moved from the sink map to the finished
     * queue and every thread waiting on the "subtask done" condition is
     * signalled.
     *
     * @param subtask the subtask which is done; its buffer must already be closed
     * @throws IllegalArgumentException if {@code subtask} is {@code null}
     * @throws IllegalStateException if the subtask's buffer is still open
     * @throws InterruptedException if interrupted while acquiring the lock
     */
    protected void notifySubtaskDone(final AbstractSubtask subtask) throws InterruptedException {
        if (subtask == null) {
            throw new IllegalArgumentException();
        }
        final boolean stillOpen = subtask.buffer.isOpen();
        if (stillOpen) {
            throw new IllegalStateException();
        }
        lock.lockInterruptibly();
        try {
            // The subtask is only known by its raw type here, hence the unchecked cast.
            @SuppressWarnings("unchecked")
            final L locator = (L) subtask.locator;
            moveSinkToFinishedQueueAtomically(locator, subtask);
            subtaskDone.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the statistics object that was supplied to the constructor.
     *
     * @return the statistics for this master
     */
    @Override
    public H getStats() {
        return stats;
    }

    public AbstractMasterTask(H stats, BlockingBuffer<E[]> buffer, long sinkIdleTimeoutNanos, long sinkPollTimeoutNanos) {
        if (stats == null) {
            throw new IllegalArgumentException();
        }
        if (buffer == null) {
            throw new IllegalArgumentException();
        }
        if (sinkIdleTimeoutNanos <= 0L) {
            throw new IllegalArgumentException();
        }
        if (sinkPollTimeoutNanos <= 0L) {
            throw new IllegalArgumentException();
        }
        this.stats = stats;
        this.buffer = buffer;
        this.sinkIdleTimeoutNanos = sinkIdleTimeoutNanos;
        this.sinkPollTimeoutNanos = sinkPollTimeoutNanos;
        this.src = buffer.iterator();
        this.sinks = new ConcurrentHashMap();
        ((AbstractMasterStats)stats).addMaster(this);
    }

    /**
     * Runs the master. Each pass of the loop checks whether the process has been
     * halted, checks the futures of finished sinks, and then takes the next
     * chunk - preferring the redirect queue over the input buffer. Non-empty
     * chunks are counted in the statistics and passed to {@code handleChunk}.
     * Once the input buffer is closed and empty the master waits for all sinks
     * (see {@code awaitAll}) and returns the statistics.
     *
     * @return the statistics object for this master
     * @throws RuntimeException wrapping any {@link Throwable} raised while
     *         running the master; all sinks are cancelled before it is thrown
     */
    @Override
    public H call() throws Exception {
        // Note: the constructor rejects non-positive timeouts, so this is always true.
        final boolean reopen = sinkIdleTimeoutNanos != 0L;
        try {
            while (true) {
                halted();
                drainFutures();
                // Redirected chunks take precedence over new chunks from the source.
                E[] a = redirectQueue.poll();
                if (a != null) {
                    if (log.isInfoEnabled()) {
                        log.info("Read chunk from redirectQueue");
                    }
                } else if (src.hasNext(buffer.getChunkTimeout(), TimeUnit.NANOSECONDS)) {
                    a = src.next();
                } else if (buffer.isOpen() || !buffer.isEmpty()) {
                    // Timed out waiting on the source: poll the redirect queue again.
                    continue;
                } else {
                    // The input buffer is closed and has been fully drained.
                    break;
                }
                if (a.length == 0) {
                    continue;
                }
                synchronized (stats) {
                    stats.chunksIn.incrementAndGet();
                    stats.elementsIn.addAndGet(a.length);
                }
                handleChunk(a, reopen);
            }
            awaitAll();
            return stats;
        } catch (Throwable t) {
            log.error("Cancelling: job=" + this + ", cause=" + t, t);
            try {
                cancelAll(true);
            } catch (Throwable t2) {
                // Only a failure of the cancellation itself is reported here.
                log.error(t2);
            }
            // Thrown outside of the try above so that it is not caught and
            // logged as if the cancellation had failed.
            throw new RuntimeException(t);
        }
    }

    /**
     * Handles the next chunk. Invoked by {@code call()} for every non-empty chunk
     * read from the redirect queue or the input buffer, and by {@code awaitAll()}
     * for chunks drained from the redirect queue during shutdown.
     *
     * @param var1 the chunk of elements to be handled
     * @param var2 the "reopen" flag; {@code awaitAll()} always passes {@code true}
     *        (presumably forwarded to {@code getSink} / {@code addToOutputBuffer}
     *        by the implementation - confirm)
     * @throws InterruptedException if interrupted while handling the chunk
     */
    protected abstract void handleChunk(E[] var1, boolean var2) throws InterruptedException;

    /**
     * Hook consulted by {@code awaitAll()} as the last of its completion
     * conditions. This implementation always reports that nothing is pending.
     *
     * @return {@code true}
     */
    protected boolean nothingPending() {
        return true;
    }

    /**
     * Hook invoked once by {@code awaitAll()} before it starts waiting for the
     * sinks to complete. This implementation does nothing.
     *
     * @throws InterruptedException declared for overriding implementations
     */
    protected void willShutdown() throws InterruptedException {
    }

    /**
     * Waits until all work is done after the input buffer has been closed:
     * invokes {@code willShutdown()} and then loops, re-handling any chunk found
     * on the redirect queue, until the finished queue, the sink map and the
     * redirect queue are all empty and {@code nothingPending()} is true.
     *
     * @throws IllegalStateException if the input buffer is still open
     * @throws InterruptedException if interrupted while acquiring the lock or waiting
     * @throws ExecutionException declared by {@code drainFutures()}
     */
    private void awaitAll() throws InterruptedException, ExecutionException {
        if (this.buffer.isOpen()) {
            throw new IllegalStateException();
        }
        this.willShutdown();
        while (true) {
            Object[] a;
            this.halted();
            this.lock.lockInterruptibly();
            try {
                // Redirected chunks are taken while holding the lock but are
                // handled only after the lock has been released (below).
                a = (Object[])this.redirectQueue.poll();
                if (a == null) {
                    if (this.finishedSubtaskQueue.isEmpty() && this.sinks.isEmpty() && this.redirectQueue.isEmpty() && this.nothingPending()) {
                        if (log.isInfoEnabled()) {
                            log.info("All subtasks are done: " + this);
                        }
                        return;
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("Waiting for " + this.sinks.size() + " subtasks : " + this);
                    }
                    // Check the futures of the sinks which have already finished.
                    this.drainFutures();
                    // NOTE(review): the timed wait only happens while the finished
                    // queue is non-empty; when it is empty but sinks are still
                    // registered this loop re-acquires the lock without waiting.
                    // Confirm that this is the intended condition.
                    if (!this.finishedSubtaskQueue.isEmpty()) {
                        this.subtaskDone.await(50L, TimeUnit.MILLISECONDS);
                    }
                }
            }
            finally {
                this.lock.unlock();
            }
            if (a == null) continue;
            this.handleChunk(a, true);
        }
    }

    /**
     * Cancels the job: closes and clears the input buffer, cancels the future of
     * every registered sink which is not yet done, waits on each sink's future,
     * and finally clears the sink map, the finished queue and the redirect queue.
     *
     * @param mayInterruptIfRunning passed through to {@link Future#cancel(boolean)}
     * @throws InterruptedException if interrupted while waiting on a sink's future
     */
    private void cancelAll(boolean mayInterruptIfRunning) throws InterruptedException {
        log.warn("Cancelling job: " + this);
        buffer.close();
        buffer.clear();
        // First pass: request cancellation of every sink which is still running.
        for (AbstractSubtask sink : sinks.values()) {
            final Future<?> f = sink.buffer.getFuture();
            if (!f.isDone()) {
                f.cancel(mayInterruptIfRunning);
            }
        }
        // Second pass: observe the outcome of each sink.
        for (AbstractSubtask sink : sinks.values()) {
            final Future<?> f = sink.buffer.getFuture();
            try {
                f.get();
            } catch (java.util.concurrent.CancellationException ex) {
                // Expected for every future cancelled above. It is unchecked, so
                // without this clause it would abort the loop and skip the
                // cleanup at the end of this method.
                if (log.isDebugEnabled()) {
                    log.debug("sink=" + sink + " : " + ex);
                }
            } catch (ExecutionException ex) {
                log.warn("sink=" + sink + " : " + ex);
            }
        }
        sinks.clear();
        finishedSubtaskQueue.clear();
        redirectQueue.clear();
    }

    /**
     * Returns the sink for a locator, creating and starting a new sink when none
     * is registered. A registered sink whose buffer is open is returned without
     * taking the lock. When {@code reopen} is true and the registered sink's
     * buffer is closed, that sink is moved to the finished queue and replaced by
     * a new one; when {@code reopen} is false the closed sink itself is returned.
     * <p>
     * NOTE(review): the sink looked up before the lock is acquired is not read
     * again under the lock - presumably only the master's thread calls this
     * method; confirm.
     *
     * @param locator the key identifying the sink
     * @param reopen whether a sink whose buffer is closed should be replaced
     * @return the sink for the locator
     * @throws IllegalArgumentException if {@code locator} is {@code null}
     * @throws InterruptedException if interrupted while acquiring the lock
     */
    protected S getSink(final L locator, final boolean reopen) throws InterruptedException {
        if (locator == null) {
            throw new IllegalArgumentException();
        }
        halted();
        S existing = sinks.get(locator);
        if (existing != null && existing.buffer.isOpen()) {
            // Fast path: an open sink already exists for this locator.
            return existing;
        }
        lock.lockInterruptibly();
        try {
            if (reopen && existing != null && !existing.buffer.isOpen()) {
                if (log.isInfoEnabled()) {
                    log.info("Reopening sink (was closed): " + this + ", locator=" + locator);
                }
                moveSinkToFinishedQueueAtomically(locator, existing);
                existing = null;
            }
            if (existing != null) {
                // The closed sink is handed back as-is since reopen was not requested.
                return existing;
            }
            if (log.isInfoEnabled()) {
                log.info("Creating output buffer: " + this + ", locator=" + locator);
            }
            final BlockingBuffer<E[]> out = newSubtaskBuffer();
            final S created = newSubtask(locator, out);
            final S previous = sinks.put(locator, created);
            assert (previous == null) : "locator=" + locator;
            // The future is attached to the buffer before the task is submitted.
            final FutureTask ft = new FutureTask(created);
            out.setFuture(ft);
            submitSubtask(ft);
            stats.subtaskStartCount.incrementAndGet();
            return created;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Factory for the buffer of a new sink. Invoked by {@code getSink} each time
     * a sink is created.
     *
     * @return the buffer on which the master will write chunks for the new sink
     */
    protected abstract BlockingBuffer<E[]> newSubtaskBuffer();

    /**
     * Factory for a new sink. Invoked by {@code getSink} with the buffer just
     * obtained from {@code newSubtaskBuffer()}; the returned sink is registered
     * under the locator and wrapped in a {@link FutureTask}.
     *
     * @param var1 the locator identifying the sink
     * @param var2 the buffer created for the sink
     * @return the new sink
     */
    protected abstract S newSubtask(L var1, BlockingBuffer<E[]> var2);

    /**
     * Submits the task wrapping a newly created sink for execution. Invoked by
     * {@code getSink} after the task has been set as the future of the sink's
     * buffer.
     *
     * @param var1 the task wrapping the new sink
     */
    protected abstract void submitSubtask(FutureTask<? extends AbstractSubtaskStats> var1);

    /**
     * Checks the futures of the sinks on the finished queue, in queue order,
     * without blocking: the method returns as soon as the queue is empty or the
     * future of the sink at the head of the queue is not yet done. Each sink
     * whose future completed normally is removed from the queue.
     *
     * @throws IllegalStateException if the buffer of a finished sink is still open
     * @throws InterruptedException if interrupted while obtaining a sink's outcome
     * @throws ExecutionException declared for callers; a failed sink is reported
     *         by throwing the result of {@code halt(ex)} instead
     */
    private void drainFutures() throws InterruptedException, ExecutionException {
        while (true) {
            halted();
            final S sink = finishedSubtaskQueue.peek();
            if (sink == null) {
                // Nothing (more) has finished.
                return;
            }
            if (sink.buffer.isOpen()) {
                throw new IllegalStateException(sink.toString());
            }
            final Future<?> f = sink.buffer.getFuture();
            if (!f.isDone()) {
                // Never block here: the caller polls again later.
                return;
            }
            try {
                f.get();
                if (sink != finishedSubtaskQueue.remove()) {
                    throw new AssertionError();
                }
            } catch (ExecutionException ex) {
                throw halt(ex);
            } finally {
                // No control transfer in this block: a 'continue' here would
                // silently discard the exception thrown above.
                stats.subtaskEndCount.incrementAndGet();
                if (log.isDebugEnabled()) {
                    log.debug("subtaskEndCount incremented: " + sink.locator);
                }
            }
        }
    }

    /**
     * While holding the master's lock, appends the sink to the finished queue
     * and then removes the (locator, sink) mapping from the sink map if that
     * exact mapping is present.
     *
     * @param locator the key under which the sink was registered
     * @param sink the sink to be retired
     * @throws IllegalArgumentException if either argument is {@code null}
     * @throws InterruptedException if interrupted while acquiring the lock or
     *         while adding the sink to the finished queue
     */
    protected void moveSinkToFinishedQueueAtomically(final L locator, final AbstractSubtask sink) throws InterruptedException {
        if (locator == null) {
            throw new IllegalArgumentException();
        }
        if (sink == null) {
            throw new IllegalArgumentException();
        }
        // The sink is only known by its raw type here, hence the unchecked cast.
        @SuppressWarnings("unchecked")
        final S finished = (S) sink;
        lock.lockInterruptibly();
        try {
            finishedSubtaskQueue.put(finished);
            final boolean removed = sinks.remove(locator, sink);
            if (removed) {
                if (log.isDebugEnabled()) {
                    log.debug("Removed output buffer: " + locator);
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Copies the elements {@code a[fromIndex..toIndex)} into a new chunk and
     * offers that chunk to the sink for the locator until it is accepted. Each
     * offer times out after {@code offerWarningTimeoutNanos}, at which point a
     * warning is logged and the offer is retried. If the sink's buffer was closed
     * by a stale locator the chunk is placed on the redirect queue instead; if it
     * was closed by an idle timeout or because the master was exhausted the offer
     * is retried with {@code reopen} forced to true. The transfer statistics are
     * updated once the chunk has been accepted or redirected.
     *
     * @param locator the key identifying the sink
     * @param a the source array
     * @param fromIndex index of the first element to copy (inclusive)
     * @param toIndex index of the last element to copy (exclusive)
     * @param reopen whether a closed sink may be replaced (see {@code getSink})
     * @throws BufferClosedException if the sink's buffer was closed for any other reason
     * @throws InterruptedException if interrupted while offering or redirecting the chunk
     */
    protected void addToOutputBuffer(L locator, E[] a, int fromIndex, int toIndex, boolean reopen) throws InterruptedException {
        final int count = toIndex - fromIndex;
        if (count == 0) {
            return;
        }
        // Dense copy with the same component type as the source array.
        @SuppressWarnings("unchecked")
        final E[] chunk = (E[]) Array.newInstance(a.getClass().getComponentType(), count);
        System.arraycopy(a, fromIndex, chunk, 0, count);
        final long begin = System.nanoTime();
        boolean delivered = false;
        do {
            halted();
            final S sink = getSink(locator, reopen);
            try {
                delivered = sink.buffer.add(chunk, offerWarningTimeoutNanos, TimeUnit.NANOSECONDS);
                final long now = System.nanoTime();
                if (delivered) {
                    sink.lastChunkNanos = now;
                } else {
                    log.warn("Sink is slow: elapsed=" + TimeUnit.NANOSECONDS.toMillis(now - begin) + "ms, sink=" + sink);
                }
            } catch (BufferClosedException ex) {
                final Throwable cause = ex.getCause();
                if (cause instanceof StaleLocatorException) {
                    if (log.isInfoEnabled()) {
                        log.info("Sink closed asynchronously by stale locator exception: " + sink);
                    }
                    // Hand the chunk back to the master; this counts as delivered.
                    redirectChunk(chunk);
                    delivered = true;
                } else if (cause instanceof IdleTimeoutException || cause instanceof MasterExhaustedException) {
                    if (log.isInfoEnabled()) {
                        log.info("Sink closed asynchronously: cause=" + cause + ", sink=" + sink);
                    }
                    // Retry, this time allowing the closed sink to be replaced.
                    reopen = true;
                } else {
                    throw ex;
                }
            }
        } while (!delivered);
        synchronized (stats) {
            stats.chunksTransferred.incrementAndGet();
            stats.elementsTransferred.addAndGet(chunk.length);
            stats.elementsOnSinkQueues.addAndGet(chunk.length);
            stats.elapsedSinkOfferNanos += System.nanoTime() - begin;
        }
    }
}

