001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.cxf.transport;
018
019 import java.io.ByteArrayInputStream;
020 import java.io.IOException;
021 import java.io.InputStream;
022 import java.io.OutputStream;
023 import java.util.logging.Level;
024 import java.util.logging.Logger;
025
026 import org.apache.camel.CamelContext;
027 import org.apache.camel.Endpoint;
028 import org.apache.camel.Exchange;
029 import org.apache.camel.Processor;
030 import org.apache.camel.Producer;
031 import org.apache.cxf.Bus;
032 import org.apache.cxf.common.logging.LogUtils;
033 import org.apache.cxf.configuration.Configurable;
034 import org.apache.cxf.io.CachedOutputStream;
035 import org.apache.cxf.message.Message;
036 import org.apache.cxf.message.MessageImpl;
037 import org.apache.cxf.service.model.EndpointInfo;
038 import org.apache.cxf.transport.AbstractConduit;
039 import org.apache.cxf.transport.AbstractDestination;
040 import org.apache.cxf.transport.Conduit;
041 import org.apache.cxf.transport.ConduitInitiator;
042 import org.apache.cxf.transport.MessageObserver;
043 import org.apache.cxf.ws.addressing.EndpointReferenceType;
044 import org.apache.cxf.wsdl.EndpointReferenceUtils;
045
046 /**
047 * @version $Revision: 563665 $
048 */
049 public class CamelDestination extends AbstractDestination implements Configurable {
050 protected static final String BASE_BEAN_NAME_SUFFIX = ".camel-destination-base";
051 private static final Logger LOG = LogUtils.getL7dLogger(CamelDestination.class);
052 CamelContext camelContext;
053 String camelUri;
054 final ConduitInitiator conduitInitiator;
055 private CamelTransportBase base;
056 private Endpoint endpoint;
057
058 public CamelDestination(CamelContext camelContext, Bus bus, ConduitInitiator ci, EndpointInfo info) throws IOException {
059 super(getTargetReference(info, bus), info);
060 this.camelContext = camelContext;
061
062 base = new CamelTransportBase(camelContext, bus, endpointInfo, true, BASE_BEAN_NAME_SUFFIX);
063
064 conduitInitiator = ci;
065
066 initConfig();
067 }
068
069 protected Logger getLogger() {
070 return LOG;
071 }
072
073 /**
074 * @param inMessage the incoming message
075 * @return the inbuilt backchannel
076 */
077 protected Conduit getInbuiltBackChannel(Message inMessage) {
078 return new BackChannelConduit(EndpointReferenceUtils.getAnonymousEndpointReference(), inMessage);
079 }
080
081 public void activate() {
082 getLogger().log(Level.INFO, "CamelDestination activate().... ");
083
084 try {
085 getLogger().log(Level.FINE, "establishing Camel connection");
086 endpoint = camelContext.getEndpoint(camelUri);
087 } catch (Exception ex) {
088 getLogger().log(Level.SEVERE, "Camel connect failed with EException : ", ex);
089 }
090 }
091
092 public void deactivate() {
093 base.close();
094 }
095
096 public void shutdown() {
097 getLogger().log(Level.FINE, "CamelDestination shutdown()");
098 this.deactivate();
099 }
100
101 protected void incoming(Exchange exchange) {
102 getLogger().log(Level.FINE, "server received request: ", exchange);
103
104 byte[] bytes = base.unmarshal(exchange);
105
106 // get the message to be interceptor
107 MessageImpl inMessage = new MessageImpl();
108 inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
109 base.populateIncomingContext(exchange, inMessage, CamelConstants.CAMEL_SERVER_REQUEST_HEADERS);
110 // inMessage.put(CamelConstants.CAMEL_SERVER_RESPONSE_HEADERS, new
111 // CamelMessageHeadersType());
112 inMessage.put(CamelConstants.CAMEL_REQUEST_MESSAGE, exchange);
113
114 inMessage.setDestination(this);
115
116 // handle the incoming message
117 incomingObserver.onMessage(inMessage);
118 }
119
120 public String getBeanName() {
121 return endpointInfo.getName().toString() + ".camel-destination";
122 }
123
124 private void initConfig() {
125 /*
126 * this.runtimePolicy = endpointInfo.getTraversedExtensor(new
127 * ServerBehaviorPolicyType(), ServerBehaviorPolicyType.class);
128 * this.serverConfig = endpointInfo.getTraversedExtensor(new
129 * ServerConfig(), ServerConfig.class); this.address =
130 * endpointInfo.getTraversedExtensor(new AddressType(),
131 * AddressType.class); this.sessionPool =
132 * endpointInfo.getTraversedExtensor(new SessionPoolType(),
133 * SessionPoolType.class);
134 */
135 }
136
137 protected class ConsumerProcessor implements Processor {
138 public void process(Exchange exchange) {
139 try {
140 incoming(exchange);
141 } catch (Throwable ex) {
142 getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex);
143 }
144 }
145 }
146
147 // this should deal with the cxf message
148 protected class BackChannelConduit extends AbstractConduit {
149 protected Message inMessage;
150
151 BackChannelConduit(EndpointReferenceType ref, Message message) {
152 super(ref);
153 inMessage = message;
154 }
155
156 /**
157 * Register a message observer for incoming messages.
158 *
159 * @param observer the observer to notify on receipt of incoming
160 */
161 public void setMessageObserver(MessageObserver observer) {
162 // shouldn't be called for a back channel conduit
163 }
164
165 /**
166 * Send an outbound message, assumed to contain all the name-value
167 * mappings of the corresponding input message (if any).
168 *
169 * @param message the message to be sent.
170 */
171 public void prepare(Message message) throws IOException {
172 // setup the message to be send back
173 message.put(CamelConstants.CAMEL_REQUEST_MESSAGE, inMessage.get(CamelConstants.CAMEL_REQUEST_MESSAGE));
174 message.setContent(OutputStream.class, new CamelOutputStream(inMessage));
175 }
176
177 protected Logger getLogger() {
178 return LOG;
179 }
180
181 }
182
183 private class CamelOutputStream extends CachedOutputStream {
184 private Message inMessage;
185 private Producer<Exchange> replyTo;
186 private Producer<Exchange> sender;
187
188 // setup the ByteArrayStream
189 public CamelOutputStream(Message m) {
190 super();
191 inMessage = m;
192 }
193
194 // prepair the message and get the send out message
195 private void commitOutputMessage() throws IOException {
196
197 // setup the reply message
198 final String replyToUri = getReplyToDestination(inMessage);
199
200 base.template.send(replyToUri, new Processor() {
201 public void process(Exchange reply) {
202 base.marshal(currentStream.toString(), replyToUri, reply);
203
204 setReplyCorrelationID(inMessage, reply);
205
206 base.setMessageProperties(inMessage, reply);
207
208 getLogger().log(Level.FINE, "just server sending reply: ", reply);
209 }
210 });
211 }
212
213 @Override
214 protected void doFlush() throws IOException {
215 // Do nothing here
216 }
217
218 @Override
219 protected void doClose() throws IOException {
220 commitOutputMessage();
221 }
222
223 @Override
224 protected void onWrite() throws IOException {
225 // Do nothing here
226 }
227 }
228
229 protected String getReplyToDestination(Message inMessage) {
230 if (inMessage.get(CamelConstants.CAMEL_REBASED_REPLY_TO) != null) {
231 return (String)inMessage.get(CamelConstants.CAMEL_REBASED_REPLY_TO);
232 } else {
233 return base.getReplyDestination();
234 }
235 }
236
237 protected void setReplyCorrelationID(Message inMessage, Exchange reply) {
238 Object value = inMessage.get(CamelConstants.CAMEL_CORRELATION_ID);
239 if (value != null) {
240 reply.getIn().setHeader(CamelConstants.CAMEL_CORRELATION_ID, value);
241 }
242 }
243 }