Java源码示例:akka.cluster.sharding.ClusterShardingSettings

示例1
/**
 * Starts the ShardRegion (cluster router) for this persistent actor type on the supplied actor
 * system and hands back its ActorRef. If it's already running, just returns the ActorRef.
 *
 * @param system the actor system whose ClusterSharding extension is used.
 * @return the ActorRef returned by {@code ClusterSharding.start}.
 */
public ActorRef shardRegion(ActorSystem system) {
	// Look up the extension first, then build default settings from the same system.
	final ClusterSharding sharding = ClusterSharding.get(system);
	final ClusterShardingSettings settings = ClusterShardingSettings.create(system);
	return sharding.start(persistenceIdPrefix, props, settings, messageExtractor);
}
 
示例2
/**
 * Creates the actor and starts cluster sharding for {@code SagaActor}, storing the ActorRef
 * returned by {@code ClusterSharding.start} in {@code sagaActorRegion}.
 */
public SagaShardRegionActor() {
  final ActorSystem actorSystem = getContext().getSystem();
  final ClusterShardingSettings shardingSettings = ClusterShardingSettings.create(actorSystem);
  final ClusterSharding sharding = ClusterSharding.get(actorSystem);
  // The simple class name of SagaActor is used as the sharding type name.
  final String regionTypeName = SagaActor.class.getSimpleName();
  sagaActorRegion =
      sharding.start(regionTypeName, SagaActor.props(null), shardingSettings, messageExtractor);
}
 
示例3
/**
 * Starts cluster sharding for the Search Updater and returns the resulting Sharding Region.
 *
 * @param numberOfShards the number of shards to use.
 * @param thingUpdaterProps the Props of the ThingUpdater actor.
 * @param clusterRole the cluster role applied to the sharding settings via {@code withRole}.
 * @return the Sharding Region.
 * @throws NullPointerException if {@code thingUpdaterProps} is {@code null}.
 */
@Nonnull
public ActorRef getSearchUpdaterShardRegion(final int numberOfShards,
        @Nonnull final Props thingUpdaterProps,
        final String clusterRole) {
    checkNotNull(thingUpdaterProps, "Props of ThingUpdater");

    final ClusterSharding sharding = ClusterSharding.get(actorSystem);

    // Default settings of the actor system, restricted to the requested role.
    final ClusterShardingSettings defaultSettings = ClusterShardingSettings.create(actorSystem);
    final ClusterShardingSettings roleSettings = defaultSettings.withRole(clusterRole);

    final ShardRegionExtractor extractor = ShardRegionExtractor.of(numberOfShards, actorSystem);

    return sharding.start(UPDATER_SHARD_REGION, thingUpdaterProps, roleSettings, extractor);
}
 
示例4
/**
 * Starts cluster sharding under {@code ConnectivityMessagingConstants.SHARD_REGION} and returns the
 * resulting shard region.
 *
 * @param actorSystem the actor system used for the sharding extension, settings and extractor.
 * @param connectionSupervisorProps the Props passed to {@code ClusterSharding.start}.
 * @param clusterConfig supplies the number of shards for the extractor.
 * @return the ActorRef returned by {@code ClusterSharding.start}.
 */
private static ActorRef getConnectionShardRegion(final ActorSystem actorSystem,
        final Props connectionSupervisorProps, final ClusterConfig clusterConfig) {

    // Default settings restricted to the connectivity cluster role.
    final ClusterShardingSettings roleSettings =
            ClusterShardingSettings.create(actorSystem).withRole(ConnectivityMessagingConstants.CLUSTER_ROLE);

    final ClusterSharding sharding = ClusterSharding.get(actorSystem);
    final ShardRegionExtractor extractor =
            ShardRegionExtractor.of(clusterConfig.getNumberOfShards(), actorSystem);

    return sharding.start(ConnectivityMessagingConstants.SHARD_REGION, connectionSupervisorProps,
            roleSettings, extractor);
}
 
