1 /***************************************************************************************
2 * Copyright (c) Jonas Bonér, Alexandre Vasseur. All rights reserved. *
3 * http://aspectwerkz.codehaus.org *
4 * ---------------------------------------------------------------------------------- *
5 * The software in this package is published under the terms of the LGPL license *
6 * a copy of which has been included with this distribution in the license.txt file. *
7 **************************************************************************************/
8 package org.codehaus.aspectwerkz.connectivity;
9
10 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
11 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
12 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
13 import org.codehaus.aspectwerkz.exception.WrappedRuntimeException;
14
15 import java.io.FileInputStream;
16 import java.io.IOException;
17 import java.net.InetAddress;
18 import java.net.ServerSocket;
19 import java.net.Socket;
20 import java.util.Properties;
21
22 /***
23 * Server that listens to a specified port for client requests. <p/>The implementation is based on sockets. <p/>The
24 * invoker spawns a specified number of listener threads in which each one of these spawns a new RemoteProxyServerThread
25 * for each client request that comes in. <p/>Uses a thread pool from util.concurrent.
26 *
27 * @author <a href="mailto:jboner@codehaus.org">Jonas BonŽr </a>
28 */
29 public class RemoteProxyServer implements Runnable {
30 private static String HOST_NAME;
31
32 private static int PORT;
33
34 private static boolean BOUNDED_THREAD_POOL;
35
36 private static boolean LISTENER_THREAD_RUN_AS_DAEMON;
37
38 private static int BACKLOG;
39
40 private static int NUM_LISTENER_THREADS;
41
42 private static int LISTENER_THREAD_PRIORITY = Thread.NORM_PRIORITY;
43
44 private static int CLIENT_THREAD_TIMEOUT;
45
46 private static int THREAD_POOL_MAX_SIZE;
47
48 private static int THREAD_POOL_MIN_SIZE;
49
50 private static int THREAD_POOL_INIT_SIZE;
51
52 private static int THREAD_POOL_KEEP_ALIVE_TIME;
53
54 private static boolean THREAD_POOL_WAIT_WHEN_BLOCKED;
55
56 /***
57 * Initalize the server properties.
58 */
59 static {
60 Properties properties = new Properties();
61 try {
62 properties.load(new FileInputStream(System.getProperty("aspectwerkz.resource.bundle")));
63 } catch (Exception e) {
64 System.out.println("no aspectwerkz resource bundle found on classpath, using defaults");
65
66
67 }
68 String property = properties.getProperty("remote.server.hostname");
69 if (property == null) {
70 HOST_NAME = property;
71 } else {
72 HOST_NAME = property;
73 }
74 property = properties.getProperty("remote.server.port");
75 if (property == null) {
76 PORT = 7777;
77 } else {
78 PORT = Integer.parseInt(property);
79 }
80 property = properties.getProperty("remote.server.listener.threads.backlog");
81 if (property == null) {
82 BACKLOG = 200;
83 } else {
84 BACKLOG = Integer.parseInt(property);
85 }
86 property = properties.getProperty("remote.server.listener.threads.nr");
87 if (property == null) {
88 NUM_LISTENER_THREADS = 10;
89 } else {
90 NUM_LISTENER_THREADS = Integer.parseInt(property);
91 }
92 property = properties.getProperty("remote.server.client.threads.timeout");
93 if (property == null) {
94 CLIENT_THREAD_TIMEOUT = 60000;
95 } else {
96 CLIENT_THREAD_TIMEOUT = Integer.parseInt(property);
97 }
98 property = properties.getProperty("remote.server.thread.pool.max.size");
99 if (property == null) {
100 THREAD_POOL_MAX_SIZE = 100;
101 } else {
102 THREAD_POOL_MAX_SIZE = Integer.parseInt(property);
103 }
104 property = properties.getProperty("remote.server.thread.pool.min.size");
105 if (property == null) {
106 THREAD_POOL_MIN_SIZE = 10;
107 } else {
108 THREAD_POOL_MIN_SIZE = Integer.parseInt(property);
109 }
110 property = properties.getProperty("remote.server.thread.pool.init.size");
111 if (property == null) {
112 THREAD_POOL_INIT_SIZE = 10;
113 } else {
114 THREAD_POOL_INIT_SIZE = Integer.parseInt(property);
115 }
116 property = properties.getProperty("remote.server.thread.pool.keep.alive.time");
117 if (property == null) {
118 THREAD_POOL_KEEP_ALIVE_TIME = 300000;
119 } else {
120 THREAD_POOL_KEEP_ALIVE_TIME = Integer.parseInt(property);
121 }
122 property = properties.getProperty("remote.server.thread.pool.type");
123 if ((property != null) && property.equals("dynamic")) {
124 BOUNDED_THREAD_POOL = false;
125 } else {
126 BOUNDED_THREAD_POOL = true;
127 }
128 property = properties.getProperty("remote.server.listener.threads.run.as.daemon");
129 if ((property != null) && property.equals("true")) {
130 LISTENER_THREAD_RUN_AS_DAEMON = true;
131 } else {
132 LISTENER_THREAD_RUN_AS_DAEMON = false;
133 }
134 property = properties.getProperty("remote.server.thread.pool.wait.when.blocked");
135 if ((property != null) && property.equals("true")) {
136 THREAD_POOL_WAIT_WHEN_BLOCKED = true;
137 } else {
138 THREAD_POOL_WAIT_WHEN_BLOCKED = false;
139 }
140 }
141
142 /***
143 * The server socket.
144 */
145 private ServerSocket m_serverSocket = null;
146
147 /***
148 * All listener threads.
149 */
150 private Thread[] m_listenerThreads = null;
151
152 /***
153 * The thread pool.
154 */
155 private PooledExecutor m_threadPool = null;
156
157 /***
158 * The class loader to use.
159 */
160 private ClassLoader m_loader = null;
161
162 /***
163 * The invoker instance.
164 */
165 private Invoker m_invoker = null;
166
167 /***
168 * Marks the server as running.
169 */
170 private boolean m_running = true;
171
172 /***
173 * Starts a server object and starts listening for client access.
174 *
175 * @param loader the classloader to use
176 * @param invoker the invoker that makes the method invocation in the client thread
177 */
178 public RemoteProxyServer(final ClassLoader loader, final Invoker invoker) {
179 m_invoker = invoker;
180 m_loader = loader;
181 }
182
183 /***
184 * Starts up the proxy server.
185 */
186 public void start() {
187 m_running = true;
188 try {
189 InetAddress bindAddress = InetAddress.getByName(HOST_NAME);
190 m_serverSocket = new ServerSocket(PORT, BACKLOG, bindAddress);
191 if (BOUNDED_THREAD_POOL) {
192 createBoundedThreadPool(
193 THREAD_POOL_MAX_SIZE,
194 THREAD_POOL_MIN_SIZE,
195 THREAD_POOL_INIT_SIZE,
196 THREAD_POOL_KEEP_ALIVE_TIME,
197 THREAD_POOL_WAIT_WHEN_BLOCKED
198 );
199 } else {
200 createDynamicThreadPool(THREAD_POOL_MIN_SIZE, THREAD_POOL_INIT_SIZE, THREAD_POOL_KEEP_ALIVE_TIME);
201 }
202 m_listenerThreads = new Thread[NUM_LISTENER_THREADS];
203 for (int i = 0; i < NUM_LISTENER_THREADS; i++) {
204 m_listenerThreads[i] = new Thread(this);
205 m_listenerThreads[i].setName("AspectWerkz::Listener " + (i + 1));
206 m_listenerThreads[i].setDaemon(LISTENER_THREAD_RUN_AS_DAEMON);
207 m_listenerThreads[i].setPriority(LISTENER_THREAD_PRIORITY);
208 m_listenerThreads[i].start();
209 }
210 } catch (IOException e) {
211 throw new WrappedRuntimeException(e);
212 }
213 }
214
215 /***
216 * Stops the socket proxy server.
217 */
218 public void stop() {
219 m_running = false;
220 for (int i = 0; i < NUM_LISTENER_THREADS; i++) {
221 m_listenerThreads[i].interrupt();
222 }
223 m_threadPool.shutdownNow();
224 }
225
226 /***
227 * Does the actual work of listening for a client request and spawns a new RemoteProxyServerThread to serve the
228 * client.
229 */
230 public void run() {
231 try {
232 while (m_running) {
233 final Socket clientSocket = m_serverSocket.accept();
234 synchronized (m_threadPool) {
235 m_threadPool.execute(
236 new RemoteProxyServerThread(
237 clientSocket,
238 m_loader,
239 m_invoker,
240 CLIENT_THREAD_TIMEOUT
241 )
242 );
243 }
244 }
245 m_serverSocket.close();
246 } catch (Exception e) {
247 throw new WrappedRuntimeException(e);
248 }
249 }
250
251 /***
252 * Creates a new bounded thread pool.
253 *
254 * @param threadPoolMaxSize
255 * @param threadPoolMinSize
256 * @param threadPoolInitSize
257 * @param keepAliveTime
258 * @param waitWhenBlocked
259 */
260 private void createBoundedThreadPool(final int threadPoolMaxSize,
261 final int threadPoolMinSize,
262 final int threadPoolInitSize,
263 final int keepAliveTime,
264 final boolean waitWhenBlocked) {
265 m_threadPool = new PooledExecutor(new BoundedBuffer(threadPoolInitSize), threadPoolMaxSize);
266 m_threadPool.setKeepAliveTime(keepAliveTime);
267 m_threadPool.createThreads(threadPoolInitSize);
268 m_threadPool.setMinimumPoolSize(threadPoolMinSize);
269 if (waitWhenBlocked) {
270 m_threadPool.waitWhenBlocked();
271 }
272 }
273
274 /***
275 * Creates a new dynamic thread pool
276 *
277 * @param threadPoolMinSize
278 * @param threadPoolInitSize
279 * @param keepAliveTime
280 */
281 private void createDynamicThreadPool(final int threadPoolMinSize,
282 final int threadPoolInitSize,
283 final int keepAliveTime) {
284 m_threadPool = new PooledExecutor(new LinkedQueue());
285 m_threadPool.setKeepAliveTime(keepAliveTime);
286 m_threadPool.createThreads(threadPoolInitSize);
287 m_threadPool.setMinimumPoolSize(threadPoolMinSize);
288 }
289 }