package org.joyqueue.server.archive.store;

import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.security.GeneralSecurityException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.hbase.HBaseClient;
import org.joyqueue.monitor.PointTracer;
import org.joyqueue.monitor.TraceStat;
import org.joyqueue.server.archive.store.QueryCondition;
import org.joyqueue.server.archive.store.api.ArchiveStore;
import org.joyqueue.server.archive.store.model.AchivePosition;
import org.joyqueue.server.archive.store.model.ConsumeLog;
import org.joyqueue.server.archive.store.model.Query;
import org.joyqueue.server.archive.store.model.SendLog;
import org.joyqueue.toolkit.lang.Pair;
import org.joyqueue.toolkit.network.IpUtil;
import org.joyqueue.toolkit.security.Md5;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/server/archive/store/HBaseStore.class */
public class HBaseStore implements ArchiveStore {
    private static final Logger logger = LoggerFactory.getLogger(HBaseStore.class);
    private HBaseClient hBaseClient;
    private HBaseTopicAppMapping topicAppMapping;
    private String namespace;
    private static final byte endFlag = 58;
    private String consumeLogTable = "consume_log";
    private String sendLogTable = "send_log";
    private String positionTable = "archive_position";
    private byte[] cf = "cf".getBytes(Charset.forName("utf-8"));
    private byte[] col = "col".getBytes(Charset.forName("utf-8"));
    private boolean isStart = false;

    public HBaseStore() {
    }

    public HBaseStore(HBaseClient hBaseClient) {
        this.hBaseClient = hBaseClient;
    }

    public boolean isStarted() {
        return this.isStart;
    }

    public void start() {
        try {
            if (StringUtils.isEmpty(this.namespace)) {
                logger.error("archive namespace is null.");
            }
            logger.info("archive namespace is [{}]", this.namespace);
            if (this.hBaseClient == null) {
                this.hBaseClient = new HBaseClient();
            }
            this.hBaseClient.start();
            this.topicAppMapping = new HBaseTopicAppMapping(this.namespace, this.hBaseClient);
            this.isStart = true;
            logger.info("HBaseStore is started.");
        } catch (Throwable th) {
            this.isStart = false;
            logger.error(th.getMessage(), th);
        }
    }

    public void stop() {
        this.hBaseClient.stop();
        logger.info("HBaseClient is stopped.");
    }

    public void putConsumeLog(List<ConsumeLog> list, PointTracer pointTracer) throws JoyQueueException {
        LinkedList linkedList = new LinkedList();
        try {
            for (ConsumeLog consumeLog : list) {
                consumeLog.setAppId(this.topicAppMapping.getAppId(consumeLog.getApp()));
                linkedList.add(HBaseSerializer.convertConsumeLogToKVBytes(consumeLog));
            }
            TraceStat begin = pointTracer.begin("org.joyqueue.server.archive.store.HBaseStore.putConsumeLog");
            this.hBaseClient.put(this.namespace, this.consumeLogTable, this.cf, this.col, linkedList);
            pointTracer.end(begin);
        } catch (IOException e) {
            throw new JoyQueueException(JoyQueueCode.SE_IO_ERROR, e, new Object[0]);
        }
    }

    public void putSendLog(List<SendLog> list, PointTracer pointTracer) throws JoyQueueException {
        try {
            LinkedList linkedList = new LinkedList();
            for (SendLog sendLog : list) {
                int topicId = this.topicAppMapping.getTopicId(sendLog.getTopic());
                int appId = this.topicAppMapping.getAppId(sendLog.getApp());
                sendLog.setTopicId(topicId);
                sendLog.setAppId(appId);
                linkedList.add(HBaseSerializer.convertSendLogToKVBytes(sendLog));
                linkedList.add(HBaseSerializer.convertSendLogToKVBytes4BizId(sendLog));
            }
            TraceStat begin = pointTracer.begin("org.joyqueue.server.archive.store.HBaseStore.putSendLog");
            this.hBaseClient.put(this.namespace, this.sendLogTable, this.cf, this.col, linkedList);
            pointTracer.end(begin);
        } catch (Exception e) {
            throw new JoyQueueException(JoyQueueCode.SE_IO_ERROR, e, new Object[0]);
        }
    }

    public void putPosition(AchivePosition achivePosition) throws JoyQueueException {
        try {
            this.hBaseClient.put(this.namespace, this.positionTable, this.cf, this.col, Bytes.toBytes(achivePosition.getTopic() + ":" + ((int) achivePosition.getPartition())), Bytes.toBytes(achivePosition.getIndex()));
        } catch (IOException e) {
            throw new JoyQueueException(JoyQueueCode.SE_IO_ERROR, e, new Object[0]);
        }
    }

