package io.druid.indexing.common.task;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.LookupNodeService;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockReleaseAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import io.druid.segment.realtime.plumber.Committers;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Plumbers;
import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
import io.druid.segment.realtime.plumber.VersioningPolicy;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;

/* loaded from: input_file:io/druid/indexing/common/task/RealtimeIndexTask.class */
public class RealtimeIndexTask extends AbstractTask {
    public static final String CTX_KEY_LOOKUP_TIER = "lookupTier";
    private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class);
    private static final Random random = new Random();

    @JsonIgnore
    private final FireDepartment spec;

    @JsonIgnore
    private volatile Plumber plumber;

    @JsonIgnore
    private volatile Firehose firehose;

    @JsonIgnore
    private volatile FireDepartmentMetrics metrics;

    @JsonIgnore
    private volatile boolean gracefullyStopped;

    @JsonIgnore
    private volatile boolean finishingJob;

    @JsonIgnore
    private volatile Thread runThread;

    @JsonIgnore
    private volatile QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;

    /* loaded from: input_file:io/druid/indexing/common/task/RealtimeIndexTask$TaskActionSegmentPublisher.class */
    public static class TaskActionSegmentPublisher implements SegmentPublisher {
        final TaskToolbox taskToolbox;

        public TaskActionSegmentPublisher(TaskToolbox taskToolbox) {
            this.taskToolbox = taskToolbox;
        }

        public void publishSegment(DataSegment dataSegment) throws IOException {
            this.taskToolbox.publishSegments(ImmutableList.of(dataSegment));
        }
    }

    private static String makeTaskId(FireDepartment fireDepartment) {
        return makeTaskId(fireDepartment.getDataSchema().getDataSource(), fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), DateTimes.nowUtc(), random.nextInt());
    }

    static String makeTaskId(String str, int i, DateTime dateTime, int i2) {
        StringBuilder sb = new StringBuilder(8);
        for (int i3 = 0; i3 < 8; i3++) {
            sb.append((char) (97 + ((i2 >>> (i3 * 4)) & 15)));
        }
        return StringUtils.format("index_realtime_%s_%d_%s_%s", new Object[]{str, Integer.valueOf(i), dateTime, sb});
    }

    private static String makeDatasource(FireDepartment fireDepartment) {
        return fireDepartment.getDataSchema().getDataSource();
    }

    @JsonCreator
    public RealtimeIndexTask(@JsonProperty("id") String str, @JsonProperty("resource") TaskResource taskResource, @JsonProperty("spec") FireDepartment fireDepartment, @JsonProperty("context") Map<String, Object> map) {
        super(str == null ? makeTaskId(fireDepartment) : str, StringUtils.format("index_realtime_%s", new Object[]{makeDatasource(fireDepartment)}), taskResource, makeDatasource(fireDepartment), map);
        this.plumber = null;
        this.firehose = null;
        this.metrics = null;
        this.gracefullyStopped = false;
        this.finishingJob = false;
        this.runThread = null;
        this.queryRunnerFactoryConglomerate = null;
        this.spec = fireDepartment;
    }

    @Override // io.druid.indexing.common.task.Task
    public int getPriority() {
        return ((Integer) getContextValue(Tasks.PRIORITY_KEY, 75)).intValue();
    }

    @Override // io.druid.indexing.common.task.Task
    public String getType() {
        return "index_realtime";
    }

    @Override // io.druid.indexing.common.task.AbstractTask, io.druid.indexing.common.task.Task
    public String getNodeType() {
        return "realtime";
    }

    @Override // io.druid.indexing.common.task.AbstractTask, io.druid.indexing.common.task.Task
    public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
        if (this.plumber == null) {
            return null;
        }
        return new FinalizeResultsQueryRunner(this.plumber.getQueryRunner(query), this.queryRunnerFactoryConglomerate.findFactory(query).getToolchest());
    }

    @Override // io.druid.indexing.common.task.Task
    public boolean isReady(TaskActionClient taskActionClient) throws Exception {
        return true;
    }

    @Override // io.druid.indexing.common.task.Task
    public TaskStatus run(final TaskToolbox taskToolbox) throws Exception {
        this.runThread = Thread.currentThread();
        if (this.plumber != null) {
            throw new IllegalStateException("WTF?!? run with non-null plumber??!");
        }
        setupTimeoutAlert();
        TaskActionSegmentPublisher taskActionSegmentPublisher = new TaskActionSegmentPublisher(taskToolbox);
        final long longValue = ((Long) getContextValue(Tasks.LOCK_TIMEOUT_KEY, Long.valueOf(Tasks.DEFAULT_LOCK_TIMEOUT))).longValue();
        DataSegmentAnnouncer dataSegmentAnnouncer = new DataSegmentAnnouncer() { // from class: io.druid.indexing.common.task.RealtimeIndexTask.1
            public void announceSegment(DataSegment dataSegment) throws IOException {
                Preconditions.checkNotNull(taskToolbox.getTaskActionClient().submit(new LockAcquireAction(TaskLockType.EXCLUSIVE, dataSegment.getInterval(), longValue)), "Cannot acquire a lock for interval[%s]", new Object[]{dataSegment.getInterval()});
                taskToolbox.getSegmentAnnouncer().announceSegment(dataSegment);
            }

            public void unannounceSegment(DataSegment dataSegment) throws IOException {
                try {
                    taskToolbox.getSegmentAnnouncer().unannounceSegment(dataSegment);
                } finally {
                    taskToolbox.getTaskActionClient().submit(new LockReleaseAction(dataSegment.getInterval()));
                }
            }

            public void announceSegments(Iterable<DataSegment> iterable) throws IOException {
                for (DataSegment dataSegment : iterable) {
                    Preconditions.checkNotNull(taskToolbox.getTaskActionClient().submit(new LockAcquireAction(TaskLockType.EXCLUSIVE, dataSegment.getInterval(), longValue)), "Cannot acquire a lock for interval[%s]", new Object[]{dataSegment.getInterval()});
                }
                taskToolbox.getSegmentAnnouncer().announceSegments(iterable);
            }

            public void unannounceSegments(Iterable<DataSegment> iterable) throws IOException {
                try {
                    taskToolbox.getSegmentAnnouncer().unannounceSegments(iterable);
                    Iterator<DataSegment> it = iterable.iterator();
                    while (it.hasNext()) {
                        taskToolbox.getTaskActionClient().submit(new LockReleaseAction(it.next().getInterval()));
                    }
                } catch (Throwable th) {
                    Iterator<DataSegment> it2 = iterable.iterator();
                    while (it2.hasNext()) {
                        taskToolbox.getTaskActionClient().submit(new LockReleaseAction(it2.next().getInterval()));
                    }
                    throw th;
                }
            }
        };
        VersioningPolicy versioningPolicy = new VersioningPolicy() { // from class: io.druid.indexing.common.task.RealtimeIndexTask.2
            public String getVersion(Interval interval) {
                try {
                    return ((TaskLock) Preconditions.checkNotNull(taskToolbox.getTaskActionClient().submit(new LockAcquireAction(TaskLockType.EXCLUSIVE, interval, longValue)), "Cannot acquire a lock for interval[%s]", new Object[]{interval})).getVersion();
                } catch (IOException e) {
                    throw Throwables.propagate(e);
                }
            }
        };
        DataSchema dataSchema = this.spec.getDataSchema();
        RealtimeIOConfig iOConfig = this.spec.getIOConfig();
        RealtimeTuningConfig withVersioningPolicy = this.spec.getTuningConfig().withBasePersistDirectory(taskToolbox.getPersistDir()).withVersioningPolicy(versioningPolicy);
        FireDepartment fireDepartment = new FireDepartment(dataSchema, iOConfig, withVersioningPolicy);
        this.metrics = fireDepartment.getMetrics();
        RealtimeMetricsMonitor realtimeMetricsMonitor = new RealtimeMetricsMonitor(ImmutableList.of(fireDepartment), ImmutableMap.of("taskId", new String[]{getId()}));
        this.queryRunnerFactoryConglomerate = taskToolbox.getQueryRunnerFactoryConglomerate();
        this.plumber = new RealtimePlumberSchool(taskToolbox.getEmitter(), taskToolbox.getQueryRunnerFactoryConglomerate(), taskToolbox.getSegmentPusher(), dataSegmentAnnouncer, taskActionSegmentPublisher, taskToolbox.getSegmentHandoffNotifierFactory(), taskToolbox.getQueryExecutorService(), taskToolbox.getIndexMergerV9(), taskToolbox.getIndexIO(), taskToolbox.getCache(), taskToolbox.getCacheConfig(), taskToolbox.getObjectMapper()).findPlumber(dataSchema, withVersioningPolicy, this.metrics);
        Supplier supplier = null;
        File firehoseTemporaryDir = taskToolbox.getFirehoseTemporaryDir();
        LookupNodeService lookupNodeService = getContextValue(CTX_KEY_LOOKUP_TIER) == null ? taskToolbox.getLookupNodeService() : new LookupNodeService((String) getContextValue(CTX_KEY_LOOKUP_TIER));
        DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(taskToolbox.getDruidNode(), "peon", ImmutableMap.of(taskToolbox.getDataNodeService().getName(), taskToolbox.getDataNodeService(), lookupNodeService.getName(), lookupNodeService));
        try {
            try {
                taskToolbox.getDataSegmentServerAnnouncer().announce();
                taskToolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
                this.plumber.startJob();
                taskToolbox.getMonitorScheduler().addMonitor(realtimeMetricsMonitor);
                FileUtils.forceMkdir(firehoseTemporaryDir);
                FirehoseFactory firehoseFactory = this.spec.getIOConfig().getFirehoseFactory();
                boolean isFirehoseDrainableByClosing = isFirehoseDrainableByClosing(firehoseFactory);
                synchronized (this) {
                    if (!this.gracefullyStopped) {
                        this.firehose = firehoseFactory.connect(this.spec.getDataSchema().getParser(), firehoseTemporaryDir);
                        supplier = Committers.supplierFromFirehose(this.firehose);
                    }
                }
                while (this.firehose != null && ((!this.gracefullyStopped || isFirehoseDrainableByClosing) && this.firehose.hasMore())) {
                    Plumbers.addNextRow(supplier, this.firehose, this.plumber, withVersioningPolicy.isReportParseExceptions(), this.metrics);
                }
                try {
                    if (1 != 0) {
                        try {
                            try {
                                if (this.firehose != null) {
                                    log.info("Persisting remaining data.", new Object[0]);
                                    final Committer committer = (Committer) supplier.get();
                                    final CountDownLatch countDownLatch = new CountDownLatch(1);
                                    this.plumber.persist(new Committer() { // from class: io.druid.indexing.common.task.RealtimeIndexTask.3
                                        public Object getMetadata() {
                                            return committer.getMetadata();
                                        }

                                        public void run() {
                                            try {
                                                committer.run();
                                            } finally {
                                                countDownLatch.countDown();
                                            }
                                        }
                                    });
                                    countDownLatch.await();
                                }
                                if (this.gracefullyStopped) {
                                    log.info("Gracefully stopping.", new Object[0]);
                                } else {
                                    log.info("Finishing the job.", new Object[0]);
                                    synchronized (this) {
                                        if (this.gracefullyStopped) {
                                            log.info("Gracefully stopping.", new Object[0]);
                                        } else {
                                            this.finishingJob = true;
                                        }
                                    }
                                    if (this.finishingJob) {
                                        this.plumber.finishJob();
                                    }
                                }
                                if (this.firehose != null) {
                                    CloseQuietly.close(this.firehose);
                                }
                                taskToolbox.getMonitorScheduler().removeMonitor(realtimeMetricsMonitor);
                            } catch (Exception e) {
                                log.makeAlert(e, "Failed to finish realtime task", new Object[0]).emit();
                                throw e;
                            }
                        } catch (InterruptedException e2) {
                            log.debug(e2, "Interrupted while finishing the job", new Object[0]);
                            if (this.firehose != null) {
                                CloseQuietly.close(this.firehose);
                            }
                            taskToolbox.getMonitorScheduler().removeMonitor(realtimeMetricsMonitor);
                        }
                    }
                    taskToolbox.getDataSegmentServerAnnouncer().unannounce();
                    taskToolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
                    log.info("Job done!", new Object[0]);
                    return TaskStatus.success(getId());
                } catch (Throwable th) {
                    if (this.firehose != null) {
                        CloseQuietly.close(this.firehose);
                    }
                    taskToolbox.getMonitorScheduler().removeMonitor(realtimeMetricsMonitor);
                    throw th;
                }
            } catch (Throwable th2) {
                log.makeAlert(th2, "Exception aborted realtime processing[%s]", new Object[]{dataSchema.getDataSource()}).emit();
                throw th2;
            }
        } catch (Throwable th3) {
            try {
                if (1 != 0) {
                    try {
                        try {
                            if (this.firehose != null) {
                                log.info("Persisting remaining data.", new Object[0]);
                                final Committer committer2 = (Committer) supplier.get();
                                final CountDownLatch countDownLatch2 = new CountDownLatch(1);
                                this.plumber.persist(new Committer() { // from class: io.druid.indexing.common.task.RealtimeIndexTask.3
                                    public Object getMetadata() {
                                        return committer2.getMetadata();
                                    }

                                    public void run() {
                                        try {
                                            committer2.run();
                                        } finally {
                                            countDownLatch2.countDown();
                                        }
                                    }
                                });
                                countDownLatch2.await();
                            }
                            if (this.gracefullyStopped) {
                                log.info("Gracefully stopping.", new Object[0]);
                            } else {
                                log.info("Finishing the job.", new Object[0]);
                                synchronized (this) {
                                    if (this.gracefullyStopped) {
                                        log.info("Gracefully stopping.", new Object[0]);
                                    } else {
                                        this.finishingJob = true;
                                    }
                                    if (this.finishingJob) {
                                        this.plumber.finishJob();
                                    }
                                }
                            }
                            if (this.firehose != null) {
                                CloseQuietly.close(this.firehose);
                            }
                            taskToolbox.getMonitorScheduler().removeMonitor(realtimeMetricsMonitor);
                        } catch (Exception e3) {
                            log.makeAlert(e3, "Failed to finish realtime task", new Object[0]).emit();
                            throw e3;
                        }
                    } catch (InterruptedException e4) {
                        log.debug(e4, "Interrupted while finishing the job", new Object[0]);
                        if (this.firehose != null) {
                            CloseQuietly.close(this.firehose);
                        }
                        taskToolbox.getMonitorScheduler().removeMonitor(realtimeMetricsMonitor);
                    }
                }
                taskToolbox.getDataSegmentServerAnnouncer().unannounce();
                taskToolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
                throw th3;
            } catch (Throwable th4) {
                if (this.firehose != null) {
                    CloseQuietly.close(this.firehose);
                }
                taskToolbox.getMonitorScheduler().removeMonitor(realtimeMetricsMonitor);
                throw th4;
            }
        }
    }

    @Override // io.druid.indexing.common.task.AbstractTask, io.druid.indexing.common.task.Task
    public boolean canRestore() {
        return true;
    }

    @Override // io.druid.indexing.common.task.AbstractTask, io.druid.indexing.common.task.Task
    public void stopGracefully() {
        try {
            synchronized (this) {
                if (!this.gracefullyStopped) {
                    this.gracefullyStopped = true;
                    if (this.firehose == null) {
                        log.info("stopGracefully: Firehose not started yet, so nothing to stop.", new Object[0]);
                    } else if (this.finishingJob) {
                        log.info("stopGracefully: Interrupting finishJob.", new Object[0]);
                        this.runThread.interrupt();
                    } else if (isFirehoseDrainableByClosing(this.spec.getIOConfig().getFirehoseFactory())) {
                        log.info("stopGracefully: Draining firehose.", new Object[0]);
                        this.firehose.close();
                    } else {
                        log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.", new Object[0]);
                        this.runThread.interrupt();
                    }
                }
            }
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    @JsonIgnore
    public Firehose getFirehose() {
        return this.firehose;
    }

    @JsonIgnore
    public FireDepartmentMetrics getMetrics() {
        return this.metrics;
    }

    @JsonProperty("spec")
    public FireDepartment getRealtimeIngestionSchema() {
        return this.spec;
    }

    protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) {
        return (firehoseFactory instanceof EventReceiverFirehoseFactory) || ((firehoseFactory instanceof TimedShutoffFirehoseFactory) && isFirehoseDrainableByClosing(((TimedShutoffFirehoseFactory) firehoseFactory).getDelegateFactory())) || ((firehoseFactory instanceof ClippedFirehoseFactory) && isFirehoseDrainableByClosing(((ClippedFirehoseFactory) firehoseFactory).getDelegate()));
    }

    private void setupTimeoutAlert() {
        if (this.spec.getTuningConfig().getAlertTimeout() > 0) {
            new Timer("RealtimeIndexTask-Timer", true).schedule(new TimerTask() { // from class: io.druid.indexing.common.task.RealtimeIndexTask.4
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    RealtimeIndexTask.log.makeAlert("RealtimeIndexTask for dataSource [%s] hasn't finished in configured time [%d] ms.", new Object[]{RealtimeIndexTask.this.spec.getDataSchema().getDataSource(), Long.valueOf(RealtimeIndexTask.this.spec.getTuningConfig().getAlertTimeout())}).emit();
                }
            }, this.spec.getTuningConfig().getAlertTimeout());
        }
    }
}
