package com.github.ddth.queue.impl.universal2;

import com.github.ddth.queue.IQueueMessage;
import com.github.ddth.queue.impl.JdbcQueue;
import com.github.ddth.queue.utils.QueueException;
import com.github.ddth.queue.utils.QueueUtils;
import java.sql.Connection;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.dao.PessimisticLockingFailureException;
import org.springframework.jdbc.core.JdbcTemplate;

/* loaded from: input_file:com/github/ddth/queue/impl/universal2/LessLockingUniversalMySQLQueue.class */
/**
 * MySQL-backed queue that keeps queued and in-flight ("ephemeral") messages in
 * the same table: {@link #getTableNameEphemeral()} returns
 * {@link #getTableName()}.
 *
 * <p>
 * A message is taken by stamping a freshly generated id into the
 * {@code ephemeral_id} column of one row whose {@code ephemeral_id} is
 * {@code null} ({@code UPDATE ... LIMIT 1}) and then reading the row back by
 * that id. Finishing a message deletes its row; re-queuing resets
 * {@code ephemeral_id} to {@code null}.
 * </p>
 *
 * <p>
 * All SQL statements are built in {@link #init()}; the FIFO flag is read at
 * that point, so it must be configured before {@link #init()} is called.
 * </p>
 */
public class LessLockingUniversalMySQLQueue extends JdbcQueue {
    public static final String COL_QUEUE_ID = "queue_id";
    public static final String COL_EPHEMERAL_ID = "ephemeral_id";
    public static final String COL_ORG_TIMESTAMP = "msg_org_timestamp";
    public static final String COL_TIMESTAMP = "msg_timestamp";
    public static final String COL_NUM_REQUEUES = "msg_num_requeues";
    public static final String COL_CONTENT = "msg_content";

    private static final Logger LOGGER = LoggerFactory.getLogger(LessLockingUniversalMySQLQueue.class);

    /* SQL statements; populated by init(). */
    private String SQL_GET_ORPHAN_MSGS;
    /* Built by init() but not used by any method of this class. */
    private String SQL_PUT_NEW_TO_QUEUE;
    private String SQL_REPUT_TO_QUEUE;
    private String SQL_REMOVE_FROM_EPHEMERAL;
    private String SQL_REQUEUE;
    private String SQL_REQUEUE_SILENT;
    private String SQL_UPDATE_EPHEMERAL_ID_TAKE;
    private String SQL_CLEAR_EPHEMERAL_ID;
    private String SQL_READ_BY_EPHEMERAL_ID;

    private boolean fifo = true;

    /**
     * Sets the FIFO flag. Only takes effect if called before {@link #init()},
     * because the "take" statement is built there.
     *
     * @param fifo
     *            {@code true} to add an {@code ORDER BY} clause to the "take"
     *            statement
     * @return this instance, for chaining
     */
    public LessLockingUniversalMySQLQueue setFifo(boolean fifo) {
        this.fifo = fifo;
        return this;
    }

    /**
     * Alias of {@link #setFifo(boolean)}.
     *
     * @param fifo
     *            the new FIFO flag
     * @return this instance, for chaining
     */
    public LessLockingUniversalMySQLQueue markFifo(boolean fifo) {
        return setFifo(fifo);
    }

    /**
     * @return the current FIFO flag
     */
    public boolean isFifo() {
        return this.fifo;
    }

    /**
     * Alias of {@link #isFifo()}.
     *
     * @return the current FIFO flag
     */
    public boolean getFifo() {
        return isFifo();
    }

    /**
     * Queue and ephemeral storage share one table.
     *
     * @return the same value as {@link #getTableName()}
     */
    @Override
    public String getTableNameEphemeral() {
        return getTableName();
    }

