Java源码示例:org.apache.flink.runtime.state.ttl.TtlUtils
示例1
/**
 * Scans {@code bytes} from the start and finds the buffer position that follows the
 * leading run of expired elements.
 *
 * <p>Each iteration reads one element's last-access timestamp via
 * {@code nextElementLastAccessTimestamp()} and stops at the first element that
 * {@link TtlUtils#expired} does not report as expired.
 * NOTE(review): presumably {@code nextElementLastAccessTimestamp()} also advances
 * {@code input} past the element it inspects — confirm against its implementation.
 *
 * @param bytes buffer handed to {@code input} for reading
 * @param ttl time-to-live passed through to {@link TtlUtils#expired}
 * @param currentTimestamp timestamp passed through to {@link TtlUtils#expired}
 * @return the {@code input} position recorded after the last expired element seen, or
 *     {@code 0} if none was expired (including when no bytes are available)
 * @throws FlinkRuntimeException if reading an element raises an {@link IOException}
 */
@Override
public int nextUnexpiredOffset(byte[] bytes, long ttl, long currentTimestamp) {
    input.setBuffer(bytes);
    int offsetPastExpired = 0;
    try {
        while (input.available() > 0) {
            final long lastAccess = nextElementLastAccessTimestamp();
            final boolean isExpired = TtlUtils.expired(lastAccess, ttl, currentTimestamp);
            if (!isExpired) {
                // First live element found: everything before the recorded offset is expired.
                break;
            }
            offsetPastExpired = input.getPosition();
        }
    } catch (IOException e) {
        throw new FlinkRuntimeException("Failed to deserialize list element for TTL compaction filter", e);
    }
    return offsetPastExpired;
}
示例2
/**
 * Returns the position in {@code bytes} reached after skipping every leading element
 * that {@link TtlUtils#expired} reports as expired.
 *
 * <p>The buffer is loaded into {@code input}; elements are then examined one at a time
 * through {@code nextElementLastAccessTimestamp()} until either no bytes remain
 * available or an unexpired element is met.
 * NOTE(review): this relies on {@code nextElementLastAccessTimestamp()} moving the read
 * position forward — verify in its definition.
 *
 * @param bytes buffer handed to {@code input} for reading
 * @param ttl time-to-live forwarded to {@link TtlUtils#expired}
 * @param currentTimestamp timestamp forwarded to {@link TtlUtils#expired}
 * @return the {@code input} position captured after the last expired element, or
 *     {@code 0} when the first element is unexpired or nothing is available
 * @throws FlinkRuntimeException wrapping any {@link IOException} raised while reading
 */
@Override
public int nextUnexpiredOffset(byte[] bytes, long ttl, long currentTimestamp) {
    input.setBuffer(bytes);
    int resumeOffset = 0;
    while (true) {
        if (input.available() <= 0) {
            // Buffer exhausted: every element examined so far was expired.
            return resumeOffset;
        }
        try {
            long elementTimestamp = nextElementLastAccessTimestamp();
            if (!TtlUtils.expired(elementTimestamp, ttl, currentTimestamp)) {
                return resumeOffset;
            }
            resumeOffset = input.getPosition();
        } catch (IOException e) {
            throw new FlinkRuntimeException("Failed to deserialize list element for TTL compaction filter", e);
        }
    }
}
示例3
/**
 * Walks the elements in {@code bytes} in order and reports where the first unexpired
 * element begins.
 *
 * <p>For each element, the timestamp returned by
 * {@code nextElementLastAccessTimestamp()} is tested with {@link TtlUtils#expired}.
 * While elements are expired, the current {@code input} position is remembered; the
 * walk ends at the first unexpired element or when no bytes remain available.
 * NOTE(review): assumes {@code nextElementLastAccessTimestamp()} consumes the element
 * from {@code input} — confirm.
 *
 * @param bytes buffer handed to {@code input} for reading
 * @param ttl time-to-live given to {@link TtlUtils#expired}
 * @param currentTimestamp timestamp given to {@link TtlUtils#expired}
 * @return the last remembered {@code input} position, or {@code 0} if no element was
 *     expired
 * @throws FlinkRuntimeException if an {@link IOException} occurs while reading an element
 */
@Override
public int nextUnexpiredOffset(byte[] bytes, long ttl, long currentTimestamp) {
    input.setBuffer(bytes);
    int offset = 0;
    boolean reachedUnexpired = false;
    // The flag is checked first so available() is not called again once the scan has stopped.
    while (!reachedUnexpired && input.available() > 0) {
        try {
            if (TtlUtils.expired(nextElementLastAccessTimestamp(), ttl, currentTimestamp)) {
                offset = input.getPosition();
            } else {
                reachedUnexpired = true;
            }
        } catch (IOException e) {
            throw new FlinkRuntimeException("Failed to deserialize list element for TTL compaction filter", e);
        }
    }
    return offset;
}