001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.fs.s3;
019
020 import java.io.IOException;
021 import java.io.InputStream;
022 import java.io.UnsupportedEncodingException;
023 import java.net.URI;
024 import java.net.URLDecoder;
025 import java.net.URLEncoder;
026 import java.util.Set;
027 import java.util.TreeSet;
028
029 import org.apache.hadoop.classification.InterfaceAudience;
030 import org.apache.hadoop.classification.InterfaceStability;
031 import org.apache.hadoop.conf.Configured;
032 import org.apache.hadoop.fs.Path;
033 import org.apache.hadoop.util.Tool;
034 import org.apache.hadoop.util.ToolRunner;
035 import org.jets3t.service.S3Service;
036 import org.jets3t.service.S3ServiceException;
037 import org.jets3t.service.ServiceException;
038 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
039 import org.jets3t.service.model.S3Bucket;
040 import org.jets3t.service.model.S3Object;
041 import org.jets3t.service.security.AWSCredentials;
042
043 /**
044 * <p>
045 * This class is a tool for migrating data from an older to a newer version
046 * of an S3 filesystem.
047 * </p>
048 * <p>
049 * All files in the filesystem are migrated by re-writing the block metadata
050 * - no datafiles are touched.
051 * </p>
052 */
053 @InterfaceAudience.Public
054 @InterfaceStability.Unstable
055 public class MigrationTool extends Configured implements Tool {
056
057 private S3Service s3Service;
058 private S3Bucket bucket;
059
060 public static void main(String[] args) throws Exception {
061 int res = ToolRunner.run(new MigrationTool(), args);
062 System.exit(res);
063 }
064
065 @Override
066 public int run(String[] args) throws Exception {
067
068 if (args.length == 0) {
069 System.err.println("Usage: MigrationTool <S3 file system URI>");
070 System.err.println("\t<S3 file system URI>\tfilesystem to migrate");
071 ToolRunner.printGenericCommandUsage(System.err);
072 return -1;
073 }
074
075 URI uri = URI.create(args[0]);
076
077 initialize(uri);
078
079 FileSystemStore newStore = new Jets3tFileSystemStore();
080 newStore.initialize(uri, getConf());
081
082 if (get("%2F") != null) {
083 System.err.println("Current version number is [unversioned].");
084 System.err.println("Target version number is " +
085 newStore.getVersion() + ".");
086 Store oldStore = new UnversionedStore();
087 migrate(oldStore, newStore);
088 return 0;
089 } else {
090 S3Object root = get("/");
091 if (root != null) {
092 String version = (String) root.getMetadata("fs-version");
093 if (version == null) {
094 System.err.println("Can't detect version - exiting.");
095 } else {
096 String newVersion = newStore.getVersion();
097 System.err.println("Current version number is " + version + ".");
098 System.err.println("Target version number is " + newVersion + ".");
099 if (version.equals(newStore.getVersion())) {
100 System.err.println("No migration required.");
101 return 0;
102 }
103 // use version number to create Store
104 //Store oldStore = ...
105 //migrate(oldStore, newStore);
106 System.err.println("Not currently implemented.");
107 return 0;
108 }
109 }
110 System.err.println("Can't detect version - exiting.");
111 return 0;
112 }
113
114 }
115
116 public void initialize(URI uri) throws IOException {
117
118
119
120 try {
121 String accessKey = null;
122 String secretAccessKey = null;
123 String userInfo = uri.getUserInfo();
124 if (userInfo != null) {
125 int index = userInfo.indexOf(':');
126 if (index != -1) {
127 accessKey = userInfo.substring(0, index);
128 secretAccessKey = userInfo.substring(index + 1);
129 } else {
130 accessKey = userInfo;
131 }
132 }
133 if (accessKey == null) {
134 accessKey = getConf().get("fs.s3.awsAccessKeyId");
135 }
136 if (secretAccessKey == null) {
137 secretAccessKey = getConf().get("fs.s3.awsSecretAccessKey");
138 }
139 if (accessKey == null && secretAccessKey == null) {
140 throw new IllegalArgumentException("AWS " +
141 "Access Key ID and Secret Access Key " +
142 "must be specified as the username " +
143 "or password (respectively) of a s3 URL, " +
144 "or by setting the " +
145 "fs.s3.awsAccessKeyId or " +
146 "fs.s3.awsSecretAccessKey properties (respectively).");
147 } else if (accessKey == null) {
148 throw new IllegalArgumentException("AWS " +
149 "Access Key ID must be specified " +
150 "as the username of a s3 URL, or by setting the " +
151 "fs.s3.awsAccessKeyId property.");
152 } else if (secretAccessKey == null) {
153 throw new IllegalArgumentException("AWS " +
154 "Secret Access Key must be specified " +
155 "as the password of a s3 URL, or by setting the " +
156 "fs.s3.awsSecretAccessKey property.");
157 }
158 AWSCredentials awsCredentials =
159 new AWSCredentials(accessKey, secretAccessKey);
160 this.s3Service = new RestS3Service(awsCredentials);
161 } catch (S3ServiceException e) {
162 if (e.getCause() instanceof IOException) {
163 throw (IOException) e.getCause();
164 }
165 throw new S3Exception(e);
166 }
167 bucket = new S3Bucket(uri.getHost());
168 }
169
170 private void migrate(Store oldStore, FileSystemStore newStore)
171 throws IOException {
172 for (Path path : oldStore.listAllPaths()) {
173 INode inode = oldStore.retrieveINode(path);
174 oldStore.deleteINode(path);
175 newStore.storeINode(path, inode);
176 }
177 }
178
179 private S3Object get(String key) {
180 try {
181 return s3Service.getObject(bucket.getName(), key);
182 } catch (S3ServiceException e) {
183 if ("NoSuchKey".equals(e.getS3ErrorCode())) {
184 return null;
185 }
186 }
187 return null;
188 }
189
190 interface Store {
191
192 Set<Path> listAllPaths() throws IOException;
193 INode retrieveINode(Path path) throws IOException;
194 void deleteINode(Path path) throws IOException;
195
196 }
197
198 class UnversionedStore implements Store {
199
200 @Override
201 public Set<Path> listAllPaths() throws IOException {
202 try {
203 String prefix = urlEncode(Path.SEPARATOR);
204 S3Object[] objects = s3Service.listObjects(bucket.getName(), prefix, null);
205 Set<Path> prefixes = new TreeSet<Path>();
206 for (int i = 0; i < objects.length; i++) {
207 prefixes.add(keyToPath(objects[i].getKey()));
208 }
209 return prefixes;
210 } catch (S3ServiceException e) {
211 if (e.getCause() instanceof IOException) {
212 throw (IOException) e.getCause();
213 }
214 throw new S3Exception(e);
215 }
216 }
217
218 @Override
219 public void deleteINode(Path path) throws IOException {
220 delete(pathToKey(path));
221 }
222
223 private void delete(String key) throws IOException {
224 try {
225 s3Service.deleteObject(bucket, key);
226 } catch (S3ServiceException e) {
227 if (e.getCause() instanceof IOException) {
228 throw (IOException) e.getCause();
229 }
230 throw new S3Exception(e);
231 }
232 }
233
234 @Override
235 public INode retrieveINode(Path path) throws IOException {
236 return INode.deserialize(get(pathToKey(path)));
237 }
238
239 private InputStream get(String key) throws IOException {
240 try {
241 S3Object object = s3Service.getObject(bucket.getName(), key);
242 return object.getDataInputStream();
243 } catch (S3ServiceException e) {
244 if ("NoSuchKey".equals(e.getS3ErrorCode())) {
245 return null;
246 }
247 if (e.getCause() instanceof IOException) {
248 throw (IOException) e.getCause();
249 }
250 throw new S3Exception(e);
251 } catch (ServiceException e) {
252 return null;
253 }
254 }
255
256 private String pathToKey(Path path) {
257 if (!path.isAbsolute()) {
258 throw new IllegalArgumentException("Path must be absolute: " + path);
259 }
260 return urlEncode(path.toUri().getPath());
261 }
262
263 private Path keyToPath(String key) {
264 return new Path(urlDecode(key));
265 }
266
267 private String urlEncode(String s) {
268 try {
269 return URLEncoder.encode(s, "UTF-8");
270 } catch (UnsupportedEncodingException e) {
271 // Should never happen since every implementation of the Java Platform
272 // is required to support UTF-8.
273 // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
274 throw new IllegalStateException(e);
275 }
276 }
277
278 private String urlDecode(String s) {
279 try {
280 return URLDecoder.decode(s, "UTF-8");
281 } catch (UnsupportedEncodingException e) {
282 // Should never happen since every implementation of the Java Platform
283 // is required to support UTF-8.
284 // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
285 throw new IllegalStateException(e);
286 }
287 }
288
289 }
290
291 }