Java源码示例:org.apache.accumulo.core.client.MultiTableBatchWriter
示例1
/**
 * Builds the history record describing a modification of a field and writes it via
 * {@code insert}.
 * <p>
 * The history element has this shape:
 * <ul>
 * <li>field name: {@code HISTORY_PREFIX + fieldName}</li>
 * <li>field value: {@code <current time in millis>:<user>:<fieldValue>:<operation>}, where the operation is {@code insert}, {@code delete} or
 * {@code update}</li>
 * </ul>
 * The current system time is embedded in the value for every mode. The record itself is written as a historical value (no further history is
 * generated for it) using the supplied {@code timestamp}.
 *
 * @param writer
 *            writer used to persist the history element
 * @param shardId
 *            shard row the event belongs to
 * @param datatype
 *            datatype of the event
 * @param eventUid
 *            uid of the event
 * @param viz
 *            visibility applied to the history element
 * @param fieldName
 *            name of the field that was modified
 * @param fieldValue
 *            value of the field that was modified
 * @param timestamp
 *            timestamp handed through to {@code insert}
 * @param isIndexOnlyField
 *            handed through to {@code insert}
 * @param isIndexed
 *            handed through to {@code insert}
 * @param isReverseIndexed
 *            handed through to {@code insert}
 * @param dataTypes
 *            handed through to {@code insert}
 * @param user
 *            user that performed the modification
 * @param mode
 *            kind of modification; anything other than INSERT or DELETE is recorded as an update
 * @throws Exception
 *             if the underlying insert fails
 */
protected void insertHistory(MultiTableBatchWriter writer, String shardId, String datatype, String eventUid, ColumnVisibility viz, String fieldName,
                String fieldValue, long timestamp, boolean isIndexOnlyField, boolean isIndexed, boolean isReverseIndexed, Set<Type<?>> dataTypes,
                String user, MODE mode) throws Exception {
    // Only the trailing operation tag differs between the modes.
    final String operation;
    if (mode.equals(MODE.INSERT)) {
        operation = "insert";
    } else if (mode.equals(MODE.DELETE)) {
        operation = "delete";
    } else {
        operation = "update";
    }

    final String historyFieldValue = System.currentTimeMillis() + ":" + user + ":" + fieldValue + ":" + operation;
    final String historyFieldName = HISTORY_PREFIX + fieldName;

    // historicalValue = true, insertHistory = false: a history element never spawns its own history.
    insert(writer, shardId, datatype, eventUid, viz, historyFieldName, historyFieldValue, timestamp, isIndexOnlyField, isIndexed, isReverseIndexed,
                    dataTypes, true, false, user, mode);
}
示例2
/**
 * Deletes every metadata-table entry in column family {@code cf} for the given field rows, then compacts the metadata table.
 * <p>
 * Each scanner is closed even if the scan fails. The {@link MultiTableBatchWriter} that owns the per-table writer is closed (rather than the
 * per-table writer itself), because a per-table writer obtained from a multi-table writer does not support being closed individually.
 *
 * @param fields
 *            row ids (field names) whose entries should be removed
 * @param cf
 *            column family to remove
 * @return the keys that were found and deleted, grouped by field
 * @throws AccumuloSecurityException
 *             if the connector rejects the operation
 * @throws AccumuloException
 *             if writing the deletes or compacting fails
 * @throws TableNotFoundException
 *             if the metadata table does not exist
 */
protected Multimap<String,Key> removeMetadataEntries(Set<String> fields, Text cf) throws AccumuloSecurityException, AccumuloException,
                TableNotFoundException {
    Multimap<String,Key> metadataEntries = HashMultimap.create();
    MultiTableBatchWriter multiTableWriter = connector.createMultiTableBatchWriter(new BatchWriterConfig());
    try {
        BatchWriter writer = multiTableWriter.getBatchWriter(QueryTestTableHelper.METADATA_TABLE_NAME);
        for (String field : fields) {
            Mutation mutation = new Mutation(new Text(field));
            boolean foundEntries = false;
            Scanner scanner = connector.createScanner(QueryTestTableHelper.METADATA_TABLE_NAME, new Authorizations());
            try {
                scanner.fetchColumnFamily(cf);
                scanner.setRange(new Range(new Text(field)));
                for (Map.Entry<Key,Value> entry : scanner) {
                    Key key = entry.getKey();
                    foundEntries = true;
                    metadataEntries.put(field, key);
                    mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier(), key.getColumnVisibilityParsed());
                }
            } finally {
                // Release the scanner even when the scan throws.
                scanner.close();
            }
            // A mutation without any column updates must not be submitted.
            if (foundEntries) {
                writer.addMutation(mutation);
            }
        }
    } finally {
        // Flushes and closes all per-table writers created from this multi-table writer.
        multiTableWriter.close();
    }
    connector.tableOperations().compact(QueryTestTableHelper.METADATA_TABLE_NAME, new Text("\0"), new Text("~"), true, true);
    return metadataEntries;
}
示例3
/**
 * Creates and initializes the free text indexer described by the configuration.
 *
 * @param conf
 *            configuration to read the {@code ENABLE_FREETEXT} flag and connection settings from
 * @return an initialized indexer, or {@code null} when free text indexing is disabled (the flag defaults to enabled)
 * @throws IOException
 *             if no Accumulo connection can be obtained, or if the indexer rejects the writer
 */
