001 /**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more
004 * contributor license agreements. See the NOTICE file distributed with
005 * this work for additional information regarding copyright ownership.
006 * The ASF licenses this file to You under the Apache License, Version 2.0
007 * (the "License"); you may not use this file except in compliance with
008 * the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.camel.component.cxf.transport;
019
020 import org.apache.camel.CamelContext;
021 import org.apache.camel.Endpoint;
022 import org.apache.camel.Exchange;
023 import org.apache.camel.Processor;
024 import org.apache.camel.Producer;
025 import org.apache.cxf.Bus;
026 import org.apache.cxf.common.logging.LogUtils;
027 import org.apache.cxf.configuration.Configurable;
028 import org.apache.cxf.io.AbstractCachedOutputStream;
029 import org.apache.cxf.message.Message;
030 import org.apache.cxf.message.MessageImpl;
031 import org.apache.cxf.service.model.EndpointInfo;
032 import org.apache.cxf.transport.AbstractConduit;
033 import org.apache.cxf.transport.AbstractDestination;
034 import org.apache.cxf.transport.Conduit;
035 import org.apache.cxf.transport.ConduitInitiator;
036 import org.apache.cxf.transport.MessageObserver;
037 import org.apache.cxf.ws.addressing.EndpointReferenceType;
038 import org.apache.cxf.wsdl.EndpointReferenceUtils;
039
040 import java.io.ByteArrayInputStream;
041 import java.io.IOException;
042 import java.io.InputStream;
043 import java.io.OutputStream;
044 import java.util.logging.Level;
045 import java.util.logging.Logger;
046
047 /**
048 * @version $Revision: 535478 $
049 */
050 public class CamelDestination extends AbstractDestination implements Configurable {
051 protected static final String BASE_BEAN_NAME_SUFFIX = ".camel-destination-base";
052 private static final Logger LOG = LogUtils.getL7dLogger(CamelDestination.class);
053 CamelContext camelContext;
054 String camelUri;
055 final ConduitInitiator conduitInitiator;
056 private CamelTransportBase base;
057 private Endpoint endpoint;
058
059 public CamelDestination(CamelContext camelContext, Bus bus,
060 ConduitInitiator ci,
061 EndpointInfo info) throws IOException {
062 super(getTargetReference(info, bus), info);
063 this.camelContext = camelContext;
064
065 base = new CamelTransportBase(camelContext, bus, endpointInfo, true, BASE_BEAN_NAME_SUFFIX);
066
067 conduitInitiator = ci;
068
069 initConfig();
070 }
071
072 protected Logger getLogger() {
073 return LOG;
074 }
075
076 /**
077 * @param inMessage the incoming message
078 * @return the inbuilt backchannel
079 */
080 protected Conduit getInbuiltBackChannel(Message inMessage) {
081 return new BackChannelConduit(EndpointReferenceUtils.getAnonymousEndpointReference(),
082 inMessage);
083 }
084
085 public void activate() {
086 getLogger().log(Level.INFO, "CamelDestination activate().... ");
087
088 try {
089 getLogger().log(Level.FINE, "establishing Camel connection");
090 endpoint = camelContext.getEndpoint(camelUri);
091 }
092 catch (Exception ex) {
093 getLogger().log(Level.SEVERE, "Camel connect failed with EException : ", ex);
094 }
095 }
096
097 public void deactivate() {
098 base.close();
099 }
100
101 public void shutdown() {
102 getLogger().log(Level.FINE, "CamelDestination shutdown()");
103 this.deactivate();
104 }
105
106 protected void incoming(Exchange exchange) {
107 getLogger().log(Level.FINE, "server received request: ", exchange);
108
109 byte[] bytes = base.unmarshal(exchange);
110
111 // get the message to be interceptor
112 MessageImpl inMessage = new MessageImpl();
113 inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
114 base.populateIncomingContext(exchange, inMessage, CamelConstants.CAMEL_SERVER_REQUEST_HEADERS);
115 //inMessage.put(CamelConstants.CAMEL_SERVER_RESPONSE_HEADERS, new CamelMessageHeadersType());
116 inMessage.put(CamelConstants.CAMEL_REQUEST_MESSAGE, exchange);
117
118 inMessage.setDestination(this);
119
120 //handle the incoming message
121 incomingObserver.onMessage(inMessage);
122 }
123
124 public String getBeanName() {
125 return endpointInfo.getName().toString() + ".camel-destination";
126 }
127
128 private void initConfig() {
129 /*
130 this.runtimePolicy = endpointInfo.getTraversedExtensor(new ServerBehaviorPolicyType(),
131 ServerBehaviorPolicyType.class);
132 this.serverConfig = endpointInfo.getTraversedExtensor(new ServerConfig(), ServerConfig.class);
133 this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
134 this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
135 */
136 }
137
138 protected class ConsumerProcessor implements Processor {
139 public void process(Exchange exchange) {
140 try {
141 incoming(exchange);
142 }
143 catch (Throwable ex) {
144 getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex);
145 }
146 }
147 }
148
149 // this should deal with the cxf message
150 protected class BackChannelConduit extends AbstractConduit {
151 protected Message inMessage;
152
153 BackChannelConduit(EndpointReferenceType ref, Message message) {
154 super(ref);
155 inMessage = message;
156 }
157
158 /**
159 * Register a message observer for incoming messages.
160 *
161 * @param observer the observer to notify on receipt of incoming
162 */
163 public void setMessageObserver(MessageObserver observer) {
164 // shouldn't be called for a back channel conduit
165 }
166
167 /**
168 * Send an outbound message, assumed to contain all the name-value
169 * mappings of the corresponding input message (if any).
170 *
171 * @param message the message to be sent.
172 */
173 public void prepare(Message message) throws IOException {
174 // setup the message to be send back
175 message.put(CamelConstants.CAMEL_REQUEST_MESSAGE,
176 inMessage.get(CamelConstants.CAMEL_REQUEST_MESSAGE));
177 message.setContent(OutputStream.class,
178 new CamelOutputStream(inMessage));
179 }
180
181 protected Logger getLogger() {
182 return LOG;
183 }
184
185 }
186
187 private class CamelOutputStream extends AbstractCachedOutputStream {
188 private Message inMessage;
189 private Producer<Exchange> replyTo;
190 private Producer<Exchange> sender;
191
192 // setup the ByteArrayStream
193 public CamelOutputStream(Message m) {
194 super();
195 inMessage = m;
196 }
197
198 // prepair the message and get the send out message
199 private void commitOutputMessage() throws IOException {
200
201 //setup the reply message
202 final String replyToUri = getReplyToDestination(inMessage);
203
204 base.template.send(replyToUri, new Processor() {
205 public void process(Exchange reply) {
206 base.marshal(currentStream.toString(), replyToUri, reply);
207
208 setReplyCorrelationID(inMessage, reply);
209
210 base.setMessageProperties(inMessage, reply);
211
212 getLogger().log(Level.FINE, "just server sending reply: ", reply);
213 }
214 });
215 }
216
217 @Override
218 protected void doFlush() throws IOException {
219 // Do nothing here
220 }
221
222 @Override
223 protected void doClose() throws IOException {
224 commitOutputMessage();
225 }
226
227 @Override
228 protected void onWrite() throws IOException {
229 // Do nothing here
230 }
231 }
232
233 protected String getReplyToDestination(Message inMessage) {
234 if (inMessage.get(CamelConstants.CAMEL_REBASED_REPLY_TO) != null) {
235 return (String) inMessage.get(CamelConstants.CAMEL_REBASED_REPLY_TO);
236 }
237 else {
238 return base.getReplyDestination();
239 }
240 }
241
242 protected void setReplyCorrelationID(Message inMessage, Exchange reply) {
243 Object value = inMessage.get(CamelConstants.CAMEL_CORRELATION_ID);
244 if (value != null) {
245 reply.getIn().setHeader(CamelConstants.CAMEL_CORRELATION_ID, value);
246 }
247 }
248 }