Java源码示例:org.apache.flink.runtime.state.ttl.TtlUtils

示例1
/**
 * Scans the serialized list in {@code bytes} from the start and reports where the
 * leading run of expired elements ends.
 *
 * <p>The buffer is handed to {@code input}; elements are then visited one at a time via
 * {@code nextElementLastAccessTimestamp()} until either the input is exhausted or an
 * element is found that {@link TtlUtils#expired(long, long, long)} does not consider
 * expired.
 *
 * @param bytes serialized list contents to scan
 * @param ttl time-to-live passed through to the expiration check
 * @param currentTimestamp timestamp passed through to the expiration check
 * @return the {@code input} position recorded after the last expired element that was
 *     read, or {@code 0} when the first element is not expired or the buffer is empty
 * @throws FlinkRuntimeException if reading an element raises an {@link IOException}
 */
@Override
public int nextUnexpiredOffset(byte[] bytes, long ttl, long currentTimestamp) {
	input.setBuffer(bytes);
	int expiredPrefixEnd = 0;
	try {
		while (input.available() > 0) {
			final long lastAccess = nextElementLastAccessTimestamp();
			final boolean stillAlive = !TtlUtils.expired(lastAccess, ttl, currentTimestamp);
			if (stillAlive) {
				// First surviving element found: everything before it is the expired prefix.
				break;
			}
			// Presumably the timestamp read consumed the whole element, so the current
			// position marks the start of the next one -- confirm against the helper.
			expiredPrefixEnd = input.getPosition();
		}
	} catch (IOException e) {
		throw new FlinkRuntimeException("Failed to deserialize list element for TTL compaction filter", e);
	}
	return expiredPrefixEnd;
}
 
示例2
/**
 * Finds the offset in {@code bytes} at which the first non-expired list element starts.
 *
 * <p>After loading {@code bytes} into {@code input}, the method reads the last-access
 * timestamp of each successive element and checks it with
 * {@link TtlUtils#expired(long, long, long)}. Scanning stops at the first element that
 * is not expired, or when no more bytes are available.
 *
 * @param bytes serialized list contents to scan
 * @param ttl time-to-live forwarded to the expiration check
 * @param currentTimestamp timestamp forwarded to the expiration check
 * @return the position of {@code input} taken after the most recent expired element;
 *     {@code 0} if no expired element was encountered
 * @throws FlinkRuntimeException wrapping any {@link IOException} raised while reading
 */
@Override
public int nextUnexpiredOffset(byte[] bytes, long ttl, long currentTimestamp) {
	input.setBuffer(bytes);
	int offsetAfterExpired = 0;
	boolean reachedLiveElement = false;
	// The flag is tested first so that the input is not queried again once scanning is done.
	while (!reachedLiveElement && input.available() > 0) {
		try {
			final long lastAccess = nextElementLastAccessTimestamp();
			if (TtlUtils.expired(lastAccess, ttl, currentTimestamp)) {
				// Element is expired: remember where the input stands after it.
				offsetAfterExpired = input.getPosition();
			} else {
				reachedLiveElement = true;
			}
		} catch (IOException e) {
			throw new FlinkRuntimeException("Failed to deserialize list element for TTL compaction filter", e);
		}
	}
	return offsetAfterExpired;
}
 
示例3
/**
 * Returns how far into {@code bytes} the leading expired list elements extend.
 *
 * <p>{@code bytes} is set as the buffer of {@code input}. While bytes remain, the
 * last-access timestamp of the next element is read and tested with
 * {@link TtlUtils#expired(long, long, long)}; the first element that is not expired ends
 * the scan.
 *
 * @param bytes serialized list contents to scan
 * @param ttl time-to-live given to the expiration check
 * @param currentTimestamp timestamp given to the expiration check
 * @return the {@code input} position captured after the last expired element, or
 *     {@code 0} if none was expired (including when the buffer has nothing available)
 * @throws FlinkRuntimeException if an {@link IOException} occurs while reading an element
 */
@Override
public int nextUnexpiredOffset(byte[] bytes, long ttl, long currentTimestamp) {
	input.setBuffer(bytes);
	int boundary = 0;
	while (input.available() > 0) {
		try {
			if (!TtlUtils.expired(nextElementLastAccessTimestamp(), ttl, currentTimestamp)) {
				// Hit an element that is still valid; the boundary found so far is final.
				return boundary;
			}
			boundary = input.getPosition();
		} catch (IOException e) {
			throw new FlinkRuntimeException("Failed to deserialize list element for TTL compaction filter", e);
		}
	}
	// Ran out of input: every element read was expired (or there were none).
	return boundary;
}