Java源码示例:kafka.admin.AdminClient
示例1
/**
 * Builds a partition-to-client-ID view of a consumer group's current assignments. Yields an empty map if the group does
 * not exist or is not active.
 *
 * @param consumerGroup
 *            the name of the consumer group
 * @return unmodifiable map whose keys are the assigned partitions and whose values are the client IDs that own them;
 *         empty if the group does not exist or is not active
 * @throws IllegalArgumentException
 *             if the consumerGroup is null, empty or blank
 * @throws AdminOperationException
 *             if an issue occurs retrieving the assignments
 */
public Map<TopicPartition, String> getConsumerGroupAssignments(String consumerGroup) {
    if (StringUtils.isBlank(consumerGroup)) {
        throw new IllegalArgumentException("consumerGroup cannot be null, empty or blank");
    }

    Collection<AdminClient.ConsumerSummary> members = getConsumerGroupSummaries(consumerGroup);
    Map<TopicPartition, String> clientIdByPartition = new HashMap<>();
    for (AdminClient.ConsumerSummary member : members) {
        // The assignment is a Scala collection; bridge it to a Java set before iterating.
        Set<TopicPartition> owned = convertToJavaSet(member.assignment().iterator());
        owned.forEach(partition -> clientIdByPartition.put(partition, member.clientId()));
    }
    return Collections.unmodifiableMap(clientIdByPartition);
}
示例2
/**
 * Subscribes a single consumer to a freshly created topic and checks that the group summary reports exactly one
 * consumer along with a non-null state, coordinator and assignment strategy.
 */
@Test
public void getConsumerGroupSummary() {
    final String name = testName.getMethodName();
    client.createTopic(name, 1, 1);

    Properties properties = new Properties();
    properties.putAll(KafkaTests.getProps());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, name);
    properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, name + "-client-id");

    try (Consumer<Object, Object> consumer = new KafkaConsumer<>(properties)) {
        consumer.subscribe(Arrays.asList(name));
        // Polling makes the consumer join the group so the summary has a member to report.
        consumer.poll(Duration.ofSeconds(5L));

        AdminClient.ConsumerGroupSummary summary = client.getConsumerGroupSummary(name);

        // consumers() is a Scala Option; fail with a readable assertion rather than NoSuchElementException from get().
        assertThat("Expected consumers to be present when getConsumerGroupSummary(" + name + ")",
            summary.consumers().isDefined(), is(true));
        // The message names the method actually under test (it previously said getConsumerGroupSummaries).
        assertThat("Expected only 1 consumer summary when getConsumerGroupSummary(" + name + ")",
            convertToJavaSet(summary.consumers().get().iterator()).size(), is(1));
        assertThat(summary.state(), is(notNullValue()));
        assertThat(summary.coordinator(), is(notNullValue()));
        assertThat(summary.assignmentStrategy(), is(notNullValue()));
    }
}
示例3
/**
 * Describes a consumer group and converts the Scala admin-client result into this project's
 * {@code ConsumerGroupSummary}, with one {@code ConsumerSummary} per member and that member's assigned partitions.
 *
 * @param group
 *            the consumer group to describe
 * @return the converted summary; contains no consumers when the admin client reports no member list for the group
 */
public ConsumerGroupSummary describeConsumerGroup(String group) {
    AdminClient.ConsumerGroupSummary groupSummary = adminClient.describeConsumerGroup(
        group,
        ADMIN_CLIENT_TIMEOUT_MS
    );
    ConsumerGroupSummary results = new ConsumerGroupSummary();

    // consumers() is a Scala Option. Calling get() on an empty Option throws NoSuchElementException,
    // so treat "no member list" as "no consumers" instead of failing.
    scala.Option<scala.collection.immutable.List<AdminClient.ConsumerSummary>> members = groupSummary.consumers();
    if (members.isEmpty()) {
        return results;
    }

    scala.collection.Iterator<AdminClient.ConsumerSummary> memberIterator = members.get().iterator();
    while (memberIterator.hasNext()) {
        AdminClient.ConsumerSummary member = memberIterator.next();
        ConsumerSummary converted = new ConsumerSummary(member.consumerId());
        results.addConsumerSummary(converted);

        scala.collection.Iterator<TopicPartition> partitionIterator = member.assignment().iterator();
        while (partitionIterator.hasNext()) {
            TopicPartition assigned = partitionIterator.next();
            converted.addPartition(new TopicPartition(
                assigned.topic(),
                assigned.partition()
            ));
        }
    }
    return results;
}
示例4
/**
 * Creates a fresh pair of admin clients (the Scala one and the Kafka-clients one) for the given address, wraps the
 * latter in a {@code TopicAdmin}, and hands all three to the proxy's {@code refresh}.
 *
 * @param hostPort
 *            address used to create both admin clients
 * @param proxy
 *            the cluster proxy to refresh with the newly created dependencies
 */
