Release 0.7.0

Co-authored-by: Michał Bartoszkiewicz <embe@pathway.com>
Co-authored-by: Jan Chorowski <janek@pathway.com>
Co-authored-by: Xavier Gendre <xavier@pathway.com>
Co-authored-by: Adrian Kosowski <adrian@pathway.com>
Co-authored-by: Jakub Kowalski <kuba@pathway.com>
Co-authored-by: Sergey Kulik <sergey@pathway.com>
Co-authored-by: Mateusz Lewandowski <mateusz@pathway.com>
Co-authored-by: Mohamed Malhou <mohamed@pathway.com>
Co-authored-by: Krzysztof Nowicki <krzysiek@pathway.com>
Co-authored-by: Richard Pelgrim <richard.pelgrim@pathway.com>
Co-authored-by: Kamil Piechowiak <kamil@pathway.com>
Co-authored-by: Paweł Podhajski <pawel.podhajski@pathway.com>
Co-authored-by: Olivier Ruas <olivier@pathway.com>
Co-authored-by: Przemysław Uznański <przemek@pathway.com>
Co-authored-by: Sebastian Włudzik <sebastian.wludzik@pathway.com>
GitOrigin-RevId: 71c8b3e511c0ea3b530530ca733d3b1cb717a198
Manul from Pathway 2023-11-16 11:05:33 +01:00
parent 20816a63ba
commit 4fca74bb8c
56 changed files with 1277 additions and 735 deletions

View File

@ -2,10 +2,19 @@
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- class `Behavior`, a superclass of all behavior classes
- class `ExactlyOnceBehavior`, indicating that we want to create a `CommonBehavior` that results in each window producing exactly one output (shifted in time by an optional `shift` parameter)
- function `exactly_once_behavior` creating an instance of `ExactlyOnceBehavior`
### Changed
- **BREAKING**: `WindowBehavior` is now called `CommonBehavior`, as it can also be used with interval joins
- **BREAKING**: `window_behavior` is now called `common_behavior`, as it can also be used with interval joins
- Deprecated the `keep_queries` parameter in `pw.io.http.rest_connector`; use `delete_completed_queries`, which has the opposite meaning, instead. The default is still `delete_completed_queries=True` (equivalent to `keep_queries=False`), but it will soon have to be set explicitly (see the sketch below).
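
A minimal migration sketch for these entries. Only `delete_completed_queries` and `exactly_once_behavior` come from this release; the `InputSchema` class, host/port values, the `events` table, and the tumbling-window call are illustrative assumptions.

```python
import pathway as pw


class InputSchema(pw.Schema):  # hypothetical schema, for illustration only
    query: str
    user: str


# keep_queries=False (old default) -> delete_completed_queries=True (new default; set it explicitly)
# keep_queries=True                -> delete_completed_queries=False
queries, response_writer = pw.io.http.rest_connector(
    host="127.0.0.1",
    port=8080,
    schema=InputSchema,
    delete_completed_queries=True,
)

# New behavior API: exactly_once_behavior() creates an ExactlyOnceBehavior, making each
# window produce exactly one output (optionally shifted by `shift`). The events table,
# its columns, and the tumbling window are assumptions used only to show where the
# behavior object is passed; exactly_once_behavior is assumed to live in pw.temporal,
# next to CommonBehavior.
events = pw.debug.table_from_markdown(
    """
    t | value
    1 | 10
    3 | 20
    7 | 30
    """
)
windowed = events.windowby(
    events.t,
    window=pw.temporal.tumbling(duration=4),
    behavior=pw.temporal.exactly_once_behavior(),
).reduce(total=pw.reducers.sum(pw.this.value))
```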
## [0.6.0] - 2023-11-10
### Added

View File

@ -26,6 +26,8 @@ After a pull request is opened it will be reviewed, and merged after
passing continuous integration tests and being accepted by a project or
sub-system maintainer.
We maintain a [Changelog](https://github.com/pathwaycom/pathway/blob/main/CHANGELOG.md) where all notable changes to this project are documented. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
We ask that developers sign our [contributor license
agreement](https://cla-assistant.io/pathwaycom/pathway). The
process of signing the CLA is automated, and you'll be prompted with instructions

Cargo.lock generated
View File

@ -1434,7 +1434,7 @@ dependencies = [
[[package]]
name = "pathway"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"arc-swap",
"arcstr",

View File

@ -1,6 +1,6 @@
[package]
name = "pathway"
version = "0.6.0"
version = "0.7.0"
edition = "2021"
publish = false
rust-version = "1.72.0"

View File

@ -16,11 +16,22 @@
alt="follow on Twitter"></a>
<a href="https://linkedin.com/company/pathway">
<img src="https://img.shields.io/badge/pathway-0077B5?style=social&logo=linkedin" alt="follow on LinkedIn"></a>
<br>
<a href="#getting-started">Getting Started</a> |
<a href="#example">Example</a> |
<a href="#performance">Performance</a> |
<a href="#deployment">Deployment</a> |
<a href="#resources">Resources</a> |
<a href="https://pathway.com/developers/">Documentation</a> |
<a href="https://pathway.com/blog/">Blog</a> |
<a href="#get-help">Get Help</a>
</p>
# Pathway
# Pathway<a id="pathway"></a>
[Pathway](https://pathway.com) is an open framework for high-throughput and low-latency real-time data processing. It is used to create Python code which seamlessly combines batch processing, streaming, and real-time APIs for LLM apps. Pathway's distributed runtime (🦀-🐍) provides fresh results of your data pipelines whenever new inputs and requests are received.
@ -37,7 +48,10 @@ In Pathway, data is represented in the form of Tables. Live data streams are als
For any questions, you will find the community and team behind the project [on Discord](https://discord.com/invite/pathway).
## Installation
## Getting started<a id="getting-started"></a>
### Installation<a id="installation"></a>
Pathway requires Python 3.10 or above.
@ -49,7 +63,7 @@ $ pip install -U pathway
⚠️ Pathway is available on macOS and Linux. Users of other systems should run Pathway on a Virtual Machine.
## Getting started
### Running Pathway locally<a id="running-pathway-locally"></a>
To use Pathway, you only need to import it:
@ -78,7 +92,7 @@ $ pathway spawn --threads 3 python main.py
To jumpstart a Pathway project, you can use our [cookiecutter template](https://github.com/pathwaycom/cookiecutter-pathway).
### Example
### Example<a id="example"></a>
```python
import pathway as pw
@ -99,7 +113,18 @@ pw.run()
Run this example [in Google Colab](https://colab.research.google.com/drive/1kLx5-vKKg0IeQ88ydS-ehtrxSujEZrXK?usp=sharing)!
## Monitoring Pathway
## Deployment<a id="deployment"></a>
Do you feel limited by a local run?
If you want to scale your Pathway application, you may be interested in our Pathway for Enterprise.
Pathway for Enterprise is specially tailored towards end-to-end data processing and real-time intelligent analytics.
It scales using distributed computing on the cloud and supports Kubernetes deployment.
You can learn more about the features of Pathway for Enterprise on our [website](https://pathway.com/features).
If you are interested, don't hesitate to [contact us](mailto:contact@pathway.com) to learn more.
## Monitoring Pathway<a id="monitoring-pathway"></a>
Pathway comes with a monitoring dashboard that allows you to keep track of the number of messages sent by each connector and the latency of the system. The dashboard also includes log messages.
@ -109,18 +134,18 @@ This dashboard is enabled by default; you can disable it by passing `monitoring_
In addition to Pathway's built-in dashboard, you can [use Prometheus](https://pathway.com/developers/tutorials/prometheus-monitoring) to monitor your Pathway application.
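
A minimal sketch of turning the dashboard off, assuming the `monitoring_level` parameter of `pw.run` referenced above; the `pw.MonitoringLevel.NONE` value is an assumption of this sketch:

```python
import pathway as pw

# ... build your pipeline here ...

# Run without the built-in monitoring dashboard; omit the argument
# (or pass a different MonitoringLevel) to keep it enabled.
pw.run(monitoring_level=pw.MonitoringLevel.NONE)
```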
## Resources
## Resources<a id="resources"></a>
See also: **[Pathway Developer Resources](https://pathway.com/developers/)** webpage (including API Docs).
See also: **📖 [Pathway Documentation](https://pathway.com/developers/)** webpage (including API Docs).
### Videos about Pathway
### Videos about Pathway<a id="videos-about-pathway"></a>
[▶️ Building an LLM Application without a vector database](https://www.youtube.com/watch?v=kcrJSk00duw) - by [Jan Chorowski](https://scholar.google.com/citations?user=Yc94070AAAAJ) (7min 56s)
[▶️ Linear regression on a Kafka Stream](https://vimeo.com/805069039) - by [Richard Pelgrim](https://twitter.com/richardpelgrim) (7min 53s)
[▶️ Introduction to reactive data processing](https://pathway.com/developers/user-guide/introduction/welcome) - by [Adrian Kosowski](https://scholar.google.com/citations?user=om8De_0AAAAJ) (27min 54s)
### Guides
### Guides<a id="guides"></a>
- [Core concepts of Pathway](https://pathway.com/developers/user-guide/introduction/key-concepts/)
- [Basic operations](https://pathway.com/developers/user-guide/introduction/survival-guide/)
- [Joins](https://pathway.com/developers/user-guide/table-operations/join-manual/)
@ -132,7 +157,7 @@ See also: **[Pathway Developer Resources](https://pathway.com/developers/)** web
- [API docs](https://pathway.com/developers/api-docs/pathway)
- [Troubleshooting](https://pathway.com/developers/user-guide/introduction/troubleshooting/)
### Tutorials
### Tutorials<a id="tutorials"></a>
- [Linear regression on a Kafka Stream](https://pathway.com/developers/tutorials/linear_regression_with_kafka/) ([video](https://vimeo.com/805069039))
- Joins:
- [Interval joins](https://pathway.com/developers/tutorials/fleet_eta_interval_join/)
@ -147,12 +172,12 @@ See also: **[Pathway Developer Resources](https://pathway.com/developers/)** web
- [Monitoring Pathway with Prometheus](https://pathway.com/developers/tutorials/prometheus-monitoring/)
- [Time between events in a multi-topic event stream](https://pathway.com/developers/tutorials/event_stream_processing_time_between_occurrences/)
### Showcases
### Showcases<a id="showcases"></a>
- [Realtime Twitter Analysis App](https://pathway.com/developers/showcases/twitter/)
- [Realtime classification with Nearest Neighbors](https://pathway.com/developers/showcases/lsh/lsh_chapter1/)
- [Realtime Fuzzy joins](https://pathway.com/developers/showcases/fuzzy_join/fuzzy_join_chapter1/)
### External and community content
### External and community content<a id="external-and-community-content"></a>
- [Real-time linear regression (Data Engineering Weekly)](https://pathway.com/developers/tutorials/unlocking-data-stream-processing-1/)
- [Realtime server logs monitoring (Data Engineering Weekly)](https://pathway.com/developers/tutorials/unlocking-data-stream-processing-2/)
- [Data enrichment with fuzzy joins (Data Engineering Weekly)](https://pathway.com/developers/tutorials/unlocking-data-stream-processing-3/)
@ -160,13 +185,13 @@ See also: **[Pathway Developer Resources](https://pathway.com/developers/)** web
If you would like to share with us some Pathway-related content, please give an admin a shout on [Discord](https://discord.gg/pathway).
### Manul conventions
### Manul conventions<a id="manul-conventions"></a>
Manuls (aka Pallas's Cats) [are creatures with fascinating habits](https://www.youtube.com/watch?v=rlSTBvViflc). As a tribute to them, we usually read `pw`, one of the most frequent tokens in Pathway code, as: `"paw"`.
<img src="https://d14l3brkh44201.cloudfront.net/PathwayManul.svg" alt="manul" width="50px"></img>
## Performance
## Performance<a id="performance"></a>
Pathway is made to outperform state-of-the-art technologies designed for streaming and batch data processing tasks, including Flink, Spark, and Kafka Streaming. It also makes it possible to implement many algorithms/UDFs in streaming mode which are not readily supported by other streaming frameworks (especially temporal joins, iterative graph algorithms, and machine learning routines).
@ -176,29 +201,39 @@ If you are curious, here are [some benchmarks to play with](https://github.com/p
If you try your own benchmarks, please don't hesitate to let us know. We treat situations in which Pathway underperforms with the same priority as bugs (i.e., to our knowledge, they shouldn't happen...).
## Coming soon
## Coming soon<a id="coming-soon"></a>
Pathway continues to evolve and gain new capabilities. Here are some exciting new features that we plan to incorporate in the near future:
Here are some features we plan to incorporate in the near future:
- Enhanced monitoring, observability, and data drift detection (integrates with Grafana visualization and other dashboarding tools).
- New connectors: interoperability with Delta Lake and Snowflake data sources.
- Easier connection setup for MongoDB.
- More performant garbage collection.
Stay up to date with the latest developments and news surrounding Pathway on [our blog](https://pathway.com/blog/), or [subscribe to our newsletter].
## Dependencies
## Dependencies<a id="dependencies"></a>
Pathway is made to run in a "clean" Linux/macOS + Python environment. When installing the pathway package with `pip` (from a wheel), you are likely to encounter a small number of Python package dependencies, such as sqlglot (used in the SQL API) and python-sat (useful for resolving dependencies during compilation). All necessary Rust crates are pre-built; the Rust compiler is not required to install Pathway, unless building from sources. A modified version of Timely/Differential Dataflow (which provides a dataflow assembly layer) is part of this repo.
## License
## License<a id="license"></a>
Pathway is distributed on a [BSL 1.1 License](https://github.com/pathwaycom/pathway/blob/main/LICENSE.txt) which allows for unlimited non-commercial use, as well as use of the Pathway package [for most commercial purposes](https://pathway.com/license/), free of charge. Code in this repository automatically converts to Open Source (Apache 2.0 License) after 4 years. Some [public repos](https://github.com/pathwaycom) which are complementary to this one (examples, libraries, connectors, etc.) are licensed as Open Source, under the MIT license.
## Contribution guidelines
## Contribution guidelines<a id="contribution-guidelines"></a>
If you develop a library or connector which you would like to integrate with this repo, we suggest releasing it first as a separate repo on a MIT/Apache 2.0 license.
For all concerns regarding core Pathway functionalities, Issues are encouraged. For further information, don't hesitate to engage with Pathway's [Discord community](https://discord.gg/pathway).
## Get Help<a id="get-help"></a>
If you have any questions, issues, or just want to chat about Pathway, we're here to help! Feel free to:
- Check out the [documentation](https://pathway.com/developers/) for detailed information.
- [Open an issue on GitHub](https://github.com/pathwaycom/pathway/issues) if you encounter any bugs or have feature requests.
- Join us on [Discord](https://discord.com/invite/pathway) to connect with other users and get support.
- Reach out to us via email at [contact@pathway.com](mailto:contact@pathway.com).
Our team is always happy to help you and ensure that you get the most out of Pathway.
If you would like to better understand how to use Pathway in your project, please don't hesitate to reach out to us.

View File

@ -32,17 +32,21 @@ def test_server(tmp_path: pathlib.Path):
def target():
time.sleep(5)
requests.post(
r = requests.post(
f"http://127.0.0.1:{port}",
json={"query": "one", "user": "sergey"},
).raise_for_status()
requests.post(
)
r.raise_for_status()
assert r.text == '"ONE"', r.text
r = requests.post(
f"http://127.0.0.1:{port}",
json={"query": "two", "user": "sergey"},
).raise_for_status()
)
r.raise_for_status()
assert r.text == '"TWO"', r.text
queries, response_writer = pw.io.http.rest_connector(
host="127.0.0.1", port=port, schema=InputSchema
host="127.0.0.1", port=port, schema=InputSchema, delete_completed_queries=True
)
responses = logic(queries)
response_writer(responses)
@ -80,7 +84,11 @@ def test_server_customization(tmp_path: pathlib.Path):
).raise_for_status()
queries, response_writer = pw.io.http.rest_connector(
host="127.0.0.1", port=port, schema=InputSchema, route="/endpoint"
host="127.0.0.1",
port=port,
schema=InputSchema,
route="/endpoint",
delete_completed_queries=True,
)
responses = logic(queries)
response_writer(responses)
@ -118,7 +126,7 @@ def test_server_schema_customization(tmp_path: pathlib.Path):
).raise_for_status()
queries, response_writer = pw.io.http.rest_connector(
host="127.0.0.1", port=port, schema=InputSchema
host="127.0.0.1", port=port, schema=InputSchema, delete_completed_queries=True
)
responses = logic(queries)
response_writer(responses)
@ -151,7 +159,7 @@ def test_server_keep_queries(tmp_path: pathlib.Path):
).raise_for_status()
queries, response_writer = pw.io.http.rest_connector(
host="127.0.0.1", port=port, schema=InputSchema, delete_queries=False
host="127.0.0.1", port=port, schema=InputSchema, delete_completed_queries=False
)
response_writer(queries.select(query_id=queries.id, result=pw.this.v))

View File

@ -122,7 +122,7 @@ def table_to_pandas(table: Table):
@runtime_type_check
@trace_user_frame
def _table_from_pandas(
def table_from_pandas(
df: pd.DataFrame,
id_from: list[str] | None = None,
unsafe_trusted_ids: bool = False,
@ -205,6 +205,10 @@ def table_to_parquet(table: Table, filename: str | PathLike):
return df.to_parquet(filename)
# XXX: clean this up
table_from_markdown = parse_to_table
class _EmptyConnectorSubject(ConnectorSubject):
def run(self):
pass
@ -215,7 +219,7 @@ class StreamGenerator:
events: dict[tuple[str, int], list[api.SnapshotEvent]] = {}
def _get_next_persistent_id(self) -> str:
return str(next(self._persistent_id))
return str(f"_stream_generator_{next(self._persistent_id)}")
def _advance_time_for_all_workers(
self, persistent_id: str, workers: Iterable[int], timestamp: int
@ -283,6 +287,15 @@ class StreamGenerator:
batches: list[dict[int, list[dict[str, api.Value]]]],
schema: type[Schema],
) -> Table:
"""
A function that creates a table from a list of batches, where each batch is a mapping
from worker id to a list of rows processed by this worker in this batch.
Each row is a mapping from column name to a value.
Args:
batches: list of batches to be put in the table
schema: schema of the table
"""
key = itertools.count()
schema, api_schema = read_schema(schema=schema)
value_fields: list[api.ValueField] = api_schema["value_fields"]
@ -313,6 +326,14 @@ class StreamGenerator:
batches: list[list[dict[str, api.Value]]],
schema: type[Schema],
) -> Table:
"""
A function that creates a table from a list of batches, where each batch is a list of rows.
Each row is a mapping from column name to a value.
Args:
batches: list of batches to be put in the table
schema: schema of the table
"""
batches_by_worker = [{0: batch} for batch in batches]
return self.table_from_list_of_batches_by_workers(batches_by_worker, schema)
@ -323,6 +344,12 @@ class StreamGenerator:
unsafe_trusted_ids: bool = False,
schema: type[Schema] | None = None,
) -> Table:
"""
A function for creating a table from a pandas DataFrame. If the DataFrame
contains a column ``_time``, rows will be split into batches with timestamps taken from the ``_time`` column.
The ``_worker`` column will then be interpreted as the id of the worker that processes the row,
and the ``_diff`` column as an event type, with ``1`` meaning a row insertion and ``-1`` a removal.
"""
if schema is None:
schema = schema_from_pandas(
df, exclude_columns=["_time", "_diff", "_worker"]
@ -337,11 +364,6 @@ class StreamGenerator:
if "_diff" not in df:
df["_diff"] = [1] * len(df)
persistent_id = self._get_next_persistent_id()
workers = set(df["_worker"])
for worker in workers:
self.events[(persistent_id, worker)] = []
batches: dict[
int, dict[int, list[tuple[int, api.Pointer, list[api.Value]]]]
] = {}
@ -380,10 +402,21 @@ class StreamGenerator:
unsafe_trusted_ids: bool = False,
schema: type[Schema] | None = None,
) -> Table:
"""
A function for creating a table from its definition in markdown. If it
contains a column ``_time``, rows will be split into batches with timestamps taken from the ``_time`` column.
The ``_worker`` column will then be interpreted as the id of the worker that processes the row,
and the ``_diff`` column as an event type, with ``1`` meaning a row insertion and ``-1`` a removal.
"""
df = _markdown_to_pandas(table)
return self.table_from_pandas(df, id_from, unsafe_trusted_ids, schema)
def persistence_config(self) -> persistence.Config | None:
"""
Returns a persistence config to be used during a run. It needs to be passed to ``pw.run``
so that tables created using StreamGenerator are filled with data.
"""
if len(self.events) == 0:
return None
return persistence.Config.simple_config(
@ -391,73 +424,3 @@ class StreamGenerator:
snapshot_access=api.SnapshotAccess.REPLAY,
replay_mode=api.ReplayMode.SPEEDRUN,
)
stream_generator = StreamGenerator()
def table_from_list_of_batches_by_workers(
batches: list[dict[int, list[dict[str, api.Value]]]],
schema: type[Schema],
) -> Table:
"""
A function that creates a table from a list of batches, where each batch is a mapping
from worker id to a list of rows processed by this worker in this batch.
Each row is a mapping from column name to a value.
Args:
batches: list of batches to be put in the table
schema: schema of the table
"""
return stream_generator.table_from_list_of_batches_by_workers(batches, schema)
def table_from_list_of_batches(
batches: list[list[dict[str, api.Value]]],
schema: type[Schema],
) -> Table:
"""
A function that creates a table from a list of batches, where each batch is a list of
rows in this batch. Each row is a mapping from column name to a value.
Args:
batches: list of batches to be put in the table
schema: schema of the table
"""
return stream_generator.table_from_list_of_batches(batches, schema)
def table_from_pandas(
df: pd.DataFrame,
id_from: list[str] | None = None,
unsafe_trusted_ids: bool = False,
schema: type[Schema] | None = None,
):
"""
A function for creating a table from a pandas DataFrame. If the DataFrame
contains a column ``_time``, rows will be split into batches with timestamps from ``_time`` column.
Then ``_worker`` column will be interpreted as the id of a worker which will process the row and
``_diff`` column as an event type with ``1`` treated as inserting row and ``-1`` as removing.
"""
if "_time" in df:
return stream_generator.table_from_pandas(
df, id_from, unsafe_trusted_ids, schema
)
else:
return _table_from_pandas(df, id_from, unsafe_trusted_ids, schema)
def table_from_markdown(
table_def: str,
id_from: list[str] | None = None,
unsafe_trusted_ids: bool = False,
schema: type[Schema] | None = None,
) -> Table:
"""
A function for creating a table from its definition in markdown. If it
contains a column ``_time``, rows will be split into batches with timestamps from ``_time`` column.
Then ``_worker`` column will be interpreted as the id of a worker which will process the row and
``_diff`` column as an event type - with ``1`` treated as inserting row and ``-1`` as removing.
"""
df = _markdown_to_pandas(table_def)
return table_from_pandas(df, id_from, unsafe_trusted_ids, schema)
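
A minimal usage sketch of the `StreamGenerator` helpers documented in the docstrings above. The table contents, the CSV sink, and the assumption that `StreamGenerator` is importable from `pathway.debug` are illustrative:

```python
import pathway as pw
from pathway.debug import StreamGenerator  # assumed import path, matching the diff above

gen = StreamGenerator()

# _time splits rows into batches, _worker assigns them to workers,
# _diff marks insertion (1) or removal (-1); the values below are made up.
t = gen.table_from_markdown(
    """
    value | _time | _worker | _diff
    1     | 2     | 0       | 1
    2     | 4     | 0       | 1
    1     | 6     | 0       | -1
    """
)

pw.io.csv.write(t, "output.csv")  # hypothetical sink

# The generator's persistence config must be passed to pw.run so that the
# generated tables are actually filled with data (see persistence_config above).
pw.run(persistence_config=gen.persistence_config())
```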

View File

@ -126,11 +126,11 @@ def join_kwargs_handler(*, allow_how: bool, allow_id: bool):
if "behavior" in kwargs:
behavior = processed_kwargs["behavior"] = kwargs.pop("behavior")
from pathway.stdlib.temporal import WindowBehavior
from pathway.stdlib.temporal import CommonBehavior
if not isinstance(behavior, WindowBehavior):
if not isinstance(behavior, CommonBehavior):
raise ValueError(
"The behavior argument of join should be of type pathway.temporal.WindowBehavior."
"The behavior argument of join should be of type pathway.temporal.CommonBehavior."
)
if kwargs:

View File

@ -14,15 +14,16 @@ import pathway.internals as pw
from pathway.internals import column_properties as cp, dtype as dt, trace
from pathway.internals.expression import ColumnExpression, ColumnReference
from pathway.internals.helpers import SetOnceProperty, StableSet
from pathway.internals.parse_graph import G
from pathway.internals.universe import Universe
if TYPE_CHECKING:
from pathway.internals.expression import InternalColRef
from pathway.internals.operator import OutputHandle
from pathway.internals.table import Table
from pathway.internals.universe import Universe
@dataclass(eq=False, frozen=True)
@dataclass(frozen=True)
class Lineage:
source: OutputHandle
"""Source handle."""
@ -32,7 +33,7 @@ class Lineage:
return self.source.operator.trace
@dataclass(eq=False, frozen=True)
@dataclass(frozen=True)
class ColumnLineage(Lineage):
name: str
"""Original name of a column."""
@ -63,7 +64,7 @@ class Column(ABC):
self._trace = trace.Trace.from_traceback()
def column_dependencies(self) -> StableSet[Column]:
return StableSet([self])
return StableSet([])
@property
def trace(self) -> trace.Trace:
@ -117,7 +118,11 @@ class ColumnWithContext(Column, ABC):
context: Context
def __init__(self, context: Context, universe: Universe):
def __init__(
self,
context: Context,
universe: Universe,
):
super().__init__(universe)
self.context = context
@ -143,6 +148,20 @@ class IdColumn(ColumnWithContext):
return dt.POINTER
class MaterializedIdColumn(IdColumn):
context: MaterializedContext
def __init__(
self, context: MaterializedContext, properties: cp.ColumnProperties
) -> None:
super().__init__(context)
self._properties = properties
@property
def properties(self) -> cp.ColumnProperties:
return self._properties
class ColumnWithExpression(ColumnWithContext):
"""Column holding expression and context."""
@ -210,26 +229,31 @@ class ContextTable:
assert all((column.universe == self.universe) for column in self.columns)
def _create_internal_table(
columns: Iterable[Column], universe: Universe, context: Context
) -> Table:
def _create_internal_table(columns: Iterable[Column], context: Context) -> Table:
from pathway.internals.table import Table
columns_dict = {f"{i}": column for i, column in enumerate(columns)}
return Table(columns_dict, universe, id_column=IdColumn(context))
return Table(columns_dict, context=context)
@dataclass(eq=False, frozen=True)
class Context:
class Context(ABC):
"""Context of the column evaluation.
Context will be mapped to proper evaluator based on its type.
"""
universe: Universe
"""Resulting universe."""
_column_properties_evaluator: ClassVar[type[cp.ColumnPropertiesEvaluator]]
@cached_property
def id_column(self) -> IdColumn:
return IdColumn(self)
@property
@abstractmethod
def universe(self) -> Universe:
...
def column_dependencies_external(self) -> Iterable[Column]:
return []
@ -240,7 +264,8 @@ class Context:
# columns depend on columns in their context, not dependencies of columns in context
return StableSet(
chain(
self.column_dependencies_external(), self.column_dependencies_internal()
self.column_dependencies_external(),
self.column_dependencies_internal(),
)
)
@ -269,21 +294,17 @@ class Context:
dependencies = list(self.column_dependencies_internal())
if len(dependencies) == 0:
return []
universe = None
context = None
columns: list[ColumnWithContext] = []
for column in dependencies:
assert isinstance(
column, ColumnWithContext
), f"Column {column} that is not ColumnWithContext appeared in column_dependencies_internal()"
assert universe is None or universe == column.universe
assert context is None or context == column.context
columns.append(column)
universe = column.universe
context = column.context
assert universe is not None
assert context is not None
return [_create_internal_table(columns, universe, context)]
return [_create_internal_table(columns, context)]
def column_properties(self, column: ColumnWithContext) -> cp.ColumnProperties:
return self._column_properties_evaluator().eval(column)
@ -306,16 +327,42 @@ class RowwiseContext(
):
"""Context for basic expressions."""
_id_column: IdColumn
def column_dependencies_external(self) -> Iterable[Column]:
return [self._id_column]
def universe_dependencies(self) -> Iterable[Universe]:
return [self.universe]
@property
def universe(self) -> Universe:
return self._id_column.universe
@dataclass(eq=False, frozen=True)
class MaterializedContext(Context):
_universe: Universe
_universe_properties: cp.ColumnProperties = cp.ColumnProperties(dtype=dt.POINTER)
def universe_dependencies(self) -> Iterable[Universe]:
return [self.universe]
@property
def id_column(self) -> MaterializedIdColumn:
return MaterializedIdColumn(self, self._universe_properties)
@property
def universe(self) -> Universe:
return self._universe
@dataclass(eq=False, frozen=True)
class GradualBroadcastContext(Context):
orig_id_column: IdColumn
lower_column: ColumnWithExpression
value_column: ColumnWithExpression
upper_column: ColumnWithExpression
apx_value_column: MaterializedColumn
def column_dependencies_internal(self) -> Iterable[Column]:
return [self.lower_column, self.value_column, self.upper_column]
@ -323,6 +370,14 @@ class GradualBroadcastContext(Context):
def universe_dependencies(self) -> Iterable[Universe]:
return [self.universe, self.value_column.universe]
@cached_property
def apx_value_column(self):
return MaterializedColumn(self.universe, cp.ColumnProperties(dtype=dt.FLOAT))
@property
def universe(self) -> Universe:
return self.orig_id_column.universe
@dataclass(eq=False, frozen=True)
class TableRestrictedRowwiseContext(RowwiseContext):
@ -353,6 +408,10 @@ class GroupedContext(Context):
def universe_dependencies(self) -> Iterable[Universe]:
return [self.inner_context.universe]
@cached_property
def universe(self) -> Universe:
return Universe()
@dataclass(eq=False, frozen=True)
class FilterContext(
@ -361,28 +420,42 @@ class FilterContext(
"""Context of `table.filter() operation."""
filtering_column: ColumnWithExpression
universe_to_filter: Universe
id_column_to_filter: IdColumn
def column_dependencies_internal(self) -> Iterable[Column]:
return [self.filtering_column]
def column_dependencies_external(self) -> Iterable[Column]:
return [self.id_column_to_filter]
def universe_dependencies(self) -> Iterable[Universe]:
return [self.universe_to_filter]
return [self.id_column_to_filter.universe]
@cached_property
def universe(self) -> Universe:
return self.id_column_to_filter.universe.subset()
@dataclass(eq=False, frozen=True)
class TimeColumnContext(Context):
"""Context of operations that use time columns."""
orig_universe: Universe
orig_id_column: IdColumn
threshold_column: ColumnWithExpression
time_column: ColumnWithExpression
def column_dependencies_internal(self) -> Iterable[Column]:
return [self.threshold_column, self.time_column]
def column_dependencies_external(self) -> Iterable[Column]:
return [self.orig_id_column]
def universe_dependencies(self) -> Iterable[Universe]:
return [self.orig_universe]
return [self.orig_id_column.universe]
@cached_property
def universe(self) -> Universe:
return self.orig_id_column.universe.subset()
@dataclass(eq=False, frozen=True)
@ -396,20 +469,34 @@ class ForgetContext(TimeColumnContext):
class ForgetImmediatelyContext(Context):
"""Context of `table._forget_immediately operation."""
orig_universe: Universe
orig_id_column: IdColumn
def column_dependencies_external(self) -> Iterable[Column]:
return [self.orig_id_column]
def universe_dependencies(self) -> Iterable[Universe]:
return [self.orig_universe]
return [self.orig_id_column.universe]
@cached_property
def universe(self) -> Universe:
return self.orig_id_column.universe.subset()
@dataclass(eq=False, frozen=True)
class FilterOutForgettingContext(Context):
"""Context of `table._filter_out_results_of_forgetting() operation."""
orig_universe: Universe
orig_id_column: IdColumn
def column_dependencies_external(self) -> Iterable[Column]:
return [self.orig_id_column]
def universe_dependencies(self) -> Iterable[Universe]:
return [self.orig_universe]
return [self.orig_id_column.universe]
@cached_property
def universe(self) -> Universe:
return self.orig_id_column.universe.superset()
@dataclass(eq=False, frozen=True)
@ -434,66 +521,96 @@ class ReindexContext(Context):
def universe_dependencies(self) -> Iterable[Universe]:
return [self.reindex_column.universe]
@cached_property
def universe(self) -> Universe:
return Universe()
@dataclass(eq=False, frozen=True)
class IxContext(Context):
"""Context of `table.ix() operation."""
orig_universe: Universe
key_column: Column
orig_id_column: IdColumn
optional: bool
def column_dependencies_external(self) -> Iterable[Column]:
return [self.key_column]
return [self.orig_id_column, self.key_column]
def universe_dependencies(self) -> Iterable[Universe]:
return [self.universe, self.orig_universe]
return [self.universe, self.orig_id_column.universe]
@cached_property
def universe(self) -> Universe:
return self.key_column.universe
@dataclass(eq=False, frozen=True)
class IntersectContext(Context):
"""Context of `table.intersect() operation."""
intersecting_universes: tuple[Universe, ...]
intersecting_ids: tuple[IdColumn, ...]
def __post_init__(self):
assert len(self.intersecting_universes) > 0
assert len(self.intersecting_ids) > 0
def universe_dependencies(self) -> Iterable[Universe]:
return self.intersecting_universes
return [c.universe for c in self.intersecting_ids]
@cached_property
def universe(self) -> Universe:
return G.universe_solver.get_intersection(
*[c.universe for c in self.intersecting_ids]
)
@dataclass(eq=False, frozen=True)
class RestrictContext(Context):
"""Context of `table.restrict() operation."""
orig_universe: Universe
orig_id_column: IdColumn
_universe: Universe
def column_dependencies_external(self) -> Iterable[Column]:
return [self.orig_id_column]
def universe_dependencies(self) -> Iterable[Universe]:
return [self.orig_universe, self.universe]
return [self.orig_id_column.universe, self.universe]
@cached_property
def universe(self) -> Universe:
return self._universe
@dataclass(eq=False, frozen=True)
class DifferenceContext(Context):
"""Context of `table.difference() operation."""
left: Universe
right: Universe
left: IdColumn
right: IdColumn
def universe_dependencies(self) -> Iterable[Universe]:
return [self.left, self.right]
return [self.left.universe, self.right.universe]
@cached_property
def universe(self) -> Universe:
return G.universe_solver.get_difference(self.left.universe, self.right.universe)
@dataclass(eq=False, frozen=True)
class HavingContext(Context):
orig_universe: Universe
orig_id_column: IdColumn
key_column: Column
def column_dependencies_external(self) -> Iterable[Column]:
return [self.key_column]
def universe_dependencies(self) -> Iterable[Universe]:
return [self.orig_universe, self.key_column.universe]
return [self.orig_id_column.universe, self.key_column.universe]
@cached_property
def universe(self) -> Universe:
return self.key_column.universe.subset()
@dataclass(eq=False, frozen=True)
@ -501,41 +618,60 @@ class UpdateRowsContext(Context):
"""Context of `table.update_rows()` and related operations."""
updates: dict[str, Column]
union_universes: tuple[Universe, ...]
union_ids: tuple[IdColumn, ...]
def __post_init__(self):
assert len(self.union_universes) > 0
assert len(self.union_ids) > 0
def reference_column_dependencies(self, ref: ColumnReference) -> StableSet[Column]:
return StableSet([self.updates[ref.name]])
def universe_dependencies(self) -> Iterable[Universe]:
return self.union_universes
return [c.universe for c in self.union_ids]
@cached_property
def universe(self) -> Universe:
return G.universe_solver.get_union(*[c.universe for c in self.union_ids])
@dataclass(eq=False, frozen=True)
class UpdateCellsContext(UpdateRowsContext):
class UpdateCellsContext(Context):
left: IdColumn
right: IdColumn
updates: dict[str, Column]
def reference_column_dependencies(self, ref: ColumnReference) -> StableSet[Column]:
if ref.name in self.updates:
return super().reference_column_dependencies(ref)
return StableSet([self.updates[ref.name]])
return StableSet()
def universe_dependencies(self) -> Iterable[Universe]:
return [self.left.universe, self.right.universe]
@property
def universe(self) -> Universe:
return self.left.universe
@dataclass(eq=False, frozen=True)
class ConcatUnsafeContext(Context):
"""Context of `table.concat_unsafe()`."""
updates: tuple[dict[str, Column], ...]
union_universes: tuple[Universe, ...]
union_ids: tuple[IdColumn, ...]
def __post_init__(self):
assert len(self.union_universes) > 0
assert len(self.union_ids) > 0
def reference_column_dependencies(self, ref: ColumnReference) -> StableSet[Column]:
return StableSet([update[ref.name] for update in self.updates])
def universe_dependencies(self) -> Iterable[Universe]:
return self.union_universes
return [c.universe for c in self.union_ids]
@cached_property
def universe(self) -> Universe:
return G.universe_solver.get_union(*[c.universe for c in self.union_ids])
@dataclass(eq=False, frozen=True)
@ -544,16 +680,24 @@ class PromiseSameUniverseContext(
):
"""Context of table.unsafe_promise_same_universe_as() operation."""
orig_universe: Universe
orig_id_column: IdColumn
_id_column: IdColumn
def universe_dependencies(self) -> Iterable[Universe]:
return [self.orig_universe, self.universe]
return [self.orig_id_column.universe, self.universe]
@cached_property
def universe(self) -> Universe:
return self._id_column.universe
@dataclass(eq=True, frozen=True)
class JoinContext(Context):
"""Context of `table.join() operation."""
"""Context for building inner table of a join, where all columns from left and right
are properly unrolled. Uses JoinTypeInterpreter to properly evaluate which columns
should be optionalized."""
_universe: Universe
left_table: pw.Table
right_table: pw.Table
on_left: ContextTable
@ -579,32 +723,37 @@ class JoinContext(Context):
return [
_create_internal_table(
self.on_left.columns,
self.on_left.universe,
self.left_table._table_restricted_context,
),
_create_internal_table(
self.on_right.columns,
self.on_right.universe,
self.right_table._table_restricted_context,
),
]
@cached_property
def universe(self) -> Universe:
return self._universe
@dataclass(eq=False, frozen=True)
class JoinRowwiseContext(RowwiseContext):
"""Context for actually evaluating join expressions."""
temporary_column_to_original: dict[InternalColRef, InternalColRef]
original_column_to_temporary: dict[InternalColRef, ColumnReference]
@staticmethod
def from_mapping(
universe: Universe,
id_column: IdColumn,
columns_mapping: dict[InternalColRef, ColumnReference],
) -> JoinRowwiseContext:
temporary_column_to_original = {}
for orig_colref, expression in columns_mapping.items():
temporary_column_to_original[expression._to_internal()] = orig_colref
temporary_column_to_original = {
expression._to_internal(): orig_colref
for orig_colref, expression in columns_mapping.items()
}
return JoinRowwiseContext(
universe, temporary_column_to_original, columns_mapping.copy()
id_column, temporary_column_to_original, columns_mapping.copy()
)
def _get_type_interpreter(self):
@ -620,15 +769,13 @@ class FlattenContext(Context):
"""Context of `table.flatten() operation."""
orig_universe: Universe
flatten_column: Column
flatten_result_column: MaterializedColumn
flatten_column: ColumnWithExpression
def column_dependencies_external(self) -> Iterable[Column]:
return [self.flatten_column]
@staticmethod
def get_flatten_column_dtype(flatten_column: ColumnWithExpression):
dtype = flatten_column.dtype
def _get_flatten_column_dtype(self):
dtype = self.flatten_column.dtype
if isinstance(dtype, dt.List):
return dtype.wrapped
if isinstance(dtype, dt.Tuple):
@ -645,12 +792,25 @@ class FlattenContext(Context):
return dt.ANY
else:
raise TypeError(
f"Cannot flatten column {flatten_column.expression!r} of type {dtype}."
f"Cannot flatten column {self.flatten_column.expression!r} of type {dtype}."
)
def universe_dependencies(self) -> Iterable[Universe]:
return [self.orig_universe]
@cached_property
def universe(self) -> Universe:
return Universe()
@cached_property
def flatten_result_column(self) -> Column:
return MaterializedColumn(
self.universe,
cp.ColumnProperties(
dtype=self._get_flatten_column_dtype(),
),
)
@dataclass(eq=False, frozen=True)
class SortingContext(Context):
@ -658,11 +818,25 @@ class SortingContext(Context):
key_column: ColumnWithExpression
instance_column: ColumnWithExpression
prev_column: MaterializedColumn
next_column: MaterializedColumn
def column_dependencies_internal(self) -> Iterable[Column]:
return [self.key_column, self.instance_column]
def universe_dependencies(self) -> Iterable[Universe]:
return [self.universe]
@cached_property
def universe(self) -> Universe:
return self.key_column.universe
@cached_property
def prev_column(self) -> Column:
return MaterializedColumn(
self.universe, cp.ColumnProperties(dtype=dt.Optional(dt.POINTER))
)
@cached_property
def next_column(self) -> Column:
return MaterializedColumn(
self.universe, cp.ColumnProperties(dtype=dt.Optional(dt.POINTER))
)

View File

@ -40,5 +40,4 @@ class PreserveDependenciesPropsEvaluator(ColumnPropertiesEvaluator):
return all(
getattr(col.properties, name) == value
for col in column.column_dependencies()
if col != column
)

View File

@ -131,12 +131,9 @@ class TableReplacementWithNoneDesugaring(IdentityTransform):
class TableCallbackDesugaring(DesugaringTransform):
table_like: table.TableLike
table_like: table.TableLike | groupby.GroupedJoinable
def __init__(self, table_like: table.TableLike):
from pathway.internals import table
assert isinstance(table_like, table.TableLike)
def __init__(self, table_like: table.TableLike | groupby.GroupedJoinable):
self.table_like = table_like
@abstractmethod

View File

@ -37,8 +37,6 @@ class GraphRunner:
default_logging: bool = True,
persistence_config: PersistenceConfig | None = None,
) -> None:
from pathway.debug import stream_generator
self._graph = input_graph
self.debug = debug
if ignore_asserts is None:
@ -47,11 +45,7 @@ class GraphRunner:
self.monitoring_level = monitoring_level
self.with_http_server = with_http_server
self.default_logging = default_logging
self.persistence_config = (
persistence_config
or environ.get_replay_config()
or stream_generator.persistence_config()
)
self.persistence_config = persistence_config or environ.get_replay_config()
def run_tables(
self,
@ -102,7 +96,6 @@ class GraphRunner:
for operator in context.nodes
if isinstance(operator, ContextualizedIntermediateOperator)
]
monitoring_level = self.monitoring_level.to_internal()
with new_event_loop() as event_loop, monitor_stats(

View File

@ -1001,7 +1001,7 @@ class JoinEvaluator(ExpressionEvaluator, context_type=clmn.JoinContext):
def run(self, output_storage: Storage, *input_storages: Storage) -> api.Table:
joined_storage = self.run_join(self.context.universe, *input_storages)
rowwise_evaluator = RowwiseEvaluator(
clmn.RowwiseContext(self.context.universe),
clmn.RowwiseContext(self.context.id_column),
self.scope,
self.state,
self.scope_context,
@ -1117,7 +1117,7 @@ class UpdateRowsEvaluator(ExpressionEvaluator, context_type=clmn.UpdateRowsConte
return self.scope.update_rows_table(input_table, update_input_table, properties)
class UpdateCellsEvaluator(UpdateRowsEvaluator, context_type=clmn.UpdateCellsContext):
class UpdateCellsEvaluator(ExpressionEvaluator, context_type=clmn.UpdateCellsContext):
context: clmn.UpdateCellsContext
def run(self, output_storage: Storage, *input_storages: Storage) -> api.Table:

View File

@ -162,7 +162,7 @@ class NoNewColumnsMultipleSourcesPathEvaluator(
) -> Storage:
context = self.context
output_columns_list = list(output_columns)
source_universe = context.union_universes[0]
source_universe = context.union_ids[0].universe
# ensure that keeping structure is possible,
# i.e. all sources have the same path to required columns
keep_structure = True
@ -328,7 +328,7 @@ class PromiseSameUniversePathEvaluator(
input_storages: dict[Universe, Storage],
) -> Storage:
paths: dict[clmn.Column, ColumnPath] = {}
orig_storage = input_storages[self.context.orig_universe]
orig_storage = input_storages[self.context.orig_id_column.universe]
new_storage = input_storages[self.context.universe]
for column in output_columns:
if (

View File

@ -4,7 +4,6 @@ from __future__ import annotations
from collections.abc import Callable, Iterable
import pathway.internals.graph_runner.expression_evaluator as evaluator
from pathway.internals import api, column, table, universe
from pathway.internals.column_path import ColumnPath
from pathway.internals.graph_runner.path_storage import Storage
@ -19,7 +18,6 @@ class ScopeState:
legacy_tables: dict[table.Table, api.LegacyTable]
universes: dict[universe.Universe, api.Universe]
computers: list[Callable]
evaluators: dict[column.Context, evaluator.ExpressionEvaluator]
tables: dict[universe.Universe, api.Table]
storages: dict[universe.Universe, Storage]
@ -28,7 +26,6 @@ class ScopeState:
self.columns = {}
self.universes = {}
self.computers = []
self.evaluators = {}
self.legacy_tables = {}
self.tables = {}
self.storages = {}
@ -142,16 +139,6 @@ class ScopeState:
def get_computer_logic(self, id: int) -> Callable:
return self.computers[id]
def get_or_create_evaluator(
self,
context: column.Context,
evaluator_factory: Callable[[column.Context], evaluator.ExpressionEvaluator],
):
if context not in self.evaluators:
evaluator = evaluator_factory(context)
self.evaluators[context] = evaluator
return self.evaluators[context]
def get_table(self, key: Storage) -> api.Table:
return self.tables[key._universe]

View File

@ -8,7 +8,6 @@ from collections.abc import Iterable, Iterator
from functools import lru_cache
from typing import TYPE_CHECKING
from pathway.internals import universes
from pathway.internals.expression_visitor import IdentityTransform
from pathway.internals.trace import trace_user_frame
@ -17,7 +16,7 @@ if TYPE_CHECKING:
import pathway.internals.column as clmn
import pathway.internals.expression as expr
from pathway.internals import table, table_like, thisclass
from pathway.internals import table, thisclass
from pathway.internals.arg_handlers import arg_handler, reduce_args_handler
from pathway.internals.decorators import contextualized_operator
from pathway.internals.desugaring import (
@ -34,14 +33,15 @@ from pathway.internals.parse_graph import G
from pathway.internals.universe import Universe
class GroupedJoinable(DesugaringContext, table_like.TableLike, OperatorInput):
class GroupedJoinable(DesugaringContext, OperatorInput):
_substitution: dict[thisclass.ThisMetaclass, table.Joinable]
_joinable_to_group: table.Joinable
_universe: Universe
def __init__(self, _universe: Universe, _substitution, _joinable: table.Joinable):
super().__init__(_universe)
self._substitution = _substitution
self._joinable_to_group = _joinable
self._universe = _universe
@property
def _desugaring(self) -> TableReduceDesugaring:
@ -198,13 +198,11 @@ class GroupedTable(GroupedJoinable, OperatorInput):
def _reduce(self, **kwargs: expr.ColumnExpression) -> table.Table:
reduced_columns: dict[str, clmn.ColumnWithExpression] = {}
universe = Universe()
context = clmn.GroupedContext(
table=self._joinable_to_group,
universe=universe,
grouping_columns=tuple(self._grouping_columns),
set_id=self._set_id,
inner_context=self._joinable_to_group._context,
inner_context=self._joinable_to_group._rowwise_context,
sort_by=self._sort_by,
)
@ -214,10 +212,9 @@ class GroupedTable(GroupedJoinable, OperatorInput):
result: table.Table = table.Table(
columns=reduced_columns,
universe=universe,
id_column=clmn.IdColumn(context),
context=context,
)
universes.promise_are_equal(result, self)
G.universe_solver.register_as_equal(self._universe, result._universe)
return result
def _validate_expression(self, expression: expr.ColumnExpression):

View File

@ -487,7 +487,7 @@ class JoinResult(Joinable, OperatorInput):
def __init__(
self,
_universe: Universe,
_context: clmn.Context,
_inner_table: Table,
_columns_mapping: dict[expr.InternalColRef, expr.ColumnReference],
_left_table: Table,
@ -498,7 +498,7 @@ class JoinResult(Joinable, OperatorInput):
_joined_on_names: StableSet[str],
_join_mode: JoinMode,
):
super().__init__(_universe)
super().__init__(_context)
self._inner_table = _inner_table
self._columns_mapping = _columns_mapping
self._left_table = _left_table
@ -658,17 +658,19 @@ class JoinResult(Joinable, OperatorInput):
filter_expression
)
inner_table = self._inner_table.filter(desugared_filter_expression)
new_columns_mapping = {}
for int_ref, expression in self._columns_mapping.items():
new_columns_mapping[int_ref] = inner_table[expression.name]
new_columns_mapping = {
int_ref: inner_table[expression.name]
for int_ref, expression in self._columns_mapping.items()
}
new_columns_mapping[inner_table.id._to_internal()] = inner_table.id
inner_table._context = clmn.JoinRowwiseContext.from_mapping(
inner_table._universe, new_columns_mapping
) # FIXME don't set _context property of table
context = clmn.JoinRowwiseContext.from_mapping(
inner_table._id_column, new_columns_mapping
)
inner_table._rowwise_context = context
return JoinResult(
_universe=inner_table._universe,
_context=context,
_inner_table=inner_table,
_columns_mapping=new_columns_mapping,
_left_table=self._left_table,
@ -817,8 +819,7 @@ class JoinResult(Joinable, OperatorInput):
return Table(
columns=columns,
universe=context.universe,
id_column=clmn.IdColumn(context),
context=context,
)
@staticmethod
@ -861,9 +862,12 @@ class JoinResult(Joinable, OperatorInput):
final_mapping[colref._to_internal()] = colref
final_mapping[inner_table.id._to_internal()] = inner_table.id
inner_table._context = clmn.JoinRowwiseContext.from_mapping(
inner_table._universe, final_mapping
) # FIXME don't set _context property of table
rowwise_context = clmn.JoinRowwiseContext.from_mapping(
inner_table._id_column, final_mapping
)
inner_table._rowwise_context = (
rowwise_context # FIXME don't set _context property of table
)
return (inner_table, final_mapping)
@ -964,7 +968,7 @@ class JoinResult(Joinable, OperatorInput):
common_column_names,
)
return JoinResult(
universe,
context,
inner_table,
columns_mapping,
left_table,

View File

@ -34,11 +34,14 @@ def schema_from_columns(
) -> type[Schema]:
if _name is None:
_name = "schema_from_columns(" + str(list(columns.keys())) + ")"
__dict = {
"__metaclass__": SchemaMetaclass,
"__annotations__": {name: c.dtype for name, c in columns.items()},
}
return _schema_builder(_name, __dict)
return schema_builder(
columns={
name: ColumnDefinition.from_properties(c.properties)
for name, c in columns.items()
},
name=_name,
)
def _type_converter(series: pd.Series) -> dt.DType:
@ -210,6 +213,18 @@ def _create_column_definitions(
return columns
def _universe_properties(
columns: list[ColumnSchema], schema_properties: SchemaProperties
) -> ColumnProperties:
append_only: bool = False
if len(columns) > 0:
append_only = any(c.append_only for c in columns)
elif schema_properties.append_only is not None:
append_only = schema_properties.append_only
return ColumnProperties(dtype=dt.POINTER, append_only=append_only)
@dataclass(frozen=True)
class SchemaProperties:
append_only: bool | None = None
@ -219,13 +234,15 @@ class SchemaMetaclass(type):
__columns__: dict[str, ColumnSchema]
__dtypes__: dict[str, dt.DType]
__types__: dict[str, Any]
__universe_properties__: ColumnProperties
@trace.trace_user_frame
def __init__(self, *args, append_only: bool | None = None, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.__columns__ = _create_column_definitions(
self, SchemaProperties(append_only=append_only)
schema_properties = SchemaProperties(append_only=append_only)
self.__columns__ = _create_column_definitions(self, schema_properties)
self.__universe_properties__ = _universe_properties(
list(self.__columns__.values()), schema_properties
)
self.__dtypes__ = {
name: column.dtype for name, column in self.__columns__.items()
@ -241,6 +258,10 @@ class SchemaMetaclass(type):
def column_names(self) -> list[str]:
return list(self.keys())
@property
def universe_properties(self) -> ColumnProperties:
return self.__universe_properties__
def column_properties(self, name: str) -> ColumnProperties:
column = self.__columns__[name]
return ColumnProperties(dtype=column.dtype, append_only=column.append_only)
@ -510,6 +531,10 @@ class ColumnDefinition:
def __post_init__(self):
assert self.dtype is None or isinstance(self.dtype, dt.DType)
@classmethod
def from_properties(cls, properties: ColumnProperties) -> ColumnDefinition:
return cls(dtype=properties.dtype, append_only=properties.append_only)
def column_definition(
*,

View File

@ -17,7 +17,6 @@ from pathway.internals.arg_handlers import (
reduce_args_handler,
select_args_handler,
)
from pathway.internals.column_properties import ColumnProperties
from pathway.internals.decorators import (
contextualized_operator,
empty_from_schema,
@ -99,27 +98,26 @@ class Table(
)
_columns: dict[str, clmn.Column]
_context: clmn.RowwiseContext
_schema: type[Schema]
_id_column: clmn.IdColumn
_rowwise_context: clmn.RowwiseContext
_source: SetOnceProperty[OutputHandle] = SetOnceProperty()
"""Lateinit by operator."""
def __init__(
self,
columns: Mapping[str, clmn.Column],
universe: Universe,
context: clmn.Context,
schema: type[Schema] | None = None,
id_column: clmn.IdColumn | None = None,
):
if schema is None:
schema = schema_from_columns(columns)
super().__init__(universe)
super().__init__(context)
self._columns = dict(columns)
self._schema = schema
self._context = clmn.RowwiseContext(self._universe)
self._id_column = id_column or clmn.IdColumn(self._context)
self._id_column = context.id_column
self._substitution = {thisclass.this: self}
self._rowwise_context = clmn.RowwiseContext(self._id_column)
@property
def id(self) -> expr.ColumnReference:
@ -520,8 +518,7 @@ class Table(
filtering_column = self._eval(filter_expression)
assert self._universe == filtering_column.universe
universe = self._universe.subset()
context = clmn.FilterContext(universe, filtering_column, self._universe)
context = clmn.FilterContext(filtering_column, self._id_column)
return self._table_with_context(context)
@ -550,23 +547,14 @@ class Table(
value_column,
upper_column,
):
apx_value = clmn.MaterializedColumn(
self._universe, ColumnProperties(dtype=dt.FLOAT)
)
context = clmn.GradualBroadcastContext(
self._universe,
self._id_column,
threshold_table._eval(lower_column),
threshold_table._eval(value_column),
threshold_table._eval(upper_column),
apx_value_column=apx_value,
)
return Table(
columns={"apx_value": apx_value},
universe=context.universe,
id_column=clmn.IdColumn(context),
)
return Table(columns={"apx_value": context.apx_value_column}, context=context)
@trace_user_frame
@desugar
@ -578,10 +566,8 @@ class Table(
time_column: expr.ColumnExpression,
mark_forgetting_records: bool,
) -> Table:
universe = self._universe.subset()
context = clmn.ForgetContext(
universe,
self._universe,
self._id_column,
self._eval(threshold_column),
self._eval(time_column),
mark_forgetting_records,
@ -595,11 +581,7 @@ class Table(
def _forget_immediately(
self,
) -> Table:
universe = self._universe.subset()
context = clmn.ForgetImmediatelyContext(
universe,
self._universe,
)
context = clmn.ForgetImmediatelyContext(self._id_column)
return self._table_with_context(context)
@trace_user_frame
@ -609,14 +591,10 @@ class Table(
def _filter_out_results_of_forgetting(
self,
) -> Table:
universe = self._universe.superset()
# The output universe is a superset of input universe because forgetting entries
# are filtered out. At each point in time, the set of keys with +1 diff can be
# bigger than a set of keys with +1 diff in an input table.
context = clmn.FilterOutForgettingContext(
universe,
self._universe,
)
context = clmn.FilterOutForgettingContext(self._id_column)
return self._table_with_context(context)
@trace_user_frame
@ -628,10 +606,8 @@ class Table(
threshold_column: expr.ColumnExpression,
time_column: expr.ColumnExpression,
) -> Table:
universe = self._universe.subset()
context = clmn.FreezeContext(
universe,
self._universe,
self._id_column,
self._eval(threshold_column),
self._eval(time_column),
)
@ -646,10 +622,8 @@ class Table(
threshold_column: expr.ColumnExpression,
time_column: expr.ColumnExpression,
) -> Table:
universe = self._universe.subset()
context = clmn.BufferContext(
universe,
self._universe,
self._id_column,
self._eval(threshold_column),
self._eval(time_column),
)
@ -687,11 +661,9 @@ class Table(
age | owner | pet
10 | Alice | 1
"""
universe = G.universe_solver.get_difference(self._universe, other._universe)
context = clmn.DifferenceContext(
universe=universe,
left=self._universe,
right=other._universe,
left=self._id_column,
right=other._id_column,
)
return self._table_with_context(context)
@ -734,14 +706,14 @@ class Table(
)
universe = G.universe_solver.get_intersection(*intersecting_universes)
if universe in intersecting_universes:
context: clmn.Context = clmn.RestrictContext(
universe=universe,
orig_universe=self._universe,
)
context: clmn.Context = clmn.RestrictContext(self._id_column, universe)
else:
intersecting_ids = (
self._id_column,
*tuple(table._id_column for table in tables),
)
context = clmn.IntersectContext(
universe=universe,
intersecting_universes=intersecting_universes,
intersecting_ids=intersecting_ids,
)
return self._table_with_context(context)
@ -789,10 +761,7 @@ class Table(
"Table.restrict(): other universe has to be a subset of self universe."
+ "Consider using Table.promise_universe_is_subset_of() to assert it."
)
context = clmn.RestrictContext(
universe=other._universe,
orig_universe=self._universe,
)
context = clmn.RestrictContext(self._id_column, other._universe)
columns = {
name: self._wrap_column_in_context(context, column, name)
@ -801,8 +770,7 @@ class Table(
return Table(
columns=columns,
universe=other._universe,
id_column=clmn.IdColumn(context),
context=context,
)
@contextualized_operator
@ -832,14 +800,11 @@ class Table(
"""
columns = {
name: self._wrap_column_in_context(self._context, column, name)
name: self._wrap_column_in_context(self._rowwise_context, column, name)
for name, column in self._columns.items()
}
return Table(
columns=columns,
universe=self._universe,
)
return Table(columns=columns, context=self._rowwise_context)
@trace_user_frame
@desugar
@ -1041,11 +1006,8 @@ class Table(
key_expression: expr.ColumnReference,
optional: bool,
) -> Table:
key_universe_table = key_expression._table
universe = key_universe_table._universe
key_column = key_expression._column
context = clmn.IxContext(universe, self._universe, key_column, optional)
context = clmn.IxContext(key_column, self._id_column, optional)
return self._table_with_context(context)
@ -1160,32 +1122,21 @@ class Table(
@trace_user_frame
@contextualized_operator
def _concat(self, *others: Table[TSchema]) -> Table[TSchema]:
union_universes = (self._universe, *(other._universe for other in others))
if not G.universe_solver.query_are_disjoint(*union_universes):
union_ids = (self._id_column, *(other._id_column for other in others))
if not G.universe_solver.query_are_disjoint(*(c.universe for c in union_ids)):
raise ValueError(
"Universes of the arguments of Table.concat() have to be disjoint.\n"
+ "Consider using Table.promise_universes_are_disjoint() to assert it.\n"
+ "(However, untrue assertion might result in runtime errors.)"
)
universe = G.universe_solver.get_union(*union_universes)
context = clmn.ConcatUnsafeContext(
universe=universe,
union_universes=union_universes,
union_ids=union_ids,
updates=tuple(
{col_name: other._columns[col_name] for col_name in self.keys()}
for other in others
),
)
columns = {
name: self._wrap_column_in_context(context, column, name)
for name, column in self._columns.items()
}
ret: Table = Table(
columns=columns,
universe=universe,
id_column=clmn.IdColumn(context),
)
return ret
return self._table_with_context(context)
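As a usage sketch of the public operator built on this context (illustrative data; the explicit leading ids keep the two key sets disjoint, and the promise call is the one suggested by the error message above; treating `concat` as the public entry point is an assumption here):

import pathway as pw

t1 = pw.debug.table_from_markdown(
    """
      | pet | owner
    1 | dog | Alice
    """
)
t2 = pw.debug.table_from_markdown(
    """
      | pet | owner
    2 | cat | Bob
    """
)
t1.promise_universes_are_disjoint(t2)  # assert disjointness, as the error above suggests
both = t1.concat(t2)                   # two rows, keys 1 and 2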
@trace_user_frame
@runtime_type_check
@ -1259,24 +1210,12 @@ class Table(
+ "Consider using Table.promise_is_subset_of() to assert this.\n"
+ "(However, untrue assertion might result in runtime errors.)"
)
union_universes = [self._universe]
if other._universe != self._universe:
union_universes.append(other._universe)
context = clmn.UpdateCellsContext(
universe=self._universe,
union_universes=tuple(union_universes),
left=self._id_column,
right=other._id_column,
updates={name: other._columns[name] for name in other.keys()},
)
columns = {
name: self._wrap_column_in_context(context, column, name)
for name, column in self._columns.items()
}
return Table(
columns=columns,
universe=self._universe,
id_column=clmn.IdColumn(context),
)
return self._table_with_context(context)
@trace_user_frame
@runtime_type_check
@ -1332,36 +1271,27 @@ class Table(
key: dt.types_lca(self.schema.__dtypes__[key], other.schema.__dtypes__[key])
for key in self.keys()
}
return Table._update_rows(
self.cast_to_types(**schema), other.cast_to_types(**schema)
)
union_universes = (self._universe, other._universe)
universe = G.universe_solver.get_union(*union_universes)
if universe == self._universe:
return Table._update_cells(
self.cast_to_types(**schema), other.cast_to_types(**schema)
)
else:
return Table._update_rows(
self.cast_to_types(**schema), other.cast_to_types(**schema)
)
@trace_user_frame
@contextualized_operator
@runtime_type_check
def _update_rows(self, other: Table[TSchema]) -> Table[TSchema]:
union_universes = (self._universe, other._universe)
universe = G.universe_solver.get_union(*union_universes)
context_cls = (
clmn.UpdateCellsContext
if universe == self._universe
else clmn.UpdateRowsContext
)
context = context_cls(
universe=universe,
union_universes=union_universes,
union_ids = (self._id_column, other._id_column)
context = clmn.UpdateRowsContext(
updates={col_name: other._columns[col_name] for col_name in self.keys()},
union_ids=union_ids,
)
columns = {
name: self._wrap_column_in_context(context, column, name)
for name, column in self._columns.items()
}
ret: Table = Table(
columns=columns,
universe=universe,
id_column=clmn.IdColumn(context),
)
return ret
return self._table_with_context(context)
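A brief, hedged illustration of the row-update operation these contexts back (data is illustrative; `update_rows` as the public name is an assumption based on the internal method names above):

import pathway as pw

old = pw.debug.table_from_markdown(
    """
      | owner | pet
    1 | Alice | dog
    2 | Bob   | cat
    """
)
new = pw.debug.table_from_markdown(
    """
      | owner | pet
    2 | Bob   | fish
    3 | Carol | parrot
    """
)
updated = old.update_rows(new)  # keys 1, 2, 3; the row with key 2 now has pet == "fish"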
@trace_user_frame
@desugar
@ -1497,19 +1427,9 @@ class Table(
reindex_column = self._eval(new_index)
assert self._universe == reindex_column.universe
universe = Universe()
context = clmn.ReindexContext(universe, reindex_column)
context = clmn.ReindexContext(reindex_column)
columns = {
name: self._wrap_column_in_context(context, column, name)
for name, column in self._columns.items()
}
return Table(
columns=columns,
universe=universe,
id_column=clmn.IdColumn(context),
)
return self._table_with_context(context)
@trace_user_frame
@desugar
@ -1559,7 +1479,9 @@ class Table(
columns_wrapped = {
name: self._wrap_column_in_context(
self._context, column, mapping[name] if name in mapping else name
self._rowwise_context,
column,
mapping[name] if name in mapping else name,
)
for name, column in renamed_columns.items()
}
@ -1704,7 +1626,7 @@ class Table(
assert isinstance(col, str)
new_columns.pop(col)
columns_wrapped = {
name: self._wrap_column_in_context(self._context, column, name)
name: self._wrap_column_in_context(self._rowwise_context, column, name)
for name, column in new_columns.items()
}
return self._with_same_universe(*columns_wrapped.items())
@ -1760,11 +1682,9 @@ class Table(
@contextualized_operator
@runtime_type_check
def _having(self, indexer: expr.ColumnReference) -> Table[TSchema]:
universe = indexer.table._universe.subset()
context = clmn.HavingContext(
universe=universe, orig_universe=self._universe, key_column=indexer._column
orig_id_column=self._id_column, key_column=indexer._column
)
return self._table_with_context(context)
@trace_user_frame
@ -1856,18 +1776,9 @@ class Table(
flatten_column = self._columns[flatten_name]
assert isinstance(flatten_column, clmn.ColumnWithExpression)
universe = Universe()
flatten_result_column = clmn.MaterializedColumn(
universe,
ColumnProperties(
dtype=clmn.FlattenContext.get_flatten_column_dtype(flatten_column),
),
)
context = clmn.FlattenContext(
universe=universe,
orig_universe=self._universe,
flatten_column=flatten_column,
flatten_result_column=flatten_result_column,
)
columns = {
@ -1878,11 +1789,10 @@ class Table(
return Table(
columns={
flatten_name: flatten_result_column,
flatten_name: context.flatten_result_column,
**columns,
},
universe=universe,
id_column=clmn.IdColumn(context),
context=context,
)
@trace_user_frame
@ -1944,33 +1854,23 @@ class Table(
^RT0AZWX... | David | 35 | 90 | ^EDPSSB1... |
^T0B95XH... | Eve | 15 | 80 | | ^GBSDEEW...
"""
if not isinstance(instance, expr.ColumnExpression):
instance = expr.ColumnConstExpression(instance)
prev_column = clmn.MaterializedColumn(
self._universe, ColumnProperties(dtype=dt.Optional(dt.POINTER))
)
next_column = clmn.MaterializedColumn(
self._universe, ColumnProperties(dtype=dt.Optional(dt.POINTER))
)
instance = clmn.ColumnExpression._wrap(instance)
context = clmn.SortingContext(
self._universe,
self._eval(key),
self._eval(instance),
prev_column,
next_column,
)
return Table(
columns={
"prev": prev_column,
"next": next_column,
"prev": context.prev_column,
"next": context.next_column,
},
universe=self._universe,
id_column=clmn.IdColumn(context),
context=context,
)
def _set_source(self, source: OutputHandle):
self._source = source
self._id_column.lineage = clmn.ColumnLineage(name="id", source=source)
if not hasattr(self._id_column, "lineage"):
self._id_column.lineage = clmn.ColumnLineage(name="id", source=source)
for name, column in self._columns.items():
if not hasattr(column, "lineage"):
column.lineage = clmn.ColumnLineage(name=name, source=source)
@ -1980,17 +1880,8 @@ class Table(
@contextualized_operator
def _unsafe_promise_universe(self, other: TableLike) -> Table:
context = clmn.PromiseSameUniverseContext(other._universe, self._universe)
columns = {
name: self._wrap_column_in_context(context, column, name)
for name, column in self._columns.items()
}
return Table(
columns=columns,
universe=context.universe,
id_column=clmn.IdColumn(context),
)
context = clmn.PromiseSameUniverseContext(self._id_column, other._id_column)
return self._table_with_context(context)
def _validate_expression(self, expression: expr.ColumnExpression):
for dep in expression._dependencies_above_reducer():
@ -2027,20 +1918,19 @@ class Table(
return Table(
columns=columns,
universe=context.universe,
id_column=clmn.IdColumn(context),
context=context,
)
@functools.cached_property
def _table_restricted_context(self) -> clmn.TableRestrictedRowwiseContext:
return clmn.TableRestrictedRowwiseContext(self._universe, self)
return clmn.TableRestrictedRowwiseContext(self._id_column, self)
def _eval(
self, expression: expr.ColumnExpression, context: clmn.Context | None = None
) -> clmn.ColumnWithExpression:
"""Desugar expression and wrap it in given context."""
if context is None:
context = self._context
context = self._rowwise_context
column = expression._column_with_expression_cls(
context=context,
universe=context.universe,
@ -2051,6 +1941,7 @@ class Table(
@classmethod
def _from_schema(cls, schema: type[Schema]) -> Table:
universe = Universe()
context = clmn.MaterializedContext(universe, schema.universe_properties)
columns = {
name: clmn.MaterializedColumn(
universe,
@ -2058,7 +1949,7 @@ class Table(
)
for name in schema.column_names()
}
return cls(columns=columns, universe=universe, schema=schema)
return cls(columns=columns, schema=schema, context=context)
def __repr__(self) -> str:
return f"<pathway.Table schema={dict(self.typehints())}>"
@ -2070,9 +1961,8 @@ class Table(
) -> Table:
return Table(
columns=dict(columns),
universe=self._universe,
schema=schema,
id_column=clmn.IdColumn(self._context),
context=self._rowwise_context,
)
def _sort_columns_by_other(self, other: Table):
@ -2093,14 +1983,15 @@ class Table(
table_to_datasink(self, sink)
def _materialize(self, universe: Universe):
context = clmn.MaterializedContext(universe)
columns = {
name: clmn.MaterializedColumn(universe, column.properties)
for (name, column) in self._columns.items()
}
return Table(
columns=columns,
universe=universe,
schema=self.schema,
context=context,
)
@trace_user_frame
@ -2237,7 +2128,7 @@ class Table(
def eval_type(self, expression: expr.ColumnExpression) -> dt.DType:
return (
self._context._get_type_interpreter()
self._rowwise_context._get_type_interpreter()
.eval_expression(expression, state=TypeInterpreterState())
._dtype
)

View File

@ -4,7 +4,7 @@ from __future__ import annotations
from typing import TypeVar
from pathway.internals import universes
from pathway.internals import column as clmn, universes
from pathway.internals.deprecation_meta import DeprecationSuperclass
from pathway.internals.runtime_type_check import runtime_type_check
from pathway.internals.universe import Universe
@ -37,9 +37,13 @@ class TableLike(DeprecationSuperclass):
"""
_universe: Universe
_context: clmn.Context
_id_column: clmn.IdColumn
def __init__(self, universe: Universe):
self._universe = universe
def __init__(self, context: clmn.Context):
self._context = context
self._universe = context.universe
self._id_column = context.id_column
@runtime_type_check
def promise_universes_are_disjoint(

View File

@ -526,6 +526,11 @@ class TypeInterpreter(IdentityTransform):
class JoinTypeInterpreter(TypeInterpreter):
"""This type interpreter is used by JoinContext.
It evaluates only column references, and is used to decide which columns
to optionalize when unrolling left and right table columns to internal table columns.
"""
left: Table
right: Table
optionalize_left: bool
@ -553,6 +558,10 @@ class JoinTypeInterpreter(TypeInterpreter):
class JoinRowwiseTypeInterpreter(TypeInterpreter):
"""Type interpreter for evaluating expressions in join.
Colrefs are already properly optionalized (depending on the type of join and
left/right table) and properly unrolled and stored in the internal table."""
temporary_column_to_original: dict[expr.InternalColRef, expr.InternalColRef]
original_column_to_temporary: dict[expr.InternalColRef, expr.ColumnReference]
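A small, hedged example of the optionalization described above (tables and values are illustrative): in a left join, columns taken from the right side become optional in the result schema.

import pathway as pw

t_left = pw.debug.table_from_markdown(
    """
    k | a
    1 | 10
    2 | 20
    """
)
t_right = pw.debug.table_from_markdown(
    """
    k | b
    1 | 100
    """
)
joined = t_left.join_left(t_right, t_left.k == t_right.k).select(t_left.a, t_right.b)
# b is typed as int | None in joined.schema, since key 2 has no match on the right side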

View File

@ -19,6 +19,10 @@ class UniverseSolver:
self.var_counter = itertools.count(start=1)
self.universe_vars = defaultdict(lambda: next(self.var_counter))
def register_as_equal(self, left: Universe, right: Universe) -> None:
self.register_as_subset(left, right)
self.register_as_subset(right, left)
def register_as_subset(self, subset: Universe, superset: Universe) -> None:
varA = self.universe_vars[subset]
varB = self.universe_vars[superset]

View File

@ -126,5 +126,4 @@ def promise_are_equal(self: TableLike, *others: TableLike) -> None:
15 | Alice | tortoise
"""
for other in others:
promise_is_subset_of(self, other)
promise_is_subset_of(other, self)
G.universe_solver.register_as_equal(self._universe, other._universe)

View File

@ -227,6 +227,21 @@ def read_schema(
)
def assert_schema_or_value_columns_not_none(
schema: type[Schema] | None,
value_columns: list[str] | None,
data_format_type: str | None = None,
):
if schema is None and value_columns is None:
if data_format_type == "dsv":
raise ValueError(
"Neither schema nor value_columns were specified. "
"Consider using `pw.schema_from_csv` for generating schema from a CSV file"
)
else:
raise ValueError("Neither schema nor value_columns were specified")
def construct_schema_and_data_format(
format: str,
*,
@ -272,6 +287,8 @@ def construct_schema_and_data_format(
parse_utf8=(format != "binary"),
)
assert_schema_or_value_columns_not_none(schema, value_columns, data_format_type)
if with_metadata:
if schema is not None:
schema |= MetadataSchema

View File

@ -226,7 +226,6 @@ named ``path`` that will show the full path to the file from where a row was fil
mode=internal_connector_mode(mode),
object_pattern=object_pattern,
persistent_id=persistent_id,
with_metadata=with_metadata,
)
else:
data_storage = api.DataStorage(
@ -236,7 +235,6 @@ named ``path`` that will show the full path to the file from where a row was fil
read_method=internal_read_method(format),
object_pattern=object_pattern,
persistent_id=persistent_id,
with_metadata=with_metadata,
)
schema, data_format = construct_schema_and_data_format(

View File

@ -6,6 +6,7 @@ import logging
from collections.abc import Callable
from typing import Any
from uuid import uuid4
from warnings import warn
from aiohttp import web
@ -18,7 +19,7 @@ class RestServerSubject(io.python.ConnectorSubject):
_host: str
_port: int
_loop: asyncio.AbstractEventLoop
_delete_queries: bool
_delete_completed_queries: bool
def __init__(
self,
@ -28,7 +29,7 @@ class RestServerSubject(io.python.ConnectorSubject):
loop: asyncio.AbstractEventLoop,
tasks: dict[Any, Any],
schema: type[pw.Schema],
delete_queries: bool,
delete_completed_queries: bool,
format: str = "raw",
) -> None:
super().__init__()
@ -38,7 +39,7 @@ class RestServerSubject(io.python.ConnectorSubject):
self._loop = loop
self._tasks = tasks
self._schema = schema
self._delete_queries = delete_queries
self._delete_completed_queries = delete_completed_queries
self._format = format
def run(self):
@ -75,7 +76,7 @@ class RestServerSubject(io.python.ConnectorSubject):
self._add(id, data)
response = await self._fetch_response(id, event)
if self._delete_queries:
if self._delete_completed_queries:
self._remove(id, data)
return web.json_response(status=200, data=response)
@ -98,7 +99,8 @@ def rest_connector(
route: str = "/",
schema: type[pw.Schema] | None = None,
autocommit_duration_ms=1500,
delete_queries: bool = False,
keep_queries: bool | None = None,
delete_completed_queries: bool | None = None,
) -> tuple[pw.Table, Callable]:
"""
Runs a lightweight HTTP server and inputs a collection from the HTTP endpoint,
@ -116,7 +118,8 @@ def rest_connector(
autocommit_duration_ms: the maximum time between two commits. Every
autocommit_duration_ms milliseconds, the updates received by the connector are
committed and pushed into Pathway's computation graph;
delete_queries: whether to send a deletion entry after the query is processed.
keep_queries: whether to keep queries after processing; defaults to False. [deprecated]
delete_completed_queries: whether to send a deletion entry after the query is processed.
Allows removing it from the system if it is stored by operators such as ``join`` or ``groupby``;
Returns:
@ -124,6 +127,20 @@ def rest_connector(
response_writer: a callable, where the result table should be provided.
"""
if delete_completed_queries is None:
if keep_queries is None:
warn(
"delete_completed_queries arg of rest_connector should be set explicitly."
+ " It will soon be required."
)
delete_completed_queries = True
else:
warn(
"DEPRECATED: keep_queries arg of rest_connector is deprecated,"
+ " use delete_completed_queries with an opposite meaning instead."
)
delete_completed_queries = not keep_queries
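For reference, a hedged sketch of calling the connector with the renamed flag (the host, port and schema below are illustrative assumptions; only `delete_completed_queries` comes from this change):

import pathway as pw

class QuerySchema(pw.Schema):
    query: str

queries, response_writer = pw.io.http.rest_connector(
    host="127.0.0.1",               # assumed value, not part of this change
    port=8080,                      # assumed value, not part of this change
    schema=QuerySchema,
    delete_completed_queries=True,  # replaces the deprecated keep_queries=False
)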
loop = asyncio.new_event_loop()
tasks: dict[Any, Any] = {}
@ -141,7 +158,7 @@ def rest_connector(
loop=loop,
tasks=tasks,
schema=schema,
delete_queries=delete_queries,
delete_completed_queries=delete_completed_queries,
format=format,
),
schema=schema,

View File

@ -12,7 +12,12 @@ from pathway.internals.decorators import table_from_datasource
from pathway.internals.runtime_type_check import runtime_type_check
from pathway.internals.schema import Schema
from pathway.internals.trace import trace_user_frame
from pathway.io._utils import RawDataSchema, get_data_format_type, read_schema
from pathway.io._utils import (
RawDataSchema,
assert_schema_or_value_columns_not_none,
get_data_format_type,
read_schema,
)
SUPPORTED_INPUT_FORMATS: set[str] = {
"json",
@ -175,6 +180,8 @@ computations from the moment they were terminated last time.
raise ValueError("raw format must not be used with value_columns property")
schema = RawDataSchema
assert_schema_or_value_columns_not_none(schema, value_columns, data_format_type)
schema, api_schema = read_schema(
schema=schema,
value_columns=value_columns,

View File

@ -263,13 +263,17 @@ def write(
... }
You want to send a Pathway table t to the Redpanda instance.
>>> import pathway as pw
>>> t = pw.debug.parse_to_table("age owner pet \\n 1 10 Alice dog \\n 2 9 Bob cat \\n 3 8 Alice cat")
To connect to the topic "animals" and send messages, the connector must be used \
as follows, depending on the format:
JSON version:
>>> import pathway as pw
>>> t = pw.io.redpanda.read(
>>> pw.io.redpanda.write(
... t,
... rdkafka_settings,
... "animals",
... format="json",

View File

@ -33,7 +33,7 @@ from ._window_join import (
window_join_outer,
window_join_right,
)
from .temporal_behavior import WindowBehavior, window_behavior
from .temporal_behavior import CommonBehavior, common_behavior
__all__ = [
"AsofJoinResult",
@ -65,6 +65,6 @@ __all__ = [
"tumbling",
"sliding",
"session",
"window_behavior",
"WindowBehavior",
"common_behavior",
"CommonBehavior",
]

View File

@ -24,7 +24,7 @@ from pathway.internals.runtime_type_check import runtime_type_check
from pathway.internals.thisclass import ThisMetaclass
from pathway.internals.trace import trace_user_frame
from .temporal_behavior import WindowBehavior
from .temporal_behavior import CommonBehavior
from .utils import IntervalType, TimeEventType, check_joint_types, get_default_shift
@ -156,7 +156,7 @@ class IntervalJoinResult(DesugaringContext):
@staticmethod
def _apply_temporal_behavior(
table: pw.Table, behavior: WindowBehavior | None
table: pw.Table, behavior: CommonBehavior | None
) -> pw.Table:
if behavior is not None:
if behavior.delay is not None:
@ -179,7 +179,7 @@ class IntervalJoinResult(DesugaringContext):
right_time_expression: pw.ColumnExpression,
interval: Interval,
*on: pw.ColumnExpression,
behavior: WindowBehavior | None = None,
behavior: CommonBehavior | None = None,
mode: pw.JoinMode,
):
"""Creates an IntervalJoinResult. To perform an interval join uses it uses two
@ -428,7 +428,7 @@ def interval_join(
other_time: pw.ColumnExpression,
interval: Interval,
*on: pw.ColumnExpression,
behavior: WindowBehavior | None = None,
behavior: CommonBehavior | None = None,
how: pw.JoinMode = pw.JoinMode.INNER,
) -> IntervalJoinResult:
"""Performs an interval join of self with other using a time difference
@ -545,7 +545,7 @@ def interval_join_inner(
other_time: pw.ColumnExpression,
interval: Interval,
*on: pw.ColumnExpression,
behavior: WindowBehavior | None = None,
behavior: CommonBehavior | None = None,
) -> IntervalJoinResult:
"""Performs an interval join of self with other using a time difference
and join expressions. If `self_time + lower_bound <=
@ -659,7 +659,7 @@ def interval_join_left(
other_time: pw.ColumnExpression,
interval: Interval,
*on: pw.ColumnExpression,
behavior: WindowBehavior | None = None,
behavior: CommonBehavior | None = None,
) -> IntervalJoinResult:
"""Performs an interval left join of self with other using a time difference
and join expressions. If `self_time + lower_bound <=
@ -778,7 +778,7 @@ def interval_join_right(
other_time: pw.ColumnExpression,
interval: Interval,
*on: pw.ColumnExpression,
behavior: WindowBehavior | None = None,
behavior: CommonBehavior | None = None,
) -> IntervalJoinResult:
"""Performs an interval right join of self with other using a time difference
and join expressions. If `self_time + lower_bound <=
@ -899,7 +899,7 @@ def interval_join_outer(
other_time: pw.ColumnExpression,
interval: Interval,
*on: pw.ColumnExpression,
behavior: WindowBehavior | None = None,
behavior: CommonBehavior | None = None,
) -> IntervalJoinResult:
"""Performs an interval outer join of self with other using a time difference
and join expressions. If `self_time + lower_bound <=

View File

@ -19,8 +19,19 @@ from pathway.internals.type_interpreter import eval_type
from ._interval_join import interval, interval_join
from ._window_join import WindowJoinResult
from .temporal_behavior import WindowBehavior
from .utils import IntervalType, TimeEventType, check_joint_types, get_default_shift
from .temporal_behavior import (
Behavior,
CommonBehavior,
ExactlyOnceBehavior,
common_behavior,
)
from .utils import (
IntervalType,
TimeEventType,
check_joint_types,
get_default_shift,
zero_length_interval,
)
class Window(ABC):
@ -29,7 +40,7 @@ class Window(ABC):
self,
table: pw.Table,
key: pw.ColumnExpression,
behavior: WindowBehavior | None,
behavior: Behavior | None,
shard: pw.ColumnExpression | None,
) -> pw.GroupedTable:
...
@ -100,7 +111,7 @@ class _SessionWindow(Window):
self,
table: pw.Table,
key: pw.ColumnExpression,
behavior: WindowBehavior | None,
behavior: Behavior | None,
shard: pw.ColumnExpression | None,
) -> pw.GroupedTable:
if self.max_gap is not None:
@ -307,7 +318,7 @@ class _SlidingWindow(Window):
self,
table: pw.Table,
key: pw.ColumnExpression,
behavior: WindowBehavior | None,
behavior: Behavior | None,
shard: pw.ColumnExpression | None,
) -> pw.GroupedTable:
check_joint_types(
@ -318,6 +329,7 @@ class _SlidingWindow(Window):
"window.offset": (self.offset, TimeEventType),
}
)
target = table.select(
_pw_window=pw.apply_with_type(
self._assign_windows,
@ -341,6 +353,31 @@ class _SlidingWindow(Window):
)
if behavior is not None:
if isinstance(behavior, ExactlyOnceBehavior):
duration: IntervalType
# This is split into two if-s, as it helps mypy figure out the proper types.
# A single-if implementation would leave either self.ratio or self.duration as Optional,
# which would not fit into the duration variable of type IntervalType.
if self.duration is not None:
duration = self.duration
elif self.ratio is not None:
duration = self.ratio * self.hop
shift = (
self.shift
if self.shift is not None
else zero_length_interval(type(duration))
)
behavior = common_behavior(
duration + shift, shift, True # type:ignore
)
elif not isinstance(behavior, CommonBehavior):
raise ValueError(
f"behavior {behavior} unsupported in sliding/tumbling window"
)
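In other words, a hedged restatement of the branch above with hypothetical numbers: an ExactlyOnceBehavior on a sliding window is rewritten into a CommonBehavior whose cutoff equals the shift and whose delay spans the whole window.

import pathway as pw

# For sliding(duration=10, hop=3) with exactly_once_behavior(shift=2), the code above builds:
equivalent = pw.temporal.common_behavior(10 + 2, 2, True)
# i.e. delay = duration + shift = 12, cutoff = shift = 2, keep_results = True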
if behavior.cutoff is not None:
cutoff_threshold = pw.this._pw_window_end + behavior.cutoff
target = target._freeze(cutoff_threshold, pw.this._pw_key)
if behavior.delay is not None:
target = target._buffer(
target._pw_window_start + behavior.delay, target._pw_key
@ -355,7 +392,6 @@ class _SlidingWindow(Window):
if behavior.cutoff is not None:
cutoff_threshold = pw.this._pw_window_end + behavior.cutoff
target = target._freeze(cutoff_threshold, pw.this._pw_key)
target = target._forget(
cutoff_threshold, pw.this._pw_key, behavior.keep_results
)
@ -470,7 +506,7 @@ class _IntervalsOverWindow(Window):
self,
table: pw.Table,
key: pw.ColumnExpression,
behavior: WindowBehavior | None,
behavior: CommonBehavior | None,
shard: pw.ColumnExpression | None,
) -> pw.GroupedTable:
check_joint_types(
@ -803,7 +839,7 @@ def windowby(
time_expr: pw.ColumnExpression,
*,
window: Window,
behavior: WindowBehavior | None = None,
behavior: Behavior | None = None,
shard: pw.ColumnExpression | None = None,
) -> pw.GroupedTable:
"""

View File

@ -4,11 +4,17 @@ from dataclasses import dataclass
from .utils import IntervalType
# TODO - clarify corner cases (which times are exclusive / inclusive)
class Behavior:
"""
A superclass of all classes defining temporal behavior.
"""
pass
@dataclass
class WindowBehavior:
class CommonBehavior(Behavior):
"""Defines temporal behavior of windows and temporal joins."""
delay: IntervalType | None
@ -16,22 +22,57 @@ class WindowBehavior:
keep_results: bool
def window_behavior(
def common_behavior(
delay: IntervalType | None = None,
cutoff: IntervalType | None = None,
keep_results: bool = True,
) -> WindowBehavior:
"""Creates WindowBehavior
) -> CommonBehavior:
"""Creates CommonBehavior
Args:
delay: For windows, delays initial output by ``delay`` with respect to the
beginning of the window. For interval joins, it delays the time the record
is joined by ``delay``. Using `delay` is useful when updates are too frequent.
cutoff: For windows, stops updating windows which end earlier than maximal seen
time minus ``cutoff``. For interval joins, it ignores entries that are older
delay:
Optional; for windows, delays initial output by ``delay`` with respect to the
beginning of the window. Setting it to ``None`` does not enable
delaying mechanism.
For interval joins, it delays the time the record is joined by ``delay``.
Using `delay` is useful when updates are too frequent.
cutoff:
Optional; for windows, stops updating windows which end earlier than maximal
seen time minus ``cutoff``. Setting cutoff to ``None`` does not enable
cutoff mechanism.
For interval joins, it ignores entries that are older
than maximal seen time minus ``cutoff``. This parameter is also used to clear
memory. It allows releasing memory used by entries that won't change.
keep_results: If set to True, keeps all results of the operator. If set to False,
keeps only results that are newer than maximal seen time minus ``cutoff``.
Can't be set to ``False`` when ``cutoff`` is ``None``.
"""
return WindowBehavior(delay, cutoff, keep_results)
assert not (cutoff is None and not keep_results)
return CommonBehavior(delay, cutoff, keep_results)
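A brief usage sketch of the renamed helper, mirroring the tests later in this change (data illustrative):

import pathway as pw

t = pw.debug.table_from_markdown(
    """
    t  | shard
    1  | 0
    12 | 0
    """
)
gb = t.windowby(
    t.t,
    window=pw.temporal.sliding(duration=10, hop=3),
    behavior=pw.temporal.common_behavior(delay=0, cutoff=1, keep_results=False),
    shard=t.shard,
)
result = gb.reduce(pw.this._pw_window_end, max_t=pw.reducers.max(pw.this.t))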
@dataclass
class ExactlyOnceBehavior(Behavior):
shift: IntervalType | None
def exactly_once_behavior(shift: IntervalType | None = None):
"""Creates an instance of class ExactlyOnceBehavior, indicating that each non empty
window should produce exactly one output.
Args:
shift: optional, defines the moment in time (``window end + shift``) in which
the window stops accepting the data and sends the results to the output.
Setting it to ``None`` is interpreted as ``shift=0``.
Remark:
Note that setting a non-zero shift and demanding exactly one output results in
the output being delivered only when the time in the time column reaches
``window end + shift``.
"""
return ExactlyOnceBehavior(shift)
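A brief usage sketch, mirroring the test added later in this change (data illustrative; the `pw.temporal.temporal_behavior` path is the one the test uses):

import pathway as pw

t = pw.debug.table_from_markdown(
    """
    time | value
    1    | 1
    4    | 2
    """
)
gb = t.windowby(
    t.time,
    window=pw.temporal.sliding(duration=5, hop=3),
    behavior=pw.temporal.temporal_behavior.exactly_once_behavior(),
)
result = gb.reduce(pw.this._pw_window_end, max_value=pw.reducers.max(pw.this.value))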

View File

@ -1,7 +1,7 @@
# Copyright © 2023 Pathway
import datetime
from typing import Any, Union
from typing import Any, Type, Union
from pathway.internals import dtype as dt
from pathway.internals.type_interpreter import eval_type
@ -19,6 +19,17 @@ def get_default_shift(interval: IntervalType) -> TimeEventType:
return 0.0
def zero_length_interval(interval_type: Type[IntervalType]) -> IntervalType:
if issubclass(interval_type, datetime.timedelta):
return datetime.timedelta(0)
elif issubclass(interval_type, int):
return 0
elif issubclass(interval_type, float):
return 0.0
else:
raise Exception("unsupported interval type")
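For instance, assuming the helper defined above is in scope:

import datetime

assert zero_length_interval(datetime.timedelta) == datetime.timedelta(0)
assert zero_length_interval(int) == 0
assert zero_length_interval(float) == 0.0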
def _get_possible_types(type: Any) -> tuple[dt.DType, ...]:
if type is TimeEventType:
return (dt.INT, dt.FLOAT, dt.DATE_TIME_NAIVE, dt.DATE_TIME_UTC)

View File

@ -4,4 +4,10 @@ from __future__ import annotations
from . import async_transformer, bucketing, col, filtering, pandas_transformer
__all__ = ["bucketing", "col", "pandas_transformer", "async_transformer", "filtering"]
__all__ = [
"bucketing",
"col",
"pandas_transformer",
"async_transformer",
"filtering",
]

View File

@ -72,11 +72,13 @@ def test_all_at_once():
assert_table_equality_wo_index(result, expected)
def stream_points() -> tuple[pw.Table, pw.Table]:
def stream_points() -> tuple[pw.Table, pw.Table, pw.persistence.Config]:
"""Returns (points, queries)."""
points = get_points()
table = pw.debug.table_from_list_of_batches(
stream_generator = pw.debug.StreamGenerator()
table = stream_generator.table_from_list_of_batches(
[[{"coords": point[0], "is_query": point[1]}] for point in points],
PointSchema,
).update_types(coords=tuple[int, ...])
@ -84,11 +86,12 @@ def stream_points() -> tuple[pw.Table, pw.Table]:
return (
table.filter(~pw.this.is_query).without(pw.this.is_query),
table.filter(pw.this.is_query).without(pw.this.is_query),
stream_generator.persistence_config(),
)
def test_update_old():
points, queries = stream_points()
points, queries, persistence_config = stream_points()
index = KNNIndex(points.coords, points, n_dimensions=2, n_and=5)
result = queries + index.get_nearest_items(queries.coords, k=2).with_universe_of(
queries
@ -101,11 +104,13 @@ def test_update_old():
((-2, -3), ((-1, 0), (1, -4))),
]
)
assert_table_equality_wo_index(result, expected)
assert_table_equality_wo_index(
result, expected, persistence_config=persistence_config
)
def test_asof_now():
points, queries = stream_points()
points, queries, persistence_config = stream_points()
index = KNNIndex(points.coords, points, n_dimensions=2, n_and=5)
result = queries + index.get_nearest_items_asof_now(queries.coords, k=2).select(
nn=pw.apply(sort_arrays, pw.this.coords)
@ -118,4 +123,6 @@ def test_asof_now():
((-2, -3), ((-3, 1), (-1, 0))),
]
)
assert_table_equality_wo_index(result, expected)
assert_table_equality_wo_index(
result, expected, persistence_config=persistence_config
)

View File

@ -35,7 +35,7 @@ def test_forgetting(keep_results: bool):
t1.t,
t2.t,
pw.temporal.interval(-0.1, 0.1),
behavior=pw.temporal.window_behavior(0, 2, keep_results=keep_results),
behavior=pw.temporal.common_behavior(0, 2, keep_results=keep_results),
).select(left_t=pw.left.t, right_t=pw.right.t)
if keep_results:
expected = T(
@ -104,7 +104,7 @@ def test_forgetting_sharded(keep_results: bool):
t2.t,
pw.temporal.interval(-0.1, 0.1),
t1.v == t2.v,
behavior=pw.temporal.window_behavior(0, 2, keep_results=keep_results),
behavior=pw.temporal.common_behavior(0, 2, keep_results=keep_results),
).select(v=pw.this.v, left_t=pw.left.t, right_t=pw.right.t)
if keep_results:
expected = T(

View File

@ -224,7 +224,7 @@ def test_sliding_compacting():
gb = t.windowby(
t.t,
window=pw.temporal.sliding(duration=10, hop=3),
behavior=pw.temporal.window_behavior(delay=0, cutoff=1, keep_results=False),
behavior=pw.temporal.common_behavior(delay=0, cutoff=1, keep_results=False),
shard=t.shard,
)
@ -271,7 +271,7 @@ def test_sliding_compacting_2():
gb = t.windowby(
t.t,
window=pw.temporal.sliding(duration=10, hop=3),
behavior=pw.temporal.window_behavior(delay=0, cutoff=2, keep_results=False),
behavior=pw.temporal.common_behavior(delay=0, cutoff=2, keep_results=False),
shard=t.shard,
)

View File

@ -6,7 +6,12 @@ import typing
import pathway as pw
from pathway.internals import api
from pathway.tests.utils import DiffEntry, assert_key_entries_in_stream_consistent, run
from pathway.tests.utils import (
DiffEntry,
assert_key_entries_in_stream_consistent,
assert_stream_equal,
run,
)
class TimeColumnInputSchema(pw.Schema):
@ -31,6 +36,7 @@ def generate_buffer_output(
duration,
hop,
delay,
cutoff,
):
now = 0
buffer = {}
@ -44,7 +50,12 @@ def generate_buffer_output(
for _pw_window_start, _pw_window_end in windows:
shard = None
window = (shard, _pw_window_start, _pw_window_end)
freeze_threshold = window[2] + cutoff
if freeze_threshold <= now:
continue
threshold = window[1] + delay
if threshold <= now:
to_process.append((window, entry))
else:
@ -58,7 +69,6 @@ def generate_buffer_output(
to_process.append((window, entry))
output.extend(to_process)
# print(buffer)
return output
@ -68,8 +78,7 @@ def test_keep_results_manual():
"value": lambda x: x,
}
# 68 is 4*17, 1
# 7 is a nice number I chose arbitrarily
# 68 is 4*17, 17 is a nice number I chose arbitrarily
# 4 comes from the fact that I wanted 2 old entries and two fresh (possibly late)
# entries in a window
@ -84,7 +93,7 @@ def test_keep_results_manual():
gb = t.windowby(
t.time,
window=pw.temporal.sliding(duration=5, hop=3),
behavior=pw.temporal.window_behavior(delay=0, cutoff=0, keep_results=True),
behavior=pw.temporal.common_behavior(delay=0, cutoff=0, keep_results=True),
)
expected_entries = []
@ -146,14 +155,13 @@ def test_keep_results_manual():
pw.this._pw_window_end,
max_time=pw.reducers.max(pw.this.time),
max_value=pw.reducers.max(pw.this.value),
).select(pw.this._pw_window_end, pw.this.max_time, pw.this.max_value)
)
assert_key_entries_in_stream_consistent(expected_entries, result)
run(debug=True)
run()
def parametrized_test(duration, hop, delay, cutoff, keep_results):
def create_windowby_scenario(duration, hop, delay, cutoff, keep_results):
value_functions = {
"time": lambda x: (x // 2) % 17,
"value": lambda x: x,
@ -170,25 +178,38 @@ def parametrized_test(duration, hop, delay, cutoff, keep_results):
autocommit_duration_ms=5,
input_rate=25,
)
gb = t.windowby(
t.time,
window=pw.temporal.sliding(duration=duration, hop=hop),
behavior=pw.temporal.window_behavior(
behavior=pw.temporal.common_behavior(
delay=delay, cutoff=cutoff, keep_results=keep_results
),
)
result = gb.reduce(
pw.this._pw_window_end,
max_time=pw.reducers.max(pw.this.time),
max_value=pw.reducers.max(pw.this.value),
)
result.debug("res")
return result
def generate_expected(duration, hop, delay, cutoff, keep_results, result_table):
entries = []
for i in range(68):
entries.append({"value": i, "time": (i // 2) % 17})
buf_out = generate_buffer_output(entries, duration=duration, hop=hop, delay=delay)
buf_out = generate_buffer_output(
entries, duration=duration, hop=hop, delay=delay, cutoff=cutoff
)
simulated_state: dict = {}
simulated_state: dict[pw.Pointer, DiffEntry] = {}
expected_entries = []
max_global_time = 0
order = 0
print(buf_out)
for (
window,
in_entry,
@ -200,7 +221,7 @@ def parametrized_test(duration, hop, delay, cutoff, keep_results):
"_pw_window_end": window[2],
}
entry_id = DiffEntry.create_id_from(gb, pk_row)
entry_id = DiffEntry.create_id_from(result_table, pk_row)
order = in_entry["value"]
max_value = in_entry["value"]
@ -209,13 +230,11 @@ def parametrized_test(duration, hop, delay, cutoff, keep_results):
old_entry_state = simulated_state.get(entry_id)
if old_entry_state is not None:
# cutoff
if max_global_time < typing.cast(
int, old_entry_state.row["_pw_window_end"] + cutoff
):
expected_entries.append(
DiffEntry.create(gb, pk_row, order, False, old_entry_state.row)
expected_entries.append(
DiffEntry.create(
result_table, pk_row, order, False, old_entry_state.row
)
)
max_value = max(
max_value, typing.cast(int, old_entry_state.row["max_value"])
@ -229,28 +248,24 @@ def parametrized_test(duration, hop, delay, cutoff, keep_results):
"max_value": max_value,
"max_time": max_window_time,
}
insert_entry = DiffEntry.create(gb, pk_row, order, True, row)
if (
max_global_time
< typing.cast(int, insert_entry.row["_pw_window_end"]) + cutoff
):
simulated_state[entry_id] = insert_entry
expected_entries.append(insert_entry)
insert_entry = DiffEntry.create(result_table, pk_row, order, True, row)
simulated_state[entry_id] = insert_entry
expected_entries.append(insert_entry)
if not keep_results:
for entry in simulated_state.values():
if entry.row["_pw_window_end"] + cutoff <= max_global_time:
expected_entries.append(entry.final_cleanup_entry())
return expected_entries
result = gb.reduce(
pw.this._pw_window_end,
max_time=pw.reducers.max(pw.this.time),
max_value=pw.reducers.max(pw.this.value),
).select(pw.this._pw_window_end, pw.this.max_time, pw.this.max_value)
assert_key_entries_in_stream_consistent(expected_entries, result)
run(debug=True)
def parametrized_test(duration, hop, delay, cutoff, keep_results):
result_table = create_windowby_scenario(duration, hop, delay, cutoff, keep_results)
expected = generate_expected(
duration, hop, delay, cutoff, keep_results, result_table
)
assert_key_entries_in_stream_consistent(expected, result_table)
run()
def test_keep_results():
@ -287,3 +302,93 @@ def test_high_delay_high_buffer_keep_results():
def test_non_zero_delay_non_zero_buffer_remove_results():
parametrized_test(5, 3, 1, 1, False)
def test_exactly_once():
duration = 5
hop = 3
delay = 6
cutoff = 1
keep_results = True
result = create_windowby_scenario(duration, hop, delay, cutoff, keep_results)
expected = []
for i, window_end in enumerate([2, 5, 8, 11, 14]):
pk_row: dict = {
"_pw_window": (None, window_end - duration, window_end),
"_pw_shard": None,
"_pw_window_start": window_end - duration,
"_pw_window_end": window_end,
}
row: dict = {
"_pw_window_end": window_end,
"max_time": window_end - 1,
"max_value": 2 * window_end - 1,
}
expected.append(DiffEntry.create(result, pk_row, i, True, row))
assert_stream_equal(expected, result)
run()
def test_exactly_once_from_behavior():
duration = 5
hop = 3
value_functions = {
"time": lambda x: (x // 2) % 17,
"value": lambda x: x,
}
# 68 is 4*17, 17 is a nice number I chose arbitrarily
# 4 comes from the fact that I wanted 2 old entries and two fresh (possibly late)
# entries in a window
t = pw.demo.generate_custom_stream(
value_functions,
schema=TimeColumnInputSchema,
nb_rows=68,
autocommit_duration_ms=5,
input_rate=25,
)
gb = t.windowby(
t.time,
window=pw.temporal.sliding(duration=duration, hop=hop),
behavior=pw.temporal.temporal_behavior.exactly_once_behavior(),
)
result = gb.reduce(
pw.this._pw_window_end,
max_time=pw.reducers.max(pw.this.time),
max_value=pw.reducers.max(pw.this.value),
)
expected = []
for i, window_end in enumerate([2, 5, 8, 11, 14]):
pk_row: dict = {
"_pw_window": (None, window_end - duration, window_end),
"_pw_shard": None,
"_pw_window_start": window_end - duration,
"_pw_window_end": window_end,
}
row: dict = {
"_pw_window_end": window_end,
"max_time": window_end - 1,
"max_value": 2 * window_end - 1,
}
expected.append(DiffEntry.create(result, pk_row, i, True, row))
assert_stream_equal(expected, result)
run()
def test_exactly_once_empty():
duration = 5
hop = 3
delay = 6
cutoff = 1
keep_results = False
result = create_windowby_scenario(duration, hop, delay, cutoff, keep_results)
expected: list[DiffEntry] = []
assert_stream_equal(expected, result)
run()

View File

@ -1,6 +1,9 @@
import pytest
import pathway.internals as pw
from pathway.internals import dtype as dt
from pathway.internals.column_properties import ColumnProperties
from pathway.internals.decorators import empty_from_schema
from pathway.tests.utils import T
@ -68,3 +71,44 @@ def test_preserve_context_dependency_properties():
assert_col_props(res1.a, ColumnProperties(dtype=dt.INT, append_only=True))
assert_col_props(res2.a, ColumnProperties(dtype=dt.INT, append_only=False))
@pytest.mark.parametrize("append_only", [True, False])
def test_const_column_properties(append_only):
class Schema(pw.Schema, append_only=append_only):
a: int = pw.column_definition(primary_key=True)
table = empty_from_schema(Schema)
result = table.select(ret=42)
assert table.a._column.properties.append_only == append_only
assert result.ret._column.properties.append_only == append_only
@pytest.mark.parametrize("append_only", [True, False])
def test_universe_properties(append_only):
class Schema(pw.Schema, append_only=append_only):
a: int = pw.column_definition(primary_key=True)
table = empty_from_schema(Schema)
result = table.select()
assert table._id_column.properties.append_only == append_only
assert result._id_column.properties.append_only == append_only
def test_universe_properties_with_universe_of():
class Schema(pw.Schema, append_only=True):
a: int = pw.column_definition(primary_key=True)
table = empty_from_schema(Schema)
reduced = table.groupby(pw.this.a).reduce(pw.this.a)
reduced_same_universe = (
table.groupby(pw.this.a).reduce(pw.this.a).with_universe_of(table)
)
assert table._id_column.properties.append_only
assert not reduced._id_column.properties.append_only
assert reduced_same_universe._id_column.properties.append_only

View File

@ -21,6 +21,7 @@ from pathway.internals.parse_graph import G
from pathway.tests.utils import (
CountDifferentTimestampsCallback,
CsvLinesNumberChecker,
FileLinesNumberChecker,
T,
assert_table_equality,
assert_table_equality_wo_index,
@ -1184,11 +1185,17 @@ def test_immediate_connector_errors():
def run_replacement_test(
streaming_target, input_format, expected_output_lines, tmp_path, monkeypatch
streaming_target,
input_format,
expected_output_lines,
tmp_path,
monkeypatch,
inputs_path_override=None,
has_only_file_replacements=False,
):
monkeypatch.setenv("PATHWAY_PERSISTENT_STORAGE", str(tmp_path / "PStorage"))
inputs_path = tmp_path / "inputs"
os.mkdir(inputs_path)
inputs_path = inputs_path_override or (tmp_path / "inputs")
os.mkdir(tmp_path / "inputs")
class InputSchema(pw.Schema):
key: int = pw.column_definition(primary_key=True)
@ -1199,22 +1206,52 @@ def run_replacement_test(
format=input_format,
schema=InputSchema,
mode="streaming_with_deletions",
autocommit_duration_ms=10,
autocommit_duration_ms=1,
with_metadata=True,
)
output_path = tmp_path / "output.csv"
pw.io.csv.write(table, str(output_path))
pw.io.jsonlines.write(table, str(output_path))
inputs_thread = threading.Thread(target=streaming_target, daemon=True)
inputs_thread.start()
assert wait_result_with_checker(
CsvLinesNumberChecker(output_path, expected_output_lines), 30
FileLinesNumberChecker(output_path, expected_output_lines), 30
)
parsed_rows = []
with open(output_path, "r") as f:
for row in f:
parsed_row = json.loads(row)
parsed_rows.append(parsed_row)
parsed_rows.sort(key=lambda row: (row["time"], row["diff"]))
key_metadata = {}
time_removed = {}
for parsed_row in parsed_rows:
key = parsed_row["key"]
metadata = parsed_row["_metadata"]
file_name = metadata["path"]
is_insertion = parsed_row["diff"] == 1
timestamp = parsed_row["time"]
if is_insertion:
if has_only_file_replacements and file_name in time_removed:
# If there are only replacements and the file has already been
# removed, then we need to check that the insertion and its
# removal were consolidated, i.e. happened in the same timestamp
assert time_removed[file_name] == timestamp
key_metadata[key] = metadata
else:
# Check that the metadata for the deleted object corresponds to the
# initially reported metadata
assert key_metadata[key] == metadata
time_removed[file_name] = timestamp
@xfail_on_darwin(reason="running pw.run from separate process not supported")
def test_simple_forgetting(tmp_path: pathlib.Path, monkeypatch):
def test_simple_replacement_with_removal(tmp_path: pathlib.Path, monkeypatch):
def stream_inputs():
time.sleep(1)
first_line = {"key": 1, "value": "one"}
@ -1234,6 +1271,54 @@ def test_simple_forgetting(tmp_path: pathlib.Path, monkeypatch):
)
@xfail_on_darwin(reason="running pw.run from separate process not supported")
def test_simple_insert_consolidation(tmp_path: pathlib.Path, monkeypatch):
def stream_inputs():
time.sleep(1)
first_line = {"key": 1, "value": "one"}
second_line = {"key": 2, "value": "two"}
write_lines(tmp_path / "inputs/input1.jsonlines", json.dumps(first_line))
time.sleep(1)
write_lines(tmp_path / "inputs/input1.jsonlines", json.dumps(second_line))
time.sleep(1)
write_lines(tmp_path / "inputs/input1.jsonlines", json.dumps(first_line))
time.sleep(1)
run_replacement_test(
streaming_target=stream_inputs,
input_format="json",
expected_output_lines=5,
tmp_path=tmp_path,
monkeypatch=monkeypatch,
has_only_file_replacements=True,
)
@xfail_on_darwin(reason="running pw.run from separate process not supported")
def test_simple_replacement_on_file(tmp_path: pathlib.Path, monkeypatch):
def stream_inputs():
time.sleep(1)
first_line = {"key": 1, "value": "one"}
second_line = {"key": 2, "value": "two"}
third_line = {"key": 3, "value": "three"}
write_lines(tmp_path / "inputs/input.jsonlines", json.dumps(first_line))
time.sleep(1)
write_lines(tmp_path / "inputs/input.jsonlines", json.dumps(second_line))
time.sleep(1)
os.remove(tmp_path / "inputs/input.jsonlines")
time.sleep(1)
write_lines(tmp_path / "inputs/input.jsonlines", json.dumps(third_line))
run_replacement_test(
streaming_target=stream_inputs,
input_format="json",
expected_output_lines=5,
tmp_path=tmp_path,
monkeypatch=monkeypatch,
inputs_path_override=tmp_path / "inputs/input.jsonlines",
)
@xfail_on_darwin(reason="running pw.run from separate process not supported")
def test_simple_replacement(tmp_path: pathlib.Path, monkeypatch):
def stream_inputs():
@ -1253,6 +1338,7 @@ def test_simple_replacement(tmp_path: pathlib.Path, monkeypatch):
expected_output_lines=4,
tmp_path=tmp_path,
monkeypatch=monkeypatch,
has_only_file_replacements=True,
)
@ -1275,6 +1361,7 @@ def test_last_file_replacement_json(tmp_path: pathlib.Path, monkeypatch):
expected_output_lines=4,
tmp_path=tmp_path,
monkeypatch=monkeypatch,
has_only_file_replacements=True,
)
@ -1306,11 +1393,12 @@ def test_last_file_replacement_csv(tmp_path: pathlib.Path, monkeypatch):
expected_output_lines=4,
tmp_path=tmp_path,
monkeypatch=monkeypatch,
has_only_file_replacements=True,
)
@xfail_on_darwin(reason="running pw.run from separate process not supported")
def test_simple_forgetting_autogenerated_key(tmp_path: pathlib.Path, monkeypatch):
def test_file_removal_autogenerated_key(tmp_path: pathlib.Path, monkeypatch):
def stream_inputs():
time.sleep(1)
first_line = {"key": 1, "value": "one"}
@ -1349,6 +1437,7 @@ def test_simple_replacement_autogenerated_key(tmp_path: pathlib.Path, monkeypatc
expected_output_lines=4,
tmp_path=tmp_path,
monkeypatch=monkeypatch,
has_only_file_replacements=True,
)
@ -1913,16 +2002,18 @@ def test_stream_generator_from_list():
class InputSchema(pw.Schema):
number: int
stream_generator = pw.debug.StreamGenerator()
events = [
[{"number": 1}, {"number": 2}, {"number": 5}],
[{"number": 4}, {"number": 4}],
]
t = pw.debug.table_from_list_of_batches(events, InputSchema)
t = stream_generator.table_from_list_of_batches(events, InputSchema)
on_change = mock.Mock()
pw.io.subscribe(t, on_change=on_change)
pw.run()
pw.run(persistence_config=stream_generator.persistence_config())
timestamps = set([call.kwargs["time"] for call in on_change.mock_calls])
assert len(timestamps) == 2
@ -1967,6 +2058,7 @@ def test_stream_generator_from_list():
def test_stream_generator_from_list_multiple_workers(monkeypatch: pytest.MonkeyPatch):
monkeypatch.setenv("PATHWAY_THREADS", "2")
stream_generator = pw.debug.StreamGenerator()
class InputSchema(pw.Schema):
number: int
@ -1976,11 +2068,11 @@ def test_stream_generator_from_list_multiple_workers(monkeypatch: pytest.MonkeyP
{0: [{"number": 4}], 1: [{"number": 4}]},
]
t = pw.debug.table_from_list_of_batches_by_workers(events, InputSchema)
t = stream_generator.table_from_list_of_batches_by_workers(events, InputSchema)
on_change = mock.Mock()
pw.io.subscribe(t, on_change=on_change)
pw.run()
pw.run(persistence_config=stream_generator.persistence_config())
timestamps = set([call.kwargs["time"] for call in on_change.mock_calls])
assert len(timestamps) == 2
@ -2025,7 +2117,8 @@ def test_stream_generator_from_list_multiple_workers(monkeypatch: pytest.MonkeyP
@pytest.mark.filterwarnings("ignore:timestamps are required to be even")
def test_stream_generator_from_markdown():
t = pw.debug.table_from_markdown(
stream_generator = pw.debug.StreamGenerator()
t = stream_generator.table_from_markdown(
"""
| colA | colB | _time
1 | 1 | 2 | 1
@ -2036,7 +2129,7 @@ def test_stream_generator_from_markdown():
on_change = mock.Mock()
pw.io.subscribe(t, on_change=on_change)
pw.run()
pw.run(persistence_config=stream_generator.persistence_config())
on_change.assert_has_calls(
[
@ -2065,7 +2158,8 @@ def test_stream_generator_from_markdown():
def test_stream_generator_from_markdown_with_diffs():
t = pw.debug.table_from_markdown(
stream_generator = pw.debug.StreamGenerator()
t = stream_generator.table_from_markdown(
"""
| colA | colB | _time | _diff
1 | 1 | 2 | 2 | 1
@ -2085,17 +2179,20 @@ def test_stream_generator_from_markdown_with_diffs():
"""
)
assert_table_equality(t, expected)
assert_table_equality(
t, expected, persistence_config=stream_generator.persistence_config()
)
def test_stream_generator_two_tables_multiple_workers(monkeypatch: pytest.MonkeyPatch):
stream_generator = pw.debug.StreamGenerator()
monkeypatch.setenv("PATHWAY_THREADS", "4")
class InputSchema(pw.Schema):
colA: int
colB: int
t1 = pw.debug.table_from_markdown(
t1 = stream_generator.table_from_markdown(
"""
colA | colB | _time | _worker
1 | 2 | 2 | 0
@ -2106,7 +2203,7 @@ def test_stream_generator_two_tables_multiple_workers(monkeypatch: pytest.Monkey
"""
)
t2 = pw.debug.stream_generator._table_from_dict(
t2 = stream_generator._table_from_dict(
{
2: {0: [(1, api.ref_scalar(0), [1, 4])]},
4: {2: [(1, api.ref_scalar(1), [3, 7])]},
@ -2124,7 +2221,7 @@ def test_stream_generator_two_tables_multiple_workers(monkeypatch: pytest.Monkey
on_change = mock.Mock()
pw.io.subscribe(t3, on_change=on_change)
pw.run()
pw.run(persistence_config=stream_generator.persistence_config())
on_change.assert_has_calls(
[

View File

@ -267,6 +267,7 @@ def test_schema_properties():
assert A["a"].append_only is True
assert A["b"].append_only is True
assert A.universe_properties.append_only is True
class B(pw.Schema, append_only=False):
a: int = pw.column_definition(append_only=False)
@ -274,6 +275,7 @@ def test_schema_properties():
assert B["a"].append_only is False
assert B["b"].append_only is False
assert B.universe_properties.append_only is False
class C(pw.Schema):
a: int = pw.column_definition(append_only=True)
@ -283,3 +285,9 @@ def test_schema_properties():
assert C["a"].append_only is True
assert C["b"].append_only is False
assert C["c"].append_only is False
assert C.universe_properties.append_only is True
class D(pw.Schema, append_only=True):
pass
assert D.universe_properties.append_only is True

View File

@ -127,6 +127,32 @@ class CheckKeyConsistentInStreamCallback(CheckKeyEntriesInStreamCallback):
assert not self.state, f"Non empty final state = {self.state!r}"
# this callback does not verify the order of entries, only that all of them were present
class CheckStreamEntriesEqualityCallback(CheckKeyEntriesInStreamCallback):
def __call__(
self,
key: api.Pointer,
row: dict[str, api.Value],
time: int,
is_addition: bool,
) -> Any:
q = self.state.get(key)
assert (
q
), f"Got unexpected entry {key=} {row=} {time=} {is_addition=}, expected entries= {self.state!r}"
entry = q.popleft()
assert (is_addition, row) == (
entry.insertion,
entry.row,
), f"Got unexpected entry {key=} {row=} {time=} {is_addition=}, expected entries= {self.state!r}"
if not q:
self.state.pop(key)
def on_end(self):
assert not self.state, f"Non empty final state = {self.state!r}"
# assert_key_entries_in_stream_consistent verifies for each key, whether:
# - a sequence of updates in the table is a subsequence
# of the sequence of updates defined in expected
@ -136,6 +162,11 @@ def assert_key_entries_in_stream_consistent(expected: list[DiffEntry], table: pw
pw.io.subscribe(table, callback, callback.on_end)
def assert_stream_equal(expected: list[DiffEntry], table: pw.Table):
callback = CheckStreamEntriesEqualityCallback(expected)
pw.io.subscribe(table, callback, callback.on_end)
def assert_equal_tables(t0: api.CapturedTable, t1: api.CapturedTable):
assert t0 == t1
@ -167,6 +198,19 @@ class CsvLinesNumberChecker:
return len(result) == self.n_lines
class FileLinesNumberChecker:
def __init__(self, path, n_lines):
self.path = path
self.n_lines = n_lines
def __call__(self):
n_lines_actual = 0
with open(self.path, "r") as f:
for row in f:
n_lines_actual += 1
return n_lines_actual == self.n_lines
def expect_csv_checker(expected, output_path, usecols=("k", "v"), index_col=("k")):
expected = (
pw.debug._markdown_to_pandas(expected)

View File

@ -22,8 +22,7 @@ use std::str::{from_utf8, Utf8Error};
use std::sync::Arc;
use std::thread;
use std::thread::sleep;
use std::time::Duration;
use std::time::SystemTime;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use chrono::{DateTime, FixedOffset};
use log::{error, warn};
@ -41,6 +40,7 @@ use crate::persistence::{ExternalPersistentId, PersistentId};
use crate::python_api::threads::PythonThreadState;
use crate::python_api::with_gil_and_pool;
use crate::python_api::PythonSubject;
use crate::timestamp::current_unix_timestamp_secs;
use bincode::ErrorKind as BincodeError;
use elasticsearch::{BulkParts, Elasticsearch};
@ -162,6 +162,7 @@ impl ReaderContext {
pub enum ReadResult {
Finished,
NewSource(Option<SourceMetadata>),
FinishedSource { commit_allowed: bool },
Data(ReaderContext, Offset),
}
@ -436,6 +437,7 @@ pub struct FilesystemReader {
reader: Option<BufReader<std::fs::File>>,
filesystem_scanner: FilesystemScanner,
total_entries_read: u64,
deferred_read_result: Option<ReadResult>,
}
impl FilesystemReader {
@ -445,15 +447,9 @@ impl FilesystemReader {
persistent_id: Option<PersistentId>,
read_method: ReadMethod,
object_pattern: &str,
with_metadata: bool,
) -> Result<FilesystemReader, ReadError> {
let filesystem_scanner = FilesystemScanner::new(
path,
persistent_id,
streaming_mode,
object_pattern,
with_metadata,
)?;
let filesystem_scanner =
FilesystemScanner::new(path, persistent_id, streaming_mode, object_pattern)?;
Ok(Self {
persistent_id,
@ -462,6 +458,7 @@ impl FilesystemReader {
filesystem_scanner,
total_entries_read: 0,
read_method,
deferred_read_result: None,
})
}
}
@ -497,6 +494,10 @@ impl Reader for FilesystemReader {
}
fn read(&mut self) -> Result<ReadResult, ReadError> {
if let Some(deferred_read_result) = self.deferred_read_result.take() {
return Ok(deferred_read_result);
}
loop {
if let Some(reader) = &mut self.reader {
let mut line = Vec::new();
@ -522,6 +523,9 @@ impl Reader for FilesystemReader {
.expect("scanner action can't be empty");
if self.read_method == ReadMethod::Full {
self.deferred_read_result = Some(ReadResult::FinishedSource {
commit_allowed: !self.filesystem_scanner.has_planned_insertion(),
});
self.reader = None;
}
@ -530,21 +534,20 @@ impl Reader for FilesystemReader {
offset,
));
}
self.reader = None;
return Ok(ReadResult::FinishedSource {
commit_allowed: !self.filesystem_scanner.has_planned_insertion(),
});
}
self.reader = None;
if self.filesystem_scanner.next_action_determined()? {
let file = File::open(
self.filesystem_scanner
.current_file()
.as_ref()
.unwrap()
.as_path(),
)?;
self.reader = Some(BufReader::new(file));
return Ok(ReadResult::NewSource(
self.filesystem_scanner.maybe_current_object_metadata(),
));
let next_read_result = self.filesystem_scanner.next_action_determined()?;
if let Some(next_read_result) = next_read_result {
if let Some(selected_file) = self.filesystem_scanner.current_file() {
let file = File::open(&*selected_file)?;
self.reader = Some(BufReader::new(file));
}
return Ok(next_read_result);
}
if self.filesystem_scanner.is_polling_enabled() {
@ -714,7 +717,8 @@ struct FilesystemScanner {
cached_modify_times: HashMap<PathBuf, Option<SystemTime>>,
inotify: Option<inotify_support::Inotify>,
object_pattern: GlobPattern,
with_metadata: bool,
next_file_for_insertion: Option<PathBuf>,
cached_metadata: HashMap<PathBuf, Option<SourceMetadata>>,
}
impl FilesystemScanner {
@ -723,12 +727,10 @@ impl FilesystemScanner {
persistent_id: Option<PersistentId>,
streaming_mode: ConnectorMode,
object_pattern: &str,
with_metadata: bool,
) -> Result<FilesystemScanner, ReadError> {
let path = std::fs::canonicalize(path.into())?;
if !path.exists() {
return Err(io::Error::from(io::ErrorKind::NotFound).into());
let mut path = path.into();
if path.exists() || matches!(streaming_mode, ConnectorMode::Static) {
path = std::fs::canonicalize(path)?;
}
let is_directory = path.is_dir();
@ -766,10 +768,15 @@ impl FilesystemScanner {
cached_modify_times: HashMap::new(),
inotify,
object_pattern: GlobPattern::new(object_pattern)?,
with_metadata,
next_file_for_insertion: None,
cached_metadata: HashMap::new(),
})
}
fn has_planned_insertion(&self) -> bool {
self.next_file_for_insertion.is_some()
}
fn is_polling_enabled(&self) -> bool {
self.streaming_mode.is_polling_enabled()
}
@ -794,19 +801,6 @@ impl FilesystemScanner {
}
}
fn maybe_current_object_metadata(&self) -> Option<SourceMetadata> {
if !self.with_metadata {
return None;
}
let path: &Path = match &self.current_action {
Some(PosixScannerAction::Read(path) | PosixScannerAction::Delete(path)) => {
path.as_ref()
}
None => return None,
};
Some(SourceMetadata::from_fs_path(path))
}
/// Returns the name of the currently processed file in the input directory
fn current_offset_file(&self) -> Option<Arc<PathBuf>> {
match &self.current_action {
@ -822,14 +816,21 @@ impl FilesystemScanner {
todo!("seek for snapshot mode");
}
self.known_files.clear();
if !self.is_directory {
self.current_action = Some(PosixScannerAction::Read(Arc::new(
seek_file_path.to_path_buf(),
)));
let modify_system_time = std::fs::metadata(seek_file_path)?.modified().unwrap();
let modify_unix_timestamp = modify_system_time
.duration_since(UNIX_EPOCH)
.expect("File modified earlier than UNIX epoch")
.as_secs();
self.known_files
.insert(seek_file_path.to_path_buf(), modify_unix_timestamp);
return Ok(());
}
self.known_files.clear();
let target_modify_time = match std::fs::metadata(seek_file_path) {
Ok(metadata) => metadata.modified()?,
Err(e) => {
@ -882,37 +883,54 @@ impl FilesystemScanner {
}
}
fn next_action_determined(&mut self) -> io::Result<bool> {
/// Finish reading the current file and find the next one to read from.
/// If there is a file to read from, the method returns a `ReadResult`
/// specifying the action to be provided downstream.
///
/// It can either be a `NewSource` event when the new action is found or
/// a `FinishedSource` event when we've had a scheduled action but the
/// corresponding file was deleted before we were able to execute this
/// scheduled action.
fn next_action_determined(&mut self) -> io::Result<Option<ReadResult>> {
// Finalize the current processing action
let is_processing_finalized = match take(&mut self.current_action) {
Some(PosixScannerAction::Read(_)) => true,
Some(PosixScannerAction::Delete(path)) => {
let cached_path = self
.cached_file_path(&path)
.expect("in case of enabled deletions cache should exist");
std::fs::remove_file(cached_path)?;
true
}
None => false,
};
if let Some(PosixScannerAction::Delete(path)) = take(&mut self.current_action) {
let cached_path = self
.cached_file_path(&path)
.expect("in case of enabled deletions cache should exist");
std::fs::remove_file(cached_path)?;
}
if !self.is_directory && is_processing_finalized {
return Ok(false);
// A file modification is handled as a combination of its deletion and insertion.
// If a file was deleted in the previous action, we must now insert its new
// version, and only after that may we allow a commit.
if let Some(next_file_for_insertion) = take(&mut self.next_file_for_insertion) {
if next_file_for_insertion.exists() {
return Ok(Some(
self.initiate_file_insertion(&next_file_for_insertion)?,
));
}
// The insertion scheduled after the deletion is impossible because
// the file no longer exists.
// The action has now been completed in full, so commits can be allowed.
return Ok(Some(ReadResult::FinishedSource {
commit_allowed: true,
}));
}
// First check if we need to delete something
if self.streaming_mode.are_deletions_enabled() {
let has_something_to_delete = self.next_file_for_deletion_found();
if has_something_to_delete {
return Ok(true);
let next_for_deletion = self.next_deletion_entry();
if next_for_deletion.is_some() {
return Ok(next_for_deletion);
}
}
// If there is nothing to delete, ingest the new entries
self.next_file_for_insertion_found()
self.next_insertion_entry()
}
fn next_file_for_deletion_found(&mut self) -> bool {
fn next_deletion_entry(&mut self) -> Option<ReadResult> {
let mut path_for_deletion: Option<PathBuf> = None;
for (path, modified_at) in &self.known_files {
let metadata = std::fs::metadata(path);
@ -946,11 +964,22 @@ impl FilesystemScanner {
match path_for_deletion {
Some(path) => {
// The metadata of the deleted file must be the same as when it was added,
// so that the deletion event is processed correctly by timely. To achieve
// this, we simply take the cached metadata.
let old_metadata = self
.cached_metadata
.remove(&path)
.expect("inconsistency between known_files and cached_metadata");
self.known_files.remove(&path);
self.current_action = Some(PosixScannerAction::Delete(Arc::new(path.clone())));
true
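// If the file still exists on disk, this deletion is part of a modification:
// schedule the re-insertion that must happen before commits are allowed again.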
if path.exists() {
self.next_file_for_insertion = Some(path);
}
Some(ReadResult::NewSource(old_metadata))
}
None => false,
None => None,
}
}
@ -962,7 +991,7 @@ impl FilesystemScanner {
})
}
fn next_file_for_insertion_found(&mut self) -> io::Result<bool> {
fn next_insertion_entry(&mut self) -> io::Result<Option<ReadResult>> {
let mut selected_file: Option<(PathBuf, SystemTime)> = None;
if self.is_directory {
let files_in_directory = std::fs::read_dir(self.path.as_path())?;
@ -1009,33 +1038,40 @@ impl FilesystemScanner {
}
}
} else {
let is_existing_file = self.path.exists() && self.path.is_file();
if !self.known_files.is_empty() || !is_existing_file {
return Ok(None);
}
selected_file = Some((self.path.clone(), SystemTime::now()));
}
match selected_file {
Some((new_file_name, new_file_modify_time)) => {
let new_file_path = self.path.as_path().join(new_file_name);
let new_file_modify_timestamp = new_file_modify_time
.duration_since(SystemTime::UNIX_EPOCH)
.expect("System time should be after the Unix epoch")
.as_secs();
self.known_files
.insert(new_file_path.clone(), new_file_modify_timestamp);
let cached_path = self.cached_file_path(&new_file_path);
if let Some(cached_path) = cached_path {
std::fs::copy(&new_file_path, cached_path)?;
}
self.current_action = Some(PosixScannerAction::Read(Arc::new(new_file_path)));
Ok(true)
}
None => Ok(false),
Some((new_file_name, _)) => Ok(Some(self.initiate_file_insertion(&new_file_name)?)),
None => Ok(None),
}
}
fn initiate_file_insertion(&mut self, new_file_name: &PathBuf) -> io::Result<ReadResult> {
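// Capture the file's metadata, remember its modification time, mirror the
// file into the deletions cache if one is configured, and start reading it.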
let new_file_meta =
SourceMetadata::from_fs_meta(new_file_name, &std::fs::metadata(new_file_name)?);
self.cached_metadata
.insert(new_file_name.clone(), Some(new_file_meta.clone()));
self.known_files.insert(
new_file_name.clone(),
new_file_meta
.modified_at
.unwrap_or(current_unix_timestamp_secs()),
);
let cached_path = self.cached_file_path(new_file_name);
if let Some(cached_path) = cached_path {
std::fs::copy(new_file_name, cached_path)?;
}
self.current_action = Some(PosixScannerAction::Read(Arc::new(new_file_name.clone())));
Ok(ReadResult::NewSource(Some(new_file_meta)))
}
fn sleep_duration() -> Duration {
Duration::from_millis(500)
}
@ -1103,15 +1139,9 @@ impl CsvFilesystemReader {
streaming_mode: ConnectorMode,
persistent_id: Option<PersistentId>,
object_pattern: &str,
with_metadata: bool,
) -> Result<CsvFilesystemReader, ReadError> {
let filesystem_scanner = FilesystemScanner::new(
path.into(),
persistent_id,
streaming_mode,
object_pattern,
with_metadata,
)?;
let filesystem_scanner =
FilesystemScanner::new(path.into(), persistent_id, streaming_mode, object_pattern)?;
Ok(CsvFilesystemReader {
parser_builder,
persistent_id,
@ -1216,37 +1246,31 @@ impl Reader for CsvFilesystemReader {
offset,
));
}
if self.filesystem_scanner.next_action_determined()? {
self.reader = Some(
self.parser_builder.from_path(
self.filesystem_scanner
.current_file()
.as_ref()
.unwrap()
.as_path(),
)?,
);
return Ok(ReadResult::NewSource(
self.filesystem_scanner.maybe_current_object_metadata(),
));
let next_read_result = self.filesystem_scanner.next_action_determined()?;
if let Some(next_read_result) = next_read_result {
if let Some(selected_file) = self.filesystem_scanner.current_file() {
self.reader = Some(self.parser_builder.from_path(&*selected_file)?);
}
return Ok(next_read_result);
}
// The file came to its end, so we should drop the reader
self.reader = None;
return Ok(ReadResult::FinishedSource {
commit_allowed: !self.filesystem_scanner.has_planned_insertion(),
});
}
None => {
if self.filesystem_scanner.next_action_determined()? {
self.reader = Some(
self.parser_builder.flexible(true).from_path(
self.filesystem_scanner
.current_file()
.as_ref()
.unwrap()
.as_path(),
)?,
);
return Ok(ReadResult::NewSource(
self.filesystem_scanner.maybe_current_object_metadata(),
));
let next_read_result = self.filesystem_scanner.next_action_determined()?;
if let Some(next_read_result) = next_read_result {
if let Some(selected_file) = self.filesystem_scanner.current_file() {
self.reader = Some(
self.parser_builder
.flexible(true)
.from_path(&*selected_file)?,
);
}
return Ok(next_read_result);
}
}
}
@ -2047,6 +2071,7 @@ pub struct S3GenericReader {
persistent_id: Option<PersistentId>,
total_entries_read: u64,
current_bytes_read: u64,
deferred_read_result: Option<ReadResult>,
}
impl S3GenericReader {
@ -2066,6 +2091,7 @@ impl S3GenericReader {
persistent_id,
total_entries_read: 0,
current_bytes_read: 0,
deferred_read_result: None,
})
}
@ -2133,6 +2159,10 @@ impl Reader for S3GenericReader {
}
fn read(&mut self) -> Result<ReadResult, ReadError> {
if let Some(deferred_read_result) = self.deferred_read_result.take() {
return Ok(deferred_read_result);
}
loop {
match &mut self.reader {
Some(reader) => {
@ -2152,6 +2182,9 @@ impl Reader for S3GenericReader {
);
if self.read_method == ReadMethod::Full {
self.deferred_read_result = Some(ReadResult::FinishedSource {
commit_allowed: true,
});
self.reader = None;
}
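Both the filesystem and S3 readers above now rely on the same deferral trick: when a source is consumed in one shot (ReadMethod::Full), the data is returned immediately and the FinishedSource event is stashed so that the next read() call reports it. A minimal, self-contained sketch of that pattern follows; SketchReader and its ReadResult are illustrative stand-ins, not the actual connector API.

// Sketch of the "deferred result" pattern: each source yields one chunk of
// data, and the end-of-source notification is postponed to the next call.
#[derive(Debug)]
enum ReadResult {
    Data(String),
    FinishedSource { commit_allowed: bool },
    Finished,
}

struct SketchReader {
    sources: Vec<String>, // each entry stands for one whole "file"
    deferred_read_result: Option<ReadResult>,
}

impl SketchReader {
    fn read(&mut self) -> ReadResult {
        // Drain a result deferred by the previous call first.
        if let Some(deferred) = self.deferred_read_result.take() {
            return deferred;
        }
        match self.sources.pop() {
            Some(contents) => {
                // The whole source was consumed in one step, so schedule the
                // end-of-source notification for the next call.
                self.deferred_read_result =
                    Some(ReadResult::FinishedSource { commit_allowed: true });
                ReadResult::Data(contents)
            }
            None => ReadResult::Finished,
        }
    }
}

fn main() {
    let mut reader = SketchReader {
        sources: vec!["file contents".to_string()],
        deferred_read_result: None,
    };
    println!("{:?}", reader.read()); // Data("file contents")
    println!("{:?}", reader.read()); // FinishedSource { commit_allowed: true }
    println!("{:?}", reader.read()); // Finished
}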

View File

@ -1,4 +1,3 @@
use log::error;
use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};
@ -11,7 +10,7 @@ pub struct SourceMetadata {
// Creation and modification time may not be available at some platforms
// Stored in u64 for easy serialization
created_at: Option<u64>,
modified_at: Option<u64>,
pub modified_at: Option<u64>,
// Owner may be unavailable at some platforms and on S3
owner: Option<String>,
@ -23,18 +22,10 @@ pub struct SourceMetadata {
}
impl SourceMetadata {
pub fn from_fs_path(path: &Path) -> Self {
let (created_at, modified_at, owner) = match std::fs::metadata(path) {
Ok(metadata) => (
metadata_time_to_unix_timestamp(metadata.created().ok()),
metadata_time_to_unix_timestamp(metadata.modified().ok()),
file_owner::get_owner(&metadata),
),
Err(e) => {
error!("Failed to get metadata for filesystem object {path:?}, details: {e}");
(None, None, None)
}
};
pub fn from_fs_meta(path: &Path, meta: &std::fs::Metadata) -> Self {
let created_at = metadata_time_to_unix_timestamp(meta.created().ok());
let modified_at = metadata_time_to_unix_timestamp(meta.modified().ok());
let owner = file_owner::get_owner(meta);
Self {
created_at,

View File

@ -402,6 +402,7 @@ where
let connector_monitor = Rc::new(RefCell::new(ConnectorMonitor::new(reader_name)));
let cloned_connector_monitor = connector_monitor.clone();
let mut commit_allowed = true;
let poller = Box::new(move || {
let iteration_start = SystemTime::now();
if matches!(replay_mode, ReplayMode::Speedrun)
@ -413,7 +414,7 @@ where
if let Some(next_commit_at_timestamp) = next_commit_at {
if next_commit_at_timestamp <= iteration_start {
if backfilling_finished {
if backfilling_finished && commit_allowed {
/*
We don't auto-commit for the initial batch, which consists of the
data, which shouldn't trigger any output.
@ -460,6 +461,7 @@ where
&mut snapshot_writer,
&offsets_by_time_writer,
&mut Some(&mut *connector_monitor.borrow_mut()),
&mut commit_allowed,
);
}
Err(TryRecvError::Empty) => return ControlFlow::Continue(next_commit_at),
@ -489,24 +491,38 @@ where
snapshot_writer: &mut Option<SharedSnapshotWriter>,
offsets_by_time_writer: &Mutex<HashMap<Timestamp, OffsetAntichain>>,
connector_monitor: &mut Option<&mut ConnectorMonitor>,
commit_allowed: &mut bool,
) {
let has_persistent_storage = snapshot_writer.is_some();
match entry {
Entry::Realtime(read_result) => match read_result {
ReadResult::Finished => {}
ReadResult::FinishedSource {
commit_allowed: commit_allowed_external,
} => {
*commit_allowed = commit_allowed_external;
if *commit_allowed {
let parsed_entries = vec![ParsedEvent::AdvanceTime];
self.on_parsed_data(
parsed_entries,
None, // no key generation for time advancement
input_session,
values_to_key,
snapshot_writer,
connector_monitor,
);
}
}
ReadResult::NewSource(metadata) => {
// If a connector produces events of this kind, we treat its objects as
// atomic: no commits happen in the middle of processing a data source.
//
// So we block the ability to commit until an event that allows commits
// is received again.
*commit_allowed = false;
parser.on_new_source_started(metadata.as_ref());
let parsed_entries = vec![ParsedEvent::AdvanceTime];
self.on_parsed_data(
parsed_entries,
None, // no key generation for time advancement
input_session,
values_to_key,
snapshot_writer,
connector_monitor,
);
}
ReadResult::Data(reader_context, offset) => {
let mut parsed_entries = match parser.parse(&reader_context) {
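The poller above only auto-commits once backfilling has finished and commit_allowed is true, and the flag is driven entirely by these events: NewSource blocks commits for the duration of an atomic object, and FinishedSource restores them when its commit_allowed field says so. A rough stand-alone sketch of that gating follows; Gate and Event are simplified stand-ins for the real connector types.

// Sketch of the commit gating: NewSource marks the start of an atomic object
// and blocks commits; FinishedSource re-enables them (and lets time advance)
// only when the whole action has completed.
enum Event {
    NewSource,
    FinishedSource { commit_allowed: bool },
    Data(&'static str),
}

struct Gate {
    commit_allowed: bool,
}

impl Gate {
    fn on_event(&mut self, event: Event) -> Option<&'static str> {
        match event {
            Event::NewSource => {
                // An atomic object starts: hold back commits until it ends.
                self.commit_allowed = false;
                None
            }
            Event::FinishedSource { commit_allowed } => {
                self.commit_allowed = commit_allowed;
                // Advance time only once committing is allowed again.
                self.commit_allowed.then_some("AdvanceTime")
            }
            Event::Data(_) => None,
        }
    }
}

fn main() {
    let mut gate = Gate { commit_allowed: true };
    assert_eq!(gate.on_event(Event::NewSource), None);
    assert_eq!(gate.on_event(Event::Data("row")), None);
    assert_eq!(
        gate.on_event(Event::FinishedSource { commit_allowed: true }),
        Some("AdvanceTime")
    );
    println!("commits allowed again: {}", gate.commit_allowed);
}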

View File

@ -2977,7 +2977,6 @@ pub struct DataStorage {
persistent_id: Option<ExternalPersistentId>,
max_batch_size: Option<usize>,
object_pattern: String,
with_metadata: bool,
mock_events: Option<HashMap<(ExternalPersistentId, usize), Vec<SnapshotEvent>>>,
}
@ -3209,7 +3208,6 @@ impl DataStorage {
persistent_id = None,
max_batch_size = None,
object_pattern = "*".to_string(),
with_metadata = false,
mock_events = None,
))]
#[allow(clippy::too_many_arguments)]
@ -3229,7 +3227,6 @@ impl DataStorage {
persistent_id: Option<ExternalPersistentId>,
max_batch_size: Option<usize>,
object_pattern: String,
with_metadata: bool,
mock_events: Option<HashMap<(ExternalPersistentId, usize), Vec<SnapshotEvent>>>,
) -> Self {
DataStorage {
@ -3248,7 +3245,6 @@ impl DataStorage {
persistent_id,
max_batch_size,
object_pattern,
with_metadata,
mock_events,
}
}
@ -3447,7 +3443,6 @@ impl DataStorage {
self.internal_persistent_id(),
self.read_method,
&self.object_pattern,
self.with_metadata,
)
.map_err(|e| {
PyIOError::new_err(format!("Failed to initialize Filesystem reader: {e}"))
@ -3485,7 +3480,6 @@ impl DataStorage {
self.mode,
self.internal_persistent_id(),
&self.object_pattern,
self.with_metadata,
)
.map_err(|e| {
PyIOError::new_err(format!("Failed to initialize CsvFilesystem reader: {e}"))

View File

@ -6,3 +6,10 @@ pub fn current_unix_timestamp_ms() -> u128 {
.expect("Failed to get the current timestamp")
.as_millis()
}
pub fn current_unix_timestamp_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Failed to get the current timestamp")
.as_secs()
}

View File

@ -112,6 +112,7 @@ pub fn full_cycle_read(
new_parsed_entries.push(event);
}
}
Entry::Realtime(ReadResult::FinishedSource { .. }) => continue,
Entry::Realtime(ReadResult::NewSource(metadata)) => {
parser.on_new_source_started(metadata.as_ref());
}
@ -174,6 +175,7 @@ pub fn read_data_from_reader(
panic!("Unexpected erroneous reply: {parse_result:?}");
}
}
ReadResult::FinishedSource { .. } => continue,
ReadResult::NewSource(metadata) => parser.on_new_source_started(metadata.as_ref()),
ReadResult::Finished => break,
}
@ -249,6 +251,7 @@ pub fn data_parsing_fails(
return Ok(true);
}
}
ReadResult::FinishedSource { .. } => continue,
ReadResult::NewSource(metadata) => parser.on_new_source_started(metadata.as_ref()),
ReadResult::Finished => break,
}

View File

@ -13,7 +13,6 @@ fn read_bytes_from_path(path: &str) -> eyre::Result<Vec<ParsedEvent>> {
None,
ReadMethod::Full,
"*",
false,
)?;
let mut parser = IdentityParser::new(vec!["data".to_string()], false);
let mut events = Vec::new();
@ -30,6 +29,7 @@ fn read_bytes_from_path(path: &str) -> eyre::Result<Vec<ParsedEvent>> {
}
}
ReadResult::Finished => break,
ReadResult::FinishedSource { .. } => continue,
ReadResult::NewSource(_) => continue,
}
}

View File

@ -30,7 +30,6 @@ fn test_dsv_with_default_end_of_line() -> eyre::Result<()> {
ConnectorMode::Static,
None,
"*",
false,
)?;
let parser = DsvParser::new(
DsvSettings::new(
@ -85,7 +84,6 @@ fn test_dsv_with_default_middle_of_line() -> eyre::Result<()> {
ConnectorMode::Static,
None,
"*",
false,
)?;
let parser = DsvParser::new(
DsvSettings::new(
@ -136,7 +134,6 @@ fn test_dsv_fails_without_default() -> eyre::Result<()> {
ConnectorMode::Static,
None,
"*",
false,
)?;
let parser = DsvParser::new(
DsvSettings::new(
@ -170,7 +167,6 @@ fn test_dsv_with_default_nullable() -> eyre::Result<()> {
ConnectorMode::Static,
None,
"*",
false,
)?;
let parser = DsvParser::new(
DsvSettings::new(
@ -215,7 +211,6 @@ fn test_jsonlines_fails_without_default() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let parser = JsonLinesParser::new(
Some(vec!["a".to_string()]),
@ -244,7 +239,6 @@ fn test_jsonlines_with_default() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let parser = JsonLinesParser::new(
Some(vec!["a".to_string()]),
@ -297,7 +291,6 @@ fn test_jsonlines_with_default_at_jsonpath() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let parser = JsonLinesParser::new(
Some(vec!["a".to_string()]),
@ -344,7 +337,6 @@ fn test_jsonlines_explicit_null_not_overridden() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let parser = JsonLinesParser::new(
Some(vec!["a".to_string()]),

View File

@ -20,7 +20,6 @@ fn test_debezium_reads_ok() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let parser = DebeziumMessageParser::new(
Some(vec!["id".to_string()]),
@ -166,7 +165,6 @@ fn test_debezium_mongodb_format() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let parser = DebeziumMessageParser::new(
Some(vec!["id".to_string()]),

View File

@ -21,7 +21,6 @@ fn test_dsv_read_ok() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let mut parser = DsvParser::new(
DsvSettings::new(Some(vec!["a".to_string()]), vec!["b".to_string()], ','),
@ -65,7 +64,6 @@ fn test_dsv_column_does_not_exist() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let parser = DsvParser::new(
DsvSettings::new(Some(vec!["a".to_string()]), vec!["c".to_string()], ','),
@ -89,7 +87,6 @@ fn test_dsv_rows_parsing_ignore_type() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let mut parser = DsvParser::new(
DsvSettings::new(Some(vec!["a".to_string()]), vec!["b".to_string()], ','),
@ -126,7 +123,6 @@ fn test_dsv_not_enough_columns() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let mut parser = DsvParser::new(
DsvSettings::new(Some(vec!["a".to_string()]), vec!["b".to_string()], ','),
@ -172,7 +168,6 @@ fn test_dsv_autogenerate_pkey() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let mut parser = DsvParser::new(
DsvSettings::new(None, vec!["a".to_string(), "b".to_string()], ','),
@ -200,6 +195,7 @@ fn test_dsv_autogenerate_pkey() -> eyre::Result<()> {
}
}
ReadResult::Finished => break,
ReadResult::FinishedSource { .. } => continue,
ReadResult::NewSource(_) => continue,
}
}
@ -215,7 +211,6 @@ fn test_dsv_composite_pkey() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let mut parser = DsvParser::new(
DsvSettings::new(
@ -246,6 +241,7 @@ fn test_dsv_composite_pkey() -> eyre::Result<()> {
}
}
ReadResult::Finished => break,
ReadResult::FinishedSource { .. } => continue,
ReadResult::NewSource(_) => continue,
}
}
@ -278,7 +274,6 @@ fn test_dsv_read_schema_ok() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let mut parser = DsvParser::new(
DsvSettings::new(
@ -348,7 +343,6 @@ fn test_dsv_read_schema_nonparsable() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let mut parser = DsvParser::new(
DsvSettings::new(

View File

@ -20,7 +20,6 @@ fn test_dsv_dir_ok() -> eyre::Result<()> {
ConnectorMode::Static,
None,
"*",
false,
)?;
let parser = DsvParser::new(
DsvSettings::new(Some(vec!["key".to_string()]), vec!["foo".to_string()], ','),
@ -54,7 +53,6 @@ fn test_single_file_ok() -> eyre::Result<()> {
ConnectorMode::Static,
None,
"*",
false,
)?;
let parser = DsvParser::new(
DsvSettings::new(Some(vec!["a".to_string()]), vec!["b".to_string()], ','),
@ -80,7 +78,6 @@ fn test_custom_delimiter() -> eyre::Result<()> {
ConnectorMode::Static,
None,
"*",
false,
)?;
let parser = DsvParser::new(
DsvSettings::new(
@ -108,7 +105,6 @@ fn test_escape_fields() -> eyre::Result<()> {
ConnectorMode::Static,
None,
"*",
false,
)?;
let parser = DsvParser::new(
DsvSettings::new(
@ -155,7 +151,6 @@ fn test_escape_newlines() -> eyre::Result<()> {
ConnectorMode::Static,
None,
"*",
false,
)?;
let parser = DsvParser::new(
DsvSettings::new(
@ -192,7 +187,6 @@ fn test_nonexistent_file() -> eyre::Result<()> {
ConnectorMode::Static,
None,
"*",
false,
);
assert!(reader.is_err());
@ -210,7 +204,6 @@ fn test_special_fields() -> eyre::Result<()> {
ConnectorMode::Static,
None,
"*",
false,
)?;
let parser = DsvParser::new(
DsvSettings::new(

View File

@ -18,7 +18,6 @@ fn test_jsonlines_ok() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let parser = JsonLinesParser::new(
Some(vec!["a".to_string()]),
@ -58,7 +57,6 @@ fn test_jsonlines_incorrect_key() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let parser = JsonLinesParser::new(
Some(vec!["a".to_string(), "d".to_string()]),
@ -85,7 +83,6 @@ fn test_jsonlines_incomplete_key_to_null() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let parser = JsonLinesParser::new(
Some(vec!["a".to_string(), "d".to_string()]),
@ -109,7 +106,6 @@ fn test_jsonlines_incorrect_values() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let parser = JsonLinesParser::new(
Some(vec!["a".to_string()]),
@ -136,7 +132,6 @@ fn test_jsonlines_types_parsing() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let parser = JsonLinesParser::new(
Some(vec!["a".to_string()]),
@ -187,7 +182,6 @@ fn test_jsonlines_complex_paths() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let mut routes = HashMap::new();
@ -244,7 +238,6 @@ fn test_jsonlines_complex_paths_error() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let mut routes = HashMap::new();
@ -286,7 +279,6 @@ fn test_jsonlines_complex_path_ignore_errors() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let mut routes = HashMap::new();
@ -325,7 +317,6 @@ fn test_jsonlines_incorrect_key_verbose_error() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let parser = JsonLinesParser::new(
Some(vec!["a".to_string(), "d".to_string()]),
@ -355,7 +346,6 @@ fn test_jsonlines_incorrect_jsonpointer_verbose_error() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let parser = JsonLinesParser::new(
Some(vec!["a".to_string(), "d".to_string()]),
@ -382,7 +372,6 @@ fn test_jsonlines_failed_to_parse_field() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
false,
)?;
let parser = JsonLinesParser::new(
None,

View File

@ -34,7 +34,6 @@ fn test_metadata_fs_dir() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
true,
)?;
let parser = DsvParser::new(
DsvSettings::new(
@ -65,7 +64,6 @@ fn test_metadata_fs_file() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
true,
)?;
let parser = DsvParser::new(
DsvSettings::new(
@ -97,7 +95,6 @@ fn test_metadata_csv_dir() -> eyre::Result<()> {
ConnectorMode::Static,
None,
"*",
true,
)?;
let parser = DsvParser::new(
DsvSettings::new(
@ -131,7 +128,6 @@ fn test_metadata_csv_file() -> eyre::Result<()> {
ConnectorMode::Static,
None,
"*",
true,
)?;
let parser = DsvParser::new(
DsvSettings::new(
@ -160,7 +156,6 @@ fn test_metadata_json_file() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
true,
)?;
let parser = JsonLinesParser::new(
None,
@ -184,7 +179,6 @@ fn test_metadata_json_dir() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
true,
)?;
let parser = JsonLinesParser::new(
None,
@ -209,7 +203,6 @@ fn test_metadata_identity_file() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
true,
)?;
let parser = IdentityParser::new(vec!["data".to_string(), "_metadata".to_string()], false);
@ -227,7 +220,6 @@ fn test_metadata_identity_dir() -> eyre::Result<()> {
None,
ReadMethod::ByLine,
"*",
true,
)?;
let parser = IdentityParser::new(vec!["data".to_string(), "_metadata".to_string()], false);

View File

@ -32,7 +32,6 @@ fn csv_reader_parser_pair(input_path: &Path) -> (Box<dyn ReaderBuilder>, Box<dyn
ConnectorMode::Static,
Some(1),
"*",
false,
)
.unwrap();
let parser = DsvParser::new(
@ -53,7 +52,6 @@ fn json_reader_parser_pair(input_path: &Path) -> (Box<dyn ReaderBuilder>, Box<dy
Some(1),
ReadMethod::ByLine,
"*",
false,
)
.unwrap();
let parser = JsonLinesParser::new(