-
Notifications
You must be signed in to change notification settings - Fork 13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SNOW-1049971: What's the correct usage of `ext_modules` in `snowflake.ml.registry.Registry.log_model`?
#87
Comments
ext_modules
in snowflake.ml.registry.Registry.log_model
?ext_modules
in snowflake.ml.registry.Registry.log_model
?
Hi ftrifoglio, Thank you for your feedback. I think it might because that |
Thank you @sfc-gh-wzhao!! Makes sense. I've tried using So I've done another test. I had a feeling that the fact that the reference to the function in I got rid of that and added the import within the CustomModel subclass. That works.
X, y = make_classification()
# Wrap the generated feature matrix in a DataFrame with columns X0..X19.
X = pd.DataFrame(X, columns=[f"X{i}" for i in range(20)])

# The preprocessing pipeline is left commented out in this variant; the
# booster below is trained on the raw features instead.
# log_trans = Pipeline(
#     [
#         ("impute", SimpleImputer()),
#         ("scaler", MinMaxScaler()),
#         (
#             "logger",
#             FunctionTransformer(
#                 np.log1p,
#                 feature_names_out=partial(column_labeller, "LOG"),
#             ),
#         ),
#     ]
# )
# preproc_pipe = ColumnTransformer(
#     [("log", log_trans, ["X0", "X1"])],
#     remainder="passthrough",
#     verbose_feature_names_out=False,
# )
# preproc_pipe.set_output(transform="pandas")
# preproc_pipe.fit(X, y)
# joblib.dump(preproc_pipe, "model/preproc_pipe.joblib.gz")
# # ['model/preproc_pipe.joblib.gz']
# xgb_data = xgb.DMatrix(preproc_pipe.transform(X), y)

# Train a 10-round booster on the raw features and serialize it to disk.
xgb_data = xgb.DMatrix(X, label=y)
booster = xgb.train({"max_depth": 5}, xgb_data, num_boost_round=10)
joblib.dump(booster, "model/booster.joblib.gz")
# ['model/booster.joblib.gz']
class MyModel(custom_model.CustomModel):
    """Custom model that predicts with a booster loaded from the "model" artifact."""

    def __init__(self, context: custom_model.ModelContext) -> None:
        """Initialize the base class, then deserialize the "model" artifact."""
        super().__init__(context)
        # Resolve the artifact path through the context and load it with joblib.
        artifact_path = self.context.path("model")
        self.model = joblib.load(artifact_path)
        # self.pipeline = joblib.load(self.context.path("pipeline"))

    @custom_model.inference_api
    def predict(self, X: pd.DataFrame) -> pd.DataFrame:
        """Return a single-column DataFrame ("output") of predictions for X."""
        # Imported inside the method rather than at module level; the imported
        # name itself is not used in this body.
        from my_module.utils import column_labeller  # noqa: F401

        # Work on a copy so the caller's frame is not touched.
        features = X.copy()
        # xgb_data = xgb.DMatrix(self.pipeline.transform(X))
        preds = self.model.predict(xgb.DMatrix(features))
        return pd.DataFrame({"output": preds})
# Signature for predict: twenty FLOAT inputs X0..X19, one FLOAT output "output".
model_signature = ModelSignature(
    inputs=[FeatureSpec(name=f"X{i}", dtype=DataType.FLOAT) for i in range(20)],
    outputs=[FeatureSpec(name="output", dtype=DataType.FLOAT)],
)

# Only the serialized booster is registered as an artifact in this variant.
model_context = custom_model.ModelContext(
    models={},
    artifacts={
        "model": "model/booster.joblib.gz",
        # "pipeline": "model/preproc_pipe.joblib.gz",
    },
)
my_model = MyModel(model_context)

# Local check of the model before it is logged to the registry.
local_predictions = my_model.predict(X)
print(local_predictions)
# output
# 0 0.968972
# 1 0.016913
# 2 0.956805
# 3 0.016913
# 4 0.016913
# .. ...
# 95 0.984613
# 96 0.986547
# 97 0.102893
# 98 0.009444
# 99 0.016913
# [100 rows x 1 columns]
# Log the custom model, shipping the local "my_module" directory via code_paths.
registry = Registry(session=session)
registry.log_model(
    my_model,
    model_name="MyModel",
    version_name="v1",
    signatures={"predict": model_signature},
    python_version="3.11",
    conda_dependencies=["scikit-learn", "pandas", "xgboost"],
    code_paths=["my_module"],
)
# <snowflake.ml.model._client.model.model_version_impl.ModelVersion at 0x2c0579d50> This works, but my actual Is it possible that the serialized pipeline object is evaluated before or in a different environment where |
Hi ftrifoglio, If your actual use-case is similar to what you showed here which is a combination of scikit-learn transformer and an xgboost booster, you could use the
|
Thanks @sfc-gh-wzhao! So helpful. But it turns out you also need I suppose that's not the intended workflow, else you would have pointed that out. Let me know if there are other tests you'd like me to run. Happy to help.
from functools import partial
from importlib import import_module
import joblib
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.compose import ColumnTransformer
from sklearn.datasets import make_classification
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer, MinMaxScaler
from snowflake import snowpark
from snowflake.ml.model import custom_model
from snowflake.ml.model.model_signature import DataType, FeatureSpec, ModelSignature
from snowflake.ml.registry import Registry
from snowflake.ml.version import VERSION
print(VERSION)
# 1.2.1
from my_module.utils import column_labeller
# # my_module/__init__.py
# from my_module import utils
#
# # my_module/utils.py
# def column_labeller(suffix, self, columns):
# return [suffix + "_" + c for c in columns]
#
# Settings passed to the Snowpark session builder. The real values were
# redacted in the original post, which left bare asterisks and doubled commas
# (invalid Python). Replace each placeholder with your own value before running.
connection_parameters = {
    "account": "<account>",
    "user": "<user>",
    "password": "<password>",
    "role": "<role>",
    "warehouse": "<warehouse>",
    "database": "<database>",
    "schema": "<schema>",
}
# Open a Snowpark session from the connection settings.
session = snowpark.Session.builder.configs(connection_parameters).create()

