Java源码示例:io.rsocket.util.ByteBufPayload

示例1
/**
 * Build a {@link Payload} out of a metadata buffer and a data buffer.
 * <p>When both buffers are {@code NettyDataBuffer}s, or both are
 * {@code DefaultDataBuffer}s, their native buffers are handed over directly;
 * every other combination goes through {@code asByteBuffer()}.
 * @param metadata the metadata part for the payload
 * @param data the data part for the payload
 * @return the created Payload
 */
public static Payload createPayload(DataBuffer metadata, DataBuffer data) {
	if (metadata instanceof NettyDataBuffer && data instanceof NettyDataBuffer) {
		NettyDataBuffer nettyData = (NettyDataBuffer) data;
		NettyDataBuffer nettyMetadata = (NettyDataBuffer) metadata;
		return ByteBufPayload.create(
				nettyData.getNativeBuffer(),
				nettyMetadata.getNativeBuffer());
	}
	if (metadata instanceof DefaultDataBuffer && data instanceof DefaultDataBuffer) {
		DefaultDataBuffer defaultData = (DefaultDataBuffer) data;
		DefaultDataBuffer defaultMetadata = (DefaultDataBuffer) metadata;
		return DefaultPayload.create(
				defaultData.getNativeBuffer(),
				defaultMetadata.getNativeBuffer());
	}
	// Mixed or unknown buffer implementations: use the generic ByteBuffer view.
	return DefaultPayload.create(data.asByteBuffer(), metadata.asByteBuffer());
}
 
示例2
/**
 * Supplies the setup payload: empty data plus composite metadata built from the
 * application metadata, the published services (when not already present) and the
 * bearer token (when one is configured).
 */
@Override
public Supplier<Payload> setupPayload() {
    return () -> {
        // Start from the application metadata.
        RSocketCompositeMetadata setupMetadata = RSocketCompositeMetadata.from(getAppMetadata());

        // Advertise exposed services unless a service registry entry is already there.
        Set<ServiceLocator> published = exposedServices().get();
        boolean registryMissing = !setupMetadata.contains(RSocketMimeType.ServiceRegistry);
        if (registryMissing && !published.isEmpty()) {
            ServiceRegistryMetadata registry = new ServiceRegistryMetadata();
            registry.setPublished(published);
            setupMetadata.addMetadata(registry);
        }

        // Attach the bearer token only when a non-empty one is configured.
        boolean hasToken = this.jwtToken != null && this.jwtToken.length > 0;
        if (hasToken) {
            setupMetadata.addMetadata(new BearerTokenMetadata(this.jwtToken));
        }

        return ByteBufPayload.create(Unpooled.EMPTY_BUFFER, setupMetadata.getContent());
    };
}
 
示例3
/**
 * Looks up the handler registered for {@code serviceName} and asks its peer, via
 * {@code ReactiveServiceDiscovery.findServiceByFullName}, for the service definition.
 *
 * @param serviceName the service name taken from the request path
 * @return the peer's response data as a UTF-8 string, or an error Mono carrying the
 *     {@code RST-900404} message when no handler can be resolved
 */
@GetMapping(value = "/definition/{serviceName}")
public Mono<String> queryDefinition(@PathVariable(name = "serviceName") String serviceName) {
    Integer handler = routingSelector.findHandler(new ServiceLocator("", serviceName, "").getId());
    if (handler != null) {
        RSocketBrokerResponderHandler brokerResponderHandler = brokerHandlerRegistry.findById(handler);
        if (brokerResponderHandler != null) {
            GSVRoutingMetadata routingMetadata = new GSVRoutingMetadata("", ReactiveServiceDiscovery.class.getCanonicalName() + ".findServiceByFullName", "");
            RSocketCompositeMetadata compositeMetadata = RSocketCompositeMetadata.from(routingMetadata, jsonMetaEncoding);
            // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default
            // charset, which would corrupt non-ASCII service names on some hosts.
            // NOTE(review): serviceName is concatenated into the JSON array without escaping;
            // a name containing '"' or '\' would produce malformed JSON — confirm names are validated upstream.
            byte[] jsonArguments = ("[\"" + serviceName + "\"]").getBytes(java.nio.charset.StandardCharsets.UTF_8);
            ByteBuf bodyBuf = Unpooled.wrappedBuffer(jsonArguments);
            return brokerResponderHandler.getPeerRsocket()
                    .requestResponse(ByteBufPayload.create(bodyBuf, compositeMetadata.getContent()))
                    .map(Payload::getDataUtf8);
        }
    }
    return Mono.error(new Exception(RsocketErrorCode.message("RST-900404", serviceName)));
}
 
示例4
/**
 * Request/response handler: unmarshals the request data, invokes {@code rr} with it and the
 * decoded metadata, and marshals each result into a {@code ByteBufPayload}. The resulting
 * Mono is then passed through the tracing and metrics transformers.
 */
