Java源码示例:io.github.resilience4j.bulkhead.Bulkhead
示例1
/**
 * Wraps the Reactor publisher returned by the intercepted method with the given bulkhead.
 * See {@link Bulkhead} for details.
 *
 * <p>The join point is proceeded first; a {@code Flux} or {@code Mono} result is then
 * decorated via {@code transformDeferred} with a {@code BulkheadOperator}. Any other result
 * type is logged and rejected.
 *
 * @param proceedingJoinPoint Spring AOP proceedingJoinPoint
 * @param bulkhead            the configured bulkhead
 * @param methodName          the method name
 * @return the decorated {@code Flux} or {@code Mono}
 * @throws IllegalArgumentException if the result is neither a {@code Flux} nor a {@code Mono}
 * @throws Throwable                exception in case of faulty flow
 */
@Override
public Object handle(ProceedingJoinPoint proceedingJoinPoint, Bulkhead bulkhead,
    String methodName) throws Throwable {
    final Object result = proceedingJoinPoint.proceed();
    if (result instanceof Flux) {
        return ((Flux<?>) result).transformDeferred(BulkheadOperator.of(bulkhead));
    }
    if (result instanceof Mono) {
        return ((Mono<?>) result).transformDeferred(BulkheadOperator.of(bulkhead));
    }
    // Neither supported publisher type: report it and refuse to continue.
    final Class<?> unsupportedType = result.getClass();
    logger.error("Unsupported type for Reactor BulkHead {}", unsupportedType.getTypeName());
    throw new IllegalArgumentException(
        "Not Supported type for the BulkHead in Reactor :" + unsupportedType.getName());
}
示例2
/**
 * Checks that the "max allowed concurrent calls" gauge follows a runtime config change.
 */
@Test
public void bulkheadConfigChangeAffectsTheMaxAllowedConcurrentCallsValue() {
    final String gaugeName = "testPre.testBulkhead.max_allowed_concurrent_calls";
    final Bulkhead underTest = givenMetricRegistry("testPre", metricRegistry);

    // Initially the gauge reports the default limit.
    Object reportedBefore = metricRegistry.getGauges().get(gaugeName).getValue();
    assertThat(reportedBefore).isEqualTo(DEFAULT_MAX_CONCURRENT_CALLS);

    // Swap in a config with a higher limit.
    final BulkheadConfig raisedConfig = BulkheadConfig.custom()
        .maxConcurrentCalls(DEFAULT_MAX_CONCURRENT_CALLS + 50)
        .build();
    underTest.changeConfig(raisedConfig);

    // The same gauge must now report the new limit.
    Object reportedAfter = metricRegistry.getGauges().get(gaugeName).getValue();
    assertThat(reportedAfter).isEqualTo(raisedConfig.getMaxConcurrentCalls());
}
示例3
/**
 * Builds two gauge families, available and maximum allowed concurrent calls, with one
 * sample per bulkhead in the registry, labelled with the bulkhead name.
 *
 * @return a list holding the "available" family followed by the "max allowed" family
 */
@Override
public List<MetricFamilySamples> collect() {
    final GaugeMetricFamily available = new GaugeMetricFamily(
        names.getAvailableConcurrentCallsMetricName(),
        "The number of available concurrent calls",
        LabelNames.NAME);
    final GaugeMetricFamily maxAllowed = new GaugeMetricFamily(
        names.getMaxAllowedConcurrentCallsMetricName(),
        "The maximum number of allowed concurrent calls",
        LabelNames.NAME);

    bulkheadRegistry.getAllBulkheads().forEach(current -> {
        List<String> nameLabel = singletonList(current.getName());
        available.addMetric(nameLabel, current.getMetrics().getAvailableConcurrentCalls());
        maxAllowed.addMetric(nameLabel, current.getMetrics().getMaxAllowedConcurrentCalls());
    });

    return asList(available, maxAllowed);
}
示例4
/**
 * Creating a second bulkhead in the registry must register its meters alongside the
 * existing ones.
 */
