Java源码示例:org.apache.kafka.connect.runtime.Connect

示例1
/**
 * Assembles the embedded Kafka Connect runtime in standalone mode.
 *
 * <p>There does not seem to be a public interface for embedding a Kafka Connect runtime,
 * therefore this code is modeled on the behavior of
 * https://github.com/apache/kafka/blob/2.1/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
 * and performs the initialization in a roughly similar manner.
 *
 * <p>On completion the {@code herder} and {@code connect} fields are assigned. The REST server
 * is initialized here, but {@code connect.start()} is not called by this method.
 */
private void init() {
    LOG.info("Started worked initialization");

    final Time clock = Time.SYSTEM;

    // Gathers the system runtime information and writes it to the log
    new WorkerInfo().logAll();

    final Map<String, String> workerSettings =
            Utils.propsToStringMap(kafkaConnectPropertyFactory.getProperties());

    // Not used directly by this code, but the worker cannot be constructed without it
    final Plugins pluginRegistry = new Plugins(workerSettings);

    final StandaloneConfig standaloneConfig = new StandaloneConfig(workerSettings);
    final String clusterId = ConnectUtils.lookupKafkaClusterId(standaloneConfig);
    final AllConnectorClientConfigOverridePolicy overridePolicy =
            new AllConnectorClientConfigOverridePolicy();

    final RestServer restServer = new RestServer(standaloneConfig);
    restServer.initializeServer();

    /*
     According to the Kafka source code "... Worker runs a (dynamic) set of tasks
     in a set of threads, doing the work of actually moving data to/from Kafka ..."

     NOTE(review): the first Worker constructor argument is the worker id in Kafka Connect;
     bootstrapServer is passed here - confirm this is intentional.
     */
    final Worker connectWorker = new Worker(
            bootstrapServer,
            clock,
            pluginRegistry,
            standaloneConfig,
            new FileOffsetBackingStore(),
            overridePolicy);

    /*
     From Kafka source code: " ... The herder interface tracks and manages workers
     and connectors ..."
     */
    herder = new StandaloneHerder(connectWorker, clusterId, overridePolicy);
    connect = new Connect(herder, restServer);
    LOG.info("Finished initializing the worker");
}
 
示例2
/**
 * Builds an in-process standalone Kafka Connect worker and starts it.
 *
 * <p>The worker configuration is assembled from the {@code bootstrapServers},
 * {@code offsetFlushInterval} and {@code pluginDir} fields; offsets are kept in a
 * {@code MemoryOffsetBackingStore}. The resulting herder and Connect instance are stored in the
 * {@code herder} and {@code connect} fields.
 */
void start() {
    final String byteArrayConverter = "org.apache.kafka.connect.converters.ByteArrayConverter";
    final String jsonConverter = "org.apache.kafka.connect.json.JsonConverter";

    final Map<String, String> settings = new HashMap<>();
    settings.put("bootstrap.servers", bootstrapServers);
    settings.put("offset.flush.interval.ms", Integer.toString(offsetFlushInterval));

    // The worker-level converters matter little (every connector configures its own),
    // but the config still requires valid converter classes.
    settings.put("key.converter", byteArrayConverter);
    settings.put("value.converter", byteArrayConverter);
    settings.put("internal.key.converter", jsonConverter);
    settings.put("internal.key.converter.schemas.enable", "false");
    settings.put("internal.value.converter", jsonConverter);
    settings.put("internal.value.converter.schemas.enable", "false");

    // Left empty on purpose: offsets are kept in a MemoryOffsetBackingStore, not in a file.
    settings.put("offset.storage.file.filename", "");

    settings.put("plugin.path", pluginDir.getPath());

    final Plugins pluginRegistry = new Plugins(settings);
    final StandaloneConfig standaloneConfig = new StandaloneConfig(settings);

    herder = new StandaloneHerder(
        new Worker(
            "test-worker",
            Time.SYSTEM,
            pluginRegistry,
            standaloneConfig,
            new MemoryOffsetBackingStore()));

    connect = new Connect(herder, new RestServer(standaloneConfig));
    connect.start();
}
 
