Java源码示例:org.apache.hadoop.yarn.conf.HAUtil
示例1
/**
 * Initializes this provider from the supplied configuration.
 *
 * <p>Stores the proxy helper and protocol, has the helper validate the
 * protocol, copies the configuration into a {@link YarnConfiguration},
 * records the configured RM HA ids, selects the id at
 * {@code currentProxyIndex} as {@code RM_HA_ID}, and copies the client
 * failover retry settings onto the IPC client connect-retry keys.
 *
 * @param configuration configuration to copy
 * @param rmProxy helper used to validate the protocol
 * @param protocol protocol class served by this provider
 */
@Override
public void init(Configuration configuration, RMProxy<T> rmProxy,
    Class<T> protocol) {
  this.rmProxy = rmProxy;
  this.protocol = protocol;
  this.rmProxy.checkAllowedProtocols(this.protocol);
  this.conf = new YarnConfiguration(configuration);

  // Remember every configured RM id and point the config at the current one.
  Collection<String> haIds = HAUtil.getRMHAIds(conf);
  this.rmServiceIds = haIds.toArray(new String[haIds.size()]);
  conf.set(YarnConfiguration.RM_HA_ID, rmServiceIds[currentProxyIndex]);

  // Map the failover retry counts onto the IPC client retry keys.
  int connectRetries = conf.getInt(
      YarnConfiguration.CLIENT_FAILOVER_RETRIES,
      YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES);
  conf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
      connectRetries);

  int socketTimeoutRetries = conf.getInt(
      YarnConfiguration.CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS,
      YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS);
  conf.setInt(
      CommonConfigurationKeysPublic
          .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
      socketTimeoutRetries);
}
示例2
/**
 * Builds a retrying proxy for the given protocol.
 *
 * <p>Without HA the proxy connects directly to the address reported by
 * {@code instance.getRMAddress}. With HA enabled the proxy is backed by the
 * failover proxy provider created by {@code instance}.
 *
 * @param configuration source configuration; reused as-is when it already is
 *     a {@link YarnConfiguration}, otherwise wrapped in a new one
 * @param protocol protocol interface to proxy
 * @param instance RMProxy used to obtain the RM address or failover provider
 * @return a proxy wrapped with the retry policy from {@code createRetryPolicy}
 * @throws IOException declared for failures of the calls made here
 */
@Private
protected static <T> T createRMProxy(final Configuration configuration,
    final Class<T> protocol, RMProxy instance) throws IOException {
  final YarnConfiguration conf;
  if (configuration instanceof YarnConfiguration) {
    conf = (YarnConfiguration) configuration;
  } else {
    conf = new YarnConfiguration(configuration);
  }
  RetryPolicy retryPolicy = createRetryPolicy(conf);

  if (!HAUtil.isHAEnabled(conf)) {
    // Non-HA: talk to the single ResourceManager directly.
    InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    T directProxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
    return (T) RetryProxy.create(protocol, directProxy, retryPolicy);
  }

  // HA: let the failover provider choose the ResourceManager.
  RMFailoverProxyProvider<T> provider =
      instance.createRMFailoverProxyProvider(conf, protocol);
  return (T) RetryProxy.create(protocol, provider, retryPolicy);
}
示例3
/**
 * Builds the token service name for the given address property.
 *
 * <p>Without HA this is the token service of the single configured address.
 * With HA enabled it is the comma-joined list of the token services of every
 * configured RM id, in the order returned by {@link HAUtil#getRMHAIds}.
 *
 * @param conf configuration to read
 * @param address name of the address property
 * @param defaultAddr default address used when the property is unset
 * @param defaultPort default port used when the address has none
 * @return the token service name
 */
@Unstable
public static Text getTokenService(Configuration conf, String address,
    String defaultAddr, int defaultPort) {
  if (!HAUtil.isHAEnabled(conf)) {
    // Non-HA case - no need to set RM_ID
    return SecurityUtil.buildTokenService(
        conf.getSocketAddr(address, defaultAddr, defaultPort));
  }

  // Collect one service address per RM id to form the service name.
  ArrayList<String> tokenServices = new ArrayList<String>();
  YarnConfiguration perRmConf = new YarnConfiguration(conf);
  for (String rmId : HAUtil.getRMHAIds(conf)) {
    // Selecting the RM id makes the lookup resolve that RM's address.
    perRmConf.set(YarnConfiguration.RM_HA_ID, rmId);
    Text service = SecurityUtil.buildTokenService(
        perRmConf.getSocketAddr(address, defaultAddr, defaultPort));
    tokenServices.add(service.toString());
  }
  return new Text(Joiner.on(',').join(tokenServices));
}
示例4
/**
 * Determines the proxy URI base that requests should be redirected to.
 *
 * <p>When exactly one base is registered it is returned directly. Otherwise
 * the active RM id is looked up and its webapp (or HTTPS webapp) address is
 * used as the key into {@code proxyUriBases}.
 *
 * @return the proxy URI base to redirect to
 * @throws ServletException if no base could be determined
 */