@Test
public void shouldAddMetricsForANewlyCreatedRetry() {
    final Bulkhead added = bulkheadRegistry.bulkhead("backendB");

    // Both bulkheads are tracked, each with two meter ids.
    assertThat(taggedBulkheadMetrics.meterIdMap).containsKeys("backendA", "backendB");
    assertThat(taggedBulkheadMetrics.meterIdMap.get("backendA")).hasSize(2);
    assertThat(taggedBulkheadMetrics.meterIdMap.get("backendB")).hasSize(2);
    assertThat(meterRegistry.getMeters()).hasSize(4);

    // The gauge tagged with the new bulkhead's name reports its configured limit.
    Optional<Gauge> maxAllowedGauge = findMeterByNamesTag(
        meterRegistry.get(DEFAULT_BULKHEAD_MAX_ALLOWED_CONCURRENT_CALLS_METRIC_NAME).gauges(),
        added.getName());
    assertThat(maxAllowedGauge).isPresent();
    assertThat(maxAllowedGauge.get().value())
        .isEqualTo(added.getMetrics().getMaxAllowedConcurrentCalls());
}
示例5
/**
 * Replacing a bulkhead in the registry must leave a gauge that reports the replacement's
 * limit under the same name.
 */
@Test
public void shouldReplaceMetrics() {
    // Before the replacement the gauge matches the original bulkhead.
    Optional<Gauge> gaugeBefore = findMeterByNamesTag(
        meterRegistry.get(DEFAULT_BULKHEAD_MAX_ALLOWED_CONCURRENT_CALLS_METRIC_NAME).gauges(),
        bulkhead.getName());
    assertThat(gaugeBefore).isPresent();
    assertThat(gaugeBefore.get().value())
        .isEqualTo(bulkhead.getMetrics().getMaxAllowedConcurrentCalls());

    // Replace it with a bulkhead of the same name but a limit of 100.
    final BulkheadConfig replacementConfig = BulkheadConfig.custom()
        .maxConcurrentCalls(100)
        .build();
    final Bulkhead replacement = Bulkhead.of(bulkhead.getName(), replacementConfig);
    bulkheadRegistry.replace(bulkhead.getName(), replacement);

    // After the replacement the gauge matches the new bulkhead.
    Optional<Gauge> gaugeAfter = findMeterByNamesTag(
        meterRegistry.get(DEFAULT_BULKHEAD_MAX_ALLOWED_CONCURRENT_CALLS_METRIC_NAME).gauges(),
        replacement.getName());
    assertThat(gaugeAfter).isPresent();
    assertThat(gaugeAfter.get().value())
        .isEqualTo(replacement.getMetrics().getMaxAllowedConcurrentCalls());
}
示例6
/**
 * Creating a second bulkhead in the registry must make the metrics publisher register
 * meters for it alongside the existing ones.
 */
@Test
public void shouldAddMetricsForANewlyCreatedRetry() {
    final Bulkhead added = bulkheadRegistry.bulkhead("backendB");

    // The publisher tracks both bulkheads, each with two meter ids.
    assertThat(taggedBulkheadMetricsPublisher.meterIdMap).containsKeys("backendA", "backendB");
    assertThat(taggedBulkheadMetricsPublisher.meterIdMap.get("backendA")).hasSize(2);
    assertThat(taggedBulkheadMetricsPublisher.meterIdMap.get("backendB")).hasSize(2);
    assertThat(meterRegistry.getMeters()).hasSize(4);

    // The gauge tagged with the new bulkhead's name reports its configured limit.
    Optional<Gauge> maxAllowedGauge = findMeterByNamesTag(
        meterRegistry.get(DEFAULT_BULKHEAD_MAX_ALLOWED_CONCURRENT_CALLS_METRIC_NAME).gauges(),
        added.getName());
    assertThat(maxAllowedGauge).isPresent();
    assertThat(maxAllowedGauge.get().value())
        .isEqualTo(added.getMetrics().getMaxAllowedConcurrentCalls());
}
示例7
/**
 * Replacing a registry entry must result in a gauge that reflects the replacement
 * bulkhead's maximum allowed concurrent calls.
 */
