/*
 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package de.kumpe.hadooptimizer.examples;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.channels.Channels;
import java.nio.channels.ClosedByInterruptException;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

/**
 * Base class for all "executables" in the examples module. It provides the
 * basic infrastructure for a common workflow:
 * <ol>
 * <li>{@link #createOptions()}
 * <li>{@link #processCommandLine()}
 * <li>{@link #execute()}
 * </ol>
 * A minimal usage sketch can be found at the end of this file.
 *
 * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
 */
public abstract class Example extends Configured {
    public static final String BANNER_TEXT = " ***********************************************************************\n"
            + " * HadoOptimizer 0.0.1-SNAPSHOT                                        *\n"
            + " * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java       *\n"
            + " * Licensed under the Apache License, Version 2.0                      *\n"
            + " ***********************************************************************\n"
            + " _a, awa, _ \n"
            + " _QQQL '?Ss,mQQQF :QQQ[ j \n"
            + " _QQQQ@ -?QQQ@' swQWT' F \n"
            + " .aQQQL jQQQQL ]QQQQ4mw, .QQQQw ]' \n"
            + " .wQQQQQW =QQQQQE ]QQQQwD?? ]QQQQQ; 2 \n"
            + " mQQQW ]QQQQQf ]QQQQ@' jQQQQQQwaaf \n"
            + " <QQQQF )QQQQQg, ]QQQW' ]QQQQ?-'4Q[ \n"
            + " _aaaaa _jQQQQm. )QQQQ_]QQP?' -jQQQk jQQQQw ]7 \n"
            + " .amQQQQQW. ]QQQQ?WL, 4QQQQc ` ]QQQm $QQQQQw 2( \n"
            + " _sQQQQQQf -?QQQa WQ ]QQQQQ, ]QQQQ[ ]QQf?WQQ[ \n"
            + " ]QQQHQQQf ]QQQa'! <QQP]QQ( _wQW?QQ[ _mWP 4Q[ \n"
            + " ]QQQQHQQ _m@?$Q. ]Q@F ]QQ( jQP! ]QQ[ wQP' ]QL, \n"
            + " swQP4QQ'4Q; jQP` 3Q j@' -HQ[ _mQ?"
            + " VQ( _Q@' _$QL \n"
            + " -Qa, )QQaj@' -4$a )Qaa, 'Qma, Qmaa ~9L Qmaa ]Q6, J $QDb \n"
            + " ***********************************************************************";

    protected static final String OPTION_HELP = "h";
    protected static final String OPTION_GUI = "gui";
    protected static final String OPTION_LOG_LEVEL = "log";

    /**
     * Key under which the executed command line is stored in the
     * {@link Configuration}.
     */
    public static final String COMMAND_LINE = "de.kumpe.hadooptimizer.examples.CliExamplesRunner.commandLine";

    /**
     * Listens on stdin and aborts the running example when the user enters
     * "q".
     */
    class StdInListener implements Runnable {
        @Override
        public void run() {
            // Read stdin through a FileChannel so that interrupting this
            // thread closes the channel and unblocks readLine().
            final BufferedReader stdIn = new BufferedReader(
                    new InputStreamReader(
                            Channels.newInputStream(new FileInputStream(
                                    FileDescriptor.in).getChannel())));
            try {
                for (;;) {
                    System.out.println("Press q ENTER to quit...");
                    final String line = stdIn.readLine();
                    if ("q".equalsIgnoreCase(line)) {
                        break;
                    }
                }
                executeShutdownHooks();
                System.exit(1);
            } catch (final ClosedByInterruptException e) {
                // ignore: the example has finished and interrupted this thread
            } catch (final IOException e) {
                log.error("Error reading stdin: ", e);
            }
        }
    }

    /**
     * Shutdown hook that closes the given {@link Closeable}.
     */
    final class CloseClosableHook implements Runnable {
        private final Closeable out;

        private CloseClosableHook(final Closeable out) {
            this.out = out;
        }

        @Override
        public void run() {
            if (log.isInfoEnabled()) {
                log.info("Closing results.txt");
            }

            try {
                out.close();
            } catch (final IOException e) {
                log.error("Error closing Closeable: ", e);
            }
        }
    }

    protected static final Log log = LogFactory.getLog(Example.class);
    protected final Collection<Runnable> shutdownHooks = new CopyOnWriteArrayList<Runnable>();
    protected CommandLine commandLine;
    protected final String baseDir;
    protected PrintWriter logFile;

    public Example() {
        super(new Configuration());
        // base directory for this run: <class name>-<yyyyMMdd>-<HHmmss>
        baseDir = String.format("%s-%tY%<tm%<td-%<tH%<tM%<tS",
                getClass().getName(), new Date());
    }

    protected Options createOptions() throws Exception {
        final Options options = new Options();

        final Option logLevelOption = new Option(OPTION_LOG_LEVEL, true,
                "sets the log-level of HadoOptimizer");
        logLevelOption.setArgName("all|trace|debug|info|warn|error|fatal|off");
        options.addOption(logLevelOption);

        final Option guiOption = new Option(OPTION_GUI, false,
                "use gui if available");
        options.addOption(guiOption);

        return options;
    }

    /**
     * Runs the common workflow: parses the command line, opens the results
     * log, starts the {@link StdInListener} and finally calls
     * {@link #execute()}.
     */
    public void run(final String[] args) throws Exception {
        if (log.isInfoEnabled()) {
            log.info("\n" + BANNER_TEXT);
            log.info("Running " + getClass().getName() + " ("
                    + getVersionInfo() + ")");
        }

        log.debug("Creating options...");
        final Options options = createOptions();
        if (log.isTraceEnabled()) {
            log.trace("Created options: " + options);
        }

        log.trace("Adding help-option...");
        final Option helpOption = new Option(OPTION_HELP, "help", false,
                "print help");
        options.addOption(helpOption);

        log.debug("Parsing options...");
        commandLine = new GenericOptionsParser(getConf(), options, args)
                .getCommandLine();
        if (log.isDebugEnabled()) {
            log.debug("Remaining arguments: " + commandLine.getArgList());
        }

        if (commandLine.hasOption(OPTION_HELP)) {
            printHelp(options);
            return;
        }

        checkForUnrecognizedOption();

        log.debug("Processing command-line...");
        processCommandLine();

        openLogFile();
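
        // Record the exact command line in results.txt and in the
        // Configuration (under COMMAND_LINE) so the run can be traced later.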
        storeArgument(args);

        // start the StdInListener so the user can abort with "q"
        final Thread stdInListener = new Thread(new StdInListener(),
                "StdInListener");
        stdInListener.start();

        try {
            log.debug("Executing example...");
            execute();
        } finally {
            stdInListener.interrupt();
            executeShutdownHooks();
        }
    }

    private void openLogFile() throws IOException {
        OutputStream outputStream = outputStream("results.txt");
        try {
            // additionally mirror the results to S3, if the bucket is reachable
            final Path s3logPath = new Path("s3://hadooptimizer/" + baseDir
                    + "/results.txt");
            final FileSystem s3FileSystem = s3logPath.getFileSystem(getConf());
            final OutputStream s3OutputStream = s3FileSystem.create(s3logPath);
            outputStream = new TeeOutputStream(outputStream, s3OutputStream);
        } catch (final Exception e) {
            log.debug("Ignoring s3: " + e);
        }
        logFile = new PrintWriter(outputStream);
        shutdownHooks.add(new CloseClosableHook(logFile));

        logFile.print("# ");
        try {
            logFile.print(InetAddress.getLocalHost().getHostName());
        } catch (final UnknownHostException e) {
            logFile.print("unknown host");
        }
        logFile.print(" ");
        logFile.println(new Date());
    }

    private void executeShutdownHooks() {
        for (final Runnable shutdownHook : shutdownHooks) {
            shutdownHook.run();
        }
    }

    private void storeArgument(final String[] args) {
        final StringBuilder buffer = new StringBuilder(getClass().getName());
        for (final String arg : args) {
            buffer.append(' ');
            buffer.append(arg);
        }
        logFile.print("# ");
        final String argumentsString = buffer.toString();
        logFile.println(argumentsString);
        getConf().set(COMMAND_LINE, argumentsString);
    }

    private void checkForUnrecognizedOption() {
        @SuppressWarnings("unchecked")
        final List<String> remainingArgs = commandLine.getArgList();
        for (final String arg : remainingArgs) {
            if (arg.startsWith("-")) {
                throw new IllegalArgumentException("Unrecognized option: "
                        + arg);
            }
        }
    }

    protected abstract String getVersionInfo();

    protected abstract void execute() throws Exception;

    protected void processCommandLine() throws Exception {
        final String logLevel = commandLine.getOptionValue(OPTION_LOG_LEVEL);
        if (null != logLevel) {
            final Level level = Level.toLevel(logLevel);
            final Logger logger = Logger.getLogger("de.kumpe.hadooptimizer");
            logger.setLevel(level);
        }
    }

    private void printHelp(final Options options) {
        new HelpFormatter().printHelp(Integer.MAX_VALUE,
                "hadoop jar hadooptimizer.jar exampleClass [option...]",
                "\navailable options (some are hadoop's general options):",
                options,
                "\n$Id: Example.java 3999 2011-05-06 09:13:39Z baumbart $\n");
    }

    /**
     * Creates {@code filename} below the example's base directory and returns
     * an {@link OutputStream} to it.
     */
    public OutputStream outputStream(final String filename) throws IOException {
        final Path path = new Path(baseDir, filename);
        final FileSystem fs = path.getFileSystem(getConf());
        return fs.create(path);
    }

    /**
     * Opens {@code filename}, looking first below the example's base
     * directory, then below a directory named after the example class and
     * finally falling back to a classpath resource.
     */
    public InputStream inputStream(final String filename) throws IOException {
        final FileSystem fs = FileSystem.get(getConf());
        Path path = new Path(baseDir, filename);
        if (fs.exists(path)) {
            return fs.open(path);
        }
        path = new Path(getClass().getName(), filename);
        if (fs.exists(path)) {
            return fs.open(path);
        }
        return getClass().getResourceAsStream(
                getClass().getName() + "/" + filename);
    }
}
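
/**
 * A minimal sketch, not part of the original sources, of how a concrete
 * example plugs into the workflow above: contribute options in
 * {@code createOptions()}, evaluate them in {@code processCommandLine()} and
 * do the actual work in {@code execute()}. The class name, the hypothetical
 * "i" option and the body of {@code execute()} are illustrative assumptions
 * only; the real examples of this module live in their own source files.
 */
class UsageSketchExample extends Example {
    private int iterations = 1;

    @Override
    protected Options createOptions() throws Exception {
        // start from the common options (log level, gui) and add our own
        final Options options = super.createOptions();
        options.addOption(new Option("i", true, "number of iterations"));
        return options;
    }

    @Override
    protected void processCommandLine() throws Exception {
        super.processCommandLine();
        final String value = commandLine.getOptionValue("i");
        if (null != value) {
            iterations = Integer.parseInt(value);
        }
    }

    @Override
    protected String getVersionInfo() {
        return "usage-sketch";
    }

    @Override
    protected void execute() throws Exception {
        // run() has already opened results.txt as logFile
        for (int i = 0; i < iterations; i++) {
            logFile.println("iteration " + i);
        }
    }
}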