From 571a38fb6d1825233afacf25a2cead555c571f86 Mon Sep 17 00:00:00 2001
From: lenin-falkonry
Date: Thu, 9 Nov 2023 07:08:25 -0800
Subject: [PATCH 1/2] optimization for s3-calls

---
 cpp/CMakePresets.json                        |   3 -
 cpp/src/arrow/filesystem/s3fs.cc             | 191 ++++++++++++++-----
 python/examples/minimal_build/Dockerfile-ubi |  34 ++++
 python/examples/minimal_build/build_conda.sh |  29 +--
 python/examples/minimal_build/build_venv.sh  |  67 +++++--
 python/pyarrow/parquet/core.py               |   7 +-
 6 files changed, 237 insertions(+), 94 deletions(-)
 create mode 100644 python/examples/minimal_build/Dockerfile-ubi

diff --git a/cpp/CMakePresets.json b/cpp/CMakePresets.json
index d714af54dd..a7c41cf72b 100644
--- a/cpp/CMakePresets.json
+++ b/cpp/CMakePresets.json
@@ -149,10 +149,7 @@
     {
       "name": "features-python-maximal",
       "inherits": [
-        "features-cuda",
         "features-filesystems",
-        "features-flight",
-        "features-gandiva",
         "features-main",
         "features-python-minimal"
       ],
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index a22d9c10be..0991b58f1d 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "arrow/filesystem/s3fs.h"
+#include "arrow/result.h"
 
 #include
 #include
@@ -105,6 +106,7 @@ using internal::TaskGroup;
 using internal::ToChars;
 using internal::Uri;
 using io::internal::SubmitIO;
+using internal::GetEnvVarNative;
 
 namespace fs {
 
@@ -1155,6 +1157,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
 // so I chose the safer value.
 // (see https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html)
 static constexpr int64_t kMinimumPartUpload = 5 * 1024 * 1024;
+static constexpr int64_t kMultipartThreshold = 5 * 1024 * 1024;
 
 // An OutputStream that writes to a S3 object
 class ObjectOutputStream final : public io::OutputStream {
@@ -1179,6 +1182,24 @@ class ObjectOutputStream final : public io::OutputStream {
   }
 
   Status Init() {
+    // If the declared file size is below the multipart threshold, don't use a multipart upload
+
+    int size_enforced_key = metadata_ ? metadata_->FindKey("falkonry:write_options:file_size") : -1;
+    ARROW_LOG(DEBUG) << "!!! size_enforced_key = " << size_enforced_key;
+    if (size_enforced_key > -1) {
+      closed_ = true;
+      auto maybe_value = metadata_->value(size_enforced_key);
+      ARROW_LOG(DEBUG) << "!!! size_detected = " << maybe_value;
+      if (std::stoi(maybe_value) < (kMultipartThreshold * 1024)) {
+        disable_multipart_ = true;
+        ARROW_LOG(DEBUG) << "!!! Non multipart upload";
+        closed_ = false;
+        return Status::OK();
+      }
+    }
+    ARROW_LOG(DEBUG) << "!!! multipart upload";
+    disable_multipart_ = false;
+
     // Initiate the multi-part upload
     S3Model::CreateMultipartUploadRequest req;
     req.SetBucket(ToAwsString(path_.bucket));
@@ -1210,7 +1231,8 @@ class ObjectOutputStream final : public io::OutputStream {
   }
 
   Status Abort() override {
-    if (closed_) {
+    ARROW_LOG(DEBUG) << "!!! Abort upload";
+    if (closed_ || disable_multipart_) {
       return Status::OK();
     }
 
@@ -1235,11 +1257,13 @@
   // OutputStream interface
 
   Status Close() override {
+    ARROW_LOG(DEBUG) << "!!! Close upload";
     auto fut = CloseAsync();
     return fut.status();
   }
 
   Future<> CloseAsync() override {
+    ARROW_LOG(DEBUG) << "!!! Close upload";
     if (closed_) return Status::OK();
 
     if (current_part_) {
@@ -1247,6 +1271,10 @@ class ObjectOutputStream final : public io::OutputStream {
       RETURN_NOT_OK(CommitCurrentPart());
     }
 
+    if (disable_multipart_)
+      return Status::OK();
+
+    ARROW_LOG(DEBUG) << "!!! Close Multipart upload";
     // S3 mandates at least one part, upload an empty one if necessary
     if (part_number_ == 1) {
       RETURN_NOT_OK(UploadPart("", 0));
     }
@@ -1300,6 +1328,21 @@ class ObjectOutputStream final : public io::OutputStream {
 
   Status DoWrite(const void* data, int64_t nbytes,
                  std::shared_ptr<Buffer> owned_buffer = nullptr) {
+    if (disable_multipart_) {
+      ARROW_LOG(DEBUG) << "!!! Buffering Non-Multipart upload";
+      if (!current_part_) {
+        ARROW_ASSIGN_OR_RAISE(
+            current_part_,
+            io::BufferOutputStream::Create(part_upload_threshold_, io_context_.pool()));
+        current_part_size_ = 0;
+      }
+      RETURN_NOT_OK(current_part_->Write(data, nbytes));
+      pos_ += nbytes;
+      current_part_size_ += nbytes;
+      return Status::OK();
+    }
+
+    ARROW_LOG(DEBUG) << "!!! Multipart upload";
     if (closed_) {
       return Status::Invalid("Operation on closed stream");
     }
@@ -1359,51 +1402,67 @@
   Status UploadPart(const void* data, int64_t nbytes,
                     std::shared_ptr<Buffer> owned_buffer = nullptr) {
-    S3Model::UploadPartRequest req;
-    req.SetBucket(ToAwsString(path_.bucket));
-    req.SetKey(ToAwsString(path_.key));
-    req.SetUploadId(upload_id_);
-    req.SetPartNumber(part_number_);
-    req.SetContentLength(nbytes);
-
-    if (!background_writes_) {
+    if (disable_multipart_) {
+      ARROW_LOG(DEBUG) << "!!! Non multipart upload finalizing";
+      S3Model::PutObjectRequest req;
+      req.SetBucket(ToAwsString(path_.bucket));
+      req.SetKey(ToAwsString(path_.key));
       req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
-      auto outcome = client_->UploadPart(req);
+      auto outcome = client_->PutObject(std::move(req));
       if (!outcome.IsSuccess()) {
-        return UploadPartError(req, outcome);
-      } else {
-        AddCompletedPart(upload_state_, part_number_, outcome.GetResult());
+        return ErrorToStatus(
+            std::forward_as_tuple("When uploading part for key '", req.GetKey(),
                                   "' in bucket '", req.GetBucket(), "': "),
+            "PutObject", outcome.GetError());
       }
     } else {
-      // If the data isn't owned, make an immutable copy for the lifetime of the closure
-      if (owned_buffer == nullptr) {
-        ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool()));
-        memcpy(owned_buffer->mutable_data(), data, nbytes);
+      ARROW_LOG(DEBUG) << "!!! multipart upload intermediate";
+      S3Model::UploadPartRequest req;
+      req.SetBucket(ToAwsString(path_.bucket));
+      req.SetKey(ToAwsString(path_.key));
+      req.SetUploadId(upload_id_);
+      req.SetPartNumber(part_number_);
+      req.SetContentLength(nbytes);
+
+      if (!background_writes_) {
+        req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
+        auto outcome = client_->UploadPart(req);
+        if (!outcome.IsSuccess()) {
+          return UploadPartError(req, outcome);
+        } else {
+          AddCompletedPart(upload_state_, part_number_, outcome.GetResult());
+        }
       } else {
-      DCHECK_EQ(data, owned_buffer->data());
-      DCHECK_EQ(nbytes, owned_buffer->size());
-    }
-    req.SetBody(
-        std::make_shared<StringViewStream>(owned_buffer->data(), owned_buffer->size()));
+        // If the data isn't owned, make an immutable copy for the lifetime of the closure
+        if (owned_buffer == nullptr) {
+          ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool()));
+          memcpy(owned_buffer->mutable_data(), data, nbytes);
+        } else {
+          DCHECK_EQ(data, owned_buffer->data());
+          DCHECK_EQ(nbytes, owned_buffer->size());
+        }
+        req.SetBody(
+            std::make_shared<StringViewStream>(owned_buffer->data(), owned_buffer->size()));
 
-    {
-      std::unique_lock<std::mutex> lock(upload_state_->mutex);
-      if (upload_state_->parts_in_progress++ == 0) {
-        upload_state_->pending_parts_completed = Future<>::Make();
+        {
+          std::unique_lock<std::mutex> lock(upload_state_->mutex);
+          if (upload_state_->parts_in_progress++ == 0) {
+            upload_state_->pending_parts_completed = Future<>::Make();
+          }
         }
+        auto client = client_;
+        ARROW_ASSIGN_OR_RAISE(auto fut, SubmitIO(io_context_, [client, req]() {
+          return client->UploadPart(req);
+        }));
+        // The closure keeps the buffer and the upload state alive
+        auto state = upload_state_;
+        auto part_number = part_number_;
+        auto handler = [owned_buffer, state, part_number,
+                        req](const Result<S3Model::UploadPartOutcome>& result) -> void {
+          HandleUploadOutcome(state, part_number, req, result);
+        };
+        fut.AddCallback(std::move(handler));
       }
-      auto client = client_;
-      ARROW_ASSIGN_OR_RAISE(auto fut, SubmitIO(io_context_, [client, req]() {
-        return client->UploadPart(req);
-      }));
-      // The closure keeps the buffer and the upload state alive
-      auto state = upload_state_;
-      auto part_number = part_number_;
-      auto handler = [owned_buffer, state, part_number,
-                      req](const Result<S3Model::UploadPartOutcome>& result) -> void {
-        HandleUploadOutcome(state, part_number, req, result);
-      };
-      fut.AddCallback(std::move(handler));
     }
 
     ++part_number_;
@@ -1478,6 +1537,7 @@
   Aws::String upload_id_;
   bool closed_ = true;
+  bool disable_multipart_ = false;
   int64_t pos_ = 0;
   int32_t part_number_ = 1;
   std::shared_ptr<io::BufferOutputStream> current_part_;
@@ -2285,20 +2345,26 @@ Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
       return ErrorToStatus(msg, "HeadObject", outcome.GetError(),
                            impl_->options().region);
     }
-    // Not found => perhaps it's an empty "directory"
-    ARROW_ASSIGN_OR_RAISE(bool is_dir, impl_->IsEmptyDirectory(path, &outcome));
-    if (is_dir) {
-      info.set_type(FileType::Directory);
+    auto maybe_env_var = GetEnvVarNative("ARROW_S3_OPTIMIZED_KEY_LOOKUP");
+    if (maybe_env_var.ok()) {
+      info.set_type(FileType::NotFound);
       return info;
-    }
-    // Not found => perhaps it's a non-empty "directory"
-    ARROW_ASSIGN_OR_RAISE(is_dir, impl_->IsNonEmptyDirectory(path));
-    if (is_dir) {
-      info.set_type(FileType::Directory);
     } else {
-      info.set_type(FileType::NotFound);
+      // Not found => perhaps it's an empty "directory"
+      ARROW_ASSIGN_OR_RAISE(bool is_dir, impl_->IsEmptyDirectory(path, &outcome));
+      if (is_dir) {
+        info.set_type(FileType::Directory);
+        return info;
+      }
+      // Not found => perhaps it's a non-empty "directory"
+      ARROW_ASSIGN_OR_RAISE(is_dir, impl_->IsNonEmptyDirectory(path));
+      if (is_dir) {
+        info.set_type(FileType::Directory);
+      } else {
+        info.set_type(FileType::NotFound);
+      }
+      return info;
     }
-    return info;
   }
 }
@@ -2691,7 +2757,32 @@ Status InitializeS3(const S3GlobalOptions& options) {
 }
 
 Status EnsureS3Initialized() {
-  return EnsureAwsInstanceInitialized({S3LogLevel::Fatal}).status();
+  auto log_level = S3LogLevel::Fatal;
+
+  auto result = arrow::internal::GetEnvVar("ARROW_S3_LOG_LEVEL");
+
+  if (result.ok()) {
+    // Extract, trim, and downcase the value of the environment variable
+    auto value =
+        arrow::internal::AsciiToLower(arrow::internal::TrimString(result.ValueUnsafe()));
+
+    if (value == "fatal") {
+      log_level = S3LogLevel::Fatal;
+    } else if (value == "error") {
+      log_level = S3LogLevel::Error;
+    } else if (value == "warn") {
+      log_level = S3LogLevel::Warn;
+    } else if (value == "info") {
+      log_level = S3LogLevel::Info;
+    } else if (value == "debug") {
+      log_level = S3LogLevel::Debug;
+    } else if (value == "trace") {
+      log_level = S3LogLevel::Trace;
+    } else if (value == "off") {
+      log_level = S3LogLevel::Off;
+    }
+  }
+  return EnsureAwsInstanceInitialized({log_level}).status();
 }
 
 Status FinalizeS3() {
diff --git a/python/examples/minimal_build/Dockerfile-ubi b/python/examples/minimal_build/Dockerfile-ubi
new file mode 100644
index 0000000000..5bf118a9ec
--- /dev/null
+++ b/python/examples/minimal_build/Dockerfile-ubi
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+FROM registry.access.redhat.com/ubi9/python-39:1-143.1697559877
+
+USER 0
+
+RUN dnf update -y && \
+    dnf install -y \
+    autoconf \
+    gcc \
+    gcc-g++ \
+    git \
+    wget \
+    make \
+    cmake \
+    ninja-build \
+    python3-devel
+
+RUN pip3 install -U pip setuptools
diff --git a/python/examples/minimal_build/build_conda.sh b/python/examples/minimal_build/build_conda.sh
index a2a1fcbf94..cde7cc7c7c 100755
--- a/python/examples/minimal_build/build_conda.sh
+++ b/python/examples/minimal_build/build_conda.sh
@@ -21,12 +21,12 @@ set -e
 #----------------------------------------------------------------------
 # Change this to whatever makes sense for your system
 
-HOME=
+HOME=/io/dist
 MINICONDA=$HOME/miniconda-for-arrow
 LIBRARY_INSTALL_DIR=$HOME/local-libs
 CPP_BUILD_DIR=$HOME/arrow-cpp-build
 ARROW_ROOT=/arrow
-PYTHON=3.10
+PYTHON=3.9
 
 git config --global --add safe.directory $ARROW_ROOT
 
@@ -75,19 +75,7 @@ export ARROW_HOME=$CONDA_PREFIX
 mkdir -p $CPP_BUILD_DIR
 pushd $CPP_BUILD_DIR
 
-cmake -GNinja \
-      -DCMAKE_BUILD_TYPE=DEBUG \
-      -DCMAKE_INSTALL_PREFIX=$ARROW_HOME \
-      -DCMAKE_INSTALL_LIBDIR=lib \
-      -DARROW_WITH_BZ2=ON \
-      -DARROW_WITH_ZLIB=ON \
-      -DARROW_WITH_ZSTD=ON \
-      -DARROW_WITH_LZ4=ON \
-      -DARROW_WITH_SNAPPY=ON \
-      -DARROW_WITH_BROTLI=ON \
-      -DARROW_PYTHON=ON \
-      $ARROW_ROOT/cpp
-
+cmake --preset ninja-release-python-maximal -DCMAKE_INSTALL_PREFIX=$ARROW_HOME $ARROW_ROOT/cpp
 ninja install
 popd
 
@@ -99,12 +87,15 @@ pushd $ARROW_ROOT/python
 rm -rf build/  # remove any pesky pre-existing build directory
 
 export CMAKE_PREFIX_PATH=${ARROW_HOME}${CMAKE_PREFIX_PATH:+:${CMAKE_PREFIX_PATH}}
-export PYARROW_BUILD_TYPE=Debug
+export PYARROW_BUILD_TYPE=RELEASE
 export PYARROW_CMAKE_GENERATOR=Ninja
 
 # You can run either "develop" or "build_ext --inplace". Your pick
 
-# python setup.py build_ext --inplace
-python setup.py develop
+pip install wheel  # if not installed
+python setup.py build_ext --build-type=$PYARROW_BUILD_TYPE \
+       --bundle-arrow-cpp bdist_wheel --dist-dir $HOME
+
+#python setup.py develop
 
-py.test pyarrow
+#py.test pyarrow
diff --git a/python/examples/minimal_build/build_venv.sh b/python/examples/minimal_build/build_venv.sh
index 2f1bc4ed30..e689ceaed8 100755
--- a/python/examples/minimal_build/build_venv.sh
+++ b/python/examples/minimal_build/build_venv.sh
@@ -20,7 +20,7 @@ set -e
 #----------------------------------------------------------------------
 # Change this to whatever makes sense for your system
-
+HOME=/io/dist
 WORKDIR=${WORKDIR:-$HOME}
 MINICONDA=$WORKDIR/miniconda-for-arrow
 LIBRARY_INSTALL_DIR=$WORKDIR/local-libs
@@ -29,6 +29,10 @@ ARROW_ROOT=/arrow
 
 export ARROW_HOME=$WORKDIR/dist
 export LD_LIBRARY_PATH=$ARROW_HOME/lib:$LD_LIBRARY_PATH
+export PYARROW_WITH_PARQUET=1
+export PYARROW_WITH_DATASET=1
+export PYARROW_PARALLEL=4
+
 python3 -m venv $WORKDIR/venv
 source $WORKDIR/venv/bin/activate
 
@@ -43,19 +47,36 @@ mkdir -p $CPP_BUILD_DIR
 pushd $CPP_BUILD_DIR
 
 cmake -GNinja \
-      -DCMAKE_BUILD_TYPE=DEBUG \
-      -DCMAKE_INSTALL_PREFIX=$ARROW_HOME \
-      -DCMAKE_INSTALL_LIBDIR=lib \
-      -DARROW_BUILD_STATIC=OFF \
-      -DARROW_WITH_BZ2=ON \
-      -DARROW_WITH_ZLIB=ON \
-      -DARROW_WITH_ZSTD=ON \
-      -DARROW_WITH_LZ4=ON \
-      -DARROW_WITH_SNAPPY=ON \
-      -DARROW_WITH_BROTLI=ON \
-      -DARROW_PYTHON=ON \
-      $ARROW_ROOT/cpp
-
+      -DCMAKE_INSTALL_PREFIX=$ARROW_HOME \
+      -DCMAKE_INSTALL_LIBDIR=lib \
+      -DCMAKE_UNITY_BUILD=ON \
+      -DARROW_ACERO="ON" \
+      -DARROW_BUILD_STATIC="OFF" \
+      -DARROW_COMPUTE="ON" \
+      -DARROW_CSV="ON" \
+      -DARROW_DATASET="ON" \
+      -DARROW_FILESYSTEM="ON" \
+      -DARROW_GCS="ON" \
+      -DARROW_HDFS="ON" \
+      -DARROW_JSON="ON" \
+      -DARROW_MIMALLOC="ON" \
+      -DARROW_ORC="ON" \
+      -DARROW_PARQUET="ON" \
+      -DARROW_S3="ON" \
+      -DARROW_SUBSTRAIT="ON" \
+      -DARROW_WITH_BROTLI="ON" \
+      -DARROW_WITH_BZ2="ON" \
+      -DARROW_WITH_LZ4="ON" \
+      -DARROW_WITH_RE2="OFF" \
+      -DARROW_WITH_SNAPPY="ON" \
+      -DARROW_WITH_UTF8PROC="OFF" \
+      -DARROW_WITH_ZLIB="ON" \
+      -DARROW_WITH_ZSTD="ON" \
+      -DARROW_WITH_UTF8PROC="ON" \
+      -DARROW_WITH_BACKTRACE="ON" \
+      -DCMAKE_BUILD_TYPE="Release" \
+      -DPARQUET_REQUIRE_ENCRYPTION="ON" \
+      $ARROW_ROOT/cpp
 ninja install
 popd
 
@@ -67,14 +88,18 @@ pushd $ARROW_ROOT/python
 rm -rf build/  # remove any pesky pre-existing build directory
 
 export CMAKE_PREFIX_PATH=${ARROW_HOME}${CMAKE_PREFIX_PATH:+:${CMAKE_PREFIX_PATH}}
-export PYARROW_BUILD_TYPE=Debug
+export PYARROW_BUILD_TYPE=Release
+export PYARROW_WITH_GCS=1
+export PYARROW_WITH_PARQUET=1
+export PYARROW_WITH_DATASET=1
+export PYARROW_WITH_S3=1
+export PYARROW_WITH_ORC=1
+export PYARROW_WITH_PARQUET_ENCRYPTION=1
+export PYARROW_WITH_HDFS=1
 export PYARROW_CMAKE_GENERATOR=Ninja
 
 # You can run either "develop" or "build_ext --inplace". Your pick
 
-# python setup.py build_ext --inplace
-python setup.py develop
-
-pip install -r $ARROW_ROOT/python/requirements-test.txt
-
-py.test pyarrow
+pip install wheel  # if not installed
+python setup.py build_ext --build-type=$PYARROW_BUILD_TYPE \
+       --bundle-arrow-cpp bdist_wheel --dist-dir $HOME
diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py
index 018902dde4..3e91b9cfda 100644
--- a/python/pyarrow/parquet/core.py
+++ b/python/pyarrow/parquet/core.py
@@ -958,6 +958,7 @@ def __init__(self, where, schema, filesystem=None,
                  encryption_properties=None,
                  write_batch_size=None,
                  dictionary_pagesize_limit=None,
+                 file_size=None,
                  store_schema=True,
                  **options):
         if use_deprecated_int96_timestamps is None:
@@ -992,8 +993,12 @@ def __init__(self, where, schema, filesystem=None,
             # ARROW-10480: do not auto-detect compression. While
             # a filename like foo.parquet.gz is nonconforming, it
             # shouldn't implicitly apply compression.
+            if file_size is not None:
+                metadata = {'falkonry:write_options:file_size': str(file_size)}
+            else:
+                metadata = None
             sink = self.file_handle = filesystem.open_output_stream(
-                path, compression=None)
+                path, compression=None, metadata=metadata)
         else:
             sink = where
         self._metadata_collector = options.pop('metadata_collector', None)

From 417936605139462b8e7158214e66c78af77a0063 Mon Sep 17 00:00:00 2001
From: lenin-falkonry <125077190+lenin-falkonry@users.noreply.github.com>
Date: Thu, 9 Nov 2023 07:17:14 -0800
Subject: [PATCH 2/2] Update README.md

---
 README.md | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/README.md b/README.md
index 98e1512bac..0294807413 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,10 @@
+### Build Notes
+
+- From inside the `python/examples/minimal_build` directory, build a Docker image:
+  `docker build -t arrow_ubi -f Dockerfile-ubi .`
+- Run the build inside the container:
+  `docker run --rm -t -i -v $PWD:/io -v $PWD/../../..:/arrow arrow_ubi /io/build_venv.sh`
+  This produces a wheel (`.whl`) file under the mounted directory.
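
### Usage Notes

The sketch below is illustrative only and assumes a pyarrow build produced from these patches. The `file_size` keyword, the `falkonry:write_options:file_size` metadata key, and the single-`PutObject` threshold come from the patched `core.py` and `s3fs.cc`; the bucket, key, and size are hypothetical. The value is passed as a string because the patched C++ side parses it with `std::stoi`. On an unpatched pyarrow the keyword does not exist and this call would fail.

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"x": [1, 2, 3]})

# With the patched build, file_size is forwarded to the S3 output stream as a
# 'falkonry:write_options:file_size' metadata entry; files below the threshold
# are written with a single PutObject call instead of a multipart upload.
writer = pq.ParquetWriter(
    "s3://example-bucket/small-file.parquet",  # hypothetical bucket/key
    table.schema,
    file_size=str(table.nbytes),               # string value; parsed with std::stoi in C++
)
writer.write_table(table)
writer.close()
```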
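The patch also reads two environment variables in `s3fs.cc`: `ARROW_S3_LOG_LEVEL`, checked once when the AWS SDK is initialized, and `ARROW_S3_OPTIMIZED_KEY_LOOKUP`, checked in `GetFileInfo`, where a missing key is reported as `NotFound` immediately instead of issuing the extra empty/non-empty "directory" probes. The following is a minimal sketch of setting them from Python, again assuming the patched build; note that the optimized lookup changes `get_file_info` results for paths that only exist as key prefixes.

```python
import os

# Read by the patched s3fs.cc; set them before the first S3 filesystem is created.
os.environ["ARROW_S3_LOG_LEVEL"] = "warn"          # fatal|error|warn|info|debug|trace|off
os.environ["ARROW_S3_OPTIMIZED_KEY_LOOKUP"] = "1"  # presence alone enables the shortcut

from pyarrow import fs

s3 = fs.S3FileSystem(region="us-east-1")  # hypothetical region
# A HeadObject miss is reported as FileType.NotFound right away,
# skipping the extra listing calls that probe for "directories".
info = s3.get_file_info("example-bucket/does-not-exist.parquet")  # hypothetical key
print(info.type)
```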