001 /*
002 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package de.kumpe.hadooptimizer.jeneva.server;
017
018 import java.io.ByteArrayOutputStream;
019 import java.io.IOException;
020 import java.io.InputStream;
021 import java.net.UnknownHostException;
022 import java.rmi.AccessException;
023 import java.rmi.AlreadyBoundException;
024 import java.rmi.NotBoundException;
025 import java.rmi.Remote;
026 import java.rmi.RemoteException;
027 import java.rmi.registry.LocateRegistry;
028 import java.rmi.registry.Registry;
029 import java.rmi.server.UnicastRemoteObject;
030 import java.util.LinkedList;
031 import java.util.Queue;
032 import java.util.concurrent.BrokenBarrierException;
033
034 import org.apache.commons.logging.Log;
035 import org.apache.commons.logging.LogFactory;
036
037 import de.kumpe.hadooptimizer.jeneva.client.ClientService;
038 import de.kumpe.hadooptimizer.jeneva.server.ServerTask.ClientTaskService;
039
040 public final class JenevaServer<I> {
041 static final Log log = LogFactory.getLog(JenevaServer.class);
042
043 private final class ServerServiceImpl implements ServerService {
044 @Override
045 public <V> V execute(final ServerTask<V> serverTask) throws Exception {
046 if (log.isDebugEnabled()) {
047 log.debug("Executing ServerTask: " + serverTask);
048 }
049 serverTask.port = port;
050 serverTask.clientTaskService = clientService;
051 return serverTask.call();
052 }
053 }
054
055 private final class ClientServiceImpl implements ClientService,
056 ClientTaskService {
057 private final Queue<Runnable> waitingTasks = new LinkedList<Runnable>();
058 private Runnable currentClientTask;
059
060 @Override
061 public synchronized Runnable nextTask() throws RemoteException,
062 InterruptedException {
063 while (null == currentClientTask) {
064 if (log.isTraceEnabled()) {
065 log.trace("Waiting for next client task...");
066 }
067 wait();
068 }
069 if (log.isDebugEnabled()) {
070 log.debug("Returning next client task...");
071 }
072 return currentClientTask;
073 }
074
075 @Override
076 public synchronized void addClientTask(final Runnable clientTask) {
077 if (log.isTraceEnabled()) {
078 log.trace("Adding next client task: " + clientTask);
079 }
080 waitingTasks.add(clientTask);
081 if (null == currentClientTask) {
082 setCurrentClientTask(waitingTasks.poll());
083 }
084 }
085
086 @Override
087 public synchronized void removeClientTask(final Runnable clientTask) {
088 if (log.isTraceEnabled()) {
089 log.trace("Removing next client task: " + clientTask);
090 }
091 waitingTasks.remove(clientTask);
092 if (clientTask.equals(currentClientTask)) {
093 setCurrentClientTask(null);
094 }
095 }
096
097 private synchronized void setCurrentClientTask(final Runnable clientTask) {
098 if (log.isTraceEnabled()) {
099 log.trace("Set current client task: " + clientTask);
100 }
101 currentClientTask = clientTask;
102 notifyAll();
103 }
104
105 @Override
106 public byte[] findClass(final String name)
107 throws ClassNotFoundException {
108 if (log.isDebugEnabled()) {
109 log.debug("Finding class " + name);
110 }
111
112 try {
113 final String classFile = "/" + name.replace('.', '/')
114 + ".class";
115 final InputStream input = getClass().getResourceAsStream(
116 classFile);
117 if (null == input) {
118 throw new ClassNotFoundException("cannot find class file");
119 }
120
121 final ByteArrayOutputStream output = new ByteArrayOutputStream(
122 input.available());
123 final byte[] buf = new byte[1024];
124 int read;
125 while (0 <= (read = input.read(buf))) {
126 output.write(buf, 0, read);
127 }
128
129 if (log.isTraceEnabled()) {
130 log.trace("Class found.");
131 }
132
133 return output.toByteArray();
134 } catch (final IOException e) {
135 throw new ClassNotFoundException("cannot read class file", e);
136 }
137 }
138 }
139
140 public static <I> void main(final String[] args) throws RemoteException,
141 AlreadyBoundException, InterruptedException,
142 BrokenBarrierException, UnknownHostException {
143
144 if (null == System.getSecurityManager()) {
145 if (log.isInfoEnabled()) {
146 log.info("Creating new SecurityManager...");
147 }
148 System.setSecurityManager(new SecurityManager());
149 }
150
151 new JenevaServer<I>(args).run();
152 if (log.isInfoEnabled()) {
153 log.info("Shutting down JenevaServer...");
154 }
155 System.exit(0);
156 }
157
158 public static ServerService getServerService() throws RemoteException,
159 NotBoundException {
160 final String host = getSystemProperty("jeneva.host");
161 final int port = Integer.parseInt(getSystemProperty("jeneva.port"));
162
163 return getServerService(host, port);
164 }
165
166 public static ServerService getServerService(final String host,
167 final int port) throws RemoteException, NotBoundException,
168 AccessException {
169 final Registry registry = LocateRegistry.getRegistry(host, port);
170
171 final ServerService serverService = (ServerService) registry
172 .lookup(ServerService.class.getSimpleName());
173 return serverService;
174 }
175
/** Service through which server tasks are executed. */
private final ServerServiceImpl serverService = new ServerServiceImpl();
/** Service which manages and hands out client tasks. */
private final ClientServiceImpl clientService = new ClientServiceImpl();
/** Port used for the registry and for both exported services. */
private final int port;

