Java源码示例:org.apache.helix.messaging.handling.MessageHandler

示例1
@Transition(to = "COMPLETED", from = "OFFLINE")
public void onBecomeCompletedFromOffline(Message message, NotificationContext context)
    throws InterruptedException {
  logger.info(_partitionKey + " onBecomeCompletedFromOffline");

  // Construct the inner task message from the mapfields of scheduledTaskQueue resource group
  Map<String, String> messageInfo =
      message.getRecord().getMapField(Message.Attributes.INNER_MESSAGE.toString());
  ZNRecord record = new ZNRecord(_partitionKey);
  record.getSimpleFields().putAll(messageInfo);
  Message taskMessage = new Message(record);
  if (logger.isDebugEnabled()) {
    logger.debug(taskMessage.getRecord().getSimpleFields().toString());
  }
  MessageHandler handler =
      _executor.createMessageHandler(taskMessage, new NotificationContext(null));
  if (handler == null) {
    throw new HelixException("Task message " + taskMessage.getMsgType()
        + " handler not found, task id " + _partitionKey);
  }
  // Invoke the internal handler to complete the task
  handler.handleMessage();
  logger.info(_partitionKey + " onBecomeCompletedFromOffline completed");
}
 
示例2
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  String msgSubType = message.getMsgSubType();
  switch (msgSubType) {
    case SegmentRefreshMessage.REFRESH_SEGMENT_MSG_SUB_TYPE:
      return new RefreshSegmentMessageHandler(new SegmentRefreshMessage(message), context);
    case TableConfigRefreshMessage.REFRESH_TABLE_CONFIG_MSG_SUB_TYPE:
      return new RefreshTableConfigMessageHandler(new TableConfigRefreshMessage(message), context);
    default:
      // NOTE: Log a warning and return no-op message handler for unsupported message sub-types. This can happen when
      //       a new message sub-type is added, and the sender gets deployed first while receiver is still running the
      //       old version.
      LOGGER.warn("Received message with unsupported sub-type: {}, using no-op message handler", msgSubType);
      return new NoOpMessageHandler(message, context);
  }
}
 
示例3
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  String type = message.getMsgType();

  if (!type.equals(getMessageType())) {
    throw new HelixException("Unexpected msg type for message " + message.getMsgId() + " type:"
        + message.getMsgType());
  }

  return new DefaultControllerMessageHandler(message, context);
}
 
示例4
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  String type = message.getMsgType();

  if (!type.equals(getMessageType())) {
    throw new HelixException("Unexpected msg type for message " + message.getMsgId() + " type:"
        + message.getMsgType());
  }

  return new DefaultParticipantErrorMessageHandler(message, context, _manager);
}
 
示例5
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  String type = message.getMsgType();

  if (!type.equals(getMessageType())) {
    throw new HelixException("Unexpected msg type for message " + message.getMsgId() + " type:"
        + message.getMsgType());
  }

  return new DefaultSchedulerMessageHandler(message, context, _manager);
}
 
示例6
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  String msgSubType = message.getMsgSubType();
  switch (msgSubType) {
    case SegmentRefreshMessage.REFRESH_SEGMENT_MSG_SUB_TYPE:
      return new SegmentRefreshMessageHandler(new SegmentRefreshMessage(message), _metrics, context);
    case SegmentReloadMessage.RELOAD_SEGMENT_MSG_SUB_TYPE:
      return new SegmentReloadMessageHandler(new SegmentReloadMessage(message), _metrics, context);
    default:
      LOGGER.warn("Unsupported user defined message sub type: {} for segment: {}", msgSubType,
          message.getPartitionName());
      return new DefaultMessageHandler(message, _metrics, context);
  }
}
 
示例7
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  return new ControllerUserDefinedMessageHandler(message, context, serviceName, flowCatalogLocalCommit, jobScheduler, resourceHandler);
}
 
示例8
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  return new ControllerUserDefinedMessageHandler(message, context);
}
 
示例9
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  return new ParticipantUserDefinedMessageHandler(message, context);
}
 
示例10
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  return new ParticipantUserDefinedMessageHandler(message, context);
}
 
示例11
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  return new ControllerUserDefinedMessageHandler(message, context);
}
 
示例12
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  return new ControllerShutdownMessageHandler(message, context);
}
 
示例13
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  return new ControllerUserDefinedMessageHandler(message, context);
}
 
示例14
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  return new ParticipantShutdownMessageHandler(message, context);
}
 
示例15
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  return new ParticipantUserDefinedMessageHandler(message, context);
}
 