@Override
@SuppressWarnings("unchecked")
public Mono<Payload> apply(Payload payload, MetadataDecoder.Metadata metadata) {
  // Decode the request body from a slice of the payload data.
  Object request = unmarshaller.apply(payload.sliceData());
  return rr.apply(request, metadata.metadata())
      .map(
          response -> {
            // Encode every result as the data of a new payload.
            return ByteBufPayload.create(marshaller.apply(response));
          })
      .transform(
          Tracing.traceAsChild(
                  tracer,
                  route,
                  Tag.of("rsocket.route", route),
                  Tag.of("rsocket.ipc.role", "server"),
                  Tag.of("rsocket.ipc.version", "ipc"))
              .apply(metadata.spanContext()))
      .transform(Metrics.timed(meterRegistry, "rsocket.server", "route", route));
}
 
示例5
/**
 * Channel handler: the data of {@code payload} is unmarshalled and passed to {@code rc}
 * together with the unmarshalled {@code source} stream and the decoded metadata. Every
 * incoming payload is released after unmarshalling, on success and on failure alike.
 * Results are marshalled into {@code ByteBufPayload}s and the Flux is passed through the
 * metrics transformer.
 */
@Override
@SuppressWarnings("unchecked")
public Flux<Payload> apply(
    Flux<Payload> source, Payload payload, MetadataDecoder.Metadata decodedMetadata) {
  return rc.apply(
          unmarshaller.apply(payload.sliceData()),
          source.map(
              incoming -> {
                try {
                  Object decoded = unmarshaller.apply(incoming.sliceData());
                  incoming.release();
                  return decoded;
                } catch (Throwable failure) {
                  // Release before propagating so the failed payload is not leaked.
                  incoming.release();
                  throw Exceptions.propagate(failure);
                }
              }),
          decodedMetadata.metadata())
      .map(
          response -> {
            return ByteBufPayload.create(marshaller.apply(response));
          })
      .transform(Metrics.timed(meterRegistry, "rsocket.server", "route", route));
}
 
示例6
/**
 * Channel handler without tracing or metrics: the data of {@code payload} is unmarshalled
 * and passed to {@code rc} together with the unmarshalled {@code source} stream and the
 * decoded metadata. Every incoming payload is released after unmarshalling, on success and
 * on failure alike. Results are marshalled into {@code ByteBufPayload}s.
 */
@Override
@SuppressWarnings("unchecked")
public Flux<Payload> apply(
    Flux<Payload> source, Payload payload, MetadataDecoder.Metadata metadata) {
  return rc.apply(
          unmarshaller.apply(payload.sliceData()),
          source.map(
              incoming -> {
                try {
                  Object decoded = unmarshaller.apply(incoming.sliceData());
                  incoming.release();
                  return decoded;
                } catch (Throwable failure) {
                  // Release before propagating so the failed payload is not leaked.
                  incoming.release();
                  throw Exceptions.propagate(failure);
                }
              }),
          metadata.metadata())
      .map(
          response -> {
            return ByteBufPayload.create(marshaller.apply(response));
          });
}
 
示例7
/**
 * Request/stream handler: unmarshals the request data, invokes {@code rs} with it and the
 * decoded metadata, and marshals each emitted element into a {@code ByteBufPayload}. The
 * resulting Flux is then passed through the tracing and metrics transformers.
 */
@Override
@SuppressWarnings("unchecked")
public Flux<Payload> apply(Payload payload, MetadataDecoder.Metadata metadata) {
  // Decode the request body from a slice of the payload data.
  Object request = unmarshaller.apply(payload.sliceData());
  return rs.apply(request, metadata.metadata())
      .map(
          element -> {
            // Encode every emitted element as the data of a new payload.
            return ByteBufPayload.create(marshaller.apply(element));
          })
      .transform(
          Tracing.traceAsChild(
                  tracer,
                  route,
                  Tag.of("rsocket.route", route),
                  Tag.of("rsocket.ipc.role", "server"),
                  Tag.of("rsocket.ipc.version", "ipc"))
              .apply(metadata.spanContext()))
      .transform(Metrics.timed(meterRegistry, "rsocket.server", "route", route));
}
 
示例8
/**
 * Handler returning {@code Mono<Void>}: unmarshals the request data, invokes {@code fnf}
 * with it and the decoded metadata, then applies the tracing and metrics transformers to
 * the result.
 *
 * @param payload the incoming payload; only a slice of its data is read here
 * @param metadata the decoded metadata; supplies both the metadata passed to {@code fnf}
 *     and the span context used for tracing
 * @return the transformed result of {@code fnf}
 */
@Override
@SuppressWarnings("unchecked")
public Mono<Void> apply(Payload payload, MetadataDecoder.Metadata metadata) {
  // Decode the request body from a slice of the payload data.
  Object input = unmarshaller.apply(payload.sliceData());
  return fnf.apply(input, metadata.metadata())
      // NOTE(review): the method returns Mono<Void>, so this map presumably never sees an
      // element and only compiles thanks to unchecked/raw typing — confirm whether it is a
      // leftover from the request/response handler.
      .map(o -> ByteBufPayload.create(marshaller.apply(o)))
      .transform(
          // Build the tracing transformer from the route tags and the decoded span context.
          Tracing.traceAsChild(
                  tracer,
                  route,
                  Tag.of("rsocket.route", route),
                  Tag.of("rsocket.ipc.role", "server"),
                  Tag.of("rsocket.ipc.version", "ipc"))
              .apply(metadata.spanContext()))
      .transform(Metrics.timed(meterRegistry, "rsocket.server", "route", route));
}
 