示例5
/**
 * Starts the ShardRegion (cluster router) for this persistent actor type on the supplied actor
 * system and hands back its ActorRef. If it's already running, just returns the ActorRef.
 *
 * @param system the actor system whose ClusterSharding extension is used.
 * @return the ActorRef returned by {@code ClusterSharding.start}.
 */
public ActorRef shardRegion(ActorSystem system) {
    // Look up the extension first, then build default settings from the same system.
    final ClusterSharding sharding = ClusterSharding.get(system);
    final ClusterShardingSettings settings = ClusterShardingSettings.create(system);
    return sharding.start(typeName, props, settings, messageExtractor);
}
 
示例6
/**
 * Sets up the Things root actor. In order, it starts the distributed pub for thing events, the things
 * shard region, the persistence operations actor, the health checking actor and the event/snapshot
 * streaming actors, registers this actor and the streaming actors at the pub/sub mediator, and finally
 * binds the status/health HTTP endpoint.
 *
 * @param thingsConfig the configuration from which cluster, MongoDB, health check, tags and HTTP
 * settings are read.
 * @param pubSubMediator the pub/sub mediator the actors are registered at; also passed to child actors.
 * @param materializer the materializer used for the HTTP route flow and the server binding.
 * @param propsFactory passed on to {@code getThingSupervisorActorProps} for the sharded supervisor props.
 */
