package com.jeesuite.kafka.producer.handler;

import com.jeesuite.kafka.message.DefaultMessage;
import com.jeesuite.kafka.utils.NodeNameHolder;
import java.io.IOException;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.producer.RecordMetadata;

/* loaded from: input_file:com/jeesuite/kafka/producer/handler/SendCounterHandler.class */
public class SendCounterHandler implements ProducerEventHandler {
    private Map<String, AtomicLong[]> producerStats = new HashMap();
    private int currentStatHourOfDay = Calendar.getInstance().get(11);
    private ZkClient zkClient;
    private ScheduledExecutorService statScheduler;
    private String producerGroup;

    public SendCounterHandler(String str, String str2) {
        this.producerGroup = str == null ? NodeNameHolder.getNodeId() : str;
        this.zkClient = new ZkClient(str2, 10000, 10000, ZKStringSerializer$.MODULE$);
        initCollectionTimer();
    }

    @Override // com.jeesuite.kafka.producer.handler.ProducerEventHandler
    public void onSuccessed(String str, RecordMetadata recordMetadata) {
        updateProducerStat(str, false);
    }

    @Override // com.jeesuite.kafka.producer.handler.ProducerEventHandler
    public void onError(String str, DefaultMessage defaultMessage, boolean z) {
        updateProducerStat(str, true);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.statScheduler.shutdown();
        this.zkClient.close();
    }

    private void initCollectionTimer() {
        this.statScheduler = Executors.newScheduledThreadPool(1);
        this.statScheduler.scheduleAtFixedRate(new Runnable() { // from class: com.jeesuite.kafka.producer.handler.SendCounterHandler.1
            @Override // java.lang.Runnable
            public void run() {
                if (Calendar.getInstance().get(11) != SendCounterHandler.this.currentStatHourOfDay) {
                    SendCounterHandler.this.producerStats.clear();
                }
            }
        }, 1L, 5L, TimeUnit.SECONDS);
    }

    private void updateProducerStat(String str, boolean z) {
        if (!this.producerStats.containsKey(str)) {
            synchronized (this.producerStats) {
                this.producerStats.put(str, new AtomicLong[]{new AtomicLong(0L), new AtomicLong(0L)});
            }
        }
        if (z) {
            this.producerStats.get(str)[1].incrementAndGet();
        } else {
            this.producerStats.get(str)[0].incrementAndGet();
        }
    }

    private Map<String, Long[]> producerStats() {
        HashMap hashMap = new HashMap();
        for (String str : this.producerStats.keySet()) {
            AtomicLong[] atomicLongArr = this.producerStats.get(str);
            hashMap.put(str, new Long[]{Long.valueOf(atomicLongArr[0].get()), Long.valueOf(atomicLongArr[1].get())});
        }
        return hashMap;
    }
}
