Naive Metadata Schema Strategy

The following implementation of a metadata schema strategy uses a simple schema encoding in the sequence file metadata to generate its Datameer schema. The metadata entry stored under the key columnNames holds a comma-separated list of the column names, and the entry stored under the key columnTypes holds the matching comma-separated list of Java type names. The position of a name in that list determines the column ordering, and the two lists must have the same, non-zero length. This class takes advantage of a base support class provided by the plug-in, BaseMetadataSchemaStrategy, so all implementations should subclass this type:

package datameer.das.plugin.sequencefile.importjob;

import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.Text;

import datameer.dap.sdk.common.Field;
import datameer.dap.sdk.function.FieldType;

/**
 * Metadata schema strategy that reads the schema from two entries of the
 * sequence file metadata: {@code columnNames} and {@code columnTypes}. Each
 * entry is a comma-separated list; the n-th name is paired with the n-th type
 * and the list position becomes the column position.
 *
 * <p>Only {@code STRING} and {@code INTEGER} values can be parsed by
 * {@link #parseFromString(FieldType, String)}.
 *
 * <p>NOTE(review): {@link #translateToDasType(String)} maps
 * {@code java.lang.Float} to {@code Double}, yet parsing rejects {@code FLOAT}.
 * Presumably a Float column yields a schema whose values cannot be parsed;
 * confirm against {@code FieldType.lookup} before relying on it.
 */
public class NaiveMetadataSchemaStrategy extends BaseMetadataSchemaStrategy {

  static final String LABEL = "Naive Metadata Strategy";
  static final String COLUMN_NAMES_KEY = "columnNames";
  static final String COLUMN_TYPES_KEY = "columnTypes";

  /**
   * Builds the schema from the {@code columnNames} and {@code columnTypes}
   * metadata entries.
   *
   * @param metadata the sequence file metadata to read both entries from
   * @return one field per column, in the order the names are listed
   * @throws RuntimeException if either list is empty, the lists differ in
   *         length, or a type name cannot be resolved to a class
   */
  @Override
  public Field[] createSchemaFromMetadata(Metadata metadata) {
    String[] columnNames = extractStringArray(metadata, COLUMN_NAMES_KEY);
    String[] columnTypes = extractStringArray(metadata, COLUMN_TYPES_KEY);

    validate(columnNames, columnTypes);

    return buildSchema(columnNames, columnTypes);
  }

  /**
   * Parses the raw value belonging to {@code field}. The position of the value
   * inside {@code rawFields} is the index stored in the field's origin by
   * {@link #buildSchema(String[], String[])}.
   *
   * @param field the schema field whose value is wanted
   * @param rawFields the raw string values of one record
   * @return the parsed value
   * @throws ArrayIndexOutOfBoundsException if the record has no value at the
   *         field's index
   */
  @Override
  public Object parseFieldValue(Field field, String[] rawFields) {
    FieldType fieldType = field.getType();
    int index = Integer.parseInt(field.getOrigin());
    if (index < 0 || index >= rawFields.length) {
      // Same exception type as the unchecked array access, but with enough context to debug.
      throw new ArrayIndexOutOfBoundsException(
          "Field origin index " + index + " is outside the record, which has " + rawFields.length + " raw field(s)");
    }
    return parseFromString(fieldType, rawFields[index]);
  }

  /**
   * Reads the metadata entry stored under {@code key} and splits it on commas.
   *
   * @param metadata the sequence file metadata
   * @param key the name of the metadata entry
   * @return the comma-separated parts of the entry, or an empty array when the
   *         entry is absent
   */
  String[] extractStringArray(Metadata metadata, String key) {
    Text delimitedRow = metadata.get(new Text(key));
    if (delimitedRow != null) {
      return delimitedRow.toString().split(",");
    }

    return new String[0];
  }

  /**
   * Creates one field per column name, typed by the corresponding entry of
   * {@code columnTypes}.
   *
   * @param columnNames the column names, in column order
   * @param columnTypes the Java class names, parallel to {@code columnNames}
   * @return the schema fields, each carrying its column index as origin
   */
  Field[] buildSchema(String[] columnNames, String[] columnTypes) {
    Field[] schema = new Field[columnNames.length];
    for (int i = 0; i < columnNames.length; i++) {
      schema[i] = new Field(columnNames[i], FieldType.lookup(translateToDasType(columnTypes[i])));
      // The origin stores the column index so parseFieldValue can find the raw value again.
      schema[i].setOrigin(String.valueOf(i));
    }

    return schema;
  }

  /**
   * Resolves a Java class name to the class handed to {@code FieldType.lookup}.
   * {@code java.lang.Integer} is widened to {@code Long} and
   * {@code java.lang.Float} to {@code Double}; any other name is loaded as is.
   * Whitespace around the name is ignored so that lists written as
   * {@code "a, b"} resolve as well.
   *
   * @param incomingJavaClassName the fully qualified class name from the metadata
   * @return the class to look the field type up with
   * @throws RuntimeException if no class with that name can be loaded
   */
  Class<?> translateToDasType(String incomingJavaClassName) {
    String className = incomingJavaClassName.trim();
    if ("java.lang.Integer".equals(className)) {
      return Long.class;
    } else if ("java.lang.Float".equals(className)) {
      return Double.class;
    }

    try {
      return Class.forName(className);
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(
          "Unable to translate class name to Datameer type: '" + incomingJavaClassName + "'", e);
    }
  }

  /**
   * Checks that both lists are non-empty and of equal length.
   *
   * @param columnNames the column names read from the metadata
   * @param columnTypes the column types read from the metadata
   * @throws RuntimeException if either list is empty or the lengths differ
   */
  void validate(String[] columnNames, String[] columnTypes) {
    if (columnNames.length == 0 || columnTypes.length == 0 || columnNames.length != columnTypes.length) {
      throw new RuntimeException("Unable to create schema, SequenceFile.Metadata is invalid ("
          + COLUMN_NAMES_KEY + " has " + columnNames.length + " entries, "
          + COLUMN_TYPES_KEY + " has " + columnTypes.length + " entries)");
    }
  }

  /**
   * Converts a raw string into the value for the given field type.
   *
   * @param fieldType the type of the field being parsed
   * @param rawFieldValue the raw value of the field
   * @return the string itself for {@code STRING}, a {@code Long} for {@code INTEGER}
   * @throws UnsupportedOperationException for {@code ANY}, {@code BOOLEAN},
   *         {@code DATE}, {@code FLOAT} and {@code ERROR}
   * @throws IllegalArgumentException for any field type not listed above
   * @throws NumberFormatException if an {@code INTEGER} value is not a valid long
   */
  Object parseFromString(FieldType fieldType, String rawFieldValue) {
    switch (fieldType) {
    case STRING:
      return rawFieldValue;
    case INTEGER:
      return Long.parseLong(rawFieldValue);
    case ANY:
    case BOOLEAN:
    case DATE:
    case FLOAT:
    case ERROR:
      throw new UnsupportedOperationException("Parsing values of type " + fieldType + " is not supported");
    }

    throw new IllegalArgumentException("Unknown field type: " + fieldType);
  }

  /**
   * @return the fully qualified name of this class, used as the strategy id
   */
  @Override
  public String getId() {
    return getClass().getName();
  }

  /**
   * @return the display label of this strategy
   */
  @Override
  public String getName() {
    return LABEL;
  }
}