@SuppressWarnings("unused")
private ThingsRootActor(final ThingsConfig thingsConfig,
        final ActorRef pubSubMediator,
        final ActorMaterializer materializer,
        final ThingPersistenceActorPropsFactory propsFactory) {

    final ActorSystem actorSystem = getContext().system();

    // The same extractor is shared by the pub/sub factory and the shard region.
    final ClusterConfig clusterConfig = thingsConfig.getClusterConfig();
    final ShardRegionExtractor shardRegionExtractor =
            ShardRegionExtractor.of(clusterConfig.getNumberOfShards(), actorSystem);
    final ThingEventPubSubFactory pubSubFactory = ThingEventPubSubFactory.of(getContext(), shardRegionExtractor);
    final DistributedPub<ThingEvent> distributedPub = pubSubFactory.startDistributedPub();

    final ActorRef thingsShardRegion = ClusterSharding.get(actorSystem)
            .start(ThingsMessagingConstants.SHARD_REGION,
                    getThingSupervisorActorProps(pubSubMediator, distributedPub, propsFactory),
                    ClusterShardingSettings.create(actorSystem).withRole(CLUSTER_ROLE),
                    shardRegionExtractor);

    startChildActor(ThingPersistenceOperationsActor.ACTOR_NAME,
            ThingPersistenceOperationsActor.props(pubSubMediator, thingsConfig.getMongoDbConfig(),
                    actorSystem.settings().config(), thingsConfig.getPersistenceOperationsConfig()));

    retrieveStatisticsDetailsResponseSupplier = RetrieveStatisticsDetailsResponseSupplier.of(thingsShardRegion,
            ThingsMessagingConstants.SHARD_REGION, log);

    // Health checking: the persistence check is only enabled when configured.
    final HealthCheckConfig healthCheckConfig = thingsConfig.getHealthCheckConfig();
    final HealthCheckingActorOptions.Builder hcBuilder =
            HealthCheckingActorOptions.getBuilder(healthCheckConfig.isEnabled(), healthCheckConfig.getInterval());
    if (healthCheckConfig.getPersistenceConfig().isEnabled()) {
        hcBuilder.enablePersistenceCheck();
    }

    final HealthCheckingActorOptions healthCheckingActorOptions = hcBuilder.build();
    final MetricsReporterConfig metricsReporterConfig =
            healthCheckConfig.getPersistenceConfig().getMetricsReporterConfig();
    final ActorRef healthCheckingActor = startChildActor(DefaultHealthCheckingActorFactory.ACTOR_NAME,
            DefaultHealthCheckingActorFactory.props(healthCheckingActorOptions,
                    MongoHealthChecker.props(),
                    MongoMetricsReporter.props(
                            metricsReporterConfig.getResolution(),
                            metricsReporterConfig.getHistory(),
                            pubSubMediator
                    )
            ));

    final TagsConfig tagsConfig = thingsConfig.getTagsConfig();
    final ActorRef eventStreamingActor =
            ThingsPersistenceStreamingActorCreator.startEventStreamingActor(tagsConfig.getStreamingCacheSize(),
                    this::startChildActor);
    final ActorRef snapshotStreamingActor =
            ThingsPersistenceStreamingActorCreator.startSnapshotStreamingActor(this::startChildActor);

    // Register this actor and both streaming actors at the pub/sub mediator.
    pubSubMediator.tell(DistPubSubAccess.put(getSelf()), getSelf());
    pubSubMediator.tell(DistPubSubAccess.put(eventStreamingActor), getSelf());
    pubSubMediator.tell(DistPubSubAccess.put(snapshotStreamingActor), getSelf());

    // Fall back to the local host address when no HTTP hostname is configured.
    final HttpConfig httpConfig = thingsConfig.getHttpConfig();
    String hostname = httpConfig.getHostname();
    if (hostname.isEmpty()) {
        hostname = LocalHostAddressSupplier.getInstance().get();
        log.info("No explicit hostname configured, using HTTP hostname <{}>.", hostname);
    }
    final CompletionStage<ServerBinding> binding = Http.get(actorSystem)
            .bindAndHandle(
                    createRoute(actorSystem, healthCheckingActor).flow(actorSystem,
                            materializer),
                    ConnectHttp.toHost(hostname, httpConfig.getPort()), materializer);

    // This callback runs when the binding future completes, not while the actor processes a message.
    // It therefore uses the ActorSystem captured above instead of reaching into the actor context.
    binding.thenAccept(theBinding -> CoordinatedShutdown.get(actorSystem).addTask(
            CoordinatedShutdown.PhaseServiceUnbind(), "shutdown_health_http_endpoint", () -> {
                log.info("Gracefully shutting down status/health HTTP endpoint ...");
                // Termination failures are ignored: the task always completes with Done.
                return theBinding.terminate(Duration.ofSeconds(1))
                        .handle((httpTerminated, e) -> Done.getInstance());
            })
    );
    // A failed binding is logged and terminates the whole actor system.
    binding.thenAccept(this::logServerBinding)
            .exceptionally(failure -> {
                log.error(failure, "Something very bad happened: {}", failure.getMessage());
                actorSystem.terminate();
                return null;
            });
}
 
示例7
/**
 * Sets up the Policies root actor. In order, it starts the persistence streaming actor, registers this
 * actor and the streaming actor at the pub/sub mediator, starts the policies shard region, the persistence
 * operations actor and the health checking actor, and finally binds the status/health HTTP endpoint.
 *
 * @param policiesConfig the configuration from which tags, cluster, MongoDB, health check and HTTP
 * settings are read.
 * @param snapshotAdapter passed to {@code PolicySupervisorActor.props} for the sharded supervisor.
 * @param pubSubMediator the pub/sub mediator the actors are registered at; also passed to child actors.
 * @param materializer the materializer used for the HTTP route flow and the server binding.
 */