/** Set by {@code quit()}; guarded by this object's monitor. */
private boolean quit;

/**
 * Creates a server whose port is read from the {@code jeneva.port}
 * system property.
 *
 * @param args
 *            command line arguments; currently not evaluated
 */
private JenevaServer(final String[] args) {
    final String configuredPort = getSystemProperty("jeneva.port");
    this.port = Integer.parseInt(configuredPort);
}
185
186 private void run() throws RemoteException, AlreadyBoundException {
187 if (log.isInfoEnabled()) {
188 log.info("Creating Registry...");
189 }
190 final Registry registry = LocateRegistry.createRegistry(port);
191
192 if (log.isInfoEnabled()) {
193 log.info("Exporting ClientService...");
194 }
195 final Remote clientServiceStub = UnicastRemoteObject.exportObject(
196 clientService, port);
197 if (log.isTraceEnabled()) {
198 log.trace("Stub: " + clientServiceStub);
199 }
200 if (log.isInfoEnabled()) {
201 log.info("Binding ClientService to Registry...");
202 }
203 registry.bind(ClientService.class.getSimpleName(), clientServiceStub);
204
205 if (log.isInfoEnabled()) {
206 log.info("Exporting ServerService...");
207 }
208 final Remote serverServiceStub = UnicastRemoteObject.exportObject(
209 serverService, port);
210 if (log.isTraceEnabled()) {
211 log.trace("Stub: " + serverServiceStub);
212 }
213 if (log.isInfoEnabled()) {
214 log.info("Binding ServerService to Registry...");
215 }
216 registry.bind(ServerService.class.getSimpleName(), serverServiceStub);
217
218 if (log.isInfoEnabled()) {
219 log.info("JenevaServer ready.");
220 }
221
222 waitForQuit();
223 }
224
225 private static String getSystemProperty(final String property) {
226 final String value = System.getProperty(property);
227 if (null == value) {
228 throw new NullPointerException(property
229 + " system property was not specified");
230 }
231 return value;
232 }
233
234 public synchronized void quit() {
235 quit = true;
236 notifyAll();
237 }
238
239 private synchronized void waitForQuit() {
240 try {
241 while (!quit) {
242 this.wait();
243 }
244 } catch (final InterruptedException e) {
245 // just return
246 }
247 }
248 }