aws dynamodb output connector (#8991)

GitOrigin-RevId: c2417301cf9329dea96f4c60c9fb66d9353824ec
This commit is contained in:
Sergey Kulik 2025-07-23 13:13:19 +02:00 committed by Manul from Pathway
parent c7f3a1e97f
commit e79d4d8b3d
30 changed files with 1455 additions and 263 deletions

View File

@ -6,8 +6,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
## [Unreleased]
### Added
- `pw.xpacks.llm.mcp_server.PathwayMcp` that allows serving `pw.xpacks.llm.document_store.DocumentStore` and `pw.xpacks.llm.question_answering` endpoints as MCP (Model Context Protocol) tools.
- `pw.io.dynamodb.write` method for writing to DynamoDB.
## [0.25.0] - 2025-07-10

293
Cargo.lock generated
View File

@ -544,9 +544,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
[[package]]
name = "aws-config"
version = "1.5.10"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b49afaa341e8dd8577e1a2200468f98956d6eda50bcf4a53246cc00174ba924"
checksum = "c18d005c70d2b9c0c1ea8876c039db0ec7fb71164d25c73ccea21bf41fd02171"
dependencies = [
"aws-credential-types",
"aws-runtime",
@ -563,7 +563,7 @@ dependencies = [
"bytes",
"fastrand 2.3.0",
"hex",
"http 0.2.12",
"http 1.1.0",
"ring",
"time",
"tokio",
@ -574,9 +574,9 @@ dependencies = [
[[package]]
name = "aws-credential-types"
version = "1.2.1"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60e8f6b615cb5fc60a98132268508ad104310f0cfb25a1c22eee76efdf9154da"
checksum = "687bc16bc431a8533fe0097c7f0182874767f920989d7260950172ae8e3c4465"
dependencies = [
"aws-smithy-async",
"aws-smithy-runtime-api",
@ -601,6 +601,29 @@ dependencies = [
"url",
]
[[package]]
name = "aws-lc-rs"
version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08b5d4e069cbc868041a64bd68dc8cb39a0d79585cd6c5a24caa8c2d622121be"
dependencies = [
"aws-lc-sys",
"zeroize",
]
[[package]]
name = "aws-lc-sys"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbfd150b5dbdb988bcc8fb1fe787eb6b7ee6180ca24da683b61ea5405f3d43ff"
dependencies = [
"bindgen",
"cc",
"cmake",
"dunce",
"fs_extra",
]
[[package]]
name = "aws-region"
version = "0.25.5"
@ -612,9 +635,9 @@ dependencies = [
[[package]]
name = "aws-runtime"
version = "1.4.3"
version = "1.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a10d5c055aa540164d9561a0e2e74ad30f0dcf7393c3a92f6733ddf9c5762468"
checksum = "4f6c68419d8ba16d9a7463671593c54f81ba58cab466e9b759418da606dcc2e2"
dependencies = [
"aws-credential-types",
"aws-sigv4",
@ -628,7 +651,6 @@ dependencies = [
"fastrand 2.3.0",
"http 0.2.12",
"http-body 0.4.6",
"once_cell",
"percent-encoding",
"pin-project-lite",
"tracing",
@ -637,9 +659,9 @@ dependencies = [
[[package]]
name = "aws-sdk-dynamodb"
version = "1.54.0"
version = "1.82.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8efdda6a491bb4640d35b99b0a4b93f75ce7d6e3a1937c3e902d3cb23d0a179c"
checksum = "4fe8ed25686f117ab3a34dec9cf4d0b25f3555d16537858ef530b209967deecf"
dependencies = [
"aws-credential-types",
"aws-runtime",
@ -653,16 +675,15 @@ dependencies = [
"bytes",
"fastrand 2.3.0",
"http 0.2.12",
"once_cell",
"regex-lite",
"tracing",
]
[[package]]
name = "aws-sdk-sso"
version = "1.49.0"
version = "1.74.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09677244a9da92172c8dc60109b4a9658597d4d298b188dd0018b6a66b410ca4"
checksum = "e0a69de9c1b9272da2872af60c7402683e7f45c06267735b4332deacb203239b"
dependencies = [
"aws-credential-types",
"aws-runtime",
@ -674,17 +695,17 @@ dependencies = [
"aws-smithy-types",
"aws-types",
"bytes",
"fastrand 2.3.0",
"http 0.2.12",
"once_cell",
"regex-lite",
"tracing",
]
[[package]]
name = "aws-sdk-ssooidc"
version = "1.50.0"
version = "1.75.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81fea2f3a8bb3bd10932ae7ad59cc59f65f270fc9183a7e91f501dc5efbef7ee"
checksum = "f0b161d836fac72bdd5ac1a4cd1cdc38ab888c7af26cfd95f661be4409505e63"
dependencies = [
"aws-credential-types",
"aws-runtime",
@ -696,17 +717,17 @@ dependencies = [
"aws-smithy-types",
"aws-types",
"bytes",
"fastrand 2.3.0",
"http 0.2.12",
"once_cell",
"regex-lite",
"tracing",
]
[[package]]
name = "aws-sdk-sts"
version = "1.50.0"
version = "1.76.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ada54e5f26ac246dc79727def52f7f8ed38915cb47781e2a72213957dc3a7d5"
checksum = "cb1cd79a3412751a341a28e2cd0d6fa4345241976da427b075a0c0cd5409f886"
dependencies = [
"aws-credential-types",
"aws-runtime",
@ -719,17 +740,17 @@ dependencies = [
"aws-smithy-types",
"aws-smithy-xml",
"aws-types",
"fastrand 2.3.0",
"http 0.2.12",
"once_cell",
"regex-lite",
"tracing",
]
[[package]]
name = "aws-sigv4"
version = "1.2.6"
version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d3820e0c08d0737872ff3c7c1f21ebbb6693d832312d6152bf18ef50a5471c2"
checksum = "ddfb9021f581b71870a17eac25b52335b82211cdc092e02b6876b2bcefa61666"
dependencies = [
"aws-credential-types",
"aws-smithy-http",
@ -741,7 +762,6 @@ dependencies = [
"hmac",
"http 0.2.12",
"http 1.1.0",
"once_cell",
"percent-encoding",
"sha2",
"time",
@ -750,9 +770,9 @@ dependencies = [
[[package]]
name = "aws-smithy-async"
version = "1.2.4"
version = "1.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa59d1327d8b5053c54bf2eaae63bf629ba9e904434d0835a28ed3c0ed0a614e"
checksum = "1e190749ea56f8c42bf15dd76c65e14f8f765233e6df9b0506d9d934ebef867c"
dependencies = [
"futures-util",
"pin-project-lite",
@ -761,9 +781,9 @@ dependencies = [
[[package]]
name = "aws-smithy-http"
version = "0.60.12"
version = "0.62.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7809c27ad8da6a6a68c454e651d4962479e81472aa19ae99e59f9aba1f9713cc"
checksum = "99335bec6cdc50a346fda1437f9fefe33abf8c99060739a546a16457f2862ca9"
dependencies = [
"aws-smithy-runtime-api",
"aws-smithy-types",
@ -771,8 +791,8 @@ dependencies = [
"bytes-utils",
"futures-core",
"http 0.2.12",
"http 1.1.0",
"http-body 0.4.6",
"once_cell",
"percent-encoding",
"pin-project-lite",
"pin-utils",
@ -780,14 +800,52 @@ dependencies = [
]
[[package]]
name = "aws-smithy-json"
version = "0.60.7"
name = "aws-smithy-http-client"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4683df9469ef09468dad3473d129960119a0d3593617542b7d52086c8486f2d6"
checksum = "f108f1ca850f3feef3009bdcc977be201bca9a91058864d9de0684e64514bee0"
dependencies = [
"aws-smithy-async",
"aws-smithy-runtime-api",
"aws-smithy-types",
"h2 0.3.26",
"h2 0.4.6",
"http 0.2.12",
"http 1.1.0",
"http-body 0.4.6",
"hyper 0.14.30",
"hyper 1.6.0",
"hyper-rustls 0.24.2",
"hyper-rustls 0.27.3",
"hyper-util",
"pin-project-lite",
"rustls 0.21.12",
"rustls 0.23.27",
"rustls-native-certs 0.8.1",
"rustls-pki-types",
"tokio",
"tower",
"tracing",
]
[[package]]
name = "aws-smithy-json"
version = "0.61.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a16e040799d29c17412943bdbf488fd75db04112d0c0d4b9290bacf5ae0014b9"
dependencies = [
"aws-smithy-types",
]
[[package]]
name = "aws-smithy-observability"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9364d5989ac4dd918e5cc4c4bdcc61c9be17dcd2586ea7f69e348fc7c6cab393"
dependencies = [
"aws-smithy-runtime-api",
]
[[package]]
name = "aws-smithy-query"
version = "0.60.7"
@ -800,36 +858,33 @@ dependencies = [
[[package]]
name = "aws-smithy-runtime"
version = "1.7.7"
version = "1.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "865f7050bbc7107a6c98a397a9fcd9413690c27fa718446967cf03b2d3ac517e"
checksum = "c3aaec682eb189e43c8a19c3dab2fe54590ad5f2cc2d26ab27608a20f2acf81c"
dependencies = [
"aws-smithy-async",
"aws-smithy-http",
"aws-smithy-http-client",
"aws-smithy-observability",
"aws-smithy-runtime-api",
"aws-smithy-types",
"bytes",
"fastrand 2.3.0",
"h2 0.3.26",
"http 0.2.12",
"http 1.1.0",
"http-body 0.4.6",
"http-body 1.0.1",
"httparse",
"hyper 0.14.30",
"hyper-rustls 0.24.2",
"once_cell",
"pin-project-lite",
"pin-utils",
"rustls 0.21.12",
"tokio",
"tracing",
]
[[package]]
name = "aws-smithy-runtime-api"
version = "1.7.3"
version = "1.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92165296a47a812b267b4f41032ff8069ab7ff783696d217f0994a0d7ab585cd"
checksum = "9852b9226cb60b78ce9369022c0df678af1cac231c882d5da97a0c4e03be6e67"
dependencies = [
"aws-smithy-async",
"aws-smithy-types",
@ -844,9 +899,9 @@ dependencies = [
[[package]]
name = "aws-smithy-types"
version = "1.2.12"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a28f6feb647fb5e0d5b50f0472c19a7db9462b74e2fec01bb0b44eedcc834e97"
checksum = "d498595448e43de7f4296b7b7a18a8a02c61ec9349128c80a368f7c3b4ab11a8"
dependencies = [
"base64-simd",
"bytes",
@ -870,18 +925,18 @@ dependencies = [
[[package]]
name = "aws-smithy-xml"
version = "0.60.9"
version = "0.60.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab0b0166827aa700d3dc519f72f8b3a91c35d0b8d042dc5d643a91e6f80648fc"
checksum = "3db87b96cb1b16c024980f133968d52882ca0daaee3a086c6decc500f6c99728"
dependencies = [
"xmlparser",
]
[[package]]
name = "aws-types"
version = "1.3.4"
version = "1.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0df5a18c4f951c645300d365fec53a61418bcf4650f604f85fe2a665bfaa0c2"
checksum = "8a322fec39e4df22777ed3ad8ea868ac2f94cd15e1a55f6ee8d8d6305057689a"
dependencies = [
"aws-credential-types",
"aws-smithy-async",
@ -1117,6 +1172,29 @@ dependencies = [
"serde",
]
[[package]]
name = "bindgen"
version = "0.69.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088"
dependencies = [
"bitflags 2.9.1",
"cexpr",
"clang-sys",
"itertools 0.12.1",
"lazy_static",
"lazycell",
"log",
"prettyplease",
"proc-macro2",
"quote",
"regex",
"rustc-hash 1.1.0",
"shlex",
"syn 2.0.101",
"which",
]
[[package]]
name = "bit-set"
version = "0.5.3"
@ -1403,6 +1481,15 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f4c707c6a209cbe82d10abd08e1ea8995e9ea937d2550646e02798948992be0"
[[package]]
name = "cexpr"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [
"nom",
]
[[package]]
name = "cfg-if"
version = "1.0.0"
@ -1451,6 +1538,17 @@ dependencies = [
"phf_codegen",
]
[[package]]
name = "clang-sys"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4"
dependencies = [
"glob",
"libc",
"libloading",
]
[[package]]
name = "clap"
version = "4.5.26"
@ -1564,6 +1662,16 @@ dependencies = [
"libc",
]
[[package]]
name = "core-foundation"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.7"
@ -2606,6 +2714,12 @@ version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcbb2bf8e87535c23f7a8a321e364ce21462d0ff10cb6407820e8e96dfff6653"
[[package]]
name = "dunce"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813"
[[package]]
name = "dyn-clone"
version = "1.0.17"
@ -2919,6 +3033,12 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "fs_extra"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "funty"
version = "2.0.0"
@ -3481,7 +3601,7 @@ dependencies = [
"hyper 1.6.0",
"hyper-util",
"rustls 0.23.27",
"rustls-native-certs 0.8.0",
"rustls-native-certs 0.8.1",
"rustls-pki-types",
"tokio",
"tokio-rustls 0.26.2",
@ -3992,6 +4112,12 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "lazycell"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "levenshtein_automata"
version = "0.2.1"
@ -4092,6 +4218,16 @@ dependencies = [
"rle-decode-fast",
]
[[package]]
name = "libloading"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667"
dependencies = [
"cfg-if",
"windows-targets 0.52.6",
]
[[package]]
name = "libm"
version = "0.2.11"
@ -4532,7 +4668,7 @@ dependencies = [
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework 2.11.1",
"security-framework-sys",
"tempfile",
]
@ -5129,6 +5265,9 @@ dependencies = [
"arcstr",
"assert_matches",
"async-nats",
"aws-config",
"aws-sdk-dynamodb",
"aws-smithy-runtime-api",
"azure_core",
"azure_storage",
"azure_storage_blobs",
@ -5435,6 +5574,16 @@ dependencies = [
"termtree",
]
[[package]]
name = "prettyplease"
version = "0.2.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6837b9e10d61f45f987d50808f83d1ee3d206c66acf650c3e4ae2e1f6ddedf55"
dependencies = [
"proc-macro2",
"syn 2.0.101",
]
[[package]]
name = "proc-macro-crate"
version = "3.2.0"
@ -6115,7 +6264,7 @@ dependencies = [
"pin-project-lite",
"quinn",
"rustls 0.23.27",
"rustls-native-certs 0.8.0",
"rustls-native-certs 0.8.1",
"rustls-pki-types",
"serde",
"serde_json",
@ -6412,6 +6561,7 @@ version = "0.23.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "730944ca083c1c233a75c09f199e973ca499344a2b7ba9e755c457e86fb4a321"
dependencies = [
"aws-lc-rs",
"log",
"once_cell",
"ring",
@ -6430,7 +6580,7 @@ dependencies = [
"openssl-probe",
"rustls-pemfile 1.0.4",
"schannel",
"security-framework",
"security-framework 2.11.1",
]
[[package]]
@ -6443,20 +6593,19 @@ dependencies = [
"rustls-pemfile 2.1.3",
"rustls-pki-types",
"schannel",
"security-framework",
"security-framework 2.11.1",
]
[[package]]
name = "rustls-native-certs"
version = "0.8.0"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a"
checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3"
dependencies = [
"openssl-probe",
"rustls-pemfile 2.1.3",
"rustls-pki-types",
"schannel",
"security-framework",
"security-framework 3.2.0",
]
[[package]]
@ -6515,6 +6664,7 @@ version = "0.103.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4a72fe2bcf7a6ac6fd7d0b9e5cb68aeb7d4c0a0271730218b3e92d43b4eb435"
dependencies = [
"aws-lc-rs",
"ring",
"rustls-pki-types",
"untrusted",
@ -6607,7 +6757,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02"
dependencies = [
"bitflags 2.9.1",
"core-foundation",
"core-foundation 0.9.4",
"core-foundation-sys",
"libc",
"security-framework-sys",
]
[[package]]
name = "security-framework"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316"
dependencies = [
"bitflags 2.9.1",
"core-foundation 0.10.1",
"core-foundation-sys",
"libc",
"security-framework-sys",
@ -7112,7 +7275,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b"
dependencies = [
"bitflags 2.9.1",
"core-foundation",
"core-foundation 0.9.4",
"system-configuration-sys",
]
@ -7671,7 +7834,7 @@ dependencies = [
"percent-encoding",
"pin-project",
"prost",
"rustls-native-certs 0.8.0",
"rustls-native-certs 0.8.1",
"socket2",
"tokio",
"tokio-rustls 0.26.2",
@ -8282,6 +8445,18 @@ dependencies = [
"rustls-pki-types",
]
[[package]]
name = "which"
version = "4.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7"
dependencies = [
"either",
"home",
"once_cell",
"rustix 0.38.43",
]
[[package]]
name = "whoami"
version = "1.5.2"

View File

@ -21,6 +21,9 @@ mockall = "0.13.1"
arc-swap = "1.7.1"
arcstr = { version = "1.2.0", default-features = false, features = ["serde", "std"] }
async-nats = "0.41.0"
aws-config = "1.8.1"
aws-sdk-dynamodb = "1.82.0"
aws-smithy-runtime-api = "1.8.3"
azure_core = "0.21.0"
azure_storage = "0.21.0"
azure_storage_blobs = "0.21.0"

View File

@ -66,6 +66,7 @@ Before going into more details about the different connectors and how they work,
<span class="block"><a href="/developers/api-docs/pathway-io/bigquery">BigQuery</a></span>
<span class="block"><a href="/developers/user-guide/connect/connectors/csv_connectors">CSV</a></span>
<span class="block"><a href="/developers/api-docs/pathway-io/deltalake">Delta Lake</a></span>
<span class="block"><a href="/developers/api-docs/pathway-io/dynamodb">DynamoDB</a></span>
<span class="block"><a href="/developers/api-docs/pathway-io/elasticsearch">Elastic Search</a></span>
<span class="block"><a href="/developers/user-guide/connect/connectors/fs-connector">File System</a></span>
<span class="block"><a href="/developers/api-docs/pathway-io/pubsub">Google PubSub</a></span>

View File

@ -10,6 +10,7 @@ from pathlib import Path
import pytest
from pathway.internals import parse_graph
from pathway.tests.utils import SerializationTestHelper
CREDENTIALS_DIR = Path(os.getenv("CREDENTIALS_DIR", default=Path(__file__).parent))
@ -39,3 +40,8 @@ def disable_monitoring(monkeypatch: pytest.MonkeyPatch) -> None:
@pytest.fixture
def credentials_dir() -> Path:
return CREDENTIALS_DIR
@pytest.fixture
def serialization_tester():
return SerializationTestHelper()

View File

@ -1,6 +1,7 @@
import pytest
from utils import (
DebeziumContext,
DynamoDBContext,
MongoDBContext,
PgvectorContext,
PostgresContext,
@ -31,3 +32,8 @@ def mongodb():
@pytest.fixture
def debezium():
return DebeziumContext()
@pytest.fixture
def dynamodb():
return DynamoDBContext()

View File

@ -0,0 +1,263 @@
import json
import os
import pytest
from boto3.dynamodb.types import Binary, Decimal
from utils import EntryCountChecker
import pathway as pw
from pathway.internals.parse_graph import G
from pathway.tests.utils import (
ExceptionAwareThread,
wait_result_with_checker,
write_lines,
)
@pytest.mark.parametrize("with_optionals", [False, True])
@pytest.mark.parametrize("with_sort_key", [False, True])
def test_dynamodb_static_mode_serialization(
dynamodb, serialization_tester, with_optionals, with_sort_key
):
(table, _) = serialization_tester.create_variety_table(with_optionals)
table_name = dynamodb.generate_table_name()
write_table_kwargs = {
"table": table,
"table_name": table_name,
"partition_key": table.pkey,
"init_mode": "create_if_not_exists",
}
if with_sort_key:
write_table_kwargs["sort_key"] = table.skey
pw.io.dynamodb.write(**write_table_kwargs)
pw.run()
table_contents = dynamodb.get_table_contents(table_name)
table_contents.sort(key=lambda item: item["pkey"])
assert len(table_contents) == (2 if with_optionals else 1)
row_contents = table_contents[0]
expected_values = {
"pkey": Decimal("1"),
"skey": Decimal("10"),
"floats": {
"shape": [Decimal("2"), Decimal("2")],
"elements": [
Decimal("1.1"),
Decimal("2.2"),
Decimal("3.3"),
Decimal("4.4"),
],
},
"string": "abcdef",
"double": Decimal("-5.6"),
"binary_data": Binary(b"fedcba"),
"datetime_naive": "2025-01-17T00:00:00.000000000",
"list_data": ["lorem", None, "ipsum"],
"integer": Decimal("123"),
"json_data": '{"a":15,"b":"hello"}',
"ptr": "^Z5QKEQCDK9ZZ6TSYV0PM0G92JC",
"duration": Decimal("432000000000000"),
"floats_flat": {
"shape": [Decimal("3")],
"elements": [Decimal("1.1"), Decimal("2.2"), Decimal("3.3")],
},
"boolean": True,
"tuple_data": [Binary(b"world"), True],
"ints": {
"shape": [Decimal("2"), Decimal("2"), Decimal("2")],
"elements": [
Decimal("1"),
Decimal("2"),
Decimal("3"),
Decimal("4"),
Decimal("5"),
Decimal("6"),
Decimal("7"),
Decimal("8"),
],
},
"ints_flat": {
"shape": [Decimal("3")],
"elements": [Decimal("9"), Decimal("9"), Decimal("9")],
},
"datetime_utc_aware": "2025-01-17T00:00:00.000000000+0000",
}
assert row_contents == expected_values
def test_dynamodb_streaming(dynamodb, tmp_path):
inputs_path = tmp_path / "inputs"
table_name = dynamodb.generate_table_name()
os.mkdir(inputs_path)
inputs = [
{
"key": i,
"value": "a" * i,
}
for i in range(10)
]
class InputSchema(pw.Schema):
key: int = pw.column_definition(primary_key=True)
value: str
def streaming_target(rows: list[dict]):
for index, row in enumerate(rows):
full_input_path = inputs_path / f"{index}"
with open(full_input_path, "w") as f:
json.dump(row, f)
checker = EntryCountChecker(index + 1, dynamodb, table_name=table_name)
wait_result_with_checker(checker, 15, target=None)
table = pw.io.jsonlines.read(
inputs_path, schema=InputSchema, autocommit_duration_ms=100
)
pw.io.dynamodb.write(table, table_name, table.key, init_mode="create_if_not_exists")
t = ExceptionAwareThread(target=streaming_target, args=(inputs,))
t.start()
checker = EntryCountChecker(len(inputs), dynamodb, table_name=table_name)
wait_result_with_checker(checker, 30)
@pytest.mark.parametrize("append_init_mode", ["default", "create_if_not_exists"])
def test_append_init_mode(dynamodb, append_init_mode):
# The write with "default" init mode fails, since the table isn't yet created
table_name = dynamodb.generate_table_name()
table = pw.debug.table_from_markdown(
"""
key | value
0 | zero
"""
)
pw.io.dynamodb.write(table, table_name, table.key)
with pytest.raises(
ValueError,
match=(
f"Failed to create DynamoDB writer: table {table_name} "
"doesn't exist in the destination storage"
),
):
pw.run()
# The write succeeds and creates a table with a single row
G.clear()
table = pw.debug.table_from_markdown(
"""
key | value
1 | one
"""
)
pw.io.dynamodb.write(table, table_name, table.key, init_mode="create_if_not_exists")
pw.run()
table_contents = dynamodb.get_table_contents(table_name)
assert table_contents == [{"key": 1, "value": "one"}]
# The third write appends an entry to the table, regardless of the init_mode chosen
# from {"default", "create_if_not_exists"}
G.clear()
table = pw.debug.table_from_markdown(
"""
key | value
2 | two
"""
)
pw.io.dynamodb.write(table, table_name, table.key, init_mode=append_init_mode)
pw.run()
table_contents = dynamodb.get_table_contents(table_name)
table_contents.sort(key=lambda item: item["key"])
assert table_contents == [{"key": 1, "value": "one"}, {"key": 2, "value": "two"}]
def test_recreate_init_mode(dynamodb):
# A table with a single row is created
table_name = dynamodb.generate_table_name()
table = pw.debug.table_from_markdown(
"""
key | value
1 | one
"""
)
pw.io.dynamodb.write(table, table_name, table.key, init_mode="replace")
pw.run()
table_contents = dynamodb.get_table_contents(table_name)
assert table_contents == [{"key": 1, "value": "one"}]
# The table is overwritten because the init_mode is "replace"
G.clear()
table = pw.debug.table_from_markdown(
"""
key | value
2 | two
"""
)
pw.io.dynamodb.write(table, table_name, table.key, init_mode="replace")
pw.run()
table_contents = dynamodb.get_table_contents(table_name)
table_contents.sort(key=lambda item: item["key"])
assert table_contents == [{"key": 2, "value": "two"}]
def test_key_overwrite(dynamodb):
table_name = dynamodb.generate_table_name()
table = pw.debug.table_from_markdown(
"""
key | value
1 | one
"""
)
pw.io.dynamodb.write(table, table_name, table.key, init_mode="create_if_not_exists")
pw.run()
table_contents = dynamodb.get_table_contents(table_name)
assert table_contents == [{"key": 1, "value": "one"}]
G.clear()
table = pw.debug.table_from_markdown(
"""
key | value
1 | two
"""
)
pw.io.dynamodb.write(table, table_name, table.key)
pw.run()
table_contents = dynamodb.get_table_contents(table_name)
table_contents.sort(key=lambda item: item["key"])
assert table_contents == [
{"key": 1, "value": "two"},
]
def test_key_delete(dynamodb, tmp_path):
table_name = dynamodb.generate_table_name()
inputs_path = tmp_path / "inputs"
os.mkdir(inputs_path)
input_file_path = inputs_path / "input.jsonl"
pstorage_path = tmp_path / "pstorage"
class InputSchema(pw.Schema):
key: int = pw.column_definition(primary_key=True)
value: str
def run_one_iteration(input_contents: list[dict]):
prepared_lines = [json.dumps(x) for x in input_contents]
write_lines(input_file_path, prepared_lines)
G.clear()
table = pw.io.jsonlines.read(inputs_path, mode="static", schema=InputSchema)
pw.io.dynamodb.write(
table, table_name, table.key, init_mode="create_if_not_exists"
)
pw.run(
persistence_config=pw.persistence.Config(
backend=pw.persistence.Backend.filesystem(pstorage_path)
)
)
table_contents = dynamodb.get_table_contents(table_name)
print(f"Table contents before comparison: {table_contents}")
for row in table_contents:
row["key"] = int(row["key"])
table_contents.sort(key=lambda x: x["key"])
assert input_contents == table_contents
run_one_iteration([{"key": 1, "value": "one"}, {"key": 2, "value": "two"}])
run_one_iteration([{"key": 2, "value": "two"}])

View File

@ -2,45 +2,20 @@ import copy
import datetime
import json
import os
import threading
import time
import pytest
from utils import QUEST_DB_HOST, QUEST_DB_LINE_PORT, QuestDBContext
from utils import QUEST_DB_HOST, QUEST_DB_LINE_PORT, EntryCountChecker
import pathway as pw
from pathway.tests.utils import wait_result_with_checker
from pathway.tests.utils import ExceptionAwareThread, wait_result_with_checker
QUESTDB_CONNECTION_STRING = f"http::addr={QUEST_DB_HOST}:{QUEST_DB_LINE_PORT};"
class EntryCountChecker:
def __init__(
self,
n_expected_entries: int,
questdb: QuestDBContext,
table_name: str,
column_names: list[str],
):
self.n_expected_entries = n_expected_entries
self.questdb = questdb
self.table_name = table_name
self.column_names = column_names
def __call__(self):
try:
table_contents = self.questdb.get_table_contents(
self.table_name, self.column_names
)
except Exception:
return False
return len(table_contents) == self.n_expected_entries
@pytest.mark.parametrize(
"designated_timestamp_policy", ["use_now", "use_pathway_time", "use_column"]
)
@pytest.mark.flaky(reruns=5) # No way to check that DB is ready to accept queries
def test_questdb_output_stream(designated_timestamp_policy, tmp_path, questdb):
class InputSchema(pw.Schema):
name: str
@ -49,6 +24,7 @@ def test_questdb_output_stream(designated_timestamp_policy, tmp_path, questdb):
available: bool
updated_at: pw.DateTimeUtc
table_name = questdb.random_table_name()
inputs_path = tmp_path / "inputs"
os.mkdir(inputs_path)
input_items = [
@ -86,18 +62,18 @@ def test_questdb_output_stream(designated_timestamp_policy, tmp_path, questdb):
)
def stream_inputs(test_items: list[dict]) -> None:
file_idx = 0
for test_item in test_items:
time.sleep(1.5)
file_idx += 1
for file_idx, test_item in enumerate(test_items):
input_path = inputs_path / f"{file_idx}.json"
with open(input_path, "w") as f:
f.write(json.dumps(test_item))
checker = EntryCountChecker(
file_idx + 1, questdb, table_name=table_name, column_names=["name"]
)
wait_result_with_checker(checker, 15, target=None)
table = pw.io.jsonlines.read(
inputs_path, schema=InputSchema, autocommit_duration_ms=200
)
table_name = questdb.random_table_name()
extra_params = {}
if designated_timestamp_policy == "use_column":
extra_params["designated_timestamp"] = table.updated_at
@ -110,9 +86,11 @@ def test_questdb_output_stream(designated_timestamp_policy, tmp_path, questdb):
**extra_params,
)
t = threading.Thread(target=stream_inputs, args=(input_items,))
t = ExceptionAwareThread(target=stream_inputs, args=(input_items,))
t.start()
checker = EntryCountChecker(len(input_items), questdb, table_name, ["name"])
checker = EntryCountChecker(
len(input_items), questdb, table_name=table_name, column_names=["name"]
)
wait_result_with_checker(checker, 15)
table_reread = questdb.get_table_contents(
table_name,

View File

@ -1,6 +1,7 @@
import time
import uuid
import boto3
import psycopg2
import requests
from pymongo import MongoClient
@ -280,3 +281,45 @@ class DebeziumContext:
},
}
return self._register_connector(payload, f"{connector_id}.public.{table_name}")
class DynamoDBContext:
def __init__(self):
self.dynamodb = boto3.resource("dynamodb", region_name="us-west-2")
def get_table_contents(self, table_name: str) -> list[dict]:
table = self.dynamodb.Table(table_name)
response = table.scan()
data = response["Items"]
while "LastEvaluatedKey" in response:
response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
data.extend(response["Items"])
return data
def generate_table_name(self) -> str:
return "table" + str(uuid.uuid4())
class EntryCountChecker:
def __init__(
self,
n_expected_entries: int,
db_context: DynamoDBContext | WireProtocolSupporterContext,
**get_table_contents_kwargs,
):
self.n_expected_entries = n_expected_entries
self.db_context = db_context
self.get_table_contents_kwargs = get_table_contents_kwargs
def __call__(self) -> bool:
try:
table_contents = self.db_context.get_table_contents(
**self.get_table_contents_kwargs
)
except Exception:
return False
return len(table_contents) == self.n_expected_entries
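A minimal usage sketch (the table name is illustrative): the checker is polled by wait_result_with_checker until the destination table holds the expected number of rows, with the keyword arguments forwarded to get_table_contents:
from pathway.tests.utils import wait_result_with_checker

# Poll DynamoDB until the table contains 10 entries, failing after 30 seconds.
checker = EntryCountChecker(10, DynamoDBContext(), table_name="my_table")
wait_result_with_checker(checker, 30)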

View File

@ -30,6 +30,7 @@ SCALE_TIER_ENTITLEMENTS = [
"advanced-parser",
"bigquery",
"deltalake",
"dynamodb",
"elasticsearch",
"full-persistence",
"iceberg",

View File

@ -11,7 +11,11 @@ from click.testing import CliRunner
from pathway import cli
from pathway.internals import config, parse_graph
from pathway.tests.utils import AIRBYTE_FAKER_CONNECTION_REL_PATH, UniquePortDispenser
from pathway.tests.utils import (
AIRBYTE_FAKER_CONNECTION_REL_PATH,
SerializationTestHelper,
UniquePortDispenser,
)
@pytest.fixture(autouse=True)
@ -113,3 +117,8 @@ def tmp_path_with_airbyte_config(tmp_path):
yaml.dump(config, f)
return tmp_path
@pytest.fixture
def serialization_tester():
return SerializationTestHelper()

View File

@ -846,10 +846,10 @@ class DeltaOptimizerRule:
class MqttSettings:
def __init__(self, qos: int, retain: bool): ...
class SqlWriterInitMode(Enum):
DEFAULT: SqlWriterInitMode
CREATE_IF_NOT_EXISTS: SqlWriterInitMode
REPLACE: SqlWriterInitMode
class TableWriterInitMode(Enum):
DEFAULT: TableWriterInitMode
CREATE_IF_NOT_EXISTS: TableWriterInitMode
REPLACE: TableWriterInitMode
class DataStorage:
mode: ConnectorMode
@ -880,7 +880,7 @@ class DataStorage:
database: str | None = None,
start_from_timestamp_ms: int | None = None,
namespace: list[str] | None = None,
sql_writer_init_mode: SqlWriterInitMode = SqlWriterInitMode.DEFAULT,
table_writer_init_mode: TableWriterInitMode = TableWriterInitMode.DEFAULT,
topic_name_index: int | None = None,
partition_columns: list[str] | None = None,
backfilling_thresholds: list[BackfillingThreshold] | None = None,
@ -888,6 +888,7 @@ class DataStorage:
delta_optimizer_rule: DeltaOptimizerRule | None = None,
mqtt_settings: MqttSettings | None = None,
only_provide_metadata: bool = False,
sort_key_index: int | None = None,
) -> None: ...
def delta_s3_storage_options(self, *args, **kwargs): ...

View File

@ -6,7 +6,7 @@ from pathway.internals import reducers, udfs, universes
from pathway.internals.api import (
Pointer,
PyObjectWrapper,
SqlWriterInitMode,
TableWriterInitMode,
wrap_py_object,
)
from pathway.internals.common import (
@ -157,5 +157,5 @@ __all__ = [
"local_error_log",
"ColumnDefinition",
"load_yaml",
"SqlWriterInitMode",
"TableWriterInitMode",
]

View File

@ -6,6 +6,7 @@ from pathway.io import (
csv,
debezium,
deltalake,
dynamodb,
elasticsearch,
fs,
gdrive,
@ -68,4 +69,5 @@ __all__ = [
"register_input_synchronization_group",
"mqtt",
"questdb",
"dynamodb",
]

View File

@ -499,3 +499,15 @@ def get_column_index(table: Table, column: ColumnReference | None) -> int | None
if table_column == column.name:
return index
raise RuntimeError(f"The column {column} is not found in the table {table}")
def init_mode_from_str(init_mode: str) -> api.TableWriterInitMode:
match init_mode:
case "default":
return api.TableWriterInitMode.DEFAULT
case "create_if_not_exists":
return api.TableWriterInitMode.CREATE_IF_NOT_EXISTS
case "replace":
return api.TableWriterInitMode.REPLACE
case _:
raise ValueError(f"Invalid init_mode: {init_mode}")

View File

@ -0,0 +1,267 @@
# Copyright © 2025 Pathway
from __future__ import annotations
from typing import Literal
from pathway.internals import api, datasink
from pathway.internals._io_helpers import _format_output_value_fields
from pathway.internals.config import _check_entitlements
from pathway.internals.expression import ColumnReference
from pathway.internals.runtime_type_check import check_arg_types
from pathway.internals.table import Table
from pathway.internals.trace import trace_user_frame
from pathway.io._utils import get_column_index, init_mode_from_str
@check_arg_types
@trace_user_frame
def write(
table: Table,
table_name: str,
partition_key: ColumnReference,
*,
sort_key: ColumnReference | None = None,
init_mode: Literal["default", "create_if_not_exists", "replace"] = "default",
name: str | None = None,
) -> None:
"""
Writes ``table`` into a DynamoDB table. The connection settings are retrieved from
the environment.
This connector supports three modes: ``default`` mode, which performs no preparation
on the target table; ``create_if_not_exists`` mode, which creates the table if it does
not already exist; and ``replace`` mode, which replaces the table and clears any
previously existing data. The table is created with an
`on-demand <https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/capacity-mode.html>`_
billing mode. Be aware that this mode may not be optimal for your use case, and the
provisioned mode with capacity planning might offer better performance or cost
efficiency. In such cases, we recommend creating the table yourself in AWS with the
desired provisioned throughput settings.
Note that if the table already exists and you use either ``default`` or
``create_if_not_exists`` mode, the schema of the existing table, including the partition
key and the optional sort key, must match the schema of the Pathway table you are writing.
The connector performs writes using the primary key, defined as a combination of the
partition key and an optional sort key. Note that, due to how DynamoDB operates,
entries may overwrite existing ones if their keys coincide. When an entry is deleted
from the Pathway table, the corresponding entry is also removed from the DynamoDB
table maintained by the connector. In this sense, the connector behaves similarly to
the snapshot mode in the
`Delta Lake </developers/api-docs/pathway-io/deltalake/#pathway.io.deltalake.write>`_
output connector or the
`Postgres </developers/api-docs/pathway-io/postgres#pathway.io.postgres.write_snapshot>`_
output connector.
Args:
table: The table to write.
table_name: The name of the destination table in DynamoDB.
partition_key: The column to use as the
`partition key <https://aws.amazon.com/blogs/database/choosing-the-right-dynamodb-partition-key/>`_
in the destination table. Note that only scalar types, specifically ``String``,
``Number`` and ``Binary``, can be used as key attributes in DynamoDB. Therefore,
the field you select in the Pathway table must serialize to one of these types.
You can verify this using the conversion table provided in the connector documentation.
sort_key: An optional sort key for the destination table. Note that only scalar types can be
used as key attributes in DynamoDB. Similarly to the partition key, you can only use
fields that serialize to a scalar DynamoDB type.
init_mode: The table initialization mode, one of the three described above.
name: A unique name for the connector. If provided, this name will be used in
logs and monitoring dashboards.
Returns:
None
Example:
AWS provides an official DynamoDB Docker image that allows you to test locally.
The image is available as ``amazon/dynamodb-local`` and can be run as follows:
.. code-block:: bash
docker pull amazon/dynamodb-local:latest
docker run -p 8000:8000 --name dynamodb-local amazon/dynamodb-local:latest
The first command pulls the DynamoDB image from the official repository. The second
command starts a container and exposes port ``8000``, which will be used for the
connection.
Since the database runs locally and the settings are retrieved from the environment,
you will need to configure them accordingly. The easiest way to do this is by setting
a few environment variables to point to the running Docker image:
.. code-block:: bash
export AWS_ENDPOINT_URL=http://localhost:8000
export AWS_REGION=us-west-2
Please note that specifying the AWS region is required; however, the exact region does
not matter for the run to succeed; it simply needs to be set. The endpoint, in turn,
should point to the database running in the Docker container, accessible through the
exposed port.
At this point, the database is ready and you can start writing a program, for example
one that stores data in a table in the locally running database.
First, create a Pathway table:
>>> import pathway as pw
>>> table = pw.debug.table_from_markdown('''
... key | value
... 1 | Hello
... 2 | World
... ''')
Next, save it as follows:
>>> pw.io.dynamodb.write(
... table,
... table_name="test",
... partition_key=table.key,
... init_mode="create_if_not_exists",
... )
Remember to run your program by calling ``pw.run()``. Note that if the table does not
already exist, using ``init_mode="default"`` results in a failure, because Pathway does
not create the table in this mode.
When finished, you can query the local DynamoDB for the table contents using the AWS
command-line tool:
.. code-block:: bash
aws dynamodb scan --table-name test
This will display the contents of the freshly created table:
.. code-block:: rst
{
"Items": [
{
"value": {
"S": "World"
},
"key": {
"N": "2"
}
},
{
"value": {
"S": "Hello"
},
"key": {
"N": "1"
}
}
],
"Count": 2,
"ScannedCount": 2,
"ConsumedCapacity": null
}
Note that since the ``table.key`` field is the partition key, writing an entry with
the same partition key will overwrite the existing data. For example, you can create
a smaller table with a repeated key:
>>> table = pw.debug.table_from_markdown('''
... key | value
... 1 | Bonjour
... ''')
Then write it again in ``"default"`` mode:
>>> pw.io.dynamodb.write(
... table,
... table_name="test",
... partition_key=table.key,
... )
The contents of the target table will then be updated with this new entry, where
``key`` equals ``1``:
.. code-block:: rst
{
"Items": [
{
"value": {
"S": "World"
},
"key": {
"N": "2"
}
},
{
"value": {
"S": "Bonjour"
},
"key": {
"N": "1"
}
}
],
"Count": 2,
"ScannedCount": 2,
"ConsumedCapacity": null
}
Finally, you can run a program in ``"replace"`` table initialization mode, which
will overwrite the existing data:
>>> table = pw.debug.table_from_markdown('''
... key | value
... 3 | Hi
... ''')
>>> pw.io.dynamodb.write(
... table,
... table_name="test",
... partition_key=table.key,
... init_mode="replace",
... )
The next run of ``aws dynamodb scan --table-name test`` will then return a single-row
table:
.. code-block:: rst
{
"Items": [
{
"value": {
"S": "Hi"
},
"key": {
"N": "3"
}
}
],
"Count": 1,
"ScannedCount": 1,
"ConsumedCapacity": null
}
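One more hedged illustration: if your destination table uses a composite primary key, you
can pass a sort key as well. A minimal sketch, assuming illustrative ``user`` and
``order_id`` columns:
>>> orders = pw.debug.table_from_markdown('''
... user | order_id | total
... 1 | 100 | 17
... 1 | 101 | 3
... ''')
>>> pw.io.dynamodb.write(
...     orders,
...     table_name="orders",
...     partition_key=orders.user,
...     sort_key=orders.order_id,
...     init_mode="create_if_not_exists",
... )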
"""
_check_entitlements("dynamodb")
data_storage = api.DataStorage(
storage_type="dynamodb",
table_name=table_name,
table_writer_init_mode=init_mode_from_str(init_mode),
key_field_index=get_column_index(table, partition_key),
sort_key_index=get_column_index(table, sort_key),
)
data_format = api.DataFormat(
format_type="identity",
key_field_names=[],
value_fields=_format_output_value_fields(table),
)
table.to(
datasink.GenericDataSink(
data_storage,
data_format,
datasink_name="dynamodb",
unique_name=name,
)
)

View File

@ -10,25 +10,13 @@ from pathway.internals.expression import ColumnReference
from pathway.internals.runtime_type_check import check_arg_types
from pathway.internals.table import Table
from pathway.internals.trace import trace_user_frame
from pathway.io._utils import get_column_index
from pathway.io._utils import get_column_index, init_mode_from_str
def _connection_string_from_settings(settings: dict):
return " ".join(k + "=" + str(v) for (k, v) in settings.items())
def _init_mode_from_str(init_mode: str) -> api.SqlWriterInitMode:
match init_mode:
case "default":
return api.SqlWriterInitMode.DEFAULT
case "create_if_not_exists":
return api.SqlWriterInitMode.CREATE_IF_NOT_EXISTS
case "replace":
return api.SqlWriterInitMode.REPLACE
case _:
raise ValueError(f"Invalid init_mode: {init_mode}")
@check_arg_types
@trace_user_frame
def write(
@ -129,7 +117,7 @@ def write(
connection_string=_connection_string_from_settings(postgres_settings),
max_batch_size=max_batch_size,
table_name=table_name,
sql_writer_init_mode=_init_mode_from_str(init_mode),
table_writer_init_mode=init_mode_from_str(init_mode),
)
data_format = api.DataFormat(
format_type="sql",
@ -232,7 +220,7 @@ def write_snapshot(
max_batch_size=max_batch_size,
snapshot_maintenance_on_output=True,
table_name=table_name,
sql_writer_init_mode=_init_mode_from_str(init_mode),
table_writer_init_mode=init_mode_from_str(init_mode),
)
if (

View File

@ -22,7 +22,6 @@ import pandas as pd
import pyarrow as pa
import pytest
import yaml
from dateutil import tz
from deltalake import DeltaTable, write_deltalake
from fs import open_fs
@ -3717,63 +3716,11 @@ def test_py_object_wrapper_serialization(tmp_path: pathlib.Path, data_format):
@pytest.mark.parametrize("with_optionals", [False, True])
@only_with_license_key("data_format", ["delta", "delta_stored_schema"])
def test_different_types_serialization(
tmp_path: pathlib.Path, data_format, with_optionals
tmp_path: pathlib.Path, data_format, with_optionals, serialization_tester
):
input_path = tmp_path / "input.jsonl"
auxiliary_path = tmp_path / "auxiliary-storage"
input_path.write_text("test")
n_expected_rows = 1
column_values = {
"boolean": True,
"integer": 123,
"double": -5.6,
"string": "abcdef",
"binary_data": b"fedcba",
"datetime_naive": pw.DateTimeNaive(year=2025, month=1, day=17),
"datetime_utc_aware": pw.DateTimeUtc(year=2025, month=1, day=17, tz=tz.UTC),
"duration": pw.Duration(days=5),
"ints": np.array([[[1, 2], [3, 4]], [[5, 6], [7, 8]]], dtype=int),
"floats": np.array([[1.1, 2.2], [3.3, 4.4]], dtype=float),
"ints_flat": np.array([9, 9, 9], dtype=int),
"floats_flat": np.array([1.1, 2.2, 3.3], dtype=float),
"json_data": pw.Json.parse('{"a": 15, "b": "hello"}'),
"tuple_data": (b"world", True),
"list_data": ("lorem", None, "ipsum"),
"ptr": api.ref_scalar(42),
}
known_columns = {"test": column_values}
composite_types = {
"ints": np.ndarray[None, int],
"floats": np.ndarray[None, float],
"ints_flat": np.ndarray[None, int],
"floats_flat": np.ndarray[None, float],
"tuple_data": tuple[bytes, bool],
"list_data": list[str | None],
}
table = pw.io.plaintext.read(input_path, mode="static")
table = table.select(
data=pw.this.data,
**column_values,
)
if with_optionals:
n_expected_rows += 1
none_column_values = {
"data": "test_with_optionals",
}
for key in column_values.keys():
none_column_values[key] = None
table_with_optionals = pw.io.plaintext.read(input_path, mode="static")
table_with_optionals = table_with_optionals.select(**none_column_values)
pw.universes.promise_are_pairwise_disjoint(table, table_with_optionals)
table = table.concat(table_with_optionals)
optional_composite_types = {}
for field_name, field_type in composite_types.items():
optional_composite_types[field_name] = field_type | None
composite_types = optional_composite_types
known_columns[none_column_values["data"]] = none_column_values
table = table.update_types(**composite_types)
table, known_rows = serialization_tester.create_variety_table(with_optionals)
if data_format == "delta" or data_format == "delta_stored_schema":
pw.io.deltalake.write(table, auxiliary_path)
elif data_format == "json":
@ -3782,59 +3729,16 @@ def test_different_types_serialization(
pw.io.csv.write(table, auxiliary_path)
else:
raise ValueError(f"Unknown data format: {data_format}")
run_all()
G.clear()
if with_optionals:
class InputSchema(pw.Schema):
data: str = pw.column_definition(primary_key=True)
boolean: bool | None
integer: int | None
double: float | None
string: str | None
binary_data: bytes | None
datetime_naive: pw.DateTimeNaive | None
datetime_utc_aware: pw.DateTimeUtc | None
duration: pw.Duration | None
ints: np.ndarray[None, int] | None
floats: np.ndarray[None, float] | None
ints_flat: np.ndarray[None, int] | None
floats_flat: np.ndarray[None, float] | None
json_data: pw.Json | None
tuple_data: tuple[bytes, bool] | None
list_data: list[str | None] | None
ptr: api.Pointer | None
else:
class InputSchema(pw.Schema):
data: str = pw.column_definition(primary_key=True)
boolean: bool
integer: int
double: float
string: str
binary_data: bytes
datetime_naive: pw.DateTimeNaive
datetime_utc_aware: pw.DateTimeUtc
duration: pw.Duration
ints: np.ndarray[None, int]
floats: np.ndarray[None, float]
ints_flat: np.ndarray[None, int]
floats_flat: np.ndarray[None, float]
json_data: pw.Json
tuple_data: tuple[bytes, bool]
list_data: list[str | None]
ptr: api.Pointer
class Checker:
def __init__(self):
self.n_processed_rows = 0
def __call__(self, key, row, time, is_addition):
self.n_processed_rows += 1
column_values = known_columns[row["data"]]
column_values = known_rows[row["pkey"]]
for field, expected_value in column_values.items():
if isinstance(expected_value, np.ndarray):
assert row[field].shape == expected_value.shape
@ -3851,6 +3755,7 @@ def test_different_types_serialization(
assert row[field] in expected_values
InputSchema = table.schema
if data_format == "delta":
table = pw.io.deltalake.read(auxiliary_path, schema=InputSchema, mode="static")
elif data_format == "delta_stored_schema":
@ -3861,11 +3766,10 @@ def test_different_types_serialization(
table = pw.io.csv.read(auxiliary_path, schema=InputSchema, mode="static")
else:
raise ValueError(f"Unknown data format: {data_format}")
checker = Checker()
pw.io.subscribe(table, on_change=checker)
run_all()
assert checker.n_processed_rows == n_expected_rows
assert checker.n_processed_rows == len(known_rows)
@only_with_license_key

View File

@ -18,11 +18,13 @@ from abc import abstractmethod
from collections.abc import Callable, Generator, Hashable, Iterable, Mapping
from contextlib import AbstractContextManager, contextmanager
from dataclasses import dataclass
from functools import cached_property
from typing import Any, TypeVar
import numpy as np
import pandas as pd
import pytest
from dateutil import tz
import pathway as pw
from pathway.debug import _markdown_to_pandas, table_from_markdown, table_from_pandas
@ -909,3 +911,68 @@ def assert_sets_equality_from_path(path: pathlib.Path, expected: set[str]) -> No
except pd.errors.EmptyDataError:
result = pd.Series([])
assert set(result) == expected
class SerializationTestHelper:
def create_variety_table(self, with_optionals: bool) -> tuple[pw.Table, dict]:
composite_types: dict[str, type] = {
"ints": np.ndarray[None, int], # type: ignore
"floats": np.ndarray[None, float], # type: ignore
"ints_flat": np.ndarray[None, int], # type: ignore
"floats_flat": np.ndarray[None, float], # type: ignore
"tuple_data": tuple[bytes, bool],
"list_data": list[str | None],
}
table = pw.debug.table_from_markdown("field\n1")
table = table.select(**self.filled_column_values)
pkey_row_mapping = {
1: self.filled_column_values,
}
if with_optionals:
# Prepare a row with all types None except for pkey
none_column_values: dict[str, int | None] = {}
for key in self.filled_column_values.keys():
none_column_values[key] = None
none_column_values["pkey"] = 2
none_column_values["skey"] = 20
pkey_row_mapping[2] = none_column_values
# Append this row to an existing table
table_with_optionals = pw.debug.table_from_markdown("field\n1")
table_with_optionals = table_with_optionals.select(**none_column_values)
pw.universes.promise_are_pairwise_disjoint(table, table_with_optionals)
table = table.concat(table_with_optionals)
# Update composite types
optional_composite_types = {}
for field_name, field_type in composite_types.items():
optional_composite_types[field_name] = field_type | None
composite_types = optional_composite_types # type: ignore
table = table.update_types(**composite_types)
return (table, pkey_row_mapping)
@cached_property
def filled_column_values(self):
return {
"pkey": 1,
"skey": 10,
"boolean": True,
"integer": 123,
"double": -5.6,
"string": "abcdef",
"binary_data": b"fedcba",
"datetime_naive": pw.DateTimeNaive(year=2025, month=1, day=17),
"datetime_utc_aware": pw.DateTimeUtc(year=2025, month=1, day=17, tz=tz.UTC),
"duration": pw.Duration(days=5),
"ints": np.array([[[1, 2], [3, 4]], [[5, 6], [7, 8]]], dtype=int),
"floats": np.array([[1.1, 2.2], [3.3, 4.4]], dtype=float),
"ints_flat": np.array([9, 9, 9], dtype=int),
"floats_flat": np.array([1.1, 2.2, 3.3], dtype=float),
"json_data": pw.Json.parse('{"a": 15, "b": "hello"}'),
"tuple_data": (b"world", True),
"list_data": ("lorem", None, "ipsum"),
"ptr": api.ref_scalar(42),
}

View File

@ -0,0 +1,375 @@
use log::error;
use std::collections::HashMap;
use std::mem::take;
use aws_sdk_dynamodb::error::SdkError;
use aws_sdk_dynamodb::operation::batch_write_item::BatchWriteItemError;
use aws_sdk_dynamodb::operation::create_table::builders::CreateTableFluentBuilder;
use aws_sdk_dynamodb::operation::create_table::CreateTableError;
use aws_sdk_dynamodb::operation::delete_table::DeleteTableError;
use aws_sdk_dynamodb::operation::describe_table::DescribeTableError;
use aws_sdk_dynamodb::types::{
AttributeDefinition, AttributeValue, BillingMode, DeleteRequest, KeySchemaElement, KeyType,
PutRequest, ScalarAttributeType, WriteRequest,
};
use aws_sdk_dynamodb::Client;
use aws_smithy_runtime_api::http::Response as AwsHttpResponse;
use ndarray::ArrayD;
use tokio::runtime::Runtime as TokioRuntime;
use crate::connectors::data_format::{
FormatterContext, FormatterError, NDARRAY_ELEMENTS_FIELD_NAME, NDARRAY_SHAPE_FIELD_NAME,
};
use crate::connectors::data_storage::TableWriterInitMode;
use crate::connectors::{WriteError, Writer};
use crate::engine::{Type, Value};
use crate::python_api::ValueField;
use crate::retry::RetryConfig;
// No more than 25 items can be sent in a single batch
// There is no public constant for that, so we create our own
// https://docs.rs/aws-sdk-dynamodb/latest/aws_sdk_dynamodb/operation/batch_write_item/builders/struct.BatchWriteItemFluentBuilder.html
pub const MAX_BATCH_WRITE_SIZE: usize = 25;
pub const N_SEND_ATTEMPTS: usize = 5;
#[derive(Debug, thiserror::Error)]
pub enum AwsRequestError {
#[error("Create table error, service error details: {:?}", .0.as_service_error())]
CreateTableError(#[from] SdkError<CreateTableError, AwsHttpResponse>),
#[error("Delete table error, service error details: {:?}", .0.as_service_error())]
DeleteTableError(#[from] SdkError<DeleteTableError, AwsHttpResponse>),
#[error("Describe table error, service error details: {:?}", .0.as_service_error())]
DescribeTableError(#[from] SdkError<DescribeTableError, AwsHttpResponse>),
#[error("Batch write error, service error details: {:?}", .0.as_service_error())]
BatchWriteError(#[from] SdkError<BatchWriteItemError, AwsHttpResponse>),
}
pub struct DynamoDBWriter {
runtime: TokioRuntime,
client: Client,
table_name: String,
value_fields: Vec<ValueField>,
write_requests: Vec<WriteRequest>,
partition_key_index: usize,
sort_key_index: Option<usize>,
}
impl DynamoDBWriter {
pub fn new(
runtime: TokioRuntime,
client: Client,
table_name: String,
value_fields: Vec<ValueField>,
partition_key_index: usize,
sort_key_index: Option<usize>,
init_mode: TableWriterInitMode,
) -> Result<Self, WriteError> {
let writer = Self {
runtime,
client,
table_name,
value_fields,
write_requests: Vec::new(),
partition_key_index,
sort_key_index,
};
match init_mode {
TableWriterInitMode::Default => {}
TableWriterInitMode::Replace => {
if writer.table_exists()? {
writer.delete_table()?;
}
writer.create_table_from_schema()?;
}
TableWriterInitMode::CreateIfNotExists => {
if !writer.table_exists()? {
writer.create_table_from_schema()?;
}
}
}
if !writer.table_exists()? {
return Err(WriteError::TableDoesNotExist(writer.table_name));
}
Ok(writer)
}
fn rust_type_to_dynamodb_index_type(ty: &Type) -> Result<ScalarAttributeType, WriteError> {
match ty {
Type::Bool => Ok(ScalarAttributeType::B),
Type::Int | Type::Float | Type::Duration => Ok(ScalarAttributeType::N),
Type::String | Type::Pointer | Type::DateTimeNaive | Type::DateTimeUtc => {
Ok(ScalarAttributeType::S)
}
_ => Err(WriteError::NotIndexType(ty.clone())),
}
}
fn add_attribute_definition(
mut builder: CreateTableFluentBuilder,
field: &ValueField,
key_type: KeyType,
) -> Result<CreateTableFluentBuilder, WriteError> {
let scalar_type = Self::rust_type_to_dynamodb_index_type(&field.type_)?;
builder = builder.attribute_definitions(
AttributeDefinition::builder()
.attribute_name(field.name.clone())
.attribute_type(scalar_type)
.build()?,
);
builder = builder.key_schema(
KeySchemaElement::builder()
.attribute_name(field.name.clone())
.key_type(key_type)
.build()?,
);
Ok(builder)
}
fn create_table_from_schema(&self) -> Result<(), WriteError> {
self.runtime.block_on(async {
let mut builder = self
.client
.create_table()
.table_name(self.table_name.clone())
.billing_mode(BillingMode::PayPerRequest);
builder = Self::add_attribute_definition(
builder,
&self.value_fields[self.partition_key_index],
KeyType::Hash,
)?;
if let Some(sort_key_index) = self.sort_key_index {
builder = Self::add_attribute_definition(
builder,
&self.value_fields[sort_key_index],
KeyType::Range,
)?;
}
// Convert the possible error first into AwsRequestError,
// and then to WriteError, if needed.
builder.send().await.map_err(AwsRequestError::from)?;
Ok(())
})
}
fn delete_table(&self) -> Result<(), WriteError> {
self.runtime.block_on(async {
self.client
.delete_table()
.table_name(self.table_name.clone())
.send()
.await?;
Ok::<(), AwsRequestError>(())
})?;
Ok(())
}
fn table_exists(&self) -> Result<bool, WriteError> {
self.runtime.block_on(async {
let table_description = self
.client
.describe_table()
.table_name(self.table_name.clone())
.send()
.await;
match table_description {
Ok(_table_info) => Ok(true),
Err(err) => {
if matches!(
err.as_service_error(),
Some(DescribeTableError::ResourceNotFoundException(_))
) {
Ok(false)
} else {
Err(AwsRequestError::from(err).into())
}
}
}
})
}
fn array_to_attribute_value<T>(arr: &ArrayD<T>) -> AttributeValue
where
T: ToString,
{
let mut value = HashMap::with_capacity(2);
let list = arr
.iter()
.map(|i| AttributeValue::N(i.to_string()))
.collect::<Vec<_>>();
value.insert(
NDARRAY_ELEMENTS_FIELD_NAME.to_string(),
AttributeValue::L(list),
);
let shape = arr
.shape()
.iter()
.map(|i| AttributeValue::N(i.to_string()))
.collect::<Vec<_>>();
value.insert(
NDARRAY_SHAPE_FIELD_NAME.to_string(),
AttributeValue::L(shape),
);
AttributeValue::M(value)
}
fn value_to_attribute(value: &Value) -> Result<AttributeValue, WriteError> {
match value {
Value::None => Ok(AttributeValue::Null(true)),
Value::Bool(b) => Ok(AttributeValue::Bool(*b)),
Value::Int(i) => Ok(AttributeValue::N(i.to_string())),
Value::Float(f) => Ok(AttributeValue::N(f.to_string())),
Value::String(s) => Ok(AttributeValue::S(s.to_string())),
Value::Bytes(b) => Ok(AttributeValue::B(b.to_vec().into())),
Value::Pointer(p) => Ok(AttributeValue::S(p.to_string())),
Value::Tuple(t) => {
let list = t
.iter()
.map(Self::value_to_attribute)
.collect::<Result<Vec<_>, _>>()?;
Ok(AttributeValue::L(list))
}
Value::IntArray(arr) => Ok(Self::array_to_attribute_value(arr)),
Value::FloatArray(arr) => Ok(Self::array_to_attribute_value(arr)),
Value::DateTimeNaive(dt) => Ok(AttributeValue::S(dt.to_string())),
Value::DateTimeUtc(dt) => Ok(AttributeValue::S(dt.to_string())),
Value::Duration(d) => Ok(AttributeValue::N(d.nanoseconds().to_string())),
Value::Json(j) => Ok(AttributeValue::S(j.to_string())),
Value::PyObjectWrapper(v) => Ok(AttributeValue::B(
bincode::serialize(v).map_err(|e| *e)?.into(),
)),
Value::Error => Err(FormatterError::ErrorValueNonJsonSerializable.into()),
Value::Pending => Err(FormatterError::PendingValueNonJsonSerializable.into()),
}
}
fn create_upsert_request(&self, data: &FormatterContext) -> Result<WriteRequest, WriteError> {
let mut values_prepared_as_map = HashMap::with_capacity(self.value_fields.len());
for (value_field, entry) in self.value_fields.iter().zip(data.values.iter()) {
values_prepared_as_map
.insert(value_field.name.clone(), Self::value_to_attribute(entry)?);
}
Ok(WriteRequest::builder()
.put_request(
PutRequest::builder()
.set_item(Some(values_prepared_as_map))
.build()?,
)
.build())
}
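// A deletion only needs the table key, so the request contains the partition
// key attribute and, if present, the sort key attribute.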
fn create_delete_request(&self, data: &FormatterContext) -> Result<WriteRequest, WriteError> {
let mut values_prepared_as_map = HashMap::with_capacity(2);
values_prepared_as_map.insert(
self.value_fields[self.partition_key_index].name.clone(),
Self::value_to_attribute(&data.values[self.partition_key_index])?,
);
if let Some(sort_key_index) = self.sort_key_index {
values_prepared_as_map.insert(
self.value_fields[sort_key_index].name.clone(),
Self::value_to_attribute(&data.values[sort_key_index])?,
);
}
Ok(WriteRequest::builder()
.delete_request(
DeleteRequest::builder()
.set_key(Some(values_prepared_as_map))
.build()?,
)
.build())
}
}
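// The Writer implementation buffers one request per output row and ships them
// with BatchWriteItem: diff == 1 becomes an upsert, diff == -1 a deletion.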
impl Writer for DynamoDBWriter {
fn write(&mut self, data: FormatterContext) -> Result<(), WriteError> {
let request = match data.diff {
1 => self.create_upsert_request(&data)?,
-1 => self.create_delete_request(&data)?,
_ => unreachable!("diff can only be 1 or -1"),
};
self.write_requests.push(request);
if self.write_requests.len() == MAX_BATCH_WRITE_SIZE {
self.flush(false)?;
}
Ok(())
}
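// Sends the buffered requests in a single BatchWriteItem call and retries items
// that DynamoDB reports as unprocessed, up to N_SEND_ATTEMPTS times with a pause
// between attempts; whatever remains unprocessed afterwards is reported as
// SomeItemsNotDelivered.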
fn flush(&mut self, _forced: bool) -> Result<(), WriteError> {
if self.write_requests.is_empty() {
return Ok(());
}
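// BatchWriteItem expects a map keyed by table name, so all buffered requests
// go under this writer's single table.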
let mut request_items = HashMap::with_capacity(1);
request_items.insert(self.table_name.clone(), take(&mut self.write_requests));
self.runtime.block_on(async {
let mut retry = RetryConfig::default();
for _ in 0..N_SEND_ATTEMPTS {
let response = self
.client
.batch_write_item()
.set_request_items(Some(request_items.clone()))
.send()
.await;
match response {
Ok(response) => {
// If the response reports unprocessed items, keep them for the next attempt.
// Otherwise the whole batch has been written and there is nothing left to retry.
if let Some(unprocessed_items) = response.unprocessed_items {
request_items = unprocessed_items;
} else {
request_items.clear();
}
if let Some(unprocessed_requests) = request_items.get(&self.table_name) {
// A non-empty list of unprocessed items has to be retried;
// an empty one means the method can return.
if unprocessed_requests.is_empty() {
return Ok(());
}
} else {
// If there is no entry with items awaiting submission,
// everything has been sent.
return Ok(());
}
}
Err(e) => {
error!(
"An attempt to save item batch has failed: {}",
AwsRequestError::from(e)
);
}
}
retry.sleep_after_error();
}
let unprocessed_items = request_items.remove(&self.table_name);
if let Some(unprocessed_items) = unprocessed_items {
Err(WriteError::SomeItemsNotDelivered(unprocessed_items.len()))
} else {
Ok(())
}
})
}
fn name(&self) -> String {
format!("DynamoDB({})", self.table_name)
}
fn single_threaded(&self) -> bool {
false
}
}

View File

@ -0,0 +1,3 @@
pub mod dynamodb;
pub use dynamodb::DynamoDBWriter;

View File

@ -43,6 +43,10 @@ use serde_json::{Map as JsonMap, Value as JsonValue};
use super::data_storage::{ConversionError, SpecialEvent};
pub const COMMIT_LITERAL: &str = "*COMMIT*";
pub const NDARRAY_ELEMENTS_FIELD_NAME: &str = "elements";
pub const NDARRAY_SINGLE_ELEMENT_FIELD_NAME: &str = "element";
pub const NDARRAY_SHAPE_FIELD_NAME: &str = "shape";
const DEBEZIUM_EMPTY_KEY_PAYLOAD: &str = "{\"payload\": {\"before\": {}, \"after\": {}}}";
fn is_commit_literal(value: &DynResult<Value>) -> bool {
@ -1142,10 +1146,10 @@ fn parse_tuple_from_json(values: &[JsonValue], dtypes: &[Type]) -> Option<Value>
}
fn parse_ndarray_from_json(value: &JsonMap<String, JsonValue>, dtype: &Type) -> Option<Value> {
let JsonValue::Array(ref elements) = value["elements"] else {
let JsonValue::Array(ref elements) = value[NDARRAY_ELEMENTS_FIELD_NAME] else {
return None;
};
let JsonValue::Array(ref json_field_shape) = value["shape"] else {
let JsonValue::Array(ref json_field_shape) = value[NDARRAY_SHAPE_FIELD_NAME] else {
return None;
};
let mut shape = Vec::new();
@ -1309,8 +1313,8 @@ pub fn serialize_value_to_json(value: &Value) -> Result<JsonValue, FormatterErro
flat_elements.push(json!(item));
}
let serialized_values = json!({
"shape": a.shape(),
"elements": flat_elements,
NDARRAY_SHAPE_FIELD_NAME: a.shape(),
NDARRAY_ELEMENTS_FIELD_NAME: flat_elements,
});
Ok(serialized_values)
}
@ -1320,8 +1324,8 @@ pub fn serialize_value_to_json(value: &Value) -> Result<JsonValue, FormatterErro
flat_elements.push(json!(item));
}
let serialized_values = json!({
"shape": a.shape(),
"elements": flat_elements,
NDARRAY_SHAPE_FIELD_NAME: a.shape(),
NDARRAY_ELEMENTS_FIELD_NAME: flat_elements,
});
Ok(serialized_values)
}

View File

@ -17,6 +17,9 @@ use deltalake::arrow::datatypes::{
use ndarray::ArrayD;
use super::{LakeWriterSettings, MaintenanceMode};
use crate::connectors::data_format::{
NDARRAY_ELEMENTS_FIELD_NAME, NDARRAY_SHAPE_FIELD_NAME, NDARRAY_SINGLE_ELEMENT_FIELD_NAME,
};
use crate::connectors::data_lake::LakeBatchWriter;
use crate::connectors::WriteError;
use crate::engine::time::DateTime as EngineDateTime;
@ -259,8 +262,11 @@ fn arrow_data_type(
Type::List(wrapped_type) => {
let wrapped_type_is_optional = wrapped_type.is_optional();
let wrapped_arrow_type = arrow_data_type(wrapped_type, settings)?;
let list_field =
ArrowField::new("element", wrapped_arrow_type, wrapped_type_is_optional);
let list_field = ArrowField::new(
NDARRAY_SINGLE_ELEMENT_FIELD_NAME,
wrapped_arrow_type,
wrapped_type_is_optional,
);
ArrowDataType::List(list_field.into())
}
Type::Array(_, wrapped_type) => {
@ -272,16 +278,26 @@ fn arrow_data_type(
};
let struct_fields_vector = vec![
ArrowField::new(
"shape",
NDARRAY_SHAPE_FIELD_NAME,
ArrowDataType::List(
ArrowField::new("element", ArrowDataType::Int64, true).into(),
ArrowField::new(
NDARRAY_SINGLE_ELEMENT_FIELD_NAME,
ArrowDataType::Int64,
true,
)
.into(),
),
false,
),
ArrowField::new(
"elements",
NDARRAY_ELEMENTS_FIELD_NAME,
ArrowDataType::List(
ArrowField::new("element", elements_arrow_type, true).into(),
ArrowField::new(
NDARRAY_SINGLE_ELEMENT_FIELD_NAME,
elements_arrow_type,
true,
)
.into(),
),
false,
),

View File

@ -43,7 +43,9 @@ use super::{
MaintenanceMode, MetadataPerColumn, PATHWAY_COLUMN_META_FIELD, SPECIAL_OUTPUT_FIELDS,
};
use crate::async_runtime::create_async_tokio_runtime;
use crate::connectors::data_format::parse_bool_advanced;
use crate::connectors::data_format::{
parse_bool_advanced, NDARRAY_ELEMENTS_FIELD_NAME, NDARRAY_SHAPE_FIELD_NAME,
};
use crate::connectors::data_lake::buffering::PayloadType;
use crate::connectors::data_lake::ArrowDataType;
use crate::connectors::data_storage::{ConnectorMode, ConversionError, ValuesMap};
@ -393,8 +395,12 @@ impl DeltaBatchWriter {
DeltaTableArrayType::new(elements_kernel_type, true).into(),
);
let struct_descriptor = DeltaTableStructType::new(vec![
DeltaTableStructField::new("shape", shape_data_type, false),
DeltaTableStructField::new("elements", elements_data_type, false),
DeltaTableStructField::new(NDARRAY_SHAPE_FIELD_NAME, shape_data_type, false),
DeltaTableStructField::new(
NDARRAY_ELEMENTS_FIELD_NAME,
elements_data_type,
false,
),
]);
DeltaTableKernelType::Struct(struct_descriptor.into())
}

View File

@ -30,6 +30,7 @@ use super::{
columns_into_pathway_values, LakeBatchWriter, LakeWriterSettings, SPECIAL_OUTPUT_FIELDS,
};
use crate::async_runtime::create_async_tokio_runtime;
use crate::connectors::data_format::NDARRAY_SINGLE_ELEMENT_FIELD_NAME;
use crate::connectors::data_lake::buffering::PayloadType;
use crate::connectors::data_storage::ConnectorMode;
use crate::connectors::metadata::IcebergMetadata;
@ -190,7 +191,7 @@ impl IcebergTableParams {
let nested_element_type = Self::iceberg_type(element_type.unoptionalize())?;
let nested_type = IcebergNestedField::new(
0,
"element",
NDARRAY_SINGLE_ELEMENT_FIELD_NAME,
nested_element_type,
!element_type_is_optional,
);

View File

@ -23,6 +23,7 @@ use half::f16;
use ndarray::ArrayD;
use once_cell::sync::Lazy;
use crate::connectors::data_format::{NDARRAY_ELEMENTS_FIELD_NAME, NDARRAY_SHAPE_FIELD_NAME};
use crate::connectors::data_lake::buffering::PayloadType;
use crate::connectors::data_storage::ConversionError;
use crate::connectors::data_storage::ValuesMap;
@ -206,19 +207,19 @@ pub fn parse_pathway_tuple_from_row(row: &ParquetRow, nested_types: &[Type]) ->
}
pub fn parse_pathway_array_from_parquet_row(row: &ParquetRow, array_type: &Type) -> Option<Value> {
let shape_i64 = parse_int_array_from_parquet_row(row, "shape")?;
let shape_i64 = parse_int_array_from_parquet_row(row, NDARRAY_SHAPE_FIELD_NAME)?;
let mut shape: Vec<usize> = Vec::new();
for element in shape_i64 {
shape.push(element.try_into().ok()?);
}
match array_type {
Type::Int => {
let values = parse_int_array_from_parquet_row(row, "elements")?;
let values = parse_int_array_from_parquet_row(row, NDARRAY_ELEMENTS_FIELD_NAME)?;
let array_impl = ArrayD::<i64>::from_shape_vec(shape, values).ok()?;
Some(Value::from(array_impl))
}
Type::Float => {
let values = parse_float_array_from_parquet_row(row, "elements")?;
let values = parse_float_array_from_parquet_row(row, NDARRAY_ELEMENTS_FIELD_NAME)?;
let array_impl = ArrayD::<f64>::from_shape_vec(shape, values).ok()?;
Some(Value::from(array_impl))
}

View File

@ -24,6 +24,7 @@ use std::thread::sleep;
use std::time::Duration;
use arcstr::ArcStr;
use aws_sdk_dynamodb::error::BuildError as DynamoDBBuildError;
use deltalake::arrow::datatypes::DataType as ArrowDataType;
use deltalake::arrow::error::ArrowError;
use deltalake::datafusion::common::DataFusionError;
@ -48,6 +49,7 @@ use rumqttc::{
use tokio::runtime::Runtime as TokioRuntime;
use crate::async_runtime::create_async_tokio_runtime;
use crate::connectors::aws::dynamodb::AwsRequestError;
use crate::connectors::data_format::{
create_bincoded_value, serialize_value_to_json, FormatterContext, FormatterError,
COMMIT_LITERAL,
@ -630,6 +632,9 @@ pub enum WriteError {
#[error("delta table schema mismatch: {0}")]
DeltaTableSchemaMismatch(DeltaSchemaMismatchDetails),
#[error("table {0} doesn't exist in the destination storage")]
TableDoesNotExist(String),
#[error("table written in snapshot mode has a duplicate primary key: {0:?}")]
TableAlreadyContainsKey(Key),
@ -638,6 +643,18 @@ pub enum WriteError {
#[error("the snapshot of the existing data in the output delta table does not correspond to the schema: {0}")]
IncorrectInitialSnapshot(IncorrectSnapshotError),
#[error(transparent)]
AwsDynamoDBBuild(#[from] DynamoDBBuildError),
#[error(transparent)]
AwsRequest(#[from] AwsRequestError),
#[error("after several retried attempts, {0} items haven't been saved")]
SomeItemsNotDelivered(usize),
#[error("the type {0} can't be used in the index")]
NotIndexType(Type),
}
pub trait Writer: Send {
@ -1250,7 +1267,7 @@ impl PsqlWriter {
table_name: &str,
schema: &HashMap<String, Type>,
key_field_names: Option<&Vec<String>>,
mode: SqlWriterInitMode,
mode: TableWriterInitMode,
) -> Result<PsqlWriter, WriteError> {
let mut writer = PsqlWriter {
client,
@ -1265,17 +1282,17 @@ impl PsqlWriter {
pub fn initialize(
&mut self,
mode: SqlWriterInitMode,
mode: TableWriterInitMode,
table_name: &str,
schema: &HashMap<String, Type>,
key_field_names: Option<&Vec<String>>,
) -> Result<(), WriteError> {
match mode {
SqlWriterInitMode::Default => return Ok(()),
SqlWriterInitMode::Replace | SqlWriterInitMode::CreateIfNotExists => {
TableWriterInitMode::Default => return Ok(()),
TableWriterInitMode::Replace | TableWriterInitMode::CreateIfNotExists => {
let mut transaction = self.client.transaction()?;
if mode == SqlWriterInitMode::Replace {
if mode == TableWriterInitMode::Replace {
Self::drop_table_if_exists(&mut transaction, table_name)?;
}
Self::create_table_if_not_exists(
@ -1371,7 +1388,7 @@ impl PsqlWriter {
}
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum SqlWriterInitMode {
pub enum TableWriterInitMode {
Default,
CreateIfNotExists,
Replace,

View File

@ -17,6 +17,7 @@ use std::time::{Duration, SystemTime};
use timely::dataflow::operators::probe::Handle;
pub mod adaptors;
pub mod aws;
pub mod data_format;
pub mod data_lake;
pub mod data_storage;
@ -549,7 +550,7 @@ impl Connector {
persistent_storage.as_ref(),
snapshot_access,
)
.map_err(EngineError::SnapshotWriterError)?;
.map_err(|e| EngineError::SnapshotWriterError(Box::new(e)))?;
let input_thread_handle = thread::Builder::new()
.name(thread_name)

View File

@ -120,7 +120,7 @@ pub enum Error {
},
#[error("snapshot writer failed: {0}")]
SnapshotWriterError(#[source] WriteError),
SnapshotWriterError(#[source] Box<WriteError>),
#[error("reader failed: {0:?}")]
ReaderFailed(#[source] ReadError),

View File

@ -18,6 +18,7 @@ use crate::persistence::frontier::OffsetAntichain;
use async_nats::connect as nats_connect;
use async_nats::Client as NatsClient;
use async_nats::Subscriber as NatsSubscriber;
use aws_sdk_dynamodb::Client as DynamoDBClient;
use azure_storage::StorageCredentials as AzureStorageCredentials;
use csv::ReaderBuilder as CsvReaderBuilder;
use elasticsearch::{
@ -83,6 +84,7 @@ use self::external_index_wrappers::{
};
use self::threads::PythonThreadState;
use crate::connectors::aws::DynamoDBWriter;
use crate::connectors::data_format::{
BsonFormatter, DebeziumDBType, DebeziumMessageParser, DsvSettings, Formatter,
IdentityFormatter, IdentityParser, InnerSchemaField, JsonLinesFormatter, JsonLinesParser,
@ -103,7 +105,7 @@ use crate::connectors::data_storage::{
KafkaWriter, LakeWriter, MessageQueueTopic, MongoWriter, MqttReader, MqttWriter, NatsReader,
NatsWriter, NullWriter, ObjectDownloader, PsqlWriter, PythonConnectorEventType,
PythonReaderBuilder, QuestDBAtColumnPolicy, QuestDBWriter, RdkafkaWatermark, ReadError,
ReadMethod, ReaderBuilder, SqlWriterInitMode, SqliteReader, WriteError, Writer,
ReadMethod, ReaderBuilder, SqliteReader, TableWriterInitMode, WriteError, Writer,
MQTT_CLIENT_MAX_CHANNEL_SIZE,
};
use crate::connectors::data_tokenize::{BufReaderTokenizer, CsvTokenizer, Tokenize};
@ -712,18 +714,18 @@ impl<'py> IntoPyObject<'py> for MonitoringLevel {
}
}
impl<'py> FromPyObject<'py> for SqlWriterInitMode {
impl<'py> FromPyObject<'py> for TableWriterInitMode {
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
Ok(ob.extract::<PyRef<PySqlWriterInitMode>>()?.0)
Ok(ob.extract::<PyRef<PyTableWriterInitMode>>()?.0)
}
}
impl<'py> IntoPyObject<'py> for SqlWriterInitMode {
impl<'py> IntoPyObject<'py> for TableWriterInitMode {
type Target = PyAny;
type Output = Bound<'py, Self::Target>;
type Error = PyErr;
fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
PySqlWriterInitMode(self).into_bound_py_any(py)
PyTableWriterInitMode(self).into_bound_py_any(py)
}
}
@ -2043,17 +2045,17 @@ impl PyMonitoringLevel {
pub const ALL: MonitoringLevel = MonitoringLevel::All;
}
#[pyclass(module = "pathway.engine", frozen, name = "SqlWriterInitMode")]
pub struct PySqlWriterInitMode(SqlWriterInitMode);
#[pyclass(module = "pathway.engine", frozen, name = "TableWriterInitMode")]
pub struct PyTableWriterInitMode(TableWriterInitMode);
#[pymethods]
impl PySqlWriterInitMode {
impl PyTableWriterInitMode {
#[classattr]
pub const DEFAULT: SqlWriterInitMode = SqlWriterInitMode::Default;
pub const DEFAULT: TableWriterInitMode = TableWriterInitMode::Default;
#[classattr]
pub const CREATE_IF_NOT_EXISTS: SqlWriterInitMode = SqlWriterInitMode::CreateIfNotExists;
pub const CREATE_IF_NOT_EXISTS: TableWriterInitMode = TableWriterInitMode::CreateIfNotExists;
#[classattr]
pub const REPLACE: SqlWriterInitMode = SqlWriterInitMode::Replace;
pub const REPLACE: TableWriterInitMode = TableWriterInitMode::Replace;
}
#[pyclass(module = "pathway.engine", frozen)]
@ -4312,7 +4314,7 @@ pub struct DataStorage {
database: Option<String>,
start_from_timestamp_ms: Option<i64>,
namespace: Option<Vec<String>>,
sql_writer_init_mode: SqlWriterInitMode,
table_writer_init_mode: TableWriterInitMode,
topic_name_index: Option<usize>,
partition_columns: Option<Vec<String>>,
backfilling_thresholds: Option<Vec<BackfillingThreshold>>,
@ -4320,6 +4322,7 @@ pub struct DataStorage {
delta_optimizer_rule: Option<PyDeltaOptimizerRule>,
mqtt_settings: Option<MqttSettings>,
only_provide_metadata: bool,
sort_key_index: Option<usize>,
}
#[pyclass(module = "pathway.engine", frozen, name = "PersistenceMode")]
@ -4780,7 +4783,7 @@ impl DataStorage {
database = None,
start_from_timestamp_ms = None,
namespace = None,
sql_writer_init_mode = SqlWriterInitMode::Default,
table_writer_init_mode = TableWriterInitMode::Default,
topic_name_index = None,
partition_columns = None,
backfilling_thresholds = None,
@ -4788,6 +4791,7 @@ impl DataStorage {
delta_optimizer_rule = None,
mqtt_settings = None,
only_provide_metadata = false,
sort_key_index = None,
))]
#[allow(clippy::too_many_arguments)]
fn new(
@ -4816,7 +4820,7 @@ impl DataStorage {
database: Option<String>,
start_from_timestamp_ms: Option<i64>,
namespace: Option<Vec<String>>,
sql_writer_init_mode: SqlWriterInitMode,
table_writer_init_mode: TableWriterInitMode,
topic_name_index: Option<usize>,
partition_columns: Option<Vec<String>>,
backfilling_thresholds: Option<Vec<BackfillingThreshold>>,
@ -4824,6 +4828,7 @@ impl DataStorage {
delta_optimizer_rule: Option<PyDeltaOptimizerRule>,
mqtt_settings: Option<MqttSettings>,
only_provide_metadata: bool,
sort_key_index: Option<usize>,
) -> Self {
DataStorage {
storage_type,
@ -4851,7 +4856,7 @@ impl DataStorage {
database,
start_from_timestamp_ms,
namespace,
sql_writer_init_mode,
table_writer_init_mode,
topic_name_index,
partition_columns,
backfilling_thresholds,
@ -4859,6 +4864,7 @@ impl DataStorage {
delta_optimizer_rule,
mqtt_settings,
only_provide_metadata,
sort_key_index,
}
}
@ -5759,7 +5765,7 @@ impl DataStorage {
self.table_name()?,
&data_format.value_fields_type_map(py),
data_format.key_field_names.as_ref(),
self.sql_writer_init_mode,
self.table_writer_init_mode,
)
.map_err(|e| {
PyIOError::new_err(format!("Unable to initialize PostgreSQL table: {e}"))
@ -6036,6 +6042,41 @@ impl DataStorage {
Ok(Box::new(writer))
}
fn construct_dynamodb_writer(
&self,
py: pyo3::Python,
data_format: &DataFormat,
license: Option<&License>,
) -> PyResult<Box<dyn Writer>> {
if let Some(license) = license {
license.check_entitlements(["dynamodb"])?;
}
let runtime = create_async_tokio_runtime()
.map_err(|e| PyRuntimeError::new_err(format!("Failed to create async runtime: {e}")))?;
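// Credentials and region are resolved from the standard AWS environment
// (environment variables, shared config, instance metadata) via the SDK defaults.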
let config = runtime.block_on(async { ::aws_config::load_from_env().await });
let table_name = self.table_name()?;
let client = DynamoDBClient::new(&config);
let writer = DynamoDBWriter::new(
runtime,
client,
table_name.to_string(),
data_format
.value_fields
.iter()
.map(|f| f.borrow(py).clone())
.collect(),
self.key_field_index
.ok_or_else(|| PyValueError::new_err("'key_field_index' must be specified"))?,
self.sort_key_index,
self.table_writer_init_mode,
)
.map_err(|e| PyValueError::new_err(format!("Failed to create DynamoDB writer: {e}")))?;
Ok(Box::new(writer))
}
fn construct_writer(
&self,
py: pyo3::Python,
@ -6054,6 +6095,7 @@ impl DataStorage {
"iceberg" => self.construct_iceberg_writer(py, data_format, license),
"mqtt" => self.construct_mqtt_writer(),
"questdb" => self.construct_questdb_writer(py, data_format, license),
"dynamodb" => self.construct_dynamodb_writer(py, data_format, license),
other => Err(PyValueError::new_err(format!(
"Unknown data sink {other:?}"
))),
@ -6647,7 +6689,7 @@ fn engine(_py: Python<'_>, m: &Bound<PyModule>) -> PyResult<()> {
m.add_class::<PyKeyGenerationPolicy>()?;
m.add_class::<PyReadMethod>()?;
m.add_class::<PyMonitoringLevel>()?;
m.add_class::<PySqlWriterInitMode>()?;
m.add_class::<PyTableWriterInitMode>()?;
m.add_class::<Universe>()?;
m.add_class::<Column>()?;
m.add_class::<LegacyTable>()?;