protected String findRedirectUrl() throws ServletException {
  String redirectBase;
  if (proxyUriBases.size() != 1) {
    // RM HA: resolve the webapp address of the currently active RM.
    YarnConfiguration conf = new YarnConfiguration();
    String activeRMId = RMHAUtils.findActiveRMHAId(conf);
    String webappAddressKey;
    if (YarnConfiguration.useHttps(conf)) {
      webappAddressKey = YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS;
    } else {
      webappAddressKey = YarnConfiguration.RM_WEBAPP_ADDRESS;
    }
    String activeHost =
        conf.get(HAUtil.addSuffix(webappAddressKey, activeRMId));
    redirectBase = proxyUriBases.get(activeHost);
  } else {
    // external proxy or not RM HA
    redirectBase = proxyUriBases.values().iterator().next();
  }

  if (redirectBase != null) {
    return redirectBase;
  }
  throw new ServletException(
      "Could not determine the proxy server for redirection");
}
示例5
/**
 * Initializes the ResourceManager at {@code index} and registers a handler
 * that records application attempts in {@code appMasters} on registration
 * events and removes them on unregistration events.
 *
 * @param index position of the ResourceManager in {@code resourceManagers}
 * @param conf configuration to initialize with; when HA is enabled its
 *     {@code RM_HA_ID} is set to {@code rmIds[index]} first
 */
private synchronized void initResourceManager(int index, Configuration conf) {
  if (HAUtil.isHAEnabled(conf)) {
    conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
  }
  resourceManagers[index].init(conf);

  EventHandler<RMAppAttemptEvent> attemptTracker =
      new EventHandler<RMAppAttemptEvent>() {
        public void handle(RMAppAttemptEvent event) {
          if (event instanceof RMAppAttemptRegistrationEvent) {
            appMasters.put(event.getApplicationAttemptId(),
                event.getTimestamp());
            return;
          }
          if (event instanceof RMAppAttemptUnregistrationEvent) {
            appMasters.remove(event.getApplicationAttemptId());
          }
        }
      };
  resourceManagers[index].getRMContext().getDispatcher().register(
      RMAppAttemptEventType.class, attemptTracker);
}
示例6
/**
 * Initializes the service from the configuration.
 *
 * <p>When the RM context reports HA as enabled, reads the automatic failover
 * flag and, if failover is both enabled and embedded, creates and adds the
 * embedded elector service. Then resolves the bind address from the admin
 * address keys, records the current user, sets up the authorizer with the
 * admin ACL, and reads the configured RM HA id.
 *
 * @param conf configuration to read
 * @throws Exception if any initialization step fails
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
  if (rmContext.isHAEnabled()) {
    autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
    if (autoFailoverEnabled && HAUtil.isAutomaticFailoverEmbedded(conf)) {
      embeddedElector = createEmbeddedElectorService();
      addIfService(embeddedElector);
    }
  }

  masterServiceBindAddress =
      conf.getSocketAddr(YarnConfiguration.RM_BIND_HOST,
          YarnConfiguration.RM_ADMIN_ADDRESS,
          YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
          YarnConfiguration.DEFAULT_RM_ADMIN_PORT);

  daemonUser = UserGroupInformation.getCurrentUser();

  authorizer = YarnAuthorizationProvider.getInstance(conf);
  authorizer.setAdmins(
      getAdminAclList(conf), UserGroupInformation.getCurrentUser());

  rmId = conf.get(YarnConfiguration.RM_HA_ID);
  super.serviceInit(conf);
}
示例7
/**
 * Builds the {@link ACL} list for the store's root node from the ACLs used
 * for ZooKeeper access.
 *
 * <p>Every source ACL is copied with the create/delete permissions removed.
 * One more ACL is appended that grants create/delete to a digest id derived
 * from this RM's address and the root node password, so only this RM keeps
 * create-delete access. As a side effect {@code zkRootNodeUsername} is set
 * to the RM address value read from the configuration.
 *
 * <p>To be called only when HA is enabled and the configuration doesn't set
 * an ACL for the root node.
 *
 * @param conf configuration used to look up this RM's address
 * @param sourceACLs ACLs used for ZooKeeper access
 * @return the ACLs to apply to the root node
 * @throws NoSuchAlgorithmException if the digest cannot be generated
 */
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
    Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
  List<ACL> rootNodeAcls = new ArrayList<ACL>();
  for (ACL sourceAcl : sourceACLs) {
    // Same identity as the source entry, minus create/delete.
    rootNodeAcls.add(new ACL(
        ZKUtil.removeSpecificPerms(sourceAcl.getPerms(), CREATE_DELETE_PERMS),
        sourceAcl.getId()));
  }

  zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
      YarnConfiguration.RM_ADDRESS,
      YarnConfiguration.DEFAULT_RM_ADDRESS, conf);

  String credentialDigest = DigestAuthenticationProvider.generateDigest(
      zkRootNodeUsername + ":" + zkRootNodePassword);
  Id rmDigestId = new Id(zkRootNodeAuthScheme, credentialDigest);
  rootNodeAcls.add(new ACL(CREATE_DELETE_PERMS, rmDigestId));
  return rootNodeAcls;
}
示例8
/**
 * Starts the store: opens the connection, creates the working and root
 * paths, runs the HA-only steps when HA is enabled, and then creates the
 * remaining root paths used by the store.
 *
 * @throws Exception if any of the invoked steps fails
 */
