package org.apache.sling.pipes.internal;

import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.sling.api.SlingHttpServletRequest;
import org.apache.sling.api.SlingHttpServletResponse;
import org.apache.sling.api.resource.NonExistingResource;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.pipes.OutputWriter;
import org.apache.sling.pipes.Pipe;
import org.apache.sling.pipes.PipeBindings;
import org.apache.sling.pipes.Plumber;
import org.apache.sling.pipes.SuperPipe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/sling/pipes/internal/ManifoldPipe.class */
public class ManifoldPipe extends SuperPipe {
    /** Resource type string of this pipe (presumably the key used to register it with the plumber; confirm). */
    public static final String RESOURCE_TYPE = "slingPipes/manifold";
    /** Name of the configuration property giving the capacity of the shared output queue. */
    public static final String PN_QUEUE_SIZE = "queueSize";
    /** Name of the configuration property giving the size of the worker thread pool. */
    public static final String PN_NUM_THREADS = "numThreads";
    /** Name of the configuration property giving the maximum wait, in seconds, for the sub-pipes to finish. */
    public static final String PN_EXECUTION_TIMEOUT = "executionTimeout";
    /** Output queue capacity used when {@link #PN_QUEUE_SIZE} is not configured. */
    public static final int QUEUE_SIZE_DEFAULT = 10000;
    /** Worker pool size used when {@link #PN_NUM_THREADS} is not configured. */
    public static final int NUM_THREADS_DEFAULT = 5;
    /** Timeout in seconds (24 hours) used when {@link #PN_EXECUTION_TIMEOUT} is not configured. */
    public static final int EXECUTION_TIMEOUT_DEFAULT = 86400;
    /** Size of the fixed thread pool that runs the sub-pipes. */
    private int numThreads;
    /** Maximum time, in seconds, to wait for the sub-pipes before the output stream is ended. */
    private int executionTimeout;
    /** Bounded queue through which the sub-pipe threads hand their output to the consuming iterator. */
    private ArrayBlockingQueue<Resource> outputQueue;
    private static final Logger log = LoggerFactory.getLogger(ManifoldPipe.class);
    /**
     * Sentinel placed on the output queue to mark the end of the output.
     * It is compared by identity, never dereferenced as a real resource.
     */
    private static final Resource END_OF_STREAM = new NonExistingResource((ResourceResolver) null, "");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/sling/pipes/internal/ManifoldPipe$ConcurrentIterator.class */
    public class ConcurrentIterator implements Iterator<Resource> {
        private ExecutorService executorService;
        private Resource nextItem = null;

        /* loaded from: input_file:org/apache/sling/pipes/internal/ManifoldPipe$ConcurrentIterator$StreamTerminator.class */
        private class StreamTerminator implements Runnable {
            private StreamTerminator() {
            }

            @Override // java.lang.Runnable
            public void run() {
                try {
                    ConcurrentIterator.this.executorService.awaitTermination(ManifoldPipe.this.executionTimeout, TimeUnit.SECONDS);
                    ManifoldPipe.this.outputQueue.put(ManifoldPipe.END_OF_STREAM);
                } catch (InterruptedException e) {
                    ManifoldPipe.log.error("Interrupted while waiting for input exhaustion", e);
                    Thread.currentThread().interrupt();
                }
            }
        }

