package com.github.ddth.queue.impl.universal.idint;

import com.github.ddth.commons.utils.DPathUtils;
import com.github.ddth.dao.utils.DaoException;
import com.github.ddth.dao.utils.DuplicatedValueException;
import com.github.ddth.queue.IQueueMessage;
import com.github.ddth.queue.impl.universal.base.BaseUniversalJdbcQueue;
import com.github.ddth.queue.impl.universal.msg.UniversalIdIntQueueMessage;
import com.github.ddth.queue.utils.QueueException;
import com.github.ddth.queue.utils.QueueUtils;
import java.sql.Connection;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.ConcurrencyFailureException;
import org.springframework.dao.DuplicateKeyException;

/* loaded from: input_file:com/github/ddth/queue/impl/universal/idint/AbstractLessLockingUniversalJdbcQueue.class */
/**
 * JDBC queue that keeps queued and taken ("ephemeral") messages in one table.
 *
 * <p>
 * {@link #getTableNameEphemeral()} returns the same table as {@code getTableName()}, and the two
 * size queries distinguish rows by the {@value #COL_EPHEMERAL_ID} column: rows with value
 * {@code 0} are counted as queued, rows with any other value are counted as ephemeral.
 * </p>
 *
 * <p>
 * The {@code SQL_*} statements declared {@code protected} are not assigned anywhere in this class;
 * presumably a database-specific subclass sets them before the queue is used (TODO confirm).
 * </p>
 */
public class AbstractLessLockingUniversalJdbcQueue extends BaseUniversalJdbcQueue<UniversalIdIntQueueMessage, Long> {
    /** Default value of the FIFO flag. */
    public static final boolean DEFAULT_FIFO = true;

    public static final String COL_QUEUE_ID = "queue_id";
    public static final String COL_EPHEMERAL_ID = "ephemeral_id";
    public static final String COL_ORG_TIMESTAMP = "msg_org_timestamp";
    public static final String COL_TIMESTAMP = "msg_timestamp";
    public static final String COL_NUM_REQUEUES = "msg_num_requeues";
    public static final String COL_CONTENT = "msg_content";

