001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.cxf;
018
019 import java.io.IOException;
020 import java.util.concurrent.CountDownLatch;
021
022 import org.apache.camel.Exchange;
023 import org.apache.camel.RuntimeCamelException;
024 import org.apache.camel.impl.DefaultProducer;
025 import org.apache.cxf.message.ExchangeImpl;
026 import org.apache.cxf.message.Message;
027 import org.apache.cxf.message.MessageImpl;
028 import org.apache.cxf.service.model.EndpointInfo;
029 import org.apache.cxf.transport.Conduit;
030 import org.apache.cxf.transport.Destination;
031 import org.apache.cxf.transport.MessageObserver;
032 import org.apache.cxf.transport.local.LocalConduit;
033 import org.apache.cxf.transport.local.LocalTransportFactory;
034
035 /**
036 * Sends messages from Camel into the CXF endpoint
037 *
038 * @version $Revision: 563665 $
039 */
040 public class CxfProducer extends DefaultProducer {
041 private CxfEndpoint endpoint;
042 private final LocalTransportFactory transportFactory;
043 private Destination destination;
044 private Conduit conduit;
045 private ResultFuture future = new ResultFuture();
046
047 public CxfProducer(CxfEndpoint endpoint, LocalTransportFactory transportFactory) {
048 super(endpoint);
049 this.endpoint = endpoint;
050 this.transportFactory = transportFactory;
051 }
052
053 public void process(Exchange exchange) {
054 CxfExchange cxfExchange = endpoint.toExchangeType(exchange);
055 process(cxfExchange);
056 }
057
058 public void process(CxfExchange exchange) {
059 try {
060 CxfBinding binding = endpoint.getBinding();
061 MessageImpl m = binding.createCxfMessage(exchange);
062 ExchangeImpl e = new ExchangeImpl();
063 e.setInMessage(m);
064 m.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE);
065 m.setDestination(destination);
066 synchronized (conduit) {
067 conduit.prepare(m);
068
069 // now lets wait for the response
070 if (endpoint.isInOut()) {
071 Message response = future.getResponse();
072
073 // TODO - why do we need to ignore the returned message and
074 // get the out message from the exchange!
075 response = e.getOutMessage();
076 binding.storeCxfResponse(exchange, response);
077 }
078 }
079 } catch (IOException e) {
080 throw new RuntimeCamelException(e);
081 }
082 }
083
084 @Override
085 protected void doStart() throws Exception {
086 super.doStart();
087 EndpointInfo endpointInfo = endpoint.getEndpointInfo();
088 destination = transportFactory.getDestination(endpointInfo);
089
090 // Set up a listener for the response
091 conduit = transportFactory.getConduit(endpointInfo);
092 conduit.setMessageObserver(future);
093 }
094
095 @Override
096 protected void doStop() throws Exception {
097 super.doStop();
098
099 if (conduit != null) {
100 conduit.close();
101 }
102 }
103
104 protected class ResultFuture implements MessageObserver {
105 Message response;
106 CountDownLatch latch = new CountDownLatch(1);
107
108 public Message getResponse() {
109 while (response == null) {
110 try {
111 latch.await();
112 } catch (InterruptedException e) {
113 // ignore
114 }
115 }
116 return response;
117 }
118
119 public synchronized void onMessage(Message message) {
120 try {
121 message.remove(LocalConduit.DIRECT_DISPATCH);
122 this.response = message;
123 } finally {
124 latch.countDown();
125 }
126 }
127 }
128 }