public static void reinitialize(HostPortValue hostPort, KafkaClusterProxy proxy)
        throws ClusterConfigurationError, ExecutionException, TimeoutException, InterruptedException {
    final AdminClient scalaAdmin = createKafkaAdminClient(hostPort);
    final org.apache.kafka.clients.admin.AdminClient clientsAdmin = createKafkaClientAdminClient(hostPort);
    proxy.refresh(new TopicAdmin(clientsAdmin), clientsAdmin, scalaAdmin);
}
示例5
/**
 * Replaces this proxy's admin dependencies with the supplied ones and reloads its view of the cluster.
 * The steps run in a fixed order: close the old dependencies, clear the cluster summary, assign the new
 * dependencies, validate the configuration, then fetch the cluster state summary.
 *
 * @param topicAdmin
 *            the new topic admin
 * @param adminClient2
 *            the new Kafka-clients admin client
 * @param kafkaAdminClient
 *            the new Scala admin client
 */
@Override
public void refresh(TopicAdmin topicAdmin,
                    org.apache.kafka.clients.admin.AdminClient adminClient2,
                    AdminClient kafkaAdminClient)
        throws ClusterConfigurationError, InterruptedException, ExecutionException, TimeoutException {
    // Drop everything tied to the previous dependencies first.
    closeOldDependencies();
    clearClusterSummary();

    // Install the replacements, then validate and reload through them.
    assignNewDependencies(topicAdmin, adminClient2, kafkaAdminClient);
    throwIfInvalidConfigMakesClusterUnusable();
    fetchClusterStateSummary();
}
示例6
/**
 * Builds an {@code AssignedConsumerInfo} describing one consumer of a group together with one partition assigned
 * to it and the offset looked up for that partition.
 *
 * @param consumerGroupId
 *            the consumer group the consumer belongs to
 * @param offsetForPartition
 *            offsets keyed by partition, passed through to {@code getOffsetForPartition}
 * @param consumerSummary
 *            source of the consumer ID, client ID and host
 * @param topicPartition
 *            the assigned partition being described
 * @return the populated info object
 */
private AssignedConsumerInfo getAssignedConsumerInfo(String consumerGroupId,
                                                     Map<TopicPartition, Object> offsetForPartition,
                                                     AdminClient.ConsumerSummary consumerSummary,
                                                     TopicPartition topicPartition) {
    // The partition number is carried as text in the info object.
    final String partitionText = String.valueOf(topicPartition.partition());

    return AssignedConsumerInfo.builder()
        .consumerGroupId(consumerGroupId)
        .consumerId(consumerSummary.consumerId())
        .clientId(consumerSummary.clientId())
        .host(consumerSummary.host())
        .topic(topicPartition.topic())
        .partition(partitionText)
        .offset(getOffsetForPartition(offsetForPartition, topicPartition))
        .build();
}
示例7
/**
 * Builds an {@code UnassignedConsumerInfo} from a consumer summary; only group, consumer ID, client ID and host are
 * recorded (no topic, partition or offset).
 *
 * @param consumerGroupId
 *            the consumer group the consumer belongs to
 * @param consumerSummary
 *            source of the consumer ID, client ID and host
 * @return the populated info object
 */
