001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.yarn.client.api.async;
020
021 import java.nio.ByteBuffer;
022 import java.util.Map;
023 import java.util.concurrent.ConcurrentMap;
024
025 import org.apache.hadoop.classification.InterfaceAudience.Private;
026 import org.apache.hadoop.classification.InterfaceAudience.Public;
027 import org.apache.hadoop.classification.InterfaceStability.Stable;
028 import org.apache.hadoop.service.AbstractService;
029 import org.apache.hadoop.yarn.api.records.Container;
030 import org.apache.hadoop.yarn.api.records.ContainerId;
031 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
032 import org.apache.hadoop.yarn.api.records.ContainerStatus;
033 import org.apache.hadoop.yarn.api.records.NodeId;
034 import org.apache.hadoop.yarn.api.records.Token;
035 import org.apache.hadoop.yarn.client.api.NMClient;
036 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
037 import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
038 import org.apache.hadoop.yarn.conf.YarnConfiguration;
039
040 import com.google.common.annotations.VisibleForTesting;
041
042 /**
043 * <code>NMClientAsync</code> handles communication with all the NodeManagers
044 * and provides asynchronous updates on getting responses from them. It
045 * maintains a thread pool to communicate with individual NMs where a number of
046 * worker threads process requests to NMs by using {@link NMClientImpl}. The max
047 * size of the thread pool is configurable through
048 * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
049 *
050 * It should be used in conjunction with a CallbackHandler. For example
051 *
052 * <pre>
053 * {@code
054 * class MyCallbackHandler implements NMClientAsync.CallbackHandler {
055 * public void onContainerStarted(ContainerId containerId,
056 * Map<String, ByteBuffer> allServiceResponse) {
057 * [post process after the container is started, process the response]
058 * }
059 *
060 * public void onContainerStatusReceived(ContainerId containerId,
061 * ContainerStatus containerStatus) {
062 * [make use of the status of the container]
063 * }
064 *
065 * public void onContainerStopped(ContainerId containerId) {
066 * [post process after the container is stopped]
067 * }
068 *
069 * public void onStartContainerError(
070 * ContainerId containerId, Throwable t) {
071 * [handle the raised exception]
072 * }
073 *
074 * public void onGetContainerStatusError(
075 * ContainerId containerId, Throwable t) {
076 * [handle the raised exception]
077 * }
078 *
079 * public void onStopContainerError(
080 * ContainerId containerId, Throwable t) {
081 * [handle the raised exception]
082 * }
083 * }
084 * }
085 * </pre>
086 *
087 * The client's life-cycle should be managed like the following:
088 *
089 * <pre>
090 * {@code
091 * NMClientAsync asyncClient =
092 * NMClientAsync.createNMClientAsync(new MyCallbackhandler());
093 * asyncClient.init(conf);
094 * asyncClient.start();
095 * asyncClient.startContainer(container, containerLaunchContext);
096 * [... wait for container being started]
097 * asyncClient.getContainerStatus(container.getId(), container.getNodeId(),
098 * container.getContainerToken());
099 * [... handle the status in the callback instance]
100 * asyncClient.stopContainer(container.getId(), container.getNodeId(),
101 * container.getContainerToken());
102 * [... wait for container being stopped]
103 * asyncClient.stop();
104 * }
105 * </pre>
106 */
107 @Public
108 @Stable
109 public abstract class NMClientAsync extends AbstractService {
110
111 protected NMClient client;
112 protected CallbackHandler callbackHandler;
113
114 public static NMClientAsync createNMClientAsync(
115 CallbackHandler callbackHandler) {
116 return new NMClientAsyncImpl(callbackHandler);
117 }
118
119 protected NMClientAsync(CallbackHandler callbackHandler) {
120 this (NMClientAsync.class.getName(), callbackHandler);
121 }
122
123 protected NMClientAsync(String name, CallbackHandler callbackHandler) {
124 this (name, new NMClientImpl(), callbackHandler);
125 }
126
127 @Private
128 @VisibleForTesting
129 protected NMClientAsync(String name, NMClient client,
130 CallbackHandler callbackHandler) {
131 super(name);
132 this.setClient(client);
133 this.setCallbackHandler(callbackHandler);
134 }
135
136 public abstract void startContainerAsync(
137 Container container, ContainerLaunchContext containerLaunchContext);
138
139 public abstract void stopContainerAsync(
140 ContainerId containerId, NodeId nodeId);
141
142 public abstract void getContainerStatusAsync(
143 ContainerId containerId, NodeId nodeId);
144
145 public NMClient getClient() {
146 return client;
147 }
148
149 public void setClient(NMClient client) {
150 this.client = client;
151 }
152
153 public CallbackHandler getCallbackHandler() {
154 return callbackHandler;
155 }
156
157 public void setCallbackHandler(CallbackHandler callbackHandler) {
158 this.callbackHandler = callbackHandler;
159 }
160
161 /**
162 * <p>
163 * The callback interface needs to be implemented by {@link NMClientAsync}
164 * users. The APIs are called when responses from <code>NodeManager</code> are
165 * available.
166 * </p>
167 *
168 * <p>
169 * Once a callback happens, the users can chose to act on it in blocking or
170 * non-blocking manner. If the action on callback is done in a blocking
171 * manner, some of the threads performing requests on NodeManagers may get
172 * blocked depending on how many threads in the pool are busy.
173 * </p>
174 *
175 * <p>
176 * The implementation of the callback function should not throw the
177 * unexpected exception. Otherwise, {@link NMClientAsync} will just
178 * catch, log and then ignore it.
179 * </p>
180 */
181 public static interface CallbackHandler {
182 /**
183 * The API is called when <code>NodeManager</code> responds to indicate its
184 * acceptance of the starting container request
185 * @param containerId the Id of the container
186 * @param allServiceResponse a Map between the auxiliary service names and
187 * their outputs
188 */
189 void onContainerStarted(ContainerId containerId,
190 Map<String, ByteBuffer> allServiceResponse);
191
192 /**
193 * The API is called when <code>NodeManager</code> responds with the status
194 * of the container
195 * @param containerId the Id of the container
196 * @param containerStatus the status of the container
197 */
198 void onContainerStatusReceived(ContainerId containerId,
199 ContainerStatus containerStatus);
200
201 /**
202 * The API is called when <code>NodeManager</code> responds to indicate the
203 * container is stopped.
204 * @param containerId the Id of the container
205 */
206 void onContainerStopped(ContainerId containerId);
207
208 /**
209 * The API is called when an exception is raised in the process of
210 * starting a container
211 *
212 * @param containerId the Id of the container
213 * @param t the raised exception
214 */
215 void onStartContainerError(ContainerId containerId, Throwable t);
216
217 /**
218 * The API is called when an exception is raised in the process of
219 * querying the status of a container
220 *
221 * @param containerId the Id of the container
222 * @param t the raised exception
223 */
224 void onGetContainerStatusError(ContainerId containerId, Throwable t);
225
226 /**
227 * The API is called when an exception is raised in the process of
228 * stopping a container
229 *
230 * @param containerId the Id of the container
231 * @param t the raised exception
232 */
233 void onStopContainerError(ContainerId containerId, Throwable t);
234
235 }
236
237 }