001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.yarn.client.api.async;
020
021 import com.google.common.base.Preconditions;
022 import com.google.common.base.Supplier;
023 import java.io.IOException;
024 import java.util.Collection;
025 import java.util.List;
026 import java.util.concurrent.atomic.AtomicInteger;
027
028 import org.apache.commons.logging.Log;
029 import org.apache.commons.logging.LogFactory;
030 import org.apache.hadoop.classification.InterfaceAudience.Private;
031 import org.apache.hadoop.classification.InterfaceAudience.Public;
032 import org.apache.hadoop.classification.InterfaceStability.Stable;
033 import org.apache.hadoop.service.AbstractService;
034 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
035 import org.apache.hadoop.yarn.api.records.Container;
036 import org.apache.hadoop.yarn.api.records.ContainerId;
037 import org.apache.hadoop.yarn.api.records.ContainerStatus;
038 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
039 import org.apache.hadoop.yarn.api.records.NodeReport;
040 import org.apache.hadoop.yarn.api.records.Priority;
041 import org.apache.hadoop.yarn.api.records.Resource;
042 import org.apache.hadoop.yarn.client.api.AMRMClient;
043 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
044 import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
045 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
046 import org.apache.hadoop.yarn.exceptions.YarnException;
047
048 import com.google.common.annotations.VisibleForTesting;
049
050 /**
051 * <code>AMRMClientAsync</code> handles communication with the ResourceManager
052 * and provides asynchronous updates on events such as container allocations and
053 * completions. It contains a thread that sends periodic heartbeats to the
054 * ResourceManager.
055 *
056 * It should be used by implementing a CallbackHandler:
057 * <pre>
058 * {@code
059 * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
060 * public void onContainersAllocated(List<Container> containers) {
061 * [run tasks on the containers]
062 * }
063 *
064 * public void onContainersCompleted(List<ContainerStatus> statuses) {
065 * [update progress, check whether app is done]
066 * }
067 *
068 * public void onNodesUpdated(List<NodeReport> updated) {}
069 *
070 * public void onReboot() {}
071 * }
072 * }
073 * </pre>
074 *
075 * The client's lifecycle should be managed similarly to the following:
076 *
077 * <pre>
078 * {@code
079 * AMRMClientAsync asyncClient =
080 * createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler());
081 * asyncClient.init(conf);
082 * asyncClient.start();
083 * RegisterApplicationMasterResponse response = asyncClient
084 * .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
085 * appMasterTrackingUrl);
086 * asyncClient.addContainerRequest(containerRequest);
087 * [... wait for application to complete]
088 * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
089 * asyncClient.stop();
090 * }
091 * </pre>
092 */
093 @Public
094 @Stable
095 public abstract class AMRMClientAsync<T extends ContainerRequest>
096 extends AbstractService {
097 private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
098
099 protected final AMRMClient<T> client;
100 protected final CallbackHandler handler;
101 protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
102
103 public static <T extends ContainerRequest> AMRMClientAsync<T>
104 createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
105 return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler);
106 }
107
108 public static <T extends ContainerRequest> AMRMClientAsync<T>
109 createAMRMClientAsync(AMRMClient<T> client, int intervalMs,
110 CallbackHandler callbackHandler) {
111 return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
112 }
113
114 protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
115 this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
116 }
117
118 @Private
119 @VisibleForTesting
120 protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
121 CallbackHandler callbackHandler) {
122 super(AMRMClientAsync.class.getName());
123 this.client = client;
124 this.heartbeatIntervalMs.set(intervalMs);
125 this.handler = callbackHandler;
126 }
127
128 public void setHeartbeatInterval(int interval) {
129 heartbeatIntervalMs.set(interval);
130 }
131
132 public abstract List<? extends Collection<T>> getMatchingRequests(
133 Priority priority,
134 String resourceName,
135 Resource capability);
136
137 /**
138 * Registers this application master with the resource manager. On successful
139 * registration, starts the heartbeating thread.
140 * @throws YarnException
141 * @throws IOException
142 */
143 public abstract RegisterApplicationMasterResponse registerApplicationMaster(
144 String appHostName, int appHostPort, String appTrackingUrl)
145 throws YarnException, IOException;
146
147 /**
148 * Unregister the application master. This must be called in the end.
149 * @param appStatus Success/Failure status of the master
150 * @param appMessage Diagnostics message on failure
151 * @param appTrackingUrl New URL to get master info
152 * @throws YarnException
153 * @throws IOException
154 */
155 public abstract void unregisterApplicationMaster(
156 FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl)
157 throws YarnException, IOException;
158
159 /**
160 * Request containers for resources before calling <code>allocate</code>
161 * @param req Resource request
162 */
163 public abstract void addContainerRequest(T req);
164
165 /**
166 * Remove previous container request. The previous container request may have
167 * already been sent to the ResourceManager. So even after the remove request
168 * the app must be prepared to receive an allocation for the previous request
169 * even after the remove request
170 * @param req Resource request
171 */
172 public abstract void removeContainerRequest(T req);
173
174 /**
175 * Release containers assigned by the Resource Manager. If the app cannot use
176 * the container or wants to give up the container then it can release them.
177 * The app needs to make new requests for the released resource capability if
178 * it still needs it. eg. it released non-local resources
179 * @param containerId
180 */
181 public abstract void releaseAssignedContainer(ContainerId containerId);
182
183 /**
184 * Get the currently available resources in the cluster.
185 * A valid value is available after a call to allocate has been made
186 * @return Currently available resources
187 */
188 public abstract Resource getAvailableResources();
189
190 /**
191 * Get the current number of nodes in the cluster.
192 * A valid values is available after a call to allocate has been made
193 * @return Current number of nodes in the cluster
194 */
195 public abstract int getClusterNodeCount();
196
197 /**
198 * Wait for <code>check</code> to return true for each 1000 ms.
199 * See also {@link #waitFor(com.google.common.base.Supplier, int)}
200 * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
201 * @param check
202 */
203 public void waitFor(Supplier<Boolean> check) throws InterruptedException {
204 waitFor(check, 1000);
205 }
206
207 /**
208 * Wait for <code>check</code> to return true for each
209 * <code>checkEveryMillis</code> ms.
210 * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
211 * @param check user defined checker
212 * @param checkEveryMillis interval to call <code>check</code>
213 */
214 public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
215 throws InterruptedException {
216 waitFor(check, checkEveryMillis, 1);
217 };
218
219 /**
220 * Wait for <code>check</code> to return true for each
221 * <code>checkEveryMillis</code> ms. In the main loop, this method will log
222 * the message "waiting in main loop" for each <code>logInterval</code> times
223 * iteration to confirm the thread is alive.
224 * @param check user defined checker
225 * @param checkEveryMillis interval to call <code>check</code>
226 * @param logInterval interval to log for each
227 */
228 public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
229 int logInterval) throws InterruptedException {
230 Preconditions.checkNotNull(check, "check should not be null");
231 Preconditions.checkArgument(checkEveryMillis >= 0,
232 "checkEveryMillis should be positive value");
233 Preconditions.checkArgument(logInterval >= 0,
234 "logInterval should be positive value");
235
236 int loggingCounter = logInterval;
237 do {
238 if (LOG.isDebugEnabled()) {
239 LOG.debug("Check the condition for main loop.");
240 }
241
242 boolean result = check.get();
243 if (result) {
244 LOG.info("Exits the main loop.");
245 return;
246 }
247 if (--loggingCounter <= 0) {
248 LOG.info("Waiting in main loop.");
249 loggingCounter = logInterval;
250 }
251
252 Thread.sleep(checkEveryMillis);
253 } while (true);
254 }
255
256 public interface CallbackHandler {
257
258 /**
259 * Called when the ResourceManager responds to a heartbeat with completed
260 * containers. If the response contains both completed containers and
261 * allocated containers, this will be called before containersAllocated.
262 */
263 public void onContainersCompleted(List<ContainerStatus> statuses);
264
265 /**
266 * Called when the ResourceManager responds to a heartbeat with allocated
267 * containers. If the response containers both completed containers and
268 * allocated containers, this will be called after containersCompleted.
269 */
270 public void onContainersAllocated(List<Container> containers);
271
272 /**
273 * Called when the ResourceManager wants the ApplicationMaster to shutdown
274 * for being out of sync etc. The ApplicationMaster should not unregister
275 * with the RM unless the ApplicationMaster wants to be the last attempt.
276 */
277 public void onShutdownRequest();
278
279 /**
280 * Called when nodes tracked by the ResourceManager have changed in health,
281 * availability etc.
282 */
283 public void onNodesUpdated(List<NodeReport> updatedNodes);
284
285 public float getProgress();
286
287 /**
288 * Called when error comes from RM communications as well as from errors in
289 * the callback itself from the app. Calling
290 * stop() is the recommended action.
291 *
292 * @param e
293 */
294 public void onError(Throwable e);
295 }
296 }