001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.jpa;
018
019 import java.lang.reflect.Method;
020 import java.util.List;
021
022 import javax.persistence.EntityManager;
023 import javax.persistence.LockModeType;
024 import javax.persistence.PersistenceException;
025 import javax.persistence.Query;
026
027 import org.apache.camel.Exchange;
028 import org.apache.camel.Processor;
029 import org.apache.camel.impl.ScheduledPollConsumer;
030 import org.apache.camel.util.ObjectHelper;
031 import org.apache.commons.logging.Log;
032 import org.apache.commons.logging.LogFactory;
033
034 import org.springframework.orm.jpa.JpaCallback;
035
036 /**
037 * @version $Revision: 563665 $
038 */
039 public class JpaConsumer extends ScheduledPollConsumer<Exchange> {
040 private static final transient Log LOG = LogFactory.getLog(JpaConsumer.class);
041 private final JpaEndpoint endpoint;
042 private final TransactionStrategy template;
043 private QueryFactory queryFactory;
044 private DeleteHandler<Object> deleteHandler;
045 private String query;
046 private String namedQuery;
047 private String nativeQuery;
048
049 public JpaConsumer(JpaEndpoint endpoint, Processor processor) {
050 super(endpoint, processor);
051 this.endpoint = endpoint;
052 this.template = endpoint.createTransactionStrategy();
053 }
054
055 protected void poll() throws Exception {
056 template.execute(new JpaCallback() {
057 public Object doInJpa(EntityManager entityManager) throws PersistenceException {
058 Query query = getQueryFactory().createQuery(entityManager);
059 configureParameters(query);
060 List results = query.getResultList();
061 for (Object result : results) {
062 if (LOG.isDebugEnabled()) {
063 LOG.debug("Processing new entity: " + result);
064 }
065
066 if (lockEntity(result, entityManager)) {
067 // lets turn the result into an exchange and fire it
068 // into the processor
069 Exchange exchange = createExchange(result);
070 try {
071 getProcessor().process(exchange);
072 } catch (Exception e) {
073 throw new PersistenceException(e);
074 }
075 getDeleteHandler().deleteObject(entityManager, result);
076 }
077 }
078 entityManager.flush();
079 return null;
080 }
081 });
082 }
083
084 // Properties
085 // -------------------------------------------------------------------------
086 public JpaEndpoint getEndpoint() {
087 return endpoint;
088 }
089
090 public QueryFactory getQueryFactory() {
091 if (queryFactory == null) {
092 queryFactory = createQueryFactory();
093 if (queryFactory == null) {
094 throw new IllegalArgumentException("No queryType property configured on this consumer, nor an entityType configured on the endpoint so cannot consume");
095 }
096 }
097 return queryFactory;
098 }
099
100 public void setQueryFactory(QueryFactory queryFactory) {
101 this.queryFactory = queryFactory;
102 }
103
104 public DeleteHandler getDeleteHandler() {
105 if (deleteHandler == null) {
106 deleteHandler = createDeleteHandler();
107 }
108 return deleteHandler;
109 }
110
111 public void setDeleteHandler(DeleteHandler deleteHandler) {
112 this.deleteHandler = deleteHandler;
113 }
114
115 public String getNamedQuery() {
116 return namedQuery;
117 }
118
119 public void setNamedQuery(String namedQuery) {
120 this.namedQuery = namedQuery;
121 }
122
123 public String getNativeQuery() {
124 return nativeQuery;
125 }
126
127 public void setNativeQuery(String nativeQuery) {
128 this.nativeQuery = nativeQuery;
129 }
130
131 public String getQuery() {
132 return query;
133 }
134
135 public void setQuery(String query) {
136 this.query = query;
137 }
138
139 // Implementation methods
140 // -------------------------------------------------------------------------
141
142 /**
143 * A strategy method to lock an object with an exclusive lock so that it can
144 * be processed
145 *
146 * @param entity the entity to be locked
147 * @param entityManager
148 * @return true if the entity was locked
149 */
150 protected boolean lockEntity(Object entity, EntityManager entityManager) {
151 if (!getEndpoint().isConsumeDelete() || !getEndpoint().isConsumeLockEntity()) {
152 return true;
153 }
154 try {
155 if (LOG.isDebugEnabled()) {
156 LOG.debug("Acquiring exclusive lock on entity: " + entity);
157 }
158 entityManager.lock(entity, LockModeType.WRITE);
159 return true;
160 } catch (Exception e) {
161 if (LOG.isDebugEnabled()) {
162 LOG.debug("Failed to achieve lock on entity: " + entity + ". Reason: " + e, e);
163 }
164 return false;
165 }
166 }
167
168 protected QueryFactory createQueryFactory() {
169 if (query != null) {
170 return QueryBuilder.query(query);
171 } else if (namedQuery != null) {
172 return QueryBuilder.namedQuery(namedQuery);
173 } else if (nativeQuery != null) {
174 return QueryBuilder.nativeQuery(nativeQuery);
175 } else {
176 Class<?> entityType = endpoint.getEntityType();
177 if (entityType == null) {
178 return null;
179 } else {
180 return QueryBuilder.query("select x from " + entityType.getName() + " x");
181 }
182 }
183 }
184
185 protected DeleteHandler<Object> createDeleteHandler() {
186 // TODO auto-discover an annotation in the entity bean to indicate the
187 // process completed method call?
188 Class<?> entityType = getEndpoint().getEntityType();
189 if (entityType != null) {
190 List<Method> methods = ObjectHelper.findMethodsWithAnnotation(entityType, Consumed.class);
191 if (methods.size() > 1) {
192 throw new IllegalArgumentException("Only one method can be annotated with the @Consumed annotation but found: " + methods);
193 } else if (methods.size() == 1) {
194 final Method method = methods.get(0);
195
196 return new DeleteHandler<Object>() {
197 public void deleteObject(EntityManager entityManager, Object entityBean) {
198 ObjectHelper.invokeMethod(method, entityBean);
199 }
200 };
201 }
202 }
203 if (getEndpoint().isConsumeDelete()) {
204 return new DeleteHandler<Object>() {
205 public void deleteObject(EntityManager entityManager, Object entityBean) {
206 entityManager.remove(entityBean);
207 }
208 };
209 } else {
210 return new DeleteHandler<Object>() {
211 public void deleteObject(EntityManager entityManager, Object entityBean) {
212 // do nothing
213 }
214 };
215 }
216 }
217
218 protected void configureParameters(Query query) {
219 int maxResults = endpoint.getMaximumResults();
220 if (maxResults > 0) {
221 query.setMaxResults(maxResults);
222 }
223 }
224
225 protected Exchange createExchange(Object result) {
226 Exchange exchange = endpoint.createExchange();
227 exchange.getIn().setBody(result);
228 return exchange;
229 }
230 }