001 /*
002 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package de.kumpe.hadooptimizer.jeneva.client;
017
018 import java.rmi.NotBoundException;
019 import java.rmi.RemoteException;
020 import java.rmi.registry.LocateRegistry;
021 import java.rmi.registry.Registry;
022
023 import org.apache.commons.logging.Log;
024 import org.apache.commons.logging.LogFactory;
025
026 public final class JenevaClient<I> implements Runnable {
027 private static final Log log = LogFactory.getLog(JenevaClient.class);
028
029 private static class RemoteClassLoader extends ClassLoader {
030 private final ClientService clientService;
031
032 public RemoteClassLoader(final ClientService clientService,
033 final ClassLoader parent) {
034 super(parent);
035 this.clientService = clientService;
036 }
037
038 @Override
039 protected Class<?> findClass(final String name)
040 throws ClassNotFoundException {
041 if (log.isDebugEnabled()) {
042 log.debug("Retrieving class: " + name);
043 }
044 try {
045 return super.findClass(name);
046 } catch (final ClassNotFoundException e) {
047 // continue
048 }
049 try {
050 final byte[] bytes = clientService.findClass(name);
051 final Class<?> defineClass = defineClass(name, bytes, 0,
052 bytes.length);
053 return defineClass;
054 } catch (final RemoteException e) {
055 throw new ClassNotFoundException("remote exception", e);
056 }
057 }
058 }
059
060 public static <I> void main(final String[] args) throws RemoteException,
061 NotBoundException {
062 if (log.isInfoEnabled()) {
063 log.info("Starting JenevaClient...");
064 }
065 new JenevaClient<I>(args).start();
066 }
067
068 private final ClientService clientService;
069
070 private JenevaClient(final String[] args) throws RemoteException,
071 NotBoundException {
072 clientService = connectToClientService();
073 }
074
075 private void start() {
076 final ClassLoader contextClassLoader = Thread.currentThread()
077 .getContextClassLoader();
078
079 final int nrOfWorkers = Runtime.getRuntime().availableProcessors();
080 final ThreadGroup workerThreadGroup = new ThreadGroup("Workers");
081 for (int i = 0; i < nrOfWorkers; i++) {
082 final ClassLoader classLoader = new RemoteClassLoader(
083 clientService, contextClassLoader);
084 final Thread worker = new Thread(workerThreadGroup, this, "Worker "
085 + i);
086 worker.setContextClassLoader(classLoader);
087 worker.start();
088 }
089 }
090
091 private ClientService connectToClientService() throws RemoteException,
092 NotBoundException {
093 final String host = getSystemProperty("jeneva.host");
094 final int port = Integer.parseInt(getSystemProperty("jeneva.port"));
095 final Registry registry = LocateRegistry.getRegistry(host, port);
096
097 final ClientService clientService = (ClientService) registry
098 .lookup(ClientService.class.getSimpleName());
099
100 return clientService;
101 }
102
103 private String getSystemProperty(final String property) {
104 final String value = System.getProperty(property);
105 if (null == value) {
106 throw new NullPointerException(property
107 + " system property was not specified");
108 }
109 return value;
110 }
111
112 @Override
113 public void run() {
114 if (log.isInfoEnabled()) {
115 log.info("Worker started.");
116 }
117 try {
118 for (;;) {
119 if (log.isDebugEnabled()) {
120 log.debug("Fetching next task...");
121 }
122 final Runnable task = clientService.nextTask();
123 if (log.isTraceEnabled()) {
124 log.trace("Running task: " + task);
125 }
126 task.run();
127 }
128 } catch (final RemoteException e) {
129 if (log.isErrorEnabled()) {
130 log.error("Remote exception:", e);
131 }
132 } catch (final InterruptedException e) {
133 if (log.isWarnEnabled()) {
134 log.warn("Interrupted!");
135 }
136 } catch (final RuntimeException e) {
137 if (log.isErrorEnabled()) {
138 log.error("Runtime exception:", e);
139 }
140 } finally {
141 if (log.isInfoEnabled()) {
142 log.info("Worker finished.");
143 }
144 }
145 }
146 }