@Override
public synchronized void startInternal() throws Exception {
// createConnection for future API calls
createConnection();
// ensure root dirs exist
createRootDirRecursively(znodeWorkingPath);
createRootDir(zkRootNodePath);
// HA only: call fence() and start the active-status verification thread.
// This happens after the root node exists and before the remaining paths
// are created.
if (HAUtil.isHAEnabled(getConfig())){
fence();
verifyActiveStatusThread = new VerifyActiveStatusThread();
verifyActiveStatusThread.start();
}
// Remaining root paths: applications, secret manager, master keys,
// delegation tokens, sequence number and AMRM token secret manager.
createRootDir(rmAppRoot);
createRootDir(rmDTSecretManagerRoot);
createRootDir(dtMasterKeysRootPath);
createRootDir(delegationTokensRootPath);
createRootDir(dtSequenceNumberPath);
createRootDir(amrmTokenSecretManagerRoot);
}
示例9
/**
 * Prepares the shared HA test configuration for two RMs, "rm1" and "rm2",
 * using {@link ZKRMStateStore} with recovery enabled and automatic failover
 * disabled, then derives one configuration per RM with its own RM_HA_ID.
 */
@Before
public void setup() throws Exception {
  // HA and recovery through the ZooKeeper state store.
  configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
  configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
  configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
  configuration.set(
      YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
  configuration.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
  configuration.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
  configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
  configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster");

  // Give every service address key a distinct port per RM; the base doubles
  // for each key.
  int portBase = 100;
  for (String addressKey
      : YarnConfiguration.getServiceAddressConfKeys(configuration)) {
    String rm1Address = "0.0.0.0:" + (portBase + 20);
    configuration.set(HAUtil.addSuffix(addressKey, "rm1"), rm1Address);
    String rm2Address = "0.0.0.0:" + (portBase + 40);
    configuration.set(HAUtil.addSuffix(addressKey, "rm2"), rm2Address);
    portBase *= 2;
  }

  confForRM1 = new Configuration(configuration);
  confForRM1.set(YarnConfiguration.RM_HA_ID, "rm1");
  confForRM2 = new Configuration(configuration);
  confForRM2.set(YarnConfiguration.RM_HA_ID, "rm2");
}
示例10
/**
 * Creates a fresh HA configuration listing RM1 and RM2 as HA ids, sets the
 * service addresses for RM1, RM2 and RM3, enables the webapp and ACLs, and
 * resets the metrics state.
 */
@Before
public void setUp() throws Exception {
  configuration = new Configuration();
  UserGroupInformation.setConfiguration(configuration);
  configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
  configuration.set(
      YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);

  // Node id / address pairs, applied in this order for every address key.
  String[][] nodeAddresses = {
      {RM1_NODE_ID, RM1_ADDRESS},
      {RM2_NODE_ID, RM2_ADDRESS},
      {RM3_NODE_ID, RM3_ADDRESS},
  };
  for (String addressKey
      : YarnConfiguration.getServiceAddressConfKeys(configuration)) {
    for (String[] node : nodeAddresses) {
      configuration.set(HAUtil.addSuffix(addressKey, node[0]), node[1]);
    }
  }

  // Enable webapp to test web-services also
  configuration.setBoolean(MockRM.ENABLE_WEBAPP, true);
  configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);

  ClusterMetrics.destroy();
  QueueMetrics.clearQueueMetrics();
  DefaultMetricsSystem.shutdown();
}
示例11
/**
 * Builds an HA ResourceManager configuration backed by
 * {@link ZKRMStateStore}.
 *
 * <p>Every service address of every listed RM id is set to
 * {@code localhost:0}; the admin address of {@code rmId} is then overridden
 * with {@code adminPort}.
 *
 * @param rmIds value for the RM HA ids property
 * @param rmId HA id of the RM this configuration is for
 * @param adminPort admin port for {@code rmId}
 * @return the populated configuration
 */
private Configuration createHARMConf(
    String rmIds, String rmId, int adminPort) {
  Configuration haConf = new YarnConfiguration();
  haConf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
  haConf.set(YarnConfiguration.RM_HA_IDS, rmIds);
  haConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
  haConf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
  haConf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
  haConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
  haConf.set(YarnConfiguration.RM_HA_ID, rmId);
  haConf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");

  for (String addressKey
      : YarnConfiguration.getServiceAddressConfKeys(haConf)) {
    for (String haId : HAUtil.getRMHAIds(haConf)) {
      String perRmKey = HAUtil.addSuffix(addressKey, haId);
      haConf.set(perRmKey, "localhost:0");
    }
  }

  // The admin address of this RM gets the caller-supplied port.
  String adminKey = HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, rmId);
  haConf.set(adminKey, "localhost:" + adminPort);
  return haConf;
}
示例12
/**
 * Resolves an RM id to the HA service target for that ResourceManager.
 *
 * @param rmId id of the ResourceManager to target; must be one of the ids
 *     returned by {@link HAUtil#getRMHAIds}
 * @return a target built from a copy of the current configuration with
 *     {@code RM_HA_ID} set to {@code rmId}
 * @throws IllegalArgumentException if {@code rmId} is not a configured id
 * @throws YarnRuntimeException if the target cannot be constructed; the
 *     underlying exception is attached as the cause
 */
