Java源码示例:org.apache.hadoop.yarn.conf.HAUtil

示例1
/**
 * Prepares this failover proxy provider for use.
 *
 * <p>Records the proxy factory and protocol, asks the factory to validate the
 * protocol, and builds a new {@link YarnConfiguration} from the supplied
 * configuration. Every setting written below goes to that new instance: the
 * RM id found at {@code currentProxyIndex}, and the two IPC client connect
 * retry limits, which are overwritten with the YARN client-failover values.
 *
 * @param configuration configuration the provider's own configuration is
 *                      built from
 * @param rmProxy       proxy factory; also used to validate {@code protocol}
 * @param protocol      protocol class this provider serves
 */
@Override
public void init(Configuration configuration, RMProxy<T> rmProxy,
                  Class<T> protocol) {
  this.rmProxy = rmProxy;
  this.protocol = protocol;
  this.rmProxy.checkAllowedProtocols(this.protocol);

  this.conf = new YarnConfiguration(configuration);
  final Collection<String> haIds = HAUtil.getRMHAIds(conf);
  this.rmServiceIds = haIds.toArray(new String[haIds.size()]);
  final String initialRmId = rmServiceIds[currentProxyIndex];
  conf.set(YarnConfiguration.RM_HA_ID, initialRmId);

  // Plain connect failures: use the client-failover retry count.
  final int failoverRetries = conf.getInt(
      YarnConfiguration.CLIENT_FAILOVER_RETRIES,
      YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES);
  conf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
      failoverRetries);

  // Socket timeouts: same idea, separate setting.
  final int failoverRetriesOnTimeout = conf.getInt(
      YarnConfiguration.CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS,
      YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS);
  conf.setInt(
      CommonConfigurationKeysPublic
          .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
      failoverRetriesOnTimeout);
}
 
示例2
/**
 * Builds a retrying proxy for {@code protocol}.
 *
 * <p>Without HA the proxy talks directly to the address returned by
 * {@code instance.getRMAddress}. With HA enabled it is backed by the failover
 * proxy provider obtained from {@code instance}. In both cases the result is
 * wrapped by {@link RetryProxy} using the policy from
 * {@code createRetryPolicy}.
 *
 * @param configuration source configuration; used as-is when it already is a
 *                      {@link YarnConfiguration}, otherwise wrapped in one
 * @param protocol      protocol class the proxy implements
 * @param instance      supplies the RM address or the failover proxy provider
 * @return a proxy for {@code protocol} wrapped with the retry policy
 * @throws IOException declared by the signature; raised by the helpers
 *                     invoked here
 */
@Private
protected static <T> T createRMProxy(final Configuration configuration,
    final Class<T> protocol, RMProxy instance) throws IOException {
  final YarnConfiguration yarnConf;
  if (configuration instanceof YarnConfiguration) {
    yarnConf = (YarnConfiguration) configuration;
  } else {
    yarnConf = new YarnConfiguration(configuration);
  }
  final RetryPolicy policy = createRetryPolicy(yarnConf);

  if (!HAUtil.isHAEnabled(yarnConf)) {
    // Single RM: connect straight to its address.
    final InetSocketAddress rmAddress =
        instance.getRMAddress(yarnConf, protocol);
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    final T directProxy = RMProxy.<T>getProxy(yarnConf, protocol, rmAddress);
    return (T) RetryProxy.create(protocol, directProxy, policy);
  }

  // HA: let the failover provider choose among the ResourceManagers.
  final RMFailoverProxyProvider<T> failoverProvider =
      instance.createRMFailoverProxyProvider(yarnConf, protocol);
  return (T) RetryProxy.create(protocol, failoverProvider, policy);
}
 
示例3
/**
 * Computes the token service name for the given address setting.
 *
 * <p>Without HA this is the token service of the single configured socket
 * address. With HA enabled, one token service is built per RM id (by setting
 * {@code RM_HA_ID} on a private configuration copy before resolving the
 * address) and the results are joined with commas.
 *
 * @param conf        configuration to read from; not modified here
 * @param address     configuration key of the address
 * @param defaultAddr address used when the key is not set
 * @param defaultPort port used when the address carries none
 * @return the token service name
 */
@Unstable
public static Text getTokenService(Configuration conf, String address,
    String defaultAddr, int defaultPort) {
  // Non-HA case - no need to set RM_ID
  if (!HAUtil.isHAEnabled(conf)) {
    return SecurityUtil.buildTokenService(
        conf.getSocketAddr(address, defaultAddr, defaultPort));
  }

  // HA case - collect one service address per RM to form the service name
  YarnConfiguration perRmConf = new YarnConfiguration(conf);
  ArrayList<String> tokenServices = new ArrayList<String>();
  for (String haId : HAUtil.getRMHAIds(conf)) {
    // Selecting the RM id makes the lookup resolve that RM's address
    perRmConf.set(YarnConfiguration.RM_HA_ID, haId);
    Text perRmService = SecurityUtil.buildTokenService(
        perRmConf.getSocketAddr(address, defaultAddr, defaultPort));
    tokenServices.add(perRmService.toString());
  }
  return new Text(Joiner.on(',').join(tokenServices));
}
 
示例4
/**
 * Selects the proxy URI base that a request should be redirected to.
 *
 * <p>When {@code proxyUriBases} holds exactly one entry, that entry is used.
 * Otherwise the active RM id is looked up, the matching web-app address
 * (HTTPS or HTTP variant) is read from a fresh {@link YarnConfiguration}, and
 * that address is used as the key into {@code proxyUriBases}.
 *
 * @return the selected proxy URI base, never {@code null}
 * @throws ServletException if no proxy URI base could be selected
 */