# Synthetic classification data with columns named X0..X19.
X, y = make_classification()
X = pd.DataFrame(X, columns=[f"X{i}" for i in range(20)])

# Impute, min-max scale, then apply log1p; the output feature names come from
# column_labeller with "LOG" bound as its first argument.
log_trans = Pipeline(
    steps=[
        ("impute", SimpleImputer()),
        ("scaler", MinMaxScaler()),
        (
            "logger",
            FunctionTransformer(
                func=np.log1p,
                feature_names_out=partial(column_labeller, "LOG"),
            ),
        ),
    ]
)

# Apply the log pipeline to X0 and X1 only; other columns pass through unchanged.
preproc_pipe = ColumnTransformer(
    transformers=[("log", log_trans, ["X0", "X1"])],
    remainder="passthrough",
    verbose_feature_names_out=False,
)
preproc_pipe.set_output(transform="pandas")
preproc_pipe.fit(X, y)
joblib.dump(preproc_pipe, "model/preproc_pipe.joblib.gz")
# ['model/preproc_pipe.joblib.gz']

# Train a 10-round booster on the transformed features and serialize it.
xgb_data = xgb.DMatrix(preproc_pipe.transform(X), label=y)
booster = xgb.train({"max_depth": 5}, xgb_data, num_boost_round=10)
joblib.dump(booster, "model/booster.joblib.gz")
# ['model/booster.joblib.gz']
class MyModel(custom_model.CustomModel):
    """Custom model that chains the "pipeline" and "model" entries of its context."""

    def __init__(self, context: custom_model.ModelContext) -> None:
        """Delegate straight to the base-class initializer."""
        super().__init__(context)

    @custom_model.inference_api
    def predict(self, X: pd.DataFrame) -> pd.DataFrame:
        """Transform X with the "pipeline" ref, predict with the "model" ref.

        Returns a single-column DataFrame named "output".
        """
        transformed = self.context.model_ref("pipeline").transform(X)
        matrix = xgb.DMatrix(transformed)
        preds = self.context.model_ref("model").predict(matrix)
        return pd.DataFrame({"output": preds})
# Reload both serialized objects from disk.
# NOTE(review): `model` and `pipeline` are not referenced again in this
# snippet; the context below receives the in-memory `preproc_pipe` and
# `booster` objects instead.
model = joblib.load("model/booster.joblib.gz")
pipeline = joblib.load("model/preproc_pipe.joblib.gz")

# Register the fitted objects under the keys that MyModel.predict looks up.
model_context = custom_model.ModelContext(
    models={"pipeline": preproc_pipe, "model": booster},
    artifacts={},
)
my_model = MyModel(model_context)

# Signature for predict: twenty FLOAT inputs X0..X19, one FLOAT output "output".
model_signature = ModelSignature(
    inputs=[FeatureSpec(name=f"X{i}", dtype=DataType.FLOAT) for i in range(20)],
    outputs=[FeatureSpec(name="output", dtype=DataType.FLOAT)],
)
# Import the local package as a module object so it can be passed to ext_modules.
my_module = import_module("my_module")

# Log the model, supplying the package both as ext_modules and as code_paths.
registry = Registry(session=session)
registry.log_model(
    my_model,
    model_name="MyModel",
    version_name="v1",
    signatures={"predict": model_signature},
    python_version="3.11",
    conda_dependencies=["scikit-learn==1.3.0", "pandas", "xgboost"],
    code_paths=["my_module"],
    ext_modules=[my_module],
)

# Fetch the logged version back from the registry and run predict on X.
logged_model = registry.get_model("MYMODEL")
mv = logged_model.version("V1")
remote_predictions = mv.run(X, function_name="predict")
print(remote_predictions)
# output
# 0 0.968972
# 1 0.016913
# 2 0.956805
# 3 0.016913
# 4 0.016913
# .. ...
# 95 0.984613
# 96 0.986547
# 97 0.102893
# 98 0.009444
# 99 0.016913
# [100 rows x 1 columns] |
Hi ftrifoglio, Thank you for your patience, and sorry that I made a mistake in the previous example. I think if you use |
Hi ftrifoglio, We have implemented the fix, and it is included in the just-released version 1.3.0. Please give it a try and see if that fixes your issue. I am closing this issue; if you believe that the issue still exists, please re-open it. Thank you!
I want to log a `CustomModel` that requires a custom module. Here's a reproducible example of what I'm doing, but it seems the module cannot be found.
The text was updated successfully, but these errors were encountered: