package de.codepfleger.flume.parquet.sink;

import de.codepfleger.flume.parquet.serializer.ParquetSerializer;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Date;
import java.util.Iterator;
import java.util.Random;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.serialization.EventSerializer;
import org.apache.flume.serialization.EventSerializerFactory;
import org.apache.flume.sink.AbstractSink;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/codepfleger/flume/parquet/sink/HDFSParquetSink.class */
public class HDFSParquetSink extends AbstractSink implements Configurable {
    public static final String EVENTS_PER_TRANSACTION_KEY = "eventsPerTransaction";
    public static final String FILE_PATH_KEY = "filePath";
    public static final String FILE_NAME_KEY = "fileName";
    public static final String FILE_SIZE_KEY = "fileSize";
    public static final String FILE_PAGE_SIZE_KEY = "pageSize";
    public static final String FILE_BLOCK_SIZE_KEY = "blockSize";
    public static final String FILE_COMPRESSION_KEY = "fileCompression";
    public static final String FILE_QUEUE_SIZE_KEY = "fileQueueSize";
    public static final String TIMEOUT_SECONDS_KEY = "timeoutSeconds";
    private final Object lock = new Object();
    private final Random random = new Random();
    private SerializerLinkedHashMap serializers;
    private Configuration configuration;
    private CompressionCodecName compressionCodec;
    private int eventsPerTransaction;
    private int timeoutSeconds;
    private String fileName;
    private String filePath;
    private Integer uncompressedFileSize;
    private Integer uncompressedPageSize;
    private Integer uncompressedBlockSize;
    private String serializerType;
    private Context serializerContext;
    private static final Logger LOG = LoggerFactory.getLogger(HDFSParquetSink.class);
    private static final AtomicBoolean processingEnabled = new AtomicBoolean(false);

    public synchronized void start() {
        super.start();
        ShutdownHookManager.get().addShutdownHook(new Runnable() { // from class: de.codepfleger.flume.parquet.sink.HDFSParquetSink.1
            @Override // java.lang.Runnable
            public void run() {
                this.stop();
            }
        }, Integer.MAX_VALUE);
        processingEnabled.getAndSet(true);
    }

    public synchronized void stop() {
        processingEnabled.getAndSet(false);
        synchronized (this.lock) {
            if (this.serializers != null && !this.serializers.isEmpty()) {
                Iterator<SerializerMapEntry> it = this.serializers.values().iterator();
                while (it.hasNext()) {
                    try {
                        it.next().close();
                    } catch (IOException e) {
                        LOG.error(e.getMessage(), e);
                    }
                }
                this.serializers.clear();
            }
        }
        super.stop();
    }

    public Sink.Status process() throws EventDeliveryException {
        if (!processingEnabled.get()) {
            return Sink.Status.READY;
        }
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            for (int i = 0; i < this.eventsPerTransaction; i++) {
                try {
                    Event take = channel.take();
                    if (take != null) {
                        getSerializer(take).write(take);
                    }
                } catch (Throwable th) {
                    transaction.rollback();
                    Sink.Status status = Sink.Status.BACKOFF;
                    transaction.close();
                    return status;
                }
            }
            transaction.commit();
            Sink.Status status2 = Sink.Status.READY;
            transaction.close();
            return status2;
        } catch (Throwable th2) {
            transaction.close();
            throw th2;
        }
    }

    private EventSerializer getSerializer(Event event) throws IOException {
        ParquetSerializer serializer;
        String replaceWildcards = replaceWildcards(this.filePath, event);
        String replaceWildcards2 = replaceWildcards(this.fileName, event);
        String str = replaceWildcards + replaceWildcards2;
        synchronized (this.lock) {
            SerializerMapEntry serializerMapEntry = this.serializers.get(str);
            if (isSerializerInvalid(serializerMapEntry)) {
                serializerMapEntry.close();
                this.serializers.remove(str);
                serializerMapEntry = null;
            }
            if (serializerMapEntry == null) {
                serializerMapEntry = createSerializer(replaceWildcards, replaceWildcards2);
                this.serializers.put(str, serializerMapEntry);
            }
            serializer = serializerMapEntry.getSerializer();
        }
        return serializer;
    }

    private boolean isSerializerInvalid(SerializerMapEntry serializerMapEntry) {
        if (serializerMapEntry != null) {
            return serializerMapEntry.getSerializer().getWriter().getDataSize() > ((long) this.uncompressedFileSize.intValue()) || new Date().getTime() > serializerMapEntry.getStartTime() + ((long) (this.timeoutSeconds * 1000));
        }
        return false;
    }

    private SerializerMapEntry createSerializer(String str, String str2) throws IOException {
        ParquetSerializer parquetSerializer = (ParquetSerializer) EventSerializerFactory.getInstance(this.serializerType, this.serializerContext, (OutputStream) null);
        String replaceRandomSalt = replaceRandomSalt(str2);
        String str3 = str + "_" + replaceRandomSalt;
        String str4 = str + replaceRandomSalt;
        Path path = new Path(str3);
        path.getFileSystem(this.configuration);
        parquetSerializer.initialize(AvroParquetWriter.builder(path).withSchema(parquetSerializer.getSchema()).withCompressionCodec(this.compressionCodec).withPageSize(this.uncompressedPageSize.intValue()).withRowGroupSize(this.uncompressedBlockSize.intValue()).build());
        return new SerializerMapEntry(path, this.configuration, str4, parquetSerializer);
    }

    private String replaceRandomSalt(String str) {
        int abs = Math.abs(this.random.nextInt());
        return str.contains("%[n]") ? str.replace("%[n]", "" + abs) : str + "." + abs;
    }

    private String replaceWildcards(String str, Event event) {
        return BucketPath.escapeString(str, event.getHeaders(), (TimeZone) null, false, 0, 1, true);
    }

    public void configure(Context context) {
        this.filePath = context.getString(FILE_PATH_KEY);
        if (this.filePath == null) {
            throw new IllegalStateException("filePath missing");
        }
        this.fileName = context.getString(FILE_NAME_KEY);
        if (this.fileName == null) {
            throw new IllegalStateException("fileName missing");
        }
        this.serializerType = context.getString("serializer");
        if (this.serializerType == null) {
            throw new IllegalStateException("serializer missing");
        }
        this.compressionCodec = CompressionCodecName.fromConf(context.getString(FILE_COMPRESSION_KEY, CompressionCodecName.SNAPPY.name()));
        this.eventsPerTransaction = context.getInteger(EVENTS_PER_TRANSACTION_KEY, 10).intValue();
        this.uncompressedFileSize = context.getInteger(FILE_SIZE_KEY, 500000);
        this.uncompressedPageSize = context.getInteger(FILE_PAGE_SIZE_KEY, 1048576);
        this.uncompressedBlockSize = context.getInteger(FILE_BLOCK_SIZE_KEY, 134217728);
        this.timeoutSeconds = context.getInteger(TIMEOUT_SECONDS_KEY, 3600).intValue();
        this.serializers = new SerializerLinkedHashMap(context.getInteger(FILE_QUEUE_SIZE_KEY, 2).intValue());
        this.serializerContext = new Context(context.getSubProperties("serializer."));
        this.configuration = new Configuration();
        this.configuration.setBoolean("fs.automatic.close", false);
    }
}