@Override
protected HAServiceTarget resolveTarget(String rmId) {
  Collection<String> rmIds = HAUtil.getRMHAIds(getConf());
  if (!rmIds.contains(rmId)) {
    StringBuilder msg = new StringBuilder();
    msg.append(rmId).append(" is not a valid serviceId. It should be one of ");
    for (String id : rmIds) {
      msg.append(id).append(' ');
    }
    throw new IllegalArgumentException(msg.toString());
  }
  try {
    YarnConfiguration conf = new YarnConfiguration(getConf());
    conf.set(YarnConfiguration.RM_HA_ID, rmId);
    return new RMHAServiceTarget(conf);
  } catch (IllegalArgumentException iae) {
    // Keep the original exception as the cause so the root problem is
    // visible in the stack trace.
    throw new YarnRuntimeException("Could not connect to " + rmId +
        "; the configuration for it might be missing", iae);
  } catch (IOException ioe) {
    throw new YarnRuntimeException(
        "Could not connect to RM HA Admin for node " + rmId, ioe);
  }
}
示例13
/**
 * Initializes this provider from the supplied configuration.
 *
 * <p>Stores the proxy helper and protocol, has the helper validate the
 * protocol, copies the configuration into a {@link YarnConfiguration},
 * records the configured RM HA ids, selects the id at
 * {@code currentProxyIndex} as {@code RM_HA_ID}, and copies the client
 * failover retry settings onto the IPC client connect-retry keys.
 *
 * @param configuration configuration to copy
 * @param rmProxy helper used to validate the protocol
 * @param protocol protocol class served by this provider
 */
@Override
public void init(Configuration configuration, RMProxy<T> rmProxy,
    Class<T> protocol) {
  this.rmProxy = rmProxy;
  this.protocol = protocol;
  this.rmProxy.checkAllowedProtocols(this.protocol);
  this.conf = new YarnConfiguration(configuration);

  // Remember every configured RM id and point the config at the current one.
  Collection<String> haIds = HAUtil.getRMHAIds(conf);
  this.rmServiceIds = haIds.toArray(new String[haIds.size()]);
  conf.set(YarnConfiguration.RM_HA_ID, rmServiceIds[currentProxyIndex]);

  // Map the failover retry counts onto the IPC client retry keys.
  int connectRetries = conf.getInt(
      YarnConfiguration.CLIENT_FAILOVER_RETRIES,
      YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES);
  conf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
      connectRetries);

  int socketTimeoutRetries = conf.getInt(
      YarnConfiguration.CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS,
      YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS);
  conf.setInt(
      CommonConfigurationKeysPublic
          .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
      socketTimeoutRetries);
}
示例14
/**
 * Builds a retrying proxy for the given protocol.
 *
 * <p>Without HA the proxy connects directly to the address reported by
 * {@code instance.getRMAddress}. With HA enabled the proxy is backed by the
 * failover proxy provider created by {@code instance}.
 *
 * @param configuration source configuration; reused as-is when it already is
 *     a {@link YarnConfiguration}, otherwise wrapped in a new one
 * @param protocol protocol interface to proxy
 * @param instance RMProxy used to obtain the RM address or failover provider
 * @return a proxy wrapped with the retry policy from {@code createRetryPolicy}
 * @throws IOException declared for failures of the calls made here
 */
@Private
protected static <T> T createRMProxy(final Configuration configuration,
    final Class<T> protocol, RMProxy instance) throws IOException {
  final YarnConfiguration conf;
  if (configuration instanceof YarnConfiguration) {
    conf = (YarnConfiguration) configuration;
  } else {
    conf = new YarnConfiguration(configuration);
  }
  RetryPolicy retryPolicy = createRetryPolicy(conf);

  if (!HAUtil.isHAEnabled(conf)) {
    // Non-HA: talk to the single ResourceManager directly.
    InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    T directProxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
    return (T) RetryProxy.create(protocol, directProxy, retryPolicy);
  }

  // HA: let the failover provider choose the ResourceManager.
  RMFailoverProxyProvider<T> provider =
      instance.createRMFailoverProxyProvider(conf, protocol);
  return (T) RetryProxy.create(protocol, provider, retryPolicy);
}
示例15
/**
 * Builds the token service name for the given address property.
 *
 * <p>Without HA this is the token service of the single configured address.
 * With HA enabled it is the comma-joined list of the token services of every
 * configured RM id, in the order returned by {@link HAUtil#getRMHAIds}.
 *
 * @param conf configuration to read
 * @param address name of the address property
 * @param defaultAddr default address used when the property is unset
 * @param defaultPort default port used when the address has none
 * @return the token service name
 */
