Java源码示例:org.kitesdk.morphline.base.Fields
示例1
/**
 * Runs {@code extractPath} once per entry of {@code stepMap} against the first attachment-body
 * value, writing results into a single copy of the input record, then forwards that copy to the
 * child command.
 *
 * @param inputRecord record whose first {@code Fields.ATTACHMENT_BODY} value must be non-null
 * @return {@code false} if any extraction throws (the exception is logged at error level),
 *         otherwise whatever the child command returns
 */
@Override
protected boolean doProcess(Record inputRecord) {
  final Object body = inputRecord.getFirstValue(Fields.ATTACHMENT_BODY);
  Preconditions.checkNotNull(body);
  final Record enriched = inputRecord.copy();
  for (Map.Entry<String, Collection<String>> mapping : stepMap.entrySet()) {
    final String targetField = mapping.getKey();
    // NOTE(review): unchecked cast — assumes every stepMap value is a List; confirm where stepMap is built.
    final List<String> pathSteps = (List<String>) mapping.getValue();
    try {
      extractPath(body, targetField, pathSteps, enriched, 0);
    } catch (Exception failure) {
      LOG.error(failure.getMessage(), failure);
      return false;
    }
  }
  return getChild().process(enriched);
}
示例2
/**
 * Feeds a Cisco syslog-ng line through the grokSyslogNgCisco morphline and compares the
 * extracted cisco_* and syslog_message fields with the expected values.
 *
 * NOTE(review): this method has no @Test annotation, so a JUnit 4 runner will not execute it.
 * The sub-facility expectation is also disabled. Confirm whether both are intentional.
 */
public void testGrokSyslogNgCiscoWithoutSubFacility() throws Exception {
  morphline = createMorphline("test-morphlines/grokSyslogNgCisco");
  final Record input = new Record();
  final String line = "<179>Jun 10 04:42:51 www.foo.com Jun 10 2013 04:42:51 : %myproduct-3-mysubfacility-251010: "
      + "Health probe failed for server 1.2.3.4 on port 8083, connection refused by server";
  input.put(Fields.MESSAGE, line);
  assertTrue(morphline.process(input));

  final Record wanted = new Record();
  wanted.put(Fields.MESSAGE, line);
  wanted.put("cisco_message_code", "%myproduct-3-251010");
  wanted.put("cisco_product", "myproduct");
  wanted.put("cisco_level", "3");
  // wanted.put("cisco_subfacility", "mysubfacility");  (expectation intentionally disabled)
  wanted.put("cisco_message_id", "251010");
  wanted.put("syslog_message", "%myproduct-3-mysubfacility-251010: Health probe failed for server 1.2.3.4 "
      + "on port 8083, connection refused by server");

  final Record produced = collector.getFirstRecord();
  assertEquals(wanted, produced);
  assertNotSame(input, produced);
}
示例3
/**
 * Uses test-documents/email.txt as the attachment body and checks the header and body fields
 * that the grokEmail morphline extracts into the output record.
 *
 * NOTE(review): several expected literals read "[email protected]". That looks like an
 * e-mail-obfuscation artifact rather than the real addresses in email.txt — verify.
 */
@Test
public void testGrokEmail() throws Exception {
  morphline = createMorphline("test-morphlines/grokEmail");
  final byte[] rawMail = Files.toByteArray(new File(RESOURCES_DIR + "/test-documents/email.txt"));
  final Record input = new Record();
  input.put(Fields.ATTACHMENT_BODY, rawMail);
  assertTrue(morphline.process(input));

  final Record wanted = new Record();
  // The whole mail is expected verbatim (decoded as UTF-8) in the message field.
  wanted.put(Fields.MESSAGE, new String(rawMail, "UTF-8"));
  wanted.put("message_id", "[email protected]");
  wanted.put("date", "Wed, 6 Feb 2012 06:06:05 -0800");
  wanted.put("from", "[email protected]");
  wanted.put("to", "[email protected]");
  wanted.put("subject", "WEDNESDAY WEATHER HEADLINES");
  wanted.put("from_names", "Foo Bar <[email protected]>@xxx");
  wanted.put("to_names", "'Weather News Distribution' <[email protected]>");
  wanted.put("text", "Average 1 to 3- degrees above normal: Mid-Atlantic, Southern Plains.."
      + "\nAverage 4 to 6-degrees above normal: Ohio Valley, Rockies, Central Plains");

  final Record produced = collector.getFirstRecord();
  assertEquals(wanted, produced);
  assertNotSame(input, produced);
}
示例4
/**
 * Runs the extractJsonPathsFlattened morphline over test-documents/arrays.json and checks that
 * both "/price" and "/price/[]" yield the flattened list of numbers. An unknown path must
 * yield an empty list.
 *
 * The input stream is closed in a finally block, so a failing assertion cannot leak the file
 * handle.
 */