protected String findRedirectUrl() throws ServletException {
  final String redirectBase;
  if (proxyUriBases.size() != 1) {
    // RM HA: redirect to whichever RM is currently active
    YarnConfiguration yarnConf = new YarnConfiguration();
    String activeId = RMHAUtils.findActiveRMHAId(yarnConf);
    String addressKey;
    if (YarnConfiguration.useHttps(yarnConf)) {
      addressKey = YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS;
    } else {
      addressKey = YarnConfiguration.RM_WEBAPP_ADDRESS;
    }
    String activeHost = yarnConf.get(HAUtil.addSuffix(addressKey, activeId));
    redirectBase = proxyUriBases.get(activeHost);
  } else {
    // external proxy or not RM HA: the only entry is the answer
    redirectBase = proxyUriBases.values().iterator().next();
  }

  if (redirectBase == null) {
    throw new ServletException(
        "Could not determine the proxy server for redirection");
  }
  return redirectBase;
}
 
示例5
/**
 * Initializes the ResourceManager at {@code index} and hooks an attempt
 * event handler into its dispatcher.
 *
 * <p>When HA is enabled, {@code RM_HA_ID} is set on {@code conf} to the id at
 * the same index before initialization. The registered handler records the
 * event timestamp in {@code appMasters} on registration events and removes
 * the entry on unregistration events.
 *
 * @param index position of the ResourceManager (and of its id in
 *              {@code rmIds})
 * @param conf  configuration passed to the ResourceManager; mutated when HA
 *              is enabled
 */
private synchronized void initResourceManager(int index, Configuration conf) {
  if (HAUtil.isHAEnabled(conf)) {
    conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
  }
  resourceManagers[index].init(conf);

  // Mirrors attempt registration / unregistration into appMasters.
  EventHandler<RMAppAttemptEvent> attemptTracker =
      new EventHandler<RMAppAttemptEvent>() {
        @Override
        public void handle(RMAppAttemptEvent event) {
          if (event instanceof RMAppAttemptRegistrationEvent) {
            appMasters.put(event.getApplicationAttemptId(),
                event.getTimestamp());
            return;
          }
          if (event instanceof RMAppAttemptUnregistrationEvent) {
            appMasters.remove(event.getApplicationAttemptId());
          }
        }
      };
  resourceManagers[index].getRMContext().getDispatcher()
      .register(RMAppAttemptEventType.class, attemptTracker);
}
 
示例6
/**
 * Initializes this service from the given configuration.
 *
 * <p>When the RM context reports HA as enabled, the automatic-failover flag
 * is read and, if failover is both enabled and embedded, the embedded elector
 * service is created and added. Afterwards the bind address, daemon user,
 * authorizer (with its admin ACL) and RM id are set up before delegating to
 * the superclass.
 *
 * @param conf configuration to initialize from
 * @throws Exception if any initialization step fails
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
  if (rmContext.isHAEnabled()) {
    autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
    final boolean useEmbeddedElector =
        autoFailoverEnabled && HAUtil.isAutomaticFailoverEmbedded(conf);
    if (useEmbeddedElector) {
      embeddedElector = createEmbeddedElectorService();
      addIfService(embeddedElector);
    }
  }

  masterServiceBindAddress =
      conf.getSocketAddr(
          YarnConfiguration.RM_BIND_HOST,
          YarnConfiguration.RM_ADMIN_ADDRESS,
          YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
          YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
  daemonUser = UserGroupInformation.getCurrentUser();

  authorizer = YarnAuthorizationProvider.getInstance(conf);
  authorizer.setAdmins(
      getAdminAclList(conf), UserGroupInformation.getCurrentUser());

  rmId = conf.get(YarnConfiguration.RM_HA_ID);
  super.serviceInit(conf);
}
 
示例7
/**
 * Derives the {@link ACL}s for the store's root znode from the {@link ACL}s
 * used for general ZooKeeper access.
 *
 * <p>Each source ACL is copied with {@code CREATE_DELETE_PERMS} removed, and
 * one extra ACL is appended that grants {@code CREATE_DELETE_PERMS} to a
 * digest identity built from {@code zkRootNodeUsername} and
 * {@code zkRootNodePassword}. As a side effect, {@code zkRootNodeUsername} is
 * set from this RM instance's {@code RM_ADDRESS} configuration value.
 *
 * To be called only when HA is enabled and the configuration doesn't set ACL
 * for the root node.
 *
 * @param conf       configuration the RM address is read from
 * @param sourceACLs ACLs to copy with create/delete permissions stripped
 * @return the ACL list for the root znode
 * @throws NoSuchAlgorithmException if the digest cannot be generated
 */
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
    Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
  List<ACL> rootNodeAcls = new ArrayList<ACL>();
  for (ACL sourceAcl : sourceACLs) {
    int permsWithoutCreateDelete = ZKUtil.removeSpecificPerms(
        sourceAcl.getPerms(), CREATE_DELETE_PERMS);
    rootNodeAcls.add(new ACL(permsWithoutCreateDelete, sourceAcl.getId()));
  }

  zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
      YarnConfiguration.RM_ADDRESS,
      YarnConfiguration.DEFAULT_RM_ADDRESS, conf);

  // The create/delete grant goes to a digest identity of "user:password".
  String credentials = zkRootNodeUsername + ":" + zkRootNodePassword;
  String digest = DigestAuthenticationProvider.generateDigest(credentials);
  Id rmIdentity = new Id(zkRootNodeAuthScheme, digest);
  rootNodeAcls.add(new ACL(CREATE_DELETE_PERMS, rmIdentity));
  return rootNodeAcls;
}
 
示例8
/**
 * Starts the store: calls {@code createConnection()}, creates the working
 * path and the root node, performs the HA-only steps when HA is enabled, and
 * then creates the remaining root directories one by one.
 *
 * <p>The statements run strictly in the order written; the later
 * {@code createRootDir} calls happen only after the HA-only steps.
 *
 * @throws Exception if any of the invoked steps fails
 */