    public Long getPosition(String str, short s) throws JoyQueueException {
        try {
            byte[] bArr = this.hBaseClient.get(this.namespace, this.positionTable, this.cf, this.col, Bytes.toBytes(str + ":" + ((int) s)));
            if (bArr != null) {
                return Long.valueOf(Bytes.toLong(bArr));
            }
            return null;
        } catch (IOException e) {
            throw new JoyQueueException(JoyQueueCode.SE_IO_ERROR, e, new Object[0]);
        }
    }

    public List<SendLog> scanSendLog(Query query) throws JoyQueueException {
        if (this.hBaseClient == null) {
            logger.error("hBaseClient is null,archive no service");
            throw new JoyQueueException(JoyQueueCode.CN_SERVICE_NOT_AVAILABLE, new Object[]{"hBaseClient is null"});
        }
        LinkedList linkedList = new LinkedList();
        try {
            List<Pair> scan = this.hBaseClient.scan(this.namespace, buildScanParameters(query));
            boolean isNotEmpty = StringUtils.isNotEmpty(((QueryCondition) query.getQueryCondition()).getStartRowKey().getBusinessId());
            for (Pair pair : scan) {
                SendLog readSendLog4BizId = isNotEmpty ? HBaseSerializer.readSendLog4BizId(pair) : HBaseSerializer.readSendLog(pair);
                readSendLog4BizId.setClientIpStr(toIpString(readSendLog4BizId.getClientIp()));
                readSendLog4BizId.setRowKeyStart(HBaseSerializer.byteArrayToHexStr((byte[]) pair.getKey()));
                readSendLog4BizId.setTopic(this.topicAppMapping.getTopicName(readSendLog4BizId.getTopicId()));
                readSendLog4BizId.setApp(this.topicAppMapping.getAppName(readSendLog4BizId.getAppId()));
                linkedList.add(readSendLog4BizId);
            }
            return linkedList;
        } catch (Exception e) {
            throw new JoyQueueException(JoyQueueCode.SE_IO_ERROR, e, new Object[0]);
        }
    }

    private String toIpString(byte[] bArr) {
        if (bArr.length != 16) {
            logger.error("Client IP byte array length error.");
        }
        boolean z = true;
        int i = 6;
        while (true) {
            if (i < 16) {
                if (bArr[i] != 0) {
                    z = false;
                    break;
                }
                i++;
            } else {
                break;
            }
        }
        StringBuilder sb = new StringBuilder();
        if (!z) {
            IpUtil.toAddress(bArr, sb);
            return sb.toString();
        }
        byte[] bArr2 = new byte[6];
        System.arraycopy(bArr, 0, bArr2, 0, bArr2.length);
        IpUtil.toAddress(bArr2, sb);
        return sb.toString();
    }

    private HBaseClient.ScanParameters buildScanParameters(Query query) throws GeneralSecurityException, JoyQueueException {
        QueryCondition queryCondition = (QueryCondition) query.getQueryCondition();
        HBaseClient.ScanParameters scanParameters = new HBaseClient.ScanParameters();
        scanParameters.setTableName(this.sendLogTable);
        scanParameters.setCf(this.cf);
        scanParameters.setCol(this.col);
        scanParameters.setRowCount(queryCondition.getCount());
        byte[] startRowKeyByteArr = queryCondition.getStartRowKeyByteArr();
        if (startRowKeyByteArr != null) {
            scanParameters.setStartRowKey(startRowKeyByteArr);
        } else {
            scanParameters.setStartRowKey(createRowKey(queryCondition.getStartRowKey()));
        }
        scanParameters.setStopRowKey(createRowKey(queryCondition.getStopRowKey()));
        return scanParameters;
    }

    private byte[] createRowKey(QueryCondition.RowKey rowKey) throws GeneralSecurityException, JoyQueueException {
        ByteBuffer allocate = ByteBuffer.allocate(44);
        int topicId = this.topicAppMapping.getTopicId(rowKey.getTopic());
        long time = rowKey.getTime();
        String businessId = rowKey.getBusinessId();
        String messageId = rowKey.getMessageId();
        allocate.putInt(topicId);
        if (StringUtils.isNotEmpty(businessId)) {
            allocate.put(Md5.INSTANCE.encrypt(businessId.getBytes(), (byte[]) null));
            allocate.putLong(time);
        } else {
            allocate.putLong(time);
            allocate.put(new byte[16]);
        }
        if (messageId != null) {
            allocate.put(new BigInteger(messageId, 16).toByteArray());
        } else {
            allocate.put(new byte[16]);
        }
        return allocate.array();
    }