    /** Column alias used by the two COUNT queries and read back in {@link #selectNumEntries}. */
    private static final String FIELD_COUNT = "num_entries";

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLessLockingUniversalJdbcQueue.class);

    /* Statements used by the methods below; none of them is assigned in this class. */
    protected String SQL_GET_ORPHAN_MSGS;
    protected String SQL_PUT_NEW_TO_QUEUE;
    protected String SQL_REPUT_TO_QUEUE;
    protected String SQL_REMOVE_FROM_EPHEMERAL;
    protected String SQL_REQUEUE;
    protected String SQL_REQUEUE_SILENT;
    protected String SQL_UPDATE_EPHEMERAL_ID_TAKE;
    protected String SQL_CLEAR_EPHEMERAL_ID;
    protected String SQL_READ_BY_EPHEMERAL_ID;

    /** FIFO flag; only stored and exposed here, this class never consults it itself. */
    private boolean fifo = DEFAULT_FIFO;

    /* MessageFormat templates; {0} is replaced with the table name in init(). */
    private String SQL_COUNT = "SELECT COUNT(*) AS " + FIELD_COUNT + " FROM {0} WHERE " + COL_EPHEMERAL_ID + "=0";
    private String SQL_COUNT_EPHEMERAL = "SELECT COUNT(*) AS " + FIELD_COUNT + " FROM {0} WHERE " + COL_EPHEMERAL_ID
            + "!=0";

    /**
     * Sets the FIFO flag.
     *
     * @param z new flag value
     * @return this instance, for chaining
     */
    public AbstractLessLockingUniversalJdbcQueue setFifo(boolean z) {
        this.fifo = z;
        return this;
    }

    /**
     * Sets the FIFO flag; same effect as {@link #setFifo(boolean)}.
     *
     * @param z new flag value
     * @return this instance, for chaining
     */
    public AbstractLessLockingUniversalJdbcQueue markFifo(boolean z) {
        this.fifo = z;
        return this;
    }

    /**
     * @return current value of the FIFO flag
     */
    public boolean isFifo() {
        return this.fifo;
    }

    /**
     * @return current value of the FIFO flag (same as {@link #isFifo()})
     */
    public boolean getFifo() {
        return this.fifo;
    }

    /**
     * Ephemeral messages live in the queue table itself.
     *
     * @return the value of {@code getTableName()}
     */
    @Override // com.github.ddth.queue.impl.JdbcQueue
    public String getTableNameEphemeral() {
        return getTableName();
    }

    /**
     * Runs the superclass initialization, then substitutes the table names into the two COUNT
     * statements.
     *
     * @return this instance, for chaining
     */
    @Override // com.github.ddth.queue.impl.JdbcQueue, com.github.ddth.queue.impl.AbstractQueue
    public AbstractLessLockingUniversalJdbcQueue init() {
        super.init();
        this.SQL_COUNT = MessageFormat.format(this.SQL_COUNT, getTableName());
        this.SQL_COUNT_EPHEMERAL = MessageFormat.format(this.SQL_COUNT_EPHEMERAL, getTableNameEphemeral());
        return this;
    }

    /**
     * Counts rows whose {@value #COL_EPHEMERAL_ID} is {@code 0}.
     *
     * @param connection connection to run the query on
     * @return the count, or {@code 0} when the query yields no count value
     */
    @Override // com.github.ddth.queue.impl.JdbcQueue
    protected int queueSize(Connection connection) {
        return selectNumEntries(connection, this.SQL_COUNT);
    }

    /**
     * Counts rows whose {@value #COL_EPHEMERAL_ID} is not {@code 0}.
     *
     * @param connection connection to run the query on
     * @return the count, or {@code 0} when the query yields no count value
     */
    @Override // com.github.ddth.queue.impl.JdbcQueue
    protected int ephemeralSize(Connection connection) {
        return selectNumEntries(connection, this.SQL_COUNT_EPHEMERAL);
    }

    /** Runs a COUNT statement and extracts the {@code num_entries} column, defaulting to 0. */
    private int selectNumEntries(Connection connection, String sql) {
        Map<String, Object> row = getJdbcHelper().executeSelectOne(connection, sql, new Object[0]);
        Integer count = DPathUtils.getValue(row, FIELD_COUNT, Integer.class);
        return count != null ? count.intValue() : 0;
    }

    /**
     * Not used by this implementation.
     *
     * @param connection ignored
     * @return always {@code null}
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.github.ddth.queue.impl.JdbcQueue
    public UniversalIdIntQueueMessage readFromQueueStorage(Connection connection) {
        return null;
    }

    /**
     * Not used by this implementation.
     *
     * @param connection    ignored
     * @param iQueueMessage ignored
     * @return always {@code null}
     */
    @Override // com.github.ddth.queue.impl.JdbcQueue
    protected UniversalIdIntQueueMessage readFromEphemeralStorage(Connection connection, IQueueMessage<Long, byte[]> iQueueMessage) {
        return null;
    }

    /**
     * Loads messages selected by {@code SQL_GET_ORPHAN_MSGS}, bound to the timestamp
     * "now minus {@code j} milliseconds".
     *
     * @param connection connection to run the query on
     * @param j          age threshold in milliseconds, subtracted from the current time
     * @return the messages built from the returned rows; empty if there are none
     */
    @Override // com.github.ddth.queue.impl.JdbcQueue
    protected Collection<UniversalIdIntQueueMessage> getOrphanFromEphemeralStorage(Connection connection, long j) {
        Date threshold = new Date(System.currentTimeMillis() - j);
        Collection<UniversalIdIntQueueMessage> orphans = new ArrayList<>();
        // The stream holds JDBC resources, so it must be closed even when row mapping fails.
        try (Stream<Map<String, Object>> rows = getJdbcHelper().executeSelectAsStream(connection,
                this.SQL_GET_ORPHAN_MSGS, new Object[]{threshold})) {
            rows.forEach(row -> orphans.add(UniversalIdIntQueueMessage.newInstance(row)));
        }
        return orphans;
    }

    /**
     * Inserts the message into the queue table.
     *
     * <p>
     * A message whose id is {@code null} or {@code 0} is stored with {@code SQL_PUT_NEW_TO_QUEUE}
     * (no id bound); any other message is stored with {@code SQL_REPUT_TO_QUEUE}, with its id
     * bound as the first parameter.
     * </p>
     *
     * @param connection    connection to run the statement on
     * @param iQueueMessage message to store
     * @return {@code true} if at least one row was affected
     */
    @Override // com.github.ddth.queue.impl.JdbcQueue
    protected boolean putToQueueStorage(Connection connection, IQueueMessage<Long, byte[]> iQueueMessage) {
        Long qId = iQueueMessage.qId();
        boolean isNew = qId == null || qId.longValue() == 0;
        int numRows;
        if (isNew) {
            numRows = getJdbcHelper().execute(connection, this.SQL_PUT_NEW_TO_QUEUE,
                    new Object[]{iQueueMessage.qOriginalTimestamp(), iQueueMessage.qTimestamp(),
                            Integer.valueOf(iQueueMessage.qNumRequeues()), iQueueMessage.qData()});
        } else {
            numRows = getJdbcHelper().execute(connection, this.SQL_REPUT_TO_QUEUE,
                    new Object[]{qId, iQueueMessage.qOriginalTimestamp(), iQueueMessage.qTimestamp(),
                            Integer.valueOf(iQueueMessage.qNumRequeues()), iQueueMessage.qData()});
        }
        return numRows > 0;
    }

    /**
     * No-op: there is no separate ephemeral storage to write to.
     *
     * @param connection    ignored
     * @param iQueueMessage ignored
     * @return always {@code true}
     */
    @Override // com.github.ddth.queue.impl.JdbcQueue
    protected boolean putToEphemeralStorage(Connection connection, IQueueMessage<Long, byte[]> iQueueMessage) {
        return true;
    }

    /**
     * No-op in this implementation.
     *
     * @param connection    ignored
     * @param iQueueMessage ignored
     * @return always {@code true}
     */
    @Override // com.github.ddth.queue.impl.JdbcQueue
    protected boolean removeFromQueueStorage(Connection connection, IQueueMessage<Long, byte[]> iQueueMessage) {
        return true;
    }

    /**
     * Executes {@code SQL_REMOVE_FROM_EPHEMERAL} for the message's id.
     *
     * @param connection    connection to run the statement on
     * @param iQueueMessage message to remove
     * @return {@code true} if at least one row was affected
     */
    @Override // com.github.ddth.queue.impl.JdbcQueue
    protected boolean removeFromEphemeralStorage(Connection connection, IQueueMessage<Long, byte[]> iQueueMessage) {
        return getJdbcHelper().execute(connection, this.SQL_REMOVE_FROM_EPHEMERAL, new Object[]{iQueueMessage.qId()}) > 0;
    }

    /**
     * Queues a message: resets its requeue counter to 0, sets both timestamps to "now" and stores
     * it via {@link #putToQueueStorage}.
     *
     * <p>
     * A duplicate-value/duplicate-key failure is logged and reported as success. A failure caused
     * by {@link ConcurrencyFailureException} is retried until {@code i} exceeds {@code i2}.
     * </p>
     *
     * @param connection    connection to run the statement on
     * @param iQueueMessage message to queue
     * @param i             number of retries performed so far
     * @param i2            maximum number of retries
     * @return {@code true} if the message was stored or already existed
     * @throws QueueException if retries are exhausted or an unexpected error occurs
     */
    @Override // com.github.ddth.queue.impl.JdbcQueue
    protected boolean _queueWithRetries(Connection connection, IQueueMessage<Long, byte[]> iQueueMessage, int i, int i2) {
        try {
            Date now = new Date();
            // NOTE(review): the "...2" setter names look decompiler-generated — confirm against IQueueMessage.
            iQueueMessage.qNumRequeues2(0).qOriginalTimestamp2(now).qTimestamp2(now);
            return putToQueueStorage(connection, iQueueMessage);
        } catch (DuplicatedValueException e) {
            LOGGER.warn(e.getMessage(), e);
            return true;
        } catch (DaoException e) {
            // Must precede the generic handler below, otherwise the retry logic is never reached.
            if (e.getCause() instanceof DuplicateKeyException) {
                LOGGER.warn(e.getMessage(), e);
                return true;
            }
            if (!(e.getCause() instanceof ConcurrencyFailureException)) {
                throw e;
            }
            if (i > i2) {
                throw new QueueException((Throwable) e);
            }
            return _queueWithRetries(connection, iQueueMessage, i + 1, i2);
        } catch (Exception e) {
            throw (e instanceof QueueException) ? (QueueException) e : new QueueException(e);
        }
    }

    /**
     * Requeues a message by executing {@code SQL_REQUEUE} with the current time and the message's
     * id.
     *
     * <p>
     * A duplicate-value/duplicate-key failure is logged and reported as success. A failure caused
     * by {@link ConcurrencyFailureException} is retried (with the same statement) until {@code i}
     * exceeds {@code i2}.
     * </p>
     *
     * @param connection    connection to run the statement on
     * @param iQueueMessage message to requeue
     * @param i             number of retries performed so far
     * @param i2            maximum number of retries
     * @return {@code true} if at least one row was affected or a duplicate was detected
     * @throws QueueException if retries are exhausted or an unexpected error occurs
     */
    @Override // com.github.ddth.queue.impl.JdbcQueue
    protected boolean _requeueWithRetries(Connection connection, IQueueMessage<Long, byte[]> iQueueMessage, int i, int i2) {
        try {
            return getJdbcHelper().execute(connection, this.SQL_REQUEUE, new Object[]{new Date(), iQueueMessage.qId()}) > 0;
        } catch (DuplicatedValueException e) {
            LOGGER.warn(e.getMessage(), e);
            return true;
        } catch (DaoException e) {
            if (e.getCause() instanceof DuplicateKeyException) {
                LOGGER.warn(e.getMessage(), e);
                return true;
            }
            if (!(e.getCause() instanceof ConcurrencyFailureException)) {
                throw e;
            }
            if (i > i2) {
                throw new QueueException((Throwable) e);
            }
            // Retry the same (non-silent) requeue; the silent variant runs a different statement.
            return _requeueWithRetries(connection, iQueueMessage, i + 1, i2);
        } catch (Exception e) {
            throw (e instanceof QueueException) ? (QueueException) e : new QueueException(e);
        }
    }

    /**
     * Requeues a message by executing {@code SQL_REQUEUE_SILENT} with the message's id.
     *
     * <p>
     * A duplicate-value/duplicate-key failure is logged and reported as success. A failure caused
     * by {@link ConcurrencyFailureException} is retried until {@code i} exceeds {@code i2}.
     * </p>
     *
     * @param connection    connection to run the statement on
     * @param iQueueMessage message to requeue
     * @param i             number of retries performed so far
     * @param i2            maximum number of retries
     * @return {@code true} if at least one row was affected or a duplicate was detected
     * @throws QueueException if retries are exhausted or an unexpected error occurs
     */
    @Override // com.github.ddth.queue.impl.JdbcQueue
    protected boolean _requeueSilentWithRetries(Connection connection, IQueueMessage<Long, byte[]> iQueueMessage, int i, int i2) {
        try {
            return getJdbcHelper().execute(connection, this.SQL_REQUEUE_SILENT, new Object[]{iQueueMessage.qId()}) > 0;
        } catch (DuplicatedValueException e) {
            LOGGER.warn(e.getMessage(), e);
            return true;
        } catch (DaoException e) {
            if (e.getCause() instanceof DuplicateKeyException) {
                LOGGER.warn(e.getMessage(), e);
                return true;
            }
            if (!(e.getCause() instanceof ConcurrencyFailureException)) {
                throw e;
            }
            if (i > i2) {
                throw new QueueException((Throwable) e);
            }
            return _requeueSilentWithRetries(connection, iQueueMessage, i + 1, i2);
        } catch (Exception e) {
            throw (e instanceof QueueException) ? (QueueException) e : new QueueException(e);
        }
    }

    /**
     * Finishes a message by removing it via {@link #removeFromEphemeralStorage}.
     *
     * <p>
     * A failure caused by {@link ConcurrencyFailureException} is retried until {@code i} exceeds
     * {@code i2}; a retry that succeeds completes normally.
     * </p>
     *
     * @param connection    connection to run the statement on
     * @param iQueueMessage message to finish
     * @param i             number of retries performed so far
     * @param i2            maximum number of retries
     * @throws QueueException if retries are exhausted or an unexpected error occurs
     */
    @Override // com.github.ddth.queue.impl.JdbcQueue
    protected void _finishWithRetries(Connection connection, IQueueMessage<Long, byte[]> iQueueMessage, int i, int i2) {
        try {
            removeFromEphemeralStorage(connection, iQueueMessage);
        } catch (DaoException e) {
            if (!(e.getCause() instanceof ConcurrencyFailureException)) {
                throw e;
            }
            if (i > i2) {
                throw new QueueException((Throwable) e);
            }
            // If the retry succeeds we are done; the original failure must not be rethrown.
            _finishWithRetries(connection, iQueueMessage, i + 1, i2);
        } catch (Exception e) {
            throw (e instanceof QueueException) ? (QueueException) e : new QueueException(e);
        }
    }

    /**
     * Takes one message from the queue.
     *
     * <p>
     * Generates a fresh 64-bit id, executes {@code SQL_UPDATE_EPHEMERAL_ID_TAKE} with it and, if
     * at least one row was updated, reads the row back with {@code SQL_READ_BY_EPHEMERAL_ID}
     * using the same id. A failure caused by {@link ConcurrencyFailureException} is retried until
     * {@code i} exceeds {@code i2}.
     * </p>
     *
     * @param connection connection to run the statements on
     * @param i          number of retries performed so far
     * @param i2         maximum number of retries
     * @return the taken message, or {@code null} if no row was updated or none could be read back
     * @throws QueueException if retries are exhausted or an unexpected error occurs
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.github.ddth.queue.impl.JdbcQueue
    public UniversalIdIntQueueMessage _takeWithRetries(Connection connection, int i, int i2) {
        try {
            long ephemeralId = QueueUtils.IDGEN.generateId64();
            int numRows = getJdbcHelper().execute(connection, this.SQL_UPDATE_EPHEMERAL_ID_TAKE,
                    new Object[]{Long.valueOf(ephemeralId)});
            if (numRows <= 0) {
                return null;
            }
            Map<String, Object> row = getJdbcHelper().executeSelectOne(connection, this.SQL_READ_BY_EPHEMERAL_ID,
                    new Object[]{Long.valueOf(ephemeralId)});
            return row != null ? UniversalIdIntQueueMessage.newInstance(row) : null;
        } catch (DaoException e) {
            if (!(e.getCause() instanceof ConcurrencyFailureException)) {
                throw e;
            }
            if (i > i2) {
                throw new QueueException((Throwable) e);
            }
            return _takeWithRetries(connection, i + 1, i2);
        } catch (Exception e) {
            throw (e instanceof QueueException) ? (QueueException) e : new QueueException(e);
        }
    }

    /**
     * Moves a message back to the queue by executing {@code SQL_CLEAR_EPHEMERAL_ID} with the
     * message's id. A failure caused by {@link ConcurrencyFailureException} is retried until
     * {@code i} exceeds {@code i2}.
     *
     * @param iQueueMessage message to move
     * @param connection    connection to run the statement on
     * @param i             number of retries performed so far
     * @param i2            maximum number of retries
     * @return {@code true} if at least one row was affected
     * @throws QueueException if retries are exhausted or an unexpected error occurs
     */
    @Override // com.github.ddth.queue.impl.JdbcQueue
    protected boolean _moveFromEphemeralToQueueStorageWithRetries(IQueueMessage<Long, byte[]> iQueueMessage, Connection connection, int i, int i2) {
        try {
            return getJdbcHelper().execute(connection, this.SQL_CLEAR_EPHEMERAL_ID, new Object[]{iQueueMessage.qId()}) > 0;
        } catch (DaoException e) {
            if (!(e.getCause() instanceof ConcurrencyFailureException)) {
                throw e;
            }
            if (i > i2) {
                throw new QueueException((Throwable) e);
            }
            return _moveFromEphemeralToQueueStorageWithRetries(iQueueMessage, connection, i + 1, i2);
        } catch (Exception e) {
            throw (e instanceof QueueException) ? (QueueException) e : new QueueException(e);
        }
    }
}