@Override
public synchronized void startInternal() throws Exception {
  // createConnection for future API calls
  createConnection();

  // ensure root dirs exist
  createRootDirRecursively(znodeWorkingPath);
  createRootDir(zkRootNodePath);
  if (HAUtil.isHAEnabled(getConfig())){
    // HA only: fence first, then start the background verifier thread.
    // NOTE(review): fence() and VerifyActiveStatusThread are defined outside
    // this block; presumably fencing must precede the remaining directory
    // creation below - confirm before reordering.
    fence();
    verifyActiveStatusThread = new VerifyActiveStatusThread();
    verifyActiveStatusThread.start();
  }
  // Remaining roots, created in this fixed order.
  createRootDir(rmAppRoot);
  createRootDir(rmDTSecretManagerRoot);
  createRootDir(dtMasterKeysRootPath);
  createRootDir(delegationTokensRootPath);
  createRootDir(dtSequenceNumberPath);
  createRootDir(amrmTokenSecretManagerRoot);
}
 
示例9
/**
 * Builds the shared HA test configuration plus one derived configuration per
 * RM ({@code confForRM1}, {@code confForRM2}).
 *
 * <p>HA and recovery are enabled, automatic failover is disabled, and the
 * ZK-backed state store is selected. Every service address key gets distinct
 * "0.0.0.0" ports for rm1 and rm2; the port base starts at 100 and doubles
 * for each key.
 */
@Before
public void setup() throws Exception {
  configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
  configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
  configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
  configuration.set(
      YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
  configuration.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
  configuration.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
  configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
  configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster");

  int portBase = 100;
  for (String addressKey
      : YarnConfiguration.getServiceAddressConfKeys(configuration)) {
    String rm1Address = "0.0.0.0:" + (portBase + 20);
    String rm2Address = "0.0.0.0:" + (portBase + 40);
    configuration.set(HAUtil.addSuffix(addressKey, "rm1"), rm1Address);
    configuration.set(HAUtil.addSuffix(addressKey, "rm2"), rm2Address);
    portBase *= 2;
  }

  // Per-RM copies differ only in which RM id they select.
  confForRM1 = new Configuration(configuration);
  confForRM1.set(YarnConfiguration.RM_HA_ID, "rm1");
  confForRM2 = new Configuration(configuration);
  confForRM2.set(YarnConfiguration.RM_HA_ID, "rm2");
}
 
示例10
/**
 * Creates a fresh HA configuration for each test and resets metrics state.
 *
 * <p>RM1 and RM2 are listed as HA ids. Addresses are written for RM1, RM2
 * and RM3 for every service address key, so an RM3 address is present even
 * though RM3 is not in the id list set here.
 */
@Before
public void setUp() throws Exception {
  configuration = new Configuration();
  UserGroupInformation.setConfiguration(configuration);

  configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
  final String haIds = RM1_NODE_ID + "," + RM2_NODE_ID;
  configuration.set(YarnConfiguration.RM_HA_IDS, haIds);

  for (String addressKey
      : YarnConfiguration.getServiceAddressConfKeys(configuration)) {
    configuration.set(
        HAUtil.addSuffix(addressKey, RM1_NODE_ID), RM1_ADDRESS);
    configuration.set(
        HAUtil.addSuffix(addressKey, RM2_NODE_ID), RM2_ADDRESS);
    configuration.set(
        HAUtil.addSuffix(addressKey, RM3_NODE_ID), RM3_ADDRESS);
  }

  // Enable webapp to test web-services also
  configuration.setBoolean(MockRM.ENABLE_WEBAPP, true);
  configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);

  // Start every test from clean metrics.
  ClusterMetrics.destroy();
  QueueMetrics.clearQueueMetrics();
  DefaultMetricsSystem.shutdown();
}
 
示例11
/**
 * Builds an HA-enabled configuration for one ResourceManager.
 *
 * <p>All service addresses of all listed RMs are set to "localhost:0"; the
 * admin address of {@code rmId} is then overridden with {@code adminPort}.
 *
 * @param rmIds     value stored under {@code RM_HA_IDS}
 * @param rmId      id of the RM this configuration is for
 * @param adminPort port used for that RM's admin address
 * @return the new configuration
 */
private Configuration createHARMConf(
    String rmIds, String rmId, int adminPort) {
  final String anyLocalPort = "localhost:0";

  Configuration haConf = new YarnConfiguration();
  haConf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
  haConf.set(YarnConfiguration.RM_HA_IDS, rmIds);
  haConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
  haConf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
  haConf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
  haConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
  haConf.set(YarnConfiguration.RM_HA_ID, rmId);
  haConf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, anyLocalPort);

  for (String addressKey
      : YarnConfiguration.getServiceAddressConfKeys(haConf)) {
    for (String haId : HAUtil.getRMHAIds(haConf)) {
      haConf.set(HAUtil.addSuffix(addressKey, haId), anyLocalPort);
    }
  }

  // The admin address of this RM is the only one with a fixed port.
  String adminKey =
      HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, rmId);
  haConf.set(adminKey, "localhost:" + adminPort);
  return haConf;
}
 
示例12
/**
 * Resolves an RM id to the {@link HAServiceTarget} used to administer it.
 *
 * @param rmId id of the ResourceManager; must be one of the configured HA ids
 * @return a target built from a configuration copy with {@code RM_HA_ID} set
 *         to {@code rmId}
 * @throws IllegalArgumentException if {@code rmId} is not a configured HA id
 * @throws YarnRuntimeException     if the target cannot be created; the
 *                                  underlying exception is attached as cause
 */
