/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.lib.service.hadoop;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.lib.server.BaseService;
import org.apache.hadoop.lib.server.ServiceException;
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.lib.service.FileSystemAccessException;
import org.apache.hadoop.lib.service.Instrumentation;
import org.apache.hadoop.lib.service.Scheduler;
import org.apache.hadoop.lib.util.Check;
import org.apache.hadoop.lib.util.ConfigurationUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

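/**
 * Service that provides Hadoop {@link FileSystem} instances on behalf of
 * proxied end users. It supports Kerberos (keytab) and simple/pseudo
 * authentication, caches one filesystem per user, and periodically purges
 * filesystems that have been idle longer than a configurable timeout.
 * <p>
 * Typical use (a hypothetical sketch; <code>server</code> and the paths
 * shown are illustrative, not part of this class):
 * <pre>
 *   FileSystemAccess fsAccess = server.get(FileSystemAccess.class);
 *   Configuration conf = fsAccess.getFileSystemConfiguration();
 *   FileStatus status = fsAccess.execute("alice", conf,
 *     new FileSystemAccess.FileSystemExecutor&lt;FileStatus&gt;() {
 *       public FileStatus execute(FileSystem fs) throws IOException {
 *         return fs.getFileStatus(new Path("/user/alice"));
 *       }
 *     });
 * </pre>
 */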
@InterfaceAudience.Private
public class FileSystemAccessService extends BaseService implements FileSystemAccess {
  private static final Logger LOG = LoggerFactory.getLogger(FileSystemAccessService.class);

  public static final String PREFIX = "hadoop";

  private static final String INSTRUMENTATION_GROUP = "hadoop";

  public static final String AUTHENTICATION_TYPE = "authentication.type";
  public static final String KERBEROS_KEYTAB = "authentication.kerberos.keytab";
  public static final String KERBEROS_PRINCIPAL = "authentication.kerberos.principal";
  public static final String FS_CACHE_PURGE_FREQUENCY = "filesystem.cache.purge.frequency";
  public static final String FS_CACHE_PURGE_TIMEOUT = "filesystem.cache.purge.timeout";

  public static final String NAME_NODE_WHITELIST = "name.node.whitelist";

  public static final String HADOOP_CONF_DIR = "config.dir";

  private static final String[] HADOOP_CONF_FILES = {"core-site.xml", "hdfs-site.xml"};

  private static final String FILE_SYSTEM_SERVICE_CREATED = "FileSystemAccessService.created";

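  /**
   * Reference-counted holder for a per-user {@link FileSystem}. The
   * filesystem is created lazily on first use. On release, it is closed
   * immediately when the timeout is zero; otherwise the last-use time is
   * recorded so the purger can close it once it has been idle longer than
   * the timeout.
   */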
  private static class CachedFileSystem {
    private FileSystem fs;
    private long lastUse;
    private long timeout;
    private int count;

    public CachedFileSystem(long timeout) {
      this.timeout = timeout;
      lastUse = -1;
      count = 0;
    }

    synchronized FileSystem getFileSystem(Configuration conf)
      throws IOException {
      if (fs == null) {
        fs = FileSystem.get(conf);
      }
      lastUse = -1;
      count++;
      return fs;
    }

    synchronized void release() throws IOException {
      count--;
      if (count == 0) {
        if (timeout == 0) {
          fs.close();
          fs = null;
          lastUse = -1;
        }
        else {
          lastUse = System.currentTimeMillis();
        }
      }
    }

    // To avoid race conditions in the map cache adding/removing entries,
    // an entry in the cache remains forever; it just closes/opens filesystems
    // based on their utilization. Worst case scenario, the penalty we'll
    // pay is that the number of entries in the cache will be the total
    // number of users in HDFS (which seems a reasonable overhead).
    synchronized boolean purgeIfIdle() throws IOException {
      boolean ret = false;
      if (count == 0 && lastUse != -1 &&
          (System.currentTimeMillis() - lastUse) > timeout) {
        fs.close();
        fs = null;
        lastUse = -1;
        ret = true;
      }
      return ret;
    }

  }

  public FileSystemAccessService() {
    super(PREFIX);
  }

  private Collection<String> nameNodeWhitelist;

  Configuration serviceHadoopConf;

  private AtomicInteger unmanagedFileSystems = new AtomicInteger();

  private ConcurrentHashMap<String, CachedFileSystem> fsCache =
    new ConcurrentHashMap<String, CachedFileSystem>();

  private long purgeTimeout;

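  /**
   * Initializes the service: performs the Kerberos keytab login or
   * configures simple/pseudo authentication, loads the Hadoop configuration
   * files from the configured directory, and reads the NameNode whitelist.
   *
   * @throws ServiceException thrown if the authentication configuration is
   * invalid, the Kerberos login fails, or the Hadoop configuration cannot
   * be loaded.
   */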
  @Override
  protected void init() throws ServiceException {
    LOG.info("Using FileSystemAccess JARs version [{}]", VersionInfo.getVersion());
    String security = getServiceConfig().get(AUTHENTICATION_TYPE, "simple").trim();
    if (security.equals("kerberos")) {
      String defaultName = getServer().getName();
      String keytab = System.getProperty("user.home") + "/" + defaultName + ".keytab";
      keytab = getServiceConfig().get(KERBEROS_KEYTAB, keytab).trim();
      if (keytab.length() == 0) {
        throw new ServiceException(FileSystemAccessException.ERROR.H01, KERBEROS_KEYTAB);
      }
      String principal = defaultName + "/localhost@LOCALHOST";
      principal = getServiceConfig().get(KERBEROS_PRINCIPAL, principal).trim();
      if (principal.length() == 0) {
        throw new ServiceException(FileSystemAccessException.ERROR.H01, KERBEROS_PRINCIPAL);
      }
      Configuration conf = new Configuration();
      conf.set("hadoop.security.authentication", "kerberos");
      UserGroupInformation.setConfiguration(conf);
      try {
        UserGroupInformation.loginUserFromKeytab(principal, keytab);
      } catch (IOException ex) {
        throw new ServiceException(FileSystemAccessException.ERROR.H02, ex.getMessage(), ex);
      }
      LOG.info("Using FileSystemAccess Kerberos authentication, principal [{}] keytab [{}]", principal, keytab);
    } else if (security.equals("simple")) {
      Configuration conf = new Configuration();
      conf.set("hadoop.security.authentication", "simple");
      UserGroupInformation.setConfiguration(conf);
      LOG.info("Using FileSystemAccess simple/pseudo authentication, principal [{}]", System.getProperty("user.name"));
    } else {
      throw new ServiceException(FileSystemAccessException.ERROR.H09, security);
    }

    String hadoopConfDirProp = getServiceConfig().get(HADOOP_CONF_DIR, getServer().getConfigDir());
    File hadoopConfDir = new File(hadoopConfDirProp).getAbsoluteFile();
    if (!hadoopConfDir.exists()) {
      hadoopConfDir = new File(getServer().getConfigDir()).getAbsoluteFile();
    }
    if (!hadoopConfDir.exists()) {
      throw new ServiceException(FileSystemAccessException.ERROR.H10, hadoopConfDir);
    }
    try {
      serviceHadoopConf = loadHadoopConf(hadoopConfDir);
    } catch (IOException ex) {
      throw new ServiceException(FileSystemAccessException.ERROR.H11, ex.toString(), ex);
    }

    LOG.debug("FileSystemAccess FileSystem configuration:");
    for (Map.Entry<String, String> entry : serviceHadoopConf) {
      LOG.debug("  {} = {}", entry.getKey(), entry.getValue());
    }
    setRequiredServiceHadoopConf(serviceHadoopConf);

    nameNodeWhitelist = toLowerCase(getServiceConfig().getTrimmedStringCollection(NAME_NODE_WHITELIST));
  }

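  /**
   * Loads <code>core-site.xml</code> and <code>hdfs-site.xml</code>, if
   * present, from the given directory into a {@link Configuration} that
   * does not load the default resources.
   */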
  private Configuration loadHadoopConf(File dir) throws IOException {
    Configuration hadoopConf = new Configuration(false);
    for (String file : HADOOP_CONF_FILES) {
      File f = new File(dir, file);
      if (f.exists()) {
        hadoopConf.addResource(new Path(f.getAbsolutePath()));
      }
    }
    return hadoopConf;
  }

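  /**
   * Registers instrumentation for the number of unmanaged filesystems and
   * schedules the filesystem cache purger if a positive purge timeout is
   * configured.
   */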
  @Override
  public void postInit() throws ServiceException {
    super.postInit();
    Instrumentation instrumentation = getServer().get(Instrumentation.class);
    instrumentation.addVariable(INSTRUMENTATION_GROUP, "unmanaged.fs", new Instrumentation.Variable<Integer>() {
      @Override
      public Integer getValue() {
        return unmanagedFileSystems.get();
      }
    });
    instrumentation.addSampler(INSTRUMENTATION_GROUP, "unmanaged.fs", 60, new Instrumentation.Variable<Long>() {
      @Override
      public Long getValue() {
        return (long) unmanagedFileSystems.get();
      }
    });
    Scheduler scheduler = getServer().get(Scheduler.class);
    int purgeInterval = getServiceConfig().getInt(FS_CACHE_PURGE_FREQUENCY, 60);
    purgeTimeout = getServiceConfig().getLong(FS_CACHE_PURGE_TIMEOUT, 60);
    purgeTimeout = (purgeTimeout > 0) ? purgeTimeout : 0;
    if (purgeTimeout > 0) {
      scheduler.schedule(new FileSystemCachePurger(),
                         purgeInterval, purgeInterval, TimeUnit.SECONDS);
    }
  }

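  /**
   * Periodic task that closes cached filesystems that have been idle longer
   * than the purge timeout. Cache entries are never removed; only their
   * underlying filesystems are closed and reopened on demand.
   */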
  private class FileSystemCachePurger implements Runnable {

    @Override
    public void run() {
      int count = 0;
      for (CachedFileSystem cacheFs : fsCache.values()) {
        try {
          count += cacheFs.purgeIfIdle() ? 1 : 0;
        } catch (Throwable ex) {
          LOG.warn("Error while purging filesystem, {}", ex.toString(), ex);
        }
      }
      LOG.debug("Purged [{}] filesystem instances", count);
    }
  }

  private Set<String> toLowerCase(Collection<String> collection) {
    Set<String> set = new HashSet<String>();
    for (String value : collection) {
      set.add(value.toLowerCase());
    }
    return set;
  }

  @Override
  public Class getInterface() {
    return FileSystemAccess.class;
  }

  @Override
  public Class[] getServiceDependencies() {
    return new Class[]{Instrumentation.class, Scheduler.class};
  }

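  /**
   * Returns a proxy-user {@link UserGroupInformation} for the given user,
   * using the service login user as the real user.
   */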
  protected UserGroupInformation getUGI(String user) throws IOException {
    return UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
  }

  protected void setRequiredServiceHadoopConf(Configuration conf) {
    // this service caches FileSystem instances itself, so Hadoop's own
    // HDFS filesystem cache must be disabled
    conf.set("fs.hdfs.impl.disable.cache", "true");
  }

  private static final String HTTPFS_FS_USER = "httpfs.fs.user";

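  /**
   * Returns the cached filesystem for the current user, creating the cache
   * entry and the filesystem on first access. The user's short name is
   * stamped into the configuration so {@link #closeFileSystem} can locate
   * the corresponding cache entry on release.
   */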
  protected FileSystem createFileSystem(Configuration namenodeConf)
    throws IOException {
    String user = UserGroupInformation.getCurrentUser().getShortUserName();
    CachedFileSystem newCachedFS = new CachedFileSystem(purgeTimeout);
    CachedFileSystem cachedFS = fsCache.putIfAbsent(user, newCachedFS);
    if (cachedFS == null) {
      cachedFS = newCachedFS;
    }
    Configuration conf = new Configuration(namenodeConf);
    conf.set(HTTPFS_FS_USER, user);
    return cachedFS.getFileSystem(conf);
  }

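  /**
   * Releases the cache entry for the filesystem's user, closing the
   * filesystem if it is no longer in use and the purge timeout is zero.
   */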
  protected void closeFileSystem(FileSystem fs) throws IOException {
    if (fsCache.containsKey(fs.getConf().get(HTTPFS_FS_USER))) {
      fsCache.get(fs.getConf().get(HTTPFS_FS_USER)).release();
    }
  }

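  /**
   * Verifies that the NameNode authority is in the configured whitelist.
   * An empty whitelist, or one containing <code>*</code>, accepts any
   * NameNode.
   */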
  protected void validateNamenode(String namenode) throws FileSystemAccessException {
    if (!nameNodeWhitelist.isEmpty() && !nameNodeWhitelist.contains("*")) {
      if (!nameNodeWhitelist.contains(namenode.toLowerCase())) {
        throw new FileSystemAccessException(FileSystemAccessException.ERROR.H05, namenode, "not in whitelist");
      }
    }
  }

  protected void checkNameNodeHealth(FileSystem fileSystem) throws FileSystemAccessException {
  }

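  /**
   * Runs the given executor against the user's cached filesystem as a
   * proxy-user <code>doAs</code> operation, timing the execution through
   * instrumentation and releasing the filesystem when done.
   */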
  @Override
  public <T> T execute(String user, final Configuration conf, final FileSystemExecutor<T> executor)
    throws FileSystemAccessException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    Check.notNull(executor, "executor");
    if (!conf.getBoolean(FILE_SYSTEM_SERVICE_CREATED, false)) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H04);
    }
    if (conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) == null ||
        conf.getTrimmed(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY).length() == 0) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H06,
                                          CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
    }
    try {
      validateNamenode(
        new URI(conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)).
          getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<T>() {
        @Override
        public T run() throws Exception {
          FileSystem fs = createFileSystem(conf);
          Instrumentation instrumentation = getServer().get(Instrumentation.class);
          Instrumentation.Cron cron = instrumentation.createCron();
          try {
            checkNameNodeHealth(fs);
            cron.start();
            return executor.execute(fs);
          } finally {
            cron.stop();
            instrumentation.addCron(INSTRUMENTATION_GROUP, executor.getClass().getSimpleName(), cron);
            closeFileSystem(fs);
          }
        }
      });
    } catch (FileSystemAccessException ex) {
      throw ex;
    } catch (Exception ex) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H03, ex);
    }
  }

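  /**
   * Obtains a filesystem for the given user as a proxy-user operation,
   * after validating the target NameNode against the whitelist. The caller
   * is responsible for releasing it via {@link #releaseFileSystem}.
   */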
  public FileSystem createFileSystemInternal(String user, final Configuration conf)
    throws IOException, FileSystemAccessException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    if (!conf.getBoolean(FILE_SYSTEM_SERVICE_CREATED, false)) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H04);
    }
    try {
      validateNamenode(
        new URI(conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)).getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
        @Override
        public FileSystem run() throws Exception {
          return createFileSystem(conf);
        }
      });
    } catch (IOException ex) {
      throw ex;
    } catch (FileSystemAccessException ex) {
      throw ex;
    } catch (Exception ex) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H08, ex.getMessage(), ex);
    }
  }

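  /**
   * Creates an unmanaged filesystem for the given user, tracking it in the
   * unmanaged-filesystems instrumentation counter.
   */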
  @Override
  public FileSystem createFileSystem(String user, final Configuration conf) throws IOException,
    FileSystemAccessException {
    unmanagedFileSystems.incrementAndGet();
    return createFileSystemInternal(user, conf);
  }

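  /**
   * Releases a filesystem obtained via
   * {@link #createFileSystem(String, Configuration)}, decrementing the
   * unmanaged-filesystems counter.
   */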
  @Override
  public void releaseFileSystem(FileSystem fs) throws IOException {
    unmanagedFileSystems.decrementAndGet();
    closeFileSystem(fs);
  }

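  /**
   * Returns a copy of the service Hadoop configuration, marked so that
   * {@link #execute} and {@link #createFileSystem} accept it, with the
   * server-side umask cleared so HttpFS matches WebHDFS behavior.
   */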
  @Override
  public Configuration getFileSystemConfiguration() {
    Configuration conf = new Configuration(true);
    ConfigurationUtils.copy(serviceHadoopConf, conf);
    conf.setBoolean(FILE_SYSTEM_SERVICE_CREATED, true);

    // Force-clear server-side umask to make HttpFS match WebHDFS behavior
    conf.set(FsPermission.UMASK_LABEL, "000");

    return conf;
  }

}