001    /*
002     * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     *     http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package de.kumpe.hadooptimizer.jeneva.client;
017    
018    import java.rmi.NotBoundException;
019    import java.rmi.RemoteException;
020    import java.rmi.registry.LocateRegistry;
021    import java.rmi.registry.Registry;
022    
023    import org.apache.commons.logging.Log;
024    import org.apache.commons.logging.LogFactory;
025    
026    public final class JenevaClient<I> implements Runnable {
027            private static final Log log = LogFactory.getLog(JenevaClient.class);
028    
029            private static class RemoteClassLoader extends ClassLoader {
030                    private final ClientService clientService;
031    
032                    public RemoteClassLoader(final ClientService clientService,
033                                    final ClassLoader parent) {
034                            super(parent);
035                            this.clientService = clientService;
036                    }
037    
038                    @Override
039                    protected Class<?> findClass(final String name)
040                                    throws ClassNotFoundException {
041                            if (log.isDebugEnabled()) {
042                                    log.debug("Retrieving class: " + name);
043                            }
044                            try {
045                                    return super.findClass(name);
046                            } catch (final ClassNotFoundException e) {
047                                    // continue
048                            }
049                            try {
050                                    final byte[] bytes = clientService.findClass(name);
051                                    final Class<?> defineClass = defineClass(name, bytes, 0,
052                                                    bytes.length);
053                                    return defineClass;
054                            } catch (final RemoteException e) {
055                                    throw new ClassNotFoundException("remote exception", e);
056                            }
057                    }
058            }
059    
060            public static <I> void main(final String[] args) throws RemoteException,
061                            NotBoundException {
062                    if (log.isInfoEnabled()) {
063                            log.info("Starting JenevaClient...");
064                    }
065                    new JenevaClient<I>(args).start();
066            }
067    
068            private final ClientService clientService;
069    
070            private JenevaClient(final String[] args) throws RemoteException,
071                            NotBoundException {
072                    clientService = connectToClientService();
073            }
074    
075            private void start() {
076                    final ClassLoader contextClassLoader = Thread.currentThread()
077                                    .getContextClassLoader();
078    
079                    final int nrOfWorkers = Runtime.getRuntime().availableProcessors();
080                    final ThreadGroup workerThreadGroup = new ThreadGroup("Workers");
081                    for (int i = 0; i < nrOfWorkers; i++) {
082                            final ClassLoader classLoader = new RemoteClassLoader(
083                                            clientService, contextClassLoader);
084                            final Thread worker = new Thread(workerThreadGroup, this, "Worker "
085                                            + i);
086                            worker.setContextClassLoader(classLoader);
087                            worker.start();
088                    }
089            }
090    
091            private ClientService connectToClientService() throws RemoteException,
092                            NotBoundException {
093                    final String host = getSystemProperty("jeneva.host");
094                    final int port = Integer.parseInt(getSystemProperty("jeneva.port"));
095                    final Registry registry = LocateRegistry.getRegistry(host, port);
096    
097                    final ClientService clientService = (ClientService) registry
098                                    .lookup(ClientService.class.getSimpleName());
099    
100                    return clientService;
101            }
102    
103            private String getSystemProperty(final String property) {
104                    final String value = System.getProperty(property);
105                    if (null == value) {
106                            throw new NullPointerException(property
107                                            + " system property was not specified");
108                    }
109                    return value;
110            }
111    
112            @Override
113            public void run() {
114                    if (log.isInfoEnabled()) {
115                            log.info("Worker started.");
116                    }
117                    try {
118                            for (;;) {
119                                    if (log.isDebugEnabled()) {
120                                            log.debug("Fetching next task...");
121                                    }
122                                    final Runnable task = clientService.nextTask();
123                                    if (log.isTraceEnabled()) {
124                                            log.trace("Running task: " + task);
125                                    }
126                                    task.run();
127                            }
128                    } catch (final RemoteException e) {
129                            if (log.isErrorEnabled()) {
130                                    log.error("Remote exception:", e);
131                            }
132                    } catch (final InterruptedException e) {
133                            if (log.isWarnEnabled()) {
134                                    log.warn("Interrupted!");
135                            }
136                    } catch (final RuntimeException e) {
137                            if (log.isErrorEnabled()) {
138                                    log.error("Runtime exception:", e);
139                            }
140                    } finally {
141                            if (log.isInfoEnabled()) {
142                                    log.info("Worker finished.");
143                            }
144                    }
145            }
146    }