@Test
public void testExtractJsonPathsFlattened() throws Exception {
  morphline = createMorphline("test-morphlines/extractJsonPathsFlattened");
  File file = new File(RESOURCES_DIR + "/test-documents/arrays.json");
  InputStream in = new BufferedInputStream(new FileInputStream(file));
  try {
    Record record = new Record();
    record.put(Fields.ATTACHMENT_BODY, in);
    startSession();
    assertEquals(1, collector.getNumStartEvents());
    assertTrue(morphline.process(record));
    assertEquals(1, collector.getRecords().size());
    List<Integer> expected = Arrays.asList(1, 2, 3, 4, 5, 10, 20, 100, 200);
    assertEquals(expected, collector.getFirstRecord().get("/price"));
    assertEquals(expected, collector.getFirstRecord().get("/price/[]"));
    assertEquals(Arrays.asList(), collector.getFirstRecord().get("/unknownField"));
  } finally {
    in.close();
  }
}
示例5
/**
 * Crude quick n' dirty benchmark. It runs the readJson morphline repeatedly over a sample
 * statuses file for a fixed time budget, then prints the iteration count, the elapsed seconds
 * and the throughput.
 *
 * Before running this, disable debug logging
 * via log4j.logger.org.kitesdk.morphline=INFO in log4j.properties.
 */
@Test
@Ignore
public void benchmarkJson() throws Exception {
  String morphlineConfigFile = "test-morphlines/readJson";
  long durationSecs = 20;
  //File file = new File(RESOURCES_DIR + "/test-documents/stream.json");
  File file = new File(RESOURCES_DIR + "/test-documents/sample-statuses-20120906-141433.json");
  System.out.println("Now benchmarking " + morphlineConfigFile + " ...");
  morphline = createMorphline(morphlineConfigFile);
  byte[] bytes = Files.toByteArray(file);
  // Time with the monotonic nanoTime() clock. currentTimeMillis() follows wall-clock
  // adjustments, which can stretch, shorten or skew the measured interval.
  long start = System.nanoTime();
  long durationNanos = durationSecs * 1000L * 1000L * 1000L;
  int iters = 0;
  // Compare the elapsed difference, as recommended for nanoTime(), so the loop stays overflow-safe.
  while (System.nanoTime() - start < durationNanos) {
    Record record = new Record();
    record.put(Fields.ATTACHMENT_BODY, bytes);
    collector.reset();
    assertTrue(morphline.process(record));
    iters++;
  }
  float secs = (System.nanoTime() - start) / 1.0e9f;
  System.out.println("Results: iters=" + iters + ", took[secs]=" + secs + ", iters/secs=" + (iters/secs));
}
示例6
/**
 * Checks that the record has an attachment whose MIME type is accepted by
 * {@code isMimeTypeSupported}. If so, it opens the attachment stream and delegates to
 * {@code doProcess(Record, InputStream)}.
 *
 * The stream is always closed quietly afterwards. An IOException thrown by the delegate is
 * rethrown wrapped in a MorphlineRuntimeException.
 *
 * @return {@code false} when there is no attachment or its MIME type is rejected, otherwise
 *         the delegate's result
 */
@Override
protected boolean doProcess(Record record) {
  // TODO: make field for stream configurable
  // Short-circuit evaluation keeps the original order: attachment check first, then the
  // MIME-type lookup, then the support check.
  if (!hasAtLeastOneAttachment(record)
      || !isMimeTypeSupported((String) record.getFirstValue(Fields.ATTACHMENT_MIME_TYPE), record)) {
    return false;
  }
  final InputStream attachment = getAttachmentInputStream(record);
  try {
    return doProcess(record, attachment);
  } catch (IOException ioe) {
    throw new MorphlineRuntimeException(ioe);
  } finally {
    Closeables.closeQuietly(attachment);
  }
}
示例7
/**
 * Decides whether {@code mediaTypeStr} is acceptable.
 *
 * Returns {@code true} when {@code supportedMimeTypes} is null (no restriction configured) and
 * {@code false} when the record carries no MIME type. Otherwise the string is parsed and
 * accepted if it is contained in {@code supportedMimeTypes}, or if {@code isMimeTypeMatch}
 * succeeds against any entry. A rejection is logged at debug level.
 */
private boolean isMimeTypeSupported(String mediaTypeStr, Record record) {
  if (supportedMimeTypes == null) {
    return true;
  }
  if (!hasAtLeastOneMimeType(record)) {
    return false;
  }
  final MediaType candidate = parseMimeType(mediaTypeStr);
  boolean accepted = supportedMimeTypes.contains(candidate); // exact match: fast path
  if (!accepted) {
    // Slow path: wildcard / range matching against each configured pattern.
    for (MediaType range : supportedMimeTypes) {
      if (isMimeTypeMatch(candidate, range)) {
        accepted = true;
        break;
      }
    }
  }
  if (!accepted && LOG.isDebugEnabled()) {
    LOG.debug("No supported MIME type found for " + Fields.ATTACHMENT_MIME_TYPE + "=" + mediaTypeStr);
  }
  return accepted;
}
示例8
/**
 * Treats the first attachment-body value as an Avro {@code GenericContainer}. For each
 * {@code stepMap} entry it calls {@code extractPath} with the container and its schema,
 * writing into one copy of the input record, and forwards that copy to the child command.
 *
 * Both the container and its schema must be non-null (checked via Preconditions).
 */