@Unstable
public static Text getTokenService(Configuration conf, String address,
    String defaultAddr, int defaultPort) {
  if (!HAUtil.isHAEnabled(conf)) {
    // Non-HA case - no need to set RM_ID
    return SecurityUtil.buildTokenService(
        conf.getSocketAddr(address, defaultAddr, defaultPort));
  }

  // Collect one service address per RM id to form the service name.
  ArrayList<String> tokenServices = new ArrayList<String>();
  YarnConfiguration perRmConf = new YarnConfiguration(conf);
  for (String rmId : HAUtil.getRMHAIds(conf)) {
    // Selecting the RM id makes the lookup resolve that RM's address.
    perRmConf.set(YarnConfiguration.RM_HA_ID, rmId);
    Text service = SecurityUtil.buildTokenService(
        perRmConf.getSocketAddr(address, defaultAddr, defaultPort));
    tokenServices.add(service.toString());
  }
  return new Text(Joiner.on(',').join(tokenServices));
}
示例16
/**
 * Determines the proxy URI base that requests should be redirected to.
 *
 * <p>When exactly one base is registered it is returned directly. Otherwise
 * the active RM id is looked up and its webapp (or HTTPS webapp) address is
 * used as the key into {@code proxyUriBases}.
 *
 * @return the proxy URI base to redirect to
 * @throws ServletException if no base could be determined
 */
protected String findRedirectUrl() throws ServletException {
  String redirectBase;
  if (proxyUriBases.size() != 1) {
    // RM HA: resolve the webapp address of the currently active RM.
    YarnConfiguration conf = new YarnConfiguration();
    String activeRMId = RMHAUtils.findActiveRMHAId(conf);
    String webappAddressKey;
    if (YarnConfiguration.useHttps(conf)) {
      webappAddressKey = YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS;
    } else {
      webappAddressKey = YarnConfiguration.RM_WEBAPP_ADDRESS;
    }
    String activeHost =
        conf.get(HAUtil.addSuffix(webappAddressKey, activeRMId));
    redirectBase = proxyUriBases.get(activeHost);
  } else {
    // external proxy or not RM HA
    redirectBase = proxyUriBases.values().iterator().next();
  }

  if (redirectBase != null) {
    return redirectBase;
  }
  throw new ServletException(
      "Could not determine the proxy server for redirection");
}
示例17
/**
 * Initializes the ResourceManager at {@code index} and registers a handler
 * that records application attempts in {@code appMasters} on registration
 * events and removes them on unregistration events.
 *
 * @param index position of the ResourceManager in {@code resourceManagers}
 * @param conf configuration to initialize with; when HA is enabled its
 *     {@code RM_HA_ID} is set to {@code rmIds[index]} first
 */
private synchronized void initResourceManager(int index, Configuration conf) {
  if (HAUtil.isHAEnabled(conf)) {
    conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
  }
  resourceManagers[index].init(conf);

  EventHandler<RMAppAttemptEvent> attemptTracker =
      new EventHandler<RMAppAttemptEvent>() {
        public void handle(RMAppAttemptEvent event) {
          if (event instanceof RMAppAttemptRegistrationEvent) {
            appMasters.put(event.getApplicationAttemptId(),
                event.getTimestamp());
            return;
          }
          if (event instanceof RMAppAttemptUnregistrationEvent) {
            appMasters.remove(event.getApplicationAttemptId());
          }
        }
      };
  resourceManagers[index].getRMContext().getDispatcher().register(
      RMAppAttemptEventType.class, attemptTracker);
}
示例18
/**
 * Initializes the service from the configuration.
 *
 * <p>When the RM context reports HA as enabled, reads the automatic failover
 * flag and, if failover is both enabled and embedded, creates and adds the
 * embedded elector service. Then resolves the bind address from the admin
 * address keys, records the current user, sets up the authorizer with the
 * admin ACL, and reads the configured RM HA id.
 *
 * @param conf configuration to read
 * @throws Exception if any initialization step fails
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
  if (rmContext.isHAEnabled()) {
    autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
    if (autoFailoverEnabled && HAUtil.isAutomaticFailoverEmbedded(conf)) {
      embeddedElector = createEmbeddedElectorService();
      addIfService(embeddedElector);
    }
  }

  masterServiceBindAddress =
      conf.getSocketAddr(YarnConfiguration.RM_BIND_HOST,
          YarnConfiguration.RM_ADMIN_ADDRESS,
          YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
          YarnConfiguration.DEFAULT_RM_ADMIN_PORT);

  daemonUser = UserGroupInformation.getCurrentUser();

  authorizer = YarnAuthorizationProvider.getInstance(conf);
  authorizer.setAdmins(
      getAdminAclList(conf), UserGroupInformation.getCurrentUser());

  rmId = conf.get(YarnConfiguration.RM_HA_ID);
  super.serviceInit(conf);
}
示例19
/**
 * Builds the {@link ACL} list for the store's root node from the ACLs used
 * for ZooKeeper access.
 *
 * <p>Every source ACL is copied with the create/delete permissions removed.
 * One more ACL is appended that grants create/delete to a digest id derived
 * from this RM's address and the root node password, so only this RM keeps
 * create-delete access. As a side effect {@code zkRootNodeUsername} is set
 * to the RM address value read from the configuration.
 *
 * <p>To be called only when HA is enabled and the configuration doesn't set
 * an ACL for the root node.
 *
 * @param conf configuration used to look up this RM's address
 * @param sourceACLs ACLs used for ZooKeeper access
 * @return the ACLs to apply to the root node
 * @throws NoSuchAlgorithmException if the digest cannot be generated
 */
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
    Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
  List<ACL> rootNodeAcls = new ArrayList<ACL>();
  for (ACL sourceAcl : sourceACLs) {
    // Same identity as the source entry, minus create/delete.
    rootNodeAcls.add(new ACL(
        ZKUtil.removeSpecificPerms(sourceAcl.getPerms(), CREATE_DELETE_PERMS),
        sourceAcl.getId()));
  }

  zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
      YarnConfiguration.RM_ADDRESS,
      YarnConfiguration.DEFAULT_RM_ADDRESS, conf);

  String credentialDigest = DigestAuthenticationProvider.generateDigest(
      zkRootNodeUsername + ":" + zkRootNodePassword);
  Id rmDigestId = new Id(zkRootNodeAuthScheme, credentialDigest);
  rootNodeAcls.add(new ACL(CREATE_DELETE_PERMS, rmDigestId));
  return rootNodeAcls;
}
示例20
/**
 * Starts the store: opens the connection, creates the working and root
 * paths, runs the HA-only steps when HA is enabled, and then creates the
 * remaining root paths used by the store.
 *
 * @throws Exception if any of the invoked steps fails
 */
