add path filtering in s3 connector (#9074)

GitOrigin-RevId: f3da52f6f9a49eae17ebf7fa46f6f166933f2896
Sergey Kulik 2025-07-29 18:10:18 +02:00 committed by Manul from Pathway
parent a634ea8449
commit bcf383f05f
6 changed files with 142 additions and 0 deletions


@@ -5,6 +5,9 @@ All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- `path_filter` parameter in `pw.io.s3.read` and `pw.io.minio.read` functions. It enables post-filtering of object paths with a wildcard pattern (`*`, `?`), excluding paths that pass the main `path` filter but do not match `path_filter` (see the usage sketch below).
### Changed
- Delta table compression logging has been improved: logs now include table names, and verbose messages have been streamlined while preserving details of important processing steps.
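Below is a minimal usage sketch of the new parameter. The bucket name and prefix are hypothetical placeholders, and credentials are assumed to come from the environment, as in the integration tests further down:

    import pathway as pw


    class InputSchema(pw.Schema):
        key: int
        value: str


    # Hypothetical bucket and prefix; only objects whose full path matches
    # "*.csv" survive the post-filter, other listed objects are skipped.
    table = pw.io.s3.read(
        "s3://my-bucket/some/prefix",
        format="csv",
        mode="static",
        path_filter="*.csv",
        schema=InputSchema,
    )
    pw.io.jsonlines.write(table, "output.jsonl")
    pw.run()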


@@ -1,5 +1,6 @@
# Copyright © 2024 Pathway
import json
import os
import pathlib
@@ -305,3 +306,120 @@ def test_s3_full_autodetect(tmp_path: pathlib.Path, s3_path: str):
        model_output_path, usecols=["key", "value"], index_col=["key"]
    ).sort_index()
    assert result.equals(expected)


def run_csv_reader_with_path_filter(
    root_path: str, output_path: pathlib.Path, path_filter: str, expected_keys: set[int]
):
    # Run a static CSV read with the given path_filter and check which keys survive.
    class InputSchema(pw.Schema):
        key: int
        value: str

    G.clear()
    table = pw.io.s3.read(
        root_path,
        format="csv",
        mode="static",
        path_filter=path_filter,
        schema=InputSchema,
    )
    pw.io.jsonlines.write(table, output_path)
    pw.run()

    keys = set()
    with open(output_path, "r") as f:
        for row in f.readlines():
            row_parsed = json.loads(row)
            keys.add(row_parsed["key"])
    assert keys == expected_keys


def test_s3_objects_filter(tmp_path: pathlib.Path, s3_path: str):
    input_s3_path_csv = f"{s3_path}/input.csv"
    input_s3_path_json = f"{s3_path}/input.json"
    input_s3_path_dsv = f"{s3_path}/input.dsv"

    input_contents_csv = "key,value\n1,Hello\n2,World"
    input_contents_json = json.dumps({"key": 3, "value": "Bonjour"})
    input_contents_dsv = "key,value\n4,Another\n5,Test"
    output_path = tmp_path / "output.jsonl"

    put_aws_object(input_s3_path_csv, input_contents_csv)
    put_aws_object(input_s3_path_json, input_contents_json)
    put_aws_object(input_s3_path_dsv, input_contents_dsv)

    # Only the ".json" object passes the "*.json" filter.
    table = pw.io.s3.read(
        f"s3://aws-integrationtest/{s3_path}",
        format="plaintext_by_object",
        mode="static",
        path_filter="*.json",
        with_metadata=True,
    )
    pw.io.jsonlines.write(table, output_path)
    pw.run()

    n_rows = 0
    with open(output_path, "r") as f:
        for row in f.readlines():
            n_rows += 1
            row_parsed = json.loads(row)
            assert row_parsed["_metadata"]["path"].endswith(".json")
    assert n_rows == 1

    # "*.?sv" matches both the ".csv" and ".dsv" objects.
    run_csv_reader_with_path_filter(
        f"s3://aws-integrationtest/{s3_path}", output_path, "*.csv", {1, 2}
    )
    run_csv_reader_with_path_filter(
        f"s3://aws-integrationtest/{s3_path}", output_path, "*.?sv", {1, 2, 4, 5}
    )


def test_s3_objects_filter_complex_path(tmp_path: pathlib.Path, s3_path: str):
    # All three objects are CSV files; only their key prefixes differ.
    input_s3_path_csv = f"{s3_path}/one/two/three/input.csv"
    input_s3_path_json = f"{s3_path}/one/three/input.csv"
    input_s3_path_dsv = f"{s3_path}/one/two/five/input.csv"

    input_contents_csv = "key,value\n1,Hello\n2,World"
    input_contents_json = "key,value\n3,Bonjour"
    input_contents_dsv = "key,value\n4,Another\n5,Test"
    output_path = tmp_path / "output.jsonl"

    put_aws_object(input_s3_path_csv, input_contents_csv)
    put_aws_object(input_s3_path_json, input_contents_json)
    put_aws_object(input_s3_path_dsv, input_contents_dsv)

    run_csv_reader_with_path_filter(
        f"s3://aws-integrationtest/{s3_path}",
        output_path,
        "*/one/*/*/*.csv",
        {1, 2, 4, 5},
    )
    run_csv_reader_with_path_filter(
        f"s3://aws-integrationtest/{s3_path}",
        output_path,
        "*/one/*/three/*.csv",
        {1, 2},
    )
    run_csv_reader_with_path_filter(
        f"s3://aws-integrationtest/{s3_path}",
        output_path,
        "*/one/two/*/*.csv",
        {1, 2, 4, 5},
    )
    run_csv_reader_with_path_filter(
        f"s3://aws-integrationtest/{s3_path}",
        output_path,
        "*/one/*.csv",
        {1, 2, 3, 4, 5},
    )
    run_csv_reader_with_path_filter(
        f"s3://aws-integrationtest/{s3_path}",
        output_path,
        "*/five/*.csv",
        {4, 5},
    )
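Note that the complex-path cases above rely on `*` matching across `/` in object keys. Python's `fnmatch` follows the same convention for these patterns, so it gives a quick local sanity check of the expectations (an analogy only; the connector's actual matcher is the Rust glob pattern shown below):

    from fnmatch import fnmatch

    # A single "*" can absorb several path segments, slashes included.
    assert fnmatch("prefix/one/two/three/input.csv", "*/one/*/*/*.csv")
    assert not fnmatch("prefix/one/three/input.csv", "*/one/*/*/*.csv")
    assert fnmatch("prefix/one/two/five/input.csv", "*/five/*.csv")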


@@ -66,6 +66,7 @@ def read(
    with_metadata: bool = False,
    csv_settings: CsvParserSettings | None = None,
    json_field_paths: dict[str, str] | None = None,
    path_filter: str | None = None,
    downloader_threads_count: int | None = None,
    name: str | None = None,
    autocommit_duration_ms: int | None = 1500,
@@ -107,6 +108,10 @@ def read(
            it should be given in the format ``<field_name>: <path to be mapped>``,
            where the path to be mapped needs to be a
            `JSON Pointer (RFC 6901) <https://www.rfc-editor.org/rfc/rfc6901>`_.
        path_filter: A wildcard pattern used to match full object paths. Supports ``*``
            (any number of any characters, including none) and ``?`` (any single
            character). If specified, only paths matching this pattern are included.
            It is applied as an additional filter after the initial ``path`` matching.
        downloader_threads_count: The number of threads created to download the contents
            of the bucket under the given path. It defaults to the number of cores
            available on the machine. It is recommended to increase the number of
@@ -159,6 +164,7 @@ def read(
        autocommit_duration_ms=autocommit_duration_ms,
        name=name,
        json_field_paths=json_field_paths,
        path_filter=path_filter,
        downloader_threads_count=downloader_threads_count,
        debug_data=debug_data,
        _stacklevel=5,
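The docstring's "additional filter" behavior can be observed end to end by combining `path_filter` with `with_metadata=True`, mirroring the integration test above: every row that survives must carry a matching path. A sketch under the same assumptions as before (hypothetical bucket, credentials from the environment):

    import json

    import pathway as pw

    # "path" selects the prefix to list; "path_filter" then prunes the listing.
    table = pw.io.s3.read(
        "s3://my-bucket/some/prefix",
        format="plaintext_by_object",
        mode="static",
        path_filter="*.json",
        with_metadata=True,
    )
    pw.io.jsonlines.write(table, "filtered.jsonl")
    pw.run()

    with open("filtered.jsonl") as f:
        for line in f:
            assert json.loads(line)["_metadata"]["path"].endswith(".json")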


@@ -102,6 +102,7 @@ def read(
    with_metadata: bool = False,
    csv_settings: CsvParserSettings | None = None,
    json_field_paths: dict[str, str] | None = None,
    path_filter: str | None = None,
    downloader_threads_count: int | None = None,
    name: str | None = None,
    autocommit_duration_ms: int | None = 1500,
@@ -146,6 +147,10 @@ def read(
            it should be given in the format ``<field_name>: <path to be mapped>``,
            where the path to be mapped needs to be a
            `JSON Pointer (RFC 6901) <https://www.rfc-editor.org/rfc/rfc6901>`_.
        path_filter: A wildcard pattern used to match full object paths. Supports ``*``
            (any number of any characters, including none) and ``?`` (any single
            character). If specified, only paths matching this pattern are included.
            It is applied as an additional filter after the initial ``path`` matching.
        downloader_threads_count: The number of threads created to download the contents
            of the bucket under the given path. It defaults to the number of cores
            available on the machine. It is recommended to increase the number of
@@ -271,6 +276,7 @@ def read(
        path=path,
        aws_s3_settings=prepared_aws_settings.settings,
        csv_parser_settings=csv_settings.api_settings if csv_settings else None,
        object_pattern=path_filter or "*",
        mode=internal_connector_mode(mode),
        read_method=internal_read_method(format),
        downloader_threads_count=downloader_threads_count,
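One wiring detail worth noting at the end of this hunk: a missing `path_filter` is translated to the match-everything pattern `"*"` before it reaches the engine (`object_pattern=path_filter or "*"`), so omitting the parameter keeps the connector's previous behavior. In spirit, with Python's `fnmatch` standing in for the Rust glob matcher:

    from fnmatch import fnmatch

    def key_passes(key: str, path_filter: str | None) -> bool:
        # None falls back to "*", which matches every key.
        return fnmatch(key, path_filter or "*")

    assert key_passes("prefix/input.dsv", None)
    assert key_passes("prefix/input.dsv", "*.?sv")
    assert not key_passes("prefix/input.json", "*.?sv")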


@@ -5,6 +5,7 @@ use std::str::from_utf8;
use std::time::SystemTime;
use arcstr::ArcStr;
use glob::Pattern as GlobPattern;
use log::{info, warn};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use rayon::{ThreadPool, ThreadPoolBuilder};
@@ -65,6 +66,7 @@ pub struct S3Scanner {
    */
    bucket: S3Bucket,
    objects_prefix: String,
    object_pattern: GlobPattern,
    pending_modifications: HashMap<String, Vec<u8>>,
    downloader_pool: ThreadPool,
}
@@ -121,6 +123,9 @@ impl PosixLikeScanner for S3Scanner {
        let mut pending_modification_download_tasks = Vec::new();
        for list in object_lists {
            for object in &list.contents {
                // Skip objects whose full key does not match the user-provided pattern.
                if !self.object_pattern.matches(&object.key) {
                    continue;
                }
                seen_object_keys.insert(object.key.clone());
                let actual_metadata = FileLikeMetadata::from_s3_object(object);
                let object_key = object.key.as_bytes();
@@ -180,9 +185,11 @@ impl S3Scanner {
    pub fn new(
        bucket: S3Bucket,
        objects_prefix: impl Into<String>,
        object_pattern: impl Into<String>,
        downloader_threads_count: usize,
    ) -> Result<Self, ReadError> {
        let objects_prefix = objects_prefix.into();
        let object_pattern = object_pattern.into();

        let object_lists = execute_with_retries(
            || bucket.list(objects_prefix.clone(), None),
@@ -205,6 +212,7 @@
        Ok(S3Scanner {
            bucket,
            objects_prefix,
            object_pattern: GlobPattern::new(&object_pattern)?,
            downloader_pool: ThreadPoolBuilder::new()
                .num_threads(downloader_threads_count)
                .build()
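The scanner applies the pattern while iterating the bucket listing, before any download work is scheduled for an object. A rough Python equivalent of that loop, with boto3 and `fnmatch` standing in for rust-s3 and the glob crate (an approximation of the semantics, not the actual implementation):

    from fnmatch import fnmatch

    import boto3

    def list_matching_keys(bucket: str, prefix: str, pattern: str) -> list[str]:
        # List everything under the prefix, then keep only keys matching the
        # pattern -- the analogue of self.object_pattern.matches(&object.key).
        s3 = boto3.client("s3")
        matching = []
        pages = s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix)
        for page in pages:
            for obj in page.get("Contents", []):
                if not fnmatch(obj["Key"], pattern):
                    continue  # filtered out before any download is considered
                matching.append(obj["Key"])
        return matching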


@@ -5324,6 +5324,7 @@ impl DataStorage {
        let scanner = S3Scanner::new(
            self.s3_bucket()?,
            deduced_path,
            self.object_pattern.clone(),
            self.downloader_threads_count()?,
        )
        .map_err(|e| PyIOError::new_err(format!("Failed to initialize S3 scanner: {e}")))?;