@Override
protected boolean doProcess(Record inputRecord) {
  // Disabled sanity check, kept for reference:
  // Preconditions.checkState(ReadAvroBuilder.AVRO_MEMORY_MIME_TYPE.equals(inputRecord.getFirstValue(Fields.ATTACHMENT_MIME_TYPE)));
  final GenericContainer container = (GenericContainer) inputRecord.getFirstValue(Fields.ATTACHMENT_BODY);
  Preconditions.checkNotNull(container);
  Preconditions.checkNotNull(container.getSchema());
  final Record enriched = inputRecord.copy();
  for (Map.Entry<String, Collection<String>> mapping : stepMap.entrySet()) {
    // NOTE(review): unchecked cast — assumes stepMap values are Lists.
    extractPath(container, container.getSchema(), mapping.getKey(),
        (List<String>) mapping.getValue(), enriched, 0);
  }
  return getChild().process(enriched);
}
示例9
/**
 * Crude benchmark. It runs the readJson morphline over a sample statuses file for a fixed
 * time budget, starting a fresh session before every iteration, then prints the iteration
 * count, the elapsed seconds and the throughput.
 */
@Test
@Ignore
public void benchmarkJson() throws Exception {
  String morphlineConfigFile = "test-morphlines/readJson";
  long durationSecs = 10;
  //File file = new File(RESOURCES_DIR + "/test-documents/stream.json");
  File file = new File(RESOURCES_DIR + "/test-documents/sample-statuses-20120906-141433.json");
  System.out.println("Now benchmarking " + morphlineConfigFile + " ...");
  morphline = createMorphline(morphlineConfigFile);
  byte[] bytes = Files.toByteArray(file);
  // Time with the monotonic nanoTime() clock. currentTimeMillis() follows wall-clock
  // adjustments, which can stretch, shorten or skew the measured interval.
  long start = System.nanoTime();
  long durationNanos = durationSecs * 1000L * 1000L * 1000L;
  int iters = 0;
  // Compare the elapsed difference, as recommended for nanoTime(), so the loop stays overflow-safe.
  while (System.nanoTime() - start < durationNanos) {
    Record record = new Record();
    record.put(Fields.ATTACHMENT_BODY, bytes);
    collector.reset();
    startSession();
    assertEquals(1, collector.getNumStartEvents());
    assertTrue(morphline.process(record));
    iters++;
  }
  float secs = (System.nanoTime() - start) / 1.0e9f;
  System.out.println("Results: iters=" + iters + ", took[secs]=" + secs + ", iters/secs=" + (iters/secs));
}
示例10
/**
 * Configures the command from {@code config}:
 * <ul>
 *   <li>the target field (default {@code Fields.ID});</li>
 *   <li>the preserve-existing flag (default {@code true});</li>
 *   <li>an id prefix (default {@code ""});</li>
 *   <li>the generator {@code type}, validated as a {@code Type} enum (default {@code Type.secure}).</li>
 * </ul>
 * For {@code Type.secure}, {@code prng} is left null. Any other type gets a {@code Well19937c}
 * generator seeded with 624 ints drawn from a {@code SecureRandom}.
 */
public GenerateUUID(CommandBuilder builder, Config config, Command parent, Command child, MorphlineContext context) {
  super(builder, config, parent, child, context);
  this.fieldName = getConfigs().getString(config, FIELD_NAME, Fields.ID);
  this.preserveExisting = getConfigs().getBoolean(config, PRESERVE_EXISTING_NAME, true);
  this.prefix = getConfigs().getString(config, PREFIX_NAME, "");
  final String requestedType = getConfigs().getString(config, "type", Type.secure.toString());
  final Type type = new Validator<Type>().validateEnum(config, requestedType, Type.class);
  if (type != Type.secure) {
    // non-secure & fast: a seeded Well19937c generator
    final Random seedSource = new SecureRandom();
    final int[] seedWords = new int[624];
    for (int idx = 0; idx < seedWords.length; idx++) {
      seedWords[idx] = seedSource.nextInt();
    }
    prng = new Well19937c(seedWords);
  } else {
    prng = null; // secure & slow
  }
  validateArguments();
}
示例11
/**
 * Runs the extractJsonPathsFlattened morphline over test-documents/arrays.json and checks that
 * both "/price" and "/price/[]" yield the flattened list of numbers. An unknown path must
 * yield an empty list.
 *
 * The input stream is closed in a finally block, so a failing assertion cannot leak the file
 * handle.
 */