@Test
public void shouldReplaceMetrics() {
    // Gauge state for the bulkhead that is currently registered.
    Optional<Gauge> originalGauge = findMeterByNamesTag(
        meterRegistry.get(DEFAULT_BULKHEAD_MAX_ALLOWED_CONCURRENT_CALLS_METRIC_NAME).gauges(),
        bulkhead.getName());
    assertThat(originalGauge).isPresent();
    assertThat(originalGauge.get().value())
        .isEqualTo(bulkhead.getMetrics().getMaxAllowedConcurrentCalls());

    // Register a same-named bulkhead whose limit is 100.
    final BulkheadConfig hundredCalls = BulkheadConfig.custom()
        .maxConcurrentCalls(100)
        .build();
    final Bulkhead substitute = Bulkhead.of(bulkhead.getName(), hundredCalls);
    bulkheadRegistry.replace(bulkhead.getName(), substitute);

    // Gauge state after the swap.
    Optional<Gauge> substituteGauge = findMeterByNamesTag(
        meterRegistry.get(DEFAULT_BULKHEAD_MAX_ALLOWED_CONCURRENT_CALLS_METRIC_NAME).gauges(),
        substitute.getName());
    assertThat(substituteGauge).isPresent();
    assertThat(substituteGauge.get().value())
        .isEqualTo(substitute.getMetrics().getMaxAllowedConcurrentCalls());
}
示例8
/**
 * Registers the "available" and "max allowed" concurrent-call gauges for the given bulkhead
 * and records both metric names in {@code metricsNameMap} under the bulkhead's name.
 *
 * @param bulkhead the bulkhead whose metrics are published
 */
@Override
public void publishMetrics(Bulkhead bulkhead) {
    final String bulkheadName = bulkhead.getName();
    final String availableGaugeName = name(prefix, bulkheadName, AVAILABLE_CONCURRENT_CALLS);
    final String maxAllowedGaugeName = name(prefix, bulkheadName, MAX_ALLOWED_CONCURRENT_CALLS);

    // Gauges read the bulkhead's metrics each time they are sampled.
    final Gauge<Integer> availableGauge =
        () -> bulkhead.getMetrics().getAvailableConcurrentCalls();
    final Gauge<Integer> maxAllowedGauge =
        () -> bulkhead.getMetrics().getMaxAllowedConcurrentCalls();

    metricRegistry.register(availableGaugeName, availableGauge);
    metricRegistry.register(maxAllowedGaugeName, maxAllowedGauge);

    metricsNameMap.put(bulkheadName,
        new HashSet<>(Arrays.asList(availableGaugeName, maxAllowedGaugeName)));
}
示例9
/**
 * A supplier decorated with circuit breaker, retry, rate limiter and bulkhead returns the
 * service result and records exactly one successful call.
 */
@Test
public void testDecorateSupplier() {
    given(helloWorldService.returnHelloWorld()).willReturn("Hello world");

    final CircuitBreaker breaker = CircuitBreaker.ofDefaults("helloBackend");
    final Retry retry = Retry.ofDefaults("id");
    final RateLimiter rateLimiter = RateLimiter.ofDefaults("testName");
    final Bulkhead testBulkhead = Bulkhead.ofDefaults("testName");

    final Supplier<String> decorated = Decorators
        .ofSupplier(() -> helloWorldService.returnHelloWorld())
        .withCircuitBreaker(breaker)
        .withRetry(retry)
        .withRateLimiter(rateLimiter)
        .withBulkhead(testBulkhead)
        .decorate();

    assertThat(decorated.get()).isEqualTo("Hello world");

    final CircuitBreaker.Metrics breakerMetrics = breaker.getMetrics();
    assertThat(breakerMetrics.getNumberOfBufferedCalls()).isEqualTo(1);
    assertThat(breakerMetrics.getNumberOfSuccessfulCalls()).isEqualTo(1);
    then(helloWorldService).should(times(1)).returnHelloWorld();
}
示例10
/**
 * A callable decorated with circuit breaker, retry, rate limiter and bulkhead returns the
 * service result and records exactly one successful call.
 */