@Override
protected HAServiceTarget resolveTarget(String rmId) {
  Collection<String> rmIds = HAUtil.getRMHAIds(getConf());
  if (!rmIds.contains(rmId)) {
    StringBuilder msg = new StringBuilder();
    msg.append(rmId)
        .append(" is not a valid serviceId. It should be one of ");
    for (String id : rmIds) {
      msg.append(id).append(' ');
    }
    throw new IllegalArgumentException(msg.toString());
  }
  try {
    // Work on a copy so selecting the RM id does not leak into getConf().
    YarnConfiguration conf = new YarnConfiguration(getConf());
    conf.set(YarnConfiguration.RM_HA_ID, rmId);
    return new RMHAServiceTarget(conf);
  } catch (IllegalArgumentException iae) {
    // Keep the original exception as cause so the root problem stays visible.
    throw new YarnRuntimeException("Could not connect to " + rmId +
        "; the configuration for it might be missing", iae);
  } catch (IOException ioe) {
    throw new YarnRuntimeException(
        "Could not connect to RM HA Admin for node " + rmId, ioe);
  }
}
 
示例13
/**
 * Prepares this failover proxy provider for use.
 *
 * <p>Records the proxy factory and protocol, asks the factory to validate the
 * protocol, and builds a new {@link YarnConfiguration} from the supplied
 * configuration. Every setting written below goes to that new instance: the
 * RM id found at {@code currentProxyIndex}, and the two IPC client connect
 * retry limits, which are overwritten with the YARN client-failover values.
 *
 * @param configuration configuration the provider's own configuration is
 *                      built from
 * @param rmProxy       proxy factory; also used to validate {@code protocol}
 * @param protocol      protocol class this provider serves
 */
@Override
public void init(Configuration configuration, RMProxy<T> rmProxy,
                  Class<T> protocol) {
  this.rmProxy = rmProxy;
  this.protocol = protocol;
  this.rmProxy.checkAllowedProtocols(this.protocol);

  this.conf = new YarnConfiguration(configuration);
  final Collection<String> haIds = HAUtil.getRMHAIds(conf);
  this.rmServiceIds = haIds.toArray(new String[haIds.size()]);
  final String initialRmId = rmServiceIds[currentProxyIndex];
  conf.set(YarnConfiguration.RM_HA_ID, initialRmId);

  // Plain connect failures: use the client-failover retry count.
  final int failoverRetries = conf.getInt(
      YarnConfiguration.CLIENT_FAILOVER_RETRIES,
      YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES);
  conf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
      failoverRetries);

  // Socket timeouts: same idea, separate setting.
  final int failoverRetriesOnTimeout = conf.getInt(
      YarnConfiguration.CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS,
      YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS);
  conf.setInt(
      CommonConfigurationKeysPublic
          .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
      failoverRetriesOnTimeout);
}
 
示例14
/**
 * Builds a retrying proxy for {@code protocol}.
 *
 * <p>Without HA the proxy talks directly to the address returned by
 * {@code instance.getRMAddress}. With HA enabled it is backed by the failover
 * proxy provider obtained from {@code instance}. In both cases the result is
 * wrapped by {@link RetryProxy} using the policy from
 * {@code createRetryPolicy}.
 *
 * @param configuration source configuration; used as-is when it already is a
 *                      {@link YarnConfiguration}, otherwise wrapped in one
 * @param protocol      protocol class the proxy implements
 * @param instance      supplies the RM address or the failover proxy provider
 * @return a proxy for {@code protocol} wrapped with the retry policy
 * @throws IOException declared by the signature; raised by the helpers
 *                     invoked here
 */
@Private
protected static <T> T createRMProxy(final Configuration configuration,
    final Class<T> protocol, RMProxy instance) throws IOException {
  final YarnConfiguration yarnConf;
  if (configuration instanceof YarnConfiguration) {
    yarnConf = (YarnConfiguration) configuration;
  } else {
    yarnConf = new YarnConfiguration(configuration);
  }
  final RetryPolicy policy = createRetryPolicy(yarnConf);

  if (!HAUtil.isHAEnabled(yarnConf)) {
    // Single RM: connect straight to its address.
    final InetSocketAddress rmAddress =
        instance.getRMAddress(yarnConf, protocol);
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    final T directProxy = RMProxy.<T>getProxy(yarnConf, protocol, rmAddress);
    return (T) RetryProxy.create(protocol, directProxy, policy);
  }

  // HA: let the failover provider choose among the ResourceManagers.
  final RMFailoverProxyProvider<T> failoverProvider =
      instance.createRMFailoverProxyProvider(yarnConf, protocol);
  return (T) RetryProxy.create(protocol, failoverProvider, policy);
}
 
示例15
/**
 * Computes the token service name for the given address setting.
 *
 * <p>Without HA this is the token service of the single configured socket
 * address. With HA enabled, one token service is built per RM id (by setting
 * {@code RM_HA_ID} on a private configuration copy before resolving the
 * address) and the results are joined with commas.
 *
 * @param conf        configuration to read from; not modified here
 * @param address     configuration key of the address
 * @param defaultAddr address used when the key is not set
 * @param defaultPort port used when the address carries none
 * @return the token service name
 */
@Unstable
public static Text getTokenService(Configuration conf, String address,
    String defaultAddr, int defaultPort) {
  // Non-HA case - no need to set RM_ID
  if (!HAUtil.isHAEnabled(conf)) {
    return SecurityUtil.buildTokenService(
        conf.getSocketAddr(address, defaultAddr, defaultPort));
  }

  // HA case - collect one service address per RM to form the service name
  YarnConfiguration perRmConf = new YarnConfiguration(conf);
  ArrayList<String> tokenServices = new ArrayList<String>();
  for (String haId : HAUtil.getRMHAIds(conf)) {
    // Selecting the RM id makes the lookup resolve that RM's address
    perRmConf.set(YarnConfiguration.RM_HA_ID, haId);
    Text perRmService = SecurityUtil.buildTokenService(
        perRmConf.getSocketAddr(address, defaultAddr, defaultPort));
    tokenServices.add(perRmService.toString());
  }
  return new Text(Joiner.on(',').join(tokenServices));
}
 