@Test
public void testExtractJsonPathsFlattened() throws Exception {
  morphline = createMorphline("test-morphlines/extractJsonPathsFlattened");
  File file = new File(RESOURCES_DIR + "/test-documents/arrays.json");
  InputStream in = new FileInputStream(file);
  try {
    Record record = new Record();
    record.put(Fields.ATTACHMENT_BODY, in);
    startSession();
    assertEquals(1, collector.getNumStartEvents());
    assertTrue(morphline.process(record));
    assertEquals(1, collector.getRecords().size());
    List<Integer> expected = Arrays.asList(1, 2, 3, 4, 5, 10, 20, 100, 200);
    assertEquals(expected, collector.getFirstRecord().get("/price"));
    assertEquals(expected, collector.getFirstRecord().get("/price/[]"));
    assertEquals(Arrays.asList(), collector.getFirstRecord().get("/unknownField"));
  } finally {
    in.close();
  }
}
示例12
/**
 * Runs the readBlobWithOutputField morphline three times. Each run checks that:
 * <ul>
 *   <li>the record is passed through as the same instance;</li>
 *   <li>the attachment body is left as the original byte array;</li>
 *   <li>"myAwesomeDestination" receives an equal but distinct copy of those bytes.</li>
 * </ul>
 */
@Test
public void testReadBlobWithDestination() throws IOException {
  morphline = createMorphline("test-morphlines/readBlobWithOutputField");
  for (int round = 0; round < 3; round++) {
    final Record input = new Record();
    final byte[] payload = "foo".getBytes("UTF-8");
    input.put(Fields.ATTACHMENT_BODY, payload);

    collector.reset();
    startSession();
    assertEquals(1, collector.getNumStartEvents());
    assertTrue(morphline.process(input));

    assertSame(input, collector.getFirstRecord());
    assertSame(payload, input.getFirstValue(Fields.ATTACHMENT_BODY));
    final Object copied = input.getFirstValue("myAwesomeDestination");
    assertNotSame(payload, copied);
    assertArrayEquals(payload, (byte[]) copied);
  }
}
示例13
/**
 * Runs the readClob morphline three times on the UTF-8 bytes of "foo". Each run expects a new
 * record whose only field is the decoded message.
 */
@Test
public void testReadClob() throws Exception {
  morphline = createMorphline("test-morphlines/readClob");
  final String text = "foo";
  for (int attempt = 0; attempt < 3; attempt++) {
    final Record input = new Record();
    input.put(Fields.ATTACHMENT_BODY, text.getBytes("UTF-8"));

    collector.reset();
    startSession();
    assertEquals(1, collector.getNumStartEvents());
    assertTrue(morphline.process(input));

    final Record wanted = new Record();
    wanted.put(Fields.MESSAGE, text);
    final Record produced = collector.getFirstRecord();
    assertEquals(wanted, produced);
    assertNotSame(input, produced);
  }
}
示例14
/**
 * Runs the readClobDestField morphline three times on the UTF-8 bytes of "foo". Each run
 * expects a new record whose only field is "myAwesomeDestination" holding the decoded text.
 */
@Test
public void testReadClobWithDestination() throws IOException {
  morphline = createMorphline("test-morphlines/readClobDestField");
  final String text = "foo";
  for (int attempt = 0; attempt < 3; attempt++) {
    final Record input = new Record();
    input.put(Fields.ATTACHMENT_BODY, text.getBytes("UTF-8"));

    collector.reset();
    startSession();
    assertEquals(1, collector.getNumStartEvents());
    assertTrue(morphline.process(input));

    final Record wanted = new Record();
    wanted.put("myAwesomeDestination", text);
    final Record produced = collector.getFirstRecord();
    assertEquals(wanted, produced);
    assertNotSame(input, produced);
  }
}
示例15
/**
 * Reads test-documents/cars2.csv three times with the readCSVAndIgnoreTooLongRecords
 * morphline and checks the emitted rows, given as column-to-value multimaps.
 *
 * Each iteration closes its input stream in a finally block, so a failing verification
 * cannot leak file handles.
 */
@Test
public void testReadCSVAndIgnoreTooLongRecords() throws Exception {
  morphline = createMorphline("test-morphlines/readCSVAndIgnoreTooLongRecords");
  for (int i = 0; i < 3; i++) {
    InputStream in = new FileInputStream(new File(RESOURCES_DIR + "/test-documents/cars2.csv"));
    try {
      Record record = new Record();
      record.put(Fields.ATTACHMENT_BODY, in);
      processAndVerifySuccess(record,
          ImmutableMultimap.of("Age", "Age", "Extras", "Extras", "Type", "Type", "column4", "Used"),
          ImmutableMultimap.of("Age", "2", "Extras", "GPS", "Type", "Gas, with electric", "column4", ""),
          ImmutableMultimap.of("Age", "100", "Extras", "Labeled \"Vintage 1913\"", "Type", "yes"),
          ImmutableMultimap.of("Age", "6", "Extras", "many", "Type", "multi line2", "column4", "maybe")
          );
    } finally {
      in.close();
    }
  }
}
示例16
/**
 * Runs the extractJsonPaths morphline over test-documents/arrays.json and checks that both
 * "/price" and "/price/[]" yield the "price" array node exactly as Jackson parses it. An
 * unknown path must yield an empty list.
 *
 * The morphline's input stream is closed in a finally block, so a failing assertion cannot
 * leak the file handle.
 */
