001    /*
002     * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     *     http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package de.kumpe.hadooptimizer.examples;
017    
018    import java.io.BufferedReader;
019    import java.io.Closeable;
020    import java.io.FileDescriptor;
021    import java.io.FileInputStream;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.InputStreamReader;
025    import java.io.OutputStream;
026    import java.io.PrintWriter;
027    import java.net.InetAddress;
028    import java.net.UnknownHostException;
029    import java.nio.channels.Channels;
030    import java.nio.channels.ClosedByInterruptException;
031    import java.util.Collection;
032    import java.util.Date;
033    import java.util.List;
034    import java.util.concurrent.CopyOnWriteArrayList;
035    
036    import org.apache.commons.cli.CommandLine;
037    import org.apache.commons.cli.HelpFormatter;
038    import org.apache.commons.cli.Option;
039    import org.apache.commons.cli.Options;
040    import org.apache.commons.logging.Log;
041    import org.apache.commons.logging.LogFactory;
042    import org.apache.hadoop.conf.Configuration;
043    import org.apache.hadoop.conf.Configured;
044    import org.apache.hadoop.fs.FileSystem;
045    import org.apache.hadoop.fs.Path;
046    import org.apache.hadoop.util.GenericOptionsParser;
047    import org.apache.log4j.Level;
048    import org.apache.log4j.Logger;
049    
050    /**
051     * Base class for all "executables" in the examples module. It delivers a basic
052     * infrastructure for a common workflow:
053     * <ol>
054     * <li>{@link #createOptions()}
055     * <li>{@link #processCommandLine()}
056     * <li>{@link #execute()}
057     * </ol>
058     * 
059     * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
060     */
061    public abstract class Example extends Configured {
062            public static final String BANNER_TEXT = "    ***********************************************************************\n"
063                            + "    * HadoOptimizer 0.0.1-SNAPSHOT                                        *\n"
064                            + "    * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java       *\n"
065                            + "    * Licensed under the Apache License, Version 2.0                      *\n"
066                            + "    ***********************************************************************\n"
067                            + "                                                    _a,          awa,    _ \n"
068                            + "                                     _QQQL    '?Ss,mQQQF        :QQQ[    j \n"
069                            + "                                    _QQQQ@       -?QQQ@'       swQWT'    F \n"
070                            + "                         .aQQQL    jQQQQL         ]QQQQ4mw,   .QQQQw    ]' \n"
071                            + "                       .wQQQQQW   =QQQQQE         ]QQQQwD??   ]QQQQQ;   2  \n"
072                            + "                       mQQQW      ]QQQQQf         ]QQQQ@'     jQQQQQQwaaf  \n"
073                            + "                      <QQQQF      )QQQQQg,        ]QQQW'      ]QQQQ?-'4Q[  \n"
074                            + "            _aaaaa   _jQQQQm.     )QQQQ_]QQP?'   -jQQQk       jQQQQw  ]7   \n"
075                            + "         .amQQQQQW.  ]QQQQ?WL,     4QQQQc `       ]QQQm       $QQQQQw 2(   \n"
076                            + "       _sQQQQQQf     -?QQQa WQ     ]QQQQQ,        ]QQQQ[      ]QQf?WQQ[    \n"
077                            + "       ]QQQHQQQf       ]QQQa'!    <QQP]QQ(      _wQW?QQ[     _mWP   4Q[    \n"
078                            + "        ]QQQQHQQ      _m@?$Q.    ]Q@F ]QQ(     jQP! ]QQ[    wQP'    ]QL,   \n"
079                            + "      swQP4QQ'4Q;    jQP` 3Q     j@'  -HQ[   _mQ?    VQ(   _Q@'     _$QL   \n"
080                            + "     -Qa, )QQaj@'   -4$a  )Qaa, 'Qma,   Qmaa  ~9L     Qmaa ]Q6,     J $QDb \n"
081                            + "    ***********************************************************************";
082    
083            protected static final String OPTION_HELP = "h";
084            protected static final String OPTION_GUI = "gui";
085            protected static final String OPTION_LOG_LEVEL = "log";
086    
087            /**
088             * Command line key in {@link Configuration}
089             */
090            public static final String COMMAND_LINE = "de.kumpe.hadooptimizer.examples.CliExamplesRunner.commandLine";
091    
092            class StdInListener implements Runnable {
093                    @Override
094                    public void run() {
095                            final BufferedReader stdIn = new BufferedReader(
096                                            new InputStreamReader(
097                                                            Channels.newInputStream(new FileInputStream(
098                                                                            FileDescriptor.in).getChannel())));
099                            try {
100                                    for (;;) {
101                                            System.out.println("Press q ENTER ro quit...");
102                                            final String line = stdIn.readLine();
103                                            if ("q".equalsIgnoreCase(line)) {
104                                                    break;
105                                            }
106                                    }
107                                    executeShutdownHooks();
108                                    System.exit(1);
109                            } catch (final ClosedByInterruptException e) {
110                                    // ignore
111                            } catch (final IOException e) {
112                                    log.error("Error reading stdin: ", e);
113                            }
114                    }
115            }
116    
117            final class CloseClosableHook implements Runnable {
118                    private final Closeable out;
119    
120                    private CloseClosableHook(final Closeable out) {
121                            this.out = out;
122                    }
123    
124                    @Override
125                    public void run() {
126                            if (log.isInfoEnabled()) {
127                                    log.info("Closing results.txt");
128                            }
129    
130                            try {
131                                    out.close();
132                            } catch (final IOException e) {
133                                    log.error("Error closing Closable: ", e);
134                            }
135                    }
136            }
137    
138            protected static final Log log = LogFactory.getLog(OptimizerExample.class);
139            protected final Collection<Runnable> shutdownHooks = new CopyOnWriteArrayList<Runnable>();
140            protected CommandLine commandLine;
141            protected final String baseDir;
142            protected PrintWriter logFile;
143    
144            public Example() {
145                    super(new Configuration());
146                    baseDir = String.format("%s-%tY%<tm%<td-%<tH%<tM%<tS", getClass()
147                                    .getName(), new Date());
148            }
149    
150            protected Options createOptions() throws Exception {
151                    final Options options = new Options();
152    
153                    final Option logLevelOption = new Option(OPTION_LOG_LEVEL, true,
154                                    "sets the log-level of HadoOptimizer");
155                    logLevelOption.setArgName("all|trace|debug|info|warn|error|fatal|off");
156                    options.addOption(logLevelOption);
157    
158                    final Option guiOption = new Option(OPTION_GUI, false,
159                                    "use gui if available");
160                    options.addOption(guiOption);
161    
162                    return options;
163            }
164    
165            public void run(final String[] args) throws Exception {
166                    if (log.isInfoEnabled()) {
167                            log.info("\n" + BANNER_TEXT);
168                            log.info("Running " + getClass().getName() + " ("
169                                            + getVersionInfo() + ")");
170                    }
171    
172                    log.debug("Creating options...");
173                    final Options options = createOptions();
174                    if (log.isTraceEnabled()) {
175                            log.trace("Created options: " + options);
176                    }
177    
178                    log.trace("Add help-option...");
179                    final Option helpOption = new Option(OPTION_HELP, "help", false,
180                                    "pring help");
181                    options.addOption(helpOption);
182    
183                    log.debug("Parsing options...");
184                    commandLine = new GenericOptionsParser(getConf(), options, args)
185                                    .getCommandLine();
186                    if (log.isDebugEnabled()) {
187                            log.debug("Remaining arguments: " + commandLine.getArgList());
188                    }
189    
190                    if (commandLine.hasOption(OPTION_HELP)) {
191                            printHelp(options);
192                            return;
193                    }
194    
195                    checkForUnrecognizedOption();
196    
197                    log.debug("Processing command-line...");
198                    processCommandLine();
199    
200                    openLogFile();
201    
202                    storeArgument(args);
203    
204                    // starting StdInListener
205                    final Thread stdInListener = new Thread(new StdInListener(),
206                                    "StdInListener");
207                    stdInListener.start();
208    
209                    try {
210                            log.debug("Executing example...");
211                            execute();
212                    } finally {
213                            stdInListener.interrupt();
214                            executeShutdownHooks();
215                    }
216            }
217    
218            private void openLogFile() throws IOException {
219                    OutputStream outputStream = outputStream("results.txt");
220                    try {
221                            final Path s3logPath = new Path("s3://hadooptimizer/" + baseDir
222                                            + "/results.txt");
223                            final FileSystem s3FileSystem = s3logPath.getFileSystem(getConf());
224                            final OutputStream s3OutputStream = s3FileSystem.create(s3logPath);
225                            outputStream = new TeeOutputStream(outputStream, s3OutputStream);
226                    } catch (final Exception e) {
227                            log.debug("Ignoring s3: " + e);
228                    }
229                    logFile = new PrintWriter(outputStream);
230                    shutdownHooks.add(new CloseClosableHook(logFile));
231    
232                    logFile.print("# ");
233                    try {
234                            logFile.print(InetAddress.getLocalHost().getHostName());
235                    } catch (final UnknownHostException e) {
236                            logFile.print("unknown host");
237                    }
238                    logFile.print(" ");
239                    logFile.println(new Date());
240            }
241    
242            private void executeShutdownHooks() {
243                    for (final Runnable shutdownHook : shutdownHooks) {
244                            shutdownHook.run();
245                    }
246            }
247    
248            private void storeArgument(final String[] args) {
249                    final StringBuilder buffer = new StringBuilder(getClass().getName());
250                    for (final String arg : args) {
251                            buffer.append(' ');
252                            buffer.append(arg);
253                    }
254                    logFile.print("# ");
255                    final String argumentsString = buffer.toString();
256                    logFile.println(argumentsString);
257                    getConf().set(COMMAND_LINE, argumentsString);
258            }
259    
260            private void checkForUnrecognizedOption() {
261                    @SuppressWarnings("unchecked")
262                    final List<String> remainingArgs = commandLine.getArgList();
263                    for (final String arg : remainingArgs) {
264                            if (arg.startsWith("-")) {
265                                    throw new IllegalArgumentException("Unrecognized option: "
266                                                    + arg);
267                            }
268                    }
269            }
270    
271            protected abstract String getVersionInfo();
272    
273            protected abstract void execute() throws Exception;
274    
275            protected void processCommandLine() throws Exception {
276                    final String logLevel = commandLine.getOptionValue(OPTION_LOG_LEVEL);
277                    if (null != logLevel) {
278                            final Level level = Level.toLevel(logLevel);
279                            final Logger logger = Logger.getLogger("de.kumpe.hadooptimizer");
280                            logger.setLevel(level);
281                    }
282            }
283    
284            private void printHelp(final Options options) {
285                    new HelpFormatter().printHelp(Integer.MAX_VALUE,
286                                    "hadoop jar hadooptimizer.jar exampleClass [option...]",
287                                    "\navailable options (some are hadoop's general options):",
288                                    options,
289                                    "\n$Id: Example.java 3999 2011-05-06 09:13:39Z baumbart $\n");
290            }
291    
292            public OutputStream outputStream(final String filename) throws IOException {
293                    final Path path = new Path(baseDir, filename);
294                    final FileSystem fs = path.getFileSystem(getConf());
295                    return fs.create(path);
296            }
297    
298            public InputStream intputStream(final String filename) throws IOException {
299                    final FileSystem fs = FileSystem.get(getConf());
300                    Path path = new Path(baseDir, filename);
301                    if (fs.exists(path)) {
302                            return fs.open(path);
303                    }
304                    path = new Path(getClass().getName(), filename);
305                    if (fs.exists(path)) {
306                            return fs.open(path);
307                    }
308                    return getClass().getResourceAsStream(
309                                    getClass().getName() + "/" + filename);
310            }
311    }