private UnassignedConsumerInfo getUnassignedConsumerInfo(String consumerGroupId,
                                                         AdminClient.ConsumerSummary consumerSummary) {
    return UnassignedConsumerInfo.builder()
        .consumerGroupId(consumerGroupId)
        .consumerId(consumerSummary.consumerId())
        .clientId(consumerSummary.clientId())
        .host(consumerSummary.host())
        .build();
}
示例8
/**
 * Subscribes a single consumer to a freshly created one-partition topic and checks that exactly one consumer
 * summary is returned, that it owns partition 0 of the topic, and that it carries the configured client ID.
 */
@Test
public void getConsumerGroupSummaries() {
    final String name = testName.getMethodName();
    final String clientId = name + "-client-id";
    client.createTopic(name, 1, 1);

    Properties consumerProps = new Properties();
    consumerProps.putAll(KafkaTests.getProps());
    consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
    consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, name);
    consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);

    try (Consumer<Object, Object> consumer = new KafkaConsumer<>(consumerProps)) {
        consumer.subscribe(Arrays.asList(name));
        // Polling makes the consumer join the group so there is a member to summarize.
        consumer.poll(Duration.ofSeconds(5L));

        Collection<AdminClient.ConsumerSummary> found = client.getConsumerGroupSummaries(name);
        assertThat("Expected only 1 consumer summary when getConsumerGroupSummaries(" + name + ")",
            found.size(), is(1));

        AdminClient.ConsumerSummary only = found.iterator().next();
        Collection<TopicPartition> owned = convertToJavaSet(only.assignment().iterator());
        assertThat("Expected consumer assignment to have single partition", owned.size(), is(1));
        assertThat(owned.iterator().next(), is(new TopicPartition(name, 0)));
        assertThat(only.clientId(), is(clientId));
    }
}
示例9
public TestKafkaConsumerGroupService() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "127.0.0.1:9092");
props.put("group.id", "test");
adminClient = AdminClient.create(props);
logger.debug("{}", adminClient.bootstrapBrokers());
}
示例10
/**
 * Captures everything needed for an offset lookup against one leader node.
 *
 * @param adminClient
 *            admin client; its underlying network client backs the {@code KafkaApiRequest}
 * @param leaderNode
 *            the node stored as the leader for the partitions
 * @param topicPartitionList
 *            the partitions this instance covers
 * @param countDownLatch
 *            latch stored for later use by this instance
 * @param offsetRequestValue
 *            the offset request value stored for later use
 */
TopicPartitionsOffsetInfo(final AdminClient adminClient,
                          final Node leaderNode,
                          final List<TopicPartition> topicPartitionList,
                          final CountDownLatch countDownLatch,
                          final long offsetRequestValue) {
    // Request plumbing
    this.adminClient = adminClient;
    this.kafkaApiRequest = new KafkaApiRequest(adminClient.client());

    // Target of the lookup
    this.leaderNode = leaderNode;
    this.topicPartitionList = topicPartitionList;
    this.offsetRequestValue = offsetRequestValue;

    // Coordination
    this.countDownLatch = countDownLatch;
}
示例11
/**
 * Bean factory method producing an {@link AdminClient} configured only with the bootstrap servers injected from the
 * {@code kafka.bootstrapServers} property.
 *
 * @param bootstrapServers
 *            value used for the {@code bootstrap.servers} setting
 * @return the created admin client
 */
@Bean
AdminClient adminClient(@Value("${kafka.bootstrapServers}") String bootstrapServers) {
    final Properties clientConfig = new Properties();
    clientConfig.setProperty("bootstrap.servers", bootstrapServers);

    final AdminClient created = AdminClient.create(clientConfig);
    return created;
}
示例12
/**
 * Stores the admin client and registers a {@code MultiGauge} named {@code kafka_offset} with the given registry.
 *
 * @param adminClient
 *            admin client kept by this instance
 * @param registry
 *            meter registry the gauge is registered with
 */
public KafkaOffsetMetrics(AdminClient adminClient, MeterRegistry registry) {
    this.adminClient = adminClient;
    this.gauge = MultiGauge.builder("kafka_offset")
        .register(registry);
}
示例13
/**
 * Keeps the given configuration and creates an admin client from its admin-client properties.
 *
 * @param ksqlConfig
 *            configuration supplying the admin client settings via {@code getKsqlAdminClientConfigProps()}
 */