@Test
public void testExtractJsonPaths() throws Exception {
  morphline = createMorphline("test-morphlines/extractJsonPaths");
  File file = new File(RESOURCES_DIR + "/test-documents/arrays.json");
  InputStream in = new FileInputStream(file);
  try {
    Record record = new Record();
    record.put(Fields.ATTACHMENT_BODY, in);
    startSession();
    assertEquals(1, collector.getNumStartEvents());
    assertTrue(morphline.process(record));
    assertEquals(1, collector.getRecords().size());
    JsonNode rootNode = (JsonNode) new ObjectMapper().reader(JsonNode.class).readValues(file).next();
    assertTrue(rootNode.get("price").isArray());
    List<JsonNode> expected = Arrays.asList(rootNode.get("price"));
    assertEquals(expected, collector.getFirstRecord().get("/price"));
    assertEquals(expected, collector.getFirstRecord().get("/price/[]"));
    assertEquals(Arrays.asList(), collector.getFirstRecord().get("/unknownField"));
  } finally {
    in.close();
  }
}
示例17
/**
 * Checks the fields that the grokSyslogNgCisco morphline extracts from a Cisco syslog-ng line
 * whose message code includes a sub-facility segment.
 *
 * NOTE(review): this method has no @Test annotation, so JUnit 4 will not run it, and the
 * sub-facility expectation is disabled. Confirm whether both are intentional.
 */
public void testGrokSyslogNgCiscoWithoutSubFacility() throws Exception {
  morphline = createMorphline("test-morphlines/grokSyslogNgCisco");

  final String ciscoLine = "<179>Jun 10 04:42:51 www.foo.com Jun 10 2013 04:42:51 : %myproduct-3-mysubfacility-251010: "
      + "Health probe failed for server 1.2.3.4 on port 8083, connection refused by server";
  final String syslogPart = "%myproduct-3-mysubfacility-251010: Health probe failed for server 1.2.3.4 "
      + "on port 8083, connection refused by server";

  final Record source = new Record();
  source.put(Fields.MESSAGE, ciscoLine);
  assertTrue(morphline.process(source));

  final Record reference = new Record();
  reference.put(Fields.MESSAGE, ciscoLine);
  reference.put("cisco_message_code", "%myproduct-3-251010");
  reference.put("cisco_product", "myproduct");
  reference.put("cisco_level", "3");
  // reference.put("cisco_subfacility", "mysubfacility");  (disabled expectation)
  reference.put("cisco_message_id", "251010");
  reference.put("syslog_message", syslogPart);

  assertEquals(reference, collector.getFirstRecord());
  assertNotSame(source, collector.getFirstRecord());
}
示例18
/**
 * Builds one object from {@code in} by invoking the static {@code parseMethod} reflectively.
 * It then forwards a copy of the record to the child command, with the attachments replaced
 * by that object and {@code MIME_TYPE}.
 *
 * @return whatever the child command returns
 * @throws IOException if invoking the parse method fails. When the parse method itself threw,
 *         that underlying exception, not the reflective wrapper, supplies the message and cause.
 */
@Override
protected boolean doProcess(Record inputRecord, InputStream in) throws IOException {
  Object protoObjectInstance;
  try {
    protoObjectInstance = parseMethod.invoke(null, in);
  } catch (Exception e) {
    // Method.invoke wraps anything the target throws in an InvocationTargetException, whose own
    // message is null. Report the wrapped exception so the error text is meaningful.
    Throwable failure = e;
    if (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) {
      failure = e.getCause();
    }
    throw new IOException("Instance creation of '" + outputClass.getName() + "' from inputStream failed. "
        + failure.getMessage(), failure);
  }
  incrementNumRecords();
  LOG.trace("protoObject: {}", protoObjectInstance);
  Record outputRecord = inputRecord.copy();
  removeAttachments(outputRecord);
  outputRecord.put(Fields.ATTACHMENT_BODY, protoObjectInstance);
  outputRecord.put(Fields.ATTACHMENT_MIME_TYPE, MIME_TYPE);
  // pass record to next command in chain:
  return getChild().process(outputRecord);
}
示例19
/**
 * Splits ",, _a ,_b_ ,," with the splitWithEdgeCases morphline. It expects six "output"
 * values in order: empty strings for the empty segments, and "_a" / "_b_" without their
 * surrounding spaces.
 */
