001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.yarn.client.api;
020
021 import java.io.IOException;
022 import java.util.Collection;
023 import java.util.List;
024
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.apache.hadoop.classification.InterfaceAudience;
028 import org.apache.hadoop.classification.InterfaceAudience.Private;
029 import org.apache.hadoop.classification.InterfaceAudience.Public;
030 import org.apache.hadoop.classification.InterfaceStability;
031 import org.apache.hadoop.service.AbstractService;
032 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
033 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
034 import org.apache.hadoop.yarn.api.records.ContainerId;
035 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
036 import org.apache.hadoop.yarn.api.records.Priority;
037 import org.apache.hadoop.yarn.api.records.Resource;
038 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
039 import org.apache.hadoop.yarn.exceptions.YarnException;
040
041 import com.google.common.base.Preconditions;
042 import com.google.common.base.Supplier;
043 import com.google.common.collect.ImmutableList;
044
045 @InterfaceAudience.Public
046 @InterfaceStability.Stable
047 public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
048 AbstractService {
049 private static final Log LOG = LogFactory.getLog(AMRMClient.class);
050
051 /**
052 * Create a new instance of AMRMClient.
053 * For usage:
054 * <pre>
055 * {@code
056 * AMRMClient.<T>createAMRMClientContainerRequest()
057 * }</pre>
058 * @return the newly create AMRMClient instance.
059 */
060 @Public
061 public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient() {
062 AMRMClient<T> client = new AMRMClientImpl<T>();
063 return client;
064 }
065
066 private NMTokenCache nmTokenCache;
067
068 @Private
069 protected AMRMClient(String name) {
070 super(name);
071 nmTokenCache = NMTokenCache.getSingleton();
072 }
073
074 /**
075 * Object to represent a single container request for resources. Scheduler
076 * documentation should be consulted for the specifics of how the parameters
077 * are honored.
078 *
079 * By default, YARN schedulers try to allocate containers at the requested
080 * locations but they may relax the constraints in order to expedite meeting
081 * allocations limits. They first relax the constraint to the same rack as the
082 * requested node and then to anywhere in the cluster. The relaxLocality flag
083 * may be used to disable locality relaxation and request containers at only
084 * specific locations. The following conditions apply.
085 * <ul>
086 * <li>Within a priority, all container requests must have the same value for
087 * locality relaxation. Either enabled or disabled.</li>
088 * <li>If locality relaxation is disabled, then across requests, locations at
089 * different network levels may not be specified. E.g. its invalid to make a
090 * request for a specific node and another request for a specific rack.</li>
091 * <li>If locality relaxation is disabled, then only within the same request,
092 * a node and its rack may be specified together. This allows for a specific
093 * rack with a preference for a specific node within that rack.</li>
094 * <li></li>
095 * </ul>
096 * To re-enable locality relaxation at a given priority, all pending requests
097 * with locality relaxation disabled must be first removed. Then they can be
098 * added back with locality relaxation enabled.
099 *
100 * All getters return immutable values.
101 */
102 public static class ContainerRequest {
103 final Resource capability;
104 final List<String> nodes;
105 final List<String> racks;
106 final Priority priority;
107 final boolean relaxLocality;
108 final String nodeLabelsExpression;
109
110 /**
111 * Instantiates a {@link ContainerRequest} with the given constraints and
112 * locality relaxation enabled.
113 *
114 * @param capability
115 * The {@link Resource} to be requested for each container.
116 * @param nodes
117 * Any hosts to request that the containers are placed on.
118 * @param racks
119 * Any racks to request that the containers are placed on. The
120 * racks corresponding to any hosts requested will be automatically
121 * added to this list.
122 * @param priority
123 * The priority at which to request the containers. Higher
124 * priorities have lower numerical values.
125 */
126 public ContainerRequest(Resource capability, String[] nodes,
127 String[] racks, Priority priority) {
128 this(capability, nodes, racks, priority, true, null);
129 }
130
131 /**
132 * Instantiates a {@link ContainerRequest} with the given constraints.
133 *
134 * @param capability
135 * The {@link Resource} to be requested for each container.
136 * @param nodes
137 * Any hosts to request that the containers are placed on.
138 * @param racks
139 * Any racks to request that the containers are placed on. The
140 * racks corresponding to any hosts requested will be automatically
141 * added to this list.
142 * @param priority
143 * The priority at which to request the containers. Higher
144 * priorities have lower numerical values.
145 * @param relaxLocality
146 * If true, containers for this request may be assigned on hosts
147 * and racks other than the ones explicitly requested.
148 */
149 public ContainerRequest(Resource capability, String[] nodes,
150 String[] racks, Priority priority, boolean relaxLocality) {
151 this(capability, nodes, racks, priority, relaxLocality, null);
152 }
153
154 /**
155 * Instantiates a {@link ContainerRequest} with the given constraints.
156 *
157 * @param capability
158 * The {@link Resource} to be requested for each container.
159 * @param nodes
160 * Any hosts to request that the containers are placed on.
161 * @param racks
162 * Any racks to request that the containers are placed on. The
163 * racks corresponding to any hosts requested will be automatically
164 * added to this list.
165 * @param priority
166 * The priority at which to request the containers. Higher
167 * priorities have lower numerical values.
168 * @param relaxLocality
169 * If true, containers for this request may be assigned on hosts
170 * and racks other than the ones explicitly requested.
171 * @param nodeLabelsExpression
172 * Set node labels to allocate resource
173 */
174 public ContainerRequest(Resource capability, String[] nodes,
175 String[] racks, Priority priority, boolean relaxLocality,
176 String nodeLabelsExpression) {
177 // Validate request
178 Preconditions.checkArgument(capability != null,
179 "The Resource to be requested for each container " +
180 "should not be null ");
181 Preconditions.checkArgument(priority != null,
182 "The priority at which to request containers should not be null ");
183 Preconditions.checkArgument(
184 !(!relaxLocality && (racks == null || racks.length == 0)
185 && (nodes == null || nodes.length == 0)),
186 "Can't turn off locality relaxation on a " +
187 "request with no location constraints");
188 this.capability = capability;
189 this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
190 this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
191 this.priority = priority;
192 this.relaxLocality = relaxLocality;
193 this.nodeLabelsExpression = nodeLabelsExpression;
194 }
195
196 public Resource getCapability() {
197 return capability;
198 }
199
200 public List<String> getNodes() {
201 return nodes;
202 }
203
204 public List<String> getRacks() {
205 return racks;
206 }
207
208 public Priority getPriority() {
209 return priority;
210 }
211
212 public boolean getRelaxLocality() {
213 return relaxLocality;
214 }
215
216 public String getNodeLabelExpression() {
217 return nodeLabelsExpression;
218 }
219
220 public String toString() {
221 StringBuilder sb = new StringBuilder();
222 sb.append("Capability[").append(capability).append("]");
223 sb.append("Priority[").append(priority).append("]");
224 return sb.toString();
225 }
226 }
227
228 /**
229 * Register the application master. This must be called before any
230 * other interaction
231 * @param appHostName Name of the host on which master is running
232 * @param appHostPort Port master is listening on
233 * @param appTrackingUrl URL at which the master info can be seen
234 * @return <code>RegisterApplicationMasterResponse</code>
235 * @throws YarnException
236 * @throws IOException
237 */
238 public abstract RegisterApplicationMasterResponse
239 registerApplicationMaster(String appHostName,
240 int appHostPort,
241 String appTrackingUrl)
242 throws YarnException, IOException;
243
244 /**
245 * Request additional containers and receive new container allocations.
246 * Requests made via <code>addContainerRequest</code> are sent to the
247 * <code>ResourceManager</code>. New containers assigned to the master are
248 * retrieved. Status of completed containers and node health updates are also
249 * retrieved. This also doubles up as a heartbeat to the ResourceManager and
250 * must be made periodically. The call may not always return any new
251 * allocations of containers. App should not make concurrent allocate
252 * requests. May cause request loss.
253 *
254 * <p>
255 * Note : If the user has not removed container requests that have already
256 * been satisfied, then the re-register may end up sending the entire
257 * container requests to the RM (including matched requests). Which would mean
258 * the RM could end up giving it a lot of new allocated containers.
259 * </p>
260 *
261 * @param progressIndicator Indicates progress made by the master
262 * @return the response of the allocate request
263 * @throws YarnException
264 * @throws IOException
265 */
266 public abstract AllocateResponse allocate(float progressIndicator)
267 throws YarnException, IOException;
268
269 /**
270 * Unregister the application master. This must be called in the end.
271 * @param appStatus Success/Failure status of the master
272 * @param appMessage Diagnostics message on failure
273 * @param appTrackingUrl New URL to get master info
274 * @throws YarnException
275 * @throws IOException
276 */
277 public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatus,
278 String appMessage,
279 String appTrackingUrl)
280 throws YarnException, IOException;
281
282 /**
283 * Request containers for resources before calling <code>allocate</code>
284 * @param req Resource request
285 */
286 public abstract void addContainerRequest(T req);
287
288 /**
289 * Remove previous container request. The previous container request may have
290 * already been sent to the ResourceManager. So even after the remove request
291 * the app must be prepared to receive an allocation for the previous request
292 * even after the remove request
293 * @param req Resource request
294 */
295 public abstract void removeContainerRequest(T req);
296
297 /**
298 * Release containers assigned by the Resource Manager. If the app cannot use
299 * the container or wants to give up the container then it can release them.
300 * The app needs to make new requests for the released resource capability if
301 * it still needs it. eg. it released non-local resources
302 * @param containerId
303 */
304 public abstract void releaseAssignedContainer(ContainerId containerId);
305
306 /**
307 * Get the currently available resources in the cluster.
308 * A valid value is available after a call to allocate has been made
309 * @return Currently available resources
310 */
311 public abstract Resource getAvailableResources();
312
313 /**
314 * Get the current number of nodes in the cluster.
315 * A valid values is available after a call to allocate has been made
316 * @return Current number of nodes in the cluster
317 */
318 public abstract int getClusterNodeCount();
319
320 /**
321 * Get outstanding <code>ContainerRequest</code>s matching the given
322 * parameters. These ContainerRequests should have been added via
323 * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
324 * the AMRMClient may return its internal collection directly without creating
325 * a copy. Users should not perform mutable operations on the return value.
326 * Each collection in the list contains requests with identical
327 * <code>Resource</code> size that fit in the given capability. In a
328 * collection, requests will be returned in the same order as they were added.
329 * @return Collection of request matching the parameters
330 */
331 public abstract List<? extends Collection<T>> getMatchingRequests(
332 Priority priority,
333 String resourceName,
334 Resource capability);
335
336 /**
337 * Update application's blacklist with addition or removal resources.
338 *
339 * @param blacklistAdditions list of resources which should be added to the
340 * application blacklist
341 * @param blacklistRemovals list of resources which should be removed from the
342 * application blacklist
343 */
344 public abstract void updateBlacklist(List<String> blacklistAdditions,
345 List<String> blacklistRemovals);
346
347 /**
348 * Set the NM token cache for the <code>AMRMClient</code>. This cache must
349 * be shared with the {@link NMClient} used to manage containers for the
350 * <code>AMRMClient</code>
351 * <p/>
352 * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()}
353 * singleton instance will be used.
354 *
355 * @param nmTokenCache the NM token cache to use.
356 */
357 public void setNMTokenCache(NMTokenCache nmTokenCache) {
358 this.nmTokenCache = nmTokenCache;
359 }
360
361 /**
362 * Get the NM token cache of the <code>AMRMClient</code>. This cache must be
363 * shared with the {@link NMClient} used to manage containers for the
364 * <code>AMRMClient</code>.
365 * <p/>
366 * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()}
367 * singleton instance will be used.
368 *
369 * @return the NM token cache.
370 */
371 public NMTokenCache getNMTokenCache() {
372 return nmTokenCache;
373 }
374
375 /**
376 * Wait for <code>check</code> to return true for each 1000 ms.
377 * See also {@link #waitFor(com.google.common.base.Supplier, int)}
378 * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
379 * @param check
380 */
381 public void waitFor(Supplier<Boolean> check) throws InterruptedException {
382 waitFor(check, 1000);
383 }
384
385 /**
386 * Wait for <code>check</code> to return true for each
387 * <code>checkEveryMillis</code> ms.
388 * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
389 * @param check user defined checker
390 * @param checkEveryMillis interval to call <code>check</code>
391 */
392 public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
393 throws InterruptedException {
394 waitFor(check, checkEveryMillis, 1);
395 }
396
397 /**
398 * Wait for <code>check</code> to return true for each
399 * <code>checkEveryMillis</code> ms. In the main loop, this method will log
400 * the message "waiting in main loop" for each <code>logInterval</code> times
401 * iteration to confirm the thread is alive.
402 * @param check user defined checker
403 * @param checkEveryMillis interval to call <code>check</code>
404 * @param logInterval interval to log for each
405 */
406 public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
407 int logInterval) throws InterruptedException {
408 Preconditions.checkNotNull(check, "check should not be null");
409 Preconditions.checkArgument(checkEveryMillis >= 0,
410 "checkEveryMillis should be positive value");
411 Preconditions.checkArgument(logInterval >= 0,
412 "logInterval should be positive value");
413
414 int loggingCounter = logInterval;
415 do {
416 if (LOG.isDebugEnabled()) {
417 LOG.debug("Check the condition for main loop.");
418 }
419
420 boolean result = check.get();
421 if (result) {
422 LOG.info("Exits the main loop.");
423 return;
424 }
425 if (--loggingCounter <= 0) {
426 LOG.info("Waiting in main loop.");
427 loggingCounter = logInterval;
428 }
429
430 Thread.sleep(checkEveryMillis);
431 } while (true);
432 }
433
434 }