示例9
/**
 * Feeds a lease frame to the requester-side handler, then performs the given interaction
 * after a 200 ms subscription delay and expects it to fail with
 * {@code MissingLeaseException} without leaking buffers.
 */
@ParameterizedTest
@MethodSource("interactions")
void requesterExpiredLeaseRequestsAreRejected(
    BiFunction<RSocket, Payload, Publisher<?>> interaction) {
  ByteBuf lease = leaseFrame(50, 1, Unpooled.EMPTY_BUFFER);
  requesterLeaseHandler.receive(lease);

  ByteBuf data = byteBufAllocator.buffer();
  data.writeCharSequence("test", CharsetUtil.UTF_8);
  Payload request = ByteBufPayload.create(data);

  // Presumably the delay outlives the lease created above (the 50 looks like a TTL in
  // milliseconds — confirm against leaseFrame).
  Duration subscriptionDelay = Duration.ofMillis(200);
  Duration verifyTimeout = Duration.ofSeconds(5);

  Flux.defer(() -> interaction.apply(rSocketRequester, request))
      .delaySubscription(subscriptionDelay)
      .as(StepVerifier::create)
      .expectError(MissingLeaseException.class)
      .verify(verifyTimeout);

  // The test still owns the lease frame; releasing it must drop the last reference.
  Assertions.assertThat(lease.release()).isTrue();

  byteBufAllocator.assertHasNoLeaks();
}
 
示例10
/**
 * Emits a lease through {@code leaseSender}, then performs the given interaction with a
 * 100 ms subscription delay and expects it to fail with {@code MissingLeaseException}.
 * Exactly one frame, of type LEASE, must have been sent, and no buffers may leak.
 */
@ParameterizedTest
@MethodSource("interactions")
void expiredLeaseRequestsAreRejected(BiFunction<RSocket, Payload, Publisher<?>> interaction) {
  leaseSender.onNext(Lease.create(50, 1));

  ByteBuf data = byteBufAllocator.buffer();
  data.writeCharSequence("test", CharsetUtil.UTF_8);
  Payload request = ByteBufPayload.create(data);

  Publisher<?> outcome = interaction.apply(rSocketRequester, request);
  Flux.from(outcome)
      .delaySubscription(Duration.ofMillis(100))
      .as(StepVerifier::create)
      .expectError(MissingLeaseException.class)
      .verify(Duration.ofSeconds(5));

  // Only the LEASE frame should have gone out; release it as part of the check.
  Assertions.assertThat(connection.getSent())
      .hasSize(1)
      .first()
      .matches(sent -> FrameHeaderCodec.frameType(sent) == LEASE)
      .matches(ReferenceCounted::release);

  byteBufAllocator.assertHasNoLeaks();
}
 
示例11
/**
 * Opens a request channel, pushes three payloads through the test publisher, cancels the
 * subscriber, and checks that all sent frames can be released and nothing leaks.
 */
@Test
public void simpleOnDiscardRequestChannelTest() {
  AssertSubscriber<Payload> subscriber = AssertSubscriber.create(1);
  TestPublisher<Payload> requests = TestPublisher.create();

  rule.socket.requestChannel(requests).subscribe(subscriber);

  requests.next(
      ByteBufPayload.create("d", "m"),
      ByteBufPayload.create("d1", "m1"),
      ByteBufPayload.create("d2", "m2"));

  subscriber.cancel();

  // Release every frame that reached the connection.
  Assertions.assertThat(rule.connection.getSent()).allMatch(ByteBuf::release);

  rule.assertHasNoLeaks();
}
 
示例12
/**
 * Opens a request channel, sends one payload, then two more, and finally injects an error
 * frame for the channel's stream id. All sent frames must be releasable and nothing may
 * leak.
 */
@Test
public void simpleOnDiscardRequestChannelTest2() {
  ByteBufAllocator alloc = rule.alloc();
  AssertSubscriber<Payload> subscriber = AssertSubscriber.create(1);
  TestPublisher<Payload> requests = TestPublisher.create();

  rule.socket.requestChannel(requests).subscribe(subscriber);

  requests.next(ByteBufPayload.create("d", "m"));

  // The stream id is looked up only after the first payload has been pushed.
  int channelStreamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL);
  requests.next(ByteBufPayload.create("d1", "m1"), ByteBufPayload.create("d2", "m2"));

  CustomRSocketException remoteError = new CustomRSocketException(0x00000404, "test");
  rule.connection.addToReceivedBuffer(
      ErrorFrameCodec.encode(alloc, channelStreamId, remoteError));

  // Release every frame that reached the connection.
  Assertions.assertThat(rule.connection.getSent()).allMatch(ByteBuf::release);

  rule.assertHasNoLeaks();
}
 
示例13
/**
 * Sends a request/response, answers it with an error frame on stream 1, and verifies that
 * exactly one REQUEST_RESPONSE frame was sent, the socket is not disposed, and nothing
 * leaks.
 */