@Test
public void testSplitWithEdgeCases() throws Exception {
  morphline = createMorphline("test-morphlines/splitWithEdgeCases");
  final String msg = ",, _a ,_b_ ,,";
  final Record input = new Record();
  input.put(Fields.MESSAGE, msg);

  final Record wanted = new Record();
  wanted.put(Fields.MESSAGE, msg);
  for (String piece : new String[] {"", "", "_a", "_b_", "", ""}) {
    wanted.put("output", piece);
  }
  processAndVerifySuccess(input, wanted);
}
示例20
/**
 * Feeds an iptables-style log line through the splitKeyValueWithIPTables morphline.
 *
 * It expects the leading timestamp plus one field per KEY=VALUE token. The bare flags "ACK" and
 * "DF" must produce no fields. The emitted record must be a different instance from the input.
 */
@Test
public void testSplitKeyValueWithIPTables() throws Exception {
  morphline = createMorphline("test-morphlines/splitKeyValueWithIPTables");
  final String line = "Feb 6 12:04:42 IN=eth1 OUT=eth0 SRC=1.2.3.4 DST=6.7.8.9 ACK DF WINDOW=0";
  final Record input = new Record();
  input.put(Fields.ATTACHMENT_BODY, line.getBytes("UTF-8"));

  final Record wanted = new Record();
  wanted.put(Fields.MESSAGE, line);
  wanted.put("timestamp", "Feb 6 12:04:42");
  final String[][] keyValues = {
      {"IN", "eth1"},
      {"OUT", "eth0"},
      {"SRC", "1.2.3.4"},
      {"DST", "6.7.8.9"},
      {"WINDOW", "0"},
  };
  for (String[] kv : keyValues) {
    wanted.put(kv[0], kv[1]);
  }
  processAndVerifySuccess(input, wanted, false);
}
示例21
/**
 * Reads Avro datums from {@code in} one after another and passes each, with a shared template
 * record, to {@code extract}.
 *
 * The template is a copy of the input record with its attachments removed and its MIME type
 * set to {@code ReadAvroBuilder.AVRO_MEMORY_MIME_TYPE}. An EOFException from the reader ends
 * the loop normally. {@code in} is closed once the loop has been entered, however it ends.
 *
 * @return {@code false} as soon as {@code extract} returns false, otherwise {@code true}
 */
@Override
protected boolean doProcess(Record inputRecord, InputStream in) throws IOException {
  final Record template = inputRecord.copy();
  removeAttachments(template);
  template.put(Fields.ATTACHMENT_MIME_TYPE, ReadAvroBuilder.AVRO_MEMORY_MIME_TYPE);
  final Decoder decoder = prepare(in);
  try {
    for (;;) {
      final GenericContainer datum = datumReader.read(null, decoder);
      if (!extract(datum, template)) {
        return false;
      }
    }
  } catch (EOFException endOfInput) {
    // Running out of input is how this loop normally terminates.
  } finally {
    in.close();
  }
  return true;
}
示例22
/**
 * Creates a five-record text SequenceFile via {@code createTextSequenceFile} and runs it
 * through the sequenceFileMorphlineSimple morphline. It then checks that the emitted records'
 * fields match the expected ones. No custom parser is passed in via the context.
 *
 * The input stream is closed in a finally block; previously it was never closed.
 */
@Test
public void testSequenceFileContentSimple() throws Exception {
  morphline = createMorphline("test-morphlines/sequenceFileMorphlineSimple");
  String path = RESOURCES_DIR;
  File sequenceFile = new File(path, "testSequenceFileContentSimple.seq");
  int numRecords = 5;
  HashMap<String, Record> expected = createTextSequenceFile(sequenceFile, numRecords);
  InputStream in = new FileInputStream(sequenceFile.getAbsolutePath());
  try {
    Record record = new Record();
    record.put(Fields.ATTACHMENT_BODY, in);
    startSession();
    assertEquals(1, collector.getNumStartEvents());
    assertTrue(morphline.process(record));
    assertTrue(areFieldsEqual(expected, collector.getRecords()));
  } finally {
    in.close();
  }
}
示例23
/**
 * Creates a ten-record text SequenceFile via {@code createTextSequenceFile} and runs it through
 * a morphline. It then checks that the emitted records' fields match the expected ones.
 *
 * NOTE(review): despite its name, this test loads the sequenceFileMorphlineSimple config and
 * passes no custom parser. Confirm that this is intended.
 *
 * The input stream is closed in a finally block; previously it was never closed.
 */