示例16
@Override
public MessageHandler createHandler(Message message, NotificationContext notificationContext) {
  return new TestShutdownMessageHandler(message, notificationContext, this.helixMessageTestBase);
}
 
示例17
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {

  return new CustomMessageHandler(message, context);
}
 
示例18
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  String type = message.getMsgType();

  if (!type.equals(MessageType.STATE_TRANSITION.name()) && !type
      .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
    throw new HelixException("Expect state-transition message type, but was "
        + message.getMsgType() + ", msgId: " + message.getMsgId());
  }

  String partitionKey = message.getPartitionName();
  String stateModelName = message.getStateModelDef();
  String resourceName = message.getResourceName();
  String sessionId = message.getTgtSessionId();
  int bucketSize = message.getBucketSize();

  if (stateModelName == null) {
    logger
        .error("Fail to create msg-handler because message does not contain stateModelDef. msgId: "
            + message.getId());
    return null;
  }

  String factoryName = message.getStateModelFactoryName();
  if (factoryName == null) {
    factoryName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
  }

  StateModelFactory<? extends StateModel> stateModelFactory =
      getStateModelFactory(stateModelName, factoryName);
  if (stateModelFactory == null) {
    logger.warn("Fail to create msg-handler because cannot find stateModelFactory for model: "
        + stateModelName + " using factoryName: " + factoryName + " for resource: "
        + resourceName);
    return null;
  }

  // check if the state model definition exists and cache it
  if (!_stateModelDefs.containsKey(stateModelName)) {
    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
    Builder keyBuilder = accessor.keyBuilder();
    StateModelDefinition stateModelDef =
        accessor.getProperty(keyBuilder.stateModelDef(stateModelName));
    if (stateModelDef == null) {
      throw new HelixException("fail to create msg-handler because stateModelDef for "
          + stateModelName + " does NOT exist");
    }
    _stateModelDefs.put(stateModelName, stateModelDef);
  }

  if (!message.getBatchMessageMode()) {
    String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
    StateModel stateModel = stateModelFactory.getStateModel(resourceName, partitionKey);
    if (stateModel == null) {
      stateModel = stateModelFactory.createAndAddStateModel(resourceName, partitionKey);
      if (stateModelName.equals(TaskConstants.STATE_MODEL_NAME)
          && message.getToState().equals(TaskPartitionState.DROPPED.name())) {
        // If stateModel is null, that means there was a reboot of the Participant. Then the
        // purpose of this first message must be to drop the task. We manually set the current
        // state to be the same state of fromState (which Controller inferred from JobContext) to
        // allow the Participant to successfully process this dropping transition
        stateModel.updateState(message.getFromState());
      } else {
        stateModel.updateState(initState);
      }
    }
    if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
      return new HelixStateTransitionCancellationHandler(stateModel, message, context);
    } else {
      // create currentStateDelta for this partition
      // TODO: move currentStateDelta to StateTransitionMsgHandler
      CurrentState currentStateDelta = new CurrentState(resourceName);
      currentStateDelta.setSessionId(sessionId);
      currentStateDelta.setStateModelDefRef(stateModelName);
      currentStateDelta.setStateModelFactoryName(factoryName);
      currentStateDelta.setBucketSize(bucketSize);

      currentStateDelta.setState(partitionKey,
          (stateModel.getCurrentState() == null) ? initState : stateModel.getCurrentState());

      return new HelixStateTransitionHandler(stateModelFactory, stateModel, message, context,
          currentStateDelta);
    }
  } else {
    BatchMessageWrapper wrapper = stateModelFactory.getBatchMessageWrapper(resourceName);
    if (wrapper == null) {
      wrapper = stateModelFactory.createAndAddBatchMessageWrapper(resourceName);
    }

    // get executor-service for the message
    TaskExecutor executor = (TaskExecutor) context.get(MapKey.TASK_EXECUTOR.toString());
    if (executor == null) {
      logger.error(
          "fail to get executor-service for batch message: " + message.getId() + ". msgType: "
              + message.getMsgType() + ", resource: " + message.getResourceName());
      return null;
    }
    return new BatchMessageHandler(message, context, this, wrapper, executor);
  }
}
 
示例19
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  return new DummyMessageHandler(message, context, _handledMsgSet);
}
 
示例20
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  return new TestMessagingHandler(message, context);
}
 
示例21
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  return new TestMessagingHandler(message, context);
}
 
示例22
@Override
public synchronized MessageHandler createHandler(Message message, NotificationContext context) {
  _messageCount++;
  return new TestMessagingHandlerLatch(message, context);
}
 
示例23
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  // TODO Auto-generated method stub
  return new TestMessageHandler(message, context);
}
 
示例24
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
  return null;
}