001 /**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more
004 * contributor license agreements. See the NOTICE file distributed with
005 * this work for additional information regarding copyright ownership.
006 * The ASF licenses this file to You under the Apache License, Version 2.0
007 * (the "License"); you may not use this file except in compliance with
008 * the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.camel.bam.processor;
019
020 import org.apache.camel.bam.model.ActivityState;
021 import org.apache.camel.bam.rules.ProcessRules;
022 import org.apache.camel.impl.ServiceSupport;
023 import org.apache.commons.logging.Log;
024 import org.apache.commons.logging.LogFactory;
025 import org.springframework.orm.jpa.JpaCallback;
026 import org.springframework.orm.jpa.JpaTemplate;
027 import org.springframework.transaction.TransactionStatus;
028 import org.springframework.transaction.support.TransactionCallbackWithoutResult;
029 import org.springframework.transaction.support.TransactionTemplate;
030
031 import javax.persistence.EntityManager;
032 import javax.persistence.LockModeType;
033 import javax.persistence.PersistenceException;
034 import java.util.Date;
035 import java.util.List;
036
037 /**
038 * A timer engine to monitor for expired activities and perform whatever actions are required.
039 *
040 * @version $Revision: $
041 */
042 public class ActivityMonitorEngine extends ServiceSupport implements Runnable {
043 private static final Log log = LogFactory.getLog(ActivityMonitorEngine.class);
044 private JpaTemplate template;
045 private TransactionTemplate transactionTemplate;
046 private ProcessRules rules;
047 private int escalateLevel = 0;
048 private long windowMillis = 1000L;
049 private Thread thread;
050 private boolean useLocking = false;
051
052 public ActivityMonitorEngine(JpaTemplate template, TransactionTemplate transactionTemplate, ProcessRules rules) {
053 this.template = template;
054 this.transactionTemplate = transactionTemplate;
055 this.rules = rules;
056 }
057
058 public boolean isUseLocking() {
059 return useLocking;
060 }
061
062 public void setUseLocking(boolean useLocking) {
063 this.useLocking = useLocking;
064 }
065
066 public void run() {
067 log.debug("Starting to poll for timeout events");
068
069 while (!isStopped()) {
070 try {
071 long now = System.currentTimeMillis();
072 long nextPoll = now + windowMillis;
073 final Date timeNow = new Date(now);
074
075 transactionTemplate.execute(new TransactionCallbackWithoutResult() {
076 protected void doInTransactionWithoutResult(TransactionStatus status) {
077 List<ActivityState> list = template.find("select x from " + ActivityState.class.getName() + " x where x.escalationLevel = ?1 and x.timeOverdue < ?2", escalateLevel, timeNow);
078 for (ActivityState activityState : list) {
079 fireExpiredEvent(activityState);
080 }
081 }
082 });
083
084 long timeToSleep = nextPoll - System.currentTimeMillis();
085 if (timeToSleep > 0) {
086 if (log.isDebugEnabled()) {
087 log.debug("Sleeping for " + timeToSleep + " millis");
088 }
089 try {
090 Thread.sleep(timeToSleep);
091 }
092 catch (InterruptedException e) {
093 log.debug("Caught: " + e, e);
094 }
095 }
096 }
097 catch (Exception e) {
098 log.error("Caught: " + e, e);
099 }
100 }
101 }
102
103 protected void fireExpiredEvent(final ActivityState activityState) {
104 if (log.isDebugEnabled()) {
105 log.debug("Trying to fire expiration of: " + activityState);
106 }
107
108 template.execute(new JpaCallback() {
109 public Object doInJpa(EntityManager entityManager) throws PersistenceException {
110 // lets try lock the object first
111 if (isUseLocking()) {
112 log.info("Attempting to lock: " + activityState);
113 entityManager.lock(activityState, LockModeType.WRITE);
114 log.info("Grabbed lock: " + activityState);
115 }
116
117 try {
118 rules.processExpired(activityState);
119 }
120 catch (Exception e) {
121 log.error("Failed to process expiration of: " + activityState + ". Reason: " + e, e);
122 }
123 activityState.setEscalationLevel(escalateLevel + 1);
124 return null;
125 }
126 });
127 }
128
129 protected void doStart() throws Exception {
130 rules.start();
131 thread = new Thread(this, "ActivityMonitorEngine");
132 thread.start();
133 }
134
135 protected void doStop() throws Exception {
136 if (thread != null) {
137 thread = null;
138 }
139 rules.stop();
140 }
141 }