@Test
public void testDecorateCallable() throws Exception {
    given(helloWorldService.returnHelloWorldWithException()).willReturn("Hello world");

    final CircuitBreaker breaker = CircuitBreaker.ofDefaults("helloBackend");
    final Retry retry = Retry.ofDefaults("id");
    final RateLimiter rateLimiter = RateLimiter.ofDefaults("testName");
    final Bulkhead testBulkhead = Bulkhead.ofDefaults("testName");

    final Callable<String> decorated = Decorators
        .ofCallable(() -> helloWorldService.returnHelloWorldWithException())
        .withCircuitBreaker(breaker)
        .withRetry(retry)
        .withRateLimiter(rateLimiter)
        .withBulkhead(testBulkhead)
        .decorate();

    assertThat(decorated.call()).isEqualTo("Hello world");

    final CircuitBreaker.Metrics breakerMetrics = breaker.getMetrics();
    assertThat(breakerMetrics.getNumberOfBufferedCalls()).isEqualTo(1);
    assertThat(breakerMetrics.getNumberOfSuccessfulCalls()).isEqualTo(1);
    then(helloWorldService).should(times(1)).returnHelloWorldWithException();
}
示例11
/**
 * A runnable decorated with circuit breaker, retry, rate limiter and bulkhead invokes the
 * service once and records exactly one successful call.
 */
@Test
public void testDecorateRunnable() {
    final CircuitBreaker breaker = CircuitBreaker.ofDefaults("helloBackend");
    final Retry retry = Retry.ofDefaults("id");
    final RateLimiter rateLimiter = RateLimiter.ofDefaults("testName");
    final Bulkhead testBulkhead = Bulkhead.ofDefaults("testName");

    final Runnable decorated = Decorators
        .ofRunnable(() -> helloWorldService.sayHelloWorld())
        .withCircuitBreaker(breaker)
        .withRetry(retry)
        .withRateLimiter(rateLimiter)
        .withBulkhead(testBulkhead)
        .decorate();

    decorated.run();

    final CircuitBreaker.Metrics breakerMetrics = breaker.getMetrics();
    assertThat(breakerMetrics.getNumberOfBufferedCalls()).isEqualTo(1);
    assertThat(breakerMetrics.getNumberOfSuccessfulCalls()).isEqualTo(1);
    then(helloWorldService).should(times(1)).sayHelloWorld();
}
示例12
/**
 * A completion-stage supplier decorated with circuit breaker, retry and bulkhead completes
 * with the service result and records exactly one successful call.
 *
 * <p>The scheduler handed to the retry decorator is created and owned by this test, so it
 * is shut down in a {@code finally} block; otherwise its worker thread would outlive the
 * test.
 *
 * @throws ExecutionException   if the decorated stage completes exceptionally
 * @throws InterruptedException if the test thread is interrupted while waiting for the result
 */
@Test
public void testDecorateCompletionStage() throws ExecutionException, InterruptedException {
    given(helloWorldService.returnHelloWorld()).willReturn("Hello world");
    CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("helloBackend");
    Supplier<CompletionStage<String>> completionStageSupplier =
        () -> CompletableFuture.supplyAsync(helloWorldService::returnHelloWorld);
    // Fully qualified so that the block does not depend on an additional import.
    java.util.concurrent.ScheduledExecutorService retryScheduler =
        Executors.newSingleThreadScheduledExecutor();
    try {
        CompletionStage<String> completionStage = Decorators
            .ofCompletionStage(completionStageSupplier)
            .withCircuitBreaker(circuitBreaker)
            .withRetry(Retry.ofDefaults("id"), retryScheduler)
            .withBulkhead(Bulkhead.ofDefaults("testName"))
            .get();
        String value = completionStage.toCompletableFuture().get();
        assertThat(value).isEqualTo("Hello world");
        CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
        assertThat(metrics.getNumberOfBufferedCalls()).isEqualTo(1);
        assertThat(metrics.getNumberOfSuccessfulCalls()).isEqualTo(1);
        then(helloWorldService).should(times(1)).returnHelloWorld();
    } finally {
        // Release the scheduler thread whether or not the assertions passed.
        retryScheduler.shutdownNow();
    }
}
示例13
/**
 * A consumer decorated with circuit breaker, bulkhead and rate limiter forwards its input
 * to the service and records exactly one successful call.
 */