private static FreeTextIndexer getFreeTextIndexer(final Configuration conf) throws IOException {
    final boolean enabled = conf.getBoolean(ENABLE_FREETEXT, true);
    if (!enabled) {
        return null;
    }

    final AccumuloFreeTextIndexer indexer = new AccumuloFreeTextIndexer();
    indexer.setConf(conf);

    final Connector accumulo;
    try {
        accumulo = ConfigUtils.getConnector(conf);
    } catch (AccumuloException | AccumuloSecurityException e) {
        throw new IOException("Error when attempting to create a connection for writing the freeText index.", e);
    }

    // Hand the indexer its connection and a writer with default settings before initializing it.
    final MultiTableBatchWriter batchWriter = accumulo.createMultiTableBatchWriter(new BatchWriterConfig());
    indexer.setConnector(accumulo);
    indexer.setMultiTableBatchWriter(batchWriter);
    indexer.init();
    return indexer;
}
示例4
/**
 * Creates and initializes the temporal indexer described by the configuration.
 *
 * @param conf
 *            configuration to read the {@code ENABLE_TEMPORAL} flag and connection settings from
 * @return an initialized indexer, or {@code null} when temporal indexing is disabled (the flag defaults to enabled)
 * @throws IOException
 *             if no Accumulo connection can be obtained, or if the indexer rejects the writer
 */
private static TemporalIndexer getTemporalIndexer(final Configuration conf) throws IOException {
    final boolean enabled = conf.getBoolean(ENABLE_TEMPORAL, true);
    if (!enabled) {
        return null;
    }

    final AccumuloTemporalIndexer indexer = new AccumuloTemporalIndexer();
    indexer.setConf(conf);

    final Connector accumulo;
    try {
        accumulo = ConfigUtils.getConnector(conf);
    } catch (AccumuloException | AccumuloSecurityException e) {
        throw new IOException("Error when attempting to create a connection for writing the temporal index.", e);
    }

    // Hand the indexer its connection and a writer with default settings before initializing it.
    final MultiTableBatchWriter batchWriter = accumulo.createMultiTableBatchWriter(new BatchWriterConfig());
    indexer.setConnector(accumulo);
    indexer.setMultiTableBatchWriter(batchWriter);
    indexer.init();
    return indexer;
}
示例5
/**
 * Builds a fresh configuration (table prefix, mock-instance flag, temporal predicate list) and an initialized
 * {@code AccumuloTemporalIndexer} wired to a connector and multi-table batch writer derived from that configuration.
 *
 * @throws java.lang.Exception
 *             if the connector cannot be obtained or the indexer fails to initialize
 */
@Before
public void setUp() throws Exception {
    conf = new Configuration();
    conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, "triplestore_");
    conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true);

    // The temporal predicates are from http://linkedevents.org/ontology
    // and http://motools.sourceforge.net/event/event.html
    final String temporalPredicates = ""
            + URI_PROPERTY_AT_TIME + ","
            + URI_PROPERTY_CIRCA + ","
            + URI_PROPERTY_EVENT_TIME;
    conf.setStrings(ConfigUtils.TEMPORAL_PREDICATES_LIST, temporalPredicates);

    tIndexer = new AccumuloTemporalIndexer();
    tIndexer.setConf(conf);

    final Connector accumulo = ConfigUtils.getConnector(conf);
    final MultiTableBatchWriter batchWriter = accumulo.createMultiTableBatchWriter(new BatchWriterConfig());
    tIndexer.setConnector(accumulo);
    tIndexer.setMultiTableBatchWriter(batchWriter);
    tIndexer.init();
}
示例6
/**
 * Writes the given keys into the metadata table, one mutation per field row, then compacts the metadata table.
 * <p>
 * Every key is written with its column family, qualifier and visibility and an empty {@link Value}. The supplied multimap is only read; it is
 * never modified while being iterated. The {@link MultiTableBatchWriter} that owns the per-table writer is closed (rather than the per-table
 * writer itself), because a per-table writer obtained from a multi-table writer does not support being closed individually.
 *
 * @param metadataEntries
 *            keys to write, grouped by field (row id)
 * @throws AccumuloSecurityException
 *             if the connector rejects the operation
 * @throws AccumuloException
 *             if writing the entries or compacting fails
 * @throws TableNotFoundException
 *             if the metadata table does not exist
 */
