update rust: 1.82 -> 1.88 (#8947)
GitOrigin-RevId: ff7af9a7eee55b6d7542eebaee920c7ffb8648fd
This commit is contained in:
parent 106e423050
commit 02cd6a76d4
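
Most of the churn in the hunks below follows a few mechanical patterns that the newer toolchain and its clippy lints push toward: `Option::map_or(true, …)` / `Result::map_or(false, …)` become `is_none_or` / `is_ok_and`, `repeat(v).take(n)` becomes `repeat_n(v, n)`, `io::Error::new(ErrorKind::Other, …)` becomes `io::Error::other(…)`, format arguments get inlined, needless trailing semicolons and `continue`s are dropped, and `&Option<T>` / `&Vec<T>` parameters become `Option<&T>` / `&[T]`. A stand-alone sketch of those rewrites (illustrative only, not code from this commit; the function and variable names are invented):

    use std::io;

    // Hypothetical helper showing the "after" style of the migration.
    fn newer_style(limit: Option<u32>, parsed: Result<u32, io::Error>, flag: Option<&str>) -> io::Result<Vec<u32>> {
        // was: limit.map_or(true, |l| l > 10)
        let unbounded_or_large = limit.is_none_or(|l| l > 10);

        // was: parsed.map_or(false, |v| v > 0)
        let positive = parsed.is_ok_and(|v| v > 0);

        // was: std::iter::repeat(1).take(3).collect()
        let ones: Vec<u32> = std::iter::repeat_n(1, 3).collect();

        // was: io::Error::new(io::ErrorKind::Other, "missing flag")
        if flag.is_none() {
            return Err(io::Error::other("missing flag"));
        }

        // was: println!("{:?} {} {}", limit, unbounded_or_large, positive)
        println!("{limit:?} {unbounded_or_large} {positive}");
        Ok(ones)
    }

The remaining hunks apply the same ideas across the connectors, persistence, and Python API layers, plus a handful of doc-comment fixes prompted by newer rustdoc/clippy warnings.
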
@@ -3,7 +3,7 @@ name = "pathway"
 version = "0.24.1"
 edition = "2021"
 publish = false
-rust-version = "1.82"
+rust-version = "1.88"
 license = "BUSL-1.1"

 [lib]
@@ -73,6 +73,8 @@
 #![forbid(missing_docs)]
 #![allow(array_into_iter)]

+// Required for deriving `Abomonation`. Note: the associated crate has not been updated in 5 years and triggers warnings starting with Rust 1.85.
+#![allow(non_local_definitions)]

 use std::fmt::Debug;
@@ -75,6 +75,9 @@

 #![forbid(missing_docs)]

+// Required for deriving `Abomonation`. Note: the associated crate has not been updated in 5 years and triggers warnings starting with Rust 1.85.
+#![allow(non_local_definitions)]
+
 #[cfg(feature = "getopts")]
 extern crate getopts;
 #[cfg(feature = "bincode")]
@@ -57,6 +57,9 @@

 #![forbid(missing_docs)]

+// Required for deriving `Abomonation`. Note: the associated crate has not been updated in 5 years and triggers warnings starting with Rust 1.85.
+#![allow(non_local_definitions)]
+
 #[macro_use]
 extern crate abomonation_derive;
 extern crate abomonation;
@@ -1,2 +1,2 @@
 [toolchain]
-channel = "1.82"
+channel = "1.88"
@@ -63,15 +63,14 @@ impl<Timestamp: TimelyTimestamp + Lattice + TotalOrder> UpsertSession<Timestamp>
 }
 }

+/// The implementation below mostly reuses differetial dataflow's `InputSession` internals.
+///
+/// The main difference is the consolidation of the buffer before flushing.
+/// Without consolidation, if we have multiple entries for a single key,
+/// we may end up with any entry for this key, not necessarily the final one.
 impl<Timestamp: TimelyTimestamp + Lattice + TotalOrder> InputAdaptor<Timestamp>
 for UpsertSession<Timestamp>
 {
-/// The implementation below mostly reuses differetial dataflow's `InputSession` internals.
-///
-/// The main difference is the consolidation of the buffer before flushing.
-/// Without consolidation, if we have multiple entries for a single key,
-/// we may end up with any entry for this key, not necessarily the final one.
-
 fn new() -> Self {
 let handle: Handle<Timestamp, _> = Handle::new();
 UpsertSession {
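
The consolidation behaviour described in the doc comment above can be illustrated outside differential dataflow. A minimal plain-std sketch (assumed simplification: the engine's buffer of `(key, value)` updates with integer diffs is modelled as a `Vec`): summing diffs per entry means that, for upserts issued within a single flush, only the net final value per key survives.

    use std::collections::HashMap;

    type Key = u64;

    /// Sum diffs per (key, value) pair and drop entries whose net diff is zero,
    /// mirroring what consolidating the buffer before a flush achieves.
    fn consolidate(buffer: Vec<((Key, String), i64)>) -> Vec<((Key, String), i64)> {
        let mut totals: HashMap<(Key, String), i64> = HashMap::new();
        for (kv, diff) in buffer {
            *totals.entry(kv).or_insert(0) += diff;
        }
        totals.into_iter().filter(|(_, diff)| *diff != 0).collect()
    }

    fn main() {
        // Two upserts for key 1 inside one batch: "a" is inserted and retracted, then "b" is inserted.
        let buffer = vec![
            ((1, "a".to_string()), 1),
            ((1, "a".to_string()), -1),
            ((1, "b".to_string()), 1),
        ];
        // Only the final value "b" survives consolidation.
        assert_eq!(consolidate(buffer), vec![((1, "b".to_string()), 1)]);
    }
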
@@ -24,7 +24,7 @@ use async_nats::header::HeaderMap as NatsHeaders;
 use base64::engine::general_purpose::STANDARD as base64encoder;
 use base64::Engine;
 use bincode::ErrorKind as BincodeError;
-use itertools::{chain, Itertools};
+use itertools::Itertools;
 use log::error;
 use mongodb::bson::{
 bson, spec::BinarySubtype as BsonBinarySubtype, Binary as BsonBinaryContents,
@@ -372,7 +372,7 @@ impl FormatterContext {

 fn construct_message_headers(
 &self,
-header_fields: &Vec<(String, usize)>,
+header_fields: &[(String, usize)],
 encode_bytes: bool,
 ) -> Vec<PreparedMessageHeader> {
 let mut headers = Vec::with_capacity(header_fields.len() + 2);
@@ -395,7 +395,7 @@ impl FormatterContext {
 headers
 }

-pub fn construct_kafka_headers(&self, header_fields: &Vec<(String, usize)>) -> KafkaHeaders {
+pub fn construct_kafka_headers(&self, header_fields: &[(String, usize)]) -> KafkaHeaders {
 let raw_headers = self.construct_message_headers(header_fields, false);
 let mut kafka_headers = KafkaHeaders::new_with_capacity(raw_headers.len());
 for header in raw_headers {
@@ -407,7 +407,7 @@ impl FormatterContext {
 kafka_headers
 }

-pub fn construct_nats_headers(&self, header_fields: &Vec<(String, usize)>) -> NatsHeaders {
+pub fn construct_nats_headers(&self, header_fields: &[(String, usize)]) -> NatsHeaders {
 let raw_headers = self.construct_message_headers(header_fields, true);
 let mut nats_headers = NatsHeaders::new();
 for header in raw_headers {
@@ -571,7 +571,7 @@ fn parse_str_with_type(raw_value: &str, type_: &Type) -> Result<Value, DynError>
 // Anything else can be safely treated as a `Value::None`
 Type::Json if raw_value != "null" => return Ok(Value::None),
 _ => {}
-};
+}
 }
 match type_.unoptionalize() {
 Type::Any | Type::String => Ok(Value::from(raw_value)),
@@ -640,11 +640,15 @@ fn parse_with_type(
 }

 fn ensure_all_fields_in_schema(
-key_column_names: &Option<Vec<String>>,
-value_column_names: &Vec<String>,
+key_column_names: Option<&Vec<String>>,
+value_column_names: &[String],
 schema: &HashMap<String, InnerSchemaField>,
 ) -> Result<()> {
-for name in chain!(key_column_names.iter().flatten(), value_column_names) {
+for name in key_column_names
+.into_iter()
+.flatten()
+.chain(value_column_names)
+{
 if !schema.contains_key(name) {
 return Err(Error::FieldNotInSchema {
 name: name.clone(),
@@ -664,7 +668,7 @@ impl DsvParser {
 schema: HashMap<String, InnerSchemaField>,
 ) -> Result<DsvParser> {
 ensure_all_fields_in_schema(
-&settings.key_column_names,
+settings.key_column_names.as_ref(),
 &settings.value_column_names,
 &schema,
 )?;
@@ -698,7 +702,7 @@ impl DsvParser {
 None => {
 requested_indices.insert(field.clone(), vec![index]);
 }
-};
+}
 }

 for (index, value) in tokenized_entries.iter().enumerate() {
@@ -870,7 +874,7 @@ pub enum KeyGenerationPolicy {
 }

 impl KeyGenerationPolicy {
-fn generate(self, key: &Option<Vec<u8>>, parse_utf8: bool) -> Option<DynResult<Vec<Value>>> {
+fn generate(self, key: Option<&Vec<u8>>, parse_utf8: bool) -> Option<DynResult<Vec<Value>>> {
 match &self {
 Self::AlwaysAutogenerate => None,
 Self::PreferMessageKey => key
@@ -917,7 +921,8 @@ impl Parser for IdentityParser {
 KeyValue((key, value)) => match value {
 Some(bytes) => (
 DataEventType::Insert,
-self.key_generation_policy.generate(key, self.parse_utf8),
+self.key_generation_policy
+.generate(key.as_ref(), self.parse_utf8),
 value_from_bytes(bytes, self.parse_utf8),
 Ok(None),
 ),
@@ -1636,7 +1641,7 @@ impl JsonLinesParser {
 session_type: SessionType,
 schema_registry_decoder: Option<RegistryJsonDecoder>,
 ) -> Result<JsonLinesParser> {
-ensure_all_fields_in_schema(&key_field_names, &value_field_names, &schema)?;
+ensure_all_fields_in_schema(key_field_names.as_ref(), &value_field_names, &schema)?;
 Ok(JsonLinesParser {
 key_field_names,
 value_field_names,
@@ -1748,7 +1753,7 @@ impl TransparentParser {
 schema: HashMap<String, InnerSchemaField>,
 session_type: SessionType,
 ) -> Result<TransparentParser> {
-ensure_all_fields_in_schema(&key_field_names, &value_field_names, &schema)?;
+ensure_all_fields_in_schema(key_field_names.as_ref(), &value_field_names, &schema)?;
 Ok(TransparentParser {
 key_field_names,
 value_field_names,
@@ -159,31 +159,31 @@ impl DeltaOptimizerRule {

 #[derive(Debug)]
 pub struct SchemaMismatchDetails {
-columns_outside_existing_schema: Vec<String>,
-columns_missing_in_user_schema: Vec<String>,
-columns_mismatching_types: Vec<FieldMismatchDetails>,
+outside_existing_schema: Vec<String>,
+missing_in_user_schema: Vec<String>,
+mismatching_types: Vec<FieldMismatchDetails>,
 }

 impl fmt::Display for SchemaMismatchDetails {
 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
 let mut error_parts = Vec::new();
-if !self.columns_outside_existing_schema.is_empty() {
+if !self.outside_existing_schema.is_empty() {
 let error_part = format!(
 "Fields in the provided schema that aren't present in the existing table: {:?}",
-self.columns_outside_existing_schema
+self.outside_existing_schema
 );
 error_parts.push(error_part);
 }
-if !self.columns_missing_in_user_schema.is_empty() {
+if !self.missing_in_user_schema.is_empty() {
 let error_part = format!(
 "Fields in the existing table that aren't present in the provided schema: {:?}",
-self.columns_missing_in_user_schema
+self.missing_in_user_schema
 );
 error_parts.push(error_part);
 }
-if !self.columns_mismatching_types.is_empty() {
+if !self.mismatching_types.is_empty() {
 let formatted_mismatched_types = self
-.columns_mismatching_types
+.mismatching_types
 .iter()
 .map(|item| format!("{item}"))
 .join(", ");
@@ -304,9 +304,9 @@ impl DeltaBatchWriter {
 existing_schema: &IndexMap<String, DeltaTableStructField>,
 user_schema: &[DeltaTableStructField],
 ) -> Result<(), WriteError> {
-let mut columns_outside_existing_schema: Vec<String> = Vec::new();
-let mut columns_missing_in_user_schema: Vec<String> = Vec::new();
-let mut columns_mismatching_types = Vec::new();
+let mut outside_existing_schema: Vec<String> = Vec::new();
+let mut missing_in_user_schema: Vec<String> = Vec::new();
+let mut mismatching_types = Vec::new();
 let mut has_error = false;

 let mut defined_user_columns = HashSet::new();
@@ -314,14 +314,14 @@ impl DeltaBatchWriter {
 let name = &user_column.name;
 defined_user_columns.insert(name.to_string());
 let Some(schema_column) = existing_schema.get(name) else {
-columns_outside_existing_schema.push(name.to_string());
+outside_existing_schema.push(name.to_string());
 has_error = true;
 continue;
 };
 let nullability_differs = user_column.nullable != schema_column.nullable;
 let data_type_differs = user_column.data_type != schema_column.data_type;
 if nullability_differs || data_type_differs {
-columns_mismatching_types.push(FieldMismatchDetails {
+mismatching_types.push(FieldMismatchDetails {
 schema_field: schema_column.clone(),
 user_field: user_column.clone(),
 });
@@ -330,16 +330,16 @@ impl DeltaBatchWriter {
 }
 for schema_column in existing_schema.keys() {
 if !defined_user_columns.contains(schema_column) {
-columns_missing_in_user_schema.push(schema_column.to_string());
+missing_in_user_schema.push(schema_column.to_string());
 has_error = true;
 }
 }

 if has_error {
 let schema_mismatch_details = SchemaMismatchDetails {
-columns_outside_existing_schema,
-columns_missing_in_user_schema,
-columns_mismatching_types,
+outside_existing_schema,
+missing_in_user_schema,
+mismatching_types,
 };
 Err(WriteError::DeltaTableSchemaMismatch(
 schema_mismatch_details,
@@ -585,7 +585,6 @@ pub fn read_delta_table<S: std::hash::BuildHasher>(
 }
 Err(_) => {
 n_errors += 1;
-continue;
 }
 }
 }
@@ -147,11 +147,7 @@ pub fn parquet_value_into_pathway_value(
 }
 (ParquetValue::Bytes(b), Type::Bytes | Type::Any) => Some(Value::Bytes(b.data().into())),
 (ParquetValue::Bytes(b), Type::PyObjectWrapper) => {
-if let Ok(value) = bincode::deserialize::<Value>(b.data()) {
-Some(value)
-} else {
-None
-}
+bincode::deserialize::<Value>(b.data()).ok()
 }
 (ParquetValue::ListInternal(parquet_list), Type::List(nested_type)) => {
 let mut values = Vec::new();
@@ -40,7 +40,7 @@ impl Writer for LakeWriter {
 let commit_needed = self.buffer.has_updates()
 && (self
 .min_commit_frequency
-.map_or(true, |f| self.last_commit_at.elapsed() >= f)
+.is_none_or(|f| self.last_commit_at.elapsed() >= f)
 || forced);
 if commit_needed {
 let (batch, payload_type) = self.buffer.build_update_record_batch()?;
@@ -1306,7 +1306,7 @@ impl PsqlWriter {
 snapshot_mode: bool,
 table_name: &str,
 schema: &HashMap<String, Type>,
-key_field_names: &Option<Vec<String>>,
+key_field_names: Option<&Vec<String>>,
 mode: SqlWriterInitMode,
 ) -> Result<PsqlWriter, WriteError> {
 let mut writer = PsqlWriter {
@@ -1325,7 +1325,7 @@ impl PsqlWriter {
 mode: SqlWriterInitMode,
 table_name: &str,
 schema: &HashMap<String, Type>,
-key_field_names: &Option<Vec<String>>,
+key_field_names: Option<&Vec<String>>,
 ) -> Result<(), WriteError> {
 match mode {
 SqlWriterInitMode::Default => return Ok(()),
@@ -1353,7 +1353,7 @@ impl PsqlWriter {
 transaction: &mut PsqlTransaction,
 table_name: &str,
 schema: &HashMap<String, Type>,
-key_field_names: &Option<Vec<String>>,
+key_field_names: Option<&Vec<String>>,
 ) -> Result<(), WriteError> {
 let columns: Vec<String> = schema
 .iter()
@@ -1662,7 +1662,6 @@ impl Writer for KafkaWriter {
 )) => {
 self.producer.poll(Duration::from_millis(10));
 entry = unsent_entry;
-continue;
 }
 Err((e, _unsent_entry)) => return Err(WriteError::Kafka(e)),
 }
@@ -409,7 +409,7 @@ impl Connector {
 error_reporter.report(EngineError::ReaderFailed(error));
 }
 }
-};
+}

 if finished {
 break;
@@ -803,7 +803,7 @@ impl Connector {
 SnapshotEvent::AdvanceTime(_, _) | SnapshotEvent::Finished => {
 unreachable!()
 }
-};
+}
 }
 }
 }
@@ -905,7 +905,7 @@ impl Connector {
 ));
 }
 }
-};
+}
 }
 }
@@ -29,7 +29,7 @@ impl HashInto for OffsetKey {
 }
 OffsetKey::Nats(worker_index) => worker_index.hash_into(hasher),
 OffsetKey::Empty => {}
-};
+}
 }
 }

@@ -142,7 +142,7 @@ impl HashInto for OffsetValue {
 snapshot_id.hash_into(hasher);
 }
 OffsetValue::Empty => {}
-};
+}
 }
 }
@@ -91,7 +91,7 @@ impl FilesystemScanner {
 result.push(QueuedAction::Update(encoded_path.clone(), actual_metadata));
 }
 }
-};
+}
 }
 result
 }
@@ -128,7 +128,10 @@ impl FilesystemScanner {

 // Otherwise scan all files in all subdirectories and add them
 let Some(path) = entry.to_str() else {
-error!("Non-unicode paths are not supported. Ignoring: {entry:?}");
+error!(
+"Non-unicode paths are not supported. Ignoring: {}",
+entry.display()
+);
 continue;
 };
@@ -241,7 +241,7 @@ impl ConnectorGroup {
 // minimal must be taken into consideration
 if self.next_proposed_value[source_id]
 .as_ref()
-.map_or(true, |current| current > candidate)
+.is_none_or(|current| current > candidate)
 {
 self.next_proposed_value[source_id] = Some(candidate.clone());
 }
@@ -315,7 +315,7 @@ impl ConnectorGroup {
 }
 if chosen_ws_ref
 .as_ref()
-.map_or(true, |current| current.requested_value > ws.requested_value)
+.is_none_or(|current| current.requested_value > ws.requested_value)
 {
 chosen_ws_ref = Some(ws);
 }
@@ -41,6 +41,7 @@ use std::cell::RefCell;
 use std::cmp::min;
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
+use std::fmt::Write;
 use std::hash::Hash;
 use std::iter::once;
 use std::marker::PhantomData;
@@ -610,9 +611,9 @@ impl ErrorLogInner {
 fn maybe_flush(&mut self) -> SystemTime {
 // returns time of the next flush
 let now = SystemTime::now();
-let flush = self.last_flush.map_or(true, |last_flush| {
-last_flush + ERROR_LOG_FLUSH_PERIOD <= now
-});
+let flush = self
+.last_flush
+.is_none_or(|last_flush| last_flush + ERROR_LOG_FLUSH_PERIOD <= now);
 if flush {
 self.last_flush = Some(now);
 let new_timestamp = Timestamp::new_from_current_time();
@@ -1368,7 +1369,7 @@ impl<S: MaybeTotalScope> DataflowGraphInner<S> {
 &values,
 column_properties.trace.clone(),
 )?;
-};
+}

 let column_handle = self.columns.alloc(
 Column::from_collection(universe_handle, values)
@@ -2112,7 +2113,7 @@ impl<S: MaybeTotalScope> DataflowGraphInner<S> {
 .join_core(column.values_arranged(), |k, (), v| once((*k, v.clone())));
 if !self.ignore_asserts {
 self.assert_input_keys_match_output_keys(universe.keys(), &new_values, trace)?;
-};
+}
 let new_column_handle = self
 .columns
 .alloc(Column::from_collection(universe_handle, new_values));
@@ -2153,7 +2154,7 @@ impl<S: MaybeTotalScope> DataflowGraphInner<S> {

 if !self.ignore_asserts && same_universes {
 self.assert_input_keys_match_output_keys(original_table.keys(), &result, trace)?;
-};
+}

 Ok(self
 .tables
@@ -3040,7 +3041,7 @@ impl<S: MaybeTotalScope> DataflowGraphInner<S> {
 let column_value = column_path
 .extract(key, values)
 .unwrap_with_reporter(&error_reporter);
-values_str.push_str(&format!(", {name}={column_value:?}"));
+write!(&mut values_str, ", {name}={column_value:?}").unwrap();
 }
 println!("[{worker}][{tag}] @{time:?} {diff:+} id={key}{values_str}");
 });
@@ -3724,7 +3725,7 @@ impl<S: MaybeTotalScope<MaybeTotalTimestamp = Timestamp>> DataflowGraphInner<S>
 && self
 .persistence_wrapper
 .get_persistence_config()
-.map_or(true, |config| config.continue_after_replay);
+.is_none_or(|config| config.continue_after_replay);
 let persisted_table = internal_persistent_id.is_some()
 && self
 .persistence_wrapper
@@ -4133,7 +4134,7 @@ impl<S: MaybeTotalScope<MaybeTotalTimestamp = Timestamp>> DataflowGraphInner<S>
 .send(OutputEvent::Commit(frontier.first().copied()))
 .expect("sending output commit should not fail");
 }
-};
+}
 })
 .probe_with(&self.output_probe);

@@ -6326,7 +6327,7 @@ where
 catch_unwind(AssertUnwindSafe(|| drop(guards.join()))).unwrap_or(());
 return Err(error);
 }
-};
+}

 let res = guards
 .join()
@@ -1,33 +1,5 @@
 // Copyright © 2025 Pathway

-use std::cell::RefCell;
-use std::collections::hash_map::Entry;
-use std::collections::HashMap;
-use std::mem::take;
-use std::rc::Rc;
-use std::sync::Arc;
-use std::time::Duration;
-
-use differential_dataflow::input::InputSession;
-use differential_dataflow::{AsCollection, Collection};
-use timely::dataflow::operators::{Exchange, Inspect, Map};
-use timely::progress::Timestamp as _;
-
-use crate::connectors::adaptors::InputAdaptor;
-use crate::connectors::data_format::Parser;
-use crate::connectors::data_storage::ReaderBuilder;
-use crate::connectors::{Connector, PersistenceMode, SnapshotAccess};
-use crate::engine::graph::{SubscribeCallbacks, SubscribeConfig};
-use crate::engine::{
-ColumnPath, Error, Key, OriginalOrRetraction, Result, TableHandle, TableProperties, Timestamp,
-Value,
-};
-
-use super::maybe_total::MaybeTotalScope;
-use super::operators::output::ConsolidateForOutput;
-use super::operators::{MapWrapped, MaybeTotal, Reshard};
-use super::{DataflowGraphInner, MaybePersist, Table, Tuple};
-
 /// Copyright © 2025 Pathway
 ///
 /// `AsyncTransformer` allows for fully asynchronous computation on the python side.
 /// Computation results are returned to the engine in a later (or equal) time
 /// than the entry that triggered the computation.
@@ -66,6 +38,33 @@ use super::{DataflowGraphInner, MaybePersist, Table, Tuple};
 /// - upsert operator with persistence. Upsert is there to avoid recomputation for deletions. Without upserting,
 /// there could be problems with inconsistency as the `AsyncTransformer` can be non-deterministic.
 /// Persistence is there to be able to update values computed in previous runs.
+use std::cell::RefCell;
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
+use std::mem::take;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::time::Duration;
+
+use differential_dataflow::input::InputSession;
+use differential_dataflow::{AsCollection, Collection};
+use timely::dataflow::operators::{Exchange, Inspect, Map};
+use timely::progress::Timestamp as _;
+
+use crate::connectors::adaptors::InputAdaptor;
+use crate::connectors::data_format::Parser;
+use crate::connectors::data_storage::ReaderBuilder;
+use crate::connectors::{Connector, PersistenceMode, SnapshotAccess};
+use crate::engine::graph::{SubscribeCallbacks, SubscribeConfig};
+use crate::engine::{
+ColumnPath, Error, Key, OriginalOrRetraction, Result, TableHandle, TableProperties, Timestamp,
+Value,
+};
+
+use super::maybe_total::MaybeTotalScope;
+use super::operators::output::ConsolidateForOutput;
+use super::operators::{MapWrapped, MaybeTotal, Reshard};
+use super::{DataflowGraphInner, MaybePersist, Table, Tuple};

 struct AsyncTransformerSession {
 input_session: InputSession<Timestamp, (Key, Value, i64), isize>,
@@ -82,12 +81,11 @@ impl AsyncTransformerSession {
 }
 }

+/// The implementation below mostly reuses differetial dataflow's `InputSession` internals.
+///
+/// It adds a sequential id for each entry so that it is possible later to
+/// deduplicate entries for a single (key, time) pair leaving only the last one.
 impl InputAdaptor<Timestamp> for AsyncTransformerSession {
-/// The implementation below mostly reuses differetial dataflow's `InputSession` internals.
-///
-/// It adds a sequential id for each entry so that it is possible later to
-/// deduplicate entries for a single (key, time) pair leaving only the last one.
-
 fn new() -> Self {
 AsyncTransformerSession {
 input_session: InputSession::new(),
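
The deduplication idea from the doc comment above, sketched outside the engine (an assumed simplification: entries are plain tuples rather than `InputSession` internals): for every `(key, time)` pair keep only the entry with the highest sequential id, i.e. the last one produced.

    use std::collections::HashMap;

    type Key = u64;
    type Time = u64;

    /// Keep only the last entry (highest sequential id) per (key, time) pair.
    fn deduplicate(entries: Vec<(Key, Time, i64, String)>) -> Vec<(Key, Time, String)> {
        let mut last: HashMap<(Key, Time), (i64, String)> = HashMap::new();
        for (key, time, seq_id, value) in entries {
            match last.get(&(key, time)) {
                Some((existing, _)) if *existing > seq_id => {} // a later entry already wins
                _ => {
                    last.insert((key, time), (seq_id, value));
                }
            }
        }
        last.into_iter()
            .map(|((key, time), (_, value))| (key, time, value))
            .collect()
    }
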
@@ -117,7 +117,7 @@ struct Context<'a> {
 requests: RefCell<Vec<Request>>,
 }

-impl<'a> ContextTrait for Context<'a> {
+impl ContextTrait for Context<'_> {
 fn this_row(&self) -> Key {
 self.key
 }
@@ -71,6 +71,7 @@ impl CountStats {
 self.current_rows += diff;
 }

+#[allow(clippy::manual_midpoint)]
 pub fn get_insertions(&self) -> isize {
 (self.current_rows + self.total_rows) / 2
 }
@@ -26,15 +26,13 @@ pub trait Index<K, V, R, K2, V2, Ret> {
 fn search(&self, batch: Vec<(K2, V2, R)>) -> Vec<(K2, Ret, R)>;
 }

-/**
-Trait denoting that given collection can accept:
-- a query stream,
-- an implementation of Index providing indices:
--- accepting self to modify the index (`take_update`, handling adding / removing index entries)
--- accepting elements of query stream as queries (`search`)
-
-and produces a stream of queries extended by tuples of matching IDs (according to current state (as-of-now) `ExternalIndex`)
-*/
+/// Trait denoting that given collection can accept:
+/// - a query stream,
+/// - an implementation of Index providing indices:
+/// -- accepting self to modify the index (`take_update`, handling adding / removing index entries)
+/// -- accepting elements of query stream as queries (`search`)
+///
+/// and produces a stream of queries extended by tuples of matching IDs (according to current state (as-of-now) `ExternalIndex`)
 pub trait UseExternalIndexAsOfNow<G: Scope, K: ExchangeData, V: ExchangeData, R: Abelian> {
 fn use_external_index_as_of_now<K2, V2, Ret>(
 &self,
@@ -68,16 +66,13 @@ where
 }
 }

-/**
-Implementation of `use_external_index_as_of_now`.
-- it duplicates the index stream, to make it available for all workers
-- it synchronizes index and query streams via concatenation, so that we work on data with the same timestamp
-
-The index stream only changes the state of the external index (according to its implementation), each query
-in the query stream generates one entry in the output stream (so it's a map-like operator from the point of view
-of query stream)
-*/
-
+/// Implementation of `use_external_index_as_of_now`.
+/// - it duplicates the index stream, to make it available for all workers
+/// - it synchronizes index and query streams via concatenation, so that we work on data with the same timestamp
+///
+/// The index stream only changes the state of the external index (according to its implementation), each query
+/// in the query stream generates one entry in the output stream (so it's a map-like operator from the point of view
+/// of query stream)
 fn use_external_index_as_of_now_core<G, K, K2, V, V2, R, Ret>(
 index_stream: &Collection<G, (K, V), R>,
 query_stream: &Collection<G, (K2, V2), R>,
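
A toy illustration of the `Index` contract referenced in the doc comments above (a hypothetical stand-in, not one of the engine's real index implementations): an exact-match index backed by a `HashMap`, where `take_update` mutates the index state and `search` extends every query with the ids matching the current, as-of-now state.

    use std::collections::HashMap;

    struct ToyExternalIndex {
        entries: HashMap<String, Vec<u64>>,
    }

    impl ToyExternalIndex {
        // Plays the `take_update` role: a positive diff adds an entry, a negative one removes it.
        fn take_update(&mut self, key: String, id: u64, diff: i64) {
            let ids = self.entries.entry(key).or_default();
            if diff > 0 {
                ids.push(id);
            } else {
                ids.retain(|existing| *existing != id);
            }
        }

        // Plays the `search` role: each query yields exactly one output record,
        // extended with the ids matching the current index state.
        fn search(&self, batch: Vec<(String, i64)>) -> Vec<(String, Vec<u64>, i64)> {
            batch
                .into_iter()
                .map(|(query, diff)| {
                    let matches = self.entries.get(&query).cloned().unwrap_or_default();
                    (query, matches, diff)
                })
                .collect()
        }
    }
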
@@ -147,7 +147,7 @@ fn apply_to_fragment<K, V, T, R, St, Sc, C, P>(
 to: Option<&K>,
 negate: bool,
 output: &mut OutputHandle<'_, T, ((K, (V, Sc)), T, R), P>,
-time: &Option<T>,
+time: Option<&T>,
 capability: &Capability<T>,
 ) where
 K: ExchangeData + HasMaxValue,
@@ -213,7 +213,7 @@ fn replace_in_fragment<K, V, T, R, St, Sc, C, P>(
 from: Option<&K>,
 to: Option<&K>,
 output: &mut OutputHandle<'_, T, ((K, (V, Sc)), T, R), P>,
-time: &Option<T>,
+time: Option<&T>,
 capability: &Capability<T>,
 ) where
 K: ExchangeData + HasMaxValue,
@@ -255,7 +255,7 @@ fn replace_in_fragment<K, V, T, R, St, Sc, C, P>(
 }

 fn get_new_triplet_from_input_vec<K, V, T, R>(
-current: &Option<ApxToBroadcast<V>>,
+current: Option<&ApxToBroadcast<V>>,
 input: &Vec<((K, (V, V, V)), T, R)>,
 ) -> Option<(T, ApxToBroadcast<V>)>
 where
@@ -291,7 +291,7 @@ where
 let (&(_, received_triplet), _) = non_zero[0];

 let (&(_key, received_triplet), (retrieved_time, _r)) = if non_zero.len() == 2
-&& current == &Some(ApxToBroadcast::from_triplet(received_triplet.clone()))
+&& current == Some(&ApxToBroadcast::from_triplet(received_triplet.clone()))
 {
 non_zero[1]
 } else {
@@ -370,7 +370,7 @@ where
 None,
 false,
 output,
-&None,
+None,
 &cap1,
 );
 }
@@ -391,7 +391,8 @@ where

 old_triplet.clone_from(&triplet);

-let processed = get_new_triplet_from_input_vec(&triplet, &input2_buffer);
+let processed =
+get_new_triplet_from_input_vec(triplet.as_ref(), &input2_buffer);
 if processed.is_none() {
 return;
 }
@@ -431,7 +432,7 @@ where
 Some(from),
 Some(to),
 output,
-&Some(time.clone()),
+Some(&time.clone()),
 &cap2,
 );
 }
@@ -446,7 +447,7 @@ where
 None,
 None,
 output,
-&Some(time.clone()),
+Some(&time.clone()),
 &cap2,
 );
 } else if let Some(unwrapped_triplet) = triplet.clone() {
@@ -465,7 +466,7 @@ where
 Some(from),
 Some(to),
 output,
-&Some(time.clone()),
+Some(&time.clone()),
 &cap2,
 );
 } else {
@@ -477,7 +478,7 @@ where
 None,
 false,
 output,
-&Some(time.clone()),
+Some(&time.clone()),
 &cap2,
 );
 }
@@ -70,7 +70,7 @@ impl<K, T> CarryEntry<K, T> {
 }
 }

-/// Returns true, if total weight associated with the current key
+/// Returns true, if total weight associated with the current key
 /// of the cursor passed in wrapper is zero.
 fn key_has_zero_weight<C: Cursor>(wrapper: &mut CursorStorageWrapper<C>, time: &C::Time) -> bool
 where
@@ -216,7 +216,7 @@ fn push_insert_replace<C: Cursor>(
 fn push_prev_replace<K, C: Cursor<Key = K, Val = (Option<K>, Option<K>)>>(
 wrapper: &mut CursorStorageWrapper<C>,
 key: &K,
-new_prev: &Option<K>,
+new_prev: Option<&K>,
 time: &C::Time,
 batch_builder: &mut OutputBatchBuilder<C::Key, C::Val, C::Time, C::R>,
 ) where
@@ -239,11 +239,11 @@ fn push_prev_replace<K, C: Cursor<Key = K, Val = (Option<K>, Option<K>)>>(
 ));
 log::debug!(
 "bb.p inserting {:?} for {key:?} in push-prev-replace",
-(new_prev.clone(), val.1.clone())
+(new_prev, val.1.clone())
 );
 batch_builder.push((
 key.clone(),
-(new_prev.clone(), val.1.clone()),
+(new_prev.cloned(), val.1.clone()),
 time.clone(),
 weight.unwrap(),
 ));
@@ -518,7 +518,7 @@ where
 push_prev_replace(
 output_wrapper,
 carry_entry.next.as_ref().unwrap(),
-&carry_entry.key.clone(),
+carry_entry.key.clone().as_ref(),
 time,
 batch_builder,
 );
@@ -633,7 +633,7 @@ where
 push_prev_replace(
 output_wrapper,
 &carry_entry.next.unwrap(),
-&carry_entry.key,
+carry_entry.key.as_ref(),
 &carry_entry.time.unwrap(),
 batch_builder,
 );
@@ -759,7 +759,7 @@ fn handle_one_instance<
 push_prev_replace(
 output_wrapper,
 &key_to_fix,
-&carry_entry.key,
+carry_entry.key.as_ref(),
 carry_entry.time.as_ref().unwrap(),
 batch_builder,
 );
@@ -821,7 +821,7 @@ where
 let (mut cursor, storage) =
 input_arrangement.trace.bidirectional_cursor();

-log::debug!("pushing batch {:?}", entries);
+log::debug!("pushing batch {entries:?}");
 let mut output_wrapper = CursorStorageWrapper {
 cursor: &mut output_cursor,
 storage: &output_storage,
@@ -258,7 +258,7 @@ fn push_key_values_to_output<K, C: Cursor, P>(
 output: &mut OutputHandle<'_, C::Time, ((K, C::Val), C::Time, C::R), P>,
 capability: &Capability<C::Time>,
 k: &K,
-time: &Option<C::Time>,
+time: Option<&C::Time>,
 ) where
 K: Data + 'static,
 C::Val: Data + 'static,
@@ -277,11 +277,13 @@ fn push_key_values_to_output<K, C: Cursor, P>(
 let weight = key_val_total_weight(wrapper);
 let curr_val = wrapper.cursor.val(wrapper.storage);
 if let Some(weight) = weight.filter(|w| !w.is_zero()) {
-let time = time.as_ref().unwrap();
-assert!(time >= capability.time());
-output
-.session(&capability)
-.give(((k.clone(), curr_val.clone()), time.clone(), weight));
+let time = time.cloned().unwrap();
+assert!(time >= *capability.time());
+output.session(&capability).give((
+(k.clone(), curr_val.clone()),
+time.clone().clone(),
+weight,
+));
 }
 wrapper.cursor.step_val(wrapper.storage);
 }
@@ -451,7 +453,7 @@ where
 output,
 &capability.delayed(&time),
 &tk.key,
-&Some(time.clone()),
+Some(&time.clone()),
 );
 input_wrapper.cursor.step_key(input_wrapper.storage);
 }
@@ -510,7 +512,7 @@ where
 output,
 maybe_cap.as_ref().unwrap(),
 &tk.key,
-&Some(maybe_cap.as_ref().unwrap().time().clone()),
+Some(&maybe_cap.as_ref().unwrap().time().clone()),
 );

 input_wrapper.cursor.step_key(input_wrapper.storage);
@@ -174,7 +174,7 @@
 /// Because we have `Box<dyn PersistenceWrapper<S>>` in `DataflowGraphInner`
 /// and when `PersistenceWrapper` trait has a generic method, we can't create `Box<dyn PersistenceWrapper<S>>`
 /// ("the trait cannot be made into an object" error).
-
+///
 /// And why do we need `PersistenceWrapper` at all?
 /// We need it to create operator snapshot writer that uses `Timestamp` type. The operator snapshot writer
 /// cannot be generic in `MaybeTotalTimestamp` as the whole persistence uses concrete `Timestamp`, not generic `MaybeTotalTimestamp`. It makes
@@ -183,7 +183,6 @@ where
 /// because is used in methods that don't have the constraint S: `MaybeTotalScope`<`MaybeTotalTimestamp` = `Timestamp`>.
 /// To handle this, operator snapshot writer is created in a separate object (instance of `TimestampBasedPersistenceWrapper`)
 /// that is aware that `MaybeTotalTimestamp` = `Timestamp`.
-
 pub(super) enum PersistableCollection<S: MaybeTotalScope> {
 KeyValueIsize(Collection<S, (Key, Value), isize>),
 KeyIntSumState(Collection<S, Key, IntSumState>),
@@ -362,9 +362,7 @@ pub fn register_custom_panic_hook() {
 Some(message) => Some(*message),
 None => payload.downcast_ref::<String>().map(String::as_str),
 };
-if message.map_or(true, |message| {
-!OTHER_WORKER_ERROR_MESSAGES.contains(&message)
-}) {
+if message.is_none_or(|message| !OTHER_WORKER_ERROR_MESSAGES.contains(&message)) {
 prev(panic_info);
 }
 }));
@@ -35,7 +35,7 @@ pub enum MaybeOwnedValues<'a> {
 Borrowed(&'a [Value]),
 }

-impl<'a> Deref for MaybeOwnedValues<'a> {
+impl Deref for MaybeOwnedValues<'_> {
 type Target = [Value];

 fn deref(&self) -> &[Value] {
@@ -171,7 +171,7 @@ pub fn start_http_server_thread(
 _ => {
 *response.status_mut() = StatusCode::NOT_FOUND;
 }
-};
+}
 Ok::<_, Error>(response)
 }
 }))
@@ -86,7 +86,7 @@ impl License {
 License::NoLicenseKey => false,
 License::OfflineLicense(license) => license.telemetry_required,
 License::LicenseKey(key) => check_license_key_entitlements(key.clone(), vec![])
-.map_or(false, |result| result.telemetry_required()),
+.is_ok_and(|result| result.telemetry_required()),
 }
 }
@@ -9,8 +9,8 @@ use differential_dataflow::{
 };
 use ordered_float::OrderedFloat;
 use serde::{Deserialize, Serialize};
+use std::any::type_name;
 use std::num::NonZeroUsize;
-use std::{any::type_name, iter::repeat};
 use std::{cmp::Reverse, sync::Arc};

 use super::{error::DynResult, DataError, Key, Value};
@@ -506,7 +506,7 @@ impl UnaryReducerImpl for SortedTupleReducer {
 .flat_map(|(state, cnt)| {
 state
 .iter()
-.flat_map(move |v| repeat(v).take(cnt.get()))
+.flat_map(move |v| std::iter::repeat_n(v, cnt.get()))
 .cloned()
 })
 .collect())
@@ -551,7 +551,7 @@ impl ReducerImpl for TupleReducer {
 .flat_map(|(state, cnt)| {
 state
 .iter()
-.flat_map(move |v| repeat(v).take(cnt.get()))
+.flat_map(move |v| std::iter::repeat_n(v, cnt.get()))
 .cloned()
 })
 .collect())
@@ -178,7 +178,7 @@ where

 struct JsonVisitor;

-impl<'de> Visitor<'de> for JsonVisitor {
+impl Visitor<'_> for JsonVisitor {
 type Value = Handle<JsonValue>;

 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
@@ -395,7 +395,7 @@ where
 fn handle_filter_and_unpack_data(&mut self, entry: AddDataEntry) -> DynResult<DataType> {
 if let Some(f_data_un) = entry.filter_data {
 self.filter_data_map.insert(entry.key, f_data_un.unpack()?);
-};
+}
 entry.data.unpack()
 }
@@ -13,10 +13,7 @@ pub fn ensure_directory(fs_path: &Path) -> Result<(), Error> {
 }
 } else if !fs_path.is_dir() {
 // use ErrorKind::NotADirectory when it becomes stable
-return Err(Error::new(
-ErrorKind::Other,
-"target object should be a directory",
-));
+return Err(Error::other("target object should be a directory"));
 }
 Ok(())
 }
@@ -3,8 +3,8 @@
 #![warn(clippy::pedantic)]
 #![warn(clippy::cargo)]
 #![allow(clippy::must_use_candidate)] // too noisy

 // FIXME:
+#![allow(clippy::result_large_err)] // Too noisy, around 250 warnings across the codebase. Fix gradually.
 #![allow(clippy::missing_errors_doc)]
 #![allow(clippy::missing_panics_doc)]
-
@@ -61,7 +61,7 @@ impl PersistenceBackend for FilesystemKVStorage {
 keys.push(key);
 }
 } else {
-warn!("The path is not UTF-8 encoded: {entry:?}");
+warn!("The path is not UTF-8 encoded: {}", entry.display());
 }
 }
 Ok(keys)
@@ -116,7 +116,7 @@ impl BackgroundObjectUploader {
 }
 },
 BackgroundUploaderEvent::Finish => break,
-};
+}
 }
 })
 .expect("background uploader failed");
@@ -69,19 +69,19 @@ impl MetadataEvent {
 ///
 /// The implementation is as follows:
 /// * There are two types of events: object addition and object removal.
-/// These events are stored both in the selected durable storage and in several
-/// in-memory indexes, denoting events by version; sorted event sequences by the
-/// object URI and the snapshot - the actual, version-unaware state of the data
-/// structure.
+///   These events are stored both in the selected durable storage and in several
+///   in-memory indexes, denoting events by version; sorted event sequences by the
+///   object URI and the snapshot - the actual, version-unaware state of the data
+///   structure.
 /// * When the data structure starts, it reads the sequence of events and
-/// constructs the mappings described above.
+///   constructs the mappings described above.
 /// * When a rewind takes place, the versions that need to be deleted are detected
-/// and undone one by one, starting from the latest. Note that these events are
-/// also removed from the durable storage.
+///   and undone one by one, starting from the latest. Note that these events are
+///   also removed from the durable storage.
 /// * When a lookup takes place, the snapshot is used.
 /// * When an upsert or removal takes place, a new version is created. An event
-/// corresponding to this version is added to the durable storage and to the local
-/// event indexes. It is also reflected in a locally stored snapshot.
+///   corresponding to this version is added to the durable storage and to the local
+///   event indexes. It is also reflected in a locally stored snapshot.
 #[derive(Debug)]
 pub struct CachedObjectStorage {
 backend: Box<dyn PersistenceBackend>,
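
A compact sketch of the event/versioning scheme this doc comment describes (simplified and in-memory only; the real `CachedObjectStorage` also persists every event in a durable backend): each upsert appends a versioned event, lookups go through the snapshot, and a rewind undoes events above a target version starting from the latest.

    use std::collections::{BTreeMap, HashMap};

    enum Event {
        Upsert { uri: String, previous: Option<Vec<u8>> },
        Remove { uri: String, previous: Vec<u8> },
    }

    struct ToyCachedStorage {
        events: BTreeMap<u64, Event>,       // version -> event (durable copy omitted here)
        snapshot: HashMap<String, Vec<u8>>, // version-unaware current state
        next_version: u64,
    }

    impl ToyCachedStorage {
        fn upsert(&mut self, uri: String, object: Vec<u8>) {
            let previous = self.snapshot.insert(uri.clone(), object);
            self.events.insert(self.next_version, Event::Upsert { uri, previous });
            self.next_version += 1;
        }

        fn lookup(&self, uri: &str) -> Option<&Vec<u8>> {
            self.snapshot.get(uri) // lookups never replay the event log
        }

        /// Undo every event with version >= `version`, newest first, as in a rewind.
        fn rewind_to(&mut self, version: u64) {
            let to_undo: Vec<u64> = self.events.range(version..).map(|(v, _)| *v).rev().collect();
            for v in to_undo {
                match self.events.remove(&v).unwrap() {
                    Event::Upsert { uri, previous } => match previous {
                        Some(prev) => {
                            self.snapshot.insert(uri, prev);
                        }
                        None => {
                            self.snapshot.remove(&uri);
                        }
                    },
                    Event::Remove { uri, previous } => {
                        self.snapshot.insert(uri, previous);
                    }
                }
            }
        }
    }
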
@@ -157,8 +157,7 @@ impl ReadersQueryPurpose {
 if other_worker_id % total_workers == worker_id {
 if other_worker_id != worker_id {
 info!(
-"Assigning snapshot from the former worker {other_worker_id} to worker {} due to reduced number of worker threads",
-worker_id
+"Assigning snapshot from the former worker {other_worker_id} to worker {worker_id} due to reduced number of worker threads"
 );
 }
 true
@@ -106,7 +106,7 @@ fn get_chunks(keys: Vec<String>, threshold_time: TotalFrontier<Timestamp>) -> Ch
 too_new.push(chunk);
 } else if max_time_per_level
 .get(1)
-.map_or(true, |max_time| chunk.time > *max_time)
+.is_none_or(|max_time| chunk.time > *max_time)
 {
 // If max_time_per_level[1] exists it means there are merged chunks.
 // Unmerged chunks are valid if their time > last merged chunk time
@@ -292,7 +292,7 @@ where
 {
 fn persist(&mut self, time: Timestamp, mut data: Vec<(D, R)>) {
 self.maybe_save(TotalFrontier::At(time));
-assert!(self.max_time.map_or(true, |max_time| max_time <= time));
+assert!(self.max_time.is_none_or(|max_time| max_time <= time));
 self.max_time = Some(time);
 self.buffer.append(&mut data);
 }
@@ -142,7 +142,7 @@ impl VersionInformation {
 error!("Got worker id {worker_id} while only {expected_workers} workers were expected");
 return;
 }
-if self.worker_finalized_times[worker_id].map_or(true, |time| time < finalized_time) {
+if self.worker_finalized_times[worker_id].is_none_or(|time| time < finalized_time) {
 self.worker_finalized_times[worker_id] = Some(finalized_time);
 }
 }
@@ -199,7 +199,7 @@ fn compute_threshold_time_and_versions(
 }
 }
 }
-};
+}
 }

 let mut past_runs_threshold_time = TotalFrontier::At(Timestamp(0));
@@ -209,7 +209,7 @@ fn compute_threshold_time_and_versions(
 let Some(threshold_time) = threshold_time else {
 continue;
 };
-if latest_stable_version.map_or(true, |current_version| current_version < *version_number) {
+if latest_stable_version.is_none_or(|current_version| current_version < *version_number) {
 latest_stable_version = Some(*version_number);
 past_runs_threshold_time = threshold_time;
 }
@@ -726,7 +726,7 @@ impl From<EngineError> for PyErr {
 match error.downcast::<PyErr>() {
 Ok(error) => return error,
 Err(other) => error = other,
-};
+}
 Python::with_gil(|py| {
 if let EngineError::WithTrace { inner, trace } = error {
 let inner = PyErr::from(EngineError::from(inner));
@@ -4983,7 +4983,7 @@ impl CsvParserSettings {

 impl DataStorage {
 fn extract_string_field<'a>(
-field: &'a Option<String>,
+field: Option<&'a String>,
 error_message: &'static str,
 ) -> PyResult<&'a str> {
 let value = field
@@ -4994,26 +4994,29 @@ impl DataStorage {
 }

 fn path(&self) -> PyResult<&str> {
-Self::extract_string_field(&self.path, "For fs/s3 storage, path must be specified")
+Self::extract_string_field(
+self.path.as_ref(),
+"For fs/s3 storage, path must be specified",
+)
 }

 fn table_name(&self) -> PyResult<&str> {
 Self::extract_string_field(
-&self.table_name,
+self.table_name.as_ref(),
 "For MongoDB, the 'table_name' field must be specified",
 )
 }

 fn database(&self) -> PyResult<&str> {
 Self::extract_string_field(
-&self.database,
+self.database.as_ref(),
 "For MongoDB, the 'database' field must be specified",
 )
 }

 fn connection_string(&self) -> PyResult<&str> {
 Self::extract_string_field(
-&self.connection_string,
+self.connection_string.as_ref(),
 "For Postgres and MongoDB, the 'connection_string' field must be specified",
 )
 }
@@ -5498,7 +5501,7 @@ impl DataStorage {
 }

 let uri = self.path()?;
-let warehouse = &self.database;
+let warehouse = self.database.as_ref();
 let table_name = self.table_name()?;
 let namespace = self
 .namespace
@@ -5511,7 +5514,7 @@ impl DataStorage {

 let db_params = IcebergDBParams::new(
 uri.to_string(),
-warehouse.clone(),
+warehouse.cloned(),
 namespace,
 self.iceberg_s3_storage_options(),
 );
@@ -5681,7 +5684,7 @@ impl DataStorage {
 self.snapshot_maintenance_on_output,
 self.table_name()?,
 &data_format.value_fields_type_map(py),
-&data_format.key_field_names,
+data_format.key_field_names.as_ref(),
 self.sql_writer_init_mode,
 )
 .map_err(|e| {
@@ -5790,7 +5793,7 @@ impl DataStorage {
 }

 let uri = self.path()?;
-let warehouse = &self.database;
+let warehouse = self.database.as_ref();
 let table_name = self.table_name()?;
 let namespace = self
 .namespace
@@ -5803,7 +5806,7 @@ impl DataStorage {

 let db_params = IcebergDBParams::new(
 uri.to_string(),
-warehouse.clone(),
+warehouse.cloned(),
 namespace,
 self.iceberg_s3_storage_options(),
 );
@@ -6393,7 +6396,7 @@ impl<'py> WakeupHandler<'py> {
 }
 }

-impl<'py> Drop for WakeupHandler<'py> {
+impl Drop for WakeupHandler<'_> {
 fn drop(&mut self) {
 self.set_wakeup_fd
 .call1((&self.old_wakeup_fd,))
@@ -111,7 +111,7 @@ impl Logger {
 ack_sender.send(()).unwrap_or(());
 }
 Err(channel::RecvError) => break,
-};
+}
 }
 drop(thread_state);
 })
@@ -354,7 +354,7 @@ pub fn assert_error_shown_for_reader_context(
 error_placement: ErrorPlacement,
 ) {
 let row_parse_result = parser.parse(context);
-println!("{:?}", row_parse_result);
+println!("{row_parse_result:?}");
 let errors = error_placement.extract_errors(row_parse_result);
 for e in errors {
 eprintln!("Error details: {e:?}");
@@ -1,5 +1,7 @@
 // Copyright © 2024 Pathway

+#![allow(clippy::result_large_err)]
+
 mod helpers;
 mod operator_test_utils;
@@ -59,7 +59,7 @@ pub fn run_test<
 let collection = input.to_collection(scope);
 let res = op(collection);
 res.trace
-.map_batches(|batch| eprintln!("outer debug, res map batch{:?}", batch));
+.map_batches(|batch| eprintln!("outer debug, res map batch{batch:?}"));

 res.as_collection(|key, val| (key.clone(), val.clone()))
 .inner
@@ -203,7 +203,7 @@ fn test_synchronization_several_workers() -> eyre::Result<()> {
 let group_1_repeated = sync.ensure_synchronization_group(&desc, 0).unwrap();

 // The `source_id` is private, but `Debug` trait can be used to compare them
-assert_eq!(format!("{:?}", group_1), format!("{:?}", group_1_repeated));
+assert_eq!(format!("{group_1:?}"), format!("{:?}", group_1_repeated));

 start_two_equal_groups(&mut group_1, &mut group_2)?;
@@ -668,8 +668,8 @@ fn test_random() {
 }
 }

-println!("{:?}", inputs);
-println!("{:?}", expected);
+println!("{inputs:?}");
+println!("{expected:?}");
 run_test(inputs, expected, |coll| {
 add_prev_next_pointers(coll.arrange_by_self(), &|_a, _b| true)
 });
@@ -762,8 +762,8 @@ fn test_instances_random() {
 }
 }

-println!("{:?}", inputs);
-println!("{:?}", expected);
+println!("{inputs:?}");
+println!("{expected:?}");
 run_test(inputs, expected, |coll| {
 add_prev_next_pointers(coll.arrange_by_self(), &|a, b| a.0 == b.0)
 });