Tutorial05 - Building an Import Adaptor for Custom Connections
Introduction
You can add support for custom connections by creating a class that extends datameer.dap.sdk.datastore.DataStoreType. Your plug-in is automatically scanned for all classes that extend DataStoreType, and the custom connections they define are automatically registered with Datameer.
Example Connection
As a simple example, you write a connection that generates random data instead of reading from an external data source. This keeps the example code as simple as possible, and it should be straightforward to modify the example to read from an external source instead of generating random records.
Explaining Connections and Import Jobs
A custom connection is configured on two levels. The first is the connection itself, which holds global connection properties that can be reused by multiple import jobs. Additional properties that determine exactly which data is imported are configured at the import job level.
For example, for databases the connection string, username, and password are configured at the connection level, while table names are configured at the import job level.
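To make this split concrete, the following sketch shows where each kind of property would be declared for a hypothetical database-style connector. The property keys, labels, and the split into "Host"/"Username"/"Password" versus "Table name" are purely illustrative; the hooks themselves (populateWizardPage() on the DataStoreType and createDetailsWizardPage() on the ImportJobModel) are the same ones implemented by the tutorial classes below.

// Hypothetical fragment on the DataStoreType: connection-level (reusable) settings.
@Override
protected void populateWizardPage(WizardPageDefinition page) {
    PropertyGroupDefinition group = page.addGroup("Database Connection");
    PropertyDefinition host = new PropertyDefinition("connection.host", "Host", PropertyType.STRING);
    host.setRequired(true);
    group.addPropertyDefinition(host);
    group.addPropertyDefinition(new PropertyDefinition("connection.username", "Username", PropertyType.STRING));
    group.addPropertyDefinition(new PropertyDefinition("connection.password", "Password", PropertyType.STRING));
}

// Hypothetical fragment on the ImportJobModel: job-level settings that select the concrete data.
@Override
public WizardPageDefinition createDetailsWizardPage() {
    WizardPageDefinition page = new WizardPageDefinition("Details");
    PropertyGroupDefinition group = page.addGroup("Database Import");
    PropertyDefinition table = new PropertyDefinition("import.table", "Table name", PropertyType.STRING);
    table.setRequired(true);
    group.addPropertyDefinition(table);
    return page;
}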
Extending Connection Types
Here is the implementation of our ExampleDataStoreType. It is very simple, because no properties have to be configured at this level:
package datameer.das.plugin.tutorial05;

import org.apache.hadoop.conf.Configuration;

import datameer.dap.sdk.common.DasContext;
import datameer.dap.sdk.datastore.DataStoreModel;
import datameer.dap.sdk.datastore.DataStoreType;
import datameer.dap.sdk.entity.DataStore;
import datameer.dap.sdk.property.WizardPageDefinition;

public class ExampleDataStoreType extends DataStoreType {

    public final static String ID = ExampleDataStoreType.class.getName();

    @SuppressWarnings("serial")
    public static class ExampleDataStoreModel extends DataStoreModel {

        public ExampleDataStoreModel(DataStore dataStore) {
            super(dataStore);
        }

        @Override
        public boolean isLocal() {
            return false;
        }

        @Override
        public void setupConf(Configuration conf) {
        }

        @Override
        public void testConnect(DasContext dasContext) throws InterruptedException {
        }
    }

    public ExampleDataStoreType() {
        super(new ExampleDataImportJobType());
    }

    @Override
    public DataStoreModel createModel(DataStore dataStore) {
        return new ExampleDataStoreModel(dataStore);
    }

    @Override
    public String getId() {
        return ID;
    }

    @Override
    public String getName() {
        return "Example Data Creator";
    }

    @Override
    protected void populateWizardPage(WizardPageDefinition page) {
    }
}
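In this example testConnect() is left empty because there is no external system to reach. For a real connection you would typically try to contact the external system here and throw an exception if that fails, so that a misconfigured connection is reported early. A minimal sketch, assuming a made-up checkReachable() helper that stands in for whatever client API your source provides:

@Override
public void testConnect(DasContext dasContext) throws InterruptedException {
    try {
        // Hypothetical helper, e.g. open and immediately close a connection to the external system.
        checkReachable();
    } catch (Exception e) {
        throw new RuntimeException("Could not connect to the example system: " + e.getMessage(), e);
    }
}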
Extending ImportJobType
Here is the implementation of our ExampleDataImportJobType. It is very simple because the main custom logic lives in the ImportJobModel that is created by this type.
package datameer.das.plugin.tutorial05;

import datameer.dap.sdk.entity.DataSourceConfiguration;
import datameer.dap.sdk.importjob.ImportJobType;

public class ExampleDataImportJobType extends ImportJobType<String> {

    @Override
    public ExampleDataImportJobModel createModel(DataSourceConfiguration configuration) {
        return new ExampleDataImportJobModel(configuration);
    }
}
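The type parameter of ImportJobType<String> is the raw record-source type that is passed from the RecordSourceReader to the RecordParser (both shown further down); this tutorial uses String because every fake record source is just the string "record". A connector that hands richer objects to its parser would simply use a different type parameter, as in this illustrative fragment (MyRow, MyRowImportJobType, and MyRowImportJobModel are made-up names):

// Illustrative fragment: the record source is a custom row object instead of a String.
public class MyRowImportJobType extends ImportJobType<MyRow> {

    @Override
    public MyRowImportJobModel createModel(DataSourceConfiguration configuration) {
        return new MyRowImportJobModel(configuration);
    }
}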
All the logic to read and parse the custom data and create Datameer X records goes into the ExampleDataImportJobModel. It looks like this:
package datameer.das.plugin.tutorial05;

import java.util.List;

import datameer.dap.sdk.entity.DataSourceConfiguration;
import datameer.dap.sdk.importjob.ImportFormat;
import datameer.dap.sdk.importjob.ImportJobModel;
import datameer.dap.sdk.property.NumberValidator;
import datameer.dap.sdk.property.PropertyDefinition;
import datameer.dap.sdk.property.PropertyGroupDefinition;
import datameer.dap.sdk.property.PropertyType;
import datameer.dap.sdk.property.WizardPageDefinition;
import datameer.dap.sdk.util.ManifestMetaData;

/**
 * Creates n files with m random records each. Number of files (n) and number of records (m) can be
 * configured using the import job wizard.
 */
public class ExampleDataImportJobModel extends ImportJobModel<String> {

    private static final long serialVersionUID = ManifestMetaData.SERIAL_VERSION_UID;

    private static final String NUMBER_OF_FILES = "files-number";
    private static final String RECORDS_PER_FILE = "file-records";

    private int _numberOfFiles;
    private int _recordsPerFile;

    public ExampleDataImportJobModel(DataSourceConfiguration conf) {
        super(conf);
        _numberOfFiles = conf.getIntProperty(NUMBER_OF_FILES, 1);
        _recordsPerFile = conf.getIntProperty(RECORDS_PER_FILE, 1);
    }

    public int getNumberOfFiles() {
        return _numberOfFiles;
    }

    public int getRecordsPerFile() {
        return _recordsPerFile;
    }

    @Override
    public ImportFormat<String> createImportFormat() {
        return new ExampleDataImportFormat(this);
    }

    @Override
    public void addPropertyValuesThatTriggerAFilterReset(List<Object> propertyValues) {
    }

    @Override
    public void addPropertyValuesThatTriggerAFieldReset(List<Object> propertyValues) {
    }

    @Override
    public WizardPageDefinition createDetailsWizardPage() {
        WizardPageDefinition page = new WizardPageDefinition("Details");
        PropertyGroupDefinition group = page.addGroup("Example Data");
        PropertyDefinition propertyDefinition = new PropertyDefinition(NUMBER_OF_FILES, "Number of files", PropertyType.STRING);
        propertyDefinition.setRequired(true);
        propertyDefinition.setValidators(new NumberValidator(1));
        group.addPropertyDefinition(propertyDefinition);
        propertyDefinition = new PropertyDefinition(RECORDS_PER_FILE, "Records per file", PropertyType.STRING);
        propertyDefinition.setRequired(true);
        propertyDefinition.setValidators(new NumberValidator(1));
        group.addPropertyDefinition(propertyDefinition);
        return page;
    }

    @Override
    public WizardPageDefinition createImportJobFilterPage() {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean isSupportImportJobFilterPage() {
        return false;
    }

    @Override
    public boolean canAutoMergeNewFields() {
        return true;
    }

    @Override
    public void resetFilters() {
    }
}
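The two addPropertyValuesThatTrigger...Reset() hooks are left empty in this example. Their names suggest that any property value added to the passed list causes Datameer X to reset the detected fields (or filters) when that value changes between job configurations. A hedged sketch of how field re-detection could be wired to the number-of-files property, purely for illustration:

@Override
public void addPropertyValuesThatTriggerAFieldReset(List<Object> propertyValues) {
    // Hypothetical: trigger a field reset whenever this configured value changes.
    propertyValues.add(_numberOfFiles);
}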
Finally, the ExampleDataImportFormat describes the data and creates the random records:

package datameer.das.plugin.tutorial05;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;

import datameer.dap.sdk.common.Field;
import datameer.dap.sdk.function.FieldType;
import datameer.dap.sdk.importjob.AbstractImportFormat;
import datameer.dap.sdk.importjob.DasDelegateSplit;
import datameer.dap.sdk.importjob.MapBasedRecordParser;
import datameer.dap.sdk.importjob.MapBasedRecordSchemaDetector;
import datameer.dap.sdk.importjob.MapParser;
import datameer.dap.sdk.importjob.NoDataRecordSchemaDetector;
import datameer.dap.sdk.importjob.RecordParser;
import datameer.dap.sdk.importjob.RecordSchemaDetector;
import datameer.dap.sdk.importjob.RecordSourceReader;
import datameer.dap.sdk.importjob.Splitter;
import datameer.dap.sdk.importjob.TextFieldAnalyzer;
import datameer.dap.sdk.util.ManifestMetaData;

/**
 * Describes the file format and generates the random records. This would be the class that is used
 * to connect to external data and convert it to Datameer X records.
 */
@SuppressWarnings("deprecation")
public class ExampleDataImportFormat extends AbstractImportFormat<String> {

    private static final long serialVersionUID = ManifestMetaData.SERIAL_VERSION_UID;

    /**
     * Split that just keeps the number of records that should be written.
     */
    public static class ExampleDataSplit implements InputSplit {

        private int _records;

        @SuppressWarnings("unused")
        private ExampleDataSplit() {
        }

        public ExampleDataSplit(int records) {
            _records = records;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(_records);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            _records = in.readInt();
        }

        @Override
        public String[] getLocations() throws IOException {
            return new String[0];
        }

        @Override
        public long getLength() throws IOException {
            return _records;
        }
    }

    // Unique keys to identify the column origins
    private static final String ORIGIN_NAME = "name";
    private static final String ORIGIN_AMOUNT = "amount";
    private static final String ORIGIN_DOUBLE_VALUE = "doubleValue";

    public ExampleDataImportFormat(ExampleDataImportJobModel model) {
        super(model, false);
    }

    @Override
    public ExampleDataImportJobModel getImportJobModel() {
        return (ExampleDataImportJobModel) super.getImportJobModel();
    }

    /**
     * Describes the schema of the data.
     */
    @Override
    public RecordSchemaDetector<String> createRecordSchemaDetector() throws IOException {
        return new NoDataRecordSchemaDetector<String>() {
            @Override
            public Field[] detectFields() {
                return new Field[] { new Field("name", ORIGIN_NAME, FieldType.STRING, true), new Field("amount", ORIGIN_AMOUNT, FieldType.INTEGER, true),
                        new Field("double", ORIGIN_DOUBLE_VALUE, FieldType.FLOAT, true) };
            }
        };
    }

    /**
     * Converts records read from an external data source into Datameer X records. For simplicity we are
     * just producing random records.
     */
    @Override
    public RecordParser<String> createRecordParser(Field[] fields) throws IOException {
        return new MapBasedRecordParser<String>(fields, new MapParser<String>() {

            private Random _random = new Random();

            @Override
            public void configureSchemaDetection(MapBasedRecordSchemaDetector<String> schemaDetector, TextFieldAnalyzer fieldAnalyzer) {
            }

            @Override
            public Map<String, Object> parseRecordSource(String recordSource) throws Exception {
                Map<String, Object> map = new HashMap<String, Object>();
                map.put(ORIGIN_NAME, ORIGIN_NAME + _random.nextInt(1000));
                map.put(ORIGIN_AMOUNT, _random.nextInt(10000));
                map.put(ORIGIN_DOUBLE_VALUE, _random.nextDouble());
                return map;
            }
        });
    }

    /**
     * Normally reads from an external data source and returns any Java object that will later be
     * converted to Datameer X records using the {@link RecordParser} created by
     * {@link #createRecordParser(Field[])}. In our case we just create fake String records.
     */
    @Override
    public RecordSourceReader<String> createRecordSourceReader(final InputSplit inputSplit) throws IOException {
        return new RecordSourceReader<String>() {

            private int _counter = 0;

            @Override
            public String readNext() throws IOException {
                return _counter++ < inputSplit.getLength() ? "record" : null;
            }

            @Override
            public long getPos() throws IOException {
                return _counter;
            }

            @Override
            public void close() throws IOException {
            }
        };
    }

    /**
     * Splitter that creates as many splits as configured files.
     */
    @Override
    public Splitter<DasDelegateSplit> getSplitter() throws IOException {
        return new Splitter<DasDelegateSplit>() {

            @Override
            public DasDelegateSplit[] createSplits(SplitHint splitHint) throws IOException {
                int numberOfFiles = getImportJobModel().getNumberOfFiles();
                int recordsPerFile = getImportJobModel().getRecordsPerFile();
                DasDelegateSplit[] splits = new DasDelegateSplit[numberOfFiles];
                for (int i = 0; i < numberOfFiles; i++) {
                    splits[i] = toDasDelegateSplit(new ExampleDataSplit(recordsPerFile));
                }
                return splits;
            }

            @Override
            public DasDelegateSplit[] createPreviewSplits(Configuration conf, int desiredNumberOfRecords) throws IOException {
                return new DasDelegateSplit[] { toDasDelegateSplit(new ExampleDataSplit(10)) };
            }

            private DasDelegateSplit toDasDelegateSplit(final ExampleDataSplit split) {
                return new DasDelegateSplit(split, "plugin-tutorial");
            }
        };
    }

    @Override
    protected void onConfigure(JobConf conf) {
    }
}
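As noted at the beginning, switching from generated data to a real source mostly means replacing createRecordSourceReader() (and adjusting the splitter). The following sketch reads one line per record from an external stream instead of returning the fake "record" string; openExternalStream() is a made-up helper standing in for whatever client API your source provides, and java.io.BufferedReader/InputStreamReader imports would be needed. As in the tutorial code, returning null from readNext() signals that the reader is exhausted, and the matching RecordParser would then split each line into the fields declared by the schema detector.

@Override
public RecordSourceReader<String> createRecordSourceReader(final InputSplit inputSplit) throws IOException {
    // Hypothetical sketch: one line per record from an external source.
    final BufferedReader reader = new BufferedReader(new InputStreamReader(openExternalStream(inputSplit), "UTF-8"));
    return new RecordSourceReader<String>() {

        private long _pos = 0;

        @Override
        public String readNext() throws IOException {
            String line = reader.readLine(); // null signals the end of this split
            if (line != null) {
                _pos++;
            }
            return line;
        }

        @Override
        public long getPos() throws IOException {
            return _pos;
        }

        @Override
        public void close() throws IOException {
            reader.close();
        }
    };
}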
For more information, refer to the Java documentation.
Source Code
This tutorial can be found in the Datameer X plug-in SDK under plugin-tutorials/tutorial05.