@Override
public synchronized void startInternal() throws Exception {
// createConnection for future API calls
createConnection();
// ensure root dirs exist
createRootDirRecursively(znodeWorkingPath);
createRootDir(zkRootNodePath);
// HA only: call fence() and start the active-status verification thread.
// This happens after the root node exists and before the remaining paths
// are created.
if (HAUtil.isHAEnabled(getConfig())){
fence();
verifyActiveStatusThread = new VerifyActiveStatusThread();
verifyActiveStatusThread.start();
}
// Remaining root paths: applications, secret manager, master keys,
// delegation tokens, sequence number and AMRM token secret manager.
createRootDir(rmAppRoot);
createRootDir(rmDTSecretManagerRoot);
createRootDir(dtMasterKeysRootPath);
createRootDir(delegationTokensRootPath);
createRootDir(dtSequenceNumberPath);
createRootDir(amrmTokenSecretManagerRoot);
}
示例21
/**
 * Prepares the shared HA test configuration for two RMs, "rm1" and "rm2",
 * using {@link ZKRMStateStore} with recovery enabled and automatic failover
 * disabled, then derives one configuration per RM with its own RM_HA_ID.
 */
@Before
public void setup() throws Exception {
  // HA and recovery through the ZooKeeper state store.
  configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
  configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
  configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
  configuration.set(
      YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
  configuration.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
  configuration.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
  configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
  configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster");

  // Give every service address key a distinct port per RM; the base doubles
  // for each key.
  int portBase = 100;
  for (String addressKey
      : YarnConfiguration.getServiceAddressConfKeys(configuration)) {
    String rm1Address = "0.0.0.0:" + (portBase + 20);
    configuration.set(HAUtil.addSuffix(addressKey, "rm1"), rm1Address);
    String rm2Address = "0.0.0.0:" + (portBase + 40);
    configuration.set(HAUtil.addSuffix(addressKey, "rm2"), rm2Address);
    portBase *= 2;
  }

  confForRM1 = new Configuration(configuration);
  confForRM1.set(YarnConfiguration.RM_HA_ID, "rm1");
  confForRM2 = new Configuration(configuration);
  confForRM2.set(YarnConfiguration.RM_HA_ID, "rm2");
}
示例22
/**
 * Creates a fresh HA configuration listing RM1 and RM2 as HA ids, sets the
 * service addresses for RM1, RM2 and RM3, enables the webapp and ACLs, and
 * resets the metrics state.
 */
@Before
public void setUp() throws Exception {
  configuration = new Configuration();
  UserGroupInformation.setConfiguration(configuration);
  configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
  configuration.set(
      YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);

  // Node id / address pairs, applied in this order for every address key.
  String[][] nodeAddresses = {
      {RM1_NODE_ID, RM1_ADDRESS},
      {RM2_NODE_ID, RM2_ADDRESS},
      {RM3_NODE_ID, RM3_ADDRESS},
  };
  for (String addressKey
      : YarnConfiguration.getServiceAddressConfKeys(configuration)) {
    for (String[] node : nodeAddresses) {
      configuration.set(HAUtil.addSuffix(addressKey, node[0]), node[1]);
    }
  }

  // Enable webapp to test web-services also
  configuration.setBoolean(MockRM.ENABLE_WEBAPP, true);
  configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);

  ClusterMetrics.destroy();
  QueueMetrics.clearQueueMetrics();
  DefaultMetricsSystem.shutdown();
}
示例23
/**
 * Builds an HA ResourceManager configuration backed by
 * {@link ZKRMStateStore}.
 *
 * <p>Every service address of every listed RM id is set to
 * {@code localhost:0}; the admin address of {@code rmId} is then overridden
 * with {@code adminPort}.
 *
 * @param rmIds value for the RM HA ids property
 * @param rmId HA id of the RM this configuration is for
 * @param adminPort admin port for {@code rmId}
 * @return the populated configuration
 */
