001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.bam.rules;
018
019 import java.util.ArrayList;
020 import java.util.Date;
021
022 import org.apache.camel.Exchange;
023 import org.apache.camel.Processor;
024 import org.apache.camel.Route;
025 import org.apache.camel.bam.TimeExpression;
026 import org.apache.camel.bam.model.ActivityState;
027 import org.apache.camel.bam.model.ProcessInstance;
028 import org.apache.camel.impl.DefaultExchange;
029 import org.apache.camel.impl.RouteContext;
030 import org.apache.camel.impl.ServiceSupport;
031 import org.apache.camel.model.OutputType;
032 import org.apache.camel.model.RouteType;
033 import org.apache.camel.util.Time;
034 import org.apache.commons.logging.Log;
035 import org.apache.commons.logging.LogFactory;
036
037 import static org.apache.camel.util.ServiceHelper.startServices;
038 import static org.apache.camel.util.ServiceHelper.stopServices;
039
040 /**
041 * A temporal rule for use within BAM
042 *
043 * @version $Revision: $
044 */
045 public class TemporalRule extends ServiceSupport {
046 private static final transient Log LOG = LogFactory.getLog(TemporalRule.class);
047 private TimeExpression first;
048 private TimeExpression second;
049 private long expectedMillis;
050 private long overdueMillis;
051 private Processor overdueAction;
052 private OutputType overdueProcessors = new OutputType();
053
054 public TemporalRule(TimeExpression first, TimeExpression second) {
055 this.first = first;
056 this.second = second;
057 }
058
059 public TemporalRule expectWithin(Time builder) {
060 return expectWithin(builder.toMillis());
061 }
062
063 public TemporalRule expectWithin(long millis) {
064 expectedMillis = millis;
065 return this;
066 }
067
068 public OutputType errorIfOver(Time builder) {
069 return errorIfOver(builder.toMillis());
070 }
071
072 public OutputType errorIfOver(long millis) {
073 overdueMillis = millis;
074 if (overdueProcessors == null) {
075 overdueProcessors = new OutputType();
076 }
077 return overdueProcessors;
078 }
079
080 public TimeExpression getFirst() {
081 return first;
082 }
083
084 public TimeExpression getSecond() {
085 return second;
086 }
087
088 public Processor getOverdueAction() throws Exception {
089 if (overdueAction == null && overdueProcessors != null) {
090
091 // TOOD refactor to avoid this messyness...
092 ArrayList<Route> list = new ArrayList<Route>();
093 RouteType route = new RouteType();
094 route.setCamelContext(first.getBuilder().getProcessBuilder().getContext());
095 RouteContext routeContext = new RouteContext(route, null, list);
096
097 overdueAction = overdueProcessors.createOutputsProcessor(routeContext);
098 }
099 return overdueAction;
100 }
101
102 public void processExchange(Exchange exchange, ProcessInstance instance) {
103 Date firstTime = first.evaluate(instance);
104 if (firstTime == null) {
105 // ignore as first event has not accurred yet
106 return;
107 }
108
109 // TODO now we might need to set the second activity state
110 // to 'grey' to indicate it now could happen?
111
112 // lets force the lazy creation of the second state
113 ActivityState secondState = second.getOrCreateActivityState(instance);
114 if (expectedMillis > 0L) {
115 Date expected = secondState.getTimeExpected();
116 if (expected == null) {
117 expected = add(firstTime, expectedMillis);
118 secondState.setTimeExpected(expected);
119 }
120 }
121 if (overdueMillis > 0L) {
122 Date overdue = secondState.getTimeOverdue();
123 if (overdue == null) {
124 overdue = add(firstTime, overdueMillis);
125 secondState.setTimeOverdue(overdue);
126 }
127 }
128 }
129
130 public void processExpired(ActivityState activityState) throws Exception {
131 Processor processor = getOverdueAction();
132 if (processor != null) {
133 Date now = new Date();
134 /*
135 TODO this doesn't work and returns null for some strange reason
136 ProcessInstance instance = activityState.getProcessInstance();
137 ActivityState secondState = second.getActivityState(instance);
138 if (secondState == null) {
139 log.error("Could not find the second state! Process is: "
140 + instance + " with first state: " + first.getActivityState(instance)
141 + " and the state I was called with was: " + activityState);
142 }
143 */
144
145 ActivityState secondState = activityState;
146 Date overdue = secondState.getTimeOverdue();
147 if (now.compareTo(overdue) >= 0) {
148 Exchange exchange = createExchange();
149 exchange.getIn().setBody(activityState);
150 processor.process(exchange);
151 } else {
152 LOG.warn("Process has not actually expired; the time is: " + now + " but the overdue time is: " + overdue);
153 }
154 }
155 }
156
157 protected Exchange createExchange() {
158 return new DefaultExchange(second.getBuilder().getProcessBuilder().getContext());
159 }
160
161 /**
162 * Returns the date in the future adding the given number of millis
163 *
164 * @param date
165 * @param millis
166 * @return the date in the future
167 */
168 protected Date add(Date date, long millis) {
169 return new Date(date.getTime() + millis);
170 }
171
172 protected void doStart() throws Exception {
173 startServices(getOverdueAction());
174 }
175
176 protected void doStop() throws Exception {
177 stopServices(getOverdueAction());
178 }
179 }