001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.fs.s3native;
020
021 import java.io.BufferedOutputStream;
022 import java.io.EOFException;
023 import java.io.File;
024 import java.io.FileNotFoundException;
025 import java.io.FileOutputStream;
026 import java.io.IOException;
027 import java.io.InputStream;
028 import java.io.OutputStream;
029 import java.net.URI;
030 import java.security.DigestOutputStream;
031 import java.security.MessageDigest;
032 import java.security.NoSuchAlgorithmException;
033 import java.util.ArrayList;
034 import java.util.HashMap;
035 import java.util.List;
036 import java.util.Map;
037 import java.util.Set;
038 import java.util.TreeSet;
039 import java.util.concurrent.TimeUnit;
040
041 import com.google.common.base.Preconditions;
042 import org.apache.hadoop.classification.InterfaceAudience;
043 import org.apache.hadoop.classification.InterfaceStability;
044 import org.apache.hadoop.conf.Configuration;
045 import org.apache.hadoop.fs.BufferedFSInputStream;
046 import org.apache.hadoop.fs.FSDataInputStream;
047 import org.apache.hadoop.fs.FSDataOutputStream;
048 import org.apache.hadoop.fs.FSExceptionMessages;
049 import org.apache.hadoop.fs.FSInputStream;
050 import org.apache.hadoop.fs.FileAlreadyExistsException;
051 import org.apache.hadoop.fs.FileStatus;
052 import org.apache.hadoop.fs.FileSystem;
053 import org.apache.hadoop.fs.LocalDirAllocator;
054 import org.apache.hadoop.fs.Path;
055 import org.apache.hadoop.fs.permission.FsPermission;
056 import org.apache.hadoop.fs.s3.S3Exception;
057 import org.apache.hadoop.io.retry.RetryPolicies;
058 import org.apache.hadoop.io.retry.RetryPolicy;
059 import org.apache.hadoop.io.retry.RetryProxy;
060 import org.apache.hadoop.util.Progressable;
061 import org.slf4j.Logger;
062 import org.slf4j.LoggerFactory;
063
064 /**
065 * <p>
066 * A {@link FileSystem} for reading and writing files stored on
067 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
068 * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
069 * stores files on S3 in their
070 * native form so they can be read by other S3 tools.
071 *
072 * A note about directories. S3 of course has no "native" support for them.
073 * The idiom we choose then is: for any directory created by this class,
074 * we use an empty object "#{dirpath}_$folder$" as a marker.
075 * Further, to interoperate with other S3 tools, we also accept the following:
 * - an object "#{dirpath}/" denoting a directory marker
 * - if any objects exist with the prefix "#{dirpath}/", then the
 * directory is said to exist
 * - if both a file with the name of a directory and a marker for that
 * directory exist, then the *file masks the directory*, and the directory
 * is never returned.
082 * </p>
083 * @see org.apache.hadoop.fs.s3.S3FileSystem
084 */
085 @InterfaceAudience.Public
086 @InterfaceStability.Stable
087 public class NativeS3FileSystem extends FileSystem {
088
/** Logger used by this file system and by its nested stream classes. */
public static final Logger LOG =
    LoggerFactory.getLogger(NativeS3FileSystem.class);

/** Suffix appended to a directory's key to name its empty marker object. */
private static final String FOLDER_SUFFIX = "_$folder$";
/** Alias of {@link Path#SEPARATOR}; presumably the delimiter used in S3 keys. */
static final String PATH_DELIMITER = Path.SEPARATOR;
/** Page size passed to every paged <code>store.list()</code> call in this class. */
private static final int S3_MAX_LISTING_LENGTH = 1000;
095
096 static class NativeS3FsInputStream extends FSInputStream {
097
098 private NativeFileSystemStore store;
099 private Statistics statistics;
100 private InputStream in;
101 private final String key;
102 private long pos = 0;
103
104 public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
105 Preconditions.checkNotNull(in, "Null input stream");
106 this.store = store;
107 this.statistics = statistics;
108 this.in = in;
109 this.key = key;
110 }
111
112 @Override
113 public synchronized int read() throws IOException {
114 int result;
115 try {
116 result = in.read();
117 } catch (IOException e) {
118 LOG.info("Received IOException while reading '{}', attempting to reopen",
119 key);
120 LOG.debug("{}", e, e);
121 try {
122 seek(pos);
123 result = in.read();
124 } catch (EOFException eof) {
125 LOG.debug("EOF on input stream read: {}", eof, eof);
126 result = -1;
127 }
128 }
129 if (result != -1) {
130 pos++;
131 }
132 if (statistics != null && result != -1) {
133 statistics.incrementBytesRead(1);
134 }
135 return result;
136 }
137 @Override
138 public synchronized int read(byte[] b, int off, int len)
139 throws IOException {
140 if (in == null) {
141 throw new EOFException("Cannot read closed stream");
142 }
143 int result = -1;
144 try {
145 result = in.read(b, off, len);
146 } catch (EOFException eof) {
147 throw eof;
148 } catch (IOException e) {
149 LOG.info( "Received IOException while reading '{}'," +
150 " attempting to reopen.", key);
151 seek(pos);
152 result = in.read(b, off, len);
153 }
154 if (result > 0) {
155 pos += result;
156 }
157 if (statistics != null && result > 0) {
158 statistics.incrementBytesRead(result);
159 }
160 return result;
161 }
162
163 @Override
164 public synchronized void close() throws IOException {
165 closeInnerStream();
166 }
167
168 /**
169 * Close the inner stream if not null. Even if an exception
170 * is raised during the close, the field is set to null
171 * @throws IOException if raised by the close() operation.
172 */
173 private void closeInnerStream() throws IOException {
174 if (in != null) {
175 try {
176 in.close();
177 } finally {
178 in = null;
179 }
180 }
181 }
182
183 /**
184 * Update inner stream with a new stream and position
185 * @param newStream new stream -must not be null
186 * @param newpos new position
187 * @throws IOException IO exception on a failure to close the existing
188 * stream.
189 */
190 private synchronized void updateInnerStream(InputStream newStream, long newpos) throws IOException {
191 Preconditions.checkNotNull(newStream, "Null newstream argument");
192 closeInnerStream();
193 in = newStream;
194 this.pos = newpos;
195 }
196
197 @Override
198 public synchronized void seek(long newpos) throws IOException {
199 if (newpos < 0) {
200 throw new EOFException(
201 FSExceptionMessages.NEGATIVE_SEEK);
202 }
203 if (pos != newpos) {
204 // the seek is attempting to move the current position
205 LOG.debug("Opening key '{}' for reading at position '{}", key, newpos);
206 InputStream newStream = store.retrieve(key, newpos);
207 updateInnerStream(newStream, newpos);
208 }
209 }
210
211 @Override
212 public synchronized long getPos() throws IOException {
213 return pos;
214 }
215 @Override
216 public boolean seekToNewSource(long targetPos) throws IOException {
217 return false;
218 }
219 }
220
221 private class NativeS3FsOutputStream extends OutputStream {
222
223 private Configuration conf;
224 private String key;
225 private File backupFile;
226 private OutputStream backupStream;
227 private MessageDigest digest;
228 private boolean closed;
229 private LocalDirAllocator lDirAlloc;
230
231 public NativeS3FsOutputStream(Configuration conf,
232 NativeFileSystemStore store, String key, Progressable progress,
233 int bufferSize) throws IOException {
234 this.conf = conf;
235 this.key = key;
236 this.backupFile = newBackupFile();
237 LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'");
238 try {
239 this.digest = MessageDigest.getInstance("MD5");
240 this.backupStream = new BufferedOutputStream(new DigestOutputStream(
241 new FileOutputStream(backupFile), this.digest));
242 } catch (NoSuchAlgorithmException e) {
243 LOG.warn("Cannot load MD5 digest algorithm," +
244 "skipping message integrity check.", e);
245 this.backupStream = new BufferedOutputStream(
246 new FileOutputStream(backupFile));
247 }
248 }
249
250 private File newBackupFile() throws IOException {
251 if (lDirAlloc == null) {
252 lDirAlloc = new LocalDirAllocator("fs.s3.buffer.dir");
253 }
254 File result = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
255 result.deleteOnExit();
256 return result;
257 }
258
259 @Override
260 public void flush() throws IOException {
261 backupStream.flush();
262 }
263
264 @Override
265 public synchronized void close() throws IOException {
266 if (closed) {
267 return;
268 }
269
270 backupStream.close();
271 LOG.info("OutputStream for key '{}' closed. Now beginning upload", key);
272
273 try {
274 byte[] md5Hash = digest == null ? null : digest.digest();
275 store.storeFile(key, backupFile, md5Hash);
276 } finally {
277 if (!backupFile.delete()) {
278 LOG.warn("Could not delete temporary s3n file: " + backupFile);
279 }
280 super.close();
281 closed = true;
282 }
283 LOG.info("OutputStream for key '{}' upload complete", key);
284 }
285
286 @Override
287 public void write(int b) throws IOException {
288 backupStream.write(b);
289 }
290
291 @Override
292 public void write(byte[] b, int off, int len) throws IOException {
293 backupStream.write(b, off, len);
294 }
295 }
296
// URI of this file system, rebuilt in initialize() from scheme and authority.
private URI uri;
// Backing store: injected through the constructor or created in initialize().
private NativeFileSystemStore store;
// Directory against which makeAbsolute() resolves relative paths.
private Path workingDir;

