Java Source Code Examples: org.apache.kafka.connect.runtime.Connect
Example 1
/**
* There does not seem to be a public interface for embedding a Kafka Connect runtime;
* therefore, this code is modeled on the behavior of
* https://github.com/apache/kafka/blob/2.1/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
* and performs the initialization in a roughly similar manner.
*
*/
private void init() {
LOG.info("Started worked initialization");
Time time = Time.SYSTEM;
// Initializes the system runtime information and logs some of the information
WorkerInfo initInfo = new WorkerInfo();
initInfo.logAll();
Properties props = kafkaConnectPropertyFactory.getProperties();
Map<String, String> standAloneProperties = Utils.propsToStringMap(props);
// Not used directly here, but required to initialize the worker
Plugins plugins = new Plugins(standAloneProperties);
StandaloneConfig config = new StandaloneConfig(standAloneProperties);
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
AllConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy();
RestServer rest = new RestServer(config);
rest.initializeServer();
/*
According to the Kafka source code "... Worker runs a (dynamic) set of tasks
in a set of threads, doing the work of actually moving data to/from Kafka ..."
*/
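// Worker's first constructor argument is the worker id; here the bootstrap server string doubles as that id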
Worker worker = new Worker(bootstrapServer, time, plugins, config, new FileOffsetBackingStore(), allConnectorClientConfigOverridePolicy);
/*
From Kafka source code: " ... The herder interface tracks and manages workers
and connectors ..."
*/
herder = new StandaloneHerder(worker, kafkaClusterId, allConnectorClientConfigOverridePolicy);
connect = new Connect(herder, rest);
LOG.info("Finished initializing the worker");
}
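Once the worker is initialized this way, a connector can be submitted to the standalone herder, following the same pattern ConnectStandalone uses. A minimal sketch (the FileStream connector configuration is hypothetical; herder, connect, and LOG are the fields used above):
connect.start();
Map<String, String> connectorProps = new HashMap<>();
connectorProps.put("name", "example-source"); // hypothetical connector name
connectorProps.put("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
connectorProps.put("file", "/tmp/input.txt"); // hypothetical source file
connectorProps.put("topic", "example-topic"); // hypothetical target topic
// The callback completes once the herder has accepted (or rejected) the connector
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
    if (error != null) {
        LOG.error("Failed to create connector", error);
    } else {
        LOG.info("Created connector {}", info.result().name());
    }
});
herder.putConnectorConfig(connectorProps.get("name"), connectorProps, false, cb);
cb.get(); // blocks; throws InterruptedException/ExecutionException on failure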
Example 2
void start() {
final Map<String, String> workerProps = new HashMap<>();
workerProps.put("bootstrap.servers", bootstrapServers);
workerProps.put("offset.flush.interval.ms", Integer.toString(offsetFlushInterval));
// These don't matter much (each connector sets its own converters), but need to be filled with valid classes.
workerProps.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
workerProps.put("value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter.schemas.enable", "false");
workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.value.converter.schemas.enable", "false");
// Not needed, since we use a MemoryOffsetBackingStore.
workerProps.put("offset.storage.file.filename", "");
workerProps.put("plugin.path", pluginDir.getPath());
final Time time = Time.SYSTEM;
final String workerId = "test-worker";
final Plugins plugins = new Plugins(workerProps);
final StandaloneConfig config = new StandaloneConfig(workerProps);
final Worker worker = new Worker(
workerId, time, plugins, config, new MemoryOffsetBackingStore());
herder = new StandaloneHerder(worker);
final RestServer rest = new RestServer(config);
connect = new Connect(herder, rest);
connect.start();
}
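The matching shutdown path is symmetric; a minimal sketch, assuming the same connect field as above:
void stop() {
    if (connect != null) {
        connect.stop();      // trigger shutdown of the herder and the REST server
        connect.awaitStop(); // block until shutdown completes
    }
}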
Example 3
@SuppressWarnings("unchecked")
ConnectStandalone(final Properties workerProperties) {
Time time = Time.SYSTEM;
LOGGER.info("Kafka Connect standalone worker initializing ...");
long initStart = time.hiResClockMs();
WorkerInfo initInfo = new WorkerInfo();
initInfo.logAll();
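// Raw cast from Properties (a Hashtable<Object, Object>); this is why the constructor carries @SuppressWarnings("unchecked")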
Map<String, String> workerProps = (Map) workerProperties;
LOGGER.info("Scanning for plugin classes. This might take a moment ...");
Plugins plugins = new Plugins(workerProps);
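// Installs Connect's delegating plugin classloader as the thread context classloader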
plugins.compareAndSwapWithDelegatingLoader();
StandaloneConfig config = new StandaloneConfig(workerProps);
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
LOGGER.debug("Kafka cluster ID: {}", kafkaClusterId);
RestServer rest = new RestServer(config);
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore());
this.herder = new StandaloneHerder(worker, kafkaClusterId);
connectionString = advertisedUrl.toString() + herder.kafkaClusterId();
this.connect = new Connect(herder, rest);
LOGGER.info(
"Kafka Connect standalone worker initialization took {}ms",
time.hiResClockMs() - initStart);
}
Example 4
public static void main(String[] argv) {
Mirus.Args args = new Mirus.Args();
JCommander jCommander =
JCommander.newBuilder()
.programName(OffsetStatus.class.getSimpleName())
.addObject(args)
.build();
try {
jCommander.parse(argv);
} catch (Exception e) {
jCommander.usage();
throw e;
}
if (args.help) {
jCommander.usage();
System.exit(1);
}
try {
Map<String, String> workerProps =
!args.workerPropertiesFile.isEmpty()
? Utils.propsToStringMap(Utils.loadProps(args.workerPropertiesFile))
: Collections.emptyMap();
applyOverrides(args.overrides, workerProps);
Mirus mirus = new Mirus();
Connect connect = mirus.startConnect(workerProps);
// Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
connect.awaitStop();
} catch (Throwable t) {
log.error("Stopping due to error", t);
Exit.exit(2);
}
}
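The applyOverrides helper is not shown above. A plausible sketch, assuming each override arrives as a key=value string from the command line (note this mutating version requires the loaded-properties branch; Collections.emptyMap() would reject puts):
private static void applyOverrides(List<String> overrides, Map<String, String> workerProps) {
    for (String override : overrides) {
        int eq = override.indexOf('=');
        if (eq > 0) {
            workerProps.put(override.substring(0, eq), override.substring(eq + 1));
        }
    }
}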
Example 5
public void startup() throws InterruptedException {
for (int i = 0; i < numNodes; i++) {
Map<String, String> workerProps = new HashMap<>();
workerProps.put("listeners", "http://localhost:" + (STARTING_PORT + i));
workerProps.put("plugin.path", String.join(",", pluginPath));
workerProps.put("group.id", toString());
workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("key.converter.schemas.enable", "false");
workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("value.converter.schemas.enable", "false");
workerProps.put("offset.storage.topic", getClass().getSimpleName() + "-offsets");
workerProps.put("offset.storage.replication.factor", "3");
workerProps.put("config.storage.topic", getClass().getSimpleName() + "-config");
workerProps.put("config.storage.replication.factor", "3");
workerProps.put("status.storage.topic", getClass().getSimpleName() + "-status");
workerProps.put("status.storage.replication.factor", "3");
workerProps.put("bootstrap.servers", brokerList);
//DistributedConfig config = new DistributedConfig(workerProps);
//RestServer rest = new RestServer(config);
//rest.initializeServer();
CountDownLatch l = new CountDownLatch(1);
Thread thread = new Thread(() -> {
ConnectDistributed connectDistributed = new ConnectDistributed();
Connect connect = connectDistributed.startConnect(workerProps);
l.countDown();
connectInstances.add(connect);
connect.awaitStop();
});
thread.setDaemon(false);
thread.start();
l.await();
}
}
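A corresponding shutdown method can drain the instances collected above; a sketch, assuming the same connectInstances list:
public void shutdown() {
    for (Connect connect : connectInstances) {
        connect.stop(); // ask each worker to begin shutting down
    }
    for (Connect connect : connectInstances) {
        connect.awaitStop(); // then wait for each to finish
    }
}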
Example 6
private Connect startConnect(Map<String, String> workerProps) {
log.info("Scanning for plugin classes. This might take a moment ...");
Plugins plugins = new Plugins(workerProps);
// TCCL switch deliberately skipped here: plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig config = new DistributedConfig(workerProps);
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
log.debug("Kafka cluster ID: {}", kafkaClusterId);
RestServer rest = new RestServer(config);
rest.initializeServer();
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
offsetBackingStore.configure(config);
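// Compatibility (not shown here) presumably shims constructor differences across Kafka Connect versions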
Object connectorClientConfigOverridePolicy = Compatibility.createConnectorClientConfigOverridePolicy(plugins, config);
Worker worker = Compatibility.createWorker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
WorkerConfigTransformer configTransformer = worker.configTransformer();
Converter internalValueConverter = worker.getInternalValueConverter();
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
statusBackingStore.configure(config);
ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
internalValueConverter,
config,
configTransformer);
DistributedHerder herder = Compatibility.createDistributedHerder(config, time, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
advertisedUrl.toString(), connectorClientConfigOverridePolicy);
final Connect connect = new Connect(herder, rest);
log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
try {
connect.start();
} catch (Exception e) {
log.error("Failed to start Connect", e);
connect.stop();
throw new IllegalStateException(e);
}
return connect;
}
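Hypothetical usage of this helper, with a shutdown hook so Ctrl-C stops the runtime cleanly (the worker.properties path is illustrative):
Map<String, String> workerProps = Utils.propsToStringMap(Utils.loadProps("worker.properties"));
Connect connect = startConnect(workerProps);
Runtime.getRuntime().addShutdownHook(new Thread(connect::stop, "connect-shutdown-hook"));
connect.awaitStop();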
Example 7
/**
* This method is based on the standard Kafka Connect start logic in {@link
* org.apache.kafka.connect.cli.ConnectDistributed#startConnect(Map)}, but with `client.id` suffix
* support, to prevent JMX metric names from clashing. Also supports command-line property
* overrides (useful for run-time port configuration), and starts the Mirus {@link
* HerderStatusMonitor}.
*/
public Connect startConnect(Map<String, String> workerProps) {
log.info("Scanning for plugin classes. This might take a moment ...");
Plugins plugins = new Plugins(workerProps);
plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig distributedConfig = configWithClientIdSuffix(workerProps, "herder");
MirusConfig mirusConfig = new MirusConfig(workerProps);
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
log.debug("Kafka cluster ID: {}", kafkaClusterId);
RestServer rest = new RestServer(configWithClientIdSuffix(workerProps, "rest"));
rest.initializeServer();
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
offsetBackingStore.configure(configWithClientIdSuffix(workerProps, "offset"));
WorkerConfig workerConfigs = configWithClientIdSuffix(workerProps, "worker");
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy =
plugins.newPlugin(
distributedConfig.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
workerConfigs,
ConnectorClientConfigOverridePolicy.class);
Worker worker =
new Worker(
workerId,
time,
plugins,
workerConfigs,
offsetBackingStore,
connectorClientConfigOverridePolicy);
WorkerConfigTransformer configTransformer = worker.configTransformer();
Converter internalValueConverter = worker.getInternalValueConverter();
StatusBackingStore statusBackingStore =
new KafkaStatusBackingStore(time, internalValueConverter);
statusBackingStore.configure(configWithClientIdSuffix(workerProps, "status"));
ConfigBackingStore configBackingStore =
new KafkaConfigBackingStore(
internalValueConverter,
configWithClientIdSuffix(workerProps, "config"),
configTransformer);
DistributedHerder herder =
new DistributedHerder(
distributedConfig,
time,
worker,
kafkaClusterId,
statusBackingStore,
configBackingStore,
advertisedUrl.toString(),
connectorClientConfigOverridePolicy);
// Initialize HerderStatusMonitor
boolean autoStartTasks = mirusConfig.getTaskAutoRestart();
boolean autoStartConnectors = mirusConfig.getConnectorAutoRestart();
long pollingCycle = mirusConfig.getTaskStatePollingInterval();
HerderStatusMonitor herderStatusMonitor =
new HerderStatusMonitor(
herder, workerId, pollingCycle, autoStartTasks, autoStartConnectors);
Thread herderStatusMonitorThread = new Thread(herderStatusMonitor);
herderStatusMonitorThread.setName("herder-status-monitor");
final Connect connect = new Connect(herder, rest);
log.info("Mirus worker initialization took {}ms", time.hiResClockMs() - initStart);
try {
connect.start();
} catch (Exception e) {
log.error("Failed to start Mirus", e);
connect.stop();
Exit.exit(3);
}
herderStatusMonitorThread.start();
return connect;
}
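configWithClientIdSuffix is not shown above. A plausible sketch, assuming it copies the worker properties and appends a role suffix to client.id so each internal client reports under a distinct JMX name (the separator is a guess; DistributedConfig extends WorkerConfig, so the "worker" call site above still type-checks):
private static DistributedConfig configWithClientIdSuffix(
        Map<String, String> workerProps, String role) {
    Map<String, String> props = new HashMap<>(workerProps);
    // Append the role name to any configured client.id
    props.computeIfPresent("client.id", (key, clientId) -> clientId + "-" + role);
    return new DistributedConfig(props);
}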