How to Read ORC Files from S3 or an InputStream in Java

Published Dec 20, 2021  ∙  Updated Aug 25, 2022

Let’s see how we can read ORC files directly from S3 or an InputStream in a Java class not running in a Spark context.

Problem statement: given an S3Object from the AWS SDK for Java that contains an ORC file, how can we evaluate the contents of the file?

1. OrcReader example usage

In this article, we’ll create an OrcReader that lazily evaluates each row in our ORC file. It can be instantiated and used like so:

/**
 * Reads every row of the ORC file held by the given S3 object.
 *
 * <p>Both the S3 content stream and the {@code OrcReader} are opened in a
 * try-with-resources block so the HTTP connection and the reader's temporary
 * file are released even if row processing fails.
 *
 * @param s3Object the S3 object whose content is an ORC file
 * @throws IOException if the content cannot be downloaded or opened as ORC
 */
void readOrcFromS3(S3Object s3Object) throws IOException {
  try (InputStream in = s3Object.getObjectContent();
       OrcReader orcReader = new OrcReader(in)) {
    orcReader.getStream().forEach(row -> {
      // Do something with `row`
    });
  }
}

Each row in our file will be returned as a Map<String, Object>.

2. Relevant org.apache.orc dependencies

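The code below was tested with the following dependency. Note that the class also imports org.apache.hadoop (Configuration, Path) and org.apache.commons.math3 (Pair), so make sure those libraries are available on your classpath as well.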

<dependency>
  <groupId>org.apache.orc</groupId>
  <artifactId>orc-core</artifactId>
  <version>1.7.5</version>
  <classifier>nohive</classifier>
</dependency>

3. OrcReader explanation and code

The Apache-provided OrcFile::createReader takes a Hadoop Path rather than an InputStream, so it cannot read directly from an S3 object's content stream. However, a Hadoop Path can also point to a file on the local filesystem.

The general steps are as follows:

  1. Create a local file with an arbitrary filename
  2. Populate the file with the contents of the input stream
  3. Initialize the ORC reader by specifying the local file path

A big wall of code follows. I recommend copying it into your IDE and reading through it there.

import java.io.*;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.*;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.*;
import org.apache.orc.Reader;
import org.apache.orc.storage.ql.exec.vector.*;

public class OrcReader implements AutoCloseable {
  private RecordReader recordReader;
  private boolean readBatch = false;
  private VectorizedRowBatch batch;
  private TypeDescription schema;
  private int batchNum = 0;
  private int fileNum = 0;
  private Reader orcReader;
  private File tempFile;
  public OrcReader(InputStream in) throws IOException {
    // 1. Create local temp file
    tempFile = File.createTempFile("file", ".orc");
    // 2. Copy input stream to the file
    try (OutputStream out = new FileOutputStream(tempFile, false)) {
      in.transferTo(out);
    }
    // 3. Initialize the ORC reader
    orcReader =
      OrcFile.createReader(
        new Path(tempFile.getPath()), 
        OrcFile.readerOptions(new Configuration())
      );
    tempFile.deleteOnExit();
    recordReader = orcReader.rows();
    schema = orcReader.getSchema();
    batch = schema.createRowBatch();
  }
  @Override
  public void close() throws IOException {
    if (recordReader != null) { recordReader.close(); }
    if (tempFile != null) { tempFile.delete(); }
  }
  public Map<String, Object> readRow() throws IOException {
    Map<String, Object> row = new HashMap<>();
    if (!readBatch) {
      recordReader.nextBatch(batch);
      readBatch = true;
    }
    if (batchNum == batch.getMaxSize()) {
      recordReader.nextBatch(batch);
      batchNum = 0;
    }
    if (fileNum < orcReader.getNumberOfRows()) {
      readBatchRow(batchNum, row);
      batchNum++;
      fileNum++;
    }
    return row;
  }
  public Stream<Map<String, Object>> getStream() {
    Iterator<Map<String, Object>> iterator = new Iterator<>() {
      Map<String, Object> nextRow = null;
      @Override
      public boolean hasNext() {
        if (nextRow != null) { return true; }
        else {
          try {
            Map<String, Object> nextRow;
            if ((nextRow = readRow()).size() > 0) {
              this.nextRow = nextRow;
              return true;
            }
            return false;
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
        }
      }
      @Override
      public Map<String, Object> next() {
        if (nextRow != null || hasNext()) {
          Map<String, Object> line = nextRow;
          nextRow = null;
          return line;
        } else {
          throw new NoSuchElementException();
        }
      }
    };
    return StreamSupport.stream(
      Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED | Spliterator.NONNULL),
      false
    );
  }
  private void readBatchRow(int rowNum, Map<String, Object> row) {
    final int numCols = batch.numCols;
    final ColumnVector[] cols = batch.cols;
    List<TypeDescription> colTypes = schema.getChildren();
    for (var colNum = 0; colNum < numCols; colNum++) {
      if (cols[colNum] == null) continue;
      Object colObj = readColumn(cols[colNum], colTypes.get(colNum), rowNum);
      row.put(schema.getFieldNames().get(colNum), colObj);
    }
  }
  private Object readColumn(ColumnVector colVec, TypeDescription colType, int rowNum) {
    Object columnObj = null;
    if (!colVec.isNull[rowNum]) {
      switch (colVec.type) {
        case LONG: columnObj = readLongVal(colVec, colType, rowNum); break;
        case DOUBLE: columnObj = ((DoubleColumnVector) colVec).vector[rowNum]; break;
        case BYTES: columnObj = readBytesVal(colVec, rowNum); break;
        case DECIMAL: columnObj = readDecimalVal(colVec, rowNum); break;
        case TIMESTAMP: columnObj = readTimestampVal(colVec, colType, rowNum); break;
        case STRUCT: columnObj = readStructVal(colVec, colType, rowNum); break;
        case LIST: columnObj = readListVal(colVec, colType, rowNum); break;
        case MAP: columnObj = readMapVal(colVec, colType, rowNum); break;
        case UNION: columnObj = readUnionVal(colVec, colType, rowNum); break;
        default: throw new RuntimeException("readColumn: unsupported ORC file column type: " + colVec.type.name());
      }
    }
    return columnObj;
  }
  private Object readLongVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
    Object colObj = null;
    if (!colVec.isNull[rowNum]) {
      LongColumnVector longVec = (LongColumnVector) colVec;
      long longVal = longVec.vector[rowNum];
      colObj = longVal;
      if (colType.getCategory() == TypeDescription.Category.INT) {
        colObj = (int) longVal;
      } else if (colType.getCategory() == TypeDescription.Category.BOOLEAN) {
        colObj = longVal == 1 ? Boolean.TRUE : Boolean.FALSE;
      } else if (colType.getCategory() == TypeDescription.Category.DATE) {
        colObj = new Date(longVal);
      }
    }
    return colObj;
  }
  private Object readBytesVal(ColumnVector colVec, int rowNum) {
    Object bytesObj = null;
    if (!colVec.isNull[rowNum]) {
      BytesColumnVector bytesVector = (BytesColumnVector) colVec;
      bytesObj = bytesVector.toString(rowNum);
    }
    return bytesObj;
  }
  private Object readDecimalVal(ColumnVector colVec, int rowNum) {
    Object decimalObj = null;
    if (!colVec.isNull[rowNum]) {
      DecimalColumnVector decimalVec = (DecimalColumnVector) colVec;
      decimalObj = decimalVec.vector[rowNum].getHiveDecimal().bigDecimalValue();
    }
    return decimalObj;
  }
  private Object readTimestampVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
    Object timestampVal = null;
    if (!colVec.isNull[rowNum]) {
      TimestampColumnVector timestampVec = (TimestampColumnVector) colVec;
      int nanos = timestampVec.nanos[rowNum];
      long millis = timestampVec.time[rowNum];
      Timestamp timestamp = new Timestamp(millis);
      timestamp.setNanos(nanos);
      timestampVal = timestamp;
      if (colType.getCategory() == TypeDescription.Category.DATE) {
        timestampVal = new Date(timestamp.getTime());
      }
    }
    return timestampVal;
  }
  private Object readStructVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
    Object structObj = null;
    if (!colVec.isNull[rowNum]) {
      List<Object> fieldValList = new ArrayList<>();
      StructColumnVector structVector = (StructColumnVector) colVec;
      ColumnVector[] fieldVec = structVector.fields;
      List<TypeDescription> fieldTypes = colType.getChildren();
      for (var i = 0; i < fieldVec.length; i++) {
        Object fieldObj = readColumn(fieldVec[i], fieldTypes.get(i), rowNum);
        fieldValList.add(fieldObj);
      }
      structObj = fieldValList;
    }
    return structObj;
  }
  private Object readMapVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
    Map<Object, Object> objMap = new HashMap<>();
    MapColumnVector mapVector = (MapColumnVector) colVec;
    if (checkMapColumnVectorTypes(mapVector)) {
      int mapSize = (int) mapVector.lengths[rowNum];
      int offset = (int) mapVector.offsets[rowNum];
      List<TypeDescription> mapTypes = colType.getChildren();
      TypeDescription keyType = mapTypes.get(0);
      TypeDescription valueType = mapTypes.get(1);
      ColumnVector keyChild = mapVector.keys;
      ColumnVector valueChild = mapVector.values;
      List<Object> keyList = readMapVector(keyChild, keyType, offset, mapSize);
      List<Object> valueList = readMapVector(valueChild, valueType, offset, mapSize);
      for (var i = 0; i < keyList.size(); i++) {
        objMap.put(keyList.get(i), valueList.get(i));
      }
    } else {
      throw new RuntimeException("readMapVal: unsupported key or value types");
    }
    return objMap;
  }
  private boolean checkMapColumnVectorTypes(MapColumnVector mapVector) {
    ColumnVector.Type keyType = mapVector.keys.type;
    ColumnVector.Type valueType = mapVector.values.type;
    return ((
      keyType == ColumnVector.Type.BYTES ||
      keyType == ColumnVector.Type.LONG ||
      keyType == ColumnVector.Type.DOUBLE
    ) && (
      valueType == ColumnVector.Type.LONG ||
      valueType == ColumnVector.Type.DOUBLE ||
      valueType == ColumnVector.Type.BYTES ||
      valueType == ColumnVector.Type.DECIMAL ||
      valueType == ColumnVector.Type.TIMESTAMP
    ));
  }
  private List<Object> readMapVector(ColumnVector mapVector, TypeDescription childType, int offset, int numValues) {
    @SuppressWarnings("unchecked")
    List<Object> mapList;
    switch (mapVector.type) {
      case BYTES: mapList = (List<Object>) readBytesListVector((BytesColumnVector) mapVector, childType, offset, numValues); break;
      case LONG: mapList = (List<Object>) readLongListVector((LongColumnVector) mapVector, childType, offset, numValues); break;
      case DOUBLE: mapList = (List<Object>) readDoubleListVector((DoubleColumnVector) mapVector, offset, numValues); break;
      case DECIMAL: mapList = (List<Object>) readDecimalListVector((DecimalColumnVector) mapVector, offset, numValues); break;
      case TIMESTAMP: mapList = (List<Object>) readTimestampListVector((TimestampColumnVector) mapVector, childType, offset, numValues); break;
      default: throw new RuntimeException(mapVector.type.name() + " is not supported for MapColumnVectors");
    }
    return mapList;
  }
  private Object readUnionVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
    Pair<TypeDescription, Object> columnValuePair;
    UnionColumnVector unionVector = (UnionColumnVector) colVec;
    int tagVal = unionVector.tags[rowNum];
    List<TypeDescription> unionFieldTypes = colType.getChildren();
    if (tagVal < unionFieldTypes.size()) {
      TypeDescription fieldType = unionFieldTypes.get(tagVal);
      if (tagVal < unionVector.fields.length) {
        ColumnVector fieldVector = unionVector.fields[tagVal];
        Object unionValue = readColumn(fieldVector, fieldType, rowNum);
        columnValuePair = new Pair<>(fieldType, unionValue);
      } else {
        throw new RuntimeException("readUnionVal: union tag value out of range for union column vectors");
      }
    } else {
      throw new RuntimeException("readUnionVal: union tag value out of range for union types");
    }
    return columnValuePair;
  }
  private Object readListVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
    Object listValues = null;
    if (!colVec.isNull[rowNum]) {
      ListColumnVector listVector = (ListColumnVector) colVec;
      ColumnVector listChildVector = listVector.child;
      TypeDescription childType = colType.getChildren().get(0);
      switch (listChildVector.type) {
        case LONG: listValues = readLongListValues(listVector, childType, rowNum); break;
        case DOUBLE: listValues = readDoubleListValues(listVector, rowNum); break;
        case BYTES: listValues = readBytesListValues(listVector, childType, rowNum); break;
        case DECIMAL: listValues = readDecimalListValues(listVector, rowNum); break;
        case TIMESTAMP: listValues = readTimestampListValues(listVector, childType, rowNum); break;
        default:
          throw new RuntimeException(
            listVector.type.name() + " is not supported for ListColumnVectors"
          );
      }
    }
    return listValues;
  }
  private Object readLongListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) {
    int offset = (int) listVector.offsets[rowNum];
    int numValues = (int) listVector.lengths[rowNum];
    LongColumnVector longVector = (LongColumnVector) listVector.child;
    return readLongListVector(longVector, childType, offset, numValues);
  }
  private List<Object> readLongListVector(LongColumnVector longVector, TypeDescription childType, int offset, int numValues) {
    List<Object> longList = new ArrayList<>();
    for (var i = 0; i < numValues; i++) {
      if (!longVector.isNull[offset + i]) {
        long longVal = longVector.vector[offset + i];
        if (childType.getCategory() == TypeDescription.Category.BOOLEAN) {
          Boolean boolVal = longVal == 0 ? Boolean.valueOf(false) : Boolean.valueOf(true);
          longList.add(boolVal);
        } else if (childType.getCategory() == TypeDescription.Category.INT) {
          Integer intObj = (int) longVal;
          longList.add(intObj);
        } else {
          longList.add(longVal);
        }
      } else {
        longList.add(null);
      }
    }
    return longList;
  }
  private Object readDoubleListValues(ListColumnVector listVector, int rowNum) {
    int offset = (int) listVector.offsets[rowNum];
    int numValues = (int) listVector.lengths[rowNum];
    DoubleColumnVector doubleVec = (DoubleColumnVector) listVector.child;
    return readDoubleListVector(doubleVec, offset, numValues);
  }
  private Object readDoubleListVector(DoubleColumnVector doubleVec, int offset, int numValues) {
    List<Object> doubleList = new ArrayList<>();
    for (var i = 0; i < numValues; i++) {
      if (!doubleVec.isNull[offset + i]) {
        Double doubleVal = doubleVec.vector[offset + i];
        doubleList.add(doubleVal);
      } else {
        doubleList.add(null);
      }
    }
    return doubleList;
  }
  private Object readBytesListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) {
    int offset = (int) listVector.offsets[rowNum];
    int numValues = (int) listVector.lengths[rowNum];
    BytesColumnVector bytesVec = (BytesColumnVector) listVector.child;
    return readBytesListVector(bytesVec, childType, offset, numValues);
  }
  private Object readBytesListVector(BytesColumnVector bytesVec, TypeDescription childType, int offset, int numValues) {
    List<Object> bytesValList = new ArrayList<>();
    for (var i = 0; i < numValues; i++) {
      if (!bytesVec.isNull[offset + i]) {
        byte[] byteArray = bytesVec.vector[offset + i];
        int vecLen = bytesVec.length[offset + i];
        int vecStart = bytesVec.start[offset + i];
        byte[] vecCopy = Arrays.copyOfRange(byteArray, vecStart, vecStart + vecLen);
        if (childType.getCategory() == TypeDescription.Category.STRING) {
          String str = new String(vecCopy);
          bytesValList.add(str);
        } else {
          bytesValList.add(vecCopy);
        }
      } else {
        bytesValList.add(null);
      }
    }
    return bytesValList;
  }
  private Object readDecimalListValues(ListColumnVector listVector, int rowNum) {
    int offset = (int) listVector.offsets[rowNum];
    int numValues = (int) listVector.lengths[rowNum];
    DecimalColumnVector decimalVec = (DecimalColumnVector) listVector.child;
    return readDecimalListVector(decimalVec, offset, numValues);
  }
  private Object readDecimalListVector(DecimalColumnVector decimalVector, int offset, int numValues) {
    List<Object> decimalList = new ArrayList<>();
    for (var i = 0; i < numValues; i++) {
      if (!decimalVector.isNull[offset + i]) {
        BigDecimal bigDecimal =
          decimalVector.vector[batchNum].getHiveDecimal().bigDecimalValue();
        decimalList.add(bigDecimal);
      } else {
        decimalList.add(null);
      }
    }
    return decimalList;
  }
  private Object readTimestampListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) {
    int offset = (int) listVector.offsets[rowNum];
    int numValues = (int) listVector.lengths[rowNum];
    TimestampColumnVector timestampVec = (TimestampColumnVector) listVector.child;
    return readTimestampListVector(timestampVec, childType, offset, numValues);
  }
  private Object readTimestampListVector(TimestampColumnVector timestampVector, TypeDescription childType, int offset, int numValues) {
    List<Object> timestampList = new ArrayList<>();
    for (var i = 0; i < numValues; i++) {
      if (!timestampVector.isNull[offset + i]) {
        int nanos = timestampVector.nanos[offset + i];
        long millis = timestampVector.time[offset + i];
        Timestamp timestamp = new Timestamp(millis);
        timestamp.setNanos(nanos);
        if (childType.getCategory() == TypeDescription.Category.DATE) {
          Date date = new Date(timestamp.getTime());
          timestampList.add(date);
        } else {
          timestampList.add(timestamp);
        }
      } else {
        timestampList.add(null);
      }
    }
    return timestampList;
  }
  private static void copyStreamToFile(InputStream input, File file)
    throws IOException {
    try (OutputStream output = new FileOutputStream(file, false)) {
      input.transferTo(output);
    }
  }
}