Java源码示例:kafka.utils.ZKGroupTopicDirs

示例1
public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
	ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
	String path = topicDirs.consumerOffsetDir() + "/" + partition;
	curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());

	byte[] data = curatorClient.getData().forPath(path);

	if (data == null) {
		return null;
	} else {
		String asString = new String(data, ConfigConstants.DEFAULT_CHARSET);
		if (asString.length() == 0) {
			return null;
		} else {
			try {
				return Long.valueOf(asString);
			}
			catch (NumberFormatException e) {
				LOG.error(
						"The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
					groupId, topic, partition, asString);
				return null;
			}
		}
	}
}
 
示例2
public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
	ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
	String path = topicDirs.consumerOffsetDir() + "/" + partition;
	curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());

	byte[] data = curatorClient.getData().forPath(path);

	if (data == null) {
		return null;
	} else {
		String asString = new String(data, ConfigConstants.DEFAULT_CHARSET);
		if (asString.length() == 0) {
			return null;
		} else {
			try {
				return Long.valueOf(asString);
			}
			catch (NumberFormatException e) {
				LOG.error(
						"The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
					groupId, topic, partition, asString);
				return null;
			}
		}
	}
}
 
示例3
public Long fetchOffset(TopicPartition topicPartition) {
  ZKGroupTopicDirs dirs = new ZKGroupTopicDirs(groupId, topicPartition.topic());
  String path = dirs.consumerOffsetDir() + "/" + topicPartition.partition();
  if (!commitZkClient.exists(path)) {
    return -1L;
  }
  String offset = commitZkClient.readData(path).toString();
  if (StringUtils.isEmpty(offset)) {
    return -1L;
  }
  try {
    return Long.parseLong(offset);
  } catch (Exception e) {
    LOGGER.warn("Parse offset {} for topic partition failed, zk path: {}", offset, path);
    return -1L;
  }
}
 
示例4
public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
	ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
	String path = topicDirs.consumerOffsetDir() + "/" + partition;
	curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
	byte[] data = Long.toString(offset).getBytes(ConfigConstants.DEFAULT_CHARSET);
	curatorClient.setData().forPath(path, data);
}
 
示例5
public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
	ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
	String path = topicDirs.consumerOffsetDir() + "/" + partition;
	curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
	byte[] data = Long.toString(offset).getBytes(ConfigConstants.DEFAULT_CHARSET);
	curatorClient.setData().forPath(path, data);
}
 
示例6
public static void setOffsets(String zkOffsetManagement,
                              String groupID,
                              Map<TopicAndPartition, Long> offsets) {
    try (AutoZkClient zkClient = new AutoZkClient(zkOffsetManagement)) {
        offsets.forEach((topicAndPartition, offset) -> {
            ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupID, topicAndPartition.topic());
            int partition = topicAndPartition.partition();
            String partitionOffsetPath = topicDirs.consumerOffsetDir() + "/" + partition;
            ZkUtils.updatePersistentPath(zkClient, partitionOffsetPath, Long.toString(offset));
            log.info("updating offset path" + partitionOffsetPath + " offset=" + Long.toString(offset));
        });
    }
}
 
示例7
public static Map<TopicAndPartition, Long> getOffsets(String zkKafkaServers,
                                                      String zkOffSetManager,
                                                      String groupID,
                                                      String topic,
                                                      Map<String, String> kafkaParams) {
    ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupID, topic);
    Map<TopicAndPartition, Long> offsets = new HashMap<>();

    AutoZkClient zkKafkaClient = new AutoZkClient(zkKafkaServers);
    AutoZkClient zkOffsetManagerClient = new AutoZkClient(zkOffSetManager);

    List<?> partitions = JavaConversions.seqAsJavaList(
            ZkUtils.getPartitionsForTopics(
                    zkKafkaClient,
                    JavaConversions.asScalaBuffer(Collections.singletonList(topic))).head()._2());
    partitions.forEach(partition -> {
        String partitionOffsetPath = topicDirs.consumerOffsetDir() + "/" + partition;
        log.info("Offset location, zookeeper path=" + partitionOffsetPath);
        Option<String> maybeOffset = ZkUtils.readDataMaybeNull(zkOffsetManagerClient, partitionOffsetPath)._1();
        Long offset = maybeOffset.isDefined() ? Long.parseLong(maybeOffset.get()) : null;
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, Integer.parseInt(partition.toString()));
        offsets.put(topicAndPartition, offset);
    });

    fillInLatestOffsets(offsets, kafkaParams); // in case offsets are blank for any partition
    return offsets;
}
 
示例8
private void commitOffsetToZookeeper(TopicPartition topicPartition, long offset) {
  if (!offsetCheckpoints.containsKey(topicPartition)
      || offsetCheckpoints.get(topicPartition) != offset) {
    ZKGroupTopicDirs dirs = new ZKGroupTopicDirs(groupId, topicPartition.topic());
    String path = dirs.consumerOffsetDir() + "/" + topicPartition.partition();
    if (!commitZkClient.exists(path)) {
      commitZkClient.createPersistent(path, true);
    }
    commitZkClient.writeData(path,
        String.valueOf(offset));
    offsetCheckpoints.put(topicPartition, offset);
  }
}
 
示例9
private Stream<ConsumerPartitionVO> getConsumerPartitionStream(String groupId,
                                                               String topicName,
                                                               TopicVO topicOpt)
{
   ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topicName);

   if (topicOpt == null || topicOpt.getName().equals(topicName))
   {
      topicOpt = getTopic(topicName).orElse(null);
   }

   if (topicOpt != null)
   {
      final TopicVO topic = topicOpt;

      Map<Integer, ConsumerOffsetVO> consumerOffsets = getConsumerOffsets(groupId, topic);

      return topic.getPartitions().stream()
         .map(partition -> {
            int partitionId = partition.getId();

            final ConsumerPartitionVO consumerPartition = new ConsumerPartitionVO(groupId, topicName, partitionId);
            consumerPartition.setOwner(
               Optional.ofNullable(
                  consumerTreeCache.getCurrentData(groupTopicDirs.consumerOwnerDir() + "/" + partitionId))
                  .map(data -> new String(data.getData()))
                  .orElse(null));

            consumerPartition.setConsumerOffset(consumerOffsets.getOrDefault(partitionId, ConsumerOffsetVO.UNREAD));

            final Optional<TopicPartitionVO> topicPartition = topic.getPartition(partitionId);
            consumerPartition.setSize(topicPartition.map(TopicPartitionVO::getSize).orElse(-1L));
            consumerPartition.setFirstOffset(topicPartition.map(TopicPartitionVO::getFirstOffset).orElse(-1L));

            return consumerPartition;
         });
   }
   else
   {
      return Stream.empty();
   }
}