    /**
     * Initializes the parent and builds every SQL statement used by this
     * class from the table name and the {@code COL_*} column constants.
     *
     * @return this instance
     */
    @Override
    public LessLockingUniversalMySQLQueue init() {
        super.init();

        final String tableName = getTableName();
        final String tableNameEphemeral = getTableNameEphemeral();

        // Column list (with result aliases) shared by the two SELECT statements.
        final String selQueueId = COL_QUEUE_ID + " AS qid";
        final String selOrgTimestamp = COL_ORG_TIMESTAMP + " AS orgt";
        final String selTimestamp = COL_TIMESTAMP + " AS t";
        final String selNumRequeues = COL_NUM_REQUEUES + " AS numq";
        final String selContent = COL_CONTENT + " AS data";

        this.SQL_REQUEUE = MessageFormat.format(
                "UPDATE {0} SET {1}=null, {2}={2}+1, {3}=? WHERE {4}=?",
                tableName, COL_EPHEMERAL_ID, COL_NUM_REQUEUES, COL_TIMESTAMP, COL_QUEUE_ID);

        this.SQL_REQUEUE_SILENT = MessageFormat.format(
                "UPDATE {0} SET {1}=null WHERE {2}=?",
                tableName, COL_EPHEMERAL_ID, COL_QUEUE_ID);

        // NOTE(review): in FIFO mode rows are claimed in DESCENDING queue_id
        // order. If queue_id grows with insertion time this picks the newest
        // row first (LIFO) - confirm the intended ordering before changing it.
        this.SQL_UPDATE_EPHEMERAL_ID_TAKE = MessageFormat.format(
                "UPDATE {0} SET {1}=? WHERE {1} IS null"
                        + (this.fifo ? " ORDER BY {2} DESC" : "") + " LIMIT 1",
                tableName, COL_EPHEMERAL_ID, COL_QUEUE_ID);

        this.SQL_CLEAR_EPHEMERAL_ID = MessageFormat.format(
                "UPDATE {0} SET {1}=null WHERE {2}=?",
                tableName, COL_EPHEMERAL_ID, COL_QUEUE_ID);

        this.SQL_READ_BY_EPHEMERAL_ID = MessageFormat.format(
                "SELECT {1}, {2}, {3}, {4}, {5} FROM {0} WHERE {6}=?",
                tableName, selQueueId, selOrgTimestamp, selTimestamp, selNumRequeues, selContent,
                COL_EPHEMERAL_ID);

        this.SQL_GET_ORPHAN_MSGS = MessageFormat.format(
                "SELECT {1}, {2}, {3}, {4}, {5} FROM {0} WHERE {6} IS NOT null AND {7}<?",
                tableNameEphemeral, selQueueId, selOrgTimestamp, selTimestamp, selNumRequeues,
                selContent, COL_EPHEMERAL_ID, COL_TIMESTAMP);

        this.SQL_PUT_NEW_TO_QUEUE = MessageFormat.format(
                "INSERT INTO {0} ({1}, {2}, {3}, {4}) VALUES (?, ?, ?, ?)",
                tableName, COL_ORG_TIMESTAMP, COL_TIMESTAMP, COL_NUM_REQUEUES, COL_CONTENT);

        this.SQL_REPUT_TO_QUEUE = MessageFormat.format(
                "INSERT INTO {0} ({1}, {2}, {3}, {4}, {5}) VALUES (?, ?, ?, ?, ?)",
                tableName, COL_QUEUE_ID, COL_ORG_TIMESTAMP, COL_TIMESTAMP, COL_NUM_REQUEUES,
                COL_CONTENT);

        this.SQL_REMOVE_FROM_EPHEMERAL = MessageFormat.format(
                "DELETE FROM {0} WHERE {1}=?", tableNameEphemeral, COL_QUEUE_ID);

        return this;
    }

    /**
     * Verifies that the supplied message is a {@link UniversalQueueMessage}.
     *
     * @param msg
     *            the message to check
     * @return {@code msg} cast to {@link UniversalQueueMessage}
     * @throws IllegalArgumentException
     *             if {@code msg} is {@code null} or of another type
     */
    private static UniversalQueueMessage requireUniversalMessage(IQueueMessage msg) {
        if (!(msg instanceof UniversalQueueMessage)) {
            throw new IllegalArgumentException("This method requires an argument of type [" + UniversalQueueMessage.class.getName() + "]!");
        }
        return (UniversalQueueMessage) msg;
    }

    /**
     * Not used by this implementation; taking a message is done entirely in
     * {@link #_takeWithRetries(Connection, int, int)}.
     *
     * @return always {@code null}
     */
    @Override
    public UniversalQueueMessage readFromQueueStorage(JdbcTemplate jdbcTemplate) {
        return null;
    }

    /**
     * Not used by this implementation.
     *
     * @return always {@code null}
     */
    @Override
    public UniversalQueueMessage readFromEphemeralStorage(JdbcTemplate jdbcTemplate, IQueueMessage iQueueMessage) {
        return null;
    }