@Test
public void testExecuteConsumer() {
    final CircuitBreaker breaker = CircuitBreaker.ofDefaults("helloBackend");
    final Bulkhead testBulkhead = Bulkhead.ofDefaults("testName");
    final RateLimiter rateLimiter = RateLimiter.ofDefaults("testName");

    final Decorators.DecorateConsumer<String> decorated = Decorators
        .ofConsumer((String name) -> helloWorldService.sayHelloWorldWithName(name))
        .withCircuitBreaker(breaker)
        .withBulkhead(testBulkhead)
        .withRateLimiter(rateLimiter);

    decorated.accept("test");

    final CircuitBreaker.Metrics breakerMetrics = breaker.getMetrics();
    assertThat(breakerMetrics.getNumberOfBufferedCalls()).isEqualTo(1);
    assertThat(breakerMetrics.getNumberOfSuccessfulCalls()).isEqualTo(1);
    then(helloWorldService).should(times(1)).sayHelloWorldWithName("test");
}
示例14
/**
 * A function decorated with circuit breaker, retry, rate limiter and bulkhead returns the
 * service result, records exactly one successful call and invokes the service exactly once
 * with the supplied argument.
 */
@Test
public void testDecorateFunction() {
    given(helloWorldService.returnHelloWorldWithName("Name")).willReturn("Hello world Name");
    CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("helloBackend");
    Function<String, String> decoratedFunction = Decorators
        .ofFunction(helloWorldService::returnHelloWorldWithName)
        .withCircuitBreaker(circuitBreaker)
        .withRetry(Retry.ofDefaults("id"))
        .withRateLimiter(RateLimiter.ofDefaults("testName"))
        .withBulkhead(Bulkhead.ofDefaults("testName"))
        .decorate();
    String result = decoratedFunction.apply("Name");
    assertThat(result).isEqualTo("Hello world Name");
    CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
    assertThat(metrics.getNumberOfBufferedCalls()).isEqualTo(1);
    assertThat(metrics.getNumberOfSuccessfulCalls()).isEqualTo(1);
    // Same interaction check as the other decorator tests: one delegate call, no retries.
    then(helloWorldService).should(times(1)).returnHelloWorldWithName("Name");
}
示例15
/**
 * When the service always throws, the decorated supplier is attempted three times and the
 * circuit breaker records three failed calls.
 */
@Test
public void testDecoratorBuilderWithRetry() {
    given(helloWorldService.returnHelloWorld()).willThrow(new RuntimeException("BAM!"));

    final CircuitBreaker breaker = CircuitBreaker.ofDefaults("helloBackend");
    final Retry retry = Retry.ofDefaults("id");
    final Bulkhead testBulkhead = Bulkhead.ofDefaults("testName");

    final Supplier<String> decorated = Decorators
        .ofSupplier(() -> helloWorldService.returnHelloWorld())
        .withCircuitBreaker(breaker)
        .withRetry(retry)
        .withBulkhead(testBulkhead)
        .decorate();

    // The failure is captured by Try; only the recorded side effects are asserted.
    Try.of(decorated::get);

    final CircuitBreaker.Metrics breakerMetrics = breaker.getMetrics();
    assertThat(breakerMetrics.getNumberOfBufferedCalls()).isEqualTo(3);
    assertThat(breakerMetrics.getNumberOfFailedCalls()).isEqualTo(3);
    then(helloWorldService).should(times(3)).returnHelloWorld();
}
示例16
/**
 * Creates a bulkhead registry wired with a {@code BulkheadMetricsPublisher} for the given
 * prefix and metric registry, and returns the bulkhead named "testBulkhead" from it.
 *
 * @param prefix         metric name prefix passed to the publisher
 * @param metricRegistry metric registry passed to the publisher
 * @return the "testBulkhead" bulkhead obtained from the new registry
 */
