Java source code examples: org.apache.kafka.clients.admin.ConfigEntry

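As a quick orientation before the excerpts, here is a minimal, self-contained sketch of the typical read path: describe a topic's config with AdminClient and inspect the name, value, and source of each ConfigEntry. The class name, the bootstrap address localhost:9092, and the topic name demo-topic are placeholders and are not taken from the examples below.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class ConfigEntryReadSketch {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Placeholder broker address; adjust for a real cluster.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            // Placeholder topic name.
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "demo-topic");

            Map<ConfigResource, Config> configs =
                adminClient.describeConfigs(Collections.singleton(resource)).all().get();

            // Each ConfigEntry exposes the config name, its effective value, and
            // the source it came from (topic override, broker config, default, ...).
            for (ConfigEntry entry : configs.get(resource).entries()) {
                System.out.printf("%s = %s (source: %s)%n",
                    entry.name(), entry.value(), entry.source());
            }
        }
    }
}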
Example 1
@Test
public void typical() throws Exception {
  String topic = "topic";
  Collection<String> topics = singleton(topic);
  ConfigResource configResource = new ConfigResource(TOPIC, topic);
  Config config = new Config(singleton(new ConfigEntry("retention.ms", "1")));
  KafkaFuture<Map<ConfigResource, Config>> kafkaFuture = completedFuture(singletonMap(configResource, config));

  doReturn(describeConfigsResult).when(adminClient).describeConfigs(any());
  doReturn(kafkaFuture).when(describeConfigsResult).all();

  Map<String, Duration> result = underTest.apply(topics);

  assertThat(result.size(), is(1));
  Duration retention = result.get(topic);
  assertThat(retention, is(Duration.ofMillis(1)));
}
 
Example 2
/**
 * Transform a TopicDescription instance into a ConfiguredTopic instance.
 *
 * @param td  an instance of TopicDescription
 * @param ktc a topic config future
 * @return an instance of ConfiguredTopic
 */
static ConfiguredTopic configuredTopic(TopicDescription td, KafkaFuture<Config> ktc) {
  int partitions = td.partitions().size();
  short replication = (short) td.partitions().iterator().next().replicas().size();
  try {
    Config tc = ktc.get();
    Map<String, String> configMap = tc
        .entries()
        .stream()
        .filter(TopicServiceImpl::isNonDefault)
        .collect(toMap(ConfigEntry::name, ConfigEntry::value));
    return new ConfiguredTopic(td.name(), partitions, replication, configMap);
  } catch (InterruptedException | ExecutionException e) {
    // TODO: FA-10109: Improve exception handling
    throw new RuntimeException(e);
  }
}
 
Example 3
private void showTopicConfigPropertiesWindow(KafkaClusterProxy kafkaClusterProxy,
                                             String topicName) {

    final Set<ConfigEntry> topicProperties = kafkaClusterProxy.getTopicProperties(topicName);
    try {
        ConfigEntriesView entriesView = new ConfigEntriesView("Topic properties", topicProperties, topicPropertiesViewPreferences);
        final TopicPropertiesWindow topicPropertiesWindow = TopicPropertiesWindow.get(topicName,
                                                                                      entriesView,
                                                                                      kafkaClusterProxy.getTopicOffsetsInfo());
        topicPropertiesWindow.show();


    } catch (IOException e) {
        e.printStackTrace();
    }
}
 
Example 4
public String getTopicPropertyByName(String topicName, String propertyName) {
    final Optional<ClusterTopicInfo> found = topicsInfo.stream().filter(e -> e.getTopicName().equals(topicName)).findFirst();
    if (!found.isPresent()) {
        throw new RuntimeException(String.format("Topic with name '%s' not found", topicName));
    }
    final ClusterTopicInfo clusterTopicInfo = found.get();
    final Optional<ConfigEntry> propertyFound = clusterTopicInfo
        .getConfigEntries()
        .stream()
        .filter(e -> e.name().equalsIgnoreCase(propertyName)).findFirst();
    if (!propertyFound.isPresent()) {
        throw new RuntimeException(String.format("Could not find property '%s' for topic '%s' ", propertyName, topicName));
    }
    return propertyFound.get().value();

}
 
Example 5
/**
 * Updates the given topic's config with the {@link Properties} provided. This is not additive but a full
 * replacement.
 *
 * @param topic
 *      the topic to update config for
 * @param properties
 *      the properties to assign to the topic
 * @throws IllegalArgumentException
 *      if topic is null, empty or blank, or properties is {@code null}
 * @throws AdminOperationException
 *      if there is an issue updating the topic config
 */
public void updateTopicConfig(String topic, Properties properties) {
    if (StringUtils.isBlank(topic))
        throw new IllegalArgumentException("topic cannot be null, empty or blank");
    if (properties == null)
        throw new IllegalArgumentException("properties cannot be null");

    LOG.debug("Updating topic config for topic [{}] with config [{}]", topic, properties);

    try {
        List<ConfigEntry> configEntries = new ArrayList<>();
        for (String property : properties.stringPropertyNames()) {
            configEntries.add(new ConfigEntry(property, properties.getProperty(property)));
        }

        getNewAdminClient()
            .alterConfigs(
                Collections.singletonMap(
                    new ConfigResource(ConfigResource.Type.TOPIC, topic),
                    new Config(configEntries)))
            .all()
            .get(operationTimeout, TimeUnit.MILLISECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        throw new AdminOperationException("Unable to update configuration for topic: " + topic, e);
    }
}
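Note that AdminClient.alterConfigs replaces the entire config of the resource, as the Javadoc above states, and has been deprecated since Kafka 2.3 in favor of incrementalAlterConfigs, which Example 10 below uses to set and delete individual entries instead.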
 
Example 6
private String deleteTopicConfig(String clusterAlias, AdminClient adminClient, String topic, ConfigEntry configEntry) {
	try {
		String describeTopicConfigs = describeTopicConfig(clusterAlias, topic);
		JSONObject object = JSON.parseObject(describeTopicConfigs).getJSONObject("config");
		object.remove(configEntry.name());
		List<ConfigEntry> configEntries = new ArrayList<>();
		for (String key : KConstants.Topic.getTopicConfigKeys()) {
			if (object.containsKey(key)) {
				configEntries.add(new ConfigEntry(key, object.getString(key)));
			}
		}
		Map<ConfigResource, Config> configs = new HashMap<>();
		ConfigResource configRes = new ConfigResource(Type.TOPIC, topic);
		Config config = new Config(configEntries);
		configs.put(configRes, config);
		adminClient.alterConfigs(configs);
		return KConstants.Topic.SUCCESS;
	} catch (Exception e) {
		e.printStackTrace();
		LOG.error("Delete topic[" + topic + "] config has error, msg is " + e.getMessage());
		return e.getMessage();
	}
}
 
Example 7
/**
 * Create a Topic to reflect the given TopicMetadata.
 */
public static Topic fromTopicMetadata(TopicMetadata meta) {
    if (meta == null) {
        return null;
    }
    Topic.Builder builder = new Topic.Builder()
            .withTopicName(meta.getDescription().name())
            .withNumPartitions(meta.getDescription().partitions().size())
            .withNumReplicas((short) meta.getDescription().partitions().get(0).replicas().size())
            .withMetadata(null);
    for (ConfigEntry entry: meta.getConfig().entries()) {
        if (entry.source() != ConfigEntry.ConfigSource.DEFAULT_CONFIG
            && entry.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) {
            builder.withConfigEntry(entry.name(), entry.value());
        }
    }
    return builder.build();
}
 
Example 8
protected String alterTopicConfigInKafka(String topicName, String key, Function<String, String> mutator) throws InterruptedException, ExecutionException {
    // Get the topic config
    ConfigResource configResource = topicConfigResource(topicName);
    org.apache.kafka.clients.admin.Config config = getTopicConfig(configResource);

    Map<String, ConfigEntry> m = new HashMap<>();
    for (ConfigEntry entry: config.entries()) {
        if (entry.name().equals(key)
            || (entry.source() != ConfigEntry.ConfigSource.DEFAULT_CONFIG
                && entry.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)) {
            m.put(entry.name(), entry);
        }
    }
    final String changedValue = mutator.apply(m.get(key).value());
    m.put(key, new ConfigEntry(key, changedValue));
    LOGGER.info("Changing topic config {} to {}", key, changedValue);

    // Update the topic config
    AlterConfigsResult result = adminClient.alterConfigs(singletonMap(configResource,
            new org.apache.kafka.clients.admin.Config(m.values())));
    result.all().get();
    return changedValue;
}
 
Example 9
public static TopicMetadata getTopicMetadata(Topic kubeTopic) {
    List<Node> nodes = new ArrayList<>();
    for (int nodeId = 0; nodeId < kubeTopic.getNumReplicas(); nodeId++) {
        nodes.add(new Node(nodeId, "localhost", 9092 + nodeId));
    }
    List<TopicPartitionInfo> partitions = new ArrayList<>();
    for (int partitionId = 0; partitionId < kubeTopic.getNumPartitions(); partitionId++) {
        partitions.add(new TopicPartitionInfo(partitionId, nodes.get(0), nodes, nodes));
    }
    List<ConfigEntry> configs = new ArrayList<>();
    for (Map.Entry<String, String> entry: kubeTopic.getConfig().entrySet()) {
        configs.add(new ConfigEntry(entry.getKey(), entry.getValue()));
    }

    return new TopicMetadata(new TopicDescription(kubeTopic.getTopicName().toString(), false,
            partitions), new Config(configs));
}
 
Example 10
private void updateTopicConfigPostAK23(Topic topic, String fullTopicName)
    throws ExecutionException, InterruptedException {

  Config currentConfigs = getActualTopicConfig(fullTopicName);

  Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
  ArrayList<AlterConfigOp> listOfValues = new ArrayList<>();

  topic
      .rawConfig()
      .forEach(
          (configKey, configValue) -> {
            listOfValues.add(
                new AlterConfigOp(new ConfigEntry(configKey, configValue), OpType.SET));
          });
  Set<String> newEntryKeys = topic.rawConfig().keySet();

  currentConfigs
      .entries()
      .forEach(
          entry -> {
            if (!newEntryKeys.contains(entry.name())) {
              listOfValues.add(new AlterConfigOp(entry, OpType.DELETE));
            }
          });

  configs.put(new ConfigResource(Type.TOPIC, fullTopicName), listOfValues);

  adminClient.incrementalAlterConfigs(configs).all().get();
}
 
Example 11
/**
 * Transform a free-form topic config into a {@link Config} instance.
 *
 * @param topic an instance of {@link ConfiguredTopic}
 * @return an instance of {@link Config}
 */
static Config resourceConfig(ConfiguredTopic topic) {
  return new Config(topic.getConfig()
      .entrySet()
      .stream()
      .map(e -> new ConfigEntry(e.getKey(), e.getValue()))
      .collect(toList()));
}
 
Example 12
/**
 * Check whether the given topic configuration entry is set to a non-default value.
 *
 * @param e a config entry
 * @return true if config is not set to its default value
 */
static boolean isNonDefault(ConfigEntry e) {
  // DYNAMIC_TOPIC_CONFIG is a config that is configured for a specific topic.
  // For per-topic config enforcement, that is the only type that interests
  // us, since it indicates an override. All other ConfigSource types are
  // either defaults provided by Kafka or defaults set by the cluster admin
  // in broker properties.
  return e.source().equals(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG);
}
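In Example 2 above this predicate is applied via the method reference TopicServiceImpl::isNonDefault when building a topic's config map, and Example 28 below performs the equivalent filtering inline by comparing each entry's source() to ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG.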
 
Example 13
@Test
public void testConfiguredTopic() {
  Cluster cluster = createCluster(1);
  TopicPartitionInfo tp = new TopicPartitionInfo(0, cluster.nodeById(0), cluster.nodes(), Collections.emptyList());
  TopicDescription td = new TopicDescription("test", false, Collections.singletonList(tp));
  ConfigEntry configEntry = mock(ConfigEntry.class);
  when(configEntry.source()).thenReturn(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG);
  KafkaFuture<Config> kfc = KafkaFuture.completedFuture(new Config(Collections.singletonList(configEntry)));
  ConfiguredTopic expected = new ConfiguredTopic("test", 1, (short) 1, Collections.emptyMap());
  Assert.assertEquals(expected, TopicServiceImpl.configuredTopic(td, kfc));
}
 
Example 14
@Test
public void testListExisting() {
  Cluster cluster = createCluster(1);
  TopicPartitionInfo tp = new TopicPartitionInfo(0, cluster.nodeById(0), cluster.nodes(), Collections.emptyList());
  ConfigEntry configEntry = new ConfigEntry("k", "v");
  KafkaFuture<Config> kfc = KafkaFuture.completedFuture(new Config(Collections.singletonList(configEntry)));
  Set<String> topicNames = new HashSet<>(Arrays.asList("a", "b", "_c"));
  Map<String, TopicDescription> tds = new HashMap<String, TopicDescription>() {
    {
      put("a", new TopicDescription("a", false, Collections.singletonList(tp)));
      put("b", new TopicDescription("b", false, Collections.singletonList(tp)));
      put("c", new TopicDescription("_c", false, Collections.singletonList(tp)));
    }
  };
  Map<ConfigResource, KafkaFuture<Config>> configs = new HashMap<ConfigResource, KafkaFuture<Config>>() {
    {
      put(new ConfigResource(TOPIC, "a"), kfc);
      put(new ConfigResource(TOPIC, "b"), kfc);
      put(new ConfigResource(TOPIC, "_c"), kfc);
    }
  };

  TopicService service = new TopicServiceImpl(adminClient, true);
  ListTopicsResult listTopicsResult = mock(ListTopicsResult.class);
  DescribeTopicsResult describeTopicsResult = mock(DescribeTopicsResult.class);
  DescribeConfigsResult describeConfigsResult = mock(DescribeConfigsResult.class);

  when(describeTopicsResult.all()).thenReturn(KafkaFuture.completedFuture(tds));
  when(listTopicsResult.names()).thenReturn(KafkaFuture.completedFuture(topicNames));
  when(describeConfigsResult.values()).thenReturn(configs);
  when(adminClient.listTopics(any(ListTopicsOptions.class))).thenReturn(listTopicsResult);
  when(adminClient.describeTopics(topicNames)).thenReturn(describeTopicsResult);
  when(adminClient.describeConfigs(any(Collection.class))).thenReturn(describeConfigsResult);

  Map<String, ConfiguredTopic> actual = service.listExisting(true);
  Assert.assertEquals(2, actual.size());
  Assert.assertEquals(new HashSet<>(Arrays.asList("a", "b")), actual.keySet());
}
 
Example 15
@Test
public void shouldAddTopicConfig() {
  final Map<String, ?> overrides = ImmutableMap.of(
      TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT
  );

  expect(adminClient.describeConfigs(topicConfigsRequest("peter")))
      .andReturn(topicConfigResponse(
          "peter",
          overriddenConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "12345"),
          defaultConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy")
      ));

  expect(adminClient.alterConfigs(
      withResourceConfig(
          new ConfigResource(ConfigResource.Type.TOPIC, "peter"),
          new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "12345"),
          new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
      )))
      .andReturn(alterTopicConfigResponse());
  replay(adminClient);

  KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);
  kafkaTopicClient.addTopicConfig("peter", overrides);

  verify(adminClient);
}
 
Example 16
private DescribeConfigsResult describeBrokerResult() {
  DescribeConfigsResult describeConfigsResult = mock(DescribeConfigsResult.class);
  ConfigEntry configEntryDeleteEnable = new ConfigEntry("delete.topic.enable", "true");
  List<ConfigEntry> configEntries = new ArrayList<>();
  configEntries.add(configEntryDeleteEnable);
  Map<ConfigResource, Config> config = ImmutableMap.of(
      new ConfigResource(ConfigResource.Type.BROKER, node.idString()), new Config(configEntries));
  expect(describeConfigsResult.all()).andReturn(KafkaFuture.completedFuture(config));
  replay(describeConfigsResult);
  return describeConfigsResult;
}
 
Example 17
private ConfigEntry defaultConfigEntry(final String key, final String value) {
  final ConfigEntry config = mock(ConfigEntry.class);
  expect(config.name()).andReturn(key);
  expect(config.value()).andReturn(value);
  expect(config.source()).andReturn(ConfigEntry.ConfigSource.DEFAULT_CONFIG);
  replay(config);
  return config;
}
 
Example 18
private ConfigEntry overriddenConfigEntry(final String key, final String value) {
  final ConfigEntry config = mock(ConfigEntry.class);
  expect(config.name()).andReturn(key);
  expect(config.value()).andReturn(value);
  expect(config.source()).andReturn(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG);
  replay(config);
  return config;
}
 
Example 19
private static DescribeConfigsResult topicConfigResponse(final String topicName,
                                                         final ConfigEntry... entries) {

  final Map<ConfigResource, Config> config = ImmutableMap.of(
      new ConfigResource(ConfigResource.Type.TOPIC, topicName),
      new Config(Arrays.asList(entries)));

  final DescribeConfigsResult response = mock(DescribeConfigsResult.class);
  expect(response.all()).andReturn(KafkaFuture.completedFuture(config));
  replay(response);
  return response;
}
 
Example 20
private static Map<ConfigResource, Config> withResourceConfig(final ConfigResource resource,
                                                              final ConfigEntry... entries) {
  final Set<ConfigEntry> expected = Arrays.stream(entries)
      .collect(Collectors.toSet());

  class ConfigMatcher implements IArgumentMatcher {
    @SuppressWarnings("unchecked")
    @Override
    public boolean matches(final Object argument) {
      final Map<ConfigResource, Config> request = (Map<ConfigResource, Config>)argument;
      if (request.size() != 1) {
        return false;
      }

      final Config config = request.get(resource);
      if (config == null) {
        return false;
      }

      final Set<ConfigEntry> actual = new HashSet<>(config.entries());
      return actual.equals(expected);
    }

    @Override
    public void appendTo(final StringBuffer buffer) {
      buffer.append(resource).append("->")
          .append("Config{").append(expected).append("}");
    }
  }
  EasyMock.reportMatcher(new ConfigMatcher());
  return null;
}
 
Example 21
public static String configEntriesToPrettyString(Collection<ConfigEntry> entries) {
    StringBuilder b = new StringBuilder();
    entries.forEach(entry -> {
        b.append(String.format("%s\n", entry));
    });
    return b.toString();
}
 
Example 22
public ConfigEntriesView(String title,
                         Set<ConfigEntry> entries,
                         ConfigEntriesViewPreferences columnWidths) throws IOException {
    this.title = title;
    observableEntries.setAll(entries);
    this.columnWidths = columnWidths;

    CustomFxWidgetsLoader.loadOnAnchorPane(this, FXML_FILE);
}
 
Example 23
private void refreshClusterSummaryPaneContent(KafkaClusterProxy proxy) {
    Logger.trace("Refreshing cluster pane");
    clusterConfigEntriesTabPane.getTabs().clear();

    final Set<ClusterNodeInfo> nodeInfos = proxy.getNodesInfo();
    List<Tab> tabs = new ArrayList<>();

    for (ClusterNodeInfo nodeInfo : nodeInfos) {

        final Set<ConfigEntry> entries = nodeInfo.getEntries();
        if (entries == null || entries.isEmpty()) {
            continue;
        }

        ConfigEntriesView clusterPropertiesTableView;
        try {
            clusterPropertiesTableView = new ConfigEntriesView("Node properties", entries, clusterPropertiesViewPreferences);
        } catch (IOException e) {
            e.printStackTrace();
            continue;
        }

        Tab nodeTab = getNodeTab(nodeInfo, clusterPropertiesTableView);
        tabs.add(nodeTab);

    }
    addTabsToClusterConfigPane(tabs);
}
 
Example 24
public ClusterTopicInfo(String topicName,
                        List<TopicPartitionInfo> partitions,
                        Set<ConfigEntry> configEntries) {

    this.topicName = topicName;
    this.partitions = partitions;
    this.configEntries = configEntries;
}
 
Example 25
public Set<ConfigEntry> getConfigEntriesForTopic(String topicName) {
    final ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    final DescribeConfigsResult topicConfigEntries = kafkaClientsAdminClient.describeConfigs(Collections.singleton(configResource));
    try {
        final Config config = topicConfigEntries.all().get(ApplicationConstants.FUTURE_GET_TIMEOUT_MS, TimeUnit.MILLISECONDS).get(configResource);
        final Collection<ConfigEntry> entries = config.entries();
        Logger.debug(String.format("Config entries for topic '%s' : %n%s", topicName, AppUtils.configEntriesToPrettyString(entries)));
        return new HashSet<>(entries);
    } catch (Exception e) {
        Logger.error(String.format("Could not retrieve config resource for topic '%s'", topicName), e);
    }
    return Collections.emptySet();
}
 
Example 26
public Set<ConfigEntry> getTopicProperties(String topicName) {
    // just take the config entries from the first matching topicsInfo entry;
    // they should be the same on the rest of the nodes anyway
    for (ClusterTopicInfo clusterTopicInfo : topicsInfo) {
        if (clusterTopicInfo.getTopicName().equals(topicName)) {
            return clusterTopicInfo.getConfigEntries();
        }
    }
    return Collections.emptySet();
}
 
Example 27
@Override
public void updateTopic(TopicAlterableProperties topicDetails) {
    Map<ConfigResource, Config> configs = new HashMap<>();

    final ArrayList<ConfigEntry> configEntries = new ArrayList<>();

    configEntries.add(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG,
                                      String.valueOf(topicDetails.getRetentionMilliseconds())));

    final Config config = new Config(configEntries);
    configs.put(new ConfigResource(ConfigResource.Type.TOPIC, topicDetails.getTopicName()), config);
    kafkaClientsAdminClient.alterConfigs(configs);
}
 
Example 28
/**
 * Returns the {@link Properties} associated with the topic
 *
 * @param topic
 *      a Kafka topic
 * @return the {@link Properties} associated with the topic
 * @throws IllegalArgumentException
 *      if topic is null, empty or blank
 * @throws AdminOperationException
 *      if there is an issue reading the topic config
 */
public Properties getTopicConfig(String topic) {
    if (StringUtils.isBlank(topic))
        throw new IllegalArgumentException("topic cannot be null, empty or blank");

    LOG.debug("Fetching topic config for topic [{}]", topic);

    try {
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Map<ConfigResource, Config> configs = getNewAdminClient()
            .describeConfigs(Collections.singleton(resource))
            .all()
            .get(operationTimeout, TimeUnit.MILLISECONDS);
        Config config = configs.get(resource);
        if (config == null) {
            throw new AdminOperationException("Unable to get topic config: " + topic);
        }

        Properties properties = new Properties();
        config.entries().stream()
            // We are only interested in any overrides that are set
            .filter(configEntry -> configEntry.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)
            .forEach(configEntry -> properties.setProperty(configEntry.name(), configEntry.value()));
        return properties;
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        throw new AdminOperationException("Unable to retrieve configuration for topic: " + topic, e);
    }
}
 
Example 29
/**
 * Populate the given set with config-altering operations for every config whose current value differs from the desired value.
 *
 * @param configsToAlter A set of config altering operations to be populated.
 * @param desiredConfig Desired config value by name.
 * @param currentConfig Current config.
 */
private static void maybeUpdateConfig(Set<AlterConfigOp> configsToAlter, Map<String, String> desiredConfig, Config currentConfig) {
  for (Map.Entry<String, String> entry : desiredConfig.entrySet()) {
    String configName = entry.getKey();
    String targetConfigValue = entry.getValue();
    ConfigEntry currentConfigEntry = currentConfig.get(configName);
    if (currentConfigEntry == null || !currentConfigEntry.value().equals(targetConfigValue)) {
      configsToAlter.add(new AlterConfigOp(new ConfigEntry(configName, targetConfigValue), AlterConfigOp.OpType.SET));
    }
  }
}
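The calling side is not shown in this excerpt. As a minimal sketch, assuming this hypothetical helper lives in the same class as maybeUpdateConfig and that the AdminClient and topic name are supplied by the caller, the collected operations could be submitted with incrementalAlterConfigs, the same way Examples 10 and 30 do:

// Hypothetical caller, not part of the original class: diff the configs and
// submit only the resulting SET operations.
private static void applyDesiredConfig(AdminClient adminClient, String topic,
                                        Map<String, String> desiredConfig, Config currentConfig)
    throws ExecutionException, InterruptedException {
  Set<AlterConfigOp> configsToAlter = new HashSet<>();
  maybeUpdateConfig(configsToAlter, desiredConfig, currentConfig);
  if (!configsToAlter.isEmpty()) {
    ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
    adminClient.incrementalAlterConfigs(
        Collections.singletonMap(topicResource, configsToAlter)).all().get();
  }
}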
 
Example 30
@Test
public void testMaybeUpdateTopicConfig() throws InterruptedException, ExecutionException, TimeoutException {
  AdminClient adminClient = EasyMock.createMock(AdminClient.class);
  DescribeConfigsResult describeConfigsResult = EasyMock.createMock(DescribeConfigsResult.class);
  KafkaFuture<Config> describedConfigsFuture = EasyMock.createMock(KafkaFuture.class);
  Config topicConfig = EasyMock.createMock(Config.class);
  AlterConfigsResult alterConfigsResult = EasyMock.createMock(AlterConfigsResult.class);
  Set<AlterConfigOp> alterConfigOps = Collections.singleton(new AlterConfigOp(
      new ConfigEntry(RetentionMsProp(), Long.toString(MOCK_DESIRED_RETENTION_MS)), AlterConfigOp.OpType.SET));
  Map<ConfigResource, KafkaFuture<Config>> describeConfigsValues = Collections.singletonMap(MOCK_TOPIC_RESOURCE,
                                                                                            describedConfigsFuture);
  Map<ConfigResource, KafkaFuture<Void>> alterConfigsValues = Collections.singletonMap(MOCK_TOPIC_RESOURCE,
                                                                                       EasyMock.createMock(KafkaFuture.class));

  NewTopic topicToUpdateConfigs = SamplingUtils.wrapTopic(MOCK_TOPIC, MOCK_PARTITION_COUNT, MOCK_REPLICATION_FACTOR, MOCK_DESIRED_RETENTION_MS);
  EasyMock.expect(adminClient.describeConfigs(EasyMock.eq(Collections.singleton(MOCK_TOPIC_RESOURCE)))).andReturn(describeConfigsResult);
  EasyMock.expect(describeConfigsResult.values()).andReturn(describeConfigsValues);
  EasyMock.expect(describedConfigsFuture.get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)).andReturn(topicConfig);
  EasyMock.expect(topicConfig.get(EasyMock.eq(CleanupPolicyProp()))).andReturn(new ConfigEntry(CleanupPolicyProp(),
                                                                                               DEFAULT_CLEANUP_POLICY));
  EasyMock.expect(topicConfig.get(EasyMock.eq(RetentionMsProp()))).andReturn(new ConfigEntry(RetentionMsProp(),
                                                                                             MOCK_CURRENT_RETENTION_MS));
  EasyMock.expect(adminClient.incrementalAlterConfigs(EasyMock.eq(Collections.singletonMap(MOCK_TOPIC_RESOURCE,
                                                                                           alterConfigOps))))
          .andReturn(alterConfigsResult);
  EasyMock.expect(alterConfigsResult.values()).andReturn(alterConfigsValues);
  EasyMock.replay(adminClient, describeConfigsResult, describedConfigsFuture, topicConfig, alterConfigsResult);


  boolean updateTopicConfig = SamplingUtils.maybeUpdateTopicConfig(adminClient, topicToUpdateConfigs);
  EasyMock.verify(adminClient, describeConfigsResult, describedConfigsFuture, topicConfig, alterConfigsResult);
  assertTrue(updateTopicConfig);
}