@Test
// see https://github.com/rsocket/rsocket-java/issues/858
public void testWorkaround858() {
  ByteBuf data = rule.alloc().buffer();
  data.writeCharSequence("test", CharsetUtil.UTF_8);

  rule.socket.requestResponse(ByteBufPayload.create(data)).subscribe();

  RuntimeException remoteError = new RuntimeException("test");
  rule.connection.addToReceivedBuffer(ErrorFrameCodec.encode(rule.alloc(), 1, remoteError));

  // Only the request frame should have gone out; release it as part of the check.
  Assertions.assertThat(rule.connection.getSent())
      .hasSize(1)
      .first()
      .matches(sent -> FrameHeaderCodec.frameType(sent) == REQUEST_RESPONSE)
      .matches(ByteBuf::release);

  Assertions.assertThat(rule.socket.isDisposed()).isFalse();

  rule.assertHasNoLeaks();
}
 
示例14
/**
 * Build a {@link Payload} that carries only data.
 * <p>A {@code NettyDataBuffer} or {@code DefaultDataBuffer} contributes its native
 * buffer directly; any other implementation goes through {@code asByteBuffer()}.
 * @param data the data part for the payload
 * @return the created Payload
 */
public static Payload createPayload(DataBuffer data) {
	if (data instanceof NettyDataBuffer) {
		NettyDataBuffer nettyData = (NettyDataBuffer) data;
		return ByteBufPayload.create(nettyData.getNativeBuffer());
	}
	if (data instanceof DefaultDataBuffer) {
		DefaultDataBuffer defaultData = (DefaultDataBuffer) data;
		return DefaultPayload.create(defaultData.getNativeBuffer());
	}
	// Unknown buffer implementation: use the generic ByteBuffer view.
	return DefaultPayload.create(data.asByteBuffer());
}
 
示例15
/**
 * HTTP entry point that forwards the request body to {@code serviceName.method} through
 * {@code rsocket.requestResponse} and returns the response data as JSON with caching
 * disabled. When authentication is required and the Authorization header fails the check,
 * an error Mono carrying the {@code RST-500403} message is returned instead.
 */
@RequestMapping(value = "/{serviceName}/{method}", produces = {MediaType.APPLICATION_JSON_VALUE})
public Mono<ResponseEntity<ByteBuf>> handle(@PathVariable("serviceName") String serviceName,
                                            @PathVariable("method") String method,
                                            @RequestParam(name = "group", required = false, defaultValue = "") String group,
                                            @RequestParam(name = "version", required = false, defaultValue = "") String version,
                                            @RequestBody(required = false) ByteBuf body,
                                            @RequestHeader(name = "Authorization", required = false, defaultValue = "") String authorizationValue) {
    // The header is only inspected when authentication is switched on.
    if (authRequired && !authAuthorizationValue(authorizationValue)) {
        return Mono.error(new Exception(RsocketErrorCode.message("RST-500403")));
    }
    try {
        GSVRoutingMetadata routing = new GSVRoutingMetadata(group, serviceName, method, version);
        RSocketCompositeMetadata compositeMetadata = RSocketCompositeMetadata.from(routing, jsonMetaEncoding);
        // A missing request body is sent as an empty buffer.
        ByteBuf requestBody = body;
        if (requestBody == null) {
            requestBody = EMPTY_BUFFER;
        }
        return rsocket.requestResponse(ByteBufPayload.create(requestBody, compositeMetadata.getContent()))
                .map(reply -> {
                    HttpHeaders responseHeaders = new HttpHeaders();
                    responseHeaders.setContentType(MediaType.APPLICATION_JSON);
                    responseHeaders.setCacheControl(CacheControl.noCache().getHeaderValue());
                    return new ResponseEntity<>(reply.data(), responseHeaders, HttpStatus.OK);
                });
    } catch (Exception failure) {
        // Synchronous failures while building the request become an error signal.
        return Mono.error(failure);
    }
}
 
示例16
/**
 * Calls {@code service.method} on the peer that provides the service and shows the outcome
 * in {@code response}.
 *
 * <p>When no handler id or no handler can be resolved, the service name field is marked
 * invalid with an explanatory message and no request is sent.
 *
 * @param service the service name used for routing
 * @param method the method name used for routing
 * @param jsonData the JSON request body; {@code null} or empty sends an empty buffer
 * @param response the component whose text is set to the response data, or to the error
 *     message when the call fails
 */