@Override
protected Bulkhead givenMetricRegistry(String prefix, MetricRegistry metricRegistry) {
    final BulkheadMetricsPublisher publisher =
        new BulkheadMetricsPublisher(prefix, metricRegistry);
    return BulkheadRegistry
        .of(BulkheadConfig.ofDefaults(), publisher)
        .bulkhead("testBulkhead");
}
示例17
public ConcurrentBulkheadTest() {
bulkhead = Bulkhead.of("test", BulkheadConfig.custom().maxConcurrentCalls(1).build());
callRejectectedEventSubscriber = RxJava2Adapter.toFlowable(bulkhead.getEventPublisher())
.filter(event -> event.getEventType() == Type.CALL_REJECTED)
.map(BulkheadEvent::getEventType)
.test();
}
示例18
/**
 * The Reactor aspect extension must return a non-null result for both {@code Mono} and
 * {@code Flux} values produced by the join point.
 */
@Test
public void testReactorTypes() throws Throwable {
    final Bulkhead testBulkhead = Bulkhead.ofDefaults("test");

    // Mono return value
    when(proceedingJoinPoint.proceed()).thenReturn(Mono.just("Test"));
    Object handledMono =
        reactorBulkheadAspectExt.handle(proceedingJoinPoint, testBulkhead, "testMethod");
    assertThat(handledMono).isNotNull();

    // Flux return value
    when(proceedingJoinPoint.proceed()).thenReturn(Flux.just("Test"));
    Object handledFlux =
        reactorBulkheadAspectExt.handle(proceedingJoinPoint, testBulkhead, "testMethod");
    assertThat(handledFlux).isNotNull();
}
示例19
private BulkheadMetrics(String prefix, Iterable<Bulkhead> bulkheads,
MetricRegistry metricRegistry) {
requireNonNull(prefix);
requireNonNull(bulkheads);
requireNonNull(metricRegistry);
this.metricRegistry = metricRegistry;
bulkheads.forEach(bulkhead -> {
String name = bulkhead.getName();
//number of available concurrent calls as an integer
metricRegistry.register(name(prefix, name, AVAILABLE_CONCURRENT_CALLS),
(Gauge<Integer>) () -> bulkhead.getMetrics().getAvailableConcurrentCalls());
metricRegistry.register(name(prefix, name, MAX_ALLOWED_CONCURRENT_CALLS),
(Gauge<Integer>) () -> bulkhead.getMetrics().getMaxAllowedConcurrentCalls());
});
}
示例20
/**
 * Creates a registry from a set of named configurations.
 *
 * <p>The default configuration is the entry stored under {@code DEFAULT_CONFIG}, or
 * {@code BulkheadConfig.ofDefaults()} when there is none. All entries of {@code configs}
 * are then copied into this registry's configurations.
 *
 * @param configs                named configurations; must not be null
 * @param registryEventConsumers consumers passed through to the superclass constructor
 * @param tags                   registry tags; an empty map is used when null
 * @param registryStore          backing store; a new {@code InMemoryRegistryStore} is
 *                               created when null
 */
public InMemoryBulkheadRegistry(Map<String, BulkheadConfig> configs,
    List<RegistryEventConsumer<Bulkhead>> registryEventConsumers,
    io.vavr.collection.Map<String, String> tags, RegistryStore<Bulkhead> registryStore) {
    super(configs.getOrDefault(DEFAULT_CONFIG, BulkheadConfig.ofDefaults()),
        registryEventConsumers,
        Optional.ofNullable(tags).orElse(HashMap.empty()),
        // orElseGet builds the fallback store only when none was supplied,
        // instead of allocating one that is immediately discarded.
        Optional.ofNullable(registryStore).orElseGet(() -> new InMemoryRegistryStore<>()));
    this.configurations.putAll(configs);
}
示例21
/**
 * {@inheritDoc}
 */