protected void addMetadataEntries(Multimap<String,Key> metadataEntries) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
    MultiTableBatchWriter multiTableWriter = connector.createMultiTableBatchWriter(new BatchWriterConfig());
    try {
        BatchWriter writer = multiTableWriter.getBatchWriter(QueryTestTableHelper.METADATA_TABLE_NAME);
        for (String field : metadataEntries.keySet()) {
            Mutation mutation = new Mutation(new Text(field));
            for (Key key : metadataEntries.get(field)) {
                mutation.put(key.getColumnFamily(), key.getColumnQualifier(), key.getColumnVisibilityParsed(), new Value());
            }
            writer.addMutation(mutation);
        }
    } finally {
        // Flushes and closes all per-table writers created from this multi-table writer.
        multiTableWriter.close();
    }
    connector.tableOperations().compact(QueryTestTableHelper.METADATA_TABLE_NAME, new Text("\0"), new Text("~"), true, true);
}
示例7
/**
 * Creates a {@link MultiTableBatchWriter} whose max memory, max latency and max write threads are read from the configuration.
 *
 * @param conf
 *            configuration supplying the writer limits and the connection settings
 * @return a new multi-table batch writer from the configured connector
 * @throws AccumuloException
 *             if the connector cannot be obtained
 * @throws AccumuloSecurityException
 *             if the connector cannot be obtained for security reasons
 */
public static MultiTableBatchWriter createMultitableBatchWriter(final Configuration conf)
        throws AccumuloException, AccumuloSecurityException {
    // Read the limits first, then resolve the connector, matching the established call order.
    final Long maxMemory = getWriterMaxMemory(conf);
    final Long maxLatency = getWriterMaxLatency(conf);
    final Integer maxWriteThreads = getWriterMaxWriteThreads(conf);
    return ConfigUtils.getConnector(conf).createMultiTableBatchWriter(maxMemory, maxLatency, maxWriteThreads);
}
示例8
/**
 * Inserts a new field value with the provided timestamp.
 * <p>
 * Builds one mutation for the metadata table (row = field name) and one for the event table (row = shard id). When the field is indexed, a
 * global index mutation is additionally submitted per data type, plus a global reverse index mutation when reverse indexing is requested, and a
 * field index entry is added to the event mutation. The event and metadata mutations are submitted last and the writer is flushed. Finally, a
 * history element is written when requested and the field is not index-only.
 *
 * @param writer
 *            multi-table writer used for all tables
 * @param shardId
 *            row id of the event mutation
 * @param datatype
 *            datatype of the event
 * @param eventUid
 *            uid of the event
 * @param viz
 *            visibility applied to the event and index entries
 * @param fieldName
 *            name of the field
 * @param fieldValue
 *            value of the field; for historical values the index term is the part between the second and the last colon
 * @param timestamp
 *            timestamp of the event entries; index entries use this value truncated to a multiple of {@code MS_PER_DAY}
 * @param isIndexOnlyField
 *            when true, no event entry, no metadata event/frequency entries and no history are written
 * @param isIndexed
 *            whether global index and field index entries are written
 * @param isReverseIndexed
 *            whether global reverse index entries are written (only evaluated for indexed fields)
 * @param dataTypes
 *            types used to normalize the index term; one set of index entries is written per type
 * @param historicalValue
 *            whether {@code fieldValue} is a history value from which the index term must be extracted
 * @param insertHistory
 *            whether a history element should be written afterwards
 * @param user
 *            user recorded in the history element
 * @param mode
 *            operation recorded in the history element
 * @throws Exception
 *             if any mutation cannot be written
 */