public void callRSocketService(String service, String method, @Nullable String jsonData, Pre response) {
    Integer handlerId = this.routingSelector.findHandler(ServiceLocator.serviceHashCode(service));
    if (handlerId == null) {
        this.serviceNameFiled.setInvalid(true);
        this.serviceNameFiled.setErrorMessage("Service not found!");
        return;
    }
    RSocketBrokerResponderHandler handler = handlerRegistry.findById(handlerId);
    if (handler == null) {
        this.serviceNameFiled.setInvalid(true);
        this.serviceNameFiled.setErrorMessage("No Service Provider!");
        return;
    }
    // Composite metadata: routing information plus JSON as the data encoding.
    RSocketCompositeMetadata compositeMetadata = RSocketCompositeMetadata.from(
            new GSVRoutingMetadata(null, service, method, null),
            new MessageMimeTypeMetadata(RSocketMimeType.Json));
    ByteBuf payLoadData;
    if (jsonData == null || jsonData.isEmpty()) {
        payLoadData = Unpooled.EMPTY_BUFFER;
    } else {
        payLoadData = Unpooled.wrappedBuffer(jsonData.getBytes(StandardCharsets.UTF_8));
    }
    Payload requestPayload = ByteBufPayload.create(payLoadData, compositeMetadata.getContent());
    // Pass the error handler to subscribe() itself: with doOnError plus a value-only
    // subscribe(), Reactor additionally reports every failure as an unimplemented error
    // callback.
    handler.getPeerRsocket().requestResponse(requestPayload)
            .subscribe(
                    payload -> getUI().ifPresent(ui -> ui.access(() -> {
                        response.setText(payload.getDataUtf8());
                    })),
                    throwable -> getUI().ifPresent(ui -> ui.access(() -> {
                        response.setText(throwable.getMessage());
                    })));
}
 
示例17
/**
 * Requests a metrics scrape from the peer registered under {@code uuid} and returns the
 * response data as text. An unknown uuid yields an error Mono carrying the
 * {@code RST-300205} message.
 */
@GetMapping(value = "/{uuid}", produces = MimeTypeUtils.TEXT_PLAIN_VALUE)
public Mono<String> scrape(@PathVariable(name = "uuid") String uuid) {
    RSocketBrokerResponderHandler handler = handlerRegistry.findByUUID(uuid);
    if (handler != null) {
        // Empty data; the metadata is a retained duplicate of the shared scrape buffer.
        return handler.getPeerRsocket()
                .requestResponse(
                        ByteBufPayload.create(
                                Unpooled.EMPTY_BUFFER,
                                metricsScrapeCompositeByteBuf.retainedDuplicate()))
                .map(Payload::getDataUtf8);
    }
    return Mono.error(new Exception(RsocketErrorCode.message("RST-300205", uuid)));
}
 
示例18
/**
 * Supplies the setup payload: empty data plus composite metadata built from the
 * application metadata and, when one is configured, the bearer token.
 */
@Override
public Supplier<Payload> setupPayload() {
    return () -> {
        // Start from the application metadata.
        RSocketCompositeMetadata setupMetadata = RSocketCompositeMetadata.from(getAppMetadata());

        // Attach the bearer token only when a non-empty one is configured.
        boolean hasToken = this.jwtToken != null && this.jwtToken.length > 0;
        if (hasToken) {
            setupMetadata.addMetadata(new BearerTokenMetadata(this.jwtToken));
        }

        return ByteBufPayload.create(Unpooled.EMPTY_BUFFER, setupMetadata.getContent());
    };
}
 
示例19
/**
 * Sends a request/response call built from the given body and composite metadata, names
 * and meters the Mono after the method's full name, and applies the configured timeout.
 * A timeout is recorded through {@code timeOutMetrics} and logged with {@code RST-200503}.
 */
@NotNull
protected Mono<Payload> remoteRequestResponse(ReactiveMethodMetadata methodMetadata, ByteBuf compositeMetadata, ByteBuf bodyBuf) {
    Payload request = ByteBufPayload.create(bodyBuf, compositeMetadata);
    return rsocket.requestResponse(request)
            .name(methodMetadata.getFullName())
            .metrics()
            .timeout(timeout)
            .doOnError(TimeoutException.class, timeoutError -> {
                timeOutMetrics(methodMetadata);
                log.error(RsocketErrorCode.message("RST-200503", methodMetadata.getFullName(), timeout));
            });
}
 
示例20
/**
 * Invokes the local handler for the routed service method and streams its results as
 * payloads: each element is encoded with the negotiated result encoding and paired with a
 * retained duplicate of the default composite metadata for that encoding.
 *
 * <p>When no handler exists ({@code RST-201404}) or the invocation throws
 * ({@code RST-900500}), the incoming payload is released and an error Flux is returned.
 */
protected Flux<Payload> localRequestStream(GSVRoutingMetadata routing,
                                           MessageMimeTypeMetadata dataEncodingMetadata,
                                           @Nullable MessageAcceptMimeTypesMetadata messageAcceptMimeTypesMetadata,
                                           Payload payload) {
    try {
        ReactiveMethodHandler methodHandler = localServiceCaller.getInvokeMethod(routing.getService(), routing.getMethod());
        if (methodHandler == null) {
            // Nothing to call: release the request and report the missing method.
            ReferenceCountUtil.safeRelease(payload);
            return Flux.error(new InvalidException(RsocketErrorCode.message("RST-201404", routing.getService(), routing.getMethod())));
        }
        Object result = invokeLocalService(methodHandler, dataEncodingMetadata, payload);
        // Adapt non-Flux return values through the handler's reactive adapter.
        Flux<Object> results;
        if (result instanceof Flux) {
            results = (Flux<Object>) result;
        } else {
            results = methodHandler.getReactiveAdapter().toFlux(result);
        }
        // Encoding used for the returned values.
        RSocketMimeType resultEncodingType = resultEncodingType(messageAcceptMimeTypesMetadata, dataEncodingMetadata.getRSocketMimeType(), methodHandler);
        return results
                .map(element -> encodingFacade.encodingResult(element, resultEncodingType))
                .map(encoded -> ByteBufPayload.create(encoded, encodingFacade.getDefaultCompositeMetadataByteBuf(resultEncodingType).retainedDuplicate()));
    } catch (Exception failure) {
        log.error(RsocketErrorCode.message("RST-200500"), failure);
        ReferenceCountUtil.safeRelease(payload);
        return Flux.error(new InvalidException(RsocketErrorCode.message("RST-900500", failure.getMessage())));
    }
}
 
