Java源码示例:org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader

示例1
private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    QueryCorrectionOffsetHeader requestHeader =
        (QueryCorrectionOffsetHeader)request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);

    Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager()
        .queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups());

    Map<Integer, Long> compareOffset =
        this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup());

    if (compareOffset != null && !compareOffset.isEmpty()) {
        for (Map.Entry<Integer, Long> entry : compareOffset.entrySet()) {
            Integer queueId = entry.getKey();
            correctionOffset.put(queueId,
                correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId));
        }
    }

    QueryCorrectionOffsetBody body = new QueryCorrectionOffsetBody();
    body.setCorrectionOffsets(correctionOffset);
    response.setBody(body.encode());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
 
示例2
private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    QueryCorrectionOffsetHeader requestHeader =
        (QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);

    Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager()
        .queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups());

    Map<Integer, Long> compareOffset =
        this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup());

    if (compareOffset != null && !compareOffset.isEmpty()) {
        for (Map.Entry<Integer, Long> entry : compareOffset.entrySet()) {
            Integer queueId = entry.getKey();
            correctionOffset.put(queueId,
                correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId));
        }
    }

    QueryCorrectionOffsetBody body = new QueryCorrectionOffsetBody();
    body.setCorrectionOffsets(correctionOffset);
    response.setBody(body.encode());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
 
示例3
private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    QueryCorrectionOffsetHeader requestHeader =
        (QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);

    Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager()
        .queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups());

    Map<Integer, Long> compareOffset =
        this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup());

    if (compareOffset != null && !compareOffset.isEmpty()) {
        for (Map.Entry<Integer, Long> entry : compareOffset.entrySet()) {
            Integer queueId = entry.getKey();
            correctionOffset.put(queueId,
                correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId));
        }
    }

    QueryCorrectionOffsetBody body = new QueryCorrectionOffsetBody();
    body.setCorrectionOffsets(correctionOffset);
    response.setBody(body.encode());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
 
示例4
private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        QueryCorrectionOffsetHeader requestHeader =
            (QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);

//        在所有的消费组中查询最小的offset=》
        Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager()
            .queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups());

//       按topic和消费组查找offset=》
        Map<Integer, Long> compareOffset =
            this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup());

        if (compareOffset != null && !compareOffset.isEmpty()) {
            for (Map.Entry<Integer, Long> entry : compareOffset.entrySet()) {
                Integer queueId = entry.getKey();
                correctionOffset.put(queueId,
                    correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId));
            }
        }

        QueryCorrectionOffsetBody body = new QueryCorrectionOffsetBody();
        body.setCorrectionOffsets(correctionOffset);
        response.setBody(body.encode());
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
 
示例5
/**
 * 查询当前的偏移量
 * @param ctx ;
 * @param request ;
 * @return ;
 * @throws RemotingCommandException ;
 */
private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);

    QueryCorrectionOffsetHeader requestHeader =
        (QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);

    Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager()
        .queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups());

    Map<Integer, Long> compareOffset =
        this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup());

    if (compareOffset != null && !compareOffset.isEmpty()) {
        for (Map.Entry<Integer, Long> entry : compareOffset.entrySet()) {
            Integer queueId = entry.getKey();
            correctionOffset.put(queueId,
                correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId));
        }
    }

    QueryCorrectionOffsetBody body = new QueryCorrectionOffsetBody();
    body.setCorrectionOffsets(correctionOffset);
    response.setBody(body.encode());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
 
示例6
private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    QueryCorrectionOffsetHeader requestHeader =
        (QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);

    Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager()
        .queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups());

    Map<Integer, Long> compareOffset =
        this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup());

    if (compareOffset != null && !compareOffset.isEmpty()) {
        for (Map.Entry<Integer, Long> entry : compareOffset.entrySet()) {
            Integer queueId = entry.getKey();
            correctionOffset.put(queueId,
                correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId));
        }
    }

    QueryCorrectionOffsetBody body = new QueryCorrectionOffsetBody();
    body.setCorrectionOffsets(correctionOffset);
    response.setBody(body.encode());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
 
示例7
private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    QueryCorrectionOffsetHeader requestHeader =
        (QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);

    Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager()
        .queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups());

    Map<Integer, Long> compareOffset =
        this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup());

    if (compareOffset != null && !compareOffset.isEmpty()) {
        for (Map.Entry<Integer, Long> entry : compareOffset.entrySet()) {
            Integer queueId = entry.getKey();
            correctionOffset.put(queueId,
                correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId));
        }
    }

    QueryCorrectionOffsetBody body = new QueryCorrectionOffsetBody();
    body.setCorrectionOffsets(correctionOffset);
    response.setBody(body.encode());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}