package com.github.ddth.queue.impl.universal.idint;

import com.github.ddth.commons.utils.MapUtils;
import com.github.ddth.dao.jdbc.utils.DefaultNamedParamsFilters;
import com.github.ddth.dao.jdbc.utils.DefaultNamedParamsSqlBuilders;
import com.github.ddth.queue.IQueueMessage;
import com.github.ddth.queue.impl.universal.BaseUniversalJdbcQueue;
import com.github.ddth.queue.impl.universal.UniversalIdIntQueueMessage;
import com.github.ddth.queue.impl.universal.UniversalIdIntQueueMessageFactory;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.stream.Stream;

/* loaded from: input_file:com/github/ddth/queue/impl/universal/idint/UniversalSingleStorageJdbcQueue.class */
public class UniversalSingleStorageJdbcQueue extends BaseUniversalJdbcQueue<UniversalIdIntQueueMessage, Long> {
    public static final String COL_QUEUE_NAME = "queue_name";
    public static final String COL_QUEUE_ID = "queue_id";
    public static final String COL_ORG_TIMESTAMP = "msg_org_timestamp";
    public static final String COL_TIMESTAMP = "msg_timestamp";
    public static final String COL_NUM_REQUEUES = "msg_num_requeues";
    public static final String COL_CONTENT = "msg_content";
    private String SQL_PEEK_FROM_QUEUE;
    private String SQL_READ_FROM_EPHEMERAL;
    private String SQL_GET_ORPHAN_MSGS;
    private String SQL_PUT_NEW_TO_QUEUE;
    private String SQL_REPUT_TO_QUEUE;
    private String SQL_PUT_TO_EPHEMERAL;
    private String SQL_REMOVE_FROM_QUEUE;
    private String SQL_REMOVE_FROM_EPHEMERAL;