    /**
     * Loads rows that have a non-null {@code ephemeral_id} and a
     * {@code msg_timestamp} earlier than the given threshold.
     *
     * @param jdbcTemplate
     *            template used to run the query
     * @param thresholdTimestampMs
     *            threshold as epoch milliseconds; bound to the query as a
     *            {@link Date}
     * @return the matching messages, or {@code null} when there are none
     */
    @Override
    protected Collection<IQueueMessage> getOrphanFromEphemeralStorage(JdbcTemplate jdbcTemplate, long thresholdTimestampMs) {
        List<Map<String, Object>> rows = jdbcTemplate.queryForList(this.SQL_GET_ORPHAN_MSGS,
                new Date(thresholdTimestampMs));
        if (rows == null || rows.isEmpty()) {
            // Kept as null (not an empty collection) to preserve the existing contract.
            return null;
        }
        List<IQueueMessage> orphans = new ArrayList<>(rows.size());
        for (Map<String, Object> row : rows) {
            UniversalQueueMessage msg = new UniversalQueueMessage();
            msg.fromMap(row);
            orphans.add(msg);
        }
        return orphans;
    }

    /**
     * Inserts the message as a new row. If the message has no queue id yet, a
     * 128-bit hex id is generated for the insert (the message object itself is
     * not updated with it).
     *
     * @param jdbcTemplate
     *            template used to run the insert
     * @param iQueueMessage
     *            message to store; must be a {@link UniversalQueueMessage}
     * @return {@code true} if at least one row was inserted
     * @throws IllegalArgumentException
     *             if the message is not a {@link UniversalQueueMessage}
     */
    @Override
    protected boolean putToQueueStorage(JdbcTemplate jdbcTemplate, IQueueMessage iQueueMessage) {
        UniversalQueueMessage msg = requireUniversalMessage(iQueueMessage);
        String queueId = msg.qId();
        if (StringUtils.isEmpty(queueId)) {
            queueId = QueueUtils.IDGEN.generateId128Hex();
        }
        int numRows = jdbcTemplate.update(this.SQL_REPUT_TO_QUEUE, queueId, msg.qOriginalTimestamp(),
                msg.qTimestamp(), Integer.valueOf(msg.qNumRequeues()), msg.content());
        return numRows > 0;
    }

    /**
     * No-op: there is no separate ephemeral storage to write to.
     *
     * @return always {@code true}
     */
    @Override
    protected boolean putToEphemeralStorage(JdbcTemplate jdbcTemplate, IQueueMessage iQueueMessage) {
        return true;
    }

    /**
     * No-op: a taken message stays in the shared table until it is finished.
     *
     * @return always {@code true}
     */
    @Override
    protected boolean removeFromQueueStorage(JdbcTemplate jdbcTemplate, IQueueMessage iQueueMessage) {
        return true;
    }

    /**
     * Deletes the message's row, matched by queue id.
     *
     * @param jdbcTemplate
     *            template used to run the delete
     * @param iQueueMessage
     *            message to remove; must be a {@link UniversalQueueMessage}
     * @return {@code true} if at least one row was deleted
     * @throws IllegalArgumentException
     *             if the message is not a {@link UniversalQueueMessage}
     */
    @Override
    protected boolean removeFromEphemeralStorage(JdbcTemplate jdbcTemplate, IQueueMessage iQueueMessage) {
        UniversalQueueMessage msg = requireUniversalMessage(iQueueMessage);
        return jdbcTemplate.update(this.SQL_REMOVE_FROM_EPHEMERAL, msg.qId()) > 0;
    }

