Java源码示例:org.apache.kafka.clients.admin.ConfigEntry
示例1
@Test
public void typical() throws Exception {
String topic = "topic";
Collection<String> topics = singleton(topic);
ConfigResource configResource = new ConfigResource(TOPIC, topic);
Config config = new Config(singleton(new ConfigEntry("retention.ms", "1")));
KafkaFuture<Map<ConfigResource, Config>> kafkaFuture = completedFuture(singletonMap(configResource, config));
doReturn(describeConfigsResult).when(adminClient).describeConfigs(any());
doReturn(kafkaFuture).when(describeConfigsResult).all();
Map<String, Duration> result = underTest.apply(topics);
assertThat(result.size(), is(1));
Duration retention = result.get(topic);
assertThat(retention, is(Duration.ofMillis(1)));
}
示例2
/**
* Transform a TopicDescription instance to ConfiguredTopic instance.
*
* @param td an instance of TopicDescription
* @param ktc a topic config future
* @return an instance of ConfiguredTopic
*/
static ConfiguredTopic configuredTopic(TopicDescription td, KafkaFuture<Config> ktc) {
int partitions = td.partitions().size();
short replication = (short) td.partitions().iterator().next().replicas().size();
try {
Config tc = ktc.get();
Map<String, String> configMap = tc
.entries()
.stream()
.filter(TopicServiceImpl::isNonDefault)
.collect(toMap(ConfigEntry::name, ConfigEntry::value));
return new ConfiguredTopic(td.name(), partitions, replication, configMap);
} catch (InterruptedException | ExecutionException e) {
// TODO: FA-10109: Improve exception handling
throw new RuntimeException(e);
}
}
示例3
private void showTopicConfigPropertiesWindow(KafkaClusterProxy kafkaClusterProxy,
String topicName) {
final Set<ConfigEntry> topicProperties = kafkaClusterProxy.getTopicProperties(topicName);
try {
ConfigEntriesView entriesView = new ConfigEntriesView("Topic properties", topicProperties, topicPropertiesViewPreferences);
final TopicPropertiesWindow topicPropertiesWindow = TopicPropertiesWindow.get(topicName,
entriesView,
kafkaClusterProxy.getTopicOffsetsInfo());
topicPropertiesWindow.show();
} catch (IOException e) {
e.printStackTrace();
}
}
示例4
public String getTopicPropertyByName(String topicName, String propertyName) {
final Optional<ClusterTopicInfo> found = topicsInfo.stream().filter(e -> e.getTopicName().equals(topicName)).findFirst();
if (!found.isPresent()) {
throw new RuntimeException(String.format("Topic with name '%s' not found", topicName));
}
final ClusterTopicInfo clusterTopicInfo = found.get();
final Optional<ConfigEntry> propertyFound = clusterTopicInfo
.getConfigEntries()
.stream()
.filter(e -> e.name().equalsIgnoreCase(propertyName)).findFirst();
if (!propertyFound.isPresent()) {
throw new RuntimeException(String.format("Could not find property '%s' for topic '%s' ", propertyName, topicName));
}
return propertyFound.get().value();
}
示例5
/**
* Updates the given topic's config with the {@link Properties} provided. This is not additive but a full
* replacement
*
* @param topic
* the topic to update config for
* @param properties
* the properties to assign to the topic
* @throws IllegalArgumentException
* if topic is null, empty or blank, or properties is {@code null}
* @throws AdminOperationException
* if there is an issue updating the topic config
*/
public void updateTopicConfig(String topic, Properties properties) {
if (StringUtils.isBlank(topic))
throw new IllegalArgumentException("topic cannot be null, empty or blank");
if (properties == null)
throw new IllegalArgumentException("properties cannot be null");
LOG.debug("Updating topic config for topic [{}] with config [{}]", topic, properties);
try {
List<ConfigEntry> configEntries = new ArrayList<>();
for (String property : properties.stringPropertyNames()) {
configEntries.add(new ConfigEntry(property, properties.getProperty(property)));
}
getNewAdminClient()
.alterConfigs(
Collections.singletonMap(
new ConfigResource(ConfigResource.Type.TOPIC, topic),
new Config(configEntries)))
.all()
.get(operationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new AdminOperationException("Unable to update configuration for topic: " + topic, e);
}
}
示例6
private String deleteTopicConfig(String clusterAlias, AdminClient adminClient, String topic, ConfigEntry configEntry) {
try {
String describeTopicConfigs = describeTopicConfig(clusterAlias, topic);
JSONObject object = JSON.parseObject(describeTopicConfigs).getJSONObject("config");
object.remove(configEntry.name());
List<ConfigEntry> configEntrys = new ArrayList<>();
for (String key : KConstants.Topic.getTopicConfigKeys()) {
if (object.containsKey(key)) {
configEntrys.add(new ConfigEntry(key, object.getString(key)));
}
}
Map<ConfigResource, Config> configs = new HashMap<>();
ConfigResource configRes = new ConfigResource(Type.TOPIC, topic);
Config config = new Config(configEntrys);
configs.put(configRes, config);
adminClient.alterConfigs(configs);
return KConstants.Topic.SUCCESS;
} catch (Exception e) {
e.printStackTrace();
LOG.error("Delete topic[" + topic + "] config has error, msg is " + e.getMessage());
return e.getMessage();
}
}
示例7
/**
* Create a Topic to reflect the given TopicMetadata.
*/
public static Topic fromTopicMetadata(TopicMetadata meta) {
if (meta == null) {
return null;
}
Topic.Builder builder = new Topic.Builder()
.withTopicName(meta.getDescription().name())
.withNumPartitions(meta.getDescription().partitions().size())
.withNumReplicas((short) meta.getDescription().partitions().get(0).replicas().size())
.withMetadata(null);
for (ConfigEntry entry: meta.getConfig().entries()) {
if (entry.source() != ConfigEntry.ConfigSource.DEFAULT_CONFIG
&& entry.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) {
builder.withConfigEntry(entry.name(), entry.value());
}
}
return builder.build();
}
示例8
protected String alterTopicConfigInKafka(String topicName, String key, Function<String, String> mutator) throws InterruptedException, ExecutionException {
// Get the topic config
ConfigResource configResource = topicConfigResource(topicName);
org.apache.kafka.clients.admin.Config config = getTopicConfig(configResource);
Map<String, ConfigEntry> m = new HashMap<>();
for (ConfigEntry entry: config.entries()) {
if (entry.name().equals(key)
|| entry.source() != ConfigEntry.ConfigSource.DEFAULT_CONFIG
&& entry.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) {
m.put(entry.name(), entry);
}
}
final String changedValue = mutator.apply(m.get(key).value());
m.put(key, new ConfigEntry(key, changedValue));
LOGGER.info("Changing topic config {} to {}", key, changedValue);
// Update the topic config
AlterConfigsResult cgf = adminClient.alterConfigs(singletonMap(configResource,
new org.apache.kafka.clients.admin.Config(m.values())));
cgf.all().get();
return changedValue;
}
示例9
public static TopicMetadata getTopicMetadata(Topic kubeTopic) {
List<Node> nodes = new ArrayList<>();
for (int nodeId = 0; nodeId < kubeTopic.getNumReplicas(); nodeId++) {
nodes.add(new Node(nodeId, "localhost", 9092 + nodeId));
}
List<TopicPartitionInfo> partitions = new ArrayList<>();
for (int partitionId = 0; partitionId < kubeTopic.getNumPartitions(); partitionId++) {
partitions.add(new TopicPartitionInfo(partitionId, nodes.get(0), nodes, nodes));
}
List<ConfigEntry> configs = new ArrayList<>();
for (Map.Entry<String, String> entry: kubeTopic.getConfig().entrySet()) {
configs.add(new ConfigEntry(entry.getKey(), entry.getValue()));
}
return new TopicMetadata(new TopicDescription(kubeTopic.getTopicName().toString(), false,
partitions), new Config(configs));
}
示例10
private void updateTopicConfigPostAK23(Topic topic, String fullTopicName)
throws ExecutionException, InterruptedException {
Config currentConfigs = getActualTopicConfig(fullTopicName);
Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
ArrayList<AlterConfigOp> listOfValues = new ArrayList<>();
topic
.rawConfig()
.forEach(
(configKey, configValue) -> {
listOfValues.add(
new AlterConfigOp(new ConfigEntry(configKey, configValue), OpType.SET));
});
Set<String> newEntryKeys = topic.rawConfig().keySet();
currentConfigs
.entries()
.forEach(
entry -> {
if (!newEntryKeys.contains(entry.name())) {
listOfValues.add(new AlterConfigOp(entry, OpType.DELETE));
}
});
configs.put(new ConfigResource(Type.TOPIC, fullTopicName), listOfValues);
adminClient.incrementalAlterConfigs(configs).all().get();
}
示例11
/**
* Transform free form topic config to a {@link Config} instance.
*
* @param topic an instance of {@link ConfiguredTopic}
* @return an instance of {@link Config}
*/
static Config resourceConfig(ConfiguredTopic topic) {
return new Config(topic.getConfig()
.entrySet()
.stream()
.map(e -> new ConfigEntry(e.getKey(), e.getValue()))
.collect(toList()));
}
示例12
/**
* Check if the given topic configuration is set to default or otherwise.
*
* @param e a config entry
* @return true if config is not set to its default value
*/
static boolean isNonDefault(ConfigEntry e) {
// DYNAMIC_TOPIC_CONFIG is a config that is configured for a specific topic.
// For per topic config enforcement, that is the only type which interest
// us since that indicates an override. All other ConfigSource types are
// either default provided by kafka or default set by the cluster admin in
// broker properties.
return e.source().equals(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG);
}
示例13
@Test
public void testConfiguredTopic() {
Cluster cluster = createCluster(1);
TopicPartitionInfo tp = new TopicPartitionInfo(0, cluster.nodeById(0), cluster.nodes(), Collections.emptyList());
TopicDescription td = new TopicDescription("test", false, Collections.singletonList(tp));
ConfigEntry configEntry = mock(ConfigEntry.class);
when(configEntry.source()).thenReturn(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG);
KafkaFuture<Config> kfc = KafkaFuture.completedFuture(new Config(Collections.singletonList(configEntry)));
ConfiguredTopic expected = new ConfiguredTopic("test", 1, (short) 1, Collections.emptyMap());
Assert.assertEquals(expected, TopicServiceImpl.configuredTopic(td, kfc));
}
示例14
@Test
public void testListExisting() {
Cluster cluster = createCluster(1);
TopicPartitionInfo tp = new TopicPartitionInfo(0, cluster.nodeById(0), cluster.nodes(), Collections.emptyList());
ConfigEntry configEntry = new ConfigEntry("k", "v");
KafkaFuture<Config> kfc = KafkaFuture.completedFuture(new Config(Collections.singletonList(configEntry)));
Set<String> topicNames = new HashSet<>(Arrays.asList("a", "b", "_c"));
Map<String, TopicDescription> tds = new HashMap<String, TopicDescription>() {
{
put("a", new TopicDescription("a", false, Collections.singletonList(tp)));
put("b", new TopicDescription("b", false, Collections.singletonList(tp)));
put("c", new TopicDescription("_c", false, Collections.singletonList(tp)));
}
};
Map<ConfigResource, KafkaFuture<Config>> configs = new HashMap<ConfigResource, KafkaFuture<Config>>() {
{
put(new ConfigResource(TOPIC, "a"), kfc);
put(new ConfigResource(TOPIC, "b"), kfc);
put(new ConfigResource(TOPIC, "_c"), kfc);
}
};
TopicService service = new TopicServiceImpl(adminClient, true);
ListTopicsResult listTopicsResult = mock(ListTopicsResult.class);
DescribeTopicsResult describeTopicsResult = mock(DescribeTopicsResult.class);
DescribeConfigsResult describeConfigsResult = mock(DescribeConfigsResult.class);
when(describeTopicsResult.all()).thenReturn(KafkaFuture.completedFuture(tds));
when(listTopicsResult.names()).thenReturn(KafkaFuture.completedFuture(topicNames));
when(describeConfigsResult.values()).thenReturn(configs);
when(adminClient.listTopics(any(ListTopicsOptions.class))).thenReturn(listTopicsResult);
when(adminClient.describeTopics(topicNames)).thenReturn(describeTopicsResult);
when(adminClient.describeConfigs(any(Collection.class))).thenReturn(describeConfigsResult);
Map<String, ConfiguredTopic> actual = service.listExisting(true);
Assert.assertEquals(2, actual.size());
Assert.assertEquals(new HashSet<>(Arrays.asList("a", "b")), actual.keySet());
}
示例15
@Test
public void shouldAddTopicConfig() {
final Map<String, ?> overrides = ImmutableMap.of(
TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT
);
expect(adminClient.describeConfigs(topicConfigsRequest("peter")))
.andReturn(topicConfigResponse(
"peter",
overriddenConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "12345"),
defaultConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy")
));
expect(adminClient.alterConfigs(
withResourceConfig(
new ConfigResource(ConfigResource.Type.TOPIC, "peter"),
new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "12345"),
new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
)))
.andReturn(alterTopicConfigResponse());
replay(adminClient);
KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);
kafkaTopicClient.addTopicConfig("peter", overrides);
verify(adminClient);
}
示例16
private DescribeConfigsResult describeBrokerResult() {
DescribeConfigsResult describeConfigsResult = mock(DescribeConfigsResult.class);
ConfigEntry configEntryDeleteEnable = new ConfigEntry("delete.topic.enable", "true");
List<ConfigEntry> configEntries = new ArrayList<>();
configEntries.add(configEntryDeleteEnable);
Map<ConfigResource, Config> config = ImmutableMap.of(
new ConfigResource(ConfigResource.Type.BROKER, node.idString()), new Config(configEntries));
expect(describeConfigsResult.all()).andReturn(KafkaFuture.completedFuture(config));
replay(describeConfigsResult);
return describeConfigsResult;
}
示例17
private ConfigEntry defaultConfigEntry(final String key, final String value) {
final ConfigEntry config = mock(ConfigEntry.class);
expect(config.name()).andReturn(key);
expect(config.value()).andReturn(value);
expect(config.source()).andReturn(ConfigEntry.ConfigSource.DEFAULT_CONFIG);
replay(config);
return config;
}
示例18
private ConfigEntry overriddenConfigEntry(final String key, final String value) {
final ConfigEntry config = mock(ConfigEntry.class);
expect(config.name()).andReturn(key);
expect(config.value()).andReturn(value);
expect(config.source()).andReturn(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG);
replay(config);
return config;
}
示例19
private static DescribeConfigsResult topicConfigResponse(final String topicName,
final ConfigEntry... entries) {
final Map<ConfigResource, Config> config = ImmutableMap.of(
new ConfigResource(ConfigResource.Type.TOPIC, topicName),
new Config(Arrays.asList(entries)));
final DescribeConfigsResult response = mock(DescribeConfigsResult.class);
expect(response.all()).andReturn(KafkaFuture.completedFuture(config));
replay(response);
return response;
}
示例20
private static Map<ConfigResource, Config> withResourceConfig(final ConfigResource resource,
final ConfigEntry... entries) {
final Set<ConfigEntry> expected = Arrays.stream(entries)
.collect(Collectors.toSet());
class ConfigMatcher implements IArgumentMatcher {
@SuppressWarnings("unchecked")
@Override
public boolean matches(final Object argument) {
final Map<ConfigResource, Config> request = (Map<ConfigResource, Config>)argument;
if (request.size() != 1) {
return false;
}
final Config config = request.get(resource);
if (config == null) {
return false;
}
final Set<ConfigEntry> actual = new HashSet<>(config.entries());
return actual.equals(expected);
}
@Override
public void appendTo(final StringBuffer buffer) {
buffer.append(resource).append("->")
.append("Config{").append(expected).append("}");
}
}
EasyMock.reportMatcher(new ConfigMatcher());
return null;
}
示例21
public static String configEntriesToPrettyString(Collection<ConfigEntry> entries) {
StringBuilder b = new StringBuilder();
entries.forEach(entry -> {
b.append(String.format("%s\n", entry));
});
return b.toString();
}
示例22
public ConfigEntriesView(String title,
Set<ConfigEntry> entries,
ConfigEntriesViewPreferences columnWidths) throws IOException {
this.title = title;
observableEntries.setAll(entries);
this.columnWidths = columnWidths;
CustomFxWidgetsLoader.loadOnAnchorPane(this, FXML_FILE);
}
示例23
private void refreshClusterSummaryPaneContent(KafkaClusterProxy proxy) {
Logger.trace("Refreshing cluster pane");
clusterConfigEntriesTabPane.getTabs().clear();
final Set<ClusterNodeInfo> nodeInfos = proxy.getNodesInfo();
List<Tab> tabs = new ArrayList<>();
for (ClusterNodeInfo nodeInfo : nodeInfos) {
final Set<ConfigEntry> entries = nodeInfo.getEntries();
if (entries == null || entries.isEmpty()) {
continue;
}
ConfigEntriesView clusterPropertiesTableView;
try {
clusterPropertiesTableView = new ConfigEntriesView("Node properties", entries, clusterPropertiesViewPreferences);
} catch (IOException e) {
e.printStackTrace();
continue;
}
Tab nodeTab = getNodeTab(nodeInfo, clusterPropertiesTableView);
tabs.add(nodeTab);
}
addTabsToClusterConfigPane(tabs);
}
示例24
public ClusterTopicInfo(String topicName,
List<TopicPartitionInfo> partitions,
Set<ConfigEntry> configEntries) {
this.topicName = topicName;
this.partitions = partitions;
this.configEntries = configEntries;
}
示例25
public Set<ConfigEntry> getConfigEntriesForTopic(String topicName) {
final ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
final DescribeConfigsResult topicConfiEntries = kafkaClientsAdminClient.describeConfigs(Collections.singleton(configResource));
try {
final Config config = topicConfiEntries.all().get(ApplicationConstants.FUTURE_GET_TIMEOUT_MS, TimeUnit.MILLISECONDS).get(configResource);
final Collection<ConfigEntry> entries = config.entries();
Logger.debug(String.format("Config entries for topic '%s' : %n%s", topicName, AppUtils.configEntriesToPrettyString(entries)));
return new HashSet<>(entries);
} catch (Exception e) {
Logger.error(String.format("Could not retrieve config resource for topic '%s'", topicName), e);
}
return Collections.emptySet();
}
示例26
public Set<ConfigEntry> getTopicProperties(String topicName) {
// just get first topicsInfo for first node,
// it should be the same on rest of nodes any way
for (ClusterTopicInfo clusterTopicInfo : topicsInfo) {
if (clusterTopicInfo.getTopicName().equals(topicName)) {
return clusterTopicInfo.getConfigEntries();
}
}
return Collections.emptySet();
}
示例27
@Override
public void updateTopic(TopicAlterableProperties topicDetails) {
Map<ConfigResource, Config> configs = new HashMap<>();
final ArrayList<ConfigEntry> configEntries = new ArrayList<>();
configEntries.add(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG,
String.valueOf(topicDetails.getRetentionMilliseconds())));
final Config config = new Config(configEntries);
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, topicDetails.getTopicName()), config);
kafkaClientsAdminClient.alterConfigs(configs);
}
示例28
/**
* Returns the {@link Properties} associated to the topic
*
* @param topic
* a Kafka topic
* @return the {@link Properties} associated to the topic
* @throws IllegalArgumentException
* if topic is null, empty or blank
* @throws AdminOperationException
* if there is an issue reading the topic config
*/
public Properties getTopicConfig(String topic) {
if (StringUtils.isBlank(topic))
throw new IllegalArgumentException("topic cannot be null, empty or blank");
LOG.debug("Fetching topic config for topic [{}]", topic);
try {
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
Map<ConfigResource, Config> configs = getNewAdminClient()
.describeConfigs(Collections.singleton(resource))
.all()
.get(operationTimeout, TimeUnit.MILLISECONDS);
Config config = configs.get(resource);
if (config == null) {
throw new AdminOperationException("Unable to get topic config: " + topic);
}
Properties properties = new Properties();
config.entries().stream()
// We are only interested in any overrides that are set
.filter(configEntry -> configEntry.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)
.forEach(configEntry -> properties.setProperty(configEntry.name(), configEntry.value()));
return properties;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new AdminOperationException("Unable to retrieve configuration for topic: " + topic, e);
}
}
示例29
/**
* Add config altering operations to the given configs to alter for configs that differ between current and desired.
*
* @param configsToAlter A set of config altering operations to be populated.
* @param desiredConfig Desired config value by name.
* @param currentConfig Current config.
*/
private static void maybeUpdateConfig(Set<AlterConfigOp> configsToAlter, Map<String, String> desiredConfig, Config currentConfig) {
for (Map.Entry<String, String> entry : desiredConfig.entrySet()) {
String configName = entry.getKey();
String targetConfigValue = entry.getValue();
ConfigEntry currentConfigEntry = currentConfig.get(configName);
if (currentConfigEntry == null || !currentConfigEntry.value().equals(targetConfigValue)) {
configsToAlter.add(new AlterConfigOp(new ConfigEntry(configName, targetConfigValue), AlterConfigOp.OpType.SET));
}
}
}
示例30
@Test
public void testMaybeUpdateTopicConfig() throws InterruptedException, ExecutionException, TimeoutException {
AdminClient adminClient = EasyMock.createMock(AdminClient.class);
DescribeConfigsResult describeConfigsResult = EasyMock.createMock(DescribeConfigsResult.class);
KafkaFuture<Config> describedConfigsFuture = EasyMock.createMock(KafkaFuture.class);
Config topicConfig = EasyMock.createMock(Config.class);
AlterConfigsResult alterConfigsResult = EasyMock.createMock(AlterConfigsResult.class);
Set<AlterConfigOp> alterConfigOps = Collections.singleton(new AlterConfigOp(
new ConfigEntry(RetentionMsProp(), Long.toString(MOCK_DESIRED_RETENTION_MS)), AlterConfigOp.OpType.SET));
Map<ConfigResource, KafkaFuture<Config>> describeConfigsValues = Collections.singletonMap(MOCK_TOPIC_RESOURCE,
describedConfigsFuture);
Map<ConfigResource, KafkaFuture<Void>> alterConfigsValues = Collections.singletonMap(MOCK_TOPIC_RESOURCE,
EasyMock.createMock(KafkaFuture.class));
NewTopic topicToUpdateConfigs = SamplingUtils.wrapTopic(MOCK_TOPIC, MOCK_PARTITION_COUNT, MOCK_REPLICATION_FACTOR, MOCK_DESIRED_RETENTION_MS);
EasyMock.expect(adminClient.describeConfigs(EasyMock.eq(Collections.singleton(MOCK_TOPIC_RESOURCE)))).andReturn(describeConfigsResult);
EasyMock.expect(describeConfigsResult.values()).andReturn(describeConfigsValues);
EasyMock.expect(describedConfigsFuture.get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)).andReturn(topicConfig);
EasyMock.expect(topicConfig.get(EasyMock.eq(CleanupPolicyProp()))).andReturn(new ConfigEntry(CleanupPolicyProp(),
DEFAULT_CLEANUP_POLICY));
EasyMock.expect(topicConfig.get(EasyMock.eq(RetentionMsProp()))).andReturn(new ConfigEntry(RetentionMsProp(),
MOCK_CURRENT_RETENTION_MS));
EasyMock.expect(adminClient.incrementalAlterConfigs(EasyMock.eq(Collections.singletonMap(MOCK_TOPIC_RESOURCE,
alterConfigOps))))
.andReturn(alterConfigsResult);
EasyMock.expect(alterConfigsResult.values()).andReturn(alterConfigsValues);
EasyMock.replay(adminClient, describeConfigsResult, describedConfigsFuture, topicConfig, alterConfigsResult);
boolean updateTopicConfig = SamplingUtils.maybeUpdateTopicConfig(adminClient, topicToUpdateConfigs);
EasyMock.verify(adminClient, describeConfigsResult, describedConfigsFuture, topicConfig, alterConfigsResult);
assertTrue(updateTopicConfig);
}