Java源码示例:kafka.utils.ZKGroupTopicDirs
示例1
/**
 * Reads the offset stored in ZooKeeper for one partition of a consumer group.
 *
 * <p>The node path is {@code <consumerOffsetDir>/<partition>}, where the offset
 * directory comes from {@link ZKGroupTopicDirs} for the given group and topic.
 * Curator's namespace-aware ensure-path is invoked on that path before reading.
 *
 * @param curatorClient Curator client used to access ZooKeeper
 * @param groupId       consumer group id
 * @param topic         topic name
 * @param partition     partition number
 * @return the stored offset, or {@code null} when the node holds no data, holds
 *         an empty string, or holds a value that is not a valid {@code long}
 * @throws Exception propagated from the Curator calls
 */
public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
    final String offsetPath = new ZKGroupTopicDirs(groupId, topic).consumerOffsetDir() + "/" + partition;
    curatorClient.newNamespaceAwareEnsurePath(offsetPath).ensure(curatorClient.getZookeeperClient());

    final byte[] raw = curatorClient.getData().forPath(offsetPath);
    if (raw == null) {
        return null;
    }

    final String text = new String(raw, ConfigConstants.DEFAULT_CHARSET);
    if (text.isEmpty()) {
        return null;
    }

    try {
        return Long.valueOf(text);
    } catch (NumberFormatException e) {
        // A malformed value is reported and treated the same as "no offset stored".
        LOG.error(
            "The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
            groupId, topic, partition, text);
        return null;
    }
}
示例2
/**
 * Looks up the ZooKeeper-stored offset of a single topic partition for a
 * consumer group.
 *
 * <p>The value is read from {@code <consumerOffsetDir>/<partition>} (directory
 * taken from {@link ZKGroupTopicDirs}); Curator's namespace-aware ensure-path
 * is invoked on that path first.
 *
 * @param curatorClient Curator client used to access ZooKeeper
 * @param groupId       consumer group id
 * @param topic         topic name
 * @param partition     partition number
 * @return the parsed offset, or {@code null} if the node data is {@code null},
 *         empty, or not parseable as a {@code long}
 * @throws Exception propagated from the Curator calls
 */
public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
    final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topic);
    final String nodePath = groupTopicDirs.consumerOffsetDir() + "/" + partition;
    curatorClient.newNamespaceAwareEnsurePath(nodePath).ensure(curatorClient.getZookeeperClient());

    final byte[] payload = curatorClient.getData().forPath(nodePath);
    if (payload == null) {
        return null;
    }

    final String offsetText = new String(payload, ConfigConstants.DEFAULT_CHARSET);
    if (offsetText.isEmpty()) {
        return null;
    }

    Long parsed;
    try {
        parsed = Long.valueOf(offsetText);
    } catch (NumberFormatException e) {
        // Log the bad value and fall back to "no offset".
        LOG.error(
            "The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
            groupId, topic, partition, offsetText);
        parsed = null;
    }
    return parsed;
}
示例3
/**
 * Fetches the committed offset of a topic partition from ZooKeeper.
 *
 * <p>The offset is read from {@code <consumerOffsetDir>/<partition>} for this
 * instance's {@code groupId}.
 *
 * @param topicPartition the topic partition whose offset is wanted
 * @return the stored offset, or {@code -1} when the node does not exist, holds
 *         no data, holds an empty string, or holds a value that cannot be
 *         parsed as a {@code long}
 */
public Long fetchOffset(TopicPartition topicPartition) {
    ZKGroupTopicDirs dirs = new ZKGroupTopicDirs(groupId, topicPartition.topic());
    String path = dirs.consumerOffsetDir() + "/" + topicPartition.partition();
    if (!commitZkClient.exists(path)) {
        return -1L;
    }

    // A node can exist without a payload (e.g. created but not yet written);
    // treat that as "no offset" instead of failing on toString().
    Object data = commitZkClient.readData(path);
    if (data == null) {
        return -1L;
    }

    String offset = data.toString();
    if (StringUtils.isEmpty(offset)) {
        return -1L;
    }

    try {
        return Long.parseLong(offset);
    } catch (NumberFormatException e) {
        LOGGER.warn("Parse offset {} for topic partition failed, zk path: {}", offset, path);
        return -1L;
    }
}
示例4
/**
 * Writes an offset to ZooKeeper for one partition of a consumer group.
 *
 * <p>The target node is {@code <consumerOffsetDir>/<partition>} (directory
 * taken from {@link ZKGroupTopicDirs}). Curator's namespace-aware ensure-path
 * is invoked on it before the data is set. The offset is stored as its decimal
 * string encoded with {@code ConfigConstants.DEFAULT_CHARSET}.
 *
 * @param curatorClient Curator client used to access ZooKeeper
 * @param groupId       consumer group id
 * @param topic         topic name
 * @param partition     partition number
 * @param offset        offset value to store
 * @throws Exception propagated from the Curator calls
 */