    /**
     * Queues a message: resets its requeue counter to 0, stamps both
     * timestamps with the current time and inserts it.
     *
     * <p>
     * A {@link PessimisticLockingFailureException} triggers a retry until
     * {@code numRetries} exceeds {@code maxRetries}. A
     * {@link DuplicateKeyException} is logged and reported as success.
     * </p>
     *
     * @param connection
     *            connection to operate on
     * @param iQueueMessage
     *            message to queue; must be a {@link UniversalQueueMessage}
     * @param numRetries
     *            number of retries performed so far
     * @param maxRetries
     *            retry limit
     * @return {@code true} if the row was inserted or already existed
     * @throws IllegalArgumentException
     *             if the message is not a {@link UniversalQueueMessage}
     * @throws QueueException
     *             on any other failure, or when retries are exhausted
     */
    @Override
    protected boolean _queueWithRetries(Connection connection, IQueueMessage iQueueMessage, int numRetries, int maxRetries) throws SQLException {
        UniversalQueueMessage msg = requireUniversalMessage(iQueueMessage);
        try {
            JdbcTemplate jdbcTemplate = jdbcTemplate(connection);
            Date now = new Date();
            msg.qNumRequeues(0).qOriginalTimestamp(now).qTimestamp(now);
            return putToQueueStorage(jdbcTemplate, msg);
        } catch (PessimisticLockingFailureException e) {
            if (numRetries > maxRetries) {
                throw new QueueException((Throwable) e);
            }
            return _queueWithRetries(connection, msg, numRetries + 1, maxRetries);
        } catch (DuplicateKeyException e) {
            LOGGER.warn(e.getMessage(), e);
            return true;
        } catch (Exception e) {
            throw new QueueException(e);
        }
    }

    /**
     * Re-queues a message: clears its {@code ephemeral_id}, increments
     * {@code msg_num_requeues} and sets {@code msg_timestamp} to the current
     * time.
     *
     * <p>
     * A {@link PessimisticLockingFailureException} triggers a retry of this
     * same operation until {@code numRetries} exceeds {@code maxRetries}.
     * </p>
     *
     * @param connection
     *            connection to operate on
     * @param iQueueMessage
     *            message to re-queue; must be a {@link UniversalQueueMessage}
     * @param numRetries
     *            number of retries performed so far
     * @param maxRetries
     *            retry limit
     * @return {@code true} if a row was updated
     * @throws IllegalArgumentException
     *             if the message is not a {@link UniversalQueueMessage}
     * @throws QueueException
     *             on any other failure, or when retries are exhausted
     */
    @Override
    protected boolean _requeueWithRetries(Connection connection, IQueueMessage iQueueMessage, int numRetries, int maxRetries) throws SQLException {
        UniversalQueueMessage msg = requireUniversalMessage(iQueueMessage);
        try {
            return jdbcTemplate(connection).update(this.SQL_REQUEUE, new Date(), msg.qId()) > 0;
        } catch (PessimisticLockingFailureException e) {
            // Narrow catch clauses must precede catch (Exception), otherwise
            // they are unreachable.
            if (numRetries > maxRetries) {
                throw new QueueException((Throwable) e);
            }
            // Retry the non-silent requeue so the counter/timestamp update is
            // not lost on a retried attempt.
            return _requeueWithRetries(connection, msg, numRetries + 1, maxRetries);
        } catch (DuplicateKeyException e) {
            LOGGER.warn(e.getMessage(), e);
            return true;
        } catch (Exception e) {
            throw new QueueException(e);
        }
    }

    /**
     * Re-queues a message "silently": only clears its {@code ephemeral_id},
     * leaving the requeue counter and timestamp untouched.
     *
     * @param connection
     *            connection to operate on
     * @param iQueueMessage
     *            message to re-queue; must be a {@link UniversalQueueMessage}
     * @param numRetries
     *            number of retries performed so far
     * @param maxRetries
     *            retry limit
     * @return {@code true} if a row was updated
     * @throws IllegalArgumentException
     *             if the message is not a {@link UniversalQueueMessage}
     * @throws QueueException
     *             on any other failure, or when retries are exhausted
     */
    @Override
    protected boolean _requeueSilentWithRetries(Connection connection, IQueueMessage iQueueMessage, int numRetries, int maxRetries) throws SQLException {
        UniversalQueueMessage msg = requireUniversalMessage(iQueueMessage);
        try {
            return jdbcTemplate(connection).update(this.SQL_REQUEUE_SILENT, msg.qId()) > 0;
        } catch (PessimisticLockingFailureException e) {
            if (numRetries > maxRetries) {
                throw new QueueException((Throwable) e);
            }
            return _requeueSilentWithRetries(connection, msg, numRetries + 1, maxRetries);
        } catch (DuplicateKeyException e) {
            LOGGER.warn(e.getMessage(), e);
            return true;
        } catch (Exception e) {
            throw new QueueException(e);
        }
    }

