Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,10 @@ def _apply_diffs(
# Emit OpenLineage events for applied objects
self._emit_openlineage_apply_diffs(registry_diff)

# Register feature views as Unity Catalog feature tables (if using
# the databricks_uc offline store)
self._register_uc_feature_tables_from_diffs(registry_diff)

# Emit MLflow events for applied objects (Phase 7)
self._mlflow_log_apply_diffs(registry_diff)

Expand Down Expand Up @@ -1637,6 +1641,10 @@ def apply(
# Emit OpenLineage events for applied objects
self._emit_openlineage_apply(objects)

# Register feature views as Unity Catalog feature tables (if using
# the databricks_uc offline store)
self._register_uc_feature_tables_legacy(objects)

# Emit MLflow events for applied objects (Phase 7)
self._mlflow_log_apply(objects)

Expand Down Expand Up @@ -1665,6 +1673,68 @@ def _emit_openlineage_apply(self, objects: List[Any]):
except Exception as e:
warnings.warn(f"Failed to emit OpenLineage apply events: {e}")

# ------------------------------------------------------------------ #
# Unity Catalog feature table registration hooks
# ------------------------------------------------------------------ #

def _register_uc_feature_tables_from_diffs(
self, registry_diff: RegistryDiff
) -> None:
"""Register applied feature views as UC feature tables.

Only active when the offline store is ``databricks_uc`` and
``uc_registration.enabled`` is True.
"""
from feast.infra.offline_stores.contrib.spark_offline_store.databricks_uc import (
DatabricksUCOfflineStoreConfig,
)
from feast.infra.offline_stores.contrib.spark_offline_store.uc_registration import (
register_uc_feature_tables,
)

if not isinstance(self.config.offline_store, DatabricksUCOfflineStoreConfig):
return

uc_config = self.config.offline_store.uc_registration
if uc_config is None or not uc_config.enabled:
return

fvs = [
d.new_feast_object
for d in registry_diff.feast_object_diffs
if d.new_feast_object and isinstance(d.new_feast_object, FeatureView)
]
if not fvs:
return

register_uc_feature_tables(self.config.offline_store, fvs, self.project)

def _register_uc_feature_tables_legacy(self, objects: List[Any]) -> None:
"""Register feature views as UC feature tables (legacy apply path).

Only active when the offline store is ``databricks_uc`` and
``uc_registration.enabled`` is True.
"""
from feast.infra.offline_stores.contrib.spark_offline_store.databricks_uc import (
DatabricksUCOfflineStoreConfig,
)
from feast.infra.offline_stores.contrib.spark_offline_store.uc_registration import (
register_uc_feature_tables,
)

if not isinstance(self.config.offline_store, DatabricksUCOfflineStoreConfig):
return

uc_config = self.config.offline_store.uc_registration
if uc_config is None or not uc_config.enabled:
return

fvs = [obj for obj in objects if isinstance(obj, FeatureView)]
if not fvs:
return

register_uc_feature_tables(self.config.offline_store, fvs, self.project)

def teardown(self):
"""Tears down all local and cloud resources for the feature store."""
tables: List[BaseFeatureView] = []
Expand Down
12 changes: 12 additions & 0 deletions sdk/python/feast/infra/compute_engines/local/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,4 +407,16 @@ def execute(self, context: ExecutionContext) -> ArrowTableValue:
progress=lambda x: None,
)

# UC-backed materialization hook (Phase L3)
from feast.infra.offline_stores.contrib.spark_offline_store.uc_registration import (
write_uc_materialized_data,
)

write_uc_materialized_data(
config=context.repo_config,
fv=self.feature_view,
df=input_table,
project=context.repo_config.project,
)

return output
12 changes: 12 additions & 0 deletions sdk/python/feast/infra/compute_engines/spark/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,18 @@ def execute(self, context: ExecutionContext) -> DAGValue:
)
spark_df.write.format(file_format).mode("append").save(dest_path)

# UC-backed materialization hook (Phase L3)
from feast.infra.offline_stores.contrib.spark_offline_store.uc_registration import (
write_uc_materialized_data,
)

write_uc_materialized_data(
config=context.repo_config,
fv=self.feature_view,
df=spark_df,
project=context.repo_config.project,
)

return DAGValue(
data=spark_df,
format=DAGFormat.SPARK,
Expand Down
Loading
Loading