private Configuration createHARMConf(
    String rmIds, String rmId, int adminPort) {
  Configuration haConf = new YarnConfiguration();
  haConf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
  haConf.set(YarnConfiguration.RM_HA_IDS, rmIds);
  haConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
  haConf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
  haConf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
  haConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
  haConf.set(YarnConfiguration.RM_HA_ID, rmId);
  haConf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");

  for (String addressKey
      : YarnConfiguration.getServiceAddressConfKeys(haConf)) {
    for (String haId : HAUtil.getRMHAIds(haConf)) {
      String perRmKey = HAUtil.addSuffix(addressKey, haId);
      haConf.set(perRmKey, "localhost:0");
    }
  }

  // The admin address of this RM gets the caller-supplied port.
  String adminKey = HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, rmId);
  haConf.set(adminKey, "localhost:" + adminPort);
  return haConf;
}
示例24
/**
 * Resolves an RM id to the HA service target for that ResourceManager.
 *
 * @param rmId id of the ResourceManager to target; must be one of the ids
 *     returned by {@link HAUtil#getRMHAIds}
 * @return a target built from a copy of the current configuration with
 *     {@code RM_HA_ID} set to {@code rmId}
 * @throws IllegalArgumentException if {@code rmId} is not a configured id
 * @throws YarnRuntimeException if the target cannot be constructed; the
 *     underlying exception is attached as the cause
 */
@Override
protected HAServiceTarget resolveTarget(String rmId) {
  Collection<String> rmIds = HAUtil.getRMHAIds(getConf());
  if (!rmIds.contains(rmId)) {
    StringBuilder msg = new StringBuilder();
    msg.append(rmId).append(" is not a valid serviceId. It should be one of ");
    for (String id : rmIds) {
      msg.append(id).append(' ');
    }
    throw new IllegalArgumentException(msg.toString());
  }
  try {
    YarnConfiguration conf = new YarnConfiguration(getConf());
    conf.set(YarnConfiguration.RM_HA_ID, rmId);
    return new RMHAServiceTarget(conf);
  } catch (IllegalArgumentException iae) {
    // Keep the original exception as the cause so the root problem is
    // visible in the stack trace.
    throw new YarnRuntimeException("Could not connect to " + rmId +
        "; the configuration for it might be missing", iae);
  } catch (IOException ioe) {
    throw new YarnRuntimeException(
        "Could not connect to RM HA Admin for node " + rmId, ioe);
  }
}
示例25
/**
 * Derives the job history URL from the YARN webapp URL.
 *
 * <p>The scheme of {@code yarnWebapp} is kept. The host and port come from
 * the per-RM-instance value of {@code MR_JOB_HISTORY_URL_CONF_KEY}, falling
 * back to the webapp host with port 19888.
 *
 * @param yarnWebapp YARN webapp URL of the form {@code http(s)://host:port...}
 * @param conf configuration to read the job history setting from
 * @return the scheme followed by the job history host and port
 * @throws IllegalArgumentException if {@code yarnWebapp} does not match the
 *     expected form
 */
public static String extractJobHistoryUrl(String yarnWebapp, Configuration conf) {
  // group(1) = scheme with "://", group(2) = host.
  Pattern pattern = Pattern.compile("(http[s]*://)([^:]*):([^/])*.*");
  Matcher m = pattern.matcher(yarnWebapp);
  // The message previously ran "URL" straight into the value; add the space.
  Preconditions.checkArgument(m.matches(), "Yarn master URL " + yarnWebapp + " not right.");
  String defaultHistoryUrl = m.group(2) + ":19888";
  return m.group(1) + HAUtil.getConfValueForRMInstance(MR_JOB_HISTORY_URL_CONF_KEY, defaultHistoryUrl, conf);
}
示例26
/**
 * Returns the proxy host:port entries for the AM filter.
 *
 * <p>A non-empty {@code PROXY_ADDRESS} is returned as the only entry.
 * Otherwise, when RM HA is enabled, every RM HA webapp address that can be
 * turned into a socket address is resolved and added. If that yields nothing
 * (or HA is off), the resolved RM webapp URL without scheme is used.
 *
 * @param conf configuration to read
 * @return the list of proxy addresses
 */
public static List<String> getProxyHostsAndPortsForAmFilter(
    Configuration conf) {
  List<String> addrs = new ArrayList<String>();
  String proxyAddr = conf.get(YarnConfiguration.PROXY_ADDRESS);

  boolean proxyConfigured = proxyAddr != null && !proxyAddr.isEmpty();
  if (proxyConfigured) {
    addrs.add(proxyAddr);
    return addrs;
  }

  // PROXY_ADDRESS isn't set: fall back to RM_WEBAPP(_HTTPS)_ADDRESS.
  // There could be multiple if using RM HA.
  if (HAUtil.isHAEnabled(conf)) {
    for (String haAddr
        : RMHAUtils.getRMHAWebappAddresses(new YarnConfiguration(conf))) {
      try {
        addrs.add(getResolvedAddress(NetUtils.createSocketAddr(haAddr)));
      } catch (IllegalArgumentException e) {
        // skip if can't resolve
      }
    }
  }

  // If couldn't resolve any of the addresses or not using RM HA, fallback
  if (addrs.isEmpty()) {
    addrs.add(getResolvedRMWebAppURLWithoutScheme(conf));
  }
  return addrs;
}
示例27
/**
 * Resolves the RM webapp address (without scheme) for the given HTTP policy.
 *
 * <p>With HA enabled, the first configured RM id is used and its suffixed
 * address key is looked up; the original comment notes this relies on a
 * redirect to reach the active RM. Without HA the plain address key is used.
 *
 * @param conf configuration to read
 * @param httpPolicy selects the HTTPS keys when {@code HTTPS_ONLY}, the HTTP
 *     keys otherwise
 * @return the resolved address
 */