示例21
/**
 * Builds the payload that carries an event reply back to {@code replyTo}.
 *
 * <p>The service name is the last segment of the URI path and the method is the URI
 * fragment; the reply is serialised through {@code JsonUtils.toJsonByteBuf} and tagged as
 * {@code APPLICATION_JSON}.
 *
 * @param replyTo the reply address
 * @param eventReply the reply to send as payload data
 * @return the payload with JSON data and routing composite metadata
 */
default Payload constructEventReplyPayload(URI replyTo, EventReply eventReply) {
    String replyPath = replyTo.getPath();
    int lastSlash = replyPath.lastIndexOf("/");
    String serviceName = replyPath.substring(lastSlash + 1);
    String method = replyTo.getFragment();
    GSVRoutingMetadata routing = new GSVRoutingMetadata("", serviceName, method, "");
    MessageMimeTypeMetadata dataEncoding = new MessageMimeTypeMetadata(WellKnownMimeType.APPLICATION_JSON);
    RSocketCompositeMetadata compositeMetadata = RSocketCompositeMetadata.from(routing, dataEncoding);
    return ByteBufPayload.create(JsonUtils.toJsonByteBuf(eventReply), compositeMetadata.getContent());
}
 
示例22
/**
 * Serialises a cloud event as JSON into a pooled buffer and wraps it as the metadata of a
 * payload with empty data.
 *
 * @param cloudEvent the event to serialise
 * @return a payload whose metadata holds the JSON-encoded event
 * @throws EncodingException ({@code RST-700500}) when serialisation fails; the buffer is
 *     released first
 */
default Payload cloudEventToMetadataPushPayload(CloudEventImpl<?> cloudEvent) {
    ByteBuf encoded = PooledByteBufAllocator.DEFAULT.buffer();
    try {
        // Typed as OutputStream so the OutputStream overload of writeValue is selected.
        OutputStream sink = new ByteBufOutputStream(encoded);
        JsonUtils.objectMapper.writeValue(sink, cloudEvent);
        return ByteBufPayload.create(Unpooled.EMPTY_BUFFER, encoded);
    } catch (Exception failure) {
        // The buffer never reached a payload, so it has to be released here.
        ReferenceCountUtil.safeRelease(encoded);
        throw new EncodingException(RsocketErrorCode.message("RST-700500", "CloudEventImpl", "ByteBuf"), failure);
    }
}
 
示例23
/**
 * Builds and sends a fire-and-forget request on subscription.
 *
 * <p>The request object is marshalled into the payload data, and the caller's metadata is
 * encoded together with a span context, service and route into the payload metadata. The
 * caller's {@code metadata} buffer is released exactly once, whether encoding succeeds or
 * fails.
 *
 * @param service the service name passed to the metadata encoder
 * @param route the route passed to the metadata encoder
 * @param r the socket used to send the request
 * @param marshaller encodes {@code o} into the payload data
 * @param o the request object
 * @param metadata caller-supplied metadata; released by this method
 * @param metrics transformer applied to the resulting Mono
 * @param tracing factory that receives the span-context map and returns a transformer
 *     applied to the resulting Mono
 * @return a Mono that sends the request when subscribed
 */
private <X> Mono<Void> doFireAndForget(
    final String service,
    final String route,
    final RSocket r,
    final Marshaller<X> marshaller,
    final X o,
    final ByteBuf metadata,
    Function<? super Publisher<Void>, ? extends Publisher<Void>> metrics,
    Function<Map<String, String>, Function<? super Publisher<Void>, ? extends Publisher<Void>>>
        tracing) {
  final HashMap<String, String> map = new HashMap<>();
  return Mono.defer(
          () -> {
            ByteBuf d = null;
            ByteBuf m = null;
            boolean metadataReleased = false;
            boolean payloadCreated = false;
            try {
              d = marshaller.apply(o);
              m = metadataEncoder.encode(metadata, new SimpleSpanContext(map), service, route);
              // Set the flag first so a failing release is never retried in the catch block.
              metadataReleased = true;
              metadata.release();
              Payload payload = ByteBufPayload.create(d, m);
              payloadCreated = true;
              return r.fireAndForget(payload);
            } catch (Throwable t) {
              // Release the caller's metadata only if the try block has not already done so;
              // a second release would throw and mask the original error.
              if (!metadataReleased) {
                metadata.release();
              }
              // Buffers that never reached a payload are still owned here.
              if (!payloadCreated) {
                if (d != null) {
                  d.release();
                }
                if (m != null) {
                  m.release();
                }
              }
              return Mono.error(t);
            }
          })
      .transform(metrics)
      .transform(tracing.apply(map));
}
 