@Override
public Bulkhead bulkhead(String name, BulkheadConfig config,
    io.vavr.collection.Map<String, String> tags) {
    return computeIfAbsent(name, () -> {
        // The null check runs only when the supplier is actually invoked.
        final BulkheadConfig checkedConfig =
            Objects.requireNonNull(config, CONFIG_MUST_NOT_BE_NULL);
        return Bulkhead.of(name, checkedConfig, getAllTags(tags));
    });
}
示例22
/**
 * Creates a default bulkhead registry containing "testBulkhead" and registers the metrics
 * of all its bulkheads, under the given prefix, in the supplied metric registry.
 *
 * @param prefix         metric name prefix
 * @param metricRegistry registry that receives the bulkhead metrics
 * @return the "testBulkhead" bulkhead
 */
@Override
protected Bulkhead givenMetricRegistry(String prefix, MetricRegistry metricRegistry) {
    final BulkheadRegistry registry = BulkheadRegistry.ofDefaults();
    // Create the bulkhead first so that it is part of getAllBulkheads() below.
    final Bulkhead testBulkhead = registry.bulkhead("testBulkhead");

    metricRegistry.registerAll(
        BulkheadMetrics.ofIterable(prefix, registry.getAllBulkheads()));

    return testBulkhead;
}
示例23
/**
 * {@inheritDoc}
 */
@Override
public Bulkhead bulkhead(String name, Supplier<BulkheadConfig> bulkheadConfigSupplier,
    io.vavr.collection.Map<String, String> tags) {
    return computeIfAbsent(name, () -> {
        // Both checks run only when the supplier is actually invoked.
        final Supplier<BulkheadConfig> checkedSupplier =
            Objects.requireNonNull(bulkheadConfigSupplier, SUPPLIER_MUST_NOT_BE_NULL);
        final BulkheadConfig suppliedConfig =
            Objects.requireNonNull(checkedSupplier.get(), CONFIG_MUST_NOT_BE_NULL);
        return Bulkhead.of(name, suppliedConfig, getAllTags(tags));
    });
}
示例24
/**
 * Initializes a bulkhead registry.
 *
 * <p>Every entry of the configured properties is converted into a {@code BulkheadConfig}
 * keyed by the entry's name; the registry is then created from those configs, the event
 * consumer and the configured tags.
 *
 * @param bulkheadConfigurationProperties The bulkhead configuration properties.
 * @param bulkheadRegistryEventConsumer   The registry event consumer handed to the registry.
 * @param compositeBulkheadCustomizer     The customizer passed to
 *                                        {@code createBulkheadConfig} for every entry.
 * @return a BulkheadRegistry
 */
private BulkheadRegistry createBulkheadRegistry(
    BulkheadConfigurationProperties bulkheadConfigurationProperties,
    RegistryEventConsumer<Bulkhead> bulkheadRegistryEventConsumer,
    CompositeCustomizer<BulkheadConfigCustomizer> compositeBulkheadCustomizer) {
    final Map<String, BulkheadConfig> configsByName = bulkheadConfigurationProperties
        .getConfigs()
        .entrySet()
        .stream()
        .collect(Collectors.toMap(
            instance -> instance.getKey(),
            instance -> bulkheadConfigurationProperties.createBulkheadConfig(
                instance.getValue(), compositeBulkheadCustomizer, instance.getKey())));

    final io.vavr.collection.Map<String, String> registryTags =
        io.vavr.collection.HashMap.ofAll(bulkheadConfigurationProperties.getTags());

    return BulkheadRegistry.of(configsByName, bulkheadRegistryEventConsumer, registryTags);
}
示例25
/**
 * Creates a bulkhead allowing two concurrent calls with a zero wait duration, and a test
 * subscriber that receives the type of every event the bulkhead publishes.
 */
