001 /**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more
004 * contributor license agreements. See the NOTICE file distributed with
005 * this work for additional information regarding copyright ownership.
006 * The ASF licenses this file to You under the Apache License, Version 2.0
007 * (the "License"); you may not use this file except in compliance with
008 * the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.camel.bam;
019
020 import org.apache.camel.bam.model.ActivityState;
021 import org.apache.camel.impl.ServiceSupport;
022 import org.apache.commons.logging.Log;
023 import org.apache.commons.logging.LogFactory;
024 import org.springframework.orm.jpa.JpaCallback;
025 import org.springframework.orm.jpa.JpaTemplate;
026 import org.springframework.transaction.support.TransactionTemplate;
027 import org.springframework.transaction.support.TransactionCallbackWithoutResult;
028 import org.springframework.transaction.TransactionStatus;
029
030 import javax.persistence.EntityManager;
031 import javax.persistence.LockModeType;
032 import javax.persistence.PersistenceException;
033 import java.util.Date;
034 import java.util.List;
035
036 /**
037 * @version $Revision: $
038 */
039 public class ActivityMonitorEngine extends ServiceSupport implements Runnable {
040 private static final Log log = LogFactory.getLog(ActivityMonitorEngine.class);
041
042 private JpaTemplate template;
043 private TransactionTemplate transactionTemplate;
044 private ProcessRules rules;
045 private int escalateLevel = 0;
046 private long windowMillis = 1000L;
047 private Thread thread;
048
049 public ActivityMonitorEngine(JpaTemplate template, TransactionTemplate transactionTemplate, ProcessRules rules) {
050 this.template = template;
051 this.transactionTemplate = transactionTemplate;
052 this.rules = rules;
053 }
054
055 public void run() {
056 log.info("Starting to poll for timeout events");
057
058 while (!isStopped()) {
059 try {
060 long now = System.currentTimeMillis();
061 long nextPoll = now + windowMillis;
062 final Date timeNow = new Date(now);
063
064 transactionTemplate.execute(new TransactionCallbackWithoutResult() {
065 protected void doInTransactionWithoutResult(TransactionStatus status) {
066 List<ActivityState> list = template.find("select x from " + ActivityState.class.getName() + " x where x.escalationLevel = ?1 and x.timeOverdue < ?2", escalateLevel, timeNow);
067 for (ActivityState activityState : list) {
068 fireExpiredEvent(activityState);
069 }
070 }
071 });
072
073 long timeToSleep = nextPoll - System.currentTimeMillis();
074 if (timeToSleep > 0) {
075 log.debug("Sleeping for " + timeToSleep + " millis");
076 try {
077 Thread.sleep(timeToSleep);
078 }
079 catch (InterruptedException e) {
080 log.debug("Caught: " + e, e);
081 }
082 }
083 }
084 catch (Exception e) {
085 log.error("Caught: " + e, e);
086 }
087 }
088 }
089
090 protected void fireExpiredEvent(final ActivityState activityState) {
091 log.info("Trying to fire expiration of: " + activityState);
092
093 template.execute(new JpaCallback() {
094 public Object doInJpa(EntityManager entityManager) throws PersistenceException {
095 // lets try lock the object first
096 entityManager.lock(activityState, LockModeType.WRITE);
097 if (activityState.getEscalationLevel() == escalateLevel) {
098 try {
099 rules.processExpired(activityState);
100 }
101 catch (Exception e) {
102 log.error("Failed to process expiration of: " + activityState + ". Reason: " + e, e);
103 }
104 activityState.setEscalationLevel(escalateLevel + 1);
105 }
106 return null;
107 }
108 });
109 }
110
111 protected void doStart() throws Exception {
112 thread = new Thread(this, "ActivityMonitorEngine");
113 thread.start();
114 }
115
116 protected void doStop() throws Exception {
117 if (thread != null) {
118 thread = null;
119 }
120 }
121 }