示例3
/**
 * Wires up a standalone Kafka Connect worker from the supplied worker properties.
 *
 * <p>Creates the plugins, config, REST server, worker and herder, and stores the herder,
 * the Connect instance and a connection string (advertised URL followed by the Kafka cluster
 * id) in fields. Nothing is started by this constructor.
 *
 * @param workerProperties worker configuration; viewed as a {@code Map<String, String>} through
 *     an unchecked cast
 */
@SuppressWarnings("unchecked")
ConnectStandalone(final Properties workerProperties) {
  final Time clock = Time.SYSTEM;
  LOGGER.info("Kafka Connect standalone worker initializing ...");
  final long startedAtMs = clock.hiResClockMs();
  new WorkerInfo().logAll();

  // Unchecked view of the Properties object as a String-to-String map (no copy is made).
  final Map<String, String> settings = (Map) workerProperties;

  LOGGER.info("Scanning for plugin classes. This might take a moment ...");
  final Plugins pluginRegistry = new Plugins(settings);
  pluginRegistry.compareAndSwapWithDelegatingLoader();
  final StandaloneConfig standaloneConfig = new StandaloneConfig(settings);

  final String clusterId = ConnectUtils.lookupKafkaClusterId(standaloneConfig);
  LOGGER.debug("Kafka cluster ID: {}", clusterId);

  final RestServer restServer = new RestServer(standaloneConfig);
  final URI advertised = restServer.advertisedUrl();

  // The worker is identified by the advertised REST endpoint, formatted as host:port.
  final Worker connectWorker =
      new Worker(
          advertised.getHost() + ":" + advertised.getPort(),
          clock,
          pluginRegistry,
          standaloneConfig,
          new FileOffsetBackingStore());
  this.herder = new StandaloneHerder(connectWorker, clusterId);
  connectionString = advertised.toString() + herder.kafkaClusterId();

  this.connect = new Connect(herder, restServer);

  final long elapsedMs = clock.hiResClockMs() - startedAtMs;
  LOGGER.info("Kafka Connect standalone worker initialization took {}ms", elapsedMs);
}
 
示例4
/**
 * Command-line entry point: parses the arguments, loads the worker properties, applies any
 * command-line overrides, starts Connect and then blocks until it stops.
 *
 * <p>Exit behavior: prints usage and rethrows if argument parsing fails; prints usage and exits
 * with status 1 when help is requested; logs the error and exits with status 2 if anything
 * fails while loading properties or running Connect.
 *
 * @param argv raw command-line arguments
 */
public static void main(String[] argv) {
  Mirus.Args args = new Mirus.Args();
  JCommander jCommander =
      JCommander.newBuilder()
          // This is the Mirus entry point, so the usage banner should carry its name.
          .programName(Mirus.class.getSimpleName())
          .addObject(args)
          .build();
  try {
    jCommander.parse(argv);
  } catch (Exception e) {
    jCommander.usage();
    throw e;
  }
  if (args.help) {
    jCommander.usage();
    System.exit(1);
  }

  try {
    // applyOverrides writes into the map it is given (its return value is not used), so the
    // no-properties-file case must also supply a mutable map; Collections.emptyMap() would
    // throw UnsupportedOperationException on the first override.
    Map<String, String> workerProps =
        args.workerPropertiesFile.isEmpty()
            ? new java.util.HashMap<String, String>()
            : Utils.propsToStringMap(Utils.loadProps(args.workerPropertiesFile));

    applyOverrides(args.overrides, workerProps);

    Mirus mirus = new Mirus();
    Connect connect = mirus.startConnect(workerProps);

    // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
    connect.awaitStop();
  } catch (Throwable t) {
    log.error("Stopping due to error", t);
    Exit.exit(2);
  }
}
 
示例5
/**
 * Starts {@code numNodes} distributed Connect workers, one after another, each on its own
 * non-daemon thread and its own REST port ({@code STARTING_PORT + i}).
 *
 * <p>For every node this method blocks until the node's {@code startConnect} call has finished.
 * A successfully started instance is added to {@code connectInstances} before this method moves
 * on, and its thread then stays parked in {@code awaitStop()}.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting for a node
 * @throws IllegalStateException if a node fails to start; the original failure is the cause
 */
