Tutorial05 - Building an Import Adaptor for Custom Connections
Introduction
You can add support for custom connections by creating a class that extends datameer.dap.sdk.datastore.DataStoreType. Your plug-in is automatically scanned for all classes that extend DataStoreType, and the custom connections found are automatically registered with Datameer.
Example Connection
As a simple example, you write a connection that generates random data instead of reading from an external data source. This keeps the example code as simple as possible, and it should be straightforward to modify the example to read from an external source instead of generating random records.
Explaining Connections and Import Jobs
A custom connection is configured on two levels. The first is the connection itself, which holds global connection properties that can be reused across multiple import jobs. Additional properties that define exactly which data is imported are configured on the import job level.
For example, for databases, the connection string, username, and password are configured on the connection level, while table names are configured on the import job level.
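For a database-style connection, those connection-level settings would typically be collected on the connection's wizard page. Below is a minimal sketch of what such a populateWizardPage implementation could look like, using the property classes that appear later in this tutorial. The property keys and labels (connection-url, username, password) are illustrative assumptions, and PropertyType.STRING is used for all three settings even though the SDK may offer more specific property types. The tutorial's own ExampleDataStoreType, shown below, leaves this method empty because it needs no connection-level settings.

    // Hypothetical sketch: connection-level properties for a database-style connection.
    // The keys and labels below are assumptions chosen for illustration.
    @Override
    protected void populateWizardPage(WizardPageDefinition page) {
        PropertyGroupDefinition group = page.addGroup("Database Connection");

        PropertyDefinition url = new PropertyDefinition("connection-url", "Connection URL", PropertyType.STRING);
        url.setRequired(true);
        group.addPropertyDefinition(url);

        PropertyDefinition user = new PropertyDefinition("username", "Username", PropertyType.STRING);
        user.setRequired(true);
        group.addPropertyDefinition(user);

        PropertyDefinition password = new PropertyDefinition("password", "Password", PropertyType.STRING);
        password.setRequired(true);
        group.addPropertyDefinition(password);
    }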
Extending Connection Types
Here is the implementation of our ExampleDataStoreType and its nested ExampleDataStoreModel. Both are very simple, because this example has no properties that need to be configured on the connection level:
package datameer.das.plugin.tutorial05;

import org.apache.hadoop.conf.Configuration;

import datameer.dap.sdk.common.DasContext;
import datameer.dap.sdk.datastore.DataStoreModel;
import datameer.dap.sdk.datastore.DataStoreType;
import datameer.dap.sdk.entity.DataStore;
import datameer.dap.sdk.property.WizardPageDefinition;

public class ExampleDataStoreType extends DataStoreType {

    public final static String ID = ExampleDataStoreType.class.getName();

    @SuppressWarnings("serial")
    public static class ExampleDataStoreModel extends DataStoreModel {

        public ExampleDataStoreModel(DataStore dataStore) {
            super(dataStore);
        }

        @Override
        public boolean isLocal() {
            return false;
        }

        @Override
        public void setupConf(Configuration conf) {
        }

        @Override
        public void testConnect(DasContext dasContext) throws InterruptedException {
        }
    }

    public ExampleDataStoreType() {
        super(new ExampleDataImportJobType());
    }

    @Override
    public DataStoreModel createModel(DataStore dataStore) {
        return new ExampleDataStoreModel(dataStore);
    }

    @Override
    public String getId() {
        return ID;
    }

    @Override
    public String getName() {
        return "Example Data Creator";
    }

    @Override
    protected void populateWizardPage(WizardPageDefinition page) {
    }
}
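For a connection to a real external system, the empty setupConf and testConnect methods are where the connection-level settings would take effect. The following is a hypothetical sketch for a JDBC-backed store; the placeholder settings, the way they would reach the model, and the convention of signaling a failed test by throwing a RuntimeException are all assumptions made for illustration rather than part of the SDK contract shown here.

    // Hypothetical sketch only -- not part of the tutorial. Shows how a JDBC-backed
    // DataStoreModel might use connection-level settings in setupConf() and testConnect().
    @SuppressWarnings("serial")
    public static class JdbcLikeDataStoreModel extends DataStoreModel {

        // Placeholder settings; in a real connector these would come from the
        // connection's configured properties (see the wizard page sketch above).
        private String _url = "jdbc:example://host:1234/db";
        private String _user = "user";
        private String _password = "secret";

        public JdbcLikeDataStoreModel(DataStore dataStore) {
            super(dataStore);
        }

        @Override
        public boolean isLocal() {
            return false;
        }

        @Override
        public void setupConf(Configuration conf) {
            // Hand the connection settings to the Hadoop jobs that run the import.
            conf.set("example.connection.url", _url);
            conf.set("example.connection.user", _user);
        }

        @Override
        public void testConnect(DasContext dasContext) throws InterruptedException {
            try {
                // Open and immediately close a connection to verify the settings.
                java.sql.DriverManager.getConnection(_url, _user, _password).close();
            } catch (java.sql.SQLException e) {
                // Assumption: surfacing the problem as a RuntimeException; check the
                // SDK Javadoc for the exact contract of testConnect().
                throw new RuntimeException("Could not connect to " + _url, e);
            }
        }
    }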
Extending ImportJobType
Here is the implementation of our ExampleDataImportJobType. It is very simple, because the main custom logic lives in the ImportJobModel that is created by this type:
package datameer.das.plugin.tutorial05;

import datameer.dap.sdk.entity.DataSourceConfiguration;
import datameer.dap.sdk.importjob.ImportJobType;

public class ExampleDataImportJobType extends ImportJobType<String> {

    @Override
    public ExampleDataImportJobModel createModel(DataSourceConfiguration configuration) {
        return new ExampleDataImportJobModel(configuration);
    }
}
All the logic for reading and parsing the custom data and creating Datameer X records goes into the ExampleDataImportJobModel. It looks like this:
package datameer.das.plugin.tutorial05;

import java.util.List;

import datameer.dap.sdk.entity.DataSourceConfiguration;
import datameer.dap.sdk.importjob.ImportFormat;
import datameer.dap.sdk.importjob.ImportJobModel;
import datameer.dap.sdk.property.NumberValidator;
import datameer.dap.sdk.property.PropertyDefinition;
import datameer.dap.sdk.property.PropertyGroupDefinition;
import datameer.dap.sdk.property.PropertyType;
import datameer.dap.sdk.property.WizardPageDefinition;
import datameer.dap.sdk.util.ManifestMetaData;

/**
 * Creates n files with m random records each. Number of files (n) and number of records (m) can be
 * configured using the import job wizard.
 */
public class ExampleDataImportJobModel extends ImportJobModel<String> {

    private static final long serialVersionUID = ManifestMetaData.SERIAL_VERSION_UID;

    private static final String NUMBER_OF_FILES = "files-number";
    private static final String RECORDS_PER_FILE = "file-records";

    private int _numberOfFiles;
    private int _recordsPerFile;

    public ExampleDataImportJobModel(DataSourceConfiguration conf) {
        super(conf);
        _numberOfFiles = conf.getIntProperty(NUMBER_OF_FILES, 1);
        _recordsPerFile = conf.getIntProperty(RECORDS_PER_FILE, 1);
    }

    public int getNumberOfFiles() {
        return _numberOfFiles;
    }

    public int getRecordsPerFile() {
        return _recordsPerFile;
    }

    @Override
    public ImportFormat<String> createImportFormat() {
        return new ExampleDataImportFormat(this);
    }

    @Override
    public void addPropertyValuesThatTriggerAFilterReset(List<Object> propertyValues) {
    }

    @Override
    public void addPropertyValuesThatTriggerAFieldReset(List<Object> propertyValues) {
    }

    @Override
    public WizardPageDefinition createDetailsWizardPage() {
        WizardPageDefinition page = new WizardPageDefinition("Details");
        PropertyGroupDefinition group = page.addGroup("Example Data");
        PropertyDefinition propertyDefinition = new PropertyDefinition(NUMBER_OF_FILES, "Number of files", PropertyType.STRING);
        propertyDefinition.setRequired(true);
        propertyDefinition.setValidators(new NumberValidator(1));
        group.addPropertyDefinition(propertyDefinition);
        propertyDefinition = new PropertyDefinition(RECORDS_PER_FILE, "Records per file", PropertyType.STRING);
        propertyDefinition.setRequired(true);
        propertyDefinition.setValidators(new NumberValidator(1));
        group.addPropertyDefinition(propertyDefinition);
        return page;
    }

    @Override
    public WizardPageDefinition createImportJobFilterPage() {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean isSupportImportJobFilterPage() {
        return false;
    }

    @Override
    public boolean canAutoMergeNewFields() {
        return true;
    }

    @Override
    public void resetFilters() {
    }
}
Extending ImportFormat
The ExampleDataImportFormat describes the data schema, splits the import into units of work, and generates the records. In a real connection, this is the class that would connect to the external data and convert it into Datameer X records:
package datameer.das.plugin.tutorial05;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;

import datameer.dap.sdk.common.Field;
import datameer.dap.sdk.function.FieldType;
import datameer.dap.sdk.importjob.AbstractImportFormat;
import datameer.dap.sdk.importjob.DasDelegateSplit;
import datameer.dap.sdk.importjob.MapBasedRecordParser;
import datameer.dap.sdk.importjob.MapBasedRecordSchemaDetector;
import datameer.dap.sdk.importjob.MapParser;
import datameer.dap.sdk.importjob.NoDataRecordSchemaDetector;
import datameer.dap.sdk.importjob.RecordParser;
import datameer.dap.sdk.importjob.RecordSchemaDetector;
import datameer.dap.sdk.importjob.RecordSourceReader;
import datameer.dap.sdk.importjob.Splitter;
import datameer.dap.sdk.importjob.TextFieldAnalyzer;
import datameer.dap.sdk.util.ManifestMetaData;

/**
 * Describes the file format and generates the random records. This would be the class that is used
 * to connect to external data and convert it to Datameer X records.
 */
@SuppressWarnings("deprecation")
public class ExampleDataImportFormat extends AbstractImportFormat<String> {

    private static final long serialVersionUID = ManifestMetaData.SERIAL_VERSION_UID;

    /**
     * Split that just keeps the number of records that should be written.
     */
    public static class ExampleDataSplit implements InputSplit {

        private int _records;

        @SuppressWarnings("unused")
        private ExampleDataSplit() {
        }

        public ExampleDataSplit(int records) {
            _records = records;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(_records);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            _records = in.readInt();
        }

        @Override
        public String[] getLocations() throws IOException {
            return new String[0];
        }

        @Override
        public long getLength() throws IOException {
            return _records;
        }
    }

    // Unique keys to identify the column origins
    private static final String ORIGIN_NAME = "name";
    private static final String ORIGIN_AMOUNT = "amount";
    private static final String ORIGIN_DOUBLE_VALUE = "doubleValue";

    public ExampleDataImportFormat(ExampleDataImportJobModel model) {
        super(model, false);
    }

    @Override
    public ExampleDataImportJobModel getImportJobModel() {
        return (ExampleDataImportJobModel) super.getImportJobModel();
    }

    /**
     * Describes the schema of the data.
     */
    @Override
    public RecordSchemaDetector<String> createRecordSchemaDetector() throws IOException {
        return new NoDataRecordSchemaDetector<String>() {
            @Override
            public Field[] detectFields() {
                return new Field[] { new Field("name", ORIGIN_NAME, FieldType.STRING, true),
                        new Field("amount", ORIGIN_AMOUNT, FieldType.INTEGER, true),
                        new Field("double", ORIGIN_DOUBLE_VALUE, FieldType.FLOAT, true) };
            }
        };
    }

    /**
     * Converts records read from an external data source into Datameer X records. For simplicity we
     * are just producing random records.
     */
    @Override
    public RecordParser<String> createRecordParser(Field[] fields) throws IOException {
        return new MapBasedRecordParser<String>(fields, new MapParser<String>() {

            private Random _random = new Random();

            @Override
            public void configureSchemaDetection(MapBasedRecordSchemaDetector<String> schemaDetector, TextFieldAnalyzer fieldAnalyzer) {
            }

            @Override
            public Map<String, Object> parseRecordSource(String recordSource) throws Exception {
                Map<String, Object> map = new HashMap<String, Object>();
                map.put(ORIGIN_NAME, ORIGIN_NAME + _random.nextInt(1000));
                map.put(ORIGIN_AMOUNT, _random.nextInt(10000));
                map.put(ORIGIN_DOUBLE_VALUE, _random.nextDouble());
                return map;
            }
        });
    }

    /**
     * Normally reads from an external data source and returns any Java object that will later be
     * converted to Datameer X records using the {@link RecordParser} created by
     * {@link #createRecordParser(Field[])}. In our case we just create fake String records.
     */
    @Override
    public RecordSourceReader<String> createRecordSourceReader(final InputSplit inputSplit) throws IOException {
        return new RecordSourceReader<String>() {

            private int _counter = 0;

            @Override
            public String readNext() throws IOException {
                return _counter++ < inputSplit.getLength() ? "record" : null;
            }

            @Override
            public long getPos() throws IOException {
                return _counter;
            }

            @Override
            public void close() throws IOException {
            }
        };
    }

    /**
     * Splitter that creates as many splits as configured files.
     */
    @Override
    public Splitter<DasDelegateSplit> getSplitter() throws IOException {
        return new Splitter<DasDelegateSplit>() {

            @Override
            public DasDelegateSplit[] createSplits(SplitHint splitHint) throws IOException {
                int numberOfFiles = getImportJobModel().getNumberOfFiles();
                int recordsPerFile = getImportJobModel().getRecordsPerFile();
                DasDelegateSplit[] splits = new DasDelegateSplit[numberOfFiles];
                for (int i = 0; i < numberOfFiles; i++) {
                    splits[i] = toDasDelegateSplit(new ExampleDataSplit(recordsPerFile));
                }
                return splits;
            }

            @Override
            public DasDelegateSplit[] createPreviewSplits(Configuration conf, int desiredNumberOfRecords) throws IOException {
                return new DasDelegateSplit[] { toDasDelegateSplit(new ExampleDataSplit(10)) };
            }

            private DasDelegateSplit toDasDelegateSplit(final ExampleDataSplit split) {
                return new DasDelegateSplit(split, "plugin-tutorial");
            }
        };
    }

    @Override
    protected void onConfigure(JobConf conf) {
    }
}
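As noted in the introduction, adapting this example to a real data source mostly comes down to replacing the random-record generation. Below is a minimal sketch, assuming a simple line-based source with comma-separated values; the openReaderFor() helper and the three-column field layout are assumptions made for illustration, not part of the SDK.

    // Hypothetical sketch: replacing the generated records with lines read from an external source.
    @Override
    public RecordSourceReader<String> createRecordSourceReader(final InputSplit inputSplit) throws IOException {
        // In a real connector the reader would be opened based on the split, e.g. a file,
        // an HTTP response, or a database cursor. openReaderFor() is a hypothetical helper.
        final java.io.BufferedReader reader = openReaderFor(inputSplit);
        return new RecordSourceReader<String>() {

            private long _pos = 0;

            @Override
            public String readNext() throws IOException {
                _pos++;
                return reader.readLine(); // returning null ends this split
            }

            @Override
            public long getPos() throws IOException {
                return _pos;
            }

            @Override
            public void close() throws IOException {
                reader.close();
            }
        };
    }

    // Inside the MapParser passed to MapBasedRecordParser, each raw line would then be split
    // into the declared column origins instead of being filled with random values:
    @Override
    public Map<String, Object> parseRecordSource(String recordSource) throws Exception {
        String[] values = recordSource.split(",");
        Map<String, Object> map = new HashMap<String, Object>();
        map.put(ORIGIN_NAME, values[0]);
        map.put(ORIGIN_AMOUNT, Integer.parseInt(values[1]));
        map.put(ORIGIN_DOUBLE_VALUE, Double.parseDouble(values[2]));
        return map;
    }

The schema returned by createRecordSchemaDetector() and the splits produced by getSplitter() would be adjusted accordingly, for example by listing the files or tables to import.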
For more information, refer to the Javadoc of the Datameer X plug-in SDK.
Source Code
This tutorial can be found in the Datameer X plug-in SDK under plugin-tutorials/tutorial05.