示例24
/**
 * Builds and sends a request/response call on subscription and unmarshals the reply.
 *
 * <p>The request object is marshalled into the payload data, and the caller's metadata is
 * encoded together with a span context, service and route into the payload metadata. The
 * caller's {@code metadata} buffer is released exactly once, whether encoding succeeds or
 * fails. The response payload is released after its data has been unmarshalled.
 *
 * @param service the service name passed to the metadata encoder
 * @param route the route passed to the metadata encoder
 * @param r the socket used to send the request
 * @param marshaller encodes {@code o} into the payload data
 * @param unmarshaller decodes the response data
 * @param o the request object
 * @param metadata caller-supplied metadata; released by this method
 * @param metrics transformer applied to the resulting Mono
 * @param tracing factory that receives the span-context map and returns a transformer
 *     applied to the resulting Mono
 * @return a Mono emitting the unmarshalled response
 */
private <X, Y> Mono<Y> doRequestResponse(
    final String service,
    final String route,
    final RSocket r,
    final Marshaller<X> marshaller,
    final Unmarshaller<Y> unmarshaller,
    final X o,
    final ByteBuf metadata,
    Function<? super Publisher<Y>, ? extends Publisher<Y>> metrics,
    Function<Map<String, String>, Function<? super Publisher<Y>, ? extends Publisher<Y>>>
        tracing) {
  final HashMap<String, String> map = new HashMap<>();
  return Mono.defer(
          () -> {
            ByteBuf d = null;
            ByteBuf m = null;
            boolean metadataReleased = false;
            boolean payloadCreated = false;
            try {
              d = marshaller.apply(o);
              m = metadataEncoder.encode(metadata, new SimpleSpanContext(map), service, route);
              // Set the flag first so a failing release is never retried in the catch block.
              metadataReleased = true;
              metadata.release();
              Payload payload = ByteBufPayload.create(d, m);
              payloadCreated = true;
              return r.requestResponse(payload);
            } catch (Throwable t) {
              // Release the caller's metadata only if the try block has not already done so;
              // a second release would throw and mask the original error.
              if (!metadataReleased) {
                metadata.release();
              }
              // Buffers that never reached a payload are still owned here.
              if (!payloadCreated) {
                if (d != null) {
                  d.release();
                }
                if (m != null) {
                  m.release();
                }
              }
              return Mono.error(t);
            }
          })
      .map(
          p -> {
            try {
              return unmarshaller.apply(p.sliceData());
            } finally {
              // The response payload is released whether or not unmarshalling succeeded.
              p.release();
            }
          })
      .transform(metrics)
      .transform(tracing.apply(map));
}
 
示例25
/**
 * Builds and sends a request/stream call on subscription and unmarshals each element.
 *
 * <p>The request object is marshalled into the payload data, and the caller's metadata is
 * encoded together with a span context, service and route into the payload metadata. The
 * caller's {@code metadata} buffer is released exactly once, whether encoding succeeds or
 * fails. Each response payload is released after its data has been unmarshalled.
 *
 * @param service the service name passed to the metadata encoder
 * @param route the route passed to the metadata encoder
 * @param r the socket used to send the request
 * @param marshaller encodes {@code o} into the payload data
 * @param unmarshaller decodes the data of each response payload
 * @param o the request object
 * @param metadata caller-supplied metadata; released by this method
 * @param metrics transformer applied to the resulting Flux
 * @param tracing factory that receives the span-context map and returns a transformer
 *     applied to the resulting Flux
 * @return a Flux emitting the unmarshalled responses
 */
private <X, Y> Flux<Y> doRequestStream(
    final String service,
    final String route,
    final RSocket r,
    final Marshaller<X> marshaller,
    final Unmarshaller<Y> unmarshaller,
    final X o,
    final ByteBuf metadata,
    Function<? super Publisher<Y>, ? extends Publisher<Y>> metrics,
    Function<Map<String, String>, Function<? super Publisher<Y>, ? extends Publisher<Y>>>
        tracing) {
  final HashMap<String, String> map = new HashMap<>();
  return Flux.defer(
          () -> {
            ByteBuf d = null;
            ByteBuf m = null;
            boolean metadataReleased = false;
            boolean payloadCreated = false;
            try {
              d = marshaller.apply(o);
              m = metadataEncoder.encode(metadata, new SimpleSpanContext(map), service, route);
              // Set the flag first so a failing release is never retried in the catch block.
              metadataReleased = true;
              metadata.release();
              Payload payload = ByteBufPayload.create(d, m);
              payloadCreated = true;
              return r.requestStream(payload);
            } catch (Throwable t) {
              // Release the caller's metadata only if the try block has not already done so;
              // a second release would throw and mask the original error.
              if (!metadataReleased) {
                metadata.release();
              }
              // Buffers that never reached a payload are still owned here.
              if (!payloadCreated) {
                if (d != null) {
                  d.release();
                }
                if (m != null) {
                  m.release();
                }
              }
              return Flux.error(t);
            }
          })
      .map(
          p -> {
            try {
              return unmarshaller.apply(p.sliceData());
            } finally {
              // Each response payload is released whether or not unmarshalling succeeded.
              p.release();
            }
          })
      .transform(metrics)
      .transform(tracing.apply(map));
}
 
