001 /*
002 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package de.kumpe.hadooptimizer.examples;
017
018 import java.io.BufferedReader;
019 import java.io.Closeable;
020 import java.io.FileDescriptor;
021 import java.io.FileInputStream;
022 import java.io.IOException;
023 import java.io.InputStream;
024 import java.io.InputStreamReader;
025 import java.io.OutputStream;
026 import java.io.PrintWriter;
027 import java.net.InetAddress;
028 import java.net.UnknownHostException;
029 import java.nio.channels.Channels;
030 import java.nio.channels.ClosedByInterruptException;
031 import java.util.Collection;
032 import java.util.Date;
033 import java.util.List;
034 import java.util.concurrent.CopyOnWriteArrayList;
035
036 import org.apache.commons.cli.CommandLine;
037 import org.apache.commons.cli.HelpFormatter;
038 import org.apache.commons.cli.Option;
039 import org.apache.commons.cli.Options;
040 import org.apache.commons.logging.Log;
041 import org.apache.commons.logging.LogFactory;
042 import org.apache.hadoop.conf.Configuration;
043 import org.apache.hadoop.conf.Configured;
044 import org.apache.hadoop.fs.FileSystem;
045 import org.apache.hadoop.fs.Path;
046 import org.apache.hadoop.util.GenericOptionsParser;
047 import org.apache.log4j.Level;
048 import org.apache.log4j.Logger;
049
050 /**
051 * Base class for all "executables" in the examples module. It delivers a basic
052 * infrastructure for a common workflow:
053 * <ol>
054 * <li>{@link #createOptions()}
055 * <li>{@link #processCommandLine()}
056 * <li>{@link #execute()}
057 * </ol>
058 *
059 * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
060 */
061 public abstract class Example extends Configured {
/**
 * Multi-line ASCII-art banner (product name, copyright and license notice
 * followed by a picture). {@link #run(String[])} logs it at info level,
 * preceded by a newline, before anything else happens.
 */
public static final String BANNER_TEXT = " ***********************************************************************\n"
        + " * HadoOptimizer 0.0.1-SNAPSHOT *\n"
        + " * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java *\n"
        + " * Licensed under the Apache License, Version 2.0 *\n"
        + " ***********************************************************************\n"
        + " _a, awa, _ \n"
        + " _QQQL '?Ss,mQQQF :QQQ[ j \n"
        + " _QQQQ@ -?QQQ@' swQWT' F \n"
        + " .aQQQL jQQQQL ]QQQQ4mw, .QQQQw ]' \n"
        + " .wQQQQQW =QQQQQE ]QQQQwD?? ]QQQQQ; 2 \n"
        + " mQQQW ]QQQQQf ]QQQQ@' jQQQQQQwaaf \n"
        + " <QQQQF )QQQQQg, ]QQQW' ]QQQQ?-'4Q[ \n"
        + " _aaaaa _jQQQQm. )QQQQ_]QQP?' -jQQQk jQQQQw ]7 \n"
        + " .amQQQQQW. ]QQQQ?WL, 4QQQQc ` ]QQQm $QQQQQw 2( \n"
        + " _sQQQQQQf -?QQQa WQ ]QQQQQ, ]QQQQ[ ]QQf?WQQ[ \n"
        + " ]QQQHQQQf ]QQQa'! <QQP]QQ( _wQW?QQ[ _mWP 4Q[ \n"
        + " ]QQQQHQQ _m@?$Q. ]Q@F ]QQ( jQP! ]QQ[ wQP' ]QL, \n"
        + " swQP4QQ'4Q; jQP` 3Q j@' -HQ[ _mQ? VQ( _Q@' _$QL \n"
        + " -Qa, )QQaj@' -4$a )Qaa, 'Qma, Qmaa ~9L Qmaa ]Q6, J $QDb \n"
        + " ***********************************************************************";
082
/**
 * Short name of the help option. {@link #run(String[])} registers it
 * (with the long name {@code help}) and prints the usage text instead of
 * executing the example when it is present.
 */
protected static final String OPTION_HELP = "h";
/**
 * Name of the argument-less option described as "use gui if available";
 * registered in {@link #createOptions()}.
 */
protected static final String OPTION_GUI = "gui";
/**
 * Name of the option whose argument sets the log-level; registered in
 * {@link #createOptions()} and evaluated in {@link #processCommandLine()}.
 */
protected static final String OPTION_LOG_LEVEL = "log";

/**
 * Command line key in {@link Configuration}: under this key the example
 * stores its class name followed by the space-separated command-line
 * arguments before it is executed.
 */
public static final String COMMAND_LINE = "de.kumpe.hadooptimizer.examples.CliExamplesRunner.commandLine";
091
092 class StdInListener implements Runnable {
093 @Override
094 public void run() {
095 final BufferedReader stdIn = new BufferedReader(
096 new InputStreamReader(
097 Channels.newInputStream(new FileInputStream(
098 FileDescriptor.in).getChannel())));
099 try {
100 for (;;) {
101 System.out.println("Press q ENTER ro quit...");
102 final String line = stdIn.readLine();
103 if ("q".equalsIgnoreCase(line)) {
104 break;
105 }
106 }
107 executeShutdownHooks();
108 System.exit(1);
109 } catch (final ClosedByInterruptException e) {
110 // ignore
111 } catch (final IOException e) {
112 log.error("Error reading stdin: ", e);
113 }
114 }
115 }
116
117 final class CloseClosableHook implements Runnable {
118 private final Closeable out;
119
120 private CloseClosableHook(final Closeable out) {
121 this.out = out;
122 }
123
124 @Override
125 public void run() {
126 if (log.isInfoEnabled()) {
127 log.info("Closing results.txt");
128 }
129
130 try {
131 out.close();
132 } catch (final IOException e) {
133 log.error("Error closing Closable: ", e);
134 }
135 }
136 }
137
/**
 * Logger used by this class and available to subclasses.
 * NOTE(review): the logger is created for OptimizerExample, not for this
 * class - confirm that this category name is intended.
 */
protected static final Log log = LogFactory.getLog(OptimizerExample.class);
/**
 * Hooks that are run, in iteration order, by executeShutdownHooks() - both
 * when {@link #run(String[])} finishes and when the user quits via stdin.
 */
protected final Collection<Runnable> shutdownHooks = new CopyOnWriteArrayList<Runnable>();
/**
 * The parsed command line; assigned in {@link #run(String[])} before
 * {@link #processCommandLine()} is called, {@code null} until then.
 */
protected CommandLine commandLine;
/**
 * Directory name made of the concrete class name and the creation
 * timestamp; parent of the files handled by {@link #outputStream(String)}
 * and first lookup location of {@link #intputStream(String)}.
 */
protected final String baseDir;
/**
 * Writer for {@code results.txt}; assigned by openLogFile() during
 * {@link #run(String[])}, {@code null} until then.
 */
protected PrintWriter logFile;
143
144 public Example() {
145 super(new Configuration());
146 baseDir = String.format("%s-%tY%<tm%<td-%<tH%<tM%<tS", getClass()
147 .getName(), new Date());
148 }
149
150 protected Options createOptions() throws Exception {
151 final Options options = new Options();
152
153 final Option logLevelOption = new Option(OPTION_LOG_LEVEL, true,
154 "sets the log-level of HadoOptimizer");
155 logLevelOption.setArgName("all|trace|debug|info|warn|error|fatal|off");
156 options.addOption(logLevelOption);
157
158 final Option guiOption = new Option(OPTION_GUI, false,
159 "use gui if available");
160 options.addOption(guiOption);
161
162 return options;
163 }
164
165 public void run(final String[] args) throws Exception {
166 if (log.isInfoEnabled()) {
167 log.info("\n" + BANNER_TEXT);
168 log.info("Running " + getClass().getName() + " ("
169 + getVersionInfo() + ")");
170 }
171
172 log.debug("Creating options...");
173 final Options options = createOptions();
174 if (log.isTraceEnabled()) {
175 log.trace("Created options: " + options);
176 }
177
178 log.trace("Add help-option...");
179 final Option helpOption = new Option(OPTION_HELP, "help", false,
180 "pring help");
181 options.addOption(helpOption);
182
183 log.debug("Parsing options...");
184 commandLine = new GenericOptionsParser(getConf(), options, args)
185 .getCommandLine();
186 if (log.isDebugEnabled()) {
187 log.debug("Remaining arguments: " + commandLine.getArgList());
188 }
189
190 if (commandLine.hasOption(OPTION_HELP)) {
191 printHelp(options);
192 return;
193 }
194
195 checkForUnrecognizedOption();
196
197 log.debug("Processing command-line...");
198 processCommandLine();
199
200 openLogFile();
201
202 storeArgument(args);
203
204 // starting StdInListener
205 final Thread stdInListener = new Thread(new StdInListener(),
206 "StdInListener");
207 stdInListener.start();
208
209 try {
210 log.debug("Executing example...");
211 execute();
212 } finally {
213 stdInListener.interrupt();
214 executeShutdownHooks();
215 }
216 }
217
218 private void openLogFile() throws IOException {
219 OutputStream outputStream = outputStream("results.txt");
220 try {
221 final Path s3logPath = new Path("s3://hadooptimizer/" + baseDir
222 + "/results.txt");
223 final FileSystem s3FileSystem = s3logPath.getFileSystem(getConf());
224 final OutputStream s3OutputStream = s3FileSystem.create(s3logPath);
225 outputStream = new TeeOutputStream(outputStream, s3OutputStream);
226 } catch (final Exception e) {
227 log.debug("Ignoring s3: " + e);
228 }
229 logFile = new PrintWriter(outputStream);
230 shutdownHooks.add(new CloseClosableHook(logFile));
231
232 logFile.print("# ");
233 try {
234 logFile.print(InetAddress.getLocalHost().getHostName());
235 } catch (final UnknownHostException e) {
236 logFile.print("unknown host");
237 }
238 logFile.print(" ");
239 logFile.println(new Date());
240 }
241
242 private void executeShutdownHooks() {
243 for (final Runnable shutdownHook : shutdownHooks) {
244 shutdownHook.run();
245 }
246 }
247
248 private void storeArgument(final String[] args) {
249 final StringBuilder buffer = new StringBuilder(getClass().getName());
250 for (final String arg : args) {
251 buffer.append(' ');
252 buffer.append(arg);
253 }
254 logFile.print("# ");
255 final String argumentsString = buffer.toString();
256 logFile.println(argumentsString);
257 getConf().set(COMMAND_LINE, argumentsString);
258 }
259
260 private void checkForUnrecognizedOption() {
261 @SuppressWarnings("unchecked")
262 final List<String> remainingArgs = commandLine.getArgList();
263 for (final String arg : remainingArgs) {
264 if (arg.startsWith("-")) {
265 throw new IllegalArgumentException("Unrecognized option: "
266 + arg);
267 }
268 }
269 }
270
/**
 * Supplies the version text of the concrete example.
 * {@link #run(String[])} logs it at info level, in parentheses after the
 * class name.
 *
 * @return the version information to show in the start-up log message
 */
protected abstract String getVersionInfo();
272
/**
 * Performs the actual work of the example. {@link #run(String[])} calls
 * this once the command line has been processed, the log file has been
 * opened and the stdin listener has been started; the shutdown hooks are
 * run afterwards even if this method throws.
 *
 * @throws Exception
 *             if the example fails; propagated to the caller of
 *             {@link #run(String[])}
 */
protected abstract void execute() throws Exception;
274
275 protected void processCommandLine() throws Exception {
276 final String logLevel = commandLine.getOptionValue(OPTION_LOG_LEVEL);
277 if (null != logLevel) {
278 final Level level = Level.toLevel(logLevel);
279 final Logger logger = Logger.getLogger("de.kumpe.hadooptimizer");
280 logger.setLevel(level);
281 }
282 }
283
284 private void printHelp(final Options options) {
285 new HelpFormatter().printHelp(Integer.MAX_VALUE,
286 "hadoop jar hadooptimizer.jar exampleClass [option...]",
287 "\navailable options (some are hadoop's general options):",
288 options,
289 "\n$Id: Example.java 3999 2011-05-06 09:13:39Z baumbart $\n");
290 }
291
292 public OutputStream outputStream(final String filename) throws IOException {
293 final Path path = new Path(baseDir, filename);
294 final FileSystem fs = path.getFileSystem(getConf());
295 return fs.create(path);
296 }
297
298 public InputStream intputStream(final String filename) throws IOException {
299 final FileSystem fs = FileSystem.get(getConf());
300 Path path = new Path(baseDir, filename);
301 if (fs.exists(path)) {
302 return fs.open(path);
303 }
304 path = new Path(getClass().getName(), filename);
305 if (fs.exists(path)) {
306 return fs.open(path);
307 }
308 return getClass().getResourceAsStream(
309 getClass().getName() + "/" + filename);
310 }
311 }