Tutorial01 - Building an Input Adaptor for Custom File Types
Introduction
You can add support for custom file types by creating a class that extends datameer.dap.sdk.importjob.ImportFileType. The plug-in shown in this tutorial automatically scans for all such classes, and your custom file types are automatically registered with Datameer X.
File Format
Let's assume you have a file that looks like this:
Transaction:1
Customer:SomeCustomer
Product-Id:product1

Transaction:2
Customer:OtherCustomer
Product-Id:product2
Records span multiple lines in this example.
Extending ImportFileType
You can have a look at some of the subclasses of ImportFileType that come with the Datameer X SDK. In this case, it makes sense to extend TextBasedFileType, because you are writing an import format that operates on text input.
Here is the implementation:
package datameer.das.plugin.tutorial01;

import org.apache.hadoop.conf.Configuration;

import datameer.dap.sdk.common.Field;
import datameer.dap.sdk.importjob.RecordParser;
import datameer.dap.sdk.importjob.RecordSchemaDetector;
import datameer.dap.sdk.importjob.TextBasedFileType;

public class CustomTransactionFileType extends TextBasedFileType {

    private static final String ID = CustomTransactionFileType.class.getName();

    @Override
    public RecordSchemaDetector<String> createRecordSchemaDetector(FileTypeModel<String> model) {
        return new CustomTransactionRecordSchemaDetector();
    }

    @Override
    public RecordParser<String> createRecordParser(Field[] fields, Configuration conf, FileTypeModel<String> model) {
        return new CustomTransactionRecordParser(fields);
    }

    @Override
    public String getName() {
        return "Custom Transaction File Type (Tutorial01)";
    }

    @Override
    public String getId() {
        return ID;
    }

    @Override
    public boolean canAutoMergeNewFields() {
        return true;
    }
}
The main things you have to provide in this implementation are:
- A record parser that does the actual parsing of the file type
- A name for this file type, which is used for displaying it in the Datameer X web UI
- A unique ID, which is used to identify this file type
Providing a Record Schema Detector
A schema detector detects the fields that can be extracted from the file. This is fairly simple in your case, since you have a fixed schema. In other cases (such as CSV), you would have to look at the file to see how many columns it contains and what types those columns might have. Have a look at the available subclasses and interfaces of datameer.dap.sdk.importjob.RecordSchemaDetector<RS> that might help you when writing your own schema detector.
package datameer.das.plugin.tutorial01;

import java.util.ArrayList;
import java.util.List;

import datameer.dap.sdk.common.Field;
import datameer.dap.sdk.importjob.NoDataRecordSchemaDetector;
import datameer.dap.sdk.schema.ValueType;

public class CustomTransactionRecordSchemaDetector extends NoDataRecordSchemaDetector<String> {

    static final String TRANSACTION_PREFIX = "Transaction";
    static final String CUSTOMER = "Customer";
    static final String PRODUCT_ID = "ProductId";

    /**
     * Makes a good guess for the field definition. This includes:
     * <ul>
     * <li>The number of fields</li>
     * <li>Suggestion for header names</li>
     * <li>Type for each field.</li>
     * </ul>
     *
     * @return a good first guess of how fields for this data source might look like.
     */
    @Override
    public Field[] detectFields() {
        List<Field> fields = new ArrayList<Field>();
        fields.add(new Field(TRANSACTION_PREFIX, "0", ValueType.INTEGER));
        fields.add(new Field(CUSTOMER, "1", ValueType.STRING));
        fields.add(new Field(PRODUCT_ID, "2", ValueType.STRING));
        return fields.toArray(new Field[fields.size()]);
    }
}
Providing a Record Parser
The true logic in mapping your custom records to fields lies in the implementation of the record parser. Again, it makes sense to look at the existing subclasses that come with the SDK. In this example, you extend MultiLineFixedStructureTextRecordParser, because you have records of a fixed structure that span multiple lines.
package datameer.das.plugin.tutorial01;

import datameer.dap.sdk.common.Field;
import datameer.dap.sdk.common.RawRecordCollector;
import datameer.dap.sdk.importjob.FixedStructureTextParser;
import datameer.dap.sdk.importjob.MultiLineFixedStructureTextRecordParser;
import datameer.dap.sdk.importjob.TextFieldAnalyzer;
import datameer.dap.sdk.util.ManifestMetaData;

public class CustomTransactionRecordParser extends MultiLineFixedStructureTextRecordParser {

    public static class CustomTransactionTextParser implements FixedStructureTextParser {

        private static final long serialVersionUID = ManifestMetaData.SERIAL_VERSION_UID;

        @Override
        public String[] parseStringFields(String line) throws Exception {
            String[] linesOfRecord = line.split("\n[\r]?");
            for (int i = 0; i < linesOfRecord.length; i++) {
                linesOfRecord[i] = linesOfRecord[i].split(":", 2)[1];
            }
            return linesOfRecord;
        }

        @Override
        public void configureSchemaDetection(TextFieldAnalyzer fieldAnalyzer) {
            // nothing to do
        }

        @Override
        public void configureRecordCollector(RawRecordCollector recordCollector) {
            // nothing to do
        }
    }

    public CustomTransactionRecordParser(Field[] allFields) {
        super(allFields, new CustomTransactionTextParser());
    }

    /**
     * This method helps to ensure the validness of splits over record sources. If the containing
     * record source element is a file and the record source is a line of that file, then this
     * method should return false only if the given line can't be parsed without previous lines.
     * This will not be called for the first split of a file.
     *
     * Only implement if {@link #recordCanSpanMultipleRecordSources()} is true!
     *
     * @return true if the given record source is valid as a first read record source
     */
    @Override
    public boolean isValidFirstReadRecordSource(String line) {
        return line.startsWith(CustomTransactionRecordSchemaDetector.TRANSACTION_PREFIX);
    }

    @Override
    protected boolean isLastRow(String line) {
        return line.startsWith(CustomTransactionRecordSchemaDetector.PRODUCT_ID);
    }

    @Override
    protected boolean isRecordSeparatorLine(String line) {
        return line.trim().isEmpty();
    }
}
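To make the mapping concrete, here is a small, hypothetical snippet (not part of the tutorial sources; the class name ParseStringFieldsExample is made up for illustration) that feeds one raw record from the example file into the text parser defined above:

package datameer.das.plugin.tutorial01;

import java.util.Arrays;

// Illustration only: not part of the tutorial sources.
public class ParseStringFieldsExample {

    public static void main(String[] args) throws Exception {
        // One logical record from the example file, i.e. the lines between two record separators.
        String rawRecord = "Transaction:1\nCustomer:SomeCustomer\nProduct-Id:product1";
        String[] values = new CustomTransactionRecordParser.CustomTransactionTextParser()
                .parseStringFields(rawRecord);
        // Prints: [1, SomeCustomer, product1] -- the values in the order of the detected fields.
        System.out.println(Arrays.toString(values));
    }
}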
Other File Types
You can also parse binary files or use other parsers to convert a file into Datameer X records. To do this, you have to do the following:
- Extend ImportFileType<Path> instead of ImportFileType<Record>; the createImportFormat() function should look like this:

  @Override
  public FileBasedInputFormat<Path> createImportFormat(FileTypeModel<Path> model) {
      return new CompleteFileImportFormat(model.getFileImportJobModel());
  }

- Extend AbstractMapBasedRecordParser<Path> to write the parser.
Single Output Record Optimization
If you know that your RecordParser implementation only outputs one RawRecord per logical input record, then it should be annotated with the @SingleRecordPerSource annotation.
package datameer.das.plugin.tutorial01;

import datameer.dap.sdk.common.Field;
import datameer.dap.sdk.common.RawRecordCollector;
import datameer.dap.sdk.importjob.FixedStructureTextParser;
import datameer.dap.sdk.importjob.MultiLineFixedStructureTextRecordParser;
import datameer.dap.sdk.importjob.SingleRecordPerSource;
import datameer.dap.sdk.importjob.TextFieldAnalyzer;
import datameer.dap.sdk.util.ManifestMetaData;

@SingleRecordPerSource
public class CustomTransactionRecordParser extends MultiLineFixedStructureTextRecordParser {
    // ...
}
The annotation can be added to any RecordParser implementation. It tells the Datameer X SDK that the RecordParser outputs only one RawRecord per "logical" input record, which lets Datameer X optimize the import logic. If the RecordParser does output more than one record via the RawRecordCollector, the last one wins (the records previously collected via calls to RecordParser#parse(ParseConfiguration, RawRecordCollector, Object) are ignored). This annotation works with MultipleSourceRecordParser implementations as well (as shown above), since in that case multiple incoming record sources are converted to a single "logical" input record.
Source Code
This tutorial can be found in the Datameer X plug-in SDK under plugin-tutorials/tutorial01.
Sequence File With Custom Key Value Types
In this example, you want to add a custom adapter to import a custom sequence file made up of custom key and value Writable types. The right extension point for this in the SDK is FileBasedImportFormat. Here is an example of an import format that imports sequence files created by Datameer X:
package datameer.dap.common.importjob.filetype;

import datameer.dap.sdk.importjob.RecordSourceReader;

/**
 * Hold key value pairs from a sequence file which can then be translated to fields by a
 * {@link RecordSourceReader} implementation.
 */
public class KeyValuePair {

    private Object _key;
    private Object _value;

    public KeyValuePair(Object key, Object value) {
        _key = key;
        _value = value;
    }

    public Object getKey() {
        return _key;
    }

    public Object getValue() {
        return _value;
    }
}
package datameer.dap.common.importjob.filetype;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

import datameer.dap.common.job.mr.input.DapFileInputFormat;
import datameer.dap.sdk.common.Record;
import datameer.dap.sdk.importjob.FileBasedImportFormat;
import datameer.dap.sdk.importjob.FileImportJobModel;
import datameer.dap.sdk.importjob.FileSplitter.SplitInstruction;
import datameer.dap.sdk.importjob.FileSplitter.SplitInstruction.CanSplit;
import datameer.dap.sdk.importjob.RecordReaderBackedSourceReader;
import datameer.dap.sdk.importjob.RecordSourceReader;
import datameer.dap.sdk.util.ManifestMetaData;

@SuppressWarnings("deprecation")
public class DasSequenceFileImportFormat extends FileBasedImportFormat<KeyValuePair> {

    private static final long serialVersionUID = ManifestMetaData.SERIAL_VERSION_UID;

    public DasSequenceFileImportFormat(FileImportJobModel<KeyValuePair> importJobModel, boolean reparsableRecordSource) {
        super(importJobModel, reparsableRecordSource);
    }

    @Override
    public RecordSourceReader<KeyValuePair> createRecordSourceReader(InputSplit inputSplit) throws IOException {
        RecordReader<Object, Record> reader = DapFileInputFormat.createRecordReader(getConf(), (FileSplit) inputSplit);
        return new RecordReaderBackedSourceReader<Object, Record, KeyValuePair>(reader) {
            @Override
            protected KeyValuePair convertToRecordSource(Object key, Record value) {
                return new KeyValuePair(key, value);
            }
        };
    }

    @Override
    public SplitInstruction getSplitInstruction(Path[] inputPaths, JobConf conf) {
        return new SplitInstruction(CanSplit.ALL);
    }

    @Override
    protected void onConfigure(JobConf conf) {
    }
}
DasSequenceFileImportFormat.createRecordSourceReader() has to use a custom implementation to read records from the sequence files and convert them into a KeyValuePair object.
package datameer.dap.common.importjob.filetype;

import datameer.dap.sdk.importjob.FileBasedImportFormat;
import datameer.dap.sdk.importjob.FileImportJobModel;
import datameer.dap.sdk.importjob.ImportFileType.FileTypeModel;

public class DasSequenceImportFileModel extends FileTypeModel<KeyValuePair> {

    public DasSequenceImportFileModel(FileImportJobModel<KeyValuePair> fileImportJobModel) {
        super(fileImportJobModel);
    }

    @Override
    public FileBasedImportFormat<KeyValuePair> createImportFormat() {
        return new DasSequenceFileImportFormat(getFileImportJobModel(), false);
    }
}
package datameer.dap.common.importjob.filetype;

import org.apache.hadoop.conf.Configuration;

import datameer.dap.sdk.common.Field;
import datameer.dap.sdk.importjob.FileImportJobModel;
import datameer.dap.sdk.importjob.ImportFileType;
import datameer.dap.sdk.importjob.MapBasedRecordParser;
import datameer.dap.sdk.importjob.MapBasedRecordSchemaDetector;
import datameer.dap.sdk.importjob.RecordParser;
import datameer.dap.sdk.importjob.RecordSchemaDetector;
import datameer.dap.sdk.plugin.annotation.IgnoreExtension;

@IgnoreExtension
public class DasSequenceImportFileType extends ImportFileType<KeyValuePair> {

    @Override
    public String getId() {
        return "DasSequenceImportFileType";
    }

    @Override
    public String getName() {
        return "Datameer X sequence file type";
    }

    @Override
    public RecordSchemaDetector<KeyValuePair> createRecordSchemaDetector(FileTypeModel<KeyValuePair> model) {
        return new MapBasedRecordSchemaDetector<KeyValuePair>(new DasSequenceFileParser(), 1000);
    }

    @Override
    public RecordParser<KeyValuePair> createRecordParser(Field[] fields, Configuration conf, FileTypeModel<KeyValuePair> model) {
        return new MapBasedRecordParser<KeyValuePair>(fields, new DasSequenceFileParser());
    }

    @Override
    public DasSequenceImportFileModel createModel(FileImportJobModel<KeyValuePair> model) {
        return new DasSequenceImportFileModel(model);
    }

    @Override
    public boolean canAutoMergeNewFields() {
        return true;
    }
}
DasSequenceFileParser.parseRecordSource() has to be changed to extract columns out of the key and value that were read from the sequence file.
package datameer.dap.common.importjob.filetype;

import java.util.Map;

import com.google.common.collect.Maps;

import datameer.dap.sdk.common.Record;
import datameer.dap.sdk.importjob.MapBasedRecordSchemaDetector;
import datameer.dap.sdk.importjob.MapParser;
import datameer.dap.sdk.importjob.TextFieldAnalyzer;

public class DasSequenceFileParser implements MapParser<KeyValuePair> {

    @Override
    public void configureSchemaDetection(MapBasedRecordSchemaDetector<KeyValuePair> schemaDetector, TextFieldAnalyzer fieldAnalyzer) {
    }

    @Override
    public Map<String, Object> parseRecordSource(KeyValuePair recordSource) throws Exception {
        Map<String, Object> map = Maps.newLinkedHashMap();
        Record record = (Record) recordSource.getValue();
        for (int i = 0; i < record.getFieldCount(); i++) {
            map.put("column" + i, record.getValue(i));
        }
        return map;
    }
}
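As a rough illustration of the column naming scheme used above, here is a standalone sketch that mirrors the loop in parseRecordSource() with a plain array instead of a Datameer X Record (constructing a Record is not shown in this tutorial, so the array values are stand-ins):

import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: not part of the SDK or the tutorial sources.
public class ColumnNamingIllustration {

    public static void main(String[] args) {
        // Stand-ins for the values a Record would return from getValue(i).
        Object[] recordValues = { "SomeCustomer", 42L };

        Map<String, Object> map = new LinkedHashMap<String, Object>();
        for (int i = 0; i < recordValues.length; i++) {
            map.put("column" + i, recordValues[i]);
        }

        // Prints: {column0=SomeCustomer, column1=42}
        System.out.println(map);
    }
}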