protected void insert(MultiTableBatchWriter writer, String shardId, String datatype, String eventUid, ColumnVisibility viz, String fieldName,
                String fieldValue, long timestamp, boolean isIndexOnlyField, boolean isIndexed, boolean isReverseIndexed, Set<Type<?>> dataTypes,
                boolean historicalValue, boolean insertHistory, String user, MODE mode) throws Exception {
    final boolean storedInEvent = !isIndexOnlyField;

    // Metadata row for the field: event marker plus a frequency count of one for the day.
    final Mutation metadataMutation = new Mutation(fieldName);
    if (storedInEvent) {
        metadataMutation.put(ColumnFamilyConstants.COLF_E, new Text(datatype), NULL_VALUE);
        final Text frequencyQualifier = new Text(datatype + NULL_BYTE + DateHelper.format(timestamp));
        final Value frequencyOfOne = new Value(SummingCombiner.VAR_LEN_ENCODER.encode(1L));
        metadataMutation.put(ColumnFamilyConstants.COLF_F, frequencyQualifier, frequencyOfOne);
    }

    // Shard row carrying the event field and, further down, the field index entries.
    final Mutation eventMutation = new Mutation(shardId);
    if (storedInEvent) {
        final Text eventFamily = new Text(datatype + NULL_BYTE + eventUid);
        final Text eventQualifier = new Text(fieldName + NULL_BYTE + fieldValue);
        eventMutation.put(eventFamily, eventQualifier, viz, timestamp, NULL_VALUE);
    }

    if (isIndexed) {
        final long dayTimestamp = (timestamp / MS_PER_DAY) * MS_PER_DAY;
        final String shardAndDatatype = shardId + NULL_BYTE + datatype;

        // Uid list holding just this event, shared by all index entries.
        final Uid.List uidList = Uid.List.newBuilder().setIGNORE(false).setCOUNT(1).addUID(eventUid).build();
        final Value uidValue = new Value(uidList.toByteArray());

        for (Type<?> type : dataTypes) {
            final String indexTerm;
            if (historicalValue) {
                // The term sits between the second and the last colon. Splitting is not used because the
                // term itself may contain colons.
                final int firstColon = fieldValue.indexOf(":", 0);
                final int secondColon = fieldValue.indexOf(":", firstColon + 1);
                final int lastColon = fieldValue.lastIndexOf(":");
                indexTerm = fieldValue.substring(secondColon + 1, lastColon);
            } else {
                indexTerm = fieldValue;
            }
            final String indexedValue = type.normalize(indexTerm);
            final String typeName = type.getClass().getName();

            // Global index entry
            final Mutation indexMutation = new Mutation(indexedValue);
            indexMutation.put(fieldName, shardAndDatatype, viz, dayTimestamp, uidValue);
            writer.getBatchWriter(this.getIndexTableName()).addMutation(indexMutation);
            metadataMutation.put(ColumnFamilyConstants.COLF_I, new Text(datatype + NULL_BYTE + typeName), NULL_VALUE);

            if (isReverseIndexed) {
                // Global reverse index entry
                final Mutation reverseMutation = new Mutation(StringUtils.reverse(indexedValue));
                reverseMutation.put(fieldName, shardAndDatatype, viz, dayTimestamp, uidValue);
                writer.getBatchWriter(this.getReverseIndexTableName()).addMutation(reverseMutation);
                metadataMutation.put(ColumnFamilyConstants.COLF_RI, new Text(datatype + NULL_BYTE + typeName), NULL_VALUE);
            }

            // Field index entry
            final Text fieldIndexFamily = new Text(FIELD_INDEX_PREFIX + fieldName);
            final Text fieldIndexQualifier = new Text(indexedValue + NULL_BYTE + datatype + NULL_BYTE + eventUid);
            eventMutation.put(fieldIndexFamily, fieldIndexQualifier, viz, timestamp, NULL_VALUE);
        }
    }

    writer.getBatchWriter(this.getEventTableName()).addMutation(eventMutation);
    writer.getBatchWriter(this.getMetadataTableName()).addMutation(metadataMutation);
    writer.flush();

    if (storedInEvent && insertHistory) {
        insertHistory(writer, shardId, datatype, eventUid, viz, fieldName, fieldValue, timestamp, isIndexOnlyField, isIndexed, isReverseIndexed,
                        dataTypes, user, mode);
    }
}
示例9
/**
 * Forwards to the wrapped {@code delegate}'s three-argument variant.
 * NOTE(review): the arguments are presumably max memory, max latency and max write threads - confirm against the Connector API.
 *
 * @param l
 *            first numeric setting, passed through unchanged
 * @param l1
 *            second numeric setting, passed through unchanged
 * @param i
 *            third numeric setting, passed through unchanged
 * @return the writer created by the delegate
 */