    /**
     * Finishes a message by deleting its row. The number of deleted rows is
     * not checked.
     *
     * @param connection
     *            connection to operate on
     * @param iQueueMessage
     *            message to finish; must be a {@link UniversalQueueMessage}
     * @param numRetries
     *            number of retries performed so far
     * @param maxRetries
     *            retry limit
     * @throws IllegalArgumentException
     *             if the message is not a {@link UniversalQueueMessage}
     * @throws QueueException
     *             on any other failure, or when retries are exhausted
     */
    @Override
    protected void _finishWithRetries(Connection connection, IQueueMessage iQueueMessage, int numRetries, int maxRetries) throws SQLException {
        UniversalQueueMessage msg = requireUniversalMessage(iQueueMessage);
        try {
            removeFromEphemeralStorage(jdbcTemplate(connection), msg);
        } catch (PessimisticLockingFailureException e) {
            // Must precede catch (Exception), otherwise it is unreachable.
            if (numRetries > maxRetries) {
                throw new QueueException((Throwable) e);
            }
            _finishWithRetries(connection, msg, numRetries + 1, maxRetries);
        } catch (Exception e) {
            throw new QueueException(e);
        }
    }

    /**
     * Takes one message: writes a freshly generated id into the
     * {@code ephemeral_id} of a single unclaimed row, then reads that row
     * back by the id.
     *
     * @param connection
     *            connection to operate on
     * @param numRetries
     *            number of retries performed so far
     * @param maxRetries
     *            retry limit
     * @return the claimed message, or {@code null} if no row was claimed or
     *         the claimed row could not be read back
     * @throws QueueException
     *             on failure, or when retries are exhausted
     */
    @Override
    public UniversalQueueMessage _takeWithRetries(Connection connection, int numRetries, int maxRetries) throws SQLException {
        try {
            JdbcTemplate jdbcTemplate = jdbcTemplate(connection);
            String ephemeralId = QueueUtils.IDGEN.generateId128Hex();
            if (jdbcTemplate.update(this.SQL_UPDATE_EPHEMERAL_ID_TAKE, ephemeralId) <= 0) {
                // No unclaimed row available.
                return null;
            }
            List<Map<String, Object>> rows = jdbcTemplate.queryForList(this.SQL_READ_BY_EPHEMERAL_ID,
                    ephemeralId);
            if (rows == null || rows.isEmpty()) {
                return null;
            }
            UniversalQueueMessage msg = new UniversalQueueMessage();
            msg.fromMap(rows.get(0));
            return msg;
        } catch (PessimisticLockingFailureException e) {
            if (numRetries > maxRetries) {
                throw new QueueException((Throwable) e);
            }
            return _takeWithRetries(connection, numRetries + 1, maxRetries);
        } catch (Exception e) {
            throw new QueueException(e);
        }
    }

    /**
     * Returns an in-flight message to the queue by clearing its
     * {@code ephemeral_id}.
     *
     * @param iQueueMessage
     *            message to move; must be a {@link UniversalQueueMessage}
     * @param connection
     *            connection to operate on
     * @param numRetries
     *            number of retries performed so far
     * @param maxRetries
     *            retry limit
     * @return {@code true} if a row was updated
     * @throws IllegalArgumentException
     *             if the message is not a {@link UniversalQueueMessage}
     * @throws QueueException
     *             on any other failure, or when retries are exhausted
     */
    @Override
    protected boolean _moveFromEphemeralToQueueStorageWithRetries(IQueueMessage iQueueMessage, Connection connection, int numRetries, int maxRetries) throws SQLException {
        UniversalQueueMessage msg = requireUniversalMessage(iQueueMessage);
        try {
            return jdbcTemplate(connection).update(this.SQL_CLEAR_EPHEMERAL_ID, msg.qId()) > 0;
        } catch (PessimisticLockingFailureException e) {
            if (numRetries > maxRetries) {
                throw new QueueException((Throwable) e);
            }
            return _moveFromEphemeralToQueueStorageWithRetries(msg, connection, numRetries + 1, maxRetries);
        } catch (Exception e) {
            throw new QueueException(e);
        }
    }

    /**
     * Takes a message via the parent implementation and narrows the result
     * type.
     *
     * @return the result of {@code super.take()} cast to
     *         {@link UniversalQueueMessage}
     */
    @Override
    public UniversalQueueMessage take() {
        return (UniversalQueueMessage) super.take();
    }
}