        ConcurrentIterator(int i) {
            this.executorService = Executors.newFixedThreadPool(i);
            Iterator it = ManifoldPipe.this.subpipes.iterator();
            while (it.hasNext()) {
                this.executorService.execute(new PipeThread((Pipe) it.next()));
            }
            this.executorService.shutdown();
            new Thread(new StreamTerminator()).start();
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            peekNext();
            return this.nextItem != ManifoldPipe.END_OF_STREAM;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public Resource next() {
            peekNext();
            if (this.nextItem == ManifoldPipe.END_OF_STREAM) {
                throw new NoSuchElementException();
            }
            Resource resource = this.nextItem;
            this.nextItem = null;
            return resource;
        }

        private void peekNext() {
            if (this.nextItem == null) {
                try {
                    this.nextItem = (Resource) ManifoldPipe.this.outputQueue.take();
                } catch (InterruptedException e) {
                    ManifoldPipe.log.error("Interrupted while retrieving output", e);
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/sling/pipes/internal/ManifoldPipe$PipeThread.class */
    private class PipeThread implements Runnable {
        Pipe pipe;

        PipeThread(Pipe pipe) {
            this.pipe = pipe;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                ManifoldPipe.this.plumber.execute(this.pipe.getResource().getResourceResolver().clone((Map) null), this.pipe, (Map<String, Object>) null, (OutputWriter) new ThreadOutputWriter(), true);
            } catch (Exception e) {
                ManifoldPipe.log.error("Error while running pipe %s", this.pipe.getName(), e);
            }
        }
    }

    /* loaded from: input_file:org/apache/sling/pipes/internal/ManifoldPipe$ThreadOutputWriter.class */
    private class ThreadOutputWriter extends OutputWriter {
        private ThreadOutputWriter() {
        }

        @Override // org.apache.sling.pipes.OutputWriter
        protected void writeItem(Resource resource) {
            try {
                ManifoldPipe.this.outputQueue.put(resource);
            } catch (InterruptedException e) {
                ManifoldPipe.log.error("Interrupted while running pipe %s", this.pipe.getName(), e);
                Thread.currentThread().interrupt();
            }
        }

        @Override // org.apache.sling.pipes.OutputWriter
        public boolean handleRequest(SlingHttpServletRequest slingHttpServletRequest) {
            return false;
        }

        @Override // org.apache.sling.pipes.OutputWriter
        protected void initResponse(SlingHttpServletResponse slingHttpServletResponse) {
        }

        @Override // org.apache.sling.pipes.OutputWriter
        public void starts() {
        }

        @Override // org.apache.sling.pipes.OutputWriter
        public void ends() {
        }
    }

    /**
     * Creates the pipe and reads its tuning properties, falling back to the
     * {@code *_DEFAULT} constants for any property that is not configured.
     *
     * @param plumber      plumber passed to the super constructor
     * @param resource     resource passed to the super constructor
     * @param pipeBindings bindings passed to the super constructor
     */
    public ManifoldPipe(Plumber plumber, Resource resource, PipeBindings pipeBindings) {
        super(plumber, resource, pipeBindings);

        Integer queueCapacity = (Integer) this.properties.get(PN_QUEUE_SIZE, Integer.valueOf(QUEUE_SIZE_DEFAULT));
        Integer threadCount = (Integer) this.properties.get(PN_NUM_THREADS, Integer.valueOf(NUM_THREADS_DEFAULT));
        Integer timeoutSeconds = (Integer) this.properties.get(PN_EXECUTION_TIMEOUT, Integer.valueOf(EXECUTION_TIMEOUT_DEFAULT));

        this.numThreads = threadCount.intValue();
        this.executionTimeout = timeoutSeconds.intValue();
        // Bounded queue: producers block once this many items are waiting to be consumed.
        this.outputQueue = new ArrayBlockingQueue<>(queueCapacity.intValue());
    }

    /**
     * Builds one sub-pipe per child resource of this pipe's configuration.
     * Children for which the plumber returns no pipe are logged and skipped; every
     * other sub-pipe is given this pipe's parent as its parent and added to
     * {@code subpipes}.
     */
    @Override
    public void buildChildren() {
        for (Iterator<?> children = getConfiguration().listChildren(); children.hasNext();) {
            Resource childConfig = (Resource) children.next();
            Pipe subpipe = this.plumber.getPipe(childConfig, this.bindings);
            if (subpipe == null) {
                log.error("configured pipe {} is either not registered, or not computable by the plumber", childConfig.getPath());
                continue;
            }
            subpipe.setParent(getParent());
            this.subpipes.add(subpipe);
        }
    }

    /**
     * Starts the sub-pipes on a pool of {@code numThreads} threads and returns an
     * iterator over their combined output.
     *
     * @return a new {@link ConcurrentIterator}; creating it launches the sub-pipes
     */
    @Override
    protected Iterator<Resource> computeSubpipesOutput() {
        final Iterator<Resource> mergedOutput = new ConcurrentIterator(this.numThreads);
        return mergedOutput;
    }
}