@SuppressWarnings("unused")
private PoliciesRootActor(final PoliciesConfig policiesConfig,
        final SnapshotAdapter<Policy> snapshotAdapter,
        final ActorRef pubSubMediator,
        final ActorMaterializer materializer) {

    final ActorSystem system = getContext().system();
    final ClusterShardingSettings roleSettings = ClusterShardingSettings.create(system).withRole(CLUSTER_ROLE);
    final Props supervisorProps = PolicySupervisorActor.props(pubSubMediator, snapshotAdapter);

    // Start the streaming actor, then register this actor and the streaming actor at the mediator.
    final TagsConfig tags = policiesConfig.getTagsConfig();
    final ActorRef streamingActor = startChildActor(PoliciesPersistenceStreamingActorCreator.ACTOR_NAME,
            PoliciesPersistenceStreamingActorCreator.props(tags.getStreamingCacheSize()));
    final ActorRef self = getSelf();
    pubSubMediator.tell(DistPubSubAccess.put(self), self);
    pubSubMediator.tell(DistPubSubAccess.put(streamingActor), self);

    // Shard region for the policy supervisors.
    final ClusterConfig cluster = policiesConfig.getClusterConfig();
    final ClusterSharding sharding = ClusterSharding.get(system);
    final ShardRegionExtractor extractor = ShardRegionExtractor.of(cluster.getNumberOfShards(), system);
    final ActorRef shardRegion =
            sharding.start(PoliciesMessagingConstants.SHARD_REGION, supervisorProps, roleSettings, extractor);

    startChildActor(PolicyPersistenceOperationsActor.ACTOR_NAME,
            PolicyPersistenceOperationsActor.props(pubSubMediator,
                    policiesConfig.getMongoDbConfig(),
                    system.settings().config(),
                    policiesConfig.getPersistenceOperationsConfig()));

    retrieveStatisticsDetailsResponseSupplier =
            RetrieveStatisticsDetailsResponseSupplier.of(shardRegion, PoliciesMessagingConstants.SHARD_REGION, log);

    // Health checking: the persistence check is only enabled when configured.
    final HealthCheckConfig healthCheck = policiesConfig.getHealthCheckConfig();
    final HealthCheckingActorOptions.Builder optionsBuilder =
            HealthCheckingActorOptions.getBuilder(healthCheck.isEnabled(), healthCheck.getInterval());
    if (healthCheck.getPersistenceConfig().isEnabled()) {
        optionsBuilder.enablePersistenceCheck();
    }
    final HealthCheckingActorOptions options = optionsBuilder.build();
    final MetricsReporterConfig reporterConfig = healthCheck.getPersistenceConfig().getMetricsReporterConfig();
    final ActorRef healthCheckingActor = startChildActor(DefaultHealthCheckingActorFactory.ACTOR_NAME,
            DefaultHealthCheckingActorFactory.props(options,
                    MongoHealthChecker.props(),
                    MongoMetricsReporter.props(reporterConfig.getResolution(), reporterConfig.getHistory(),
                            pubSubMediator)));

    // Fall back to the local host address when no HTTP hostname is configured.
    final HttpConfig http = policiesConfig.getHttpConfig();
    final String configuredHostname = http.getHostname();
    final String hostname;
    if (configuredHostname.isEmpty()) {
        hostname = LocalHostAddressSupplier.getInstance().get();
        log.info("No explicit hostname configured, using HTTP hostname <{}>.", hostname);
    } else {
        hostname = configuredHostname;
    }

    final CompletionStage<ServerBinding> binding = Http.get(system).bindAndHandle(
            createRoute(system, healthCheckingActor).flow(system, materializer),
            ConnectHttp.toHost(hostname, http.getPort()),
            materializer);

    // Once bound, register a coordinated-shutdown task that terminates the HTTP binding.
    binding.thenAccept(serverBinding -> CoordinatedShutdown.get(system).addTask(
            CoordinatedShutdown.PhaseServiceUnbind(), "shutdown_health_http_endpoint", () -> {
                log.info("Gracefully shutting down status/health HTTP endpoint..");
                // Termination failures are ignored: the task always completes with Done.
                return serverBinding.terminate(Duration.ofSeconds(1))
                        .handle((terminated, error) -> Done.getInstance());
            })
    );
    // A failed binding is logged and terminates the whole actor system.
    binding.thenAccept(this::logServerBinding)
            .exceptionally(cause -> {
                log.error(cause, "Something very bad happened: {}", cause.getMessage());
                system.terminate();
                return null;
            });
}