示例16
/**
 * Selects the proxy URI base that a request should be redirected to.
 *
 * <p>When {@code proxyUriBases} holds exactly one entry, that entry is used.
 * Otherwise the active RM id is looked up, the matching web-app address
 * (HTTPS or HTTP variant) is read from a fresh {@link YarnConfiguration}, and
 * that address is used as the key into {@code proxyUriBases}.
 *
 * @return the selected proxy URI base, never {@code null}
 * @throws ServletException if no proxy URI base could be selected
 */
protected String findRedirectUrl() throws ServletException {
  final String redirectBase;
  if (proxyUriBases.size() != 1) {
    // RM HA: redirect to whichever RM is currently active
    YarnConfiguration yarnConf = new YarnConfiguration();
    String activeId = RMHAUtils.findActiveRMHAId(yarnConf);
    String addressKey;
    if (YarnConfiguration.useHttps(yarnConf)) {
      addressKey = YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS;
    } else {
      addressKey = YarnConfiguration.RM_WEBAPP_ADDRESS;
    }
    String activeHost = yarnConf.get(HAUtil.addSuffix(addressKey, activeId));
    redirectBase = proxyUriBases.get(activeHost);
  } else {
    // external proxy or not RM HA: the only entry is the answer
    redirectBase = proxyUriBases.values().iterator().next();
  }

  if (redirectBase == null) {
    throw new ServletException(
        "Could not determine the proxy server for redirection");
  }
  return redirectBase;
}
 
示例17
/**
 * Initializes the ResourceManager at {@code index} and hooks an attempt
 * event handler into its dispatcher.
 *
 * <p>When HA is enabled, {@code RM_HA_ID} is set on {@code conf} to the id at
 * the same index before initialization. The registered handler records the
 * event timestamp in {@code appMasters} on registration events and removes
 * the entry on unregistration events.
 *
 * @param index position of the ResourceManager (and of its id in
 *              {@code rmIds})
 * @param conf  configuration passed to the ResourceManager; mutated when HA
 *              is enabled
 */
private synchronized void initResourceManager(int index, Configuration conf) {
  if (HAUtil.isHAEnabled(conf)) {
    conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
  }
  resourceManagers[index].init(conf);

  // Mirrors attempt registration / unregistration into appMasters.
  EventHandler<RMAppAttemptEvent> attemptTracker =
      new EventHandler<RMAppAttemptEvent>() {
        @Override
        public void handle(RMAppAttemptEvent event) {
          if (event instanceof RMAppAttemptRegistrationEvent) {
            appMasters.put(event.getApplicationAttemptId(),
                event.getTimestamp());
            return;
          }
          if (event instanceof RMAppAttemptUnregistrationEvent) {
            appMasters.remove(event.getApplicationAttemptId());
          }
        }
      };
  resourceManagers[index].getRMContext().getDispatcher()
      .register(RMAppAttemptEventType.class, attemptTracker);
}
 
示例18
/**
 * Initializes this service from the given configuration.
 *
 * <p>When the RM context reports HA as enabled, the automatic-failover flag
 * is read and, if failover is both enabled and embedded, the embedded elector
 * service is created and added. Afterwards the bind address, daemon user,
 * authorizer (with its admin ACL) and RM id are set up before delegating to
 * the superclass.
 *
 * @param conf configuration to initialize from
 * @throws Exception if any initialization step fails
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
  if (rmContext.isHAEnabled()) {
    autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
    final boolean useEmbeddedElector =
        autoFailoverEnabled && HAUtil.isAutomaticFailoverEmbedded(conf);
    if (useEmbeddedElector) {
      embeddedElector = createEmbeddedElectorService();
      addIfService(embeddedElector);
    }
  }

  masterServiceBindAddress =
      conf.getSocketAddr(
          YarnConfiguration.RM_BIND_HOST,
          YarnConfiguration.RM_ADMIN_ADDRESS,
          YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
          YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
  daemonUser = UserGroupInformation.getCurrentUser();

  authorizer = YarnAuthorizationProvider.getInstance(conf);
  authorizer.setAdmins(
      getAdminAclList(conf), UserGroupInformation.getCurrentUser());

  rmId = conf.get(YarnConfiguration.RM_HA_ID);
  super.serviceInit(conf);
}
 
示例19
/**
 * Derives the {@link ACL}s for the store's root znode from the {@link ACL}s
 * used for general ZooKeeper access.
 *
 * <p>Each source ACL is copied with {@code CREATE_DELETE_PERMS} removed, and
 * one extra ACL is appended that grants {@code CREATE_DELETE_PERMS} to a
 * digest identity built from {@code zkRootNodeUsername} and
 * {@code zkRootNodePassword}. As a side effect, {@code zkRootNodeUsername} is
 * set from this RM instance's {@code RM_ADDRESS} configuration value.
 *
 * To be called only when HA is enabled and the configuration doesn't set ACL
 * for the root node.
 *
 * @param conf       configuration the RM address is read from
 * @param sourceACLs ACLs to copy with create/delete permissions stripped
 * @return the ACL list for the root znode
 * @throws NoSuchAlgorithmException if the digest cannot be generated
 */
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
    Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
  List<ACL> rootNodeAcls = new ArrayList<ACL>();
  for (ACL sourceAcl : sourceACLs) {
    int permsWithoutCreateDelete = ZKUtil.removeSpecificPerms(
        sourceAcl.getPerms(), CREATE_DELETE_PERMS);
    rootNodeAcls.add(new ACL(permsWithoutCreateDelete, sourceAcl.getId()));
  }

  zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
      YarnConfiguration.RM_ADDRESS,
      YarnConfiguration.DEFAULT_RM_ADDRESS, conf);

  // The create/delete grant goes to a digest identity of "user:password".
  String credentials = zkRootNodeUsername + ":" + zkRootNodePassword;
  String digest = DigestAuthenticationProvider.generateDigest(credentials);
  Id rmIdentity = new Id(zkRootNodeAuthScheme, digest);
  rootNodeAcls.add(new ACL(CREATE_DELETE_PERMS, rmIdentity));
  return rootNodeAcls;
}
 