public static String getResolvedRemoteRMWebAppURLWithoutScheme(Configuration conf,
    Policy httpPolicy) {
  String rmId = null;
  if (HAUtil.isHAEnabled(conf)) {
    // If HA enabled, pick one of the RM-IDs and rely on redirect to go to
    // the Active RM
    rmId = (String) HAUtil.getRMHAIds(conf).toArray()[0];
  }

  // Choose the key/default/port triple once, based on the policy.
  final String addressKey;
  final String defaultAddress;
  final int defaultPort;
  if (httpPolicy == Policy.HTTPS_ONLY) {
    addressKey = YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS;
    defaultAddress = YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS;
    defaultPort = YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT;
  } else {
    addressKey = YarnConfiguration.RM_WEBAPP_ADDRESS;
    defaultAddress = YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS;
    defaultPort = YarnConfiguration.DEFAULT_RM_WEBAPP_PORT;
  }

  String lookupKey = addressKey;
  if (rmId != null) {
    lookupKey = HAUtil.addSuffix(addressKey, rmId);
  }
  InetSocketAddress address =
      conf.getSocketAddr(lookupKey, defaultAddress, defaultPort);
  return getResolvedAddress(address);
}
示例28
/**
 * Creates a service target from the given configuration, reading the
 * automatic failover flag and the RM admin service address.
 *
 * @param conf configuration to read
 * @throws IOException declared by the constructor signature
 */
public RMHAServiceTarget(YarnConfiguration conf)
    throws IOException {
  this.autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
  this.haAdminServiceAddress =
      conf.getSocketAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
          YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
          YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
}
示例29
/**
 * Initializes the embedded elector: validates the ZooKeeper quorum setting,
 * builds the local active-node info, creates the {@link ActiveStandbyElector}
 * for the cluster's election znode, and checks the parent znode.
 *
 * @param conf service configuration; wrapped in a {@link YarnConfiguration}
 *     if it is not one already
 * @throws YarnRuntimeException if {@code RM_ZK_ADDRESS} is not set
 * @throws Exception if any other initialization step fails
 */
@Override
protected void serviceInit(Configuration conf)
throws Exception {
conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf);
// The ZooKeeper quorum is mandatory here.
String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
if (zkQuorum == null) {
throw new YarnRuntimeException("Embedded automatic failover " +
"is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS +
" is not set");
}
// Identify this RM within the cluster.
String rmId = HAUtil.getRMHAId(conf);
String clusterId = YarnConfiguration.getClusterId(conf);
localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);
// Election znode path is <base path>/<cluster id>.
String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
String electionZNode = zkBasePath + "/" + clusterId;
// Read as long, narrowed to int when passed to the elector below.
long zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
int maxRetryNum = conf.getInt(
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
// This service is passed as the elector's callback ("this").
elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
electionZNode, zkAcls, zkAuths, this, maxRetryNum);
// The parent znode is ensured before its safety check runs.
elector.ensureParentZNode();
if (!isParentZnodeSafe(clusterId)) {
notifyFatalError(electionZNode + " znode has invalid data! "+
"Might need formatting!");
}
super.serviceInit(conf);
}
示例30
/**
 * Runs the ZooKeeper operation via {@code runWithCheck()}, retrying on
 * retriable {@link KeeperException}s.
 *
 * <p>Between attempts it sleeps for {@code zkRetryInterval} and recreates
 * the connection; it gives up once {@code numRetries} is reached.
 *
 * @return the operation's result, or {@code null} when the znode already
 *     exists, or when this is a delete operation and the znode is already
 *     gone
 * @throws StoreFencedException on a {@code NoAuthException} while HA is
 *     enabled
 * @throws KeeperException when the error is not retriable, retries are
 *     exhausted, or a {@code NoAuthException} occurs with HA disabled
 * @throws Exception for any other failure, including interruption while
 *     sleeping between retries
 */
T runWithRetries() throws Exception {
  int retry = 0;
  while (true) {
    try {
      return runWithCheck();
    } catch (KeeperException.NoAuthException nae) {
      if (HAUtil.isHAEnabled(getConfig())) {
        // NoAuthException possibly means that this store is fenced due to
        // another RM becoming active. Even if not,
        // it is safer to assume we have been fenced
        throw new StoreFencedException();
      }
      // Without HA the exception used to be swallowed here, so the loop
      // retried immediately, with no sleep and no bound. Surface the
      // authorization failure to the caller instead.
      throw nae;
    } catch (KeeperException ke) {
      if (ke.code() == Code.NODEEXISTS) {
        LOG.info("znode already exists!");
        return null;
      }
      if (hasDeleteNodeOp && ke.code() == Code.NONODE) {
        LOG.info("znode has already been deleted!");
        return null;
      }
      LOG.info("Exception while executing a ZK operation.", ke);
      if (shouldRetry(ke.code()) && ++retry < numRetries) {
        LOG.info("Retrying operation on ZK. Retry no. " + retry);
        Thread.sleep(zkRetryInterval);
        createConnection();
        continue;
      }
      LOG.info("Maxed out ZK retries. Giving up!");
      throw ke;
    }
  }
}