Experiment with rust-s3 and caching #53

Draft · wants to merge 9 commits into base: main

Conversation

@mateuszkj commented Apr 10, 2022

This merge request provides:

  • Update to the datafusion master branch
  • Drop aws-sdk-rust in favor of the much simpler rust-s3
  • Tested with self-hosted S3 servers: MinIO, SeaweedFS, s3-server (https://crates.io/crates/s3-server) (no unit tests)
  • Caching of read buffers
  • Concurrent pre-fetching of metadata for all parquet files during listing
  • Invalidation of cached parquet metadata when a file's last modification time changes
  • Tested with partitioned tables (no unit tests)

Caching

Caching is needed because the current datafusion implementation is not optimal for network file operations:

  • datafusion fetches and parses metadata for every parquet file twice during every query (three times if you count registering the table): once while creating the physical plan and again while executing it
  • datafusion sometimes requests very small file chunks (I got a lot of 14-byte requests)
  • the executing plan doesn't read a file sequentially; it starts from the beginning of the file for every column

Results of very unscientific tests on my local machine with my example SQL query:

  • Query using filesystem took 0.5 seconds
  • Query using S3 without cache took 25 seconds
  • Query using S3 with default cache options took 0.8 seconds

Why not aws-sdk-rust?

I don't know how AWS works; unit tests on my local machine took several minutes to complete (successfully). I think it was because of IMDS timeouts. I couldn't find a way to disable them, so I chose a much simpler S3 client implementation.

I also had problems with rust-s3 and other self-hosted S3 servers. I created MRs to fix them: durch/rust-s3#267 and datenlord/s3-server#114

What can be done next?

  • Check the etag/last modification time on every file request and return an error when the file has been changed during query execution (a tiny sketch follows).
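
For illustration only, that check could compare a version snapshot captured at planning time against a fresh one fetched just before reading. The FileVersion type and how the current version would be obtained are assumptions, not part of this MR:

use std::time::SystemTime;

/// Hypothetical snapshot of an object's version, captured when the query is planned.
#[derive(Clone, PartialEq, Debug)]
struct FileVersion {
    etag: Option<String>,
    last_modified: Option<SystemTime>,
}

/// Return an error if the object changed between planning and execution.
/// `current` would come from a fresh HEAD-style request made just before reading.
fn ensure_unchanged(planned: &FileVersion, current: &FileVersion) -> Result<(), String> {
    if planned != current {
        return Err("file changed during query execution".to_string());
    }
    Ok(())
}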

@matthewmturner (Collaborator) commented:

thank you very much for the contribution @mateuszkj 👍

CC @seddonm1 (as the original core author) and @houqp (who has helped with reviewing changes / design here and has experience with delta-rs, which I believe connects to S3 with rusoto).

To confirm, the primary reason for switching to rust-s3 is performance?

Indeed rust-s3 does look much lighter, has significant usage, appears to be actively maintained (a release a month ago), and it seems it might enable GCS as well. However, I'm a little concerned about moving away from the official AWS implementation. I think we know that the official implementation will continue to be maintained and enhanced going forward, while it's unclear what the status of 3rd party libraries will be (i.e. rusoto going into maintenance mode https://github.com/rusoto/rusoto). My specific concern is that we will have to rewrite again if the same thing happens to this library. Can you provide any insight on this?

Regarding the cache - can you please include more implementation details and a description of how specifically it is used? Could we perhaps start with just using the cache functionality with aws-sdk-rust?

Regarding the IMDS timeout - do you have an example in other languages of how to disable it that we could use as a reference? And does rust-s3 allow disabling the IMDS timeout?

Regarding testing other S3 servers - if it isn't too much work, would you be able to run CI against them using docker files (it looks like seaweed makes that easy)? I think this would be a great value add to the project regardless and would make comparing results easier.

Given the size of the change I will need some time to review the code - so please bear with me as I will likely need at least a few days to review and test.

thank you again for the contribution.

@mateuszkj (Author) commented Apr 10, 2022

thanks for your fast response @matthewmturner

On second thought, I think this MR should not be merged; it can be done better. Can you mark this MR as a draft?

Moving the cache mechanism to a different crate

I now think the caching mechanism could be moved to its own ObjectStore implementation in another crate.

For example:

let s3_file_system = S3FileSystem::new(...).await?;
let s3_cached_file_system = Arc::new(ObjectStoreCache::new(s3_file_system, ObjectStoreCacheOptions::from_envs()?));
runtime_env.register_object_store("s3", s3_cached_file_system);

With the cache in a different crate, we could have two S3 client implementations, or support other remote storage systems.
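
For illustration, ObjectStoreCacheOptions::from_envs() could read the same environment variables used further down in this comment. This is only a sketch of the idea; the field names and defaults are my assumptions, not the actual code in this MR:

use std::env;

/// Sketch of the cache options; field names and defaults are assumptions.
#[derive(Debug, Clone)]
pub struct ObjectStoreCacheOptions {
    /// Minimum size of a single range request sent to the object store.
    pub min_request_size: usize,
    /// Whether parquet metadata is pre-fetched for all files during listing.
    pub use_metadata_prefetch: bool,
}

impl ObjectStoreCacheOptions {
    pub fn from_envs() -> Result<Self, String> {
        let min_request_size = match env::var("DATAFUSION_S3_MIN_REQUEST_SIZE") {
            Ok(v) => v
                .parse()
                .map_err(|e| format!("invalid DATAFUSION_S3_MIN_REQUEST_SIZE: {}", e))?,
            Err(_) => 64 * 1024, // assumed default
        };
        let use_metadata_prefetch = match env::var("DATAFUSION_S3_USE_METADATA_PREFETCH") {
            Ok(v) => v
                .parse()
                .map_err(|e| format!("invalid DATAFUSION_S3_USE_METADATA_PREFETCH: {}", e))?,
            Err(_) => true, // assumed default
        };
        Ok(Self { min_request_size, use_metadata_prefetch })
    }
}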

Cache description

First, let's look at how datafusion reads files without the cache.

To disable the cache, set these environment variables:

export RUST_LOG=info,datafusion_objectstore_s3=debug
export DATAFUSION_S3_MIN_REQUEST_SIZE=0
export DATAFUSION_S3_USE_METADATA_PREFETCH=false

Below is an example of querying data using SQL similar to:

SELECT
  s.col3_text,
  s.col4_int
FROM stops s
WHERE s.col1_time >= CAST('2022-03-02 00:00:00' AS Timestamp)
  AND s.col1_time < CAST('2022-03-03 00:00:00' AS Timestamp)
  AND s.col2_text = 'xxx'
ORDER BY s.col1_time;

The query uses 4 columns and filters by time and a text field. I have multiple *.parquet files in my table (89 files); I grepped the logs for only one of them for clarity.

# Register table. datafusion reads the metadata of every *.parquet file to collect schemas.
2022-04-10T18:28:13.751524Z direct id: 25 file: s3://data-2022-02.parquet start: 982029 len: 65536, rlen: 65536
2022-04-10T18:28:13.760407Z direct id: 25 file: s3://data-2022-02.parquet start: 951802 len: 30227, rlen: 30227

# datafusion reads the metadata of every *.parquet file to create the physical plan
2022-04-10T18:28:17.134288Z direct id: 203 file: s3://data-2022-02.parquet start: 982029 len: 65536, rlen: 65536
2022-04-10T18:28:17.150239Z direct id: 203 file: s3://data-2022-02.parquet start: 951802 len: 30227, rlen: 30227

# datafusion reads the metadata of every *.parquet file to execute the physical plan.
2022-04-10T18:28:19.187678Z direct id: 290 file: s3://data-2022-02.parquet start: 982029 len: 65536, rlen: 65536
2022-04-10T18:28:19.203489Z direct id: 290 file: s3://data-2022-02.parquet start: 951802 len: 30227, rlen: 30227

# datafusion reads data from the first column (iterating over row groups)
2022-04-10T18:28:19.223254Z direct id: 290 file: s3://data-2022-02.parquet start: 220862 len: 14, rlen: 14
2022-04-10T18:28:19.235615Z direct id: 290 file: s3://data-2022-02.parquet start: 225370 len: 14, rlen: 14
2022-04-10T18:28:19.248903Z direct id: 290 file: s3://data-2022-02.parquet start: 229878 len: 14, rlen: 14
2022-04-10T18:28:19.257919Z direct id: 290 file: s3://data-2022-02.parquet start: 234386 len: 14, rlen: 14
2022-04-10T18:28:19.271822Z direct id: 290 file: s3://data-2022-02.parquet start: 238894 len: 14, rlen: 14
2022-04-10T18:28:19.284739Z direct id: 290 file: s3://data-2022-02.parquet start: 462074 len: 14, rlen: 14
2022-04-10T18:28:19.298708Z direct id: 290 file: s3://data-2022-02.parquet start: 466582 len: 14, rlen: 14
2022-04-10T18:28:19.313049Z direct id: 290 file: s3://data-2022-02.parquet start: 471090 len: 14, rlen: 14
2022-04-10T18:28:19.325878Z direct id: 290 file: s3://data-2022-02.parquet start: 475598 len: 14, rlen: 14
2022-04-10T18:28:19.338194Z direct id: 290 file: s3://data-2022-02.parquet start: 480106 len: 14, rlen: 14
2022-04-10T18:28:19.350122Z direct id: 290 file: s3://data-2022-02.parquet start: 704027 len: 14, rlen: 14
2022-04-10T18:28:19.362426Z direct id: 290 file: s3://data-2022-02.parquet start: 708535 len: 14, rlen: 14
2022-04-10T18:28:19.374202Z direct id: 290 file: s3://data-2022-02.parquet start: 713043 len: 14, rlen: 14
2022-04-10T18:28:19.386540Z direct id: 290 file: s3://data-2022-02.parquet start: 717551 len: 14, rlen: 14
2022-04-10T18:28:19.400017Z direct id: 290 file: s3://data-2022-02.parquet start: 722059 len: 14, rlen: 14
2022-04-10T18:28:19.412182Z direct id: 290 file: s3://data-2022-02.parquet start: 947346 len: 14, rlen: 14

# datafusion reads data from the second column (iterating over row groups)
2022-04-10T18:28:19.424140Z direct id: 290 file: s3://data-2022-02.parquet start: 220981 len: 14, rlen: 14
2022-04-10T18:28:19.436095Z direct id: 290 file: s3://data-2022-02.parquet start: 225489 len: 14, rlen: 14
2022-04-10T18:28:19.448831Z direct id: 290 file: s3://data-2022-02.parquet start: 229997 len: 14, rlen: 14
2022-04-10T18:28:19.461895Z direct id: 290 file: s3://data-2022-02.parquet start: 234505 len: 14, rlen: 14
2022-04-10T18:28:19.474107Z direct id: 290 file: s3://data-2022-02.parquet start: 239013 len: 14, rlen: 14
2022-04-10T18:28:19.488363Z direct id: 290 file: s3://data-2022-02.parquet start: 462193 len: 14, rlen: 14
2022-04-10T18:28:19.503114Z direct id: 290 file: s3://data-2022-02.parquet start: 466701 len: 14, rlen: 14
2022-04-10T18:28:19.515595Z direct id: 290 file: s3://data-2022-02.parquet start: 471209 len: 14, rlen: 14
2022-04-10T18:28:19.527923Z direct id: 290 file: s3://data-2022-02.parquet start: 475717 len: 14, rlen: 14
2022-04-10T18:28:19.539998Z direct id: 290 file: s3://data-2022-02.parquet start: 480225 len: 14, rlen: 14
2022-04-10T18:28:19.553284Z direct id: 290 file: s3://data-2022-02.parquet start: 704146 len: 14, rlen: 14
2022-04-10T18:28:19.566201Z direct id: 290 file: s3://data-2022-02.parquet start: 708654 len: 14, rlen: 14
2022-04-10T18:28:19.578198Z direct id: 290 file: s3://data-2022-02.parquet start: 713162 len: 14, rlen: 14
2022-04-10T18:28:19.589092Z direct id: 290 file: s3://data-2022-02.parquet start: 717670 len: 14, rlen: 14
2022-04-10T18:28:19.602429Z direct id: 290 file: s3://data-2022-02.parquet start: 722178 len: 14, rlen: 14
2022-04-10T18:28:19.615445Z direct id: 290 file: s3://data-2022-02.parquet start: 947465 len: 14, rlen: 14

# datafusion reads data from the third column (iterating over row groups)
2022-04-10T18:28:19.628900Z direct id: 290 file: s3://data-2022-02.parquet start: 221621 len: 14, rlen: 14
2022-04-10T18:28:19.642048Z direct id: 290 file: s3://data-2022-02.parquet start: 226129 len: 14, rlen: 14
2022-04-10T18:28:19.656429Z direct id: 290 file: s3://data-2022-02.parquet start: 230637 len: 14, rlen: 14
2022-04-10T18:28:19.669087Z direct id: 290 file: s3://data-2022-02.parquet start: 235145 len: 14, rlen: 14
2022-04-10T18:28:19.681621Z direct id: 290 file: s3://data-2022-02.parquet start: 239653 len: 14, rlen: 14
2022-04-10T18:28:19.696082Z direct id: 290 file: s3://data-2022-02.parquet start: 462833 len: 14, rlen: 14
2022-04-10T18:28:19.708401Z direct id: 290 file: s3://data-2022-02.parquet start: 467341 len: 14, rlen: 14
2022-04-10T18:28:19.721417Z direct id: 290 file: s3://data-2022-02.parquet start: 471849 len: 14, rlen: 14
2022-04-10T18:28:19.733059Z direct id: 290 file: s3://data-2022-02.parquet start: 476357 len: 14, rlen: 14
2022-04-10T18:28:19.745427Z direct id: 290 file: s3://data-2022-02.parquet start: 480865 len: 14, rlen: 14
2022-04-10T18:28:19.758242Z direct id: 290 file: s3://data-2022-02.parquet start: 704786 len: 14, rlen: 14
2022-04-10T18:28:19.770439Z direct id: 290 file: s3://data-2022-02.parquet start: 709294 len: 14, rlen: 14
2022-04-10T18:28:19.783198Z direct id: 290 file: s3://data-2022-02.parquet start: 713802 len: 14, rlen: 14
2022-04-10T18:28:19.795203Z direct id: 290 file: s3://data-2022-02.parquet start: 718310 len: 14, rlen: 14
2022-04-10T18:28:19.808062Z direct id: 290 file: s3://data-2022-02.parquet start: 722818 len: 14, rlen: 14
2022-04-10T18:28:19.827283Z direct id: 290 file: s3://data-2022-02.parquet start: 948105 len: 14, rlen: 14

# datafusion reads data from the fourth column (iterating over row groups)
2022-04-10T18:28:19.840199Z direct id: 290 file: s3://data-2022-02.parquet start: 223858 len: 14, rlen: 14
2022-04-10T18:28:19.853255Z direct id: 290 file: s3://data-2022-02.parquet start: 228366 len: 14, rlen: 14
2022-04-10T18:28:19.866894Z direct id: 290 file: s3://data-2022-02.parquet start: 232874 len: 14, rlen: 14
2022-04-10T18:28:19.878996Z direct id: 290 file: s3://data-2022-02.parquet start: 237382 len: 14, rlen: 14
2022-04-10T18:28:19.892441Z direct id: 290 file: s3://data-2022-02.parquet start: 241890 len: 14, rlen: 14
2022-04-10T18:28:19.906870Z direct id: 290 file: s3://data-2022-02.parquet start: 465070 len: 14, rlen: 14
2022-04-10T18:28:19.925541Z direct id: 290 file: s3://data-2022-02.parquet start: 469578 len: 14, rlen: 14
2022-04-10T18:28:19.939221Z direct id: 290 file: s3://data-2022-02.parquet start: 474086 len: 14, rlen: 14
2022-04-10T18:28:19.951420Z direct id: 290 file: s3://data-2022-02.parquet start: 478594 len: 14, rlen: 14
2022-04-10T18:28:19.964162Z direct id: 290 file: s3://data-2022-02.parquet start: 483102 len: 14, rlen: 14
2022-04-10T18:28:19.977930Z direct id: 290 file: s3://data-2022-02.parquet start: 707023 len: 14, rlen: 14
2022-04-10T18:28:19.992568Z direct id: 290 file: s3://data-2022-02.parquet start: 711531 len: 14, rlen: 14
2022-04-10T18:28:20.005620Z direct id: 290 file: s3://data-2022-02.parquet start: 716039 len: 14, rlen: 14
2022-04-10T18:28:20.016705Z direct id: 290 file: s3://data-2022-02.parquet start: 720547 len: 14, rlen: 14
2022-04-10T18:28:20.028415Z direct id: 290 file: s3://data-2022-02.parquet start: 725055 len: 14, rlen: 14
2022-04-10T18:28:20.040173Z direct id: 290 file: s3://data-2022-02.parquet start: 950342 len: 14, rlen: 14

What we see in the logs:

  • 70 requests for one file; 2744 requests in total for 89 files.
  • 3 requests for the same metadata from each parquet file (3 * 89 in total).
  • While creating the physical plan, metadata from the parquet files is read sequentially on a single thread.
  • Datafusion always lists files before reading metadata when creating the physical plan.
  • Datafusion iterates over (column id, row group id), not (row group id, column id).
  • I got lots of small 14-byte requests.
  • The file data-2022-02.parquet has 0 matching records for my example SQL, yet datafusion still read from it. I think I have statistics in my *.parquet files.

My plan for implementing the cache was:

  • Store metadata for all parquet files in RAM, so we don't have to read it from the S3 server on every single SQL query (a rough sketch of this part follows below).
  • Read metadata for all parquet files in parallel/concurrently. Listing files looks like a good place for it: it's async and is done every time datafusion creates a physical plan.
  • Invalidate the metadata cache when the file's last modification time changes.
  • Use multiple cache buffers per file, so when datafusion reads the next column there is a good chance we already have that data (my implementation is not ideal here).
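
The metadata part could look roughly like the sketch below; the type and field names are illustrative only, not the actual code in this MR:

use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::SystemTime;

/// Rough sketch of the in-memory metadata cache idea: entries are keyed by file
/// path and paired with the last-modified time seen when they were cached.
#[derive(Default)]
struct MetadataCache {
    entries: RwLock<HashMap<String, (SystemTime, Arc<Vec<u8>>)>>,
}

impl MetadataCache {
    /// Return cached metadata only if the file has not changed since it was cached.
    fn get(&self, path: &str, last_modified: SystemTime) -> Option<Arc<Vec<u8>>> {
        let entries = self.entries.read().unwrap();
        match entries.get(path) {
            Some((cached_modified, metadata)) if *cached_modified == last_modified => {
                Some(Arc::clone(metadata))
            }
            _ => None, // missing or stale (file was modified) entry
        }
    }

    /// Store (or replace) the metadata for a file together with its last-modified time.
    /// During listing, this could be called concurrently for all files of the table.
    fn put(&self, path: String, last_modified: SystemTime, metadata: Arc<Vec<u8>>) {
        self.entries.write().unwrap().insert(path, (last_modified, metadata));
    }
}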

CI with other self-hosted S3 servers

Running CI with other self-hosted S3 servers is a great idea. I could do it when I move the cache to another crate, but for now I want to focus on something else.

aws-sdk-rust and rust-s3

Regarding why I dropped aws-sdk-rust: I don't have experience with AWS, and I don't have a clue what IMDS is or why it is required. When I ran unit tests on my Linux machine I got IMDS timeout logs for every S3 request (somehow the unit tests passed successfully, but they took a long time to complete). Maybe I have to install or disable something. After spending half a day on this problem I just rewrote it with rust-s3, which worked on the first try and is acceptably fast.

One drawback of rust-s3 is that only one person can publish it to crates.io.

Maybe we need two S3 implementations: one for AWS services with the official aws-sdk-rust crate, and another for self-hosted S3 servers without the AWS complexity. Or maybe the IMDS complexity can be disabled in aws-sdk-rust.

@mateuszkj mateuszkj changed the title Rewite with rust-s3 and caching Apr 10, 2022
@mateuszkj mateuszkj changed the title Experiamet with rust-s3 and caching Apr 10, 2022
@matthewmturner matthewmturner marked this pull request as draft April 11, 2022 13:30
@matthewmturner (Collaborator) commented:

Thanks for the detailed write up @mateuszkj - it makes sense.

I think that bringing up the cache on the main datafusion repo would be a good starting point to see what the community thinks about this. For example, there is an active HDFS community using datafusion as well (https://github.com/datafusion-contrib/datafusion-objectstore-hdfs) and perhaps this could benefit them. In general there is a lot of focus on improving performance and parallelism within datafusion (for example apache/datafusion#2199), so I think it's a good time to bring it up. I can help with creating that if you would like.

Regarding CI and testing with other S3 providers - I can work on this. I think it's a valuable addition to the project regardless and should help with whatever changes come from this.

I will look into reproducing the IMDS issue. I think once we have a better understanding of it then we can consider whether it makes sense to use alternative s3 implementations.

Thanks for working with me on this!

@tustvold commented Apr 11, 2022

Hi, thanks for working on this. I'm currently actively working on making it so that IOx can make use of more of the DataFusion functionality for object storage, where it currently does its own thing. I am therefore very interested in helping drive improvements in this space 👍

70 requests for one file; 2744 requests in total for 89 files.

This might be a behaviour we want to fix upstream in DataFusion/arrow-rs, as opposed to working around with caching. For reference apache/arrow-rs#1473 contains some investigative work refining the interface exposed by the parquet crate, and in particular apache/arrow-rs#1509 might be of interest. It is all still proposals and experiments at this stage, but I would be more than happy to help draft up some issues if you (or anyone else) wanted to lend a hand getting this over the line.

Store metadata for all parquet files in RAM, so we don't have to read it from the S3 server on every single SQL query.

Not needing to scan or list object storage in order to plan the query seems very sensible. One thought might be to integrate with Object Storage at the TableProvider level, as opposed to the ObjectStore level to allow using a dedicated catalog. In IOx we use a custom catalog, but I could see adding support for a Hive compatible MetaStore as being a valuable feature for the DataFusion ecosystem.

Using caching to accelerate queries on datasets that lack a dedicated catalog is definitely still useful, just mentioning that typically one would have some sort of catalog. I've personally had a lot of success using lambdas to populate an AWS Glue catalog, but there are loads of alternatives in this space.

apache/datafusion#2079 might also be relevant and concerns how the TableProvider translates the catalog information into a physical query plan.

IMDS timeout for every S3 request

I'm not familiar with aws-sdk-rust, as I've only ever used rusoto, but IMDS is the mechanism by which instances running in AWS obtain IAM credentials with which to authorize with object storage. It does this by talking to an HTTP service running at a special IP. This won't exist when running on your local machine and so you might need to provide dummy AWS access credentials as environment variables, e.g. AWS_ACCESS_KEY_ID=local, or otherwise manipulate the credential chain being used by the SDK to make this work.
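
For example, a test setup could export dummy static credentials before the S3 client is built, so the default credential chain resolves locally and never falls through to IMDS (whether this also avoids the IMDS region lookup depends on the SDK version; the variable names below are the standard AWS ones, and the values are placeholders):

// In the test harness, before constructing the S3 client:
std::env::set_var("AWS_ACCESS_KEY_ID", "local");
std::env::set_var("AWS_SECRET_ACCESS_KEY", "local");
std::env::set_var("AWS_REGION", "us-east-1");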

@matthewmturner (Collaborator) commented:

@tustvold thank you for the insight - very helpful. Personally, I am interested in learning about and implementing these types of performance improvements, so if you could draft some issues outlining what you are looking to accomplish I would be happy to try and help.

@tustvold commented:

Ok, I'll see what I can do. I have quite a few plates in the air at the moment, so it might be a day or two before I can produce anything coherent, but I'll be sure to tag you on any issues once I file them 😀

@matthewmturner (Collaborator) commented:

@tustvold No rush - I have a couple things I'll need to close before being able to start on that anyway. Thanks again.

@seddonm1 (Collaborator) commented:

Hi everyone. I wrote the base implementation so any blame can be directed at me 😉.

I think moving away from the official AWS SDK would be a mistake as they are developing a strong Rust practice so I would expect it to be better maintained than the others going forward (maintaining a library for someone else's API is a bit of a thankless task).

This was always a bit of a hacky implementation until async datasources are properly implemented. It is good that the IOx team is on board, as I hope we can arrive at one canonical implementation that deals with all of the issues above. That means any caching ideas would be good to incorporate. 👍

@matthewmturner (Collaborator) commented:

@seddonm1 No blame being directed - we wouldn't be having this conversation without your efforts.

@houqp (Member) commented Apr 12, 2022

As @tustvold mentioned, IMDS won't work for custom S3 implementations. If this is the problem that's causing the slowdown, then it's likely that the official S3 client is not as bad as we observed here. IMDS is usually used when a default credential chain is configured for the client: the client uses it to discover what AWS environment it is in and which method to use to fetch the IAM credential for S3 access. The client might use IMDS to discover which region it's in as well. As a quick test, you should be able to turn it off by using static client credentials, i.e. hard-coding the access keys and region. If that doesn't work, I recommend filing an issue in the upstream official AWS repo; it should be a fairly straightforward fix.

@houqp (Member) commented Apr 12, 2022

Another place where I think caching could help is at the object store level, to perform look-ahead scans, i.e. fetching the next chunk of data from S3 while the system is performing compute on the previous chunk (a rough sketch below).
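
A minimal sketch of that overlap using tokio, assuming a placeholder fetch_chunk function standing in for a range request to the object store:

use tokio::task::JoinHandle;

/// Placeholder for a range request against the object store.
async fn fetch_chunk(_chunk_index: usize) -> Vec<u8> {
    // ...the range GET for this chunk would go here...
    vec![0; 1024 * 1024]
}

/// Process chunks sequentially while the next chunk is already being fetched.
async fn scan_with_lookahead(num_chunks: usize) {
    let mut next: Option<JoinHandle<Vec<u8>>> = if num_chunks > 0 {
        Some(tokio::spawn(fetch_chunk(0)))
    } else {
        None
    };
    for i in 0..num_chunks {
        // Take the chunk that was prefetched during the previous iteration.
        let chunk = next.take().unwrap().await.unwrap();
        // Kick off the next fetch before doing any compute on the current chunk.
        if i + 1 < num_chunks {
            next = Some(tokio::spawn(fetch_chunk(i + 1)));
        }
        // CPU-bound work on `chunk` now overlaps with the in-flight fetch above.
        process(chunk);
    }
}

fn process(_chunk: Vec<u8>) {
    // compute on the decoded chunk
}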
