001 /****************************************************************
002 * Licensed to the Apache Software Foundation (ASF) under one *
003 * or more contributor license agreements. See the NOTICE file *
004 * distributed with this work for additional information *
005 * regarding copyright ownership. The ASF licenses this file *
006 * to you under the Apache License, Version 2.0 (the *
007 * "License"); you may not use this file except in compliance *
008 * with the License. You may obtain a copy of the License at *
009 * *
010 * http://www.apache.org/licenses/LICENSE-2.0 *
011 * *
012 * Unless required by applicable law or agreed to in writing, *
013 * software distributed under the License is distributed on an *
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
015 * KIND, either express or implied. See the License for the *
016 * specific language governing permissions and limitations *
017 * under the License. *
018 ****************************************************************/
019
020 package org.apache.james.jspf.executor;
021
022 import org.apache.james.jspf.core.DNSLookupContinuation;
023 import org.apache.james.jspf.core.DNSResponse;
024 import org.apache.james.jspf.core.Logger;
025 import org.apache.james.jspf.core.SPFChecker;
026 import org.apache.james.jspf.core.SPFCheckerExceptionCatcher;
027 import org.apache.james.jspf.core.SPFSession;
028 import org.apache.james.jspf.core.exceptions.SPFResultException;
029 import org.apache.james.jspf.core.exceptions.TimeoutException;
030
031 import java.util.Collections;
032 import java.util.HashMap;
033 import java.util.LinkedList;
034 import java.util.Map;
035
036 /**
037 * Async implementation of SPFExecutor
038 *
039 */
040 public class StagedMultipleSPFExecutor implements SPFExecutor, Runnable {
041
042 private static final String ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION = "StagedMultipleSPFExecutor.continuation";
043
044 private static class ResponseQueueImpl extends LinkedList<IResponse> implements IResponseQueue {
045
046 private static final long serialVersionUID = 5714025260393791651L;
047
048 private int waitingThreads = 0;
049
050 /**
051 * @see org.apache.james.jspf.executor.IResponseQueue#insertResponse(org.apache.james.jspf.executor.IResponse)
052 */
053 public synchronized void insertResponse(IResponse r) {
054 addLast(r);
055 notify();
056 }
057
058 /**
059 * @see org.apache.james.jspf.executor.IResponseQueue#removeResponse()
060 */
061 public synchronized IResponse removeResponse() {
062 if ( (size() - waitingThreads <= 0) ) {
063 try { waitingThreads++; wait();}
064 catch (InterruptedException e) {Thread.interrupted();}
065 waitingThreads--;
066 }
067 return (IResponse)removeFirst(); }
068
069 }
070
071 // Use short as id because the id header is limited to 16 bit
072 // From RFC1035 4.1.1. Header section format :
073 //
074 // ID A 16 bit identifier assigned by the program that
075 // generates any kind of query. This identifier is copied
076 // the corresponding reply and can be used by the requester
077 // to match up replies to outstanding queries.
078 //
079 private static short id;
080
081 private synchronized int nextId() {
082 return id++;
083 }
084
085 private Logger log;
086 private DNSAsynchLookupService dnsProbe;
087 private Thread worker;
088 private Map<Integer,SPFSession> sessions;
089 private Map<Integer,FutureSPFResult>results;
090 private ResponseQueueImpl responseQueue;
091
092 public StagedMultipleSPFExecutor(Logger log, DNSAsynchLookupService service) {
093 this.log = log;
094 this.dnsProbe = service;
095
096 this.responseQueue = new ResponseQueueImpl();
097
098 this.sessions = Collections.synchronizedMap(new HashMap<Integer,SPFSession>());
099 this.results = Collections.synchronizedMap(new HashMap<Integer,FutureSPFResult>());
100
101 this.worker = new Thread(this);
102 this.worker.setDaemon(true);
103 this.worker.setName("SPFExecutor");
104 this.worker.start();
105 }
106
107 /**
108 * Execute the non-blocking part of the processing and returns.
109 * If the working queue is full (50 pending responses) this method will not return
110 * until the queue is again not full.
111 *
112 * @see org.apache.james.jspf.executor.SPFExecutor#execute(org.apache.james.jspf.core.SPFSession, org.apache.james.jspf.executor.FutureSPFResult)
113 */
114 public void execute(SPFSession session, FutureSPFResult result) {
115 execute(session, result, true);
116 }
117
118 public void execute(SPFSession session, FutureSPFResult result, boolean throttle) {
119 SPFChecker checker;
120 while ((checker = session.popChecker()) != null) {
121 // only execute checkers we added (better recursivity)
122 log.debug("Executing checker: " + checker);
123 try {
124 DNSLookupContinuation cont = checker.checkSPF(session);
125 // if the checker returns a continuation we return it
126 if (cont != null) {
127 invokeAsynchService(session, result, cont, throttle);
128 return;
129 }
130 } catch (Exception e) {
131 while (e != null) {
132 while (checker == null || !(checker instanceof SPFCheckerExceptionCatcher)) {
133 checker = session.popChecker();
134 }
135 try {
136 ((SPFCheckerExceptionCatcher) checker).onException(e, session);
137 e = null;
138 } catch (SPFResultException ex) {
139 e = ex;
140 } finally {
141 checker = null;
142 }
143 }
144 }
145 }
146 result.setSPFResult(session);
147 }
148
149 /**
150 * throttle should be true only when the caller thread is the client and not the worker thread.
151 * We could even remove the throttle parameter and check the currentThread.
152 * This way the worker is never "blocked" while outside callers will be blocked if our
153 * queue is too big (so this is not fully "asynchronous").
154 */
155 private synchronized void invokeAsynchService(SPFSession session,
156 FutureSPFResult result, DNSLookupContinuation cont, boolean throttle) {
157 while (throttle && results.size() > 50) {
158 try {
159 this.wait(100);
160 } catch (InterruptedException e) {
161 }
162 }
163 int nextId = nextId();
164 sessions.put(new Integer(nextId), session);
165 results.put(new Integer(nextId), result);
166 session.setAttribute(ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION, cont);
167 dnsProbe.getRecordsAsynch(cont.getRequest(), nextId, responseQueue);
168 }
169
170 public void run() {
171
172 while (true) {
173
174 IResponse resp = responseQueue.removeResponse();
175
176 Integer respId = (Integer)resp.getId();
177 SPFSession session = sessions.remove(respId);
178 FutureSPFResult result = results.remove(respId);
179
180 DNSLookupContinuation cont = (DNSLookupContinuation) session.getAttribute(ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION);
181
182 DNSResponse response;
183 if (resp.getException() != null) {
184 response = new DNSResponse((TimeoutException) resp.getException());
185 } else {
186 response = new DNSResponse(resp.getValue());
187 }
188
189
190 try {
191 cont = cont.getListener().onDNSResponse(response, session);
192
193 if (cont != null) {
194 invokeAsynchService(session, result, cont, false);
195 } else {
196 execute(session, result, false);
197 }
198
199 } catch (Exception e) {
200 SPFChecker checker = null;
201 while (e != null) {
202 while (checker == null || !(checker instanceof SPFCheckerExceptionCatcher)) {
203 checker = session.popChecker();
204 }
205 try {
206 ((SPFCheckerExceptionCatcher) checker).onException(e, session);
207 e = null;
208 } catch (SPFResultException ex) {
209 e = ex;
210 } finally {
211 checker = null;
212 }
213 }
214 execute(session, result, false);
215 }
216 }
217 }
218
219 }