001    /*
002     * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     *     http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package de.kumpe.hadooptimizer.jeneva.server;
017    
018    import java.io.ByteArrayOutputStream;
019    import java.io.IOException;
020    import java.io.InputStream;
021    import java.net.UnknownHostException;
022    import java.rmi.AccessException;
023    import java.rmi.AlreadyBoundException;
024    import java.rmi.NotBoundException;
025    import java.rmi.Remote;
026    import java.rmi.RemoteException;
027    import java.rmi.registry.LocateRegistry;
028    import java.rmi.registry.Registry;
029    import java.rmi.server.UnicastRemoteObject;
030    import java.util.LinkedList;
031    import java.util.Queue;
032    import java.util.concurrent.BrokenBarrierException;
033    
034    import org.apache.commons.logging.Log;
035    import org.apache.commons.logging.LogFactory;
036    
037    import de.kumpe.hadooptimizer.jeneva.client.ClientService;
038    import de.kumpe.hadooptimizer.jeneva.server.ServerTask.ClientTaskService;
039    
040    public final class JenevaServer<I> {
041            static final Log log = LogFactory.getLog(JenevaServer.class);
042    
043            private final class ServerServiceImpl implements ServerService {
044                    @Override
045                    public <V> V execute(final ServerTask<V> serverTask) throws Exception {
046                            if (log.isDebugEnabled()) {
047                                    log.debug("Executing ServerTask: " + serverTask);
048                            }
049                            serverTask.port = port;
050                            serverTask.clientTaskService = clientService;
051                            return serverTask.call();
052                    }
053            }
054    
055            private final class ClientServiceImpl implements ClientService,
056                            ClientTaskService {
057                    private final Queue<Runnable> waitingTasks = new LinkedList<Runnable>();
058                    private Runnable currentClientTask;
059    
060                    @Override
061                    public synchronized Runnable nextTask() throws RemoteException,
062                                    InterruptedException {
063                            while (null == currentClientTask) {
064                                    if (log.isTraceEnabled()) {
065                                            log.trace("Waiting for next client task...");
066                                    }
067                                    wait();
068                            }
069                            if (log.isDebugEnabled()) {
070                                    log.debug("Returning next client task...");
071                            }
072                            return currentClientTask;
073                    }
074    
075                    @Override
076                    public synchronized void addClientTask(final Runnable clientTask) {
077                            if (log.isTraceEnabled()) {
078                                    log.trace("Adding next client task: " + clientTask);
079                            }
080                            waitingTasks.add(clientTask);
081                            if (null == currentClientTask) {
082                                    setCurrentClientTask(waitingTasks.poll());
083                            }
084                    }
085    
086                    @Override
087                    public synchronized void removeClientTask(final Runnable clientTask) {
088                            if (log.isTraceEnabled()) {
089                                    log.trace("Removing next client task: " + clientTask);
090                            }
091                            waitingTasks.remove(clientTask);
092                            if (clientTask.equals(currentClientTask)) {
093                                    setCurrentClientTask(null);
094                            }
095                    }
096    
097                    private synchronized void setCurrentClientTask(final Runnable clientTask) {
098                            if (log.isTraceEnabled()) {
099                                    log.trace("Set current client task: " + clientTask);
100                            }
101                            currentClientTask = clientTask;
102                            notifyAll();
103                    }
104    
105                    @Override
106                    public byte[] findClass(final String name)
107                                    throws ClassNotFoundException {
108                            if (log.isDebugEnabled()) {
109                                    log.debug("Finding class " + name);
110                            }
111    
112                            try {
113                                    final String classFile = "/" + name.replace('.', '/')
114                                                    + ".class";
115                                    final InputStream input = getClass().getResourceAsStream(
116                                                    classFile);
117                                    if (null == input) {
118                                            throw new ClassNotFoundException("cannot find class file");
119                                    }
120    
121                                    final ByteArrayOutputStream output = new ByteArrayOutputStream(
122                                                    input.available());
123                                    final byte[] buf = new byte[1024];
124                                    int read;
125                                    while (0 <= (read = input.read(buf))) {
126                                            output.write(buf, 0, read);
127                                    }
128    
129                                    if (log.isTraceEnabled()) {
130                                            log.trace("Class found.");
131                                    }
132    
133                                    return output.toByteArray();
134                            } catch (final IOException e) {
135                                    throw new ClassNotFoundException("cannot read class file", e);
136                            }
137                    }
138            }
139    
140            public static <I> void main(final String[] args) throws RemoteException,
141                            AlreadyBoundException, InterruptedException,
142                            BrokenBarrierException, UnknownHostException {
143    
144                    if (null == System.getSecurityManager()) {
145                            if (log.isInfoEnabled()) {
146                                    log.info("Creating new SecurityManager...");
147                            }
148                            System.setSecurityManager(new SecurityManager());
149                    }
150    
151                    new JenevaServer<I>(args).run();
152                    if (log.isInfoEnabled()) {
153                            log.info("Shutting down JenevaServer...");
154                    }
155                    System.exit(0);
156            }
157    
158            public static ServerService getServerService() throws RemoteException,
159                            NotBoundException {
160                    final String host = getSystemProperty("jeneva.host");
161                    final int port = Integer.parseInt(getSystemProperty("jeneva.port"));
162    
163                    return getServerService(host, port);
164            }
165    
166            public static ServerService getServerService(final String host,
167                            final int port) throws RemoteException, NotBoundException,
168                            AccessException {
169                    final Registry registry = LocateRegistry.getRegistry(host, port);
170    
171                    final ServerService serverService = (ServerService) registry
172                                    .lookup(ServerService.class.getSimpleName());
173                    return serverService;
174            }
175    
176            private final ServerServiceImpl serverService = new ServerServiceImpl();
177            private final ClientServiceImpl clientService = new ClientServiceImpl();
178            private final int port;
179    
180            private boolean quit;
181    
182            private JenevaServer(final String[] args) {
183                    port = Integer.parseInt(getSystemProperty("jeneva.port"));
184            }
185    
186            private void run() throws RemoteException, AlreadyBoundException {
187                    if (log.isInfoEnabled()) {
188                            log.info("Creating Registry...");
189                    }
190                    final Registry registry = LocateRegistry.createRegistry(port);
191    
192                    if (log.isInfoEnabled()) {
193                            log.info("Exporting ClientService...");
194                    }
195                    final Remote clientServiceStub = UnicastRemoteObject.exportObject(
196                                    clientService, port);
197                    if (log.isTraceEnabled()) {
198                            log.trace("Stub: " + clientServiceStub);
199                    }
200                    if (log.isInfoEnabled()) {
201                            log.info("Binding ClientService to Registry...");
202                    }
203                    registry.bind(ClientService.class.getSimpleName(), clientServiceStub);
204    
205                    if (log.isInfoEnabled()) {
206                            log.info("Exporting ServerService...");
207                    }
208                    final Remote serverServiceStub = UnicastRemoteObject.exportObject(
209                                    serverService, port);
210                    if (log.isTraceEnabled()) {
211                            log.trace("Stub: " + serverServiceStub);
212                    }
213                    if (log.isInfoEnabled()) {
214                            log.info("Binding ServerService to Registry...");
215                    }
216                    registry.bind(ServerService.class.getSimpleName(), serverServiceStub);
217    
218                    if (log.isInfoEnabled()) {
219                            log.info("JenevaServer ready.");
220                    }
221    
222                    waitForQuit();
223            }
224    
225            private static String getSystemProperty(final String property) {
226                    final String value = System.getProperty(property);
227                    if (null == value) {
228                            throw new NullPointerException(property
229                                            + " system property was not specified");
230                    }
231                    return value;
232            }
233    
234            public synchronized void quit() {
235                    quit = true;
236                    notifyAll();
237            }
238    
239            private synchronized void waitForQuit() {
240                    try {
241                            while (!quit) {
242                                    this.wait();
243                            }
244                    } catch (final InterruptedException e) {
245                            // just return
246                    }
247            }
248    }