public KafkaConsumerGroupClientImpl(KsqlConfig ksqlConfig) {
    this.ksqlConfig = ksqlConfig;

    // AdminClient.create is handed a Properties instance, so copy the config entries into one.
    final Properties adminProps = new Properties();
    adminProps.putAll(ksqlConfig.getKsqlAdminClientConfigProps());
    this.adminClient = AdminClient.create(adminProps);
}
示例14
/**
 * Creates a Scala {@link AdminClient} over a simple plaintext connection to the given address.
 *
 * @param hostPort
 *            address whose host string is used as the broker URL
 * @return the created admin client
 */
private static AdminClient createKafkaAdminClient(HostPortValue hostPort) {
    final String brokerUrl = hostPort.toHostString();
    return AdminClient.createSimplePlaintext(brokerUrl);
}
示例15
/**
 * Creates a Kafka-clients {@code AdminClient} whose only explicit setting is the bootstrap servers, taken from the
 * given address.
 *
 * @param hostPort
 *            address whose host string is used as the bootstrap servers value
 * @return the created admin client
 */
private static org.apache.kafka.clients.admin.AdminClient createKafkaClientAdminClient(HostPortValue hostPort) {
    final Properties clientConfig = new Properties();
    clientConfig.put(
        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
        hostPort.toHostString()
    );
    return org.apache.kafka.clients.admin.AdminClient.create(clientConfig);
}
示例16
/**
 * Stores the three supplied dependencies in this instance's fields.
 *
 * @param topicAdmin
 *            stored as {@code topicAdmin}
 * @param adminClient2
 *            stored as {@code kafkaClientsAdminClient}
 * @param kafkaAdminClient
 *            stored as {@code kafkaAdminClient}
 */
private void assignNewDependencies(TopicAdmin topicAdmin,
                                   org.apache.kafka.clients.admin.AdminClient adminClient2,
                                   AdminClient kafkaAdminClient) {
    this.kafkaAdminClient = kafkaAdminClient;
    this.kafkaClientsAdminClient = adminClient2;
    this.topicAdmin = topicAdmin;
}
示例17
/**
 * Returns the Scala admin client, creating it from {@code properties} on first use and reusing it afterwards.
 *
 * @return the admin client held in the {@code adminClient} field
 */
private AdminClient getAdminClient() {
    if (adminClient != null) {
        return adminClient;
    }
    adminClient = AdminClient.create(properties);
    return adminClient;
}
示例18
/**
 * Returns the Kafka-clients admin client, creating it from {@code properties} on first use and reusing it
 * afterwards.
 *
 * @return the admin client held in the {@code newAdminClient} field
 */
private org.apache.kafka.clients.admin.AdminClient getNewAdminClient() {
    if (newAdminClient != null) {
        return newAdminClient;
    }
    newAdminClient = org.apache.kafka.clients.admin.AdminClient.create(properties);
    return newAdminClient;
}
示例19
/**
 * Checks that asking for a group that does not exist yields a summary whose state is the string form of
 * {@code ConsumerGroupState.DEAD}.
 */
@Test
public void getConsumerGroupSummary_doesNotExist() {
    final String deadState = ConsumerGroupState.DEAD.toString();

    AdminClient.ConsumerGroupSummary missingGroup =
        failureClient.getConsumerGroupSummary("consumer-group-does-not-exist");

    assertThat(missingGroup.state(), is(deadState));
}
示例20
/**
 * Loads the consumer configuration from {@code ConfigService}, creates the admin client from it, and logs the
 * client's bootstrap brokers at debug level.
 */
public CustomConsumerGroupService() {
    this.props = ConfigService.getKafkaConsumerConf();
    this.adminClient = AdminClient.create(this.props);
    logger.debug("{}", this.adminClient.bootstrapBrokers());
}
示例21
/**
 * Logs, at debug level, one line per partition assigned to each member of the given consumer group: group, topic,
 * partition, committed offset, log end offset, lag (end minus committed), client ID and client host.
 *
 * <p>The committed offset is looked up separately for every partition. A partition with no committed offset is
 * reported with a committed offset of 0.
 *
 * @param group
 *            the consumer group to describe
 */
