Java源码示例:org.apache.helix.messaging.handling.MessageHandler
示例1
/**
 * Handles the OFFLINE to COMPLETED transition by running the task message that is embedded in
 * the transition message.
 *
 * <p>The inner task message is rebuilt from the {@code INNER_MESSAGE} map field of the incoming
 * message, a handler for it is obtained from {@code _executor}, and that handler is invoked
 * synchronously.
 *
 * @param message transition message carrying the inner task message as a map field
 * @param context notification context of the transition (not used by this method)
 * @throws InterruptedException if the inner handler is interrupted
 * @throws HelixException if the inner message is missing or no handler can be created for it
 */
@Transition(to = "COMPLETED", from = "OFFLINE")
public void onBecomeCompletedFromOffline(Message message, NotificationContext context)
    throws InterruptedException {
  logger.info(_partitionKey + " onBecomeCompletedFromOffline");

  // Construct the inner task message from the mapfields of scheduledTaskQueue resource group
  Map<String, String> messageInfo =
      message.getRecord().getMapField(Message.Attributes.INNER_MESSAGE.toString());
  if (messageInfo == null) {
    // Fail with a descriptive error instead of a bare NullPointerException from putAll(null).
    throw new HelixException("Inner task message "
        + Message.Attributes.INNER_MESSAGE.toString() + " not found, task id " + _partitionKey);
  }

  ZNRecord record = new ZNRecord(_partitionKey);
  record.getSimpleFields().putAll(messageInfo);
  Message taskMessage = new Message(record);
  if (logger.isDebugEnabled()) {
    logger.debug(taskMessage.getRecord().getSimpleFields().toString());
  }

  MessageHandler handler =
      _executor.createMessageHandler(taskMessage, new NotificationContext(null));
  if (handler == null) {
    throw new HelixException("Task message " + taskMessage.getMsgType()
        + " handler not found, task id " + _partitionKey);
  }

  // Invoke the internal handler to complete the task
  handler.handleMessage();
  logger.info(_partitionKey + " onBecomeCompletedFromOffline completed");
}
示例2
/**
 * Creates the message handler that matches the sub-type of the given message.
 *
 * <p>Segment-refresh and table-config-refresh sub-types get their dedicated handlers. Any other
 * sub-type, including a missing ({@code null}) one, is logged and served by a no-op handler.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a handler for the message; never {@code null}
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  String msgSubType = message.getMsgSubType();
  // Constant-first equals() is null-safe; a switch on a null String would throw
  // NullPointerException before the fallback below could be reached.
  if (SegmentRefreshMessage.REFRESH_SEGMENT_MSG_SUB_TYPE.equals(msgSubType)) {
    return new RefreshSegmentMessageHandler(new SegmentRefreshMessage(message), context);
  }
  if (TableConfigRefreshMessage.REFRESH_TABLE_CONFIG_MSG_SUB_TYPE.equals(msgSubType)) {
    return new RefreshTableConfigMessageHandler(new TableConfigRefreshMessage(message), context);
  }
  // NOTE: Log a warning and return no-op message handler for unsupported message sub-types. This can happen when
  //       a new message sub-type is added, and the sender gets deployed first while receiver is still running the
  //       old version.
  LOGGER.warn("Received message with unsupported sub-type: {}, using no-op message handler", msgSubType);
  return new NoOpMessageHandler(message, context);
}
示例3
/**
 * Creates a {@code DefaultControllerMessageHandler} for a message whose type equals
 * {@code getMessageType()}.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a new handler for the message
 * @throws HelixException if the message type differs from {@code getMessageType()}
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  final String receivedType = message.getMsgType();
  if (receivedType.equals(getMessageType())) {
    return new DefaultControllerMessageHandler(message, context);
  }
  // Any other type is a routing error: this factory only serves its own message type.
  final String error = "Unexpected msg type for message " + message.getMsgId() + " type:"
      + message.getMsgType();
  throw new HelixException(error);
}
示例4
/**
 * Creates a {@code DefaultParticipantErrorMessageHandler} for a message whose type equals
 * {@code getMessageType()}.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a new handler bound to the message, the context and {@code _manager}
 * @throws HelixException if the message type differs from {@code getMessageType()}
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  final String receivedType = message.getMsgType();
  if (receivedType.equals(getMessageType())) {
    return new DefaultParticipantErrorMessageHandler(message, context, _manager);
  }
  // Any other type is a routing error: this factory only serves its own message type.
  final String error = "Unexpected msg type for message " + message.getMsgId() + " type:"
      + message.getMsgType();
  throw new HelixException(error);
}
示例5
/**
 * Creates a {@code DefaultSchedulerMessageHandler} for a message whose type equals
 * {@code getMessageType()}.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a new handler bound to the message, the context and {@code _manager}
 * @throws HelixException if the message type differs from {@code getMessageType()}
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  final String receivedType = message.getMsgType();
  if (receivedType.equals(getMessageType())) {
    return new DefaultSchedulerMessageHandler(message, context, _manager);
  }
  // Any other type is a routing error: this factory only serves its own message type.
  final String error = "Unexpected msg type for message " + message.getMsgId() + " type:"
      + message.getMsgType();
  throw new HelixException(error);
}
示例6
/**
 * Creates the message handler that matches the sub-type of the given user-defined message.
 *
 * <p>Segment-refresh and segment-reload sub-types get their dedicated handlers. Any other
 * sub-type, including a missing ({@code null}) one, is logged and served by a
 * {@code DefaultMessageHandler}.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a handler for the message; never {@code null}
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  String msgSubType = message.getMsgSubType();
  // Constant-first equals() is null-safe; a switch on a null String would throw
  // NullPointerException before the default handler below could be reached.
  if (SegmentRefreshMessage.REFRESH_SEGMENT_MSG_SUB_TYPE.equals(msgSubType)) {
    return new SegmentRefreshMessageHandler(new SegmentRefreshMessage(message), _metrics, context);
  }
  if (SegmentReloadMessage.RELOAD_SEGMENT_MSG_SUB_TYPE.equals(msgSubType)) {
    return new SegmentReloadMessageHandler(new SegmentReloadMessage(message), _metrics, context);
  }
  LOGGER.warn("Unsupported user defined message sub type: {} for segment: {}", msgSubType,
      message.getPartitionName());
  return new DefaultMessageHandler(message, _metrics, context);
}
示例7
/**
 * Creates a {@code ControllerUserDefinedMessageHandler} for the given message, handing it the
 * enclosing object's {@code serviceName}, {@code flowCatalogLocalCommit}, {@code jobScheduler}
 * and {@code resourceHandler}.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a new handler for the message
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  final MessageHandler handler = new ControllerUserDefinedMessageHandler(
      message, context, serviceName, flowCatalogLocalCommit, jobScheduler, resourceHandler);
  return handler;
}
示例8
/**
 * Creates a {@code ControllerUserDefinedMessageHandler} for the given message.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a new handler for the message
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  final MessageHandler handler = new ControllerUserDefinedMessageHandler(message, context);
  return handler;
}
示例9
/**
 * Creates a {@code ParticipantUserDefinedMessageHandler} for the given message.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a new handler for the message
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  final MessageHandler handler = new ParticipantUserDefinedMessageHandler(message, context);
  return handler;
}
示例10
/**
 * Creates a {@code ParticipantUserDefinedMessageHandler} for the given message.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a new handler for the message
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  final MessageHandler handler = new ParticipantUserDefinedMessageHandler(message, context);
  return handler;
}
示例11
/**
 * Creates a {@code ControllerUserDefinedMessageHandler} for the given message.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a new handler for the message
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  final MessageHandler handler = new ControllerUserDefinedMessageHandler(message, context);
  return handler;
}
示例12
/**
 * Creates a {@code ControllerShutdownMessageHandler} for the given message.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a new handler for the message
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  final MessageHandler handler = new ControllerShutdownMessageHandler(message, context);
  return handler;
}
示例13
/**
 * Creates a {@code ControllerUserDefinedMessageHandler} for the given message.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a new handler for the message
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  final MessageHandler handler = new ControllerUserDefinedMessageHandler(message, context);
  return handler;
}
示例14
/**
 * Creates a {@code ParticipantShutdownMessageHandler} for the given message.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a new handler for the message
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  final MessageHandler handler = new ParticipantShutdownMessageHandler(message, context);
  return handler;
}
示例15
/**
 * Creates a {@code ParticipantUserDefinedMessageHandler} for the given message.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a new handler for the message
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  final MessageHandler handler = new ParticipantUserDefinedMessageHandler(message, context);
  return handler;
}
示例16
/**
 * Creates a {@code TestShutdownMessageHandler} for the given message, handing it this
 * object's {@code helixMessageTestBase}.
 *
 * @param message the received message
 * @param notificationContext the notification context passed on to the created handler
 * @return a new handler for the message
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext notificationContext) {
  final MessageHandler handler =
      new TestShutdownMessageHandler(message, notificationContext, this.helixMessageTestBase);
  return handler;
}
示例17
/**
 * Creates a {@code CustomMessageHandler} for the given message.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a new handler for the message
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  final MessageHandler handler = new CustomMessageHandler(message, context);
  return handler;
}
示例18
/**
 * Creates the handler for a state-transition or state-transition-cancellation message.
 *
 * <p>For a non-batch message the partition's state model is looked up (and created and
 * initialized if absent), then either a cancellation handler or a transition handler with a
 * freshly built current-state delta is returned. For a batch message a
 * {@code BatchMessageHandler} backed by the resource's batch-message wrapper is returned.
 *
 * @param message the received message
 * @param context the notification context; for batch messages it must carry the task executor
 * @return the handler, or {@code null} when the message has no state model definition name, no
 *     matching state model factory is registered, or (batch mode) no task executor is available
 * @throws HelixException if the message type is not a state-transition type, or the state model
 *     definition cannot be found through the data accessor
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  final String msgType = message.getMsgType();
  final boolean supportedType = msgType.equals(MessageType.STATE_TRANSITION.name())
      || msgType.equals(MessageType.STATE_TRANSITION_CANCELLATION.name());
  if (!supportedType) {
    throw new HelixException("Expect state-transition message type, but was "
        + message.getMsgType() + ", msgId: " + message.getMsgId());
  }

  final String partition = message.getPartitionName();
  final String modelName = message.getStateModelDef();
  final String resource = message.getResourceName();
  final String targetSession = message.getTgtSessionId();
  final int bucket = message.getBucketSize();

  if (modelName == null) {
    logger
        .error("Fail to create msg-handler because message does not contain stateModelDef. msgId: "
            + message.getId());
    return null;
  }

  // Fall back to the default factory name when the message does not name one.
  final String requestedFactory = message.getStateModelFactoryName();
  final String factoryName =
      (requestedFactory != null) ? requestedFactory : HelixConstants.DEFAULT_STATE_MODEL_FACTORY;

  final StateModelFactory<? extends StateModel> modelFactory =
      getStateModelFactory(modelName, factoryName);
  if (modelFactory == null) {
    logger.warn("Fail to create msg-handler because cannot find stateModelFactory for model: "
        + modelName + " using factoryName: " + factoryName + " for resource: "
        + resource);
    return null;
  }

  // check if the state model definition exists and cache it
  if (!_stateModelDefs.containsKey(modelName)) {
    HelixDataAccessor dataAccessor = _manager.getHelixDataAccessor();
    StateModelDefinition fetchedDef =
        dataAccessor.getProperty(dataAccessor.keyBuilder().stateModelDef(modelName));
    if (fetchedDef == null) {
      throw new HelixException("fail to create msg-handler because stateModelDef for "
          + modelName + " does NOT exist");
    }
    _stateModelDefs.put(modelName, fetchedDef);
  }

  if (message.getBatchMessageMode()) {
    BatchMessageWrapper batchWrapper = modelFactory.getBatchMessageWrapper(resource);
    if (batchWrapper == null) {
      batchWrapper = modelFactory.createAndAddBatchMessageWrapper(resource);
    }

    // get executor-service for the message
    TaskExecutor taskExecutor = (TaskExecutor) context.get(MapKey.TASK_EXECUTOR.toString());
    if (taskExecutor == null) {
      logger.error(
          "fail to get executor-service for batch message: " + message.getId() + ". msgType: "
              + message.getMsgType() + ", resource: " + message.getResourceName());
      return null;
    }
    return new BatchMessageHandler(message, context, this, batchWrapper, taskExecutor);
  }

  final String initialState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
  StateModel model = modelFactory.getStateModel(resource, partition);
  if (model == null) {
    model = modelFactory.createAndAddStateModel(resource, partition);
    // If stateModel is null, that means there was a reboot of the Participant. Then the
    // purpose of this first message must be to drop the task. We manually set the current
    // state to be the same state of fromState (which Controller inferred from JobContext) to
    // allow the Participant to successfully process this dropping transition
    final boolean droppingTask = modelName.equals(TaskConstants.STATE_MODEL_NAME)
        && message.getToState().equals(TaskPartitionState.DROPPED.name());
    model.updateState(droppingTask ? message.getFromState() : initialState);
  }

  if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
    return new HelixStateTransitionCancellationHandler(model, message, context);
  }

  // create currentStateDelta for this partition
  // TODO: move currentStateDelta to StateTransitionMsgHandler
  CurrentState delta = new CurrentState(resource);
  delta.setSessionId(targetSession);
  delta.setStateModelDefRef(modelName);
  delta.setStateModelFactoryName(factoryName);
  delta.setBucketSize(bucket);
  final String reportedState;
  if (model.getCurrentState() != null) {
    reportedState = model.getCurrentState();
  } else {
    reportedState = initialState;
  }
  delta.setState(partition, reportedState);

  return new HelixStateTransitionHandler(modelFactory, model, message, context, delta);
}
示例19
/**
 * Creates a {@code DummyMessageHandler} for the given message, handing it
 * {@code _handledMsgSet}.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a new handler for the message
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  final MessageHandler handler = new DummyMessageHandler(message, context, _handledMsgSet);
  return handler;
}
示例20
/**
 * Creates a {@code TestMessagingHandler} for the given message.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a new handler for the message
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  final MessageHandler handler = new TestMessagingHandler(message, context);
  return handler;
}
示例21
/**
 * Creates a {@code TestMessagingHandler} for the given message.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a new handler for the message
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  final MessageHandler handler = new TestMessagingHandler(message, context);
  return handler;
}
示例22
/**
 * Counts the request in {@code _messageCount} and creates a {@code TestMessagingHandlerLatch}
 * for the given message. The method is synchronized on this object.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a new handler for the message
 */
@Override
public synchronized MessageHandler createHandler(Message message, NotificationContext context) {
  ++_messageCount;
  final MessageHandler handler = new TestMessagingHandlerLatch(message, context);
  return handler;
}
示例23
/**
 * Creates a {@code TestMessageHandler} for the given message.
 *
 * @param message the received message
 * @param context the notification context passed on to the created handler
 * @return a new handler for the message
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  final MessageHandler handler = new TestMessageHandler(message, context);
  return handler;
}
示例24
/**
 * Never creates a handler: this implementation returns {@code null} for every message.
 *
 * @param message the received message (ignored)
 * @param context the notification context (ignored)
 * @return always {@code null}
 */
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
return null;
}