    private Filter createFilter(QueryCondition.RowKey rowKey, byte[] bArr) {
        if (!StringUtils.isNotEmpty(rowKey.getBusinessId())) {
            return null;
        }
        LinkedList linkedList = new LinkedList();
        org.apache.hadoop.hbase.util.Pair pair = new org.apache.hadoop.hbase.util.Pair();
        for (int i = 4; i < 12; i++) {
            bArr[i] = Bytes.toBytes("?")[0];
        }
        for (int i2 = 28; i2 < 44; i2++) {
            bArr[i2] = Bytes.toBytes("?")[0];
        }
        pair.setFirst(bArr);
        ByteBuffer allocate = ByteBuffer.allocate(44);
        for (int i3 = 0; i3 < 4; i3++) {
            allocate.put((byte) 0);
        }
        for (int i4 = 0; i4 < 8; i4++) {
            allocate.put((byte) 1);
        }
        for (int i5 = 0; i5 < 16; i5++) {
            allocate.put((byte) 0);
        }
        for (int i6 = 0; i6 < 16; i6++) {
            allocate.put((byte) 1);
        }
        pair.setSecond(allocate.array());
        linkedList.add(pair);
        return new FuzzyRowFilter(linkedList);
    }

    public SendLog getOneSendLog(Query query) throws JoyQueueException {
        QueryCondition.RowKey rowKey = ((QueryCondition) query.getQueryCondition()).getRowKey();
        try {
            ByteBuffer allocate = ByteBuffer.allocate(44);
            allocate.putInt(this.topicAppMapping.getTopicId(rowKey.getTopic()));
            allocate.putLong(rowKey.getTime());
            allocate.put(Md5.INSTANCE.encrypt(rowKey.getBusinessId().getBytes(Charset.forName("utf-8")), (byte[]) null));
            allocate.put(HBaseSerializer.hexStrToByteArray(rowKey.getMessageId()));
            SendLog readSendLog = HBaseSerializer.readSendLog(this.hBaseClient.getKV(this.namespace, this.sendLogTable, this.cf, this.col, allocate.array()));
            StringBuilder sb = new StringBuilder();
            IpUtil.toAddress(readSendLog.getClientIp(), sb);
            readSendLog.setClientIpStr(sb.toString());
            readSendLog.setTopic(this.topicAppMapping.getTopicName(readSendLog.getTopicId()));
            readSendLog.setApp(this.topicAppMapping.getAppName(readSendLog.getAppId()));
            return readSendLog;
        } catch (Exception e) {
            throw new JoyQueueException(JoyQueueCode.SE_IO_ERROR, e, new Object[0]);
        }
    }

    public List<ConsumeLog> scanConsumeLog(String str, Integer num) throws JoyQueueException {
        if (this.hBaseClient == null) {
            logger.error("hBaseClient is null,archive no service");
            throw new JoyQueueException(JoyQueueCode.CN_SERVICE_NOT_AVAILABLE, new Object[]{"hBaseClient is null"});
        }
        LinkedList linkedList = new LinkedList();
        try {
            HBaseClient.ScanParameters scanParameters = new HBaseClient.ScanParameters();
            scanParameters.setTableName(this.consumeLogTable);
            scanParameters.setCf(this.cf);
            scanParameters.setCol(this.col);
            byte[] hexStrToByteArray = HBaseSerializer.hexStrToByteArray(str);
            scanParameters.setStartRowKey(hexStrToByteArray);
            ByteBuffer allocate = ByteBuffer.allocate(hexStrToByteArray.length + 1);
            allocate.put(hexStrToByteArray);
            allocate.put((byte) 58);
            scanParameters.setStopRowKey(allocate.array());
            scanParameters.setRowCount(num.intValue());
            Iterator it = this.hBaseClient.scan(this.namespace, scanParameters).iterator();
            while (it.hasNext()) {
                ConsumeLog readConsumeLog = HBaseSerializer.readConsumeLog((Pair) it.next());
                readConsumeLog.setMessageId(HBaseSerializer.byteArrayToHexStr(readConsumeLog.getBytesMessageId()));
                StringBuilder sb = new StringBuilder();
                IpUtil.toAddress(readConsumeLog.getClientIp(), sb);
                readConsumeLog.setClientIpStr(sb.toString());
                readConsumeLog.setApp(this.topicAppMapping.getAppName(readConsumeLog.getAppId()));
                linkedList.add(readConsumeLog);
            }
            return linkedList;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new JoyQueueException(JoyQueueCode.SE_IO_ERROR, e, new Object[0]);
        }
    }

    public void setNameSpace(String str) {
        this.namespace = str;
    }
}