public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
    final String offsetPath = new ZKGroupTopicDirs(groupId, topic).consumerOffsetDir() + "/" + partition;
    curatorClient.newNamespaceAwareEnsurePath(offsetPath).ensure(curatorClient.getZookeeperClient());

    final String offsetText = Long.toString(offset);
    final byte[] encoded = offsetText.getBytes(ConfigConstants.DEFAULT_CHARSET);
    curatorClient.setData().forPath(offsetPath, encoded);
}
示例5
/**
 * Stores the given offset in ZooKeeper under the consumer group's offset
 * directory for the topic, at child node {@code <partition>}.
 *
 * <p>Curator's namespace-aware ensure-path is invoked on the node path first;
 * the payload is the decimal text of {@code offset} encoded with
 * {@code ConfigConstants.DEFAULT_CHARSET}.
 *
 * @param curatorClient Curator client used to access ZooKeeper
 * @param groupId       consumer group id
 * @param topic         topic name
 * @param partition     partition number
 * @param offset        offset value to store
 * @throws Exception propagated from the Curator calls
 */
public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
    final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topic);
    final String nodePath = groupTopicDirs.consumerOffsetDir() + "/" + partition;
    curatorClient.newNamespaceAwareEnsurePath(nodePath).ensure(curatorClient.getZookeeperClient());

    final byte[] payload = Long.toString(offset).getBytes(ConfigConstants.DEFAULT_CHARSET);
    curatorClient.setData().forPath(nodePath, payload);
}
示例6
/**
 * Writes the given per-partition offsets to ZooKeeper for a consumer group.
 *
 * <p>For every entry, the offset is written as a decimal string to
 * {@code <consumerOffsetDir>/<partition>} via
 * {@code ZkUtils.updatePersistentPath}, and the update is logged. The
 * ZooKeeper client is opened for the duration of the call and closed afterwards.
 *
 * @param zkOffsetManagement connection string passed to {@code AutoZkClient}
 * @param groupID            consumer group id
 * @param offsets            offsets to store, keyed by topic and partition
 */
public static void setOffsets(String zkOffsetManagement,
                              String groupID,
                              Map<TopicAndPartition, Long> offsets) {
    try (AutoZkClient zkClient = new AutoZkClient(zkOffsetManagement)) {
        for (Map.Entry<TopicAndPartition, Long> entry : offsets.entrySet()) {
            final TopicAndPartition topicAndPartition = entry.getKey();
            final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupID, topicAndPartition.topic());
            final String partitionOffsetPath =
                groupTopicDirs.consumerOffsetDir() + "/" + topicAndPartition.partition();
            final String offsetText = Long.toString(entry.getValue());

            ZkUtils.updatePersistentPath(zkClient, partitionOffsetPath, offsetText);
            log.info("updating offset path" + partitionOffsetPath + " offset=" + offsetText);
        }
    }
}
示例7
/**
 * Reads the stored offsets of every partition of a topic for a consumer group.
 *
 * <p>The partition list is obtained from the Kafka ZooKeeper ensemble; each
 * partition's offset is then read from
 * {@code <consumerOffsetDir>/<partition>} on the offset-management ensemble.
 * Partitions without a stored value are put into the map with a {@code null}
 * offset, and the map is then handed to {@code fillInLatestOffsets}.
 *
 * <p>Both ZooKeeper clients are closed before this method returns, including
 * when a lookup fails.
 *
 * @param zkKafkaServers  connection string of the ZooKeeper ensemble holding
 *                        the topic's partition metadata
 * @param zkOffSetManager connection string of the ZooKeeper ensemble holding
 *                        the consumer offsets
 * @param groupID         consumer group id
 * @param topic           topic name
 * @param kafkaParams     parameters forwarded to {@code fillInLatestOffsets}
 * @return offsets keyed by topic and partition
 */