public void startup() throws InterruptedException {
    for (int i = 0; i < numNodes; i++) {
        Map<String, String> workerProps = new HashMap<>();
        workerProps.put("listeners", "http://localhost:" + (STARTING_PORT + i));
        workerProps.put("plugin.path", String.join(",", pluginPath));
        workerProps.put("group.id", toString());
        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        workerProps.put("key.converter.schemas.enable", "false");
        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        workerProps.put("value.converter.schemas.enable", "false");
        workerProps.put("offset.storage.topic", getClass().getSimpleName() + "-offsets");
        workerProps.put("offset.storage.replication.factor", "3");
        workerProps.put("config.storage.topic", getClass().getSimpleName() + "-config");
        workerProps.put("config.storage.replication.factor", "3");
        workerProps.put("status.storage.topic", getClass().getSimpleName() + "-status");
        workerProps.put("status.storage.replication.factor", "3");
        workerProps.put("bootstrap.servers", brokerList);

        CountDownLatch started = new CountDownLatch(1);
        // Single-slot holder: written by the node thread before countDown() and read by this
        // thread after await(), so the latch orders the two accesses.
        Throwable[] failure = new Throwable[1];
        Thread thread = new Thread(() -> {
            Connect connect;
            try {
                connect = new ConnectDistributed().startConnect(workerProps);
                // Register before releasing the latch so the instance is already in
                // connectInstances when startup() continues.
                connectInstances.add(connect);
            } catch (RuntimeException | Error e) {
                failure[0] = e;
                throw e;
            } finally {
                // Always release the waiting caller, even when startup failed, so that
                // startup() cannot block forever.
                started.countDown();
            }
            connect.awaitStop();
        });
        thread.setDaemon(false);
        thread.start();
        started.await();
        if (failure[0] != null) {
            throw new IllegalStateException("Failed to start Connect node " + i, failure[0]);
        }
    }
}
 
示例6
/**
 * Builds and starts a distributed Kafka Connect worker from the given worker properties.
 *
 * <p>Worker, herder and client-config override policy are created through the
 * {@code Compatibility} helper. Uses the {@code time} and {@code initStart} fields for timing.
 *
 * @param workerProps worker configuration
 * @return the started Connect instance
 * @throws IllegalStateException if {@code connect.start()} fails; Connect is stopped first and
 *     the original exception is attached as the cause
 */
private Connect startConnect(Map<String, String> workerProps) {
    log.info("Scanning for plugin classes. This might take a moment ...");
    final Plugins pluginRegistry = new Plugins(workerProps);
    // The TCCL switch is deliberately skipped here: plugins.compareAndSwapWithDelegatingLoader();
    final DistributedConfig distributedConfig = new DistributedConfig(workerProps);

    final String clusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
    log.debug("Kafka cluster ID: {}", clusterId);

    final RestServer restServer = new RestServer(distributedConfig);
    restServer.initializeServer();

    // The worker is identified by the advertised REST endpoint, formatted as host:port.
    final URI advertised = restServer.advertisedUrl();
    final String workerName = advertised.getHost() + ":" + advertised.getPort();

    final KafkaOffsetBackingStore offsetStore = new KafkaOffsetBackingStore();
    offsetStore.configure(distributedConfig);
    final Object overridePolicy =
        Compatibility.createConnectorClientConfigOverridePolicy(pluginRegistry, distributedConfig);

    final Worker connectWorker = Compatibility.createWorker(
        workerName, time, pluginRegistry, distributedConfig, offsetStore, overridePolicy);
    final WorkerConfigTransformer transformer = connectWorker.configTransformer();

    // Status and config stores share the worker's internal value converter.
    final Converter internalConverter = connectWorker.getInternalValueConverter();
    final StatusBackingStore statusStore = new KafkaStatusBackingStore(time, internalConverter);
    statusStore.configure(distributedConfig);

    final ConfigBackingStore configStore =
        new KafkaConfigBackingStore(internalConverter, distributedConfig, transformer);

    final DistributedHerder distributedHerder = Compatibility.createDistributedHerder(
        distributedConfig,
        time,
        connectWorker,
        clusterId,
        statusStore,
        configStore,
        advertised.toString(),
        overridePolicy);

    final Connect connectRuntime = new Connect(distributedHerder, restServer);
    log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
    try {
        connectRuntime.start();
    } catch (Exception e) {
        log.error("Failed to start Connect", e);
        connectRuntime.stop();
        throw new IllegalStateException(e);
    }

    return connectRuntime;
}
 