@Test
public void testSequenceFileContentCustomParsers() throws Exception {
  morphline = createMorphline("test-morphlines/sequenceFileMorphlineSimple");
  String path = RESOURCES_DIR;
  File sequenceFile = new File(path, "testSequenceFileContentCustomParsers.seq");
  int numRecords = 10;
  HashMap<String, Record> expected = createTextSequenceFile(sequenceFile, numRecords);
  InputStream in = new FileInputStream(sequenceFile.getAbsolutePath());
  try {
    Record record = new Record();
    record.put(Fields.ATTACHMENT_BODY, in);
    startSession();
    assertEquals(1, collector.getNumStartEvents());
    assertTrue(morphline.process(record));
    assertTrue(areFieldsEqual(expected, collector.getRecords()));
  } finally {
    in.close();
  }
}
示例24
@Test
public void testTokenizeText() throws Exception {
morphline = createMorphline("test-morphlines" + File.separator + "tokenizeText");
for (int i = 0; i < 3; i++) {
Record record = new Record();
record.put(Fields.MESSAGE, "Hello World!");
record.put(Fields.MESSAGE, "\[email protected] #%()123");
Record expected = record.copy();
expected.getFields().putAll("tokens", Arrays.asList("hello", "world", "foo", "bar.com", "123"));
collector.reset();
startSession();
Notifications.notifyBeginTransaction(morphline);
assertTrue(morphline.process(record));
assertEquals(1, collector.getNumStartEvents());
Notifications.notifyCommitTransaction(morphline);
assertEquals(expected, collector.getFirstRecord());
}
}
示例25
/**
 * Reads test-documents/stream.json three times with the readJsonWithMap morphline. Each pass
 * expects exactly two records, whose attachment bodies are the maps {firstObject=foo} and
 * {secondObject=bar}.
 *
 * Each pass closes its input stream in a finally block, so a failing assertion cannot leak
 * file handles.
 */
@Test
public void testReadJsonWithMap() throws Exception {
  morphline = createMorphline("test-morphlines/readJsonWithMap");
  for (int j = 0; j < 3; j++) { // also test reuse of objects and low level avro buffers
    InputStream in = new FileInputStream(new File(RESOURCES_DIR + "/test-documents/stream.json"));
    try {
      Record record = new Record();
      record.put(Fields.ATTACHMENT_BODY, in);
      collector.reset();
      startSession();
      assertEquals(1, collector.getNumStartEvents());
      assertTrue(morphline.process(record));
      Iterator<Record> iter = collector.getRecords().iterator();
      assertTrue(iter.hasNext());
      Map<?, ?> node = (Map<?, ?>) iter.next().getFirstValue(Fields.ATTACHMENT_BODY);
      assertEquals(ImmutableMap.of("firstObject", "foo"), node);
      assertTrue(iter.hasNext());
      node = (Map<?, ?>) iter.next().getFirstValue(Fields.ATTACHMENT_BODY);
      assertEquals(ImmutableMap.of("secondObject", "bar"), node);
      assertFalse(iter.hasNext());
    } finally {
      in.close();
    }
  }
}
示例26
/**
 * Prepares an RCFile via {@code setupRCFile} (NUM_RECORDS rows, NUM_COLUMNS columns) and feeds
 * its qualified path and contents to the rcFileMorphlineRow morphline. It then checks the
 * emitted records against the expected list.
 */
@Test
public void testRCFileRowWise() throws Exception {
  morphline = createMorphline("test-morphlines/rcFileMorphlineRow");
  final String fileName = "testRCFileRowWise.rc";
  final List<Record> wanted = setupRCFile(fileName, NUM_RECORDS, NUM_COLUMNS, true);
  final Path qualifiedPath = dfs.makeQualified(new Path(testDirectory, fileName));

  final Record source = new Record();
  source.put(Fields.ATTACHMENT_NAME, qualifiedPath.toString());
  source.put(Fields.ATTACHMENT_BODY, readPath(qualifiedPath));

  startSession();
  assertEquals(1, collector.getNumStartEvents());
  assertTrue(morphline.process(source));
  final boolean fieldsMatch = areFieldsEqual(wanted, collector.getRecords(), NUM_COLUMNS, true);
  assertTrue(fieldsMatch);
}
示例27
/**
 * Builds a header record describing the file referenced by {@code parts}.
 *
 * The record contains the base id, the attachment name and HDFS file metadata: upload and
 * download URL, scheme, host, port, path, name, modification time, length, owner, group,
 * user/group/other permission symbols and the sticky bit.
 *
 * @return the header record, or {@code null} when no file status can be obtained. That case
 *         is logged as a warning, including the underlying IOException when there was one.
 */