@Override
@Deprecated
public MultiTableBatchWriter createMultiTableBatchWriter(long l, long l1, int i) {
    final MultiTableBatchWriter created = delegate.createMultiTableBatchWriter(l, l1, i);
    return created;
}
示例10
/**
 * Forwards to the wrapped {@code delegate}.
 *
 * @param batchWriterConfig
 *            writer configuration, passed through unchanged
 * @return the writer created by the delegate
 */
@Override
public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig batchWriterConfig) {
    final MultiTableBatchWriter created = delegate.createMultiTableBatchWriter(batchWriterConfig);
    return created;
}
示例11
/**
 * Accepts a {@link MultiTableBatchWriter} but does nothing with it: the body is empty, so the writer is neither stored nor used.
 *
 * @param writer
 *            ignored by this implementation
 * @throws IOException
 *             declared by the overridden method; never thrown here
 */
@Override
public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException {
}
示例12
/**
 * Returns the {@link MultiTableBatchWriter} currently held in {@code mt_bw}.
 *
 * @return the value of {@code mt_bw}, returned as-is
 */
protected MultiTableBatchWriter getMultiTableBatchWriter(){
return mt_bw;
}
示例13
/**
 * Writes four statements whose objects are instants one second apart (the last two identical) with only temporal indexing enabled, then
 * queries a freshly initialized {@code AccumuloTemporalIndexer}: nothing lies before the first instant or after the last one, only the first
 * statement lies before the second instant, and the third and fourth statements lie after it.
 */
@Test
public void testTemporalIndexing() throws Exception {
    final TemporalInstant[] instants = {
            new TemporalInstantRfc3339(2015, 12, 30, 12, 0, 1),
            new TemporalInstantRfc3339(2015, 12, 30, 12, 0, 2),
            new TemporalInstantRfc3339(2015, 12, 30, 12, 0, 3),
            new TemporalInstantRfc3339(2015, 12, 30, 12, 0, 3)
    };
    final Statement[] statements = new Statement[instants.length];

    // Only the temporal index is written by the output format in this test.
    RyaOutputFormat.setCoreTablesEnabled(job, false);
    RyaOutputFormat.setFreeTextEnabled(job, false);
    RyaOutputFormat.setTemporalEnabled(job, true);
    RyaOutputFormat.setEntityEnabled(job, false);

    final ValueFactory valueFactory = SimpleValueFactory.getInstance();
    for (int idx = 0; idx < instants.length; idx++) {
        final RyaType timeObject = RdfToRyaConversions.convertLiteral(valueFactory.createLiteral(instants[idx].toString()));
        final RyaStatement ryaStatement = RyaStatement.builder()
                .setSubject(new RyaIRI(GRAPH + ":s"))
                .setPredicate(new RyaIRI(GRAPH + ":p"))
                .setObject(timeObject)
                .build();
        write(ryaStatement);
        statements[idx] = RyaToRdfConversions.convertStatement(ryaStatement);
    }

    // Independent indexer used to read back what was written above.
    final AccumuloTemporalIndexer indexer = new AccumuloTemporalIndexer();
    indexer.setConf(conf);
    final Connector accumulo = ConfigUtils.getConnector(conf);
    final MultiTableBatchWriter batchWriter = accumulo.createMultiTableBatchWriter(new BatchWriterConfig());
    indexer.setConnector(accumulo);
    indexer.setMultiTableBatchWriter(batchWriter);
    indexer.init();

    final Set<Statement> nothing = new HashSet<>();
    final Set<Statement> beforeSecondInstant = new HashSet<>();
    final Set<Statement> afterSecondInstant = new HashSet<>();
    beforeSecondInstant.add(statements[0]);
    afterSecondInstant.add(statements[2]);
    afterSecondInstant.add(statements[3]);

    Assert.assertEquals(nothing, getSet(indexer.queryInstantBeforeInstant(instants[0], new StatementConstraints())));
    Assert.assertEquals(nothing, getSet(indexer.queryInstantAfterInstant(instants[3], new StatementConstraints())));
    Assert.assertEquals(beforeSecondInstant, getSet(indexer.queryInstantBeforeInstant(instants[1], new StatementConstraints())));
    Assert.assertEquals(afterSecondInstant, getSet(indexer.queryInstantAfterInstant(instants[1], new StatementConstraints())));
    indexer.close();
}
示例14
/**
 * Stores the supplied {@link MultiTableBatchWriter} in {@code mtbw}, replacing any previously stored reference.
 *
 * @param writer
 *            writer to store; no validation is performed
 * @throws IOException
 *             declared by the overridden method; never thrown here
 */
