From 63516742e706237563360457233b7961ae24df78 Mon Sep 17 00:00:00 2001 From: Eric Fredine Date: Tue, 2 Jul 2024 02:07:19 -0700 Subject: [PATCH] Provide Arrow Schema Hint to Parquet Reader - Alternative 2 (#5939) * Adds option for providing a schema to the Arrow Parquet Reader. * Adds more complete tests. Adds a more detailed error message for incompatible columns. Adds nested fields to test_with_schema. Adds test for incompatible nested field. Updates documentation. * Add an example showing how to use the with_schema option. --------- Co-authored-by: Eric Fredine --- parquet/src/arrow/arrow_reader/mod.rs | 425 ++++++++++++++++++++++++-- 1 file changed, 403 insertions(+), 22 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 070dda97c59a..b38092dbc937 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -25,25 +25,24 @@ use arrow_array::Array; use arrow_array::{RecordBatch, RecordBatchReader}; use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; use arrow_select::filter::prep_null_mask_filter; +pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; +pub use selection::{RowSelection, RowSelector}; +pub use crate::arrow::array_reader::RowGroups; use crate::arrow::array_reader::{build_array_reader, ArrayReader}; use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField}; -use crate::arrow::{FieldLevels, ProjectionMask}; +use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask}; +use crate::column::page::{PageIterator, PageReader}; use crate::errors::{ParquetError, Result}; +use crate::file::footer; use crate::file::metadata::ParquetMetaData; +use crate::file::page_index::index_reader; use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::schema::types::SchemaDescriptor; mod filter; mod selection; -pub use crate::arrow::array_reader::RowGroups; -use 
crate::column::page::{PageIterator, PageReader}; -use crate::file::footer; -use crate::file::page_index::index_reader; -pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; -pub use selection::{RowSelection, RowSelector}; - /// Builder for constructing parquet readers into arrow. /// /// Most users should use one of the following specializations: @@ -250,6 +249,8 @@ impl ArrowReaderBuilder { pub struct ArrowReaderOptions { /// Should the reader strip any user defined metadata from the Arrow schema skip_arrow_metadata: bool, + /// If provided used as the schema for the file, otherwise the schema is read from the file + supplied_schema: Option, /// If true, attempt to read `OffsetIndex` and `ColumnIndex` pub(crate) page_index: bool, } @@ -273,6 +274,60 @@ impl ArrowReaderOptions { } } + /// Provide a schema to use when reading the parquet file. If provided it + /// takes precedence over the schema inferred from the file or the schema defined + /// in the file's metadata. If the schema is not compatible with the file's + /// schema an error will be returned when constructing the builder. + /// + /// This option is only required if you want to cast columns to a different type. + /// For example, if you wanted to cast from an Int64 in the Parquet file to a Timestamp + /// in the Arrow schema. + /// + /// The supplied schema must have the same number of columns as the parquet schema and + /// the column names need to be the same. 
+ /// + /// # Example + /// ``` + /// use std::io::Bytes; + /// use std::sync::Arc; + /// use tempfile::tempfile; + /// use arrow_array::{ArrayRef, Int32Array, RecordBatch}; + /// use arrow_schema::{DataType, Field, Schema, TimeUnit}; + /// use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder}; + /// use parquet::arrow::ArrowWriter; + /// + /// // Write data - schema is inferred from the data to be Int32 + /// let file = tempfile().unwrap(); + /// let batch = RecordBatch::try_from_iter(vec![ + /// ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef), + /// ]).unwrap(); + /// let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), None).unwrap(); + /// writer.write(&batch).unwrap(); + /// writer.close().unwrap(); + /// + /// // Read the file back. + /// // Supply a schema that interprets the Int32 column as a Timestamp. + /// let supplied_schema = Arc::new(Schema::new(vec![ + /// Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false) + /// ])); + /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone()); + /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options( + /// file.try_clone().unwrap(), + /// options + /// ).expect("Error if the schema is not compatible with the parquet file schema."); + /// + /// // Create the reader and read the data using the supplied schema. + /// let mut reader = builder.build().unwrap(); + /// let _batch = reader.next().unwrap().unwrap(); + /// ``` + pub fn with_schema(self, schema: SchemaRef) -> Self { + Self { + supplied_schema: Some(schema), + skip_arrow_metadata: true, + ..self + } + } + /// Enable reading [`PageIndex`], if present (defaults to `false`) /// /// The `PageIndex` can be used to push down predicates to the parquet scan, @@ -353,22 +408,77 @@ impl ArrowReaderMetadata { /// This function does not attempt to load the PageIndex if not present in the metadata. 
/// See [`Self::load`] for more details. pub fn try_new(metadata: Arc, options: ArrowReaderOptions) -> Result { - let kv_metadata = match options.skip_arrow_metadata { - true => None, - false => metadata.file_metadata().key_value_metadata(), - }; + match options.supplied_schema { + Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()), + None => { + let kv_metadata = match options.skip_arrow_metadata { + true => None, + false => metadata.file_metadata().key_value_metadata(), + }; - let (schema, fields) = parquet_to_arrow_schema_and_fields( - metadata.file_metadata().schema_descr(), + let (schema, fields) = parquet_to_arrow_schema_and_fields( + metadata.file_metadata().schema_descr(), + ProjectionMask::all(), + kv_metadata, + )?; + + Ok(Self { + metadata, + schema: Arc::new(schema), + fields: fields.map(Arc::new), + }) + } + } + } + + fn with_supplied_schema( + metadata: Arc, + supplied_schema: SchemaRef, + ) -> Result { + let parquet_schema = metadata.file_metadata().schema_descr(); + let field_levels = parquet_to_arrow_field_levels( + parquet_schema, ProjectionMask::all(), - kv_metadata, + Some(supplied_schema.fields()), )?; - - Ok(Self { - metadata, - schema: Arc::new(schema), - fields: fields.map(Arc::new), - }) + let fields = field_levels.fields; + let inferred_len = fields.len(); + let supplied_len = supplied_schema.fields().len(); + // Ensure the supplied schema has the same number of columns as the parquet schema. + // parquet_to_arrow_field_levels is expected to throw an error if the schemas have + // different lengths, but we check here to be safe. 
+ if inferred_len != supplied_len { + Err(arrow_err!(format!( + "incompatible arrow schema, expected {} columns received {}", + inferred_len, supplied_len + ))) + } else { + let diff_fields: Vec<_> = supplied_schema + .fields() + .iter() + .zip(fields.iter()) + .filter_map(|(field1, field2)| { + if field1 != field2 { + Some(field1.name().clone()) + } else { + None + } + }) + .collect(); + + if !diff_fields.is_empty() { + Err(ParquetError::ArrowError(format!( + "incompatible arrow schema, the following fields could not be cast: [{}]", + diff_fields.join(", ") + ))) + } else { + Ok(Self { + metadata, + schema: supplied_schema, + fields: field_levels.levels.map(Arc::new), + }) + } + } } /// Returns a reference to the [`ParquetMetaData`] for this parquet file @@ -842,7 +952,7 @@ mod tests { use arrow_array::*; use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime}; use arrow_data::ArrayDataBuilder; - use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Fields, Schema}; + use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef}; use arrow_select::concat::concat_batches; use crate::arrow::arrow_reader::{ @@ -2307,10 +2417,12 @@ mod tests { fn test_invalid_utf8_string_array() { test_invalid_utf8_string_array_inner::(); } + #[test] fn test_invalid_utf8_large_string_array() { test_invalid_utf8_string_array_inner::(); } + fn test_invalid_utf8_string_array_inner() { let cases = [ ( @@ -2620,6 +2732,275 @@ mod tests { assert_eq!(reader.schema(), schema_without_metadata); } + fn write_parquet_from_iter(value: I) -> File + where + I: IntoIterator, + F: AsRef, + { + let batch = RecordBatch::try_from_iter(value).unwrap(); + let file = tempfile().unwrap(); + let mut writer = + ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + file + } + + fn run_schema_test_with_error(value: I, schema: SchemaRef, expected_error: &str) + where 
+ I: IntoIterator, + F: AsRef, + { + let file = write_parquet_from_iter(value); + let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone()); + let builder = ParquetRecordBatchReaderBuilder::try_new_with_options( + file.try_clone().unwrap(), + options_with_schema, + ); + assert_eq!(builder.err().unwrap().to_string(), expected_error); + } + + #[test] + fn test_schema_too_few_columns() { + run_schema_test_with_error( + vec![ + ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef), + ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef), + ], + Arc::new(Schema::new(vec![Field::new( + "int64", + ArrowDataType::Int64, + false, + )])), + "Arrow: incompatible arrow schema, expected 2 struct fields got 1", + ); + } + + #[test] + fn test_schema_too_many_columns() { + run_schema_test_with_error( + vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)], + Arc::new(Schema::new(vec![ + Field::new("int64", ArrowDataType::Int64, false), + Field::new("int32", ArrowDataType::Int32, false), + ])), + "Arrow: incompatible arrow schema, expected 1 struct fields got 2", + ); + } + + #[test] + fn test_schema_mismatched_column_names() { + run_schema_test_with_error( + vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)], + Arc::new(Schema::new(vec![Field::new( + "other", + ArrowDataType::Int64, + false, + )])), + "Arrow: incompatible arrow schema, expected field named int64 got other", + ); + } + + #[test] + fn test_schema_incompatible_columns() { + run_schema_test_with_error( + vec![ + ( + "col1_invalid", + Arc::new(Int64Array::from(vec![0])) as ArrayRef, + ), + ( + "col2_valid", + Arc::new(Int32Array::from(vec![0])) as ArrayRef, + ), + ( + "col3_invalid", + Arc::new(Date64Array::from(vec![0])) as ArrayRef, + ), + ], + Arc::new(Schema::new(vec![ + Field::new("col1_invalid", ArrowDataType::Int32, false), + Field::new("col2_valid", ArrowDataType::Int32, false), + Field::new("col3_invalid", ArrowDataType::Int32, false), + ])), + "Arrow: 
incompatible arrow schema, the following fields could not be cast: [col1_invalid, col3_invalid]", + ); + } + + #[test] + fn test_one_incompatible_nested_column() { + let nested_fields = Fields::from(vec![ + Field::new("nested1_valid", ArrowDataType::Utf8, false), + Field::new("nested1_invalid", ArrowDataType::Int64, false), + ]); + let nested = StructArray::try_new( + nested_fields, + vec![ + Arc::new(StringArray::from(vec!["a"])) as ArrayRef, + Arc::new(Int64Array::from(vec![0])) as ArrayRef, + ], + None, + ) + .expect("struct array"); + let supplied_nested_fields = Fields::from(vec![ + Field::new("nested1_valid", ArrowDataType::Utf8, false), + Field::new("nested1_invalid", ArrowDataType::Int32, false), + ]); + run_schema_test_with_error( + vec![ + ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef), + ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef), + ("nested", Arc::new(nested) as ArrayRef), + ], + Arc::new(Schema::new(vec![ + Field::new("col1", ArrowDataType::Int64, false), + Field::new("col2", ArrowDataType::Int32, false), + Field::new( + "nested", + ArrowDataType::Struct(supplied_nested_fields), + false, + ), + ])), + "Arrow: incompatible arrow schema, the following fields could not be cast: [nested]", + ); + } + + #[test] + fn test_with_schema() { + let nested_fields = Fields::from(vec![ + Field::new("utf8_to_dict", ArrowDataType::Utf8, false), + Field::new("int64_to_ts_nano", ArrowDataType::Int64, false), + ]); + + let nested_arrays: Vec = vec![ + Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef, + Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef, + ]; + + let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap(); + + let file = write_parquet_from_iter(vec![ + ( + "int32_to_ts_second", + Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef, + ), + ( + "date32_to_date64", + Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef, + ), + ("nested", Arc::new(nested) as ArrayRef), + ]); + + let 
supplied_nested_fields = Fields::from(vec![ + Field::new( + "utf8_to_dict", + ArrowDataType::Dictionary( + Box::new(ArrowDataType::Int32), + Box::new(ArrowDataType::Utf8), + ), + false, + ), + Field::new( + "int64_to_ts_nano", + ArrowDataType::Timestamp( + arrow::datatypes::TimeUnit::Nanosecond, + Some("+10:00".into()), + ), + false, + ), + ]); + + let supplied_schema = Arc::new(Schema::new(vec![ + Field::new( + "int32_to_ts_second", + ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())), + false, + ), + Field::new("date32_to_date64", ArrowDataType::Date64, false), + Field::new( + "nested", + ArrowDataType::Struct(supplied_nested_fields), + false, + ), + ])); + + let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone()); + let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options( + file.try_clone().unwrap(), + options, + ) + .expect("reader builder with schema") + .build() + .expect("reader with schema"); + + assert_eq!(arrow_reader.schema(), supplied_schema); + let batch = arrow_reader.next().unwrap().unwrap(); + assert_eq!(batch.num_columns(), 3); + assert_eq!(batch.num_rows(), 4); + assert_eq!( + batch + .column(0) + .as_any() + .downcast_ref::() + .expect("downcast to timestamp second") + .value_as_datetime_with_tz(0, "+01:00".parse().unwrap()) + .map(|v| v.to_string()) + .expect("value as datetime"), + "1970-01-01 01:00:00 +01:00" + ); + assert_eq!( + batch + .column(1) + .as_any() + .downcast_ref::() + .expect("downcast to date64") + .value_as_date(0) + .map(|v| v.to_string()) + .expect("value as date"), + "1970-01-01" + ); + + let nested = batch + .column(2) + .as_any() + .downcast_ref::() + .expect("downcast to struct"); + + let nested_dict = nested + .column(0) + .as_any() + .downcast_ref::() + .expect("downcast to dictionary"); + + assert_eq!( + nested_dict + .values() + .as_any() + .downcast_ref::() + .expect("downcast to string") + .iter() + .collect::>(), + vec![Some("a"), Some("b")] 
+ ); + + assert_eq!( + nested_dict.keys().iter().collect::>(), + vec![Some(0), Some(0), Some(0), Some(1)] + ); + + assert_eq!( + nested + .column(1) + .as_any() + .downcast_ref::() + .expect("downcast to timestamp nanosecond") + .value_as_datetime_with_tz(0, "+10:00".parse().unwrap()) + .map(|v| v.to_string()) + .expect("value as datetime"), + "1970-01-01 10:00:00.000000001 +10:00" + ); + } + #[test] fn test_empty_projection() { let testdata = arrow::util::test_util::parquet_test_data();