示例20
/**
 * Starts the store: calls {@code createConnection()}, creates the working
 * path and the root node, performs the HA-only steps when HA is enabled, and
 * then creates the remaining root directories one by one.
 *
 * <p>The statements run strictly in the order written; the later
 * {@code createRootDir} calls happen only after the HA-only steps.
 *
 * @throws Exception if any of the invoked steps fails
 */
@Override
public synchronized void startInternal() throws Exception {
  // createConnection for future API calls
  createConnection();

  // ensure root dirs exist
  createRootDirRecursively(znodeWorkingPath);
  createRootDir(zkRootNodePath);
  if (HAUtil.isHAEnabled(getConfig())){
    // HA only: fence first, then start the background verifier thread.
    // NOTE(review): fence() and VerifyActiveStatusThread are defined outside
    // this block; presumably fencing must precede the remaining directory
    // creation below - confirm before reordering.
    fence();
    verifyActiveStatusThread = new VerifyActiveStatusThread();
    verifyActiveStatusThread.start();
  }
  // Remaining roots, created in this fixed order.
  createRootDir(rmAppRoot);
  createRootDir(rmDTSecretManagerRoot);
  createRootDir(dtMasterKeysRootPath);
  createRootDir(delegationTokensRootPath);
  createRootDir(dtSequenceNumberPath);
  createRootDir(amrmTokenSecretManagerRoot);
}
 
示例21
/**
 * Builds the shared HA test configuration plus one derived configuration per
 * RM ({@code confForRM1}, {@code confForRM2}).
 *
 * <p>HA and recovery are enabled, automatic failover is disabled, and the
 * ZK-backed state store is selected. Every service address key gets distinct
 * "0.0.0.0" ports for rm1 and rm2; the port base starts at 100 and doubles
 * for each key.
 */
@Before
public void setup() throws Exception {
  configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
  configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
  configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
  configuration.set(
      YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
  configuration.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
  configuration.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
  configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
  configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster");

  int portBase = 100;
  for (String addressKey
      : YarnConfiguration.getServiceAddressConfKeys(configuration)) {
    String rm1Address = "0.0.0.0:" + (portBase + 20);
    String rm2Address = "0.0.0.0:" + (portBase + 40);
    configuration.set(HAUtil.addSuffix(addressKey, "rm1"), rm1Address);
    configuration.set(HAUtil.addSuffix(addressKey, "rm2"), rm2Address);
    portBase *= 2;
  }

  // Per-RM copies differ only in which RM id they select.
  confForRM1 = new Configuration(configuration);
  confForRM1.set(YarnConfiguration.RM_HA_ID, "rm1");
  confForRM2 = new Configuration(configuration);
  confForRM2.set(YarnConfiguration.RM_HA_ID, "rm2");
}
 
示例22
/**
 * Creates a fresh HA configuration for each test and resets metrics state.
 *
 * <p>RM1 and RM2 are listed as HA ids. Addresses are written for RM1, RM2
 * and RM3 for every service address key, so an RM3 address is present even
 * though RM3 is not in the id list set here.
 */
@Before
public void setUp() throws Exception {
  configuration = new Configuration();
  UserGroupInformation.setConfiguration(configuration);

  configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
  final String haIds = RM1_NODE_ID + "," + RM2_NODE_ID;
  configuration.set(YarnConfiguration.RM_HA_IDS, haIds);

  for (String addressKey
      : YarnConfiguration.getServiceAddressConfKeys(configuration)) {
    configuration.set(
        HAUtil.addSuffix(addressKey, RM1_NODE_ID), RM1_ADDRESS);
    configuration.set(
        HAUtil.addSuffix(addressKey, RM2_NODE_ID), RM2_ADDRESS);
    configuration.set(
        HAUtil.addSuffix(addressKey, RM3_NODE_ID), RM3_ADDRESS);
  }

  // Enable webapp to test web-services also
  configuration.setBoolean(MockRM.ENABLE_WEBAPP, true);
  configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);

  // Start every test from clean metrics.
  ClusterMetrics.destroy();
  QueueMetrics.clearQueueMetrics();
  DefaultMetricsSystem.shutdown();
}
 
示例23
/**
 * Builds an HA-enabled configuration for one ResourceManager.
 *
 * <p>All service addresses of all listed RMs are set to "localhost:0"; the
 * admin address of {@code rmId} is then overridden with {@code adminPort}.
 *
 * @param rmIds     value stored under {@code RM_HA_IDS}
 * @param rmId      id of the RM this configuration is for
 * @param adminPort port used for that RM's admin address
 * @return the new configuration
 */
private Configuration createHARMConf(
    String rmIds, String rmId, int adminPort) {
  final String anyLocalPort = "localhost:0";

  Configuration haConf = new YarnConfiguration();
  haConf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
  haConf.set(YarnConfiguration.RM_HA_IDS, rmIds);
  haConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
  haConf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
  haConf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
  haConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
  haConf.set(YarnConfiguration.RM_HA_ID, rmId);
  haConf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, anyLocalPort);

  for (String addressKey
      : YarnConfiguration.getServiceAddressConfKeys(haConf)) {
    for (String haId : HAUtil.getRMHAIds(haConf)) {
      haConf.set(HAUtil.addSuffix(addressKey, haId), anyLocalPort);
    }
  }

  // The admin address of this RM is the only one with a fixed port.
  String adminKey =
      HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, rmId);
  haConf.set(adminKey, "localhost:" + adminPort);
  return haConf;
}
 