@Override
public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException {
mtbw = writer;
}
示例15
/**
 * Stores the supplied {@link MultiTableBatchWriter} in {@code mtbw}, replacing any previously stored reference.
 *
 * @param writer
 *            writer to store; no validation is performed
 * @throws IOException
 *             declared by the overridden method; never thrown here
 */
@Override
public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException {
mtbw = writer;
}
示例16
/**
 * Accepts a {@link MultiTableBatchWriter} but deliberately does nothing with it; the writer is neither stored nor used.
 *
 * @param writer
 *            ignored by this implementation
 * @throws IOException
 *             declared by the overridden method; never thrown here
 */
@Override
public void setMultiTableBatchWriter(final MultiTableBatchWriter writer)
throws IOException {
// We do not need to use the writer that also writes to the core RYA
// tables.
}
示例17
/**
 * Bundles the supplied instances; each argument is stored as-is in the field of the same name.
 *
 * @param config
 *            graph configuration to hold
 * @param mtbw
 *            multi-table batch writer to hold
 * @param caches
 *            element caches to hold
 */
public GlobalInstances(AccumuloGraphConfiguration config,
        MultiTableBatchWriter mtbw, ElementCaches caches) {
    this.caches = caches;
    this.mtbw = mtbw;
    this.config = config;
}
示例18
/**
 * Returns the {@link MultiTableBatchWriter} held in {@code mtbw}.
 *
 * @return the value of {@code mtbw}, returned as-is
 */
public MultiTableBatchWriter getMtbw() {
return mtbw;
}
示例19
/**
 * Inserts a new field value using the supplied timestamp, resolving the column visibility from the security markings when none is given.
 * <p>
 * The value is inserted as a non-historical value.
 *
 * @param writer
 *            multi-table writer used for all tables
 * @param shardId
 *            shard row the event belongs to
 * @param datatype
 *            datatype of the event
 * @param eventUid
 *            uid of the event
 * @param markings
 *            security markings, translated to a column visibility when {@code viz} is null
 * @param viz
 *            column visibility to use; may be null when {@code markings} are supplied
 * @param fieldName
 *            name of the field
 * @param fieldValue
 *            value of the field
 * @param isIndexOnlyField
 *            whether the field is index-only
 * @param isIndexed
 *            whether the field is indexed
 * @param isReverseIndexed
 *            whether the field is reverse indexed
 * @param dataTypes
 *            types used to normalize the value for indexing
 * @param user
 *            user recorded in any history element
 * @param mode
 *            operation recorded in any history element
 * @param ts
 *            timestamp used for the inserted entries
 * @param insertHistory
 *            whether a history element should be written
 * @throws IllegalArgumentException
 *             if {@code viz} is null and {@code markings} is null or empty
 * @throws Exception
 *             if the markings cannot be translated or the insert fails
 */
protected void insert(MultiTableBatchWriter writer, String shardId, String datatype, String eventUid, Map<String,String> markings, ColumnVisibility viz,
                String fieldName, String fieldValue, boolean isIndexOnlyField, boolean isIndexed, boolean isReverseIndexed, Set<Type<?>> dataTypes,
                String user, MODE mode, long ts, boolean insertHistory) throws Exception {
    ColumnVisibility visibility = viz;
    if (visibility == null) {
        // Without an explicit visibility the markings are the only source of security information.
        final boolean markingsMissing = (markings == null) || markings.isEmpty();
        if (markingsMissing) {
            throw new IllegalArgumentException("No security information specified. Security markings must be supplied");
        }
        visibility = markingFunctions.translateToColumnVisibility(markings);
    }

    // historicalValue = false: this is a regular field value, not a history element.
    insert(writer, shardId, datatype, eventUid, visibility, fieldName, fieldValue, ts, isIndexOnlyField, isIndexed, isReverseIndexed, dataTypes,
                    false, insertHistory, user, mode);
}
示例20
/**
 * Supplies a {@link MultiTableBatchWriter} to the implementation.
 * NOTE(review): whether the writer is retained or ignored is presumably up to each implementation - confirm against the implementing classes.
 *
 * @param writer
 *            the multi-table batch writer being offered
 * @throws IOException
 *             if the implementation cannot accept the writer
 */
public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException;