diff --git a/.env.example b/.env.example index cb991043..d68b73c5 100644 --- a/.env.example +++ b/.env.example @@ -32,6 +32,9 @@ EVALUATE_SCRIPT_PATH = 'evaluate/evaluate_model.py' REGISTER_SCRIPT_PATH = 'register/register_model.py' SOURCES_DIR_TRAIN = 'diabetes_regression' DATASET_NAME = 'diabetes_ds' +DATASET_VERSION = 'latest' +# Optional. Set it if you have configured non default datastore to point to your data +DATASTORE_NAME = '' SCORE_SCRIPT = 'scoring/score.py' # Optional. Used by a training pipeline with R on Databricks diff --git a/.pipelines/diabetes_regression-variables-template.yml b/.pipelines/diabetes_regression-variables-template.yml index a12fe67e..6d4d9797 100644 --- a/.pipelines/diabetes_regression-variables-template.yml +++ b/.pipelines/diabetes_regression-variables-template.yml @@ -23,6 +23,11 @@ variables: value: mlopspython - name: DATASET_NAME value: diabetes_ds + # Uncomment DATASTORE_NAME if you have configured non default datastore to point to your data + # - name: DATASTORE_NAME + # value: datablobstore + - name: DATASET_VERSION + value: latest - name: TRAINING_PIPELINE_NAME value: "diabetes-Training-Pipeline" - name: MODEL_NAME diff --git a/diabetes_regression/register/register_model.py b/diabetes_regression/register/register_model.py index 8c63506c..3376285e 100644 --- a/diabetes_regression/register/register_model.py +++ b/diabetes_regression/register/register_model.py @@ -28,7 +28,7 @@ import argparse import traceback import joblib -from azureml.core import Run, Experiment, Workspace +from azureml.core import Run, Experiment, Workspace, Dataset from azureml.core.model import Model as AMLModel @@ -105,8 +105,15 @@ def main(): print("Tags present: {parent_tags}") if (model is not None): + dataset_id = parent_tags["dataset_id"] if (build_id is None): - register_aml_model(model_file, model_name, model_mse, exp, run_id) + register_aml_model( + model_file, + model_name, + model_mse, + exp, + run_id, + dataset_id) elif (build_uri is None): register_aml_model( model_file, @@ -114,6 +121,7 @@ def main(): model_mse, exp, run_id, + dataset_id, build_id) else: register_aml_model( @@ -122,6 +130,7 @@ def main(): model_mse, exp, run_id, + dataset_id, build_id, build_uri) else: @@ -146,6 +155,7 @@ def register_aml_model( model_mse, exp, run_id, + dataset_id, build_id: str = 'none', build_uri=None ): @@ -164,7 +174,9 @@ def register_aml_model( workspace=exp.workspace, model_name=model_name, model_path=model_path, - tags=tagsValue) + tags=tagsValue, + datasets=[('training data', + Dataset.get_by_id(exp.workspace, dataset_id))]) os.chdir("..") print( "Model registered: {} \nModel Description: {} " diff --git a/diabetes_regression/training/train.py b/diabetes_regression/training/train.py index c3f1203c..66dbc20f 100644 --- a/diabetes_regression/training/train.py +++ b/diabetes_regression/training/train.py @@ -31,6 +31,22 @@ from sklearn.model_selection import train_test_split import joblib import json +from azureml.core import Dataset, Datastore, Workspace + + +def register_dataset( + aml_workspace: Workspace, + dataset_name: str, + datastore_name: str, + file_path: str +) -> Dataset: + datastore = Datastore.get(aml_workspace, datastore_name) + dataset = Dataset.Tabular.from_delimited_files(path=(datastore, file_path)) + dataset = dataset.register(workspace=aml_workspace, + name=dataset_name, + create_new_version=True) + + return dataset def train_model(run, data, alpha): @@ -64,13 +80,47 @@ def main(): help=("output for passing data to next step") ) + parser.add_argument( + "--dataset_version", + type=str, + help=("dataset version") + ) + + parser.add_argument( + "--data_file_path", + type=str, + help=("data file path, if specified,\ + a new version of the dataset will be registered") + ) + + parser.add_argument( + "--caller_run_id", + type=str, + help=("caller run id, for example ADF pipeline run id") + ) + + parser.add_argument( + "--dataset_name", + type=str, + help=("Dataset name. Dataset must be passed by name\ + to always get the desired dataset version\ + rather than the one used while the pipeline creation") + ) + args = parser.parse_args() print("Argument [model_name]: %s" % args.model_name) print("Argument [step_output]: %s" % args.step_output) + print("Argument [dataset_version]: %s" % args.dataset_version) + print("Argument [data_file_path]: %s" % args.data_file_path) + print("Argument [caller_run_id]: %s" % args.caller_run_id) + print("Argument [dataset_name]: %s" % args.dataset_name) model_name = args.model_name step_output_path = args.step_output + dataset_version = args.dataset_version + data_file_path = args.data_file_path + dataset_name = args.dataset_name print("Getting training parameters") @@ -86,16 +136,27 @@ def main(): run = Run.get_context() # Get the dataset - dataset = run.input_datasets['training_data'] - if (dataset): - df = dataset.to_pandas_dataframe() - X = df.drop('Y', axis=1).values - y = df['Y'].values + if (dataset_name): + if (data_file_path == 'none'): + dataset = Dataset.get_by_name(run.experiment.workspace, dataset_name, dataset_version) # NOQA: E402, E501 + else: + dataset = register_dataset(run.experiment.workspace, + dataset_name, + os.environ.get("DATASTORE_NAME"), + data_file_path) else: e = ("No dataset provided") print(e) raise Exception(e) + # Link dataset to the step run so it is trackable in the UI + run.input_datasets['training_data'] = dataset + run.parent.tag("dataset_id", value=dataset.id) + + df = dataset.to_pandas_dataframe() + X = df.drop('Y', axis=1).values + y = df['Y'].values + X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.2, random_state=0) data = {"train": {"X": X_train, "y": y_train}, diff --git a/ml_service/pipelines/diabetes_regression_build_train_pipeline.py b/ml_service/pipelines/diabetes_regression_build_train_pipeline.py index 7192d308..f0e7bf3a 100644 --- a/ml_service/pipelines/diabetes_regression_build_train_pipeline.py +++ b/ml_service/pipelines/diabetes_regression_build_train_pipeline.py @@ -1,9 +1,8 @@ from azureml.pipeline.core.graph import PipelineParameter from azureml.pipeline.steps import PythonScriptStep from azureml.pipeline.core import Pipeline, PipelineData -from azureml.core import Workspace +from azureml.core import Workspace, Dataset, Datastore from azureml.core.runconfig import RunConfiguration -from azureml.core import Dataset from ml_service.util.attach_compute import get_compute from ml_service.util.env_variables import Env from ml_service.util.manage_environment import get_environment @@ -39,8 +38,20 @@ def main(): run_config = RunConfiguration() run_config.environment = environment + if (e.datastore_name): + datastore_name = e.datastore_name + else: + datastore_name = aml_workspace.get_default_datastore().name + run_config.environment.environment_variables["DATASTORE_NAME"] = datastore_name # NOQA: E501 + model_name_param = PipelineParameter( name="model_name", default_value=e.model_name) + dataset_version_param = PipelineParameter( + name="dataset_version", default_value=e.dataset_version) + data_file_path_param = PipelineParameter( + name="data_file_path", default_value="none") + caller_run_id_param = PipelineParameter( + name="caller_run_id", default_value="none") # Get dataset name dataset_name = e.dataset_name @@ -57,9 +68,9 @@ def main(): df.to_csv(file_name, index=False) # Upload file to default datastore in workspace - default_ds = aml_workspace.get_default_datastore() + datatstore = Datastore.get(aml_workspace, datastore_name) target_path = 'training-data/' - default_ds.upload_files( + datatstore.upload_files( files=[file_name], target_path=target_path, overwrite=True, @@ -68,7 +79,7 @@ def main(): # Register dataset path_on_datastore = os.path.join(target_path, file_name) dataset = Dataset.Tabular.from_delimited_files( - path=(default_ds, path_on_datastore)) + path=(datatstore, path_on_datastore)) dataset = dataset.register( workspace=aml_workspace, name=dataset_name, @@ -76,9 +87,6 @@ def main(): tags={'format': 'CSV'}, create_new_version=True) - # Get the dataset - dataset = Dataset.get_by_name(aml_workspace, dataset_name) - # Create a PipelineData to pass data between steps pipeline_data = PipelineData( 'pipeline_data', @@ -89,11 +97,14 @@ def main(): script_name=e.train_script_path, compute_target=aml_compute, source_directory=e.sources_directory_train, - inputs=[dataset.as_named_input('training_data')], outputs=[pipeline_data], arguments=[ "--model_name", model_name_param, - "--step_output", pipeline_data + "--step_output", pipeline_data, + "--dataset_version", dataset_version_param, + "--data_file_path", data_file_path_param, + "--caller_run_id", caller_run_id_param, + "--dataset_name", dataset_name, ], runconfig=run_config, allow_reuse=False, diff --git a/ml_service/util/env_variables.py b/ml_service/util/env_variables.py index 90bc906e..c00ee603 100644 --- a/ml_service/util/env_variables.py +++ b/ml_service/util/env_variables.py @@ -40,6 +40,8 @@ def __init__(self): self._score_script = os.environ.get("SCORE_SCRIPT") self._build_uri = os.environ.get("BUILD_URI") self._dataset_name = os.environ.get("DATASET_NAME") + self._datastore_name = os.environ.get("DATASTORE_NAME") + self._dataset_version = os.environ.get("DATASET_VERSION") self._run_evaluation = os.environ.get("RUN_EVALUATION", "true") self._allow_run_cancel = os.environ.get( "ALLOW_RUN_CANCEL", "true") @@ -145,6 +147,14 @@ def build_uri(self): def dataset_name(self): return self._dataset_name + @property + def datastore_name(self): + return self._datastore_name + + @property + def dataset_version(self): + return self._dataset_version + @property def run_evaluation(self): return self._run_evaluation