Skip to content

Commit

Permalink
Add example reading data from an mmaped IPC file
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jan 15, 2025
1 parent cc18d0f commit 1c0b81e
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 0 deletions.
8 changes: 8 additions & 0 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,10 @@ pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
///
/// For a higher-level interface see [`FileReader`]
///
/// For an example of using this API with `mmap` see the [`zero_copy_ipc`] example.
///
/// [`zero_copy_ipc`]: https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::*;
Expand Down Expand Up @@ -994,6 +998,10 @@ impl FileReaderBuilder {
}

/// Arrow File reader
///
/// For an example of creating Arrays from memory mapped (`mmap`) files, see the [`zero_copy_ipc`] example.
///
/// [`zero_copy_ipc`]: https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
pub struct FileReader<R> {
/// File reader that supports reading and seeking
reader: R,
Expand Down
8 changes: 8 additions & 0 deletions arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ criterion = { version = "0.5", default-features = false }
half = { version = "2.1", default-features = false }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
# used in examples
memmap2 = "0.9.3"
bytes = "1.9"

[build-dependencies]

Expand All @@ -105,6 +108,11 @@ name = "read_csv_infer_schema"
required-features = ["prettyprint", "csv"]
path = "./examples/read_csv_infer_schema.rs"

[[example]]
name = "zero_copy_ipc"
required-features = ["prettyprint"]
path = "examples/zero_copy_ipc.rs"

[[bench]]
name = "aggregate_kernels"
harness = false
Expand Down
1 change: 1 addition & 0 deletions arrow/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@
- [`dynamic_types.rs`](dynamic_types.rs): Dealing with mixed types dynamically at runtime
- [`read_csv.rs`](read_csv.rs): Reading CSV files with explicit schema, pretty printing Arrays
- [`read_csv_infer_schema.rs`](read_csv_infer_schema.rs): Reading CSV files, pretty printing Arrays
- [`zero_copy_ipc.rs`](zero_copy_ipc.rs): Zero copy read of Arrow IPC streams and files from memory buffers and `mmap`ed files
- [`tensor_builder.rs`](tensor_builder.rs): Using tensor builder
- [`version.rs`](version.rs): Print the arrow version and exit
157 changes: 157 additions & 0 deletions arrow/examples/zero_copy_ipc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Example of reading arrow IPC files and streams using "zero copy" API
//!
//! Zero copy in this case means the Arrow arrays refer directly to a user
//! provided buffer or memory region.
use arrow::array::{record_batch, RecordBatch};
use arrow::error::Result;
use arrow_buffer::Buffer;
use arrow_cast::pretty::pretty_format_batches;
use arrow_ipc::convert::fb_to_schema;
use arrow_ipc::reader::{read_footer_length, FileDecoder};
use arrow_ipc::writer::FileWriter;
use arrow_ipc::{root_as_footer, Block};
use std::path::PathBuf;
use std::sync::Arc;

/// Demonstrates reading an Arrow IPC file without copying its contents: the
/// file is memory mapped with `mmap` and decoded through the [`FileDecoder`]
/// API, so the resulting arrays refer to the mapped region.
fn main() {
    // Write a temporary IPC file holding 3 record batches. The file is
    // deleted when `temp_file` is dropped at the end of `main`.
    let temp_file = ipc_file();

    // Open the file and memory map it using the `memmap2` crate.
    let file = std::fs::File::open(&temp_file.path).expect("failed to open file");
    // SAFETY: the file was just written by `ipc_file` above and this example
    // does not modify or truncate it while the mapping is alive.
    let mapped = unsafe { memmap2::Mmap::map(&file) }.expect("failed to mmap file");

    // Wrap the mapped region in a `bytes::Bytes` and then in an Arrow
    // `Buffer`. Both conversions are zero copy, so the `Buffer` that backs
    // the arrays is the mapped memory itself.
    let buffer = Buffer::from(bytes::Bytes::from_owner(mapped));

    // Use the FileDecoder API (wrapped by `IPCBufferDecoder` for convenience)
    // to create Arrays that re-use the data in the underlying buffer.
    let decoder = IPCBufferDecoder::new(buffer);
    let num_batches = decoder.num_batches();
    assert_eq!(num_batches, 3);

    // Decode each batch in turn and print it.
    for i in 0..num_batches {
        let batch = decoder.get_batch(i).unwrap().expect("failed to read batch");
        assert_eq!(3, batch.num_rows());
        let formatted = pretty_format_batches(&[batch]).unwrap();
        println!("Batch {i}\n{}", formatted);
    }
}

/// Build the example data: 3 [`RecordBatch`]es, each with a single `Int32`
/// column named `my_column` that holds three values.
fn example_data() -> Vec<RecordBatch> {
    let first = record_batch!(("my_column", Int32, [1, 2, 3])).unwrap();
    let second = record_batch!(("my_column", Int32, [4, 5, 6])).unwrap();
    let third = record_batch!(("my_column", Int32, [7, 8, 9])).unwrap();
    vec![first, second, third]
}

/// Return a temporary file that contains an IPC file with 3 [`RecordBatch`]es
///
/// The file is created at the relative path `example.arrow` and is deleted
/// when the returned [`TempFile`] is dropped.
///
/// # Panics
///
/// Panics if the file cannot be created or if writing the IPC data fails. Once
/// the file has been created it is still cleaned up when such a panic unwinds.
fn ipc_file() -> TempFile {
    let data = example_data();
    let path = PathBuf::from("example.arrow");
    let file = std::fs::File::create(&path).unwrap();
    // Hand the path to the guard as soon as the file exists, so the file is
    // removed even if one of the writes below panics. The guard is declared
    // before `writer`, so on unwind the writer (and the file handle it owns)
    // is dropped before the guard deletes the file.
    let temp_file = TempFile { path };
    let mut writer = FileWriter::try_new(file, &data[0].schema()).unwrap();
    for batch in &data {
        writer.write(batch).unwrap();
    }
    writer.finish().unwrap();
    temp_file
}

/// Incrementally decodes [`RecordBatch`]es from an IPC file stored in an Arrow
/// [`Buffer`] using the [`FileDecoder`] API.
///
/// This is a wrapper around the example in the `FileDecoder` documentation,
/// which handles the low level interaction with the Arrow IPC format.
struct IPCBufferDecoder {
/// Memory (or memory mapped) Buffer holding the bytes of the IPC file
buffer: Buffer,
/// Decoder that creates Arrays that refer to the underlying buffer
decoder: FileDecoder,
/// Location of each record batch within the buffer
batches: Vec<Block>,
}

impl IPCBufferDecoder {
fn new(buffer: Buffer) -> Self {
let trailer_start = buffer.len() - 10;
let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

let schema = fb_to_schema(footer.schema().unwrap());

let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());

// Read dictionaries
for block in footer.dictionaries().iter().flatten() {
let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
let data = buffer.slice_with_length(block.offset() as _, block_len);
decoder.read_dictionary(block, &data).unwrap();
}

// convert to Vec from the flatbuffers Vector to avoid having a direct dependency on flatbuffers
let batches = footer
.recordBatches()
.map(|b| b.iter().copied().collect())
.unwrap_or_default();

Self {
buffer,
decoder,
batches,
}
}

/// Return the number of [`RecordBatch`]es in this buffer
fn num_batches(&self) -> usize {
self.batches.len()
}

/// Return the [`RecordBatch`] at message index `i`.
///
/// This may return `None` if the IPC message was None
fn get_batch(&self, i: usize) -> Result<Option<RecordBatch>> {
let block = &self.batches[i];
let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
let data = self
.buffer
.slice_with_length(block.offset() as _, block_len);
self.decoder.read_record_batch(block, &data)
}
}

/// Guard that deletes the file at `path` when it is dropped
struct TempFile {
/// Location of the file that is removed on drop
path: PathBuf,
}

impl Drop for TempFile {
    /// Delete the file at `self.path`.
    ///
    /// Deletion is best effort: a failure is reported on stdout rather than
    /// panicking.
    fn drop(&mut self) {
        match std::fs::remove_file(&self.path) {
            Ok(()) => {}
            Err(err) => println!("Error deleting '{:?}': {:?}", self.path, err),
        }
    }
}

0 comments on commit 1c0b81e

Please sign in to comment.