Java源码示例:org.apache.arrow.flight.Location
示例1
public static Pair<Location, FlightServer.Builder> sslHelper(FlightServer.Builder serverBuilder, DremioConfig config, boolean useSsl, String hostname, int port, String sslHostname) {
Location location;
Location exLocation;
try {
if (!useSsl) {
throw new UnsupportedOperationException("Don't use ssl");
}
Pair<InputStream, InputStream> pair = ssl(config, sslHostname);
location = Location.forGrpcTls(hostname, port);
exLocation = Location.forGrpcTls(sslHostname, port);
serverBuilder.useTls(pair.getRight(), pair.getLeft()).location(location);
} catch (Exception e) {
location = Location.forGrpcInsecure(hostname, port);
exLocation = Location.forGrpcInsecure(sslHostname, port);
serverBuilder.location(location);
}
return Pair.of(exLocation, serverBuilder);
}
示例2
@Test
public void connect() throws Exception {
InetAddress ip = InetAddress.getLocalHost();
Location location = Location.forGrpcInsecure(ip.getHostName(), 47470);
try (FlightClient c = flightClient(getAllocator(), location)) {
c.authenticate(new BasicClientAuthHandler(SystemUser.SYSTEM_USERNAME, null));
String sql = "select * from sys.options";
FlightInfo info = c.getInfo(FlightDescriptor.command(sql.getBytes()));
long total = info.getEndpoints().stream()
.map(this::submit)
.map(TestFlightEndpoint::get)
.mapToLong(Long::longValue)
.sum();
Assert.assertTrue(total > 1);
System.out.println(total);
}
}
示例3
@Test
public void connect() throws Exception {
certs();
InetAddress ip = InetAddress.getLocalHost();
Location location = Location.forGrpcTls(ip.getHostName(), 47470);
try (FlightClient c = flightClient(getAllocator(), location)) {
c.authenticate(new BasicClientAuthHandler(SystemUser.SYSTEM_USERNAME, null));
String sql = "select * from sys.options";
FlightInfo info = c.getInfo(FlightDescriptor.command(sql.getBytes()));
long total = info.getEndpoints().stream()
.map(this::submit)
.map(TestSslFlightEndpoint::get)
.mapToLong(Long::longValue)
.sum();
Assert.assertTrue(total > 1);
System.out.println(total);
}
}
示例4
@Override
public Void initialize(BindingProvider provider) throws Exception {
if (!Boolean.parseBoolean(PropertyHelper.getFromEnvProperty("dremio.flight.enabled", Boolean.toString(false)))) {
logger.info("Flight plugin is not enabled, skipping initialization");
return null;
}
this.allocator = provider.provider(BootStrapContext.class).get().getAllocator().newChildAllocator("arrow-flight", 0, Long.MAX_VALUE);
AuthValidator validator = new AuthValidator(provider.provider(UserService.class), provider.provider(SabotContext.class));
FlightServer.Builder serverBuilder = FlightServer.builder().allocator(allocator).authHandler(new BasicServerAuthHandler(validator));
DremioConfig config = null;
try {
config = provider.lookup(DremioConfig.class);
} catch (Throwable t) {
}
Pair<Location, FlightServer.Builder> pair = SslHelper.sslHelper(
serverBuilder,
config,
useSsl,
InetAddress.getLocalHost().getHostName(),
port,
host);
producer = new Producer(
pair.getKey(),
provider.provider(UserWorker.class),
provider.provider(SabotContext.class),
allocator,
validator);
pair.getRight().producer(producer);
server = pair.getRight().build();
server.start();
logger.info("set up flight plugin on port {} and host {}", pair.getKey().getUri().getPort(), pair.getKey().getUri().getHost());
return null;
}
示例5
Producer(Location location, Provider<UserWorker> worker, Provider<SabotContext> context, BufferAllocator allocator, AuthValidator validator) {
super();
this.location = location;
this.worker = worker;
this.context = context;
this.allocator = allocator;
this.validator = validator;
// kvStore = context.get().getKVStoreProvider().getStore(FlightStoreCreator.class);
}
示例6
public FormationPlugin(SabotContext context, String name, Provider<StoragePluginId> pluginIdProvider) {
this.context = context;
this.pluginIdProvider = pluginIdProvider;
if (!Boolean.parseBoolean(PropertyHelper.getFromEnvProperty("dremio.flight.parallel.enabled", Boolean.toString(false)))) { //todo add this and others to DremioConfig
allocator = null;
thisLocation = null;
server = null;
producer = null;
validator = null;
logger.info("Parallel flight plugin is not enabled, skipping initialization");
return;
}
this.allocator = context.getAllocator().newChildAllocator("formation-" + name, 0, Long.MAX_VALUE);
String hostname = PropertyHelper.getFromEnvProperty("dremio.flight.host", context.getEndpoint().getAddress());
int port = Integer.parseInt(PropertyHelper.getFromEnvProperty("dremio.flight.port", Integer.toString(FLIGHT_PORT)));
FlightServer.Builder serverBuilder = FlightServer.builder().allocator(this.allocator);
Pair<Location, FlightServer.Builder> pair = SslHelper.sslHelper(
serverBuilder,
context.getDremioConfig(),
Boolean.parseBoolean(PropertyHelper.getFromEnvProperty("dremio.formation.use-ssl", "false")),
context.getEndpoint().getAddress(),
port,
hostname);
thisLocation = pair.getKey();
this.producer = new FormationFlightProducer(thisLocation, allocator);
this.validator = new AuthValidator(context.isUserAuthenticationEnabled() ? context.getUserService() : null, context);
this.server = pair.getRight().producer(producer).authHandler(new BasicServerAuthHandler(validator)).build();
logger.info("set up formation plugin on port {} and host {}", thisLocation.getUri().getPort(), thisLocation.getUri().getHost());
}
示例7
private synchronized void refreshClients() {
List<FlightClient> oldClients = clients;
clients = context.getExecutors().stream()
.map(e -> FlightClient.builder().allocator(allocator).location(Location.forGrpcInsecure(e.getAddress(), FLIGHT_PORT)).build()).collect(Collectors.toList());
try {
AutoCloseables.close(oldClients);
} catch (Exception ex) {
logger.error("Failure while refreshing clients.", ex);
}
}
示例8
private static FlightClient flightClient(BufferAllocator allocator, Location location) {
try {
InputStream certStream = certs();
return FlightClient.builder()
.allocator(allocator)
.location(location)
.useTls()
.trustedCertificates(certStream)
.build();
} catch (GeneralSecurityException | IOException e) {
throw new RuntimeException(e);
}
}
示例9
public DataSourceReader createReader(DataSourceOptions dataSourceOptions) {
Location defaultLocation = Location.forGrpcInsecure(
dataSourceOptions.get("host").orElse("localhost"),
dataSourceOptions.getInt("port", 47470)
);
String sql = dataSourceOptions.get("path").orElse("");
FlightDataSourceReader.FactoryOptions options = new FlightDataSourceReader.FactoryOptions(
defaultLocation,
sql,
dataSourceOptions.get("username").orElse("anonymous"),
dataSourceOptions.get("password").orElse(null),
dataSourceOptions.getBoolean("parallel", false), null);
Broadcast<FlightDataSourceReader.FactoryOptions> bOptions = lazySparkContext().broadcast(options);
return new FlightDataSourceReader(bOptions);
}
示例10
private List<InputPartition<ColumnarBatch>> planBatchInputPartitionsSerial(FlightInfo info) {
LOGGER.warn("planning partitions for endpoints {}", Joiner.on(", ").join(info.getEndpoints().stream().map(e -> e.getLocations().get(0).getUri().toString()).collect(Collectors.toList())));
List<InputPartition<ColumnarBatch>> batches = info.getEndpoints().stream().map(endpoint -> {
Location location = (endpoint.getLocations().isEmpty()) ?
Location.forGrpcInsecure(defaultLocation.getUri().getHost(), defaultLocation.getUri().getPort()) :
endpoint.getLocations().get(0);
FactoryOptions options = dataSourceOptions.value().copy(location, endpoint.getTicket().getBytes());
LOGGER.warn("X1 {}", dataSourceOptions.value());
return new FlightDataReaderFactory(lazySparkContext().broadcast(options));
}).collect(Collectors.toList());
LOGGER.info("Created {} batches from arrow endpoints", batches.size());
return batches;
}
示例11
FactoryOptions(Location location, String sql, String username, String password, boolean parallel, byte[] ticket) {
this.host = location.getUri().getHost();
this.port = location.getUri().getPort();
this.sql = sql;
this.username = username;
this.password = password;
this.parallel = parallel;
this.ticket = ticket;
}
示例12
FactoryOptions copy(Location location, byte[] ticket) {
return new FactoryOptions(
location,
sql,
username,
password,
parallel,
ticket);
}
示例13
public void planParallelized(PlanningSet planningSet) {
logger.debug("plan parallel called, collecting endpoints");
List<FlightEndpoint> endpoints = Lists.newArrayList();
FlightEndpoint screenEndpoint = null;
for (Wrapper wrapper : planningSet.getFragmentWrapperMap().values()) {
String majorId = String.valueOf(wrapper.getMajorFragmentId());
try {
Boolean isWriter = wrapper.getNode().getRoot().accept(new WriterVisitor(), false);
if (!isWriter) {
continue;
}
} catch (Throwable throwable) {
logger.warn("unable to complete visitor ", throwable);
}
CoreOperatorType op = CoreOperatorType.valueOf(wrapper.getNode().getRoot().getOperatorType());
boolean isScreen = CoreOperatorType.SCREEN.equals(op) && planningSet.getFragmentWrapperMap().size() > 1;
logger.info("Creating tickets for {}. MajorId {}", wrapper.getNode().getRoot(), majorId);
for (int i = 0; i < wrapper.getAssignedEndpoints().size(); i++) {
CoordinationProtos.NodeEndpoint endpoint = wrapper.getAssignedEndpoint(i);
logger.warn("Creating ticket for {} . MajorId {} and index {}", wrapper.getNode().getRoot(), majorId, i);
Ticket ticket = new Ticket(JOINER.join(
majorId,
String.valueOf(i),
endpoint.getAddress(),
endpoint.getUserPort()
).getBytes());
int port = Integer.parseInt(PropertyHelper.getFromEnvProperty("dremio.formation.port", Integer.toString(FormationPlugin.FLIGHT_PORT)));
String host = PropertyHelper.getFromEnvProperty("dremio.formation.host", endpoint.getAddress());
Location location = Location.forGrpcInsecure(host, port);
if (isScreen) {
screenEndpoint = new FlightEndpoint(ticket, location);
} else {
endpoints.add(new FlightEndpoint(ticket, location));
}
}
}
if (screenEndpoint != null && endpoints.isEmpty()) {
logger.info("Adding a screen endpoint as its the only possible endpoint!");
endpoints.add(screenEndpoint);
} else {
logger.warn("Skipping a screen as it lies above the writer.");
}
logger.debug("built {} parallel endpoints", endpoints.size());
future.complete(endpoints);
}
示例14
public FormationFlightProducer(Location port, BufferAllocator allocator) {
this.port = port;
this.allocator = allocator;
}
示例15
private void buildIfNecessary() {
if (config != null) {
return;
}
if (infos == null) {
infos = clients.stream()
.map(c -> c.getInfo(FlightDescriptor.path(key.getName())))
.collect(Collectors.toList());
}
Preconditions.checkArgument(!infos.isEmpty());
Schema schema = null;
long records = 0;
List<FlightEndpoint> endpoints = new ArrayList<>();
for (FlightInfo info : infos) {
schema = info.getSchema();
records += info.getRecords();
endpoints.addAll(info.getEndpoints());
}
config = new DatasetConfig()
.setFullPathList(key.getComponents())
.setName(key.getName())
.setType(DatasetType.PHYSICAL_DATASET)
.setId(new EntityId().setId(UUID.randomUUID().toString()))
.setReadDefinition(new ReadDefinition()
.setScanStats(new ScanStats().setRecordCount(records)
.setScanFactor(ScanCostFactor.PARQUET.getFactor())))
.setOwner(SystemUser.SYSTEM_USERNAME)
.setPhysicalDataset(new PhysicalDataset())
.setRecordSchema(new BatchSchema(schema.getFields()).toByteString())
.setSchemaVersion(DatasetHelper.CURRENT_VERSION);
splits = new ArrayList<>();
List<DatasetSplit> dSplits = Lists.newArrayList();
// int i =0;
for (FlightEndpoint ep : endpoints) {
List<Location> locations = ep.getLocations();
if (locations.size() > 1) {
throw new UnsupportedOperationException("I dont know what more than one location means, not handling it");
}
DatasetSplitAffinity a = DatasetSplitAffinity.of(locations.get(0).getUri().getHost(), 100d);
// split.setSplitKey(Integer.toString(i));
Flight.Ticket ticket = Flight.Ticket.newBuilder().setTicket(ByteString.copyFrom(ep.getTicket().getBytes())).build();
dSplits.add(DatasetSplit.of(ImmutableList.of(a), records / endpoints.size(), records, ticket::writeTo));
}
splits.add(PartitionChunk.of(dSplits));
}
示例16
private static FlightClient flightClient(BufferAllocator allocator, Location location) {
return FlightClient.builder().allocator(allocator).location(location).build();
}
示例17
public Location getLocation() {
return Location.forGrpcInsecure(host, port);
}
示例18
public FlightClientFactory(Location defaultLocation, String username, String password, boolean parallel) {
this.defaultLocation = defaultLocation;
this.username = username;
this.password = (password == null || password.equals("$NULL$")) ? null : password;
this.parallel = parallel;
}
示例19
public FlightDataReader(Broadcast<FlightDataSourceReader.FactoryOptions> options) {
this.options = options;
this.location = Location.forGrpcInsecure(options.value().getHost(), options.value().getPort());
this.ticket = new Ticket(options.value().getTicket());
}