protected Record getRecord(PathParts parts) {
  FileStatus stats = null;
  IOException statusFailure = null;
  try {
    stats = parts.getFileStatus();
  } catch (IOException e) {
    // Best effort: a file that disappeared is skipped rather than treated as fatal, but keep
    // the exception so the warning below explains why.
    statusFailure = e;
  }
  if (stats == null) {
    if (statusFailure == null) {
      LOG.warn("Ignoring file that somehow has become unavailable since the job was submitted: {}",
          parts.getUploadURL());
    } else {
      // SLF4J logs a trailing Throwable argument as the exception, with its stack trace.
      LOG.warn("Ignoring file that somehow has become unavailable since the job was submitted: {}",
          parts.getUploadURL(), statusFailure);
    }
    return null;
  }
  Record headers = new Record();
  headers.put(Fields.BASE_ID, parts.getId()); // with sanitizeUniqueKey command, use HDFS file path as docId if no docId is specified
  headers.put(Fields.ATTACHMENT_NAME, parts.getName()); // Tika can use the file name in guessing the right MIME type
  // enable indexing and storing of file meta data in Solr
  headers.put(HdfsFileFieldNames.FILE_UPLOAD_URL, parts.getUploadURL());
  headers.put(HdfsFileFieldNames.FILE_DOWNLOAD_URL, parts.getDownloadURL());
  headers.put(HdfsFileFieldNames.FILE_SCHEME, parts.getScheme());
  headers.put(HdfsFileFieldNames.FILE_HOST, parts.getHost());
  headers.put(HdfsFileFieldNames.FILE_PORT, String.valueOf(parts.getPort()));
  headers.put(HdfsFileFieldNames.FILE_PATH, parts.getURIPath());
  headers.put(HdfsFileFieldNames.FILE_NAME, parts.getName());
  headers.put(HdfsFileFieldNames.FILE_LAST_MODIFIED, String.valueOf(stats.getModificationTime())); // FIXME also add in SpoolDirectorySource
  headers.put(HdfsFileFieldNames.FILE_LENGTH, String.valueOf(stats.getLen())); // FIXME also add in SpoolDirectorySource
  headers.put(HdfsFileFieldNames.FILE_OWNER, stats.getOwner());
  headers.put(HdfsFileFieldNames.FILE_GROUP, stats.getGroup());
  headers.put(HdfsFileFieldNames.FILE_PERMISSIONS_USER, stats.getPermission().getUserAction().SYMBOL);
  headers.put(HdfsFileFieldNames.FILE_PERMISSIONS_GROUP, stats.getPermission().getGroupAction().SYMBOL);
  headers.put(HdfsFileFieldNames.FILE_PERMISSIONS_OTHER, stats.getPermission().getOtherAction().SYMBOL);
  headers.put(HdfsFileFieldNames.FILE_PERMISSIONS_STICKYBIT, String.valueOf(stats.getPermission().getStickyBit()));
  // TODO: consider to add stats.getAccessTime(), stats.getReplication(), stats.isSymlink(), stats.getBlockSize()
  return headers;
}
示例28
/**
 * Processes {@code input} in a fresh session. It then asserts that the first emitted record,
 * after its attachment body is removed, equals {@code expected}.
 *
 * @param input    record handed to the morphline
 * @param expected record the first emitted record must equal once its attachment body is dropped
 * @param isSame   {@code true} if the emitted record must be the very same instance as
 *                 {@code input}, {@code false} if it must be a different instance
 */
private void processAndVerifySuccess(Record input, Record expected, boolean isSame) {
  collector.reset();
  startSession();
  assertEquals(1, collector.getNumStartEvents());
  assertTrue(morphline.process(input));

  final Record produced = collector.getFirstRecord();
  produced.removeAll(Fields.ATTACHMENT_BODY);
  assertEquals(expected, produced);
  if (!isSame) {
    assertNotSame(input, produced);
    return;
  }
  assertSame(input, produced);
}
示例29
/**
 * Runs the xquery-shakespeare-speakers morphline over test-documents/othello.xml. It checks the
 * emitted speaker names and frequencies for OTHELLO, IAGO and DESDEMONA.
 *
 * The input stream is closed in a finally block, so a failing verification cannot leak the
 * file handle.
 */
@Test
public void testXQueryShakespeareSpeakers() throws Exception {
  morphline = createMorphline("test-morphlines/xquery-shakespeare-speakers");
  InputStream in = new FileInputStream(new File(RESOURCES_DIR + "/test-documents/othello.xml"));
  try {
    Record record = new Record();
    record.put(Fields.ATTACHMENT_BODY, in);
    processAndVerifySuccess(record,
        ImmutableMultimap.of("name", "OTHELLO", "frequency", "274"),
        ImmutableMultimap.of("name", "IAGO", "frequency", "272"),
        ImmutableMultimap.of("name", "DESDEMONA", "frequency", "165")
        );
  } finally {
    in.close();
  }
}
示例30
/**
 * Passes one input value to the morphline as the attachment body of the shared {@code record}.
 * A failure reported by the morphline is logged at info level.
 *
 * The attachment body is removed from {@code record} afterwards, even if processing throws,
 * so the next call starts without a stale attachment.
 */
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  // Hadoop Text already holds UTF-8 bytes, but only the first getLength() bytes of its backing
  // array are valid. Copy exactly those bytes instead of round-tripping through
  // toString().getBytes(), which re-encodes with the platform default charset.
  byte[] utf8 = java.util.Arrays.copyOf(value.getBytes(), value.getLength());
  record.put(Fields.ATTACHMENT_BODY, new ByteArrayInputStream(utf8));
  try {
    if (!morphline.process(record)) {
      LOGGER.info("Morphline failed to process record: {}", record);
    }
  } finally {
    record.removeAll(Fields.ATTACHMENT_BODY);
  }
}