示例26
/**
 * Channel-shaped handler backed by {@code rs}: the data of {@code payload} is unmarshalled
 * and passed to {@code rs} together with the unmarshalled {@code source} stream and the
 * decoded metadata. Every incoming payload is released after unmarshalling, on success and
 * on failure alike. Results are marshalled into {@code ByteBufPayload}s and the Flux is
 * passed through the tracing transformer.
 */
@Override
@SuppressWarnings("unchecked")
public Flux<Payload> apply(
    Flux<Payload> source, Payload payload, MetadataDecoder.Metadata metadata) {
  return rs.apply(
          unmarshaller.apply(payload.sliceData()),
          source.map(
              incoming -> {
                try {
                  Object decoded = unmarshaller.apply(incoming.sliceData());
                  incoming.release();
                  return decoded;
                } catch (Throwable failure) {
                  // Release before propagating so the failed payload is not leaked.
                  incoming.release();
                  throw Exceptions.propagate(failure);
                }
              }),
          metadata.metadata())
      .map(
          response -> {
            return ByteBufPayload.create(marshaller.apply(response));
          })
      .transform(
          Tracing.traceAsChild(
                  tracer,
                  route,
                  Tag.of("rsocket.route", route),
                  Tag.of("rsocket.ipc.role", "server"),
                  Tag.of("rsocket.ipc.version", "ipc"))
              .apply(metadata.spanContext()));
}
 
示例27
/**
 * Request/stream handler: unmarshals the request data, invokes {@code rs} with it and the
 * decoded metadata, marshals each emitted element into a {@code ByteBufPayload}, and passes
 * the Flux through the metrics transformer.
 */
@Override
@SuppressWarnings("unchecked")
public Flux<Payload> apply(Payload payload, MetadataDecoder.Metadata metadata) {
  // Decode the request body from a slice of the payload data.
  Object request = unmarshaller.apply(payload.sliceData());
  return rs.apply(request, metadata.metadata())
      .map(
          element -> {
            return ByteBufPayload.create(marshaller.apply(element));
          })
      .transform(Metrics.timed(meterRegistry, "rsocket.server", "route", route));
}
 
示例28
/**
 * Request/response handler: unmarshals the request data, invokes {@code rr} with it and the
 * decoded metadata, marshals the result into a {@code ByteBufPayload}, and passes the Mono
 * through the metrics transformer.
 */
@Override
@SuppressWarnings("unchecked")
public Mono<Payload> apply(Payload payload, MetadataDecoder.Metadata metadata) {
  // Decode the request body from a slice of the payload data.
  Object request = unmarshaller.apply(payload.sliceData());
  return rr.apply(request, metadata.metadata())
      .map(
          response -> {
            return ByteBufPayload.create(marshaller.apply(response));
          })
      .transform(Metrics.timed(meterRegistry, "rsocket.server", "route", route));
}
 
示例29
/**
 * Request/stream handler without tracing or metrics: unmarshals the request data, invokes
 * {@code rs} with it and the decoded metadata, and marshals each emitted element into a
 * {@code ByteBufPayload}.
 */
@Override
@SuppressWarnings("unchecked")
public Flux<Payload> apply(Payload payload, MetadataDecoder.Metadata metadata) {
  // Decode the request body from a slice of the payload data.
  Object request = unmarshaller.apply(payload.sliceData());
  return rs.apply(request, metadata.metadata())
      .map(
          element -> {
            return ByteBufPayload.create(marshaller.apply(element));
          });
}
 
示例30
/**
 * Channel handler: the data of {@code payload} is unmarshalled and passed to {@code rc}
 * together with the unmarshalled {@code source} stream and the decoded metadata. Every
 * incoming payload is released after unmarshalling, on success and on failure alike.
 * Results are marshalled into {@code ByteBufPayload}s and the Flux is passed through the
 * tracing and metrics transformers.
 */
@Override
@SuppressWarnings("unchecked")
public Flux<Payload> apply(
    Flux<Payload> source, Payload payload, MetadataDecoder.Metadata metadata) {
  return rc.apply(
          unmarshaller.apply(payload.sliceData()),
          source.map(
              incoming -> {
                try {
                  Object decoded = unmarshaller.apply(incoming.sliceData());
                  incoming.release();
                  return decoded;
                } catch (Throwable failure) {
                  // Release before propagating so the failed payload is not leaked.
                  incoming.release();
                  throw Exceptions.propagate(failure);
                }
              }),
          metadata.metadata())
      .map(
          response -> {
            return ByteBufPayload.create(marshaller.apply(response));
          })
      .transform(
          Tracing.traceAsChild(
                  tracer,
                  route,
                  Tag.of("rsocket.route", route),
                  Tag.of("rsocket.ipc.role", "server"),
                  Tag.of("rsocket.ipc.version", "ipc"))
              .apply(metadata.spanContext()))
      .transform(Metrics.timed(meterRegistry, "rsocket.server", "route", route));
}