/**
 * Create an instance without a store; initialize() creates the default one.
 */
public NativeS3FileSystem() {
  // set store in initialize()
}

/**
 * Create an instance backed by the given store.
 * @param store store to use instead of the default created in initialize()
 */
public NativeS3FileSystem(NativeFileSystemStore store) {
  this.store = store;
}
308
309 /**
310 * Return the protocol scheme for the FileSystem.
311 * <p/>
312 *
313 * @return <code>s3n</code>
314 */
315 @Override
316 public String getScheme() {
317 return "s3n";
318 }
319
320 @Override
321 public void initialize(URI uri, Configuration conf) throws IOException {
322 super.initialize(uri, conf);
323 if (store == null) {
324 store = createDefaultStore(conf);
325 }
326 store.initialize(uri, conf);
327 setConf(conf);
328 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
329 this.workingDir =
330 new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory());
331 }
332
333 private static NativeFileSystemStore createDefaultStore(Configuration conf) {
334 NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
335
336 RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
337 conf.getInt("fs.s3.maxRetries", 4),
338 conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
339 Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
340 new HashMap<Class<? extends Exception>, RetryPolicy>();
341 exceptionToPolicyMap.put(IOException.class, basePolicy);
342 exceptionToPolicyMap.put(S3Exception.class, basePolicy);
343
344 RetryPolicy methodPolicy = RetryPolicies.retryByException(
345 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
346 Map<String, RetryPolicy> methodNameToPolicyMap =
347 new HashMap<String, RetryPolicy>();
348 methodNameToPolicyMap.put("storeFile", methodPolicy);
349 methodNameToPolicyMap.put("rename", methodPolicy);
350
351 return (NativeFileSystemStore)
352 RetryProxy.create(NativeFileSystemStore.class, store,
353 methodNameToPolicyMap);
354 }
355
356 private static String pathToKey(Path path) {
357 if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
358 // allow uris without trailing slash after bucket to refer to root,
359 // like s3n://mybucket
360 return "";
361 }
362 if (!path.isAbsolute()) {
363 throw new IllegalArgumentException("Path must be absolute: " + path);
364 }
365 String ret = path.toUri().getPath().substring(1); // remove initial slash
366 if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
367 ret = ret.substring(0, ret.length() -1);
368 }
369 return ret;
370 }
371
372 private static Path keyToPath(String key) {
373 return new Path("/" + key);
374 }
375
376 private Path makeAbsolute(Path path) {
377 if (path.isAbsolute()) {
378 return path;
379 }
380 return new Path(workingDir, path);
381 }
382
383 /** This optional operation is not yet supported. */
384 @Override
385 public FSDataOutputStream append(Path f, int bufferSize,
386 Progressable progress) throws IOException {
387 throw new IOException("Not supported");
388 }
389
390 @Override
391 public FSDataOutputStream create(Path f, FsPermission permission,
392 boolean overwrite, int bufferSize, short replication, long blockSize,
393 Progressable progress) throws IOException {
394
395 if (exists(f) && !overwrite) {
396 throw new FileAlreadyExistsException("File already exists: " + f);
397 }
398
399 if(LOG.isDebugEnabled()) {
400 LOG.debug("Creating new file '" + f + "' in S3");
401 }
402 Path absolutePath = makeAbsolute(f);
403 String key = pathToKey(absolutePath);
404 return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
405 key, progress, bufferSize), statistics);
406 }
407
408 @Override
409 public boolean delete(Path f, boolean recurse) throws IOException {
410 FileStatus status;
411 try {
412 status = getFileStatus(f);
413 } catch (FileNotFoundException e) {
414 if(LOG.isDebugEnabled()) {
415 LOG.debug("Delete called for '" + f +
416 "' but file does not exist, so returning false");
417 }
418 return false;
419 }
420 Path absolutePath = makeAbsolute(f);
421 String key = pathToKey(absolutePath);
422 if (status.isDirectory()) {
423 if (!recurse && listStatus(f).length > 0) {
424 throw new IOException("Can not delete " + f + " as is a not empty directory and recurse option is false");
425 }
426
427 createParent(f);
428
429 if(LOG.isDebugEnabled()) {
430 LOG.debug("Deleting directory '" + f + "'");
431 }
432 String priorLastKey = null;
433 do {
434 PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
435 for (FileMetadata file : listing.getFiles()) {
436 store.delete(file.getKey());
437 }
438 priorLastKey = listing.getPriorLastKey();
439 } while (priorLastKey != null);
440
441 try {
442 store.delete(key + FOLDER_SUFFIX);
443 } catch (FileNotFoundException e) {
444 //this is fine, we don't require a marker
445 }
446 } else {
447 if(LOG.isDebugEnabled()) {
448 LOG.debug("Deleting file '" + f + "'");
449 }
450 createParent(f);
451 store.delete(key);
452 }
453 return true;
454 }
455
456 @Override
457 public FileStatus getFileStatus(Path f) throws IOException {
458 Path absolutePath = makeAbsolute(f);
459 String key = pathToKey(absolutePath);
460
461 if (key.length() == 0) { // root always exists
462 return newDirectory(absolutePath);
463 }
464
465 if(LOG.isDebugEnabled()) {
466 LOG.debug("getFileStatus retrieving metadata for key '" + key + "'");
467 }
468 FileMetadata meta = store.retrieveMetadata(key);
469 if (meta != null) {
470 if(LOG.isDebugEnabled()) {
471 LOG.debug("getFileStatus returning 'file' for key '" + key + "'");
472 }
473 return newFile(meta, absolutePath);
474 }
475 if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
476 if(LOG.isDebugEnabled()) {
477 LOG.debug("getFileStatus returning 'directory' for key '" + key +
478 "' as '" + key + FOLDER_SUFFIX + "' exists");
479 }
480 return newDirectory(absolutePath);
481 }
482
483 if(LOG.isDebugEnabled()) {
484 LOG.debug("getFileStatus listing key '" + key + "'");
485 }
486 PartialListing listing = store.list(key, 1);
487 if (listing.getFiles().length > 0 ||
488 listing.getCommonPrefixes().length > 0) {
489 if(LOG.isDebugEnabled()) {
490 LOG.debug("getFileStatus returning 'directory' for key '" + key +
491 "' as it has contents");
492 }
493 return newDirectory(absolutePath);
494 }
495
496 if(LOG.isDebugEnabled()) {
497 LOG.debug("getFileStatus could not find key '" + key + "'");
498 }
499 throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
500 }
501
502 @Override
503 public URI getUri() {
504 return uri;
505 }
506
507 /**
508 * <p>
509 * If <code>f</code> is a file, this method will make a single call to S3.
510 * If <code>f</code> is a directory, this method will make a maximum of
511 * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
512 * files and directories contained directly in <code>f</code>.
513 * </p>
514 */
515 @Override
516 public FileStatus[] listStatus(Path f) throws IOException {
517
518 Path absolutePath = makeAbsolute(f);
519 String key = pathToKey(absolutePath);
520
521 if (key.length() > 0) {
522 FileMetadata meta = store.retrieveMetadata(key);
523 if (meta != null) {
524 return new FileStatus[] { newFile(meta, absolutePath) };
525 }
526 }
527
528 URI pathUri = absolutePath.toUri();
529 Set<FileStatus> status = new TreeSet<FileStatus>();
530 String priorLastKey = null;
531 do {
532 PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
533 for (FileMetadata fileMetadata : listing.getFiles()) {
534 Path subpath = keyToPath(fileMetadata.getKey());
535 String relativePath = pathUri.relativize(subpath.toUri()).getPath();
536
537 if (fileMetadata.getKey().equals(key + "/")) {
538 // this is just the directory we have been asked to list
539 }
540 else if (relativePath.endsWith(FOLDER_SUFFIX)) {
541 status.add(newDirectory(new Path(
542 absolutePath,
543 relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX)))));
544 }
545 else {
546 status.add(newFile(fileMetadata, subpath));
547 }
548 }
549 for (String commonPrefix : listing.getCommonPrefixes()) {
550 Path subpath = keyToPath(commonPrefix);
551 String relativePath = pathUri.relativize(subpath.toUri()).getPath();
552 status.add(newDirectory(new Path(absolutePath, relativePath)));
553 }
554 priorLastKey = listing.getPriorLastKey();
555 } while (priorLastKey != null);
556
557 if (status.isEmpty() &&
558 key.length() > 0 &&
559 store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
560 throw new FileNotFoundException("File " + f + " does not exist.");
561 }
562
563 return status.toArray(new FileStatus[status.size()]);
564 }
565
566 private FileStatus newFile(FileMetadata meta, Path path) {
567 return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
568 meta.getLastModified(), path.makeQualified(this.getUri(), this.getWorkingDirectory()));
569 }
570
571 private FileStatus newDirectory(Path path) {
572 return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this.getUri(), this.getWorkingDirectory()));
573 }
574
575 @Override
576 public boolean mkdirs(Path f, FsPermission permission) throws IOException {
577 Path absolutePath = makeAbsolute(f);
578 List<Path> paths = new ArrayList<Path>();
579 do {
580 paths.add(0, absolutePath);
581 absolutePath = absolutePath.getParent();
582 } while (absolutePath != null);
583
584 boolean result = true;
585 for (Path path : paths) {
586 result &= mkdir(path);
587 }
588 return result;
589 }
590
591 private boolean mkdir(Path f) throws IOException {
592 try {
593 FileStatus fileStatus = getFileStatus(f);
594 if (fileStatus.isFile()) {
595 throw new FileAlreadyExistsException(String.format(
596 "Can't make directory for path '%s' since it is a file.", f));
597
598 }
599 } catch (FileNotFoundException e) {
600 if(LOG.isDebugEnabled()) {
601 LOG.debug("Making dir '" + f + "' in S3");
602 }
603 String key = pathToKey(f) + FOLDER_SUFFIX;
604 store.storeEmptyFile(key);
605 }
606 return true;
607 }
608
609 @Override
610 public FSDataInputStream open(Path f, int bufferSize) throws IOException {
611 FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
612 if (fs.isDirectory()) {
613 throw new FileNotFoundException("'" + f + "' is a directory");
614 }
615 LOG.info("Opening '" + f + "' for reading");
616 Path absolutePath = makeAbsolute(f);
617 String key = pathToKey(absolutePath);
618 return new FSDataInputStream(new BufferedFSInputStream(
619 new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
620 }
621
622 // rename() and delete() use this method to ensure that the parent directory
623 // of the source does not vanish.
624 private void createParent(Path path) throws IOException {
625 Path parent = path.getParent();
626 if (parent != null) {
627 String key = pathToKey(makeAbsolute(parent));
628 if (key.length() > 0) {
629 store.storeEmptyFile(key + FOLDER_SUFFIX);
630 }
631 }
632 }
633
634
635 @Override
636 public boolean rename(Path src, Path dst) throws IOException {
637
638 String srcKey = pathToKey(makeAbsolute(src));
639
640 if (srcKey.length() == 0) {
641 // Cannot rename root of file system
642 return false;
643 }
644
645 final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
646
647 // Figure out the final destination
648 String dstKey;
649 try {
650 boolean dstIsFile = getFileStatus(dst).isFile();
651 if (dstIsFile) {
652 if(LOG.isDebugEnabled()) {
653 LOG.debug(debugPreamble +
654 "returning false as dst is an already existing file");
655 }
656 return false;
657 } else {
658 if(LOG.isDebugEnabled()) {
659 LOG.debug(debugPreamble + "using dst as output directory");
660 }
661 dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
662 }
663 } catch (FileNotFoundException e) {
664 if(LOG.isDebugEnabled()) {
665 LOG.debug(debugPreamble + "using dst as output destination");
666 }
667 dstKey = pathToKey(makeAbsolute(dst));
668 try {
669 if (getFileStatus(dst.getParent()).isFile()) {
670 if(LOG.isDebugEnabled()) {
671 LOG.debug(debugPreamble +
672 "returning false as dst parent exists and is a file");
673 }
674 return false;
675 }
676 } catch (FileNotFoundException ex) {
677 if(LOG.isDebugEnabled()) {
678 LOG.debug(debugPreamble +
679 "returning false as dst parent does not exist");
680 }
681 return false;
682 }
683 }
684
685 boolean srcIsFile;
686 try {
687 srcIsFile = getFileStatus(src).isFile();
688 } catch (FileNotFoundException e) {
689 if(LOG.isDebugEnabled()) {
690 LOG.debug(debugPreamble + "returning false as src does not exist");
691 }
692 return false;
693 }
694 if (srcIsFile) {
695 if(LOG.isDebugEnabled()) {
696 LOG.debug(debugPreamble +
697 "src is file, so doing copy then delete in S3");
698 }
699 store.copy(srcKey, dstKey);
700 store.delete(srcKey);
701 } else {
702 if(LOG.isDebugEnabled()) {
703 LOG.debug(debugPreamble + "src is directory, so copying contents");
704 }
705 store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
706
707 List<String> keysToDelete = new ArrayList<String>();
708 String priorLastKey = null;
709 do {
710 PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
711 for (FileMetadata file : listing.getFiles()) {
712 keysToDelete.add(file.getKey());
713 store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
714 }
715 priorLastKey = listing.getPriorLastKey();
716 } while (priorLastKey != null);
717
718 if(LOG.isDebugEnabled()) {
719 LOG.debug(debugPreamble +
720 "all files in src copied, now removing src files");
721 }
722 for (String key: keysToDelete) {
723 store.delete(key);
724 }
725
726 try {
727 store.delete(srcKey + FOLDER_SUFFIX);
728 } catch (FileNotFoundException e) {
729 //this is fine, we don't require a marker
730 }
731 if(LOG.isDebugEnabled()) {
732 LOG.debug(debugPreamble + "done");
733 }
734 }
735
736 return true;
737 }
738
739 @Override
740 public long getDefaultBlockSize() {
741 return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
742 }
743
744 /**
745 * Set the working directory to the given directory.
746 */
747 @Override
748 public void setWorkingDirectory(Path newDir) {
749 workingDir = newDir;
750 }
751
752 @Override
753 public Path getWorkingDirectory() {
754 return workingDir;
755 }
756
757 @Override
758 public String getCanonicalServiceName() {
759 // Does not support Token
760 return null;
761 }
762 }