001 /**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more
004 * contributor license agreements. See the NOTICE file distributed with
005 * this work for additional information regarding copyright ownership.
006 * The ASF licenses this file to You under the Apache License, Version 2.0
007 * (the "License"); you may not use this file except in compliance with
008 * the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.camel.component.cxf;
019
020 import org.apache.camel.RuntimeCamelException;
021 import org.apache.camel.Exchange;
022 import org.apache.camel.impl.DefaultProducer;
023 import org.apache.cxf.message.ExchangeImpl;
024 import org.apache.cxf.message.Message;
025 import org.apache.cxf.message.MessageImpl;
026 import org.apache.cxf.service.model.EndpointInfo;
027 import org.apache.cxf.transport.Conduit;
028 import org.apache.cxf.transport.Destination;
029 import org.apache.cxf.transport.MessageObserver;
030 import org.apache.cxf.transport.local.LocalConduit;
031 import org.apache.cxf.transport.local.LocalTransportFactory;
032
033 import java.io.IOException;
034 import java.util.concurrent.CountDownLatch;
035
036 /**
037 * Sends messages from Camel into the CXF endpoint
038 *
039 * @version $Revision: 534145 $
040 */
041 public class CxfProducer extends DefaultProducer {
042 private CxfEndpoint endpoint;
043 private final LocalTransportFactory transportFactory;
044 private Destination destination;
045 private Conduit conduit;
046 private ResultFuture future = new ResultFuture();
047
048 public CxfProducer(CxfEndpoint endpoint, LocalTransportFactory transportFactory) {
049 super(endpoint);
050 this.endpoint = endpoint;
051 this.transportFactory = transportFactory;
052 }
053
054 public void process(Exchange exchange) {
055 CxfExchange cxfExchange = endpoint.toExchangeType(exchange);
056 process(cxfExchange);
057 }
058
059 public void process(CxfExchange exchange) {
060 try {
061 CxfBinding binding = endpoint.getBinding();
062 MessageImpl m = binding.createCxfMessage(exchange);
063 ExchangeImpl e = new ExchangeImpl();
064 e.setInMessage(m);
065 m.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE);
066 m.setDestination(destination);
067 synchronized (conduit) {
068 conduit.prepare(m);
069
070 // now lets wait for the response
071 if (endpoint.isInOut()) {
072 Message response = future.getResponse();
073
074 // TODO - why do we need to ignore the returned message and get the out message from the exchange!
075 response = e.getOutMessage();
076 binding.storeCxfResponse(exchange, response);
077 }
078 }
079 }
080 catch (IOException e) {
081 throw new RuntimeCamelException(e);
082 }
083 }
084
085 @Override
086 protected void doStart() throws Exception {
087 super.doStart();
088 EndpointInfo endpointInfo = endpoint.getEndpointInfo();
089 destination = transportFactory.getDestination(endpointInfo);
090
091 // Set up a listener for the response
092 conduit = transportFactory.getConduit(endpointInfo);
093 conduit.setMessageObserver(future);
094 }
095
096 @Override
097 protected void doStop() throws Exception {
098 super.doStop();
099
100 if (conduit != null) {
101 conduit.close();
102 }
103 }
104
105 protected class ResultFuture implements MessageObserver {
106 Message response;
107 CountDownLatch latch = new CountDownLatch(1);
108
109 public Message getResponse() {
110 while (response == null) {
111 try {
112 latch.await();
113 }
114 catch (InterruptedException e) {
115 // ignore
116 }
117 }
118 return response;
119 }
120
121 public synchronized void onMessage(Message message) {
122 try {
123 message.remove(LocalConduit.DIRECT_DISPATCH);
124 this.response = message;
125 }
126 finally {
127 latch.countDown();
128 }
129 }
130 }
131 }