示例7
/**
 * Starts a Mirus distributed worker.
 *
 * <p>This method is based on the standard Kafka Connect start logic in {@link
 * org.apache.kafka.connect.cli.ConnectDistributed#startConnect(Map)}, but with `clientid` prefix
 * support, to prevent JMX metric names from clashing. Also supports command-line property
 * overrides (useful for run-time port configuration), and starts the Mirus {@link
 * HerderStatusMonitor}.
 *
 * <p>Each component (herder, REST server, offset/status/config stores, worker) receives its own
 * config built by {@code configWithClientIdSuffix}. If {@code connect.start()} fails, Connect is
 * stopped and {@code Exit.exit(3)} is invoked.
 *
 * @param workerProps worker configuration
 * @return the started Connect instance
 */
public Connect startConnect(Map<String, String> workerProps) {
  log.info("Scanning for plugin classes. This might take a moment ...");
  final Plugins pluginRegistry = new Plugins(workerProps);
  pluginRegistry.compareAndSwapWithDelegatingLoader();
  final DistributedConfig herderConfig = configWithClientIdSuffix(workerProps, "herder");

  final MirusConfig mirusSettings = new MirusConfig(workerProps);

  final String clusterId = ConnectUtils.lookupKafkaClusterId(herderConfig);
  log.debug("Kafka cluster ID: {}", clusterId);

  final RestServer restServer = new RestServer(configWithClientIdSuffix(workerProps, "rest"));
  restServer.initializeServer();

  // The worker is identified by the advertised REST endpoint, formatted as host:port.
  final URI advertised = restServer.advertisedUrl();
  final String workerName = advertised.getHost() + ":" + advertised.getPort();

  final KafkaOffsetBackingStore offsetStore = new KafkaOffsetBackingStore();
  offsetStore.configure(configWithClientIdSuffix(workerProps, "offset"));

  final WorkerConfig workerConfig = configWithClientIdSuffix(workerProps, "worker");

  // The policy class name is read from the herder config; the instance is configured with
  // the worker config.
  final String policyClassName =
      herderConfig.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG);
  final ConnectorClientConfigOverridePolicy overridePolicy =
      pluginRegistry.newPlugin(
          policyClassName, workerConfig, ConnectorClientConfigOverridePolicy.class);

  final Worker connectWorker =
      new Worker(workerName, time, pluginRegistry, workerConfig, offsetStore, overridePolicy);

  final WorkerConfigTransformer transformer = connectWorker.configTransformer();

  // Status and config stores share the worker's internal value converter.
  final Converter internalConverter = connectWorker.getInternalValueConverter();
  final StatusBackingStore statusStore = new KafkaStatusBackingStore(time, internalConverter);
  statusStore.configure(configWithClientIdSuffix(workerProps, "status"));

  final ConfigBackingStore configStore =
      new KafkaConfigBackingStore(
          internalConverter, configWithClientIdSuffix(workerProps, "config"), transformer);

  final DistributedHerder distributedHerder =
      new DistributedHerder(
          herderConfig,
          time,
          connectWorker,
          clusterId,
          statusStore,
          configStore,
          advertised.toString(),
          overridePolicy);

  // Prepare the HerderStatusMonitor; its thread is only started once Connect is up.
  final boolean taskAutoRestart = mirusSettings.getTaskAutoRestart();
  final boolean connectorAutoRestart = mirusSettings.getConnectorAutoRestart();
  final long statePollingInterval = mirusSettings.getTaskStatePollingInterval();
  final Thread monitorThread =
      new Thread(
          new HerderStatusMonitor(
              distributedHerder,
              workerName,
              statePollingInterval,
              taskAutoRestart,
              connectorAutoRestart));
  monitorThread.setName("herder-status-monitor");

  final Connect connectRuntime = new Connect(distributedHerder, restServer);
  log.info("Mirus worker initialization took {}ms", time.hiResClockMs() - initStart);
  try {
    connectRuntime.start();
  } catch (Exception e) {
    log.error("Failed to start Mirus", e);
    connectRuntime.stop();
    Exit.exit(3);
  }

  monitorThread.start();

  return connectRuntime;
}