001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred.join;
020
021 import java.io.IOException;
022 import java.util.ArrayList;
023 import java.util.Map;
024 import java.util.regex.Matcher;
025 import java.util.regex.Pattern;
026
027 import org.apache.hadoop.classification.InterfaceAudience;
028 import org.apache.hadoop.classification.InterfaceStability;
029 import org.apache.hadoop.fs.Path;
030 import org.apache.hadoop.io.WritableComparable;
031 import org.apache.hadoop.mapred.InputFormat;
032 import org.apache.hadoop.mapred.InputSplit;
033 import org.apache.hadoop.mapred.JobConf;
034 import org.apache.hadoop.mapred.Reporter;
035
036 /**
037 * An InputFormat capable of performing joins over a set of data sources sorted
038 * and partitioned the same way.
039 *
040 * A user may define new join types by setting the property
041 * <tt>mapred.join.define.<ident></tt> to a classname. In the expression
042 * <tt>mapred.join.expr</tt>, the identifier will be assumed to be a
043 * ComposableRecordReader.
044 * <tt>mapred.join.keycomparator</tt> can be a classname used to compare keys
045 * in the join.
046 * @see #setFormat
047 * @see JoinRecordReader
048 * @see MultiFilterRecordReader
049 */
050 @InterfaceAudience.Public
051 @InterfaceStability.Stable
052 public class CompositeInputFormat<K extends WritableComparable>
053 implements ComposableInputFormat<K,TupleWritable> {
054
055 // expression parse tree to which IF requests are proxied
056 private Parser.Node root;
057
058 public CompositeInputFormat() { }
059
060
061 /**
062 * Interpret a given string as a composite expression.
063 * {@code
064 * func ::= <ident>([<func>,]*<func>)
065 * func ::= tbl(<class>,"<path>")
066 * class ::= @see java.lang.Class#forName(java.lang.String)
067 * path ::= @see org.apache.hadoop.fs.Path#Path(java.lang.String)
068 * }
069 * Reads expression from the <tt>mapred.join.expr</tt> property and
070 * user-supplied join types from <tt>mapred.join.define.<ident></tt>
071 * types. Paths supplied to <tt>tbl</tt> are given as input paths to the
072 * InputFormat class listed.
073 * @see #compose(java.lang.String, java.lang.Class, java.lang.String...)
074 */
075 public void setFormat(JobConf job) throws IOException {
076 addDefaults();
077 addUserIdentifiers(job);
078 root = Parser.parse(job.get("mapred.join.expr", null), job);
079 }
080
081 /**
082 * Adds the default set of identifiers to the parser.
083 */
084 protected void addDefaults() {
085 try {
086 Parser.CNode.addIdentifier("inner", InnerJoinRecordReader.class);
087 Parser.CNode.addIdentifier("outer", OuterJoinRecordReader.class);
088 Parser.CNode.addIdentifier("override", OverrideRecordReader.class);
089 Parser.WNode.addIdentifier("tbl", WrappedRecordReader.class);
090 } catch (NoSuchMethodException e) {
091 throw new RuntimeException("FATAL: Failed to init defaults", e);
092 }
093 }
094
095 /**
096 * Inform the parser of user-defined types.
097 */
098 private void addUserIdentifiers(JobConf job) throws IOException {
099 Pattern x = Pattern.compile("^mapred\\.join\\.define\\.(\\w+)$");
100 for (Map.Entry<String,String> kv : job) {
101 Matcher m = x.matcher(kv.getKey());
102 if (m.matches()) {
103 try {
104 Parser.CNode.addIdentifier(m.group(1),
105 job.getClass(m.group(0), null, ComposableRecordReader.class));
106 } catch (NoSuchMethodException e) {
107 throw (IOException)new IOException(
108 "Invalid define for " + m.group(1)).initCause(e);
109 }
110 }
111 }
112 }
113
114 /**
115 * Build a CompositeInputSplit from the child InputFormats by assigning the
116 * ith split from each child to the ith composite split.
117 */
118 public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
119 setFormat(job);
120 job.setLong("mapred.min.split.size", Long.MAX_VALUE);
121 return root.getSplits(job, numSplits);
122 }
123
124 /**
125 * Construct a CompositeRecordReader for the children of this InputFormat
126 * as defined in the init expression.
127 * The outermost join need only be composable, not necessarily a composite.
128 * Mandating TupleWritable isn't strictly correct.
129 */
130 @SuppressWarnings("unchecked") // child types unknown
131 public ComposableRecordReader<K,TupleWritable> getRecordReader(
132 InputSplit split, JobConf job, Reporter reporter) throws IOException {
133 setFormat(job);
134 return root.getRecordReader(split, job, reporter);
135 }
136
137 /**
138 * Convenience method for constructing composite formats.
139 * Given InputFormat class (inf), path (p) return:
140 * {@code tbl(<inf>, <p>) }
141 */
142 public static String compose(Class<? extends InputFormat> inf, String path) {
143 return compose(inf.getName().intern(), path, new StringBuffer()).toString();
144 }
145
146 /**
147 * Convenience method for constructing composite formats.
148 * Given operation (op), Object class (inf), set of paths (p) return:
149 * {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
150 */
151 public static String compose(String op, Class<? extends InputFormat> inf,
152 String... path) {
153 final String infname = inf.getName();
154 StringBuffer ret = new StringBuffer(op + '(');
155 for (String p : path) {
156 compose(infname, p, ret);
157 ret.append(',');
158 }
159 ret.setCharAt(ret.length() - 1, ')');
160 return ret.toString();
161 }
162
163 /**
164 * Convenience method for constructing composite formats.
165 * Given operation (op), Object class (inf), set of paths (p) return:
166 * {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
167 */
168 public static String compose(String op, Class<? extends InputFormat> inf,
169 Path... path) {
170 ArrayList<String> tmp = new ArrayList<String>(path.length);
171 for (Path p : path) {
172 tmp.add(p.toString());
173 }
174 return compose(op, inf, tmp.toArray(new String[0]));
175 }
176
177 private static StringBuffer compose(String inf, String path,
178 StringBuffer sb) {
179 sb.append("tbl(" + inf + ",\"");
180 sb.append(path);
181 sb.append("\")");
182 return sb;
183 }
184
185 }