001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.yarn.client.api;
020
021 import java.util.concurrent.ConcurrentHashMap;
022
023 import org.apache.hadoop.classification.InterfaceAudience.Private;
024 import org.apache.hadoop.classification.InterfaceAudience.Public;
025 import org.apache.hadoop.classification.InterfaceStability.Evolving;
026 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
027 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
028 import org.apache.hadoop.yarn.api.records.Token;
029 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
030 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
031
032 import com.google.common.annotations.VisibleForTesting;
033
034 /**
035 * NMTokenCache manages NMTokens required for an Application Master
036 * communicating with individual NodeManagers.
037 * <p/>
038 * By default Yarn client libraries {@link AMRMClient} and {@link NMClient} use
039 * {@link #getSingleton()} instance of the cache.
040 * <ul>
041 * <li>Using the singleton instance of the cache is appropriate when running a
042 * single ApplicationMaster in the same JVM.</li>
043 * <li>When using the singleton, users don't need to do anything special,
044 * {@link AMRMClient} and {@link NMClient} are already set up to use the default
045 * singleton {@link NMTokenCache}</li>
046 * </ul>
047 * <p/>
048 * If running multiple Application Masters in the same JVM, a different cache
049 * instance should be used for each Application Master.
050 * <p/>
051 * <ul>
052 * <li>
053 * If using the {@link AMRMClient} and the {@link NMClient}, setting up and using
054 * an instance cache is as follows:
055 * <p/>
056 *
057 * <pre>
058 * NMTokenCache nmTokenCache = new NMTokenCache();
059 * AMRMClient rmClient = AMRMClient.createAMRMClient();
060 * NMClient nmClient = NMClient.createNMClient();
061 * nmClient.setNMTokenCache(nmTokenCache);
062 * ...
063 * </pre>
064 * </li>
065 * <li>
066 * If using the {@link AMRMClientAsync} and the {@link NMClientAsync}, setting up
067 * and using an instance cache is as follows:
068 * <p/>
069 *
070 * <pre>
071 * NMTokenCache nmTokenCache = new NMTokenCache();
072 * AMRMClient rmClient = AMRMClient.createAMRMClient();
073 * NMClient nmClient = NMClient.createNMClient();
074 * nmClient.setNMTokenCache(nmTokenCache);
075 * AMRMClientAsync rmClientAsync = new AMRMClientAsync(rmClient, 1000, [AMRM_CALLBACK]);
076 * NMClientAsync nmClientAsync = new NMClientAsync("nmClient", nmClient, [NM_CALLBACK]);
077 * ...
078 * </pre>
079 * </li>
080 * <li>
081 * If using {@link ApplicationMasterProtocol} and
082 * {@link ContainerManagementProtocol} directly, setting up and using an
083 * instance cache is as follows:
084 * <p/>
085 *
086 * <pre>
087 * NMTokenCache nmTokenCache = new NMTokenCache();
088 * ...
089 * ApplicationMasterProtocol amPro = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
090 * ...
091 * AllocateRequest allocateRequest = ...
092 * ...
093 * AllocateResponse allocateResponse = rmClient.allocate(allocateRequest);
094 * for (NMToken token : allocateResponse.getNMTokens()) {
095 * nmTokenCache.setToken(token.getNodeId().toString(), token.getToken());
096 * }
097 * ...
098 * ContainerManagementProtocolProxy nmPro = ContainerManagementProtocolProxy(conf, nmTokenCache);
099 * ...
100 * nmPro.startContainer(container, containerContext);
101 * ...
102 * </pre>
103 * </li>
104 * </ul>
105 * It is also possible to mix the usage of a client (<code>AMRMClient</code> or
106 * <code>NMClient</code>, or the async versions of them) with a protocol proxy (
107 * <code>ContainerManagementProtocolProxy</code> or
108 * <code>ApplicationMasterProtocol</code>).
109 */
110 @Public
111 @Evolving
112 public class NMTokenCache {
113 private static final NMTokenCache NM_TOKEN_CACHE = new NMTokenCache();
114
115 /**
116 * Returns the singleton NM token cache.
117 *
118 * @return the singleton NM token cache.
119 */
120 public static NMTokenCache getSingleton() {
121 return NM_TOKEN_CACHE;
122 }
123
124 /**
125 * Returns NMToken, null if absent. Only the singleton obtained from
126 * {@link #getSingleton()} is looked at for the tokens. If you are using your
127 * own NMTokenCache that is different from the singleton, use
128 * {@link #getToken(String) }
129 *
130 * @param nodeAddr
131 * @return {@link Token} NMToken required for communicating with node manager
132 */
133 @Public
134 public static Token getNMToken(String nodeAddr) {
135 return NM_TOKEN_CACHE.getToken(nodeAddr);
136 }
137
138 /**
139 * Sets the NMToken for node address only in the singleton obtained from
140 * {@link #getSingleton()}. If you are using your own NMTokenCache that is
141 * different from the singleton, use {@link #setToken(String, Token) }
142 *
143 * @param nodeAddr
144 * node address (host:port)
145 * @param token
146 * NMToken
147 */
148 @Public
149 public static void setNMToken(String nodeAddr, Token token) {
150 NM_TOKEN_CACHE.setToken(nodeAddr, token);
151 }
152
153 private ConcurrentHashMap<String, Token> nmTokens;
154
155 /**
156 * Creates a NM token cache instance.
157 */
158 public NMTokenCache() {
159 nmTokens = new ConcurrentHashMap<String, Token>();
160 }
161
162 /**
163 * Returns NMToken, null if absent
164 * @param nodeAddr
165 * @return {@link Token} NMToken required for communicating with node
166 * manager
167 */
168 @Public
169 @Evolving
170 public Token getToken(String nodeAddr) {
171 return nmTokens.get(nodeAddr);
172 }
173
174 /**
175 * Sets the NMToken for node address
176 * @param nodeAddr node address (host:port)
177 * @param token NMToken
178 */
179 @Public
180 @Evolving
181 public void setToken(String nodeAddr, Token token) {
182 nmTokens.put(nodeAddr, token);
183 }
184
185 /**
186 * Returns true if NMToken is present in cache.
187 */
188 @Private
189 @VisibleForTesting
190 public boolean containsToken(String nodeAddr) {
191 return nmTokens.containsKey(nodeAddr);
192 }
193
194 /**
195 * Returns the number of NMTokens present in cache.
196 */
197 @Private
198 @VisibleForTesting
199 public int numberOfTokensInCache() {
200 return nmTokens.size();
201 }
202
203 /**
204 * Removes NMToken for specified node manager
205 * @param nodeAddr node address (host:port)
206 */
207 @Private
208 @VisibleForTesting
209 public void removeToken(String nodeAddr) {
210 nmTokens.remove(nodeAddr);
211 }
212
213 /**
214 * It will remove all the nm tokens from its cache
215 */
216 @Private
217 @VisibleForTesting
218 public void clearCache() {
219 nmTokens.clear();
220 }
221 }