@Before
public void setUp() {
    bulkhead = Bulkhead.of("test",
        BulkheadConfig.custom()
            .maxConcurrentCalls(2)
            .maxWaitDuration(Duration.ZERO)
            .build());

    testSubscriber = RxJava2Adapter
        .toFlowable(bulkhead.getEventPublisher())
        .map(BulkheadEvent::getEventType)
        .test();
}
示例26
/**
 * Creating a bulkhead from a supplier that yields {@code null} must fail with a
 * {@code NullPointerException} carrying the "Config must not be null" message.
 */
@Test
public void testCreateWithNullConfig() {
    final Supplier<BulkheadConfig> nullYieldingSupplier = () -> null;

    assertThatThrownBy(() -> Bulkhead.of("test", nullYieldingSupplier))
        .isInstanceOf(NullPointerException.class)
        .hasMessage("Config must not be null");
}
示例27
/**
 * A bulkhead created with defaults exposes a config whose values equal the DEFAULT_*
 * constants.
 */
@Test
public void testCreateWithDefaults() {
    final Bulkhead created = Bulkhead.ofDefaults("test");

    assertThat(created).isNotNull();
    assertThat(created.getBulkheadConfig()).isNotNull();

    // Each configurable value must equal its documented default constant.
    assertThat(created.getBulkheadConfig().getMaxConcurrentCalls())
        .isEqualTo(DEFAULT_MAX_CONCURRENT_CALLS);
    assertThat(created.getBulkheadConfig().isWritableStackTraceEnabled())
        .isEqualTo(DEFAULT_WRITABLE_STACK_TRACE_ENABLED);
    assertThat(created.getBulkheadConfig().isFairCallHandlingEnabled())
        .isEqualTo(DEFAULT_FAIR_CALL_HANDLING_STRATEGY_ENABLED);
}
示例28
/**
 * Creates a spied default bulkhead and a Feign {@code TestService} client decorated with it.
 */
@Before
public void setUp() {
    // Spy on the bulkhead so that interactions with it can be verified.
    bulkhead = spy(Bulkhead.of("bulkheadTest", BulkheadConfig.ofDefaults()));

    testService = Resilience4jFeign
        .builder(FeignDecorators.builder().withBulkhead(bulkhead).build())
        .target(TestService.class, MOCK_URL);
}
示例29
/**
 * Creates a bulkhead operator, proceeds the join point and hands both the operator and
 * the returned value to {@code executeRxJava2Aspect}.
 *
 * @param proceedingJoinPoint Spring AOP proceedingJoinPoint
 * @param bulkhead            the configured bulkhead
 * @param methodName          the method name
 * @return the result object
 * @throws Throwable exception in case of faulty flow
 */
@Override
public Object handle(ProceedingJoinPoint proceedingJoinPoint, Bulkhead bulkhead,
    String methodName) throws Throwable {
    // The operator is created before the target method is invoked.
    final BulkheadOperator<?> operator = BulkheadOperator.of(bulkhead);
    final Object intercepted = proceedingJoinPoint.proceed();
    return executeRxJava2Aspect(operator, intercepted);
}
示例30
/**
 * Adds metrics for every bulkhead currently in the registry and subscribes to the
 * registry's added, removed and replaced events so that metrics are added or removed
 * accordingly.
 *
 * @param registry the meter registry to bind to
 */
@Override
public void bindTo(MeterRegistry registry) {
    // Bulkheads that already exist.
    bulkheadRegistry.getAllBulkheads()
        .forEach(existing -> addMetrics(registry, existing));

    // Bulkheads that are added, removed or replaced later.
    bulkheadRegistry.getEventPublisher()
        .onEntryAdded(added -> addMetrics(registry, added.getAddedEntry()));
    bulkheadRegistry.getEventPublisher()
        .onEntryRemoved(removed -> removeMetrics(registry, removed.getRemovedEntry().getName()));
    bulkheadRegistry.getEventPublisher()
        .onEntryReplaced(replaced -> {
            removeMetrics(registry, replaced.getOldEntry().getName());
            addMetrics(registry, replaced.getNewEntry());
        });
}