Naive Metadata Schema Strategy

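The following implementation of a metadata schema strategy reads a simple schema encoding from the SequenceFile metadata to generate its Datameer X schema. SequenceFile.Metadata is backed by a TreeMap, which keeps its keys sorted rather than in insertion order, so the key order cannot carry the column order. Instead, the schema is stored under two keys:

- `columnNames` holds a comma-separated list of column names.
- `columnTypes` holds a comma-separated list of fully qualified Java class names, in the same order. `java.lang.Integer` is mapped to `Long` and `java.lang.Float` to `Double`.

The position of each entry in these lists determines the Datameer X column ordering. This class builds on BaseMetadataSchemaStrategy, a base support class provided by the plug-in; all implementations should subclass this type: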

package datameer.das.plugin.sequencefile.importjob;

import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.Text;

import datameer.dap.sdk.common.Field;
import datameer.dap.sdk.function.FieldType;

/**
 * Metadata schema strategy that derives a Datameer X schema from two comma-separated entries in
 * the {@link Metadata} of a SequenceFile:
 *
 * <ul>
 *   <li>{@value #COLUMN_NAMES_KEY}: the column names, in column order;
 *   <li>{@value #COLUMN_TYPES_KEY}: one fully qualified Java class name per column, in the same
 *       order (e.g. {@code java.lang.String,java.lang.Long}).
 * </ul>
 *
 * <p>Each generated field stores its column index as its origin, which {@link #parseFieldValue}
 * uses to pick the matching entry out of an already-split row.
 */
public class NaiveMetadataSchemaStrategy extends BaseMetadataSchemaStrategy {

  static final String LABEL = "Naive Metadata Strategy";
  static final String COLUMN_NAMES_KEY = "columnNames";
  static final String COLUMN_TYPES_KEY = "columnTypes";

  /**
   * Builds the schema from the column names and column types stored in the file metadata.
   *
   * @param metadata the SequenceFile metadata to read
   * @return one field per column, in the order the columns are listed in the metadata
   * @throws RuntimeException if either entry is missing or empty, if the two lists differ in
   *     length, or if a column type cannot be resolved to a class
   */
  @Override
  public Field[] createSchemaFromMetadata(Metadata metadata) {
    String[] columnNames = extractStringArray(metadata, COLUMN_NAMES_KEY);
    String[] columnTypes = extractStringArray(metadata, COLUMN_TYPES_KEY);

    validate(columnNames, columnTypes);

    return buildSchema(columnNames, columnTypes);
  }

  /**
   * Converts the raw string value belonging to {@code field} into a typed value.
   *
   * @param field a schema field whose origin is expected to hold its column index, as set by
   *     {@link #buildSchema}
   * @param rawFields the raw string values of one record, indexed by column
   * @return the parsed value (see {@link #parseFromString})
   */
  @Override
  public Object parseFieldValue(Field field, String[] rawFields) {
    int columnIndex = Integer.parseInt(field.getOrigin());
    return parseFromString(field.getType(), rawFields[columnIndex]);
  }

  /**
   * Reads a comma-separated metadata entry and splits it on commas.
   *
   * <p>Because {@link String#split(String)} is used, trailing empty elements are dropped and an
   * empty entry yields a single empty element rather than an empty array.
   *
   * @param metadata the SequenceFile metadata to read
   * @param key the metadata key to look up
   * @return the split values, or an empty array when {@code key} is not present
   */
  String[] extractStringArray(Metadata metadata, String key) {
    Text delimitedRow = metadata.get(new Text(key));
    if (delimitedRow == null) {
      return new String[0];
    }
    return delimitedRow.toString().split(",");
  }

  /**
   * Creates one {@link Field} per column, pairing each name with the type at the same index.
   *
   * @param columnNames the column names; must not be longer than {@code columnTypes}
   * @param columnTypes the Java class names of the columns
   * @return the schema, in column order
   */
  Field[] buildSchema(String[] columnNames, String[] columnTypes) {
    Field[] schema = new Field[columnNames.length];
    for (int i = 0; i < columnNames.length; i++) {
      schema[i] = new Field(columnNames[i], FieldType.lookup(translateToDasType(columnTypes[i])));
      // Store the column index so parseFieldValue can locate the raw value in the split row.
      schema[i].setOrigin(String.valueOf(i));
    }

    return schema;
  }

  /**
   * Resolves a Java class name from the metadata to the class used for the Datameer X type.
   * {@code java.lang.Integer} is widened to {@link Long} and {@code java.lang.Float} to
   * {@link Double}; any other name is loaded as-is.
   *
   * @param incomingJavaClassName a fully qualified class name; surrounding whitespace is ignored
   * @return the class to look up the field type with
   * @throws RuntimeException if the class cannot be found
   */
  Class<?> translateToDasType(String incomingJavaClassName) {
    // Tolerate whitespace after the comma separators, e.g. "java.lang.String, java.lang.Long".
    String className = incomingJavaClassName.trim();
    if ("java.lang.Integer".equals(className)) {
      return Long.class;
    }
    if ("java.lang.Float".equals(className)) {
      return Double.class;
    }

    try {
      return Class.forName(className);
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(
          "Unable to translate class name to Datameer X type: " + className, e);
    }
  }

  /**
   * Checks that both metadata entries are present and describe the same number of columns.
   *
   * @param columnNames the column names read from the metadata
   * @param columnTypes the column types read from the metadata
   * @throws RuntimeException if either array is empty or their lengths differ
   */
  void validate(String[] columnNames, String[] columnTypes) {
    if (columnNames.length == 0 || columnTypes.length == 0 || columnNames.length != columnTypes.length) {
      throw new RuntimeException("Unable to create schema, SequenceFile.Metadata is invalid ("
          + columnNames.length + " column names, " + columnTypes.length + " column types)");
    }
  }

  /**
   * Parses a raw string into the value representation for {@code fieldType}.
   *
   * <p>STRING values are returned unchanged, INTEGER values as {@link Long} and FLOAT values as
   * {@link Double}.
   *
   * @param fieldType the type of the field being parsed
   * @param rawFieldValue the raw string value
   * @return the parsed value
   * @throws NumberFormatException if a numeric value cannot be parsed
   * @throws UnsupportedOperationException for ANY, BOOLEAN, DATE and ERROR fields
   * @throws IllegalArgumentException for any other field type
   */
  Object parseFromString(FieldType fieldType, String rawFieldValue) {
    switch (fieldType) {
    case STRING:
      return rawFieldValue;
    case INTEGER:
      return Long.parseLong(rawFieldValue);
    case FLOAT:
      // translateToDasType widens java.lang.Float to Double, so floating-point columns are
      // expected to reach this case (presumably via FieldType.lookup(Double.class)); previously
      // they were declared in the schema but every value failed to parse.
      return Double.parseDouble(rawFieldValue);
    case ANY:
    case BOOLEAN:
    case DATE:
    case ERROR:
      throw new UnsupportedOperationException("Parsing " + fieldType + " values is not supported");
    }

    throw new IllegalArgumentException("Unknown field type: " + fieldType);
  }

  /** Returns this strategy's identifier: the fully qualified name of the runtime class. */
  @Override
  public String getId() {
    return getClass().getName();
  }

  /** Returns the human-readable label of this strategy. */
  @Override
  public String getName() {
    return LABEL;
  }
}