Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3v3 support #73

Open
wants to merge 4 commits into
base: parquetjs-lite
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,21 +129,32 @@ let reader = await parquet.ParquetReader.openUrl(request,'https://domain/fruits.

Parquet files can be read from an S3 object without having to download the whole file.
You will have to supply the aws-sdk client as first argument and the bucket/key information
as second argument to the function `parquetReader.openS3`.
as second argument to the function `parquetReader.openS3`.

If using version 3 of the aws-sdk for your S3 client, supply as the client argument an object
containing an already constructed S3Client along with the plain HeadObjectCommand and
GetObjectCommand modules.

``` js
const params = {
Bucket: 'xxxxxxxxxxx',
Key: 'xxxxxxxxxxx'
};
// v2 example
const AWS = require('aws-sdk');
const client = new AWS.S3({
accessKeyId: 'xxxxxxxxxxx',
secretAccessKey: 'xxxxxxxxxxx'
});

const params = {
Bucket: 'xxxxxxxxxxx',
Key: 'xxxxxxxxxxx'
};

let reader = await parquet.ParquetReader.openS3(client,params);

//v3 example
const {S3Client, HeadObjectCommand, GetObjectCommand} = require('@aws-sdk/client-s3');
const client = new S3Client({region:"us-east-1"});
let reader = await parquet.ParquetReader.openS3(
{S3Client:client, HeadObjectCommand, GetObjectCommand},
params
);
```

### Reading data from a buffer
Expand Down
90 changes: 86 additions & 4 deletions lib/reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,47 @@ class ParquetReader {
}

/**
* Open the parquet file from S3 using the supplied aws client and params
* The params have to include `Bucket` and `Key` to the file requested
* This function returns a new parquet reader
* Open the parquet file from S3 using the supplied aws client [,commands]
* and params. The params have to include `Bucket` and `Key` to the file
* requested. If using v3 of the aws sdk, combine the client and commands
* into an object with keys matching the original module names, and do not
* instantiate the commands; pass them as classes/modules. This function
* returns a new parquet reader or throws an Error.
*/
static async openS3(client, params, options) {
let envelopeReader = await ParquetEnvelopeReader.openS3(client, params, options);
let envelopeReader;
/*
* sanity checks for client and params arguments
*/
if (! client){
throw new Error("Missing S3 client object argument. Pass an instantiated "
+ "AWS.S3 object (v2) or S3Client object (v3) from the AWS SDK."
);
}
// throw common error in case of null dereference, or missing property
try {
let {Key, Bucket} = params;
if (! Key || ! Bucket) throw "";
} catch (e){
throw new Error(
"Invalid params argument. Must be in the form {Key: 'x', Bucket: 'y'}."
);
}
/*
* determine which aws client should be assumed for envelopeReader (v2 | v3)
*/
try {
if ('function' === typeof client.getObject){
// S3 client v2
envelopeReader = await ParquetEnvelopeReader.openS3(client, params, options);
}
else{ // S3 client v3
let {S3Client:c, HeadObjectCommand:h, GetObjectCommand:g} = client;
envelopeReader = await ParquetEnvelopeReader.openS3v3(c, h, g, params, options);
}
} catch (e){
throw new Error("Error accessing S3 bucket: " + e.message);
}
return this.openEnvelopeReader(envelopeReader, options);
}

Expand Down Expand Up @@ -327,6 +362,53 @@ class ParquetEnvelopeReader {
return new ParquetEnvelopeReader(readFn, closeFn, buffer.length, options);
}

static async openS3v3(client, HeadObjectCommand, GetObjectCommand, params, options) {
let fileStat = async () => {
try {
let headObjectCommand = await client.send(new HeadObjectCommand (params));
return headObjectCommand.ContentLength;
} catch (e){
// having params match command names makes e.message clear to user
throw new Error("rejected headObjectCommand: " + e.message);
}
}

let readFn = async (offset, length, file) => {
if (file) {
throw new Error('external references are not supported');
}
let range = `bytes=${offset}-${offset+length-1}`;
try {
let getObjectCommand = await client.send(
new GetObjectCommand({Range: range, ...params})
);
let body = await new Promise ((resolve, reject) => {
let bodyBuffer, data;
getObjectCommand.Body.on('error', reject);
getObjectCommand.Body.on('end', function() {
resolve(bodyBuffer);
});
getObjectCommand.Body.on('readable', function() {
while (data = this.read()) {
if (bodyBuffer) {
bodyBuffer = Buffer.concat([bodyBuffer, data]);
} else {
bodyBuffer = data;
}
}
});
});
return body;
} catch (e) {
throw new Error("rejected getObject command " + e.message);
}
};

let closeFn = () => ({});

return new ParquetEnvelopeReader(readFn, closeFn, fileStat, options);
}

static async openS3(client, params, options) {
let fileStat = async () => client.headObject(params).promise().then(d => d.ContentLength);

Expand Down