示例24
/**
 * Resolves an RM id to the {@link HAServiceTarget} used to administer it.
 *
 * @param rmId id of the ResourceManager; must be one of the configured HA ids
 * @return a target built from a configuration copy with {@code RM_HA_ID} set
 *         to {@code rmId}
 * @throws IllegalArgumentException if {@code rmId} is not a configured HA id
 * @throws YarnRuntimeException     if the target cannot be created; the
 *                                  underlying exception is attached as cause
 */
@Override
protected HAServiceTarget resolveTarget(String rmId) {
  Collection<String> rmIds = HAUtil.getRMHAIds(getConf());
  if (!rmIds.contains(rmId)) {
    StringBuilder msg = new StringBuilder();
    msg.append(rmId)
        .append(" is not a valid serviceId. It should be one of ");
    for (String id : rmIds) {
      msg.append(id).append(' ');
    }
    throw new IllegalArgumentException(msg.toString());
  }
  try {
    // Work on a copy so selecting the RM id does not leak into getConf().
    YarnConfiguration conf = new YarnConfiguration(getConf());
    conf.set(YarnConfiguration.RM_HA_ID, rmId);
    return new RMHAServiceTarget(conf);
  } catch (IllegalArgumentException iae) {
    // Keep the original exception as cause so the root problem stays visible.
    throw new YarnRuntimeException("Could not connect to " + rmId +
        "; the configuration for it might be missing", iae);
  } catch (IOException ioe) {
    throw new YarnRuntimeException(
        "Could not connect to RM HA Admin for node " + rmId, ioe);
  }
}
 
示例25
/**
 * Derives the job history URL from the YARN web-app URL.
 *
 * <p>The scheme ({@code http://} or {@code https://}) is taken from
 * {@code yarnWebapp}. The remainder is the value looked up through
 * {@code HAUtil.getConfValueForRMInstance} for
 * {@code MR_JOB_HISTORY_URL_CONF_KEY}, with the host of {@code yarnWebapp}
 * plus port 19888 passed as the default.
 *
 * @param yarnWebapp YARN web-app URL of the form {@code scheme://host:port...}
 * @param conf       configuration to look the history address up in
 * @return scheme followed by the configured (or default) history address
 * @throws IllegalArgumentException if {@code yarnWebapp} does not match the
 *                                  expected form
 */
public static String extractJobHistoryUrl(String yarnWebapp, Configuration conf) {
    // NOTE(review): "http[s]*" accepts any number of 's' and group 3 is never
    // read; the pattern is kept unchanged so accepted inputs stay the same.
    Pattern pattern = Pattern.compile("(http[s]*://)([^:]*):([^/])*.*");
    Matcher m = pattern.matcher(yarnWebapp);
    // Template form: the message is only built on failure, and the URL is now
    // separated from the surrounding words by spaces.
    Preconditions.checkArgument(m.matches(), "Yarn master URL %s not right.", yarnWebapp);
    String defaultHistoryUrl = m.group(2) + ":19888";
    return m.group(1) + HAUtil.getConfValueForRMInstance(MR_JOB_HISTORY_URL_CONF_KEY, defaultHistoryUrl, conf);
}
 
示例26
/**
 * Lists the proxy host:port values the AM filter should use.
 *
 * <p>A non-empty {@code PROXY_ADDRESS} is returned as the only element.
 * Otherwise, with RM HA enabled, each RM web-app address is converted to a
 * resolved address; addresses rejected with {@link IllegalArgumentException}
 * are skipped. If that yields nothing (or HA is off), the single resolved RM
 * web-app address is returned instead.
 *
 * @param conf configuration to read the addresses from
 * @return a list with at least one entry
 */
public static List<String> getProxyHostsAndPortsForAmFilter(
    Configuration conf) {
  List<String> hostsAndPorts = new ArrayList<String>();
  String configuredProxy = conf.get(YarnConfiguration.PROXY_ADDRESS);
  if (configuredProxy != null && !configuredProxy.isEmpty()) {
    hostsAndPorts.add(configuredProxy);
    return hostsAndPorts;
  }

  // PROXY_ADDRESS isn't set: fall back to RM_WEBAPP(_HTTPS)_ADDRESS.
  // There could be multiple if using RM HA.
  if (HAUtil.isHAEnabled(conf)) {
    List<String> haWebappAddrs =
        RMHAUtils.getRMHAWebappAddresses(new YarnConfiguration(conf));
    for (String webappAddr : haWebappAddrs) {
      try {
        InetSocketAddress resolved = NetUtils.createSocketAddr(webappAddr);
        hostsAndPorts.add(getResolvedAddress(resolved));
      } catch (IllegalArgumentException e) {
        // skip if can't resolve
      }
    }
  }

  // Nothing resolved, or not using RM HA: use the single RM web-app address.
  if (hostsAndPorts.isEmpty()) {
    hostsAndPorts.add(getResolvedRMWebAppURLWithoutScheme(conf));
  }
  return hostsAndPorts;
}
 
示例27
/**
 * Resolves the remote RM web-app address, without a scheme.
 *
 * <p>The HTTPS address settings are used when {@code httpPolicy} is
 * {@code HTTPS_ONLY}, the HTTP ones otherwise. With HA enabled, the first
 * configured RM id is appended to the address key as a suffix.
 *
 * @param conf       configuration to read the address from
 * @param httpPolicy policy selecting between the HTTPS and HTTP settings
 * @return the result of {@code getResolvedAddress} for the chosen address
 */