@Override
public void describeGroup(String group) {
    List<AdminClient.ConsumerSummary> consumerSummaryList = JavaConversions.asJavaList(adminClient.describeConsumerGroup(group));
    Consumer<?, ?> consumer = getConsumer();
    logger.debug("consumerList ----- {}", consumerSummaryList);

    for (AdminClient.ConsumerSummary member : consumerSummaryList) {
        List<TopicPartition> topicPartitions = JavaConversions.asJavaList(member.assignment());
        for (TopicPartition tp : topicPartitions) {
            // Offsets are per partition. Caching a single offset under the topic name made every partition
            // report the first partition's offset and caused a null-unboxing NPE once a member owned
            // partitions of more than one topic.
            OffsetAndMetadata committed = consumer.committed(new TopicPartition(tp.topic(), tp.partition()));
            long currentOff = 0;
            if (committed != null) {
                currentOff = committed.offset();
                logger.debug("-------- offset {}", currentOff);
            }

            long endOff = findLogEndOffset(tp.topic(), tp.partition());
            logger.debug("{}",
                String.format("%s %s %s %s %s %s %s %s",
                    group, tp.topic(), String.valueOf(tp.partition()),
                    currentOff, endOff, endOff - currentOff,
                    member.clientId(), member.clientHost()));
        }
    }
}
示例22
/**
 * Stores the admin client and builds a {@code KafkaApiRequest} on top of its underlying network client.
 *
 * @param adminClient
 *            admin client kept by this instance
 */
TopicPartitionsOffsetInfo(final AdminClient adminClient) {
    this.adminClient = adminClient;
    this.kafkaApiRequest = new KafkaApiRequest(adminClient.client());
}
示例23
/**
 * Remembers the bootstrap server and creates an admin client configured with it.
 *
 * @param bootstrapServer
 *            value used for the bootstrap servers setting
 */
public KafkaConsumerCommand(String bootstrapServer) {
    this.bootstrapServer = bootstrapServer;

    final Properties clientConfig = new Properties();
    clientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    this.adminClient = AdminClient.create(clientConfig);
}
示例24
/**
 * Remembers the bootstrap server and creates an admin client configured with it.
 *
 * @param bootstrapServer
 *            value used for the bootstrap servers setting
 */
public KafkaConsumerCommand(String bootstrapServer) {
    this.bootstrapServer = bootstrapServer;

    final Properties clientConfig = new Properties();
    clientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    this.adminClient = AdminClient.create(clientConfig);
}
示例25
/**
 * Fetches the {@link AdminClient.ConsumerGroupSummary} for a consumer group from Kafka. A group that cannot be found
 * does not cause an exception; instead the returned summary has a {@link AdminClient.ConsumerGroupSummary#state()} of
 * {@link org.apache.kafka.common.ConsumerGroupState#DEAD}{@code .toString()}.
 *
 * @param consumerGroup
 *            the name of the consumer group
 * @return the {@link AdminClient.ConsumerGroupSummary} information from Kafka
 * @throws IllegalArgumentException
 *             if the consumerGroup is null, empty or blank
 * @throws AdminOperationException
 *             if there is an issue retrieving the consumer group summary
 */
public AdminClient.ConsumerGroupSummary getConsumerGroupSummary(String consumerGroup) {
    if (StringUtils.isBlank(consumerGroup)) {
        throw new IllegalArgumentException("consumerGroup cannot be null, empty or blank");
    }

    final AdminClient.ConsumerGroupSummary summary;
    try {
        summary = getAdminClient().describeConsumerGroup(consumerGroup, operationTimeout);
    } catch (KafkaException e) {
        // Surface Kafka failures as the project's admin exception, keeping the original cause.
        throw new AdminOperationException("Unable to retrieve summary for consumer group: " + consumerGroup, e);
    }
    return summary;
}