    @Override // com.github.ddth.queue.impl.JdbcQueue, com.github.ddth.queue.impl.AbstractQueue
    public UniversalSingleStorageJdbcQueue init() throws Exception {
        String[] strArr = {"queue_id AS id", "msg_org_timestamp AS time", "msg_timestamp AS queue_time", "msg_num_requeues AS num_requeues", "msg_content AS data"};
        this.SQL_PEEK_FROM_QUEUE = new DefaultNamedParamsSqlBuilders.SelectBuilder().withColumns(strArr).withFilterWhere(new DefaultNamedParamsFilters.FilterFieldValue("queue_name", "=", "dummy")).withSorting(isFifo() ? MapUtils.createMap(new Object[]{"msg_org_timestamp", Boolean.FALSE}) : null).withLimit(1).withTableNames(new String[]{getTableName()}).build().clause;
        this.SQL_READ_FROM_EPHEMERAL = new DefaultNamedParamsSqlBuilders.SelectBuilder().withColumns(strArr).withFilterWhere(new DefaultNamedParamsFilters.FilterAnd().addFilter(new DefaultNamedParamsFilters.FilterFieldValue("queue_name", "=", "dummy")).addFilter(new DefaultNamedParamsFilters.FilterFieldValue("queue_id", "=", "dummy"))).withTableNames(new String[]{getTableNameEphemeral()}).build().clause;
        this.SQL_GET_ORPHAN_MSGS = new DefaultNamedParamsSqlBuilders.SelectBuilder().withColumns(strArr).withFilterWhere(new DefaultNamedParamsFilters.FilterAnd().addFilter(new DefaultNamedParamsFilters.FilterFieldValue("queue_name", "=", "dummy")).addFilter(new DefaultNamedParamsFilters.FilterFieldValue("msg_timestamp", "<", "dummy"))).withTableNames(new String[]{getTableNameEphemeral()}).build().clause;
        this.SQL_PUT_NEW_TO_QUEUE = new DefaultNamedParamsSqlBuilders.InsertBuilder(getTableName(), MapUtils.createMap(new Object[]{"queue_name", "dummy", "msg_org_timestamp", "dummy", "msg_timestamp", "dummy", "msg_num_requeues", "dummy", "msg_content", "dummy"})).build().clause;
        this.SQL_REPUT_TO_QUEUE = new DefaultNamedParamsSqlBuilders.InsertBuilder(getTableName(), MapUtils.createMap(new Object[]{"queue_name", "dummy", "queue_id", "dummy", "msg_org_timestamp", "dummy", "msg_timestamp", "dummy", "msg_num_requeues", "dummy", "msg_content", "dummy"})).build().clause;
        this.SQL_PUT_TO_EPHEMERAL = new DefaultNamedParamsSqlBuilders.InsertBuilder(getTableNameEphemeral(), MapUtils.createMap(new Object[]{"queue_name", "dummy", "queue_id", "dummy", "msg_org_timestamp", "dummy", "msg_timestamp", "dummy", "msg_num_requeues", "dummy", "msg_content", "dummy"})).build().clause;
        this.SQL_REMOVE_FROM_QUEUE = new DefaultNamedParamsSqlBuilders.DeleteBuilder(getTableName(), new DefaultNamedParamsFilters.FilterAnd().addFilter(new DefaultNamedParamsFilters.FilterFieldValue("queue_name", "=", "dummy")).addFilter(new DefaultNamedParamsFilters.FilterFieldValue("queue_id", "=", "dummy"))).build().clause;
        this.SQL_REMOVE_FROM_EPHEMERAL = new DefaultNamedParamsSqlBuilders.DeleteBuilder(getTableNameEphemeral(), new DefaultNamedParamsFilters.FilterAnd().addFilter(new DefaultNamedParamsFilters.FilterFieldValue("queue_name", "=", "dummy")).addFilter(new DefaultNamedParamsFilters.FilterFieldValue("queue_id", "=", "dummy"))).build().clause;
        if (getMessageFactory() == null) {
            setMessageFactory(UniversalIdIntQueueMessageFactory.INSTANCE);
        }
        super.init();
        return this;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.github.ddth.queue.impl.JdbcQueue
    public UniversalIdIntQueueMessage peekFromQueueStorage(Connection connection) {
        Map<String, Object> executeSelectOne = getJdbcHelper().executeSelectOne(connection, this.SQL_PEEK_FROM_QUEUE, MapUtils.createMap(new Object[]{"queue_name", getQueueName()}));
        if (executeSelectOne != null) {
            return createMessge(executeSelectOne);
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.github.ddth.queue.impl.JdbcQueue
    public UniversalIdIntQueueMessage readFromEphemeralStorage(Connection connection, Long l) {
        Map<String, Object> executeSelectOne = getJdbcHelper().executeSelectOne(connection, this.SQL_READ_FROM_EPHEMERAL, MapUtils.createMap(new Object[]{"queue_name", getQueueName(), "queue_id", l}));
        if (executeSelectOne != null) {
            return createMessge(executeSelectOne);
        }
        return null;
    }

    @Override // com.github.ddth.queue.impl.JdbcQueue
    protected Collection<UniversalIdIntQueueMessage> getOrphanMessagesFromEphemeralStorage(Connection connection, long j) {
        Date date = new Date(System.currentTimeMillis() - j);
        ArrayList arrayList = new ArrayList();
        Stream executeSelectAsStream = getJdbcHelper().executeSelectAsStream(connection, this.SQL_GET_ORPHAN_MSGS, MapUtils.createMap(new Object[]{"queue_name", getQueueName(), "msg_timestamp", date}));
        try {
            executeSelectAsStream.forEach(map -> {
                arrayList.add(UniversalIdIntQueueMessage.newInstance((Map<String, Object>) map));
            });
            if (executeSelectAsStream != null) {
                executeSelectAsStream.close();
            }
            return arrayList;
        } catch (Throwable th) {
            if (executeSelectAsStream != null) {
                try {
                    executeSelectAsStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private Map<String, Object> toMapForSqlBuilder(UniversalIdIntQueueMessage universalIdIntQueueMessage) {
        return MapUtils.createMap(new Object[]{"queue_name", getQueueName(), "queue_id", universalIdIntQueueMessage.getId(), "msg_org_timestamp", universalIdIntQueueMessage.getTimestamp(), "msg_timestamp", universalIdIntQueueMessage.getQueueTimestamp(), "msg_num_requeues", Integer.valueOf(universalIdIntQueueMessage.getNumRequeues()), "msg_content", universalIdIntQueueMessage.getContent()});
    }

    @Override // com.github.ddth.queue.impl.JdbcQueue
    protected boolean putToQueueStorage(Connection connection, IQueueMessage<Long, byte[]> iQueueMessage) {
        UniversalIdIntQueueMessage universalIdIntQueueMessage = (UniversalIdIntQueueMessage) ensureMessageType(iQueueMessage, UniversalIdIntQueueMessage.class);
        Long id = universalIdIntQueueMessage.getId();
        return (id == null || id.longValue() == 0) ? getJdbcHelper().execute(connection, this.SQL_PUT_NEW_TO_QUEUE, toMapForSqlBuilder(universalIdIntQueueMessage)) > 0 : getJdbcHelper().execute(connection, this.SQL_REPUT_TO_QUEUE, toMapForSqlBuilder(universalIdIntQueueMessage)) > 0;
    }

    @Override // com.github.ddth.queue.impl.JdbcQueue
    protected boolean putToEphemeralStorage(Connection connection, IQueueMessage<Long, byte[]> iQueueMessage) {
        return getJdbcHelper().execute(connection, this.SQL_PUT_TO_EPHEMERAL, toMapForSqlBuilder((UniversalIdIntQueueMessage) ensureMessageType(iQueueMessage, UniversalIdIntQueueMessage.class))) > 0;
    }

    @Override // com.github.ddth.queue.impl.JdbcQueue
    protected boolean removeFromQueueStorage(Connection connection, IQueueMessage<Long, byte[]> iQueueMessage) {
        return getJdbcHelper().execute(connection, this.SQL_REMOVE_FROM_QUEUE, MapUtils.createMap(new Object[]{"queue_name", getQueueName(), "queue_id", ((UniversalIdIntQueueMessage) ensureMessageType(iQueueMessage, UniversalIdIntQueueMessage.class)).getId()})) > 0;
    }

    @Override // com.github.ddth.queue.impl.JdbcQueue
    protected boolean removeFromEphemeralStorage(Connection connection, IQueueMessage<Long, byte[]> iQueueMessage) {
        return getJdbcHelper().execute(connection, this.SQL_REMOVE_FROM_EPHEMERAL, MapUtils.createMap(new Object[]{"queue_name", getQueueName(), "queue_id", ((UniversalIdIntQueueMessage) ensureMessageType(iQueueMessage, UniversalIdIntQueueMessage.class)).getId()})) > 0;
    }
}
