001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.lib.service.hadoop;
020
021 import org.apache.hadoop.classification.InterfaceAudience;
022 import org.apache.hadoop.conf.Configuration;
023 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
024 import org.apache.hadoop.fs.FileSystem;
025 import org.apache.hadoop.fs.Path;
026 import org.apache.hadoop.fs.permission.FsPermission;
027 import org.apache.hadoop.lib.server.BaseService;
028 import org.apache.hadoop.lib.server.ServiceException;
029 import org.apache.hadoop.lib.service.FileSystemAccess;
030 import org.apache.hadoop.lib.service.FileSystemAccessException;
031 import org.apache.hadoop.lib.service.Instrumentation;
032 import org.apache.hadoop.lib.service.Scheduler;
033 import org.apache.hadoop.lib.util.Check;
034 import org.apache.hadoop.lib.util.ConfigurationUtils;
035 import org.apache.hadoop.security.UserGroupInformation;
036 import org.apache.hadoop.util.VersionInfo;
037 import org.slf4j.Logger;
038 import org.slf4j.LoggerFactory;
039
040 import java.io.File;
041 import java.io.IOException;
042 import java.net.URI;
043 import java.security.PrivilegedExceptionAction;
044 import java.util.Collection;
045 import java.util.HashSet;
046 import java.util.Map;
047 import java.util.Set;
048 import java.util.concurrent.ConcurrentHashMap;
049 import java.util.concurrent.TimeUnit;
050 import java.util.concurrent.atomic.AtomicInteger;
051
052 @InterfaceAudience.Private
053 public class FileSystemAccessService extends BaseService implements FileSystemAccess {
054 private static final Logger LOG = LoggerFactory.getLogger(FileSystemAccessService.class);
055
056 public static final String PREFIX = "hadoop";
057
058 private static final String INSTRUMENTATION_GROUP = "hadoop";
059
060 public static final String AUTHENTICATION_TYPE = "authentication.type";
061 public static final String KERBEROS_KEYTAB = "authentication.kerberos.keytab";
062 public static final String KERBEROS_PRINCIPAL = "authentication.kerberos.principal";
063 public static final String FS_CACHE_PURGE_FREQUENCY = "filesystem.cache.purge.frequency";
064 public static final String FS_CACHE_PURGE_TIMEOUT = "filesystem.cache.purge.timeout";
065
066 public static final String NAME_NODE_WHITELIST = "name.node.whitelist";
067
068 public static final String HADOOP_CONF_DIR = "config.dir";
069
070 private static final String[] HADOOP_CONF_FILES = {"core-site.xml", "hdfs-site.xml"};
071
072 private static final String FILE_SYSTEM_SERVICE_CREATED = "FileSystemAccessService.created";
073
074 private static class CachedFileSystem {
075 private FileSystem fs;
076 private long lastUse;
077 private long timeout;
078 private int count;
079
080 public CachedFileSystem(long timeout) {
081 this.timeout = timeout;
082 lastUse = -1;
083 count = 0;
084 }
085
086 synchronized FileSystem getFileSytem(Configuration conf)
087 throws IOException {
088 if (fs == null) {
089 fs = FileSystem.get(conf);
090 }
091 lastUse = -1;
092 count++;
093 return fs;
094 }
095
096 synchronized void release() throws IOException {
097 count--;
098 if (count == 0) {
099 if (timeout == 0) {
100 fs.close();
101 fs = null;
102 lastUse = -1;
103 }
104 else {
105 lastUse = System.currentTimeMillis();
106 }
107 }
108 }
109
110 // to avoid race conditions in the map cache adding removing entries
111 // an entry in the cache remains forever, it just closes/opens filesystems
112 // based on their utilization. Worse case scenario, the penalty we'll
113 // pay is that the amount of entries in the cache will be the total
114 // number of users in HDFS (which seems a resonable overhead).
115 synchronized boolean purgeIfIdle() throws IOException {
116 boolean ret = false;
117 if (count == 0 && lastUse != -1 &&
118 (System.currentTimeMillis() - lastUse) > timeout) {
119 fs.close();
120 fs = null;
121 lastUse = -1;
122 ret = true;
123 }
124 return ret;
125 }
126
127 }
128
/**
 * Creates the service, passing {@link #PREFIX} to the {@link BaseService}
 * constructor.
 */
public FileSystemAccessService() {
  super(PREFIX);
}
132
133 private Collection<String> nameNodeWhitelist;
134
135 Configuration serviceHadoopConf;
136
137 private AtomicInteger unmanagedFileSystems = new AtomicInteger();
138
139 private ConcurrentHashMap<String, CachedFileSystem> fsCache =
140 new ConcurrentHashMap<String, CachedFileSystem>();
141
142 private long purgeTimeout;
143
144 @Override
145 protected void init() throws ServiceException {
146 LOG.info("Using FileSystemAccess JARs version [{}]", VersionInfo.getVersion());
147 String security = getServiceConfig().get(AUTHENTICATION_TYPE, "simple").trim();
148 if (security.equals("kerberos")) {
149 String defaultName = getServer().getName();
150 String keytab = System.getProperty("user.home") + "/" + defaultName + ".keytab";
151 keytab = getServiceConfig().get(KERBEROS_KEYTAB, keytab).trim();
152 if (keytab.length() == 0) {
153 throw new ServiceException(FileSystemAccessException.ERROR.H01, KERBEROS_KEYTAB);
154 }
155 String principal = defaultName + "/localhost@LOCALHOST";
156 principal = getServiceConfig().get(KERBEROS_PRINCIPAL, principal).trim();
157 if (principal.length() == 0) {
158 throw new ServiceException(FileSystemAccessException.ERROR.H01, KERBEROS_PRINCIPAL);
159 }
160 Configuration conf = new Configuration();
161 conf.set("hadoop.security.authentication", "kerberos");
162 UserGroupInformation.setConfiguration(conf);
163 try {
164 UserGroupInformation.loginUserFromKeytab(principal, keytab);
165 } catch (IOException ex) {
166 throw new ServiceException(FileSystemAccessException.ERROR.H02, ex.getMessage(), ex);
167 }
168 LOG.info("Using FileSystemAccess Kerberos authentication, principal [{}] keytab [{}]", principal, keytab);
169 } else if (security.equals("simple")) {
170 Configuration conf = new Configuration();
171 conf.set("hadoop.security.authentication", "simple");
172 UserGroupInformation.setConfiguration(conf);
173 LOG.info("Using FileSystemAccess simple/pseudo authentication, principal [{}]", System.getProperty("user.name"));
174 } else {
175 throw new ServiceException(FileSystemAccessException.ERROR.H09, security);
176 }
177
178 String hadoopConfDirProp = getServiceConfig().get(HADOOP_CONF_DIR, getServer().getConfigDir());
179 File hadoopConfDir = new File(hadoopConfDirProp).getAbsoluteFile();
180 if (hadoopConfDir == null) {
181 hadoopConfDir = new File(getServer().getConfigDir()).getAbsoluteFile();
182 }
183 if (!hadoopConfDir.exists()) {
184 throw new ServiceException(FileSystemAccessException.ERROR.H10, hadoopConfDir);
185 }
186 try {
187 serviceHadoopConf = loadHadoopConf(hadoopConfDir);
188 } catch (IOException ex) {
189 throw new ServiceException(FileSystemAccessException.ERROR.H11, ex.toString(), ex);
190 }
191
192 LOG.debug("FileSystemAccess FileSystem configuration:");
193 for (Map.Entry entry : serviceHadoopConf) {
194 LOG.debug(" {} = {}", entry.getKey(), entry.getValue());
195 }
196 setRequiredServiceHadoopConf(serviceHadoopConf);
197
198 nameNodeWhitelist = toLowerCase(getServiceConfig().getTrimmedStringCollection(NAME_NODE_WHITELIST));
199 }
200
201 private Configuration loadHadoopConf(File dir) throws IOException {
202 Configuration hadoopConf = new Configuration(false);
203 for (String file : HADOOP_CONF_FILES) {
204 File f = new File(dir, file);
205 if (f.exists()) {
206 hadoopConf.addResource(new Path(f.getAbsolutePath()));
207 }
208 }
209 return hadoopConf;
210 }
211
212 @Override
213 public void postInit() throws ServiceException {
214 super.postInit();
215 Instrumentation instrumentation = getServer().get(Instrumentation.class);
216 instrumentation.addVariable(INSTRUMENTATION_GROUP, "unmanaged.fs", new Instrumentation.Variable<Integer>() {
217 @Override
218 public Integer getValue() {
219 return unmanagedFileSystems.get();
220 }
221 });
222 instrumentation.addSampler(INSTRUMENTATION_GROUP, "unmanaged.fs", 60, new Instrumentation.Variable<Long>() {
223 @Override
224 public Long getValue() {
225 return (long) unmanagedFileSystems.get();
226 }
227 });
228 Scheduler scheduler = getServer().get(Scheduler.class);
229 int purgeInterval = getServiceConfig().getInt(FS_CACHE_PURGE_FREQUENCY, 60);
230 purgeTimeout = getServiceConfig().getLong(FS_CACHE_PURGE_TIMEOUT, 60);
231 purgeTimeout = (purgeTimeout > 0) ? purgeTimeout : 0;
232 if (purgeTimeout > 0) {
233 scheduler.schedule(new FileSystemCachePurger(),
234 purgeInterval, purgeInterval, TimeUnit.SECONDS);
235 }
236 }
237
238 private class FileSystemCachePurger implements Runnable {
239
240 @Override
241 public void run() {
242 int count = 0;
243 for (CachedFileSystem cacheFs : fsCache.values()) {
244 try {
245 count += cacheFs.purgeIfIdle() ? 1 : 0;
246 } catch (Throwable ex) {
247 LOG.warn("Error while purging filesystem, " + ex.toString(), ex);
248 }
249 }
250 LOG.debug("Purged [{}} filesystem instances", count);
251 }
252 }
253
254 private Set<String> toLowerCase(Collection<String> collection) {
255 Set<String> set = new HashSet<String>();
256 for (String value : collection) {
257 set.add(value.toLowerCase());
258 }
259 return set;
260 }
261
262 @Override
263 public Class getInterface() {
264 return FileSystemAccess.class;
265 }
266
267 @Override
268 public Class[] getServiceDependencies() {
269 return new Class[]{Instrumentation.class, Scheduler.class};
270 }
271
272 protected UserGroupInformation getUGI(String user) throws IOException {
273 return UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
274 }
275
276 protected void setRequiredServiceHadoopConf(Configuration conf) {
277 conf.set("fs.hdfs.impl.disable.cache", "true");
278 }
279
280 private static final String HTTPFS_FS_USER = "httpfs.fs.user";
281
282 protected FileSystem createFileSystem(Configuration namenodeConf)
283 throws IOException {
284 String user = UserGroupInformation.getCurrentUser().getShortUserName();
285 CachedFileSystem newCachedFS = new CachedFileSystem(purgeTimeout);
286 CachedFileSystem cachedFS = fsCache.putIfAbsent(user, newCachedFS);
287 if (cachedFS == null) {
288 cachedFS = newCachedFS;
289 }
290 Configuration conf = new Configuration(namenodeConf);
291 conf.set(HTTPFS_FS_USER, user);
292 return cachedFS.getFileSytem(conf);
293 }
294
295 protected void closeFileSystem(FileSystem fs) throws IOException {
296 if (fsCache.containsKey(fs.getConf().get(HTTPFS_FS_USER))) {
297 fsCache.get(fs.getConf().get(HTTPFS_FS_USER)).release();
298 }
299 }
300
301 protected void validateNamenode(String namenode) throws FileSystemAccessException {
302 if (nameNodeWhitelist.size() > 0 && !nameNodeWhitelist.contains("*")) {
303 if (!nameNodeWhitelist.contains(namenode.toLowerCase())) {
304 throw new FileSystemAccessException(FileSystemAccessException.ERROR.H05, namenode, "not in whitelist");
305 }
306 }
307 }
308
309 protected void checkNameNodeHealth(FileSystem fileSystem) throws FileSystemAccessException {
310 }
311
312 @Override
313 public <T> T execute(String user, final Configuration conf, final FileSystemExecutor<T> executor)
314 throws FileSystemAccessException {
315 Check.notEmpty(user, "user");
316 Check.notNull(conf, "conf");
317 Check.notNull(executor, "executor");
318 if (!conf.getBoolean(FILE_SYSTEM_SERVICE_CREATED, false)) {
319 throw new FileSystemAccessException(FileSystemAccessException.ERROR.H04);
320 }
321 if (conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) == null ||
322 conf.getTrimmed(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY).length() == 0) {
323 throw new FileSystemAccessException(FileSystemAccessException.ERROR.H06,
324 CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
325 }
326 try {
327 validateNamenode(
328 new URI(conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)).
329 getAuthority());
330 UserGroupInformation ugi = getUGI(user);
331 return ugi.doAs(new PrivilegedExceptionAction<T>() {
332 @Override
333 public T run() throws Exception {
334 FileSystem fs = createFileSystem(conf);
335 Instrumentation instrumentation = getServer().get(Instrumentation.class);
336 Instrumentation.Cron cron = instrumentation.createCron();
337 try {
338 checkNameNodeHealth(fs);
339 cron.start();
340 return executor.execute(fs);
341 } finally {
342 cron.stop();
343 instrumentation.addCron(INSTRUMENTATION_GROUP, executor.getClass().getSimpleName(), cron);
344 closeFileSystem(fs);
345 }
346 }
347 });
348 } catch (FileSystemAccessException ex) {
349 throw ex;
350 } catch (Exception ex) {
351 throw new FileSystemAccessException(FileSystemAccessException.ERROR.H03, ex);
352 }
353 }
354
355 public FileSystem createFileSystemInternal(String user, final Configuration conf)
356 throws IOException, FileSystemAccessException {
357 Check.notEmpty(user, "user");
358 Check.notNull(conf, "conf");
359 if (!conf.getBoolean(FILE_SYSTEM_SERVICE_CREATED, false)) {
360 throw new FileSystemAccessException(FileSystemAccessException.ERROR.H04);
361 }
362 try {
363 validateNamenode(
364 new URI(conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)).getAuthority());
365 UserGroupInformation ugi = getUGI(user);
366 return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
367 @Override
368 public FileSystem run() throws Exception {
369 return createFileSystem(conf);
370 }
371 });
372 } catch (IOException ex) {
373 throw ex;
374 } catch (FileSystemAccessException ex) {
375 throw ex;
376 } catch (Exception ex) {
377 throw new FileSystemAccessException(FileSystemAccessException.ERROR.H08, ex.getMessage(), ex);
378 }
379 }
380
381 @Override
382 public FileSystem createFileSystem(String user, final Configuration conf) throws IOException,
383 FileSystemAccessException {
384 unmanagedFileSystems.incrementAndGet();
385 return createFileSystemInternal(user, conf);
386 }
387
388 @Override
389 public void releaseFileSystem(FileSystem fs) throws IOException {
390 unmanagedFileSystems.decrementAndGet();
391 closeFileSystem(fs);
392 }
393
394 @Override
395 public Configuration getFileSystemConfiguration() {
396 Configuration conf = new Configuration(true);
397 ConfigurationUtils.copy(serviceHadoopConf, conf);
398 conf.setBoolean(FILE_SYSTEM_SERVICE_CREATED, true);
399
400 // Force-clear server-side umask to make HttpFS match WebHDFS behavior
401 conf.set(FsPermission.UMASK_LABEL, "000");
402
403 return conf;
404 }
405
406 }