public static Map<TopicAndPartition, Long> getOffsets(String zkKafkaServers,
                                                      String zkOffSetManager,
                                                      String groupID,
                                                      String topic,
                                                      Map<String, String> kafkaParams) {
    ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupID, topic);
    Map<TopicAndPartition, Long> offsets = new HashMap<>();

    // try-with-resources so neither ZooKeeper connection is leaked.
    try (AutoZkClient zkKafkaClient = new AutoZkClient(zkKafkaServers);
         AutoZkClient zkOffsetManagerClient = new AutoZkClient(zkOffSetManager)) {
        List<?> partitions = JavaConversions.seqAsJavaList(
            ZkUtils.getPartitionsForTopics(
                zkKafkaClient,
                JavaConversions.asScalaBuffer(Collections.singletonList(topic))).head()._2());

        partitions.forEach(partition -> {
            String partitionOffsetPath = topicDirs.consumerOffsetDir() + "/" + partition;
            log.info("Offset location, zookeeper path=" + partitionOffsetPath);
            Option<String> maybeOffset = ZkUtils.readDataMaybeNull(zkOffsetManagerClient, partitionOffsetPath)._1();
            Long offset = maybeOffset.isDefined() ? Long.parseLong(maybeOffset.get()) : null;
            TopicAndPartition topicAndPartition = new TopicAndPartition(topic, Integer.parseInt(partition.toString()));
            offsets.put(topicAndPartition, offset);
        });
    }

    fillInLatestOffsets(offsets, kafkaParams); // in case offsets are blank for any partition
    return offsets;
}
示例8
/**
 * Writes the offset of a topic partition to ZooKeeper, unless the same offset
 * is already recorded in {@code offsetCheckpoints} for that partition.
 *
 * <p>The node {@code <consumerOffsetDir>/<partition>} is created if it does
 * not exist, the offset is written as a string, and the checkpoint map is
 * updated with the new value.
 *
 * @param topicPartition partition whose offset is being committed
 * @param offset         offset value to commit
 */
private void commitOffsetToZookeeper(TopicPartition topicPartition, long offset) {
    // Nothing to do when the last checkpoint already holds this offset.
    if (offsetCheckpoints.containsKey(topicPartition)
            && offsetCheckpoints.get(topicPartition) == offset) {
        return;
    }

    final String offsetPath =
        new ZKGroupTopicDirs(groupId, topicPartition.topic()).consumerOffsetDir()
            + "/" + topicPartition.partition();

    final boolean nodeExists = commitZkClient.exists(offsetPath);
    if (!nodeExists) {
        commitZkClient.createPersistent(offsetPath, true);
    }

    commitZkClient.writeData(offsetPath, String.valueOf(offset));
    offsetCheckpoints.put(topicPartition, offset);
}
示例9
/**
 * Builds one {@link ConsumerPartitionVO} per partition of a topic for the
 * given consumer group.
 *
 * <p>For each partition the result carries:
 * <ul>
 *   <li>the owner, read from {@code consumerTreeCache} at
 *       {@code <consumerOwnerDir>/<partitionId>} ({@code null} if absent);</li>
 *   <li>the consumer offset from {@code getConsumerOffsets}, defaulting to
 *       {@code ConsumerOffsetVO.UNREAD};</li>
 *   <li>the partition size and first offset, or {@code -1} when the topic has
 *       no entry for that partition id.</li>
 * </ul>
 *
 * @param groupId   consumer group id
 * @param topicName name of the topic to report on
 * @param topicOpt  an already-loaded topic, or {@code null}; it is used only
 *                  when its name equals {@code topicName}, otherwise the topic
 *                  is looked up via {@code getTopic}
 * @return a stream of per-partition consumer details, or an empty stream when
 *         the topic cannot be resolved
 */
private Stream<ConsumerPartitionVO> getConsumerPartitionStream(String groupId,
                                                               String topicName,
                                                               TopicVO topicOpt)
{
    ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topicName);

    // Re-fetch only when the caller gave no topic or one for a different name;
    // a supplied topic that already matches is reused as-is.
    if (topicOpt == null || !topicOpt.getName().equals(topicName))
    {
        topicOpt = getTopic(topicName).orElse(null);
    }

    if (topicOpt != null)
    {
        final TopicVO topic = topicOpt;

        Map<Integer, ConsumerOffsetVO> consumerOffsets = getConsumerOffsets(groupId, topic);

        return topic.getPartitions().stream()
            .map(partition -> {
                int partitionId = partition.getId();

                final ConsumerPartitionVO consumerPartition = new ConsumerPartitionVO(groupId, topicName, partitionId);
                consumerPartition.setOwner(
                    Optional.ofNullable(
                        consumerTreeCache.getCurrentData(groupTopicDirs.consumerOwnerDir() + "/" + partitionId))
                        .map(data -> new String(data.getData()))
                        .orElse(null));

                consumerPartition.setConsumerOffset(consumerOffsets.getOrDefault(partitionId, ConsumerOffsetVO.UNREAD));

                final Optional<TopicPartitionVO> topicPartition = topic.getPartition(partitionId);
                consumerPartition.setSize(topicPartition.map(TopicPartitionVO::getSize).orElse(-1L));
                consumerPartition.setFirstOffset(topicPartition.map(TopicPartitionVO::getFirstOffset).orElse(-1L));

                return consumerPartition;
            });
    }
    else
    {
        return Stream.empty();
    }
}