Java源码示例:akka.cluster.sharding.ClusterShardingSettings
示例1
/**
* Starts the cluster router (ShardRegion) for this persistent actor type on the given actor system,
* and returns its ActorRef. If it's already running, just returns the ActorRef.
*/
public ActorRef shardRegion(ActorSystem system) {
return ClusterSharding.get(system).start(
persistenceIdPrefix,
props,
ClusterShardingSettings.create(system),
messageExtractor);
}
示例2
public SagaShardRegionActor() {
ActorSystem system = getContext().getSystem();
ClusterShardingSettings settings = ClusterShardingSettings.create(system);
sagaActorRegion = ClusterSharding.get(system)
.start(
SagaActor.class.getSimpleName(),
SagaActor.props(null),
settings,
messageExtractor);
}
示例3
/**
* Returns a new Sharding Region for the Search Updater.
*
* @param numberOfShards the number of shards to use.
* @param thingUpdaterProps the Props of the ThingUpdater actor.
* @return the Sharding Region.
* @throws NullPointerException if {@code thingUpdaterProps} is {@code null}.
*/
@Nonnull
public ActorRef getSearchUpdaterShardRegion(final int numberOfShards,
@Nonnull final Props thingUpdaterProps,
final String clusterRole) {
checkNotNull(thingUpdaterProps, "Props of ThingUpdater");
final ClusterSharding clusterSharding = ClusterSharding.get(actorSystem);
final ClusterShardingSettings shardingSettings =
ClusterShardingSettings.create(actorSystem).withRole(clusterRole);
final ShardRegionExtractor shardRegionExtractor = ShardRegionExtractor.of(numberOfShards, actorSystem);
return clusterSharding.start(UPDATER_SHARD_REGION, thingUpdaterProps, shardingSettings, shardRegionExtractor);
}
示例4
private static ActorRef getConnectionShardRegion(final ActorSystem actorSystem,
final Props connectionSupervisorProps, final ClusterConfig clusterConfig) {
final ClusterShardingSettings shardingSettings = ClusterShardingSettings.create(actorSystem)
.withRole(ConnectivityMessagingConstants.CLUSTER_ROLE);
return ClusterSharding.get(actorSystem)
.start(ConnectivityMessagingConstants.SHARD_REGION,
connectionSupervisorProps,
shardingSettings,
ShardRegionExtractor.of(clusterConfig.getNumberOfShards(), actorSystem));
}
示例5
/**
* Starts the cluster router (ShardRegion) for this persistent actor type on the given actor system,
* and returns its ActorRef. If it's already running, just returns the ActorRef.
*/
public ActorRef shardRegion(ActorSystem system) {
return ClusterSharding.get(system).start(
typeName,
props,
ClusterShardingSettings.create(system),
messageExtractor);
}
示例6
@SuppressWarnings("unused")
private ThingsRootActor(final ThingsConfig thingsConfig,
final ActorRef pubSubMediator,
final ActorMaterializer materializer,
final ThingPersistenceActorPropsFactory propsFactory) {
final ActorSystem actorSystem = getContext().system();
final ClusterConfig clusterConfig = thingsConfig.getClusterConfig();
final ShardRegionExtractor shardRegionExtractor =
ShardRegionExtractor.of(clusterConfig.getNumberOfShards(), actorSystem);
final ThingEventPubSubFactory pubSubFactory = ThingEventPubSubFactory.of(getContext(), shardRegionExtractor);
final DistributedPub<ThingEvent> distributedPub = pubSubFactory.startDistributedPub();
final ActorRef thingsShardRegion = ClusterSharding.get(actorSystem)
.start(ThingsMessagingConstants.SHARD_REGION,
getThingSupervisorActorProps(pubSubMediator, distributedPub, propsFactory),
ClusterShardingSettings.create(actorSystem).withRole(CLUSTER_ROLE),
shardRegionExtractor);
startChildActor(ThingPersistenceOperationsActor.ACTOR_NAME,
ThingPersistenceOperationsActor.props(pubSubMediator, thingsConfig.getMongoDbConfig(),
actorSystem.settings().config(), thingsConfig.getPersistenceOperationsConfig()));
retrieveStatisticsDetailsResponseSupplier = RetrieveStatisticsDetailsResponseSupplier.of(thingsShardRegion,
ThingsMessagingConstants.SHARD_REGION, log);
final HealthCheckConfig healthCheckConfig = thingsConfig.getHealthCheckConfig();
final HealthCheckingActorOptions.Builder hcBuilder =
HealthCheckingActorOptions.getBuilder(healthCheckConfig.isEnabled(), healthCheckConfig.getInterval());
if (healthCheckConfig.getPersistenceConfig().isEnabled()) {
hcBuilder.enablePersistenceCheck();
}
final HealthCheckingActorOptions healthCheckingActorOptions = hcBuilder.build();
final MetricsReporterConfig metricsReporterConfig =
healthCheckConfig.getPersistenceConfig().getMetricsReporterConfig();
final ActorRef healthCheckingActor = startChildActor(DefaultHealthCheckingActorFactory.ACTOR_NAME,
DefaultHealthCheckingActorFactory.props(healthCheckingActorOptions,
MongoHealthChecker.props(),
MongoMetricsReporter.props(
metricsReporterConfig.getResolution(),
metricsReporterConfig.getHistory(),
pubSubMediator
)
));
final TagsConfig tagsConfig = thingsConfig.getTagsConfig();
final ActorRef eventStreamingActor =
ThingsPersistenceStreamingActorCreator.startEventStreamingActor(tagsConfig.getStreamingCacheSize(),
this::startChildActor);
final ActorRef snapshotStreamingActor =
ThingsPersistenceStreamingActorCreator.startSnapshotStreamingActor(this::startChildActor);
pubSubMediator.tell(DistPubSubAccess.put(getSelf()), getSelf());
pubSubMediator.tell(DistPubSubAccess.put(eventStreamingActor), getSelf());
pubSubMediator.tell(DistPubSubAccess.put(snapshotStreamingActor), getSelf());
final HttpConfig httpConfig = thingsConfig.getHttpConfig();
String hostname = httpConfig.getHostname();
if (hostname.isEmpty()) {
hostname = LocalHostAddressSupplier.getInstance().get();
log.info("No explicit hostname configured, using HTTP hostname <{}>.", hostname);
}
final CompletionStage<ServerBinding> binding = Http.get(actorSystem)
.bindAndHandle(
createRoute(actorSystem, healthCheckingActor).flow(actorSystem,
materializer),
ConnectHttp.toHost(hostname, httpConfig.getPort()), materializer);
binding.thenAccept(theBinding -> CoordinatedShutdown.get(getContext().getSystem()).addTask(
CoordinatedShutdown.PhaseServiceUnbind(), "shutdown_health_http_endpoint", () -> {
log.info("Gracefully shutting down status/health HTTP endpoint ...");
return theBinding.terminate(Duration.ofSeconds(1))
.handle((httpTerminated, e) -> Done.getInstance());
})
);
binding.thenAccept(this::logServerBinding)
.exceptionally(failure -> {
log.error(failure, "Something very bad happened: {}", failure.getMessage());
actorSystem.terminate();
return null;
});
}
示例7
@SuppressWarnings("unused")
private PoliciesRootActor(final PoliciesConfig policiesConfig,
final SnapshotAdapter<Policy> snapshotAdapter,
final ActorRef pubSubMediator,
final ActorMaterializer materializer) {
final ActorSystem actorSystem = getContext().system();
final ClusterShardingSettings shardingSettings =
ClusterShardingSettings.create(actorSystem).withRole(CLUSTER_ROLE);
final Props policySupervisorProps = PolicySupervisorActor.props(pubSubMediator, snapshotAdapter);
final TagsConfig tagsConfig = policiesConfig.getTagsConfig();
final ActorRef persistenceStreamingActor = startChildActor(PoliciesPersistenceStreamingActorCreator.ACTOR_NAME,
PoliciesPersistenceStreamingActorCreator.props(tagsConfig.getStreamingCacheSize()));
pubSubMediator.tell(DistPubSubAccess.put(getSelf()), getSelf());
pubSubMediator.tell(DistPubSubAccess.put(persistenceStreamingActor), getSelf());
final ClusterConfig clusterConfig = policiesConfig.getClusterConfig();
final ActorRef policiesShardRegion = ClusterSharding.get(actorSystem)
.start(PoliciesMessagingConstants.SHARD_REGION, policySupervisorProps, shardingSettings,
ShardRegionExtractor.of(clusterConfig.getNumberOfShards(), actorSystem));
startChildActor(PolicyPersistenceOperationsActor.ACTOR_NAME,
PolicyPersistenceOperationsActor.props(pubSubMediator, policiesConfig.getMongoDbConfig(),
actorSystem.settings().config(), policiesConfig.getPersistenceOperationsConfig()));
retrieveStatisticsDetailsResponseSupplier = RetrieveStatisticsDetailsResponseSupplier.of(policiesShardRegion,
PoliciesMessagingConstants.SHARD_REGION, log);
final HealthCheckConfig healthCheckConfig = policiesConfig.getHealthCheckConfig();
final HealthCheckingActorOptions.Builder hcBuilder =
HealthCheckingActorOptions.getBuilder(healthCheckConfig.isEnabled(), healthCheckConfig.getInterval());
if (healthCheckConfig.getPersistenceConfig().isEnabled()) {
hcBuilder.enablePersistenceCheck();
}
final HealthCheckingActorOptions healthCheckingActorOptions = hcBuilder.build();
final MetricsReporterConfig metricsReporterConfig =
healthCheckConfig.getPersistenceConfig().getMetricsReporterConfig();
final Props healthCheckingActorProps = DefaultHealthCheckingActorFactory.props(healthCheckingActorOptions,
MongoHealthChecker.props(),
MongoMetricsReporter.props(
metricsReporterConfig.getResolution(),
metricsReporterConfig.getHistory(),
pubSubMediator
)
);
final ActorRef healthCheckingActor =
startChildActor(DefaultHealthCheckingActorFactory.ACTOR_NAME, healthCheckingActorProps);
final HttpConfig httpConfig = policiesConfig.getHttpConfig();
String hostname = httpConfig.getHostname();
if (hostname.isEmpty()) {
hostname = LocalHostAddressSupplier.getInstance().get();
log.info("No explicit hostname configured, using HTTP hostname <{}>.", hostname);
}
final CompletionStage<ServerBinding> binding = Http.get(actorSystem)
.bindAndHandle(createRoute(actorSystem, healthCheckingActor).flow(actorSystem,
materializer), ConnectHttp.toHost(hostname, httpConfig.getPort()), materializer);
binding.thenAccept(theBinding -> CoordinatedShutdown.get(actorSystem).addTask(
CoordinatedShutdown.PhaseServiceUnbind(), "shutdown_health_http_endpoint", () -> {
log.info("Gracefully shutting down status/health HTTP endpoint..");
return theBinding.terminate(Duration.ofSeconds(1))
.handle((httpTerminated, e) -> Done.getInstance());
})
);
binding.thenAccept(this::logServerBinding)
.exceptionally(failure -> {
log.error(failure, "Something very bad happened: {}", failure.getMessage());
actorSystem.terminate();
return null;
});
}