001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.impl;
018
019 import org.apache.camel.CamelContext;
020 import org.apache.camel.Component;
021 import org.apache.camel.Endpoint;
022 import org.apache.camel.Exchange;
023 import org.apache.camel.PollingConsumer;
024 import org.apache.camel.util.ObjectHelper;
025
026 import java.lang.reflect.ParameterizedType;
027 import java.lang.reflect.Type;
028 import java.util.concurrent.ScheduledExecutorService;
029 import java.util.concurrent.ScheduledThreadPoolExecutor;
030
031 /**
032 * A default endpoint useful for implementation inheritance
033 *
034 * @version $Revision: 541335 $
035 */
036 public abstract class DefaultEndpoint<E extends Exchange> implements Endpoint<E> {
037 private String endpointUri;
038 private final Component component;
039 private CamelContext context;
040 private ScheduledExecutorService executorService;
041
042 protected DefaultEndpoint(String endpointUri, Component component) {
043 this.endpointUri = endpointUri;
044 this.component = component;
045 this.context = component.getCamelContext();
046 }
047
048 public int hashCode() {
049 return endpointUri.hashCode() * 37 + 1;
050 }
051
052 @Override
053 public boolean equals(Object object) {
054 if (object instanceof DefaultEndpoint) {
055 DefaultEndpoint that = (DefaultEndpoint) object;
056 return ObjectHelper.equals(this.endpointUri, that.endpointUri);
057 }
058 return false;
059 }
060
061 @Override
062 public String toString() {
063 return "Endpoint[" + endpointUri + "]";
064 }
065
066 public String getEndpointUri() {
067 return endpointUri;
068 }
069
070 public CamelContext getContext() {
071 return context;
072 }
073
074 public Component getComponent() {
075 return component;
076 }
077
078 /**
079 * @return the executor
080 */
081 public synchronized ScheduledExecutorService getExecutorService() {
082 if (executorService == null) {
083 Component c = getComponent();
084 if (c != null && c instanceof DefaultComponent) {
085 DefaultComponent dc = (DefaultComponent) c;
086 executorService = dc.getExecutorService();
087 }
088 if (executorService == null) {
089 executorService = createExecutorService();
090 }
091 }
092 return executorService;
093 }
094
095 /**
096 * @param executorService the executor to set
097 */
098 public synchronized void setExecutorService(ScheduledExecutorService executorService) {
099 this.executorService = executorService;
100 }
101
102 public PollingConsumer<E> createPollingConsumer() throws Exception {
103 return new DefaultPollingConsumer<E>(this);
104 }
105
106 /**
107 * Converts the given exchange to the specified exchange type
108 */
109 public E convertTo(Class<E> type, Exchange exchange) {
110 // TODO we could infer type parameter
111 if (type.isInstance(exchange)) {
112 return type.cast(exchange);
113 }
114 return getContext().getExchangeConverter().convertTo(type, exchange);
115 }
116
117 public E createExchange(Exchange exchange) {
118 Class<E> exchangeType = getExchangeType();
119 if (exchangeType != null) {
120 if (exchangeType.isInstance(exchange)) {
121 return exchangeType.cast(exchange);
122 }
123 }
124 E answer = createExchange();
125 answer.copyFrom(exchange);
126 return answer;
127 }
128
129 public E toExchangeType(Exchange exchange) {
130 // TODO avoid cloning exchanges if E == Exchange!
131 return createExchange(exchange);
132 }
133
134 /**
135 * Returns the type of the exchange which is generated by this component
136 */
137 public Class<E> getExchangeType() {
138 Type type = getClass().getGenericSuperclass();
139 if (type instanceof ParameterizedType) {
140 ParameterizedType parameterizedType = (ParameterizedType) type;
141 Type[] arguments = parameterizedType.getActualTypeArguments();
142 if (arguments.length > 0) {
143 Type argumentType = arguments[0];
144 if (argumentType instanceof Class) {
145 return (Class<E>) argumentType;
146 }
147 }
148 }
149 return null;
150 }
151
152 protected ScheduledThreadPoolExecutor createExecutorService() {
153 return new ScheduledThreadPoolExecutor(10);
154 }
155 }