Skip to content

Commit 4bb25fd

Browse files
committed
[HSTACK] - move codec() to physical_plan, now callable from python
1 parent a5960f2 commit 4bb25fd

File tree

2 files changed

+11
-13
lines changed

2 files changed

+11
-13
lines changed

src/dataframe.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ use crate::catalog::PyTable;
5656
use crate::common::df_schema::PyDFSchema;
5757
use crate::errors::{py_datafusion_err, PyDataFusionError};
5858
use crate::expr::sort_expr::to_sort_expressions;
59-
use crate::physical_plan::PyExecutionPlan;
59+
use crate::physical_plan::{ codec, PyExecutionPlan } ;
6060
use crate::record_batch::PyRecordBatchStream;
6161
use crate::sql::logical::PyLogicalPlan;
6262
use crate::utils::{get_tokio_runtime, validate_pycapsule, wait_for_future};
@@ -865,11 +865,6 @@ pub fn partition_stream(serialized_plan: &[u8], partition: usize, py: Python) ->
865865
.map_err(py_datafusion_err)
866866
}
867867

868-
fn codec() -> &'static dyn PhysicalExtensionCodec {
869-
static CODEC: DeltaPhysicalCodec = DeltaPhysicalCodec {};
870-
&CODEC
871-
}
872-
873868
/// Print DataFrame
874869
fn print_dataframe(py: Python, df: DataFrame) -> PyDataFusionResult<()> {
875870
// Get string representation of record batches

src/physical_plan.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
// under the License.
1717

1818
use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties};
19-
use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec};
19+
use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
20+
use deltalake::delta_datafusion::DeltaPhysicalCodec;
2021
use prost::Message;
21-
use std::sync::Arc;
22-
2322
use pyo3::{exceptions::PyRuntimeError, prelude::*, types::PyBytes};
23+
use std::sync::Arc;
2424

2525
use crate::{context::PySessionContext, errors::PyDataFusionResult};
2626

@@ -59,10 +59,9 @@ impl PyExecutionPlan {
5959
}
6060

6161
pub fn to_proto<'py>(&'py self, py: Python<'py>) -> PyDataFusionResult<Bound<'py, PyBytes>> {
62-
let codec = DefaultPhysicalExtensionCodec {};
6362
let proto = datafusion_proto::protobuf::PhysicalPlanNode::try_from_physical_plan(
6463
self.plan.clone(),
65-
&codec,
64+
codec(),
6665
)?;
6766

6867
let bytes = proto.encode_to_vec();
@@ -83,8 +82,7 @@ impl PyExecutionPlan {
8382
))
8483
})?;
8584

86-
let codec = DefaultPhysicalExtensionCodec {};
87-
let plan = proto_plan.try_into_physical_plan(&ctx.ctx, &ctx.ctx.runtime_env(), &codec)?;
85+
let plan = proto_plan.try_into_physical_plan(&ctx.ctx, &ctx.ctx.runtime_env(), codec())?;
8886
Ok(Self::new(plan))
8987
}
9088

@@ -109,3 +107,8 @@ impl From<Arc<dyn ExecutionPlan>> for PyExecutionPlan {
109107
PyExecutionPlan { plan: plan.clone() }
110108
}
111109
}
110+
111+
pub(crate) fn codec() -> &'static dyn PhysicalExtensionCodec {
112+
static CODEC: DeltaPhysicalCodec = DeltaPhysicalCodec {};
113+
&CODEC
114+
}

0 commit comments

Comments
 (0)