
Commit ebcc9cf

FS creation, ingestion using mljob
1 parent 7ce0f66

5 files changed: +956 -0 lines changed

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
Feature Store Creation and Ingestion using ML Job
=====================

In this example, you use the Oracle Cloud Infrastructure (OCI) Data Science service MLJob component to create OCI Feature Store design-time constructs and then ingest feature values into the offline feature store.

The tutorial uses an electronic health data use case consisting of patient test results. The example demonstrates the creation of the feature store, entity, transformation, and feature group design-time constructs using a Python script that is provided as a job artifact. A second job artifact demonstrates the ingestion of feature values into the pre-created feature group. A minimal sketch of submitting such a script as an ML Job appears at the end of this README.

# Prerequisites

The notebook makes connections to other OCI resources using [resource principals](https://docs.oracle.com/en-us/iaas/Content/Functions/Tasks/functionsaccessingociresources.htm). If you have not configured your tenancy to use resource principals, you can do so using the instructions [here](https://docs.oracle.com/en-us/iaas/data-science/using/create-dynamic-groups.htm). Alternatively, you can use API keys; however, resource principals are the preferred method of authentication.
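
Both job scripts in this commit enable resource principals at the top of the script (they additionally pass the feature store `service_endpoint` through `client_kwargs`); a minimal form:

```python
import ads

# Authenticate to OCI using the notebook or job resource principal.
ads.set_auth(auth="resource_principal")
```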

# Instructions

1. Open a Data Science Notebook session (i.e., JupyterLab).
1. Open a terminal by clicking File -> New -> Terminal.
1. In the terminal, run the following commands:
    1. `odsc conda install -s fspyspark32_p38_cpu_v1` to install the feature store conda environment.
    1. `conda activate /home/datascience/conda/fspyspark32_p38_cpu_v1` to activate the conda environment.
1. Copy the `notebooks` folder into the notebook session.
1. Open the notebook `notebook/feature_store_using_mljob.ipynb`.
1. Change the notebook kernel to `Python [conda env:fspyspark32_p38_cpu_v1]`.
1. Read the notebook and execute each cell.
1. Once the ML job run completes successfully, validate the creation of the feature store constructs using the feature store notebook UI extension.
1. Open the notebook `notebook/feature_store_ingestion_via_mljob.ipynb`.
1. Change the notebook kernel to `Python [conda env:fspyspark32_p38_cpu_v1]`.
1. Read the notebook and execute each cell.
1. Validate that the ingestion ML job run completed successfully.
1. Validate the ingested data and other metadata using the feature store notebook UI extension.
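
For reference, here is a minimal sketch of how a script artifact like the ones in this commit could be submitted as an ML Job with the ADS SDK. The job name, shape, source file name, and the compartment/project OCIDs below are illustrative placeholders, not values from this commit:

```python
from ads.jobs import Job, DataScienceJob, ScriptRuntime

job = (
    Job(name="feature-store-creation")  # placeholder name
    .with_infrastructure(
        DataScienceJob()
        .with_compartment_id("<compartment_ocid>")  # placeholder
        .with_project_id("<project_ocid>")          # placeholder
        .with_shape_name("VM.Standard.E3.Flex")
        .with_shape_config_details(memory_in_gbs=16, ocpus=1)
    )
    .with_runtime(
        ScriptRuntime()
        .with_source("feature_store_creation.py")   # placeholder file name
        .with_service_conda("fspyspark32_p38_cpu_v1")
    )
)
job.create()
job_run = job.run()
job_run.watch()  # stream the job run logs
```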
Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
from ads.feature_store.feature_group import FeatureGroup
import pandas as pd
import ads

# Authenticate via resource principals against the feature store service endpoint.
ads.set_auth(auth="resource_principal", client_kwargs={"service_endpoint": "https://fnk6p6iswuttzxwffxq6uwpj2u.apigateway.us-ashburn-1.oci.customer-oci.com/20230101"})

# Look up the pre-created feature group by its ID.
ehr_feature_group = FeatureGroup.from_id("FED8117CDF1EE54F5A742EFFA2A88433")

# Load the patient test results and materialise (ingest) them into the
# offline feature store.
patient_result_df = pd.read_csv("https://objectstorage.us-ashburn-1.oraclecloud.com/p/hh2NOgFJbVSg4amcLM3G3hkTuHyBD-8aE_iCsuZKEvIav1Wlld-3zfCawG4ycQGN/n/ociodscdev/b/oci-feature-store/o/beta/data/EHR/data-ori.csv")

if ehr_feature_group:
    ehr_feature_group.materialise(patient_result_df)
Lines changed: 147 additions & 0 deletions
@@ -0,0 +1,147 @@
import os

import ads
import pandas as pd
from ads.feature_store.feature_store import FeatureStore
from ads.feature_store.feature_group import FeatureGroup
from ads.feature_store.common.enums import ExpectationType
from great_expectations.core import ExpectationSuite, ExpectationConfiguration

print("Initiating feature store lazy entities creation")

# NAME can be set as an environment variable on the job.
NAME = os.environ.get("NAME", "Job")

# A -g command line argument can be set on the job:
# import argparse
# parser = argparse.ArgumentParser(allow_abbrev=False)
# parser.add_argument("-g", "--greeting", required=False, default="Hello")
# args, unknown = parser.parse_known_args()
#
# To debug:
# print(f'args: {args}')
# print(f'unknown: {unknown}')

# Authenticate via resource principals against the feature store service endpoint.
ads.set_auth(auth="resource_principal", client_kwargs={"service_endpoint": "https://fnk6p6iswuttzxwffxq6uwpj2u.apigateway.us-ashburn-1.oci.customer-oci.com/20230101"})

compartment_id = "ocid1.tenancy.oc1..aaaaaaaa462hfhplpx652b32ix62xrdijppq2c7okwcqjlgrbknhgtj2kofa"
metastore_id = "ocid1.datacatalogmetastore.oc1.iad.amaaaaaabiudgxyap7tizm4gscwz7amu7dixz7ml3mtesqzzwwg3urvvdgua"

# Load the patient test result data.
patient_result_df = pd.read_csv("https://objectstorage.us-ashburn-1.oraclecloud.com/p/hh2NOgFJbVSg4amcLM3G3hkTuHyBD-8aE_iCsuZKEvIav1Wlld-3zfCawG4ycQGN/n/ociodscdev/b/oci-feature-store/o/beta/data/EHR/data-ori.csv")

print(f"The dataset contains {patient_result_df.shape[0]} rows and {patient_result_df.shape[1]} columns")

# Get all the features, split into numerical and categorical.
features = [feat for feat in patient_result_df.columns if feat != 'SOURCE']
num_features = [feat for feat in features if patient_result_df[feat].dtype != object]
cat_features = [feat for feat in features if patient_result_df[feat].dtype == object]

print(f"Total number of features : {len(features)}")
print(f"Number of numerical features : {len(num_features)}")
print(f"Number of categorical features : {len(cat_features)}\n")
print(patient_result_df.isna().mean().to_frame(name='Missing %'))
print(patient_result_df.nunique().to_frame(name='# of unique values'))

# Create the feature store, the top-level design-time construct, backed by the metastore.
feature_store_resource = (
    FeatureStore()
    .with_description("Electronic Health Data consisting of Patient Test Results")
    .with_compartment_id(compartment_id)
    .with_display_name("EHR details")
    .with_offline_config(metastore_id=metastore_id)
)
feature_store = feature_store_resource.create()
print(feature_store)

# Create the entity that groups the EHR feature groups.
entity = feature_store.create_entity(
    display_name="EHR",
    description="Electronic Health Record predictions"
)
print(entity)

def chained_transformation(patient_result_df, **transformation_args):
    def label_encoder_transformation(patient_result_df, **transformation_args):
        from sklearn.preprocessing import LabelEncoder
        # Creating an instance of LabelEncoder.
        labelencoder = LabelEncoder()
        result_df = patient_result_df.copy()
        column_labels = transformation_args.get("label_encode_column")
        if isinstance(column_labels, list):
            for col in column_labels:
                result_df[col] = labelencoder.fit_transform(result_df[col])
        elif isinstance(column_labels, str):
            result_df[column_labels] = labelencoder.fit_transform(result_df[column_labels])
        else:
            return None
        return result_df

    def min_max_scaler(patient_result_df, **transformation_args):
        from sklearn.preprocessing import MinMaxScaler
        final_result_df = patient_result_df.copy()
        scaler = MinMaxScaler(feature_range=(0, 1))
        column_labels = transformation_args.get("scaling_column_labels")
        final_result_df[column_labels] = scaler.fit_transform(final_result_df[column_labels])
        # Return the scaled copy.
        return final_result_df

    def feature_removal(input_df, **transformation_args):
        output_df = input_df.copy()
        output_df.drop(transformation_args.get("redundant_feature_label"), axis=1, inplace=True)
        return output_df

    # Chain: label-encode, then scale, then drop redundant features.
    out1 = label_encoder_transformation(patient_result_df, **transformation_args)
    out2 = min_max_scaler(out1, **transformation_args)
    return feature_removal(out2, **transformation_args)

transformation_args = {
    "label_encode_column": ["SEX", "SOURCE"],
    "scaling_column_labels": num_features,
    "redundant_feature_label": ["MCH", "MCHC", "MCV"]
}

from ads.feature_store.transformation import Transformation, TransformationMode

# Register the transformation function with the feature store.
transformation = (
    Transformation()
    .with_display_name("chained_transformation")
    .with_feature_store_id(feature_store.id)
    .with_source_code_function(chained_transformation)
    .with_transformation_mode(TransformationMode.PANDAS)
    .with_description("transformation to perform feature engineering")
    .with_compartment_id(compartment_id)
)

transformation.create()
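
# Optional local sanity check (illustrative addition, not in the original
# script): because the transformation uses TransformationMode.PANDAS, the
# same function can be previewed directly on the source DataFrame.
preview_df = chained_transformation(patient_result_df, **transformation_args)
print(f"Transformed preview: {preview_df.shape[0]} rows, {preview_df.shape[1]} columns")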

# Define the feature group with the schema inferred from the DataFrame and
# the registered transformation attached.
feature_group_ehr = (
    FeatureGroup()
    .with_feature_store_id(feature_store.id)
    .with_primary_keys([])
    .with_name("ehr_feature_group")
    .with_entity_id(entity.id)
    .with_compartment_id(compartment_id)
    .with_schema_details_from_dataframe(patient_result_df)
    .with_transformation_id(transformation.id)
    .with_transformation_kwargs(transformation_args)
)
feature_group_ehr.create()

# Attach a great_expectations suite to validate data at ingestion time.
expectation_suite_ehr = ExpectationSuite(
    expectation_suite_name="test_hcm_df"
)
expectation_suite_ehr.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "AGE"},
    )
)
expectation_suite_ehr.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={"column": "HAEMOGLOBINS", "min_value": 0, "max_value": 30},
    )
)

# STRICT mode fails ingestion when the expectations are not met.
feature_group_ehr.with_expectation_suite(expectation_suite_ehr, expectation_type=ExpectationType.STRICT)
feature_group_ehr.update()
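
# Print the new feature group id so a separate ingestion job can look it up
# with FeatureGroup.from_id() (illustrative addition, not in the original script).
print(f"Created feature group: {feature_group_ehr.id}")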