public static String getResolvedRemoteRMWebAppURLWithoutScheme(Configuration conf,
    Policy httpPolicy) {
  String rmId = null;
  if (HAUtil.isHAEnabled(conf)) {
    // If HA enabled, pick one of the RM-IDs and rely on redirect to go to
    // the Active RM
    rmId = (String) HAUtil.getRMHAIds(conf).toArray()[0];
  }

  final boolean httpsOnly = httpPolicy == Policy.HTTPS_ONLY;

  String addressKey = httpsOnly
      ? YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS
      : YarnConfiguration.RM_WEBAPP_ADDRESS;
  if (rmId != null) {
    addressKey = HAUtil.addSuffix(addressKey, rmId);
  }
  String defaultAddress = httpsOnly
      ? YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS
      : YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS;
  int defaultPort = httpsOnly
      ? YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT
      : YarnConfiguration.DEFAULT_RM_WEBAPP_PORT;

  InetSocketAddress address =
      conf.getSocketAddr(addressKey, defaultAddress, defaultPort);
  return getResolvedAddress(address);
}
 
示例28
/**
 * Creates a target from the given configuration.
 *
 * <p>Reads whether automatic failover is enabled and resolves the RM admin
 * address (falling back to the default admin address and port).
 *
 * @param conf configuration to read the settings from
 * @throws IOException declared by the signature
 */
public RMHAServiceTarget(YarnConfiguration conf)
    throws IOException {
  this.autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
  this.haAdminServiceAddress =
      conf.getSocketAddr(
          YarnConfiguration.RM_ADMIN_ADDRESS,
          YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
          YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
}
 
示例29
/**
 * Initializes the embedded elector from the given configuration.
 *
 * <p>Requires {@code RM_ZK_ADDRESS} to be set. Builds the local active-node
 * info from the cluster id and RM id, creates the {@link ActiveStandbyElector}
 * for the election znode {@code <base path>/<cluster id>}, ensures the parent
 * znode, and reports a fatal error if {@code isParentZnodeSafe} rejects it.
 *
 * @param conf configuration; wrapped in a {@link YarnConfiguration} when it
 *             is not one already
 * @throws YarnRuntimeException if {@code RM_ZK_ADDRESS} is not set
 * @throws Exception            if any other initialization step fails
 */
@Override
protected void serviceInit(Configuration conf)
    throws Exception {
  if (!(conf instanceof YarnConfiguration)) {
    conf = new YarnConfiguration(conf);
  }

  final String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
  if (zkQuorum == null) {
   throw new YarnRuntimeException("Embedded automatic failover " +
        "is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS +
        " is not set");
  }

  final String localRmId = HAUtil.getRMHAId(conf);
  final String clusterId = YarnConfiguration.getClusterId(conf);
  localActiveNodeInfo = createActiveNodeInfo(clusterId, localRmId);

  final String basePath = conf.get(
      YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
      YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
  final String electionZNode = basePath + "/" + clusterId;

  // The elector takes the session timeout as an int.
  final int sessionTimeoutMs = (int) conf.getLong(
      YarnConfiguration.RM_ZK_TIMEOUT_MS,
      YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);

  final List<ACL> acls = RMZKUtils.getZKAcls(conf);
  final List<ZKUtil.ZKAuthInfo> authInfos = RMZKUtils.getZKAuths(conf);

  final int opRetries = conf.getInt(
      CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
      CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
  elector = new ActiveStandbyElector(zkQuorum, sessionTimeoutMs,
      electionZNode, acls, authInfos, this, opRetries);

  elector.ensureParentZNode();
  final boolean parentSafe = isParentZnodeSafe(clusterId);
  if (!parentSafe) {
    notifyFatalError(electionZNode + " znode has invalid data! "+
        "Might need formatting!");
  }

  super.serviceInit(conf);
}
 
示例30
/**
 * Runs {@code runWithCheck()} and retries it on retryable ZooKeeper errors.
 *
 * <p>Outcomes per attempt:
 * <ul>
 *   <li>success: the result is returned;</li>
 *   <li>{@code NoAuthException}: with HA enabled the store is treated as
 *       fenced; without HA the exception is rethrown (it used to be
 *       swallowed, which made the loop spin forever with no back-off);</li>
 *   <li>{@code NODEEXISTS}, or {@code NONODE} while {@code hasDeleteNodeOp}
 *       is set: treated as already done, {@code null} is returned;</li>
 *   <li>other {@code KeeperException}: retried after sleeping
 *       {@code zkRetryInterval} and reconnecting, while
 *       {@code shouldRetry} allows it and fewer than {@code numRetries}
 *       retries were made; otherwise rethrown.</li>
 * </ul>
 *
 * @return the result of {@code runWithCheck()}, or {@code null} in the
 *         "already done" cases
 * @throws StoreFencedException on {@code NoAuthException} with HA enabled
 * @throws Exception            the last ZooKeeper error when giving up, or
 *                              any other failure of the invoked steps
 */
T runWithRetries() throws Exception {
  int retry = 0;
  while (true) {
    try {
      return runWithCheck();
    } catch (KeeperException.NoAuthException nae) {
      if (HAUtil.isHAEnabled(getConfig())) {
        // NoAuthException possibly means that this store is fenced due to
        // another RM becoming active. Even if not,
        // it is safer to assume we have been fenced
        throw new StoreFencedException();
      }
      // Non-HA: surface the failure instead of falling through to the next
      // loop iteration, which would retry immediately and without limit.
      throw nae;
    } catch (KeeperException ke) {
      if (ke.code() == Code.NODEEXISTS) {
        LOG.info("znode already exists!");
        return null;
      }
      if (hasDeleteNodeOp && ke.code() == Code.NONODE) {
        LOG.info("znode has already been deleted!");
        return null;
      }

      LOG.info("Exception while executing a ZK operation.", ke);
      if (shouldRetry(ke.code()) && ++retry < numRetries) {
        LOG.info("Retrying operation on ZK. Retry no. " + retry);
        Thread.sleep(zkRetryInterval);
        createConnection();
        continue;
      }
      LOG.info("Maxed out ZK retries. Giving up!");
      throw ke;
    }
  }
}