Validating Data in a Production Pipeline: The TFX Way

A deep dive into data validation using TensorFlow Data Validation

Imagine this. We have a fully functional machine learning pipeline, and it is flawless. So we decide to push it to the production environment. All is well in prod, and one day a tiny change happens in one of the components that generates input data for our pipeline, and the pipeline breaks. Oops!!!

Photo by Sarah Kilian on Unsplash

Why did this happen??

Because ML models rely heavily on the data they are given. Remember the age-old saying: Garbage In, Garbage Out. Given the right data, the pipeline performs well; change the data, and the pipeline tends to go awry.

Data passed into pipelines is generated mostly by automated systems, which leaves us with less control over the kind of data being generated.

So, what do we do?

Data Validation is the answer.

Data Validation is the guardian system that verifies whether the data is in the appropriate format for the pipeline to consume.

Read this article to understand why validation is crucial in an ML pipeline and the 5 stages of machine learning validations.

TensorFlow Data Validation

TensorFlow Data Validation (TFDV) is a part of the TFX ecosystem that can be used for validating data in an ML pipeline.

TFDV computes descriptive statistics, infers schemas, and identifies anomalies by comparing the training and serving data. This ensures that training and serving data are consistent, so that the pipeline does not break or produce unintended predictions.

People at Google wanted TFDV to be used right from the earliest stage in an ML process. Hence they ensured TFDV could be used with notebooks. We are going to do the same here.

To begin, we need to install the tensorflow-data-validation library using pip. Preferably, create a virtual environment first and run your installations inside it.

A note of caution: Prior to installation, check that the versions of the TFX libraries you use are compatible with one another.

pip install tensorflow-data-validation
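
Since version mismatches are the usual source of installation trouble, it may help to print what is already installed before adding TFDV. The sketch below uses only the standard library. The package list is an assumption chosen for illustration, and the official compatibility table remains the reference for which versions work together.

```python
from importlib import metadata


def installed_versions(packages):
    """Return {package: version string, or None when the package is not installed}."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


# Package names chosen for illustration; adjust to the TFX libraries you use.
TFX_PACKAGES = ["tensorflow", "tensorflow-data-validation", "tfx-bsl", "pyarrow"]

for name, version in installed_versions(TFX_PACKAGES).items():
    print(f"{name}: {version or 'not installed'}")
```

Running this inside the virtual environment, before and after the pip install, shows which packages pip added or changed.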

The following are the steps we will follow for the data validation process:

  1. Generating Statistics from Training Data
  2. Inferring Schema from Training Data
  3. Generating Statistics for Evaluation Data and Comparing it with Training Data
  4. Identifying and Fixing Anomalies
  5. Checking for Drifts and Data Skew
  6. Save the Schema

We will be using three types of datasets here: training data, evaluation data and serving data, to mimic real-time usage. The ML model is trained using the training data. Evaluation data, also known as test data, is the part of the data set aside to test the model as soon as the training phase is completed. Serving data is presented to the model in the production environment for making predictions.

The entire code discussed in this article is available in my GitHub repo. You can download it from here.

Step 0: Preparations

We will be using the Spaceship Titanic dataset from Kaggle. You can learn more and download the dataset using this link.

Sample view of Spaceship Titanic Dataset

The data is composed of a mixture of numerical and categorical data. It is a classification dataset, and the class label is Transported. It holds the value True or False.

Data Description

The necessary imports are done, and the paths to the CSV files are defined. The actual dataset contains the training and the test data. I have manually introduced some errors and saved the file as ‘titanic_test_anomalies.csv’ (this file is not available on Kaggle; you can download it from my GitHub repository link).

Here, we will be using ANOMALOUS_DATA as the evaluation data and TEST_DATA as serving data.

import tensorflow_data_validation as tfdv
import tensorflow as tf

TRAIN_DATA = '/data/titanic_train.csv'
TEST_DATA = '/data/titanic_test.csv'
ANOMALOUS_DATA = '/data/titanic_test_anomalies.csv'

Step 1: Generating Statistics from Training Data

The first step is to analyze the training data and identify its statistical properties. TFDV has the generate_statistics_from_csv function, which reads data directly from a CSV file. TFDV also has a generate_statistics_from_tfrecord function if you have the data as a TFRecord.

The visualize_statistics function presents an 8-point summary, along with charts that help us understand the underlying statistics of the data. This is called the Facets view. Some critical details that need our attention are highlighted in red. Loads of other features to analyze the data are available here. Play around and get to know it better.

# Generate statistics for training data
train_stats = tfdv.generate_statistics_from_csv(TRAIN_DATA)
tfdv.visualize_statistics(train_stats)

Statistics generated for the dataset

Here we see missing values in the Age and RoomService features that need to be imputed. We also see that RoomService has 65.52% zeros. That is simply how this particular data is distributed, so we do not consider it an anomaly, and we move ahead.
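
To make these two numbers concrete, here is a plain-Python sketch of how a "missing" and a "zeros" percentage can be computed for one column. The inline sample rows are made up for illustration and are not taken from the Kaggle file, and measuring zeros against the non-missing values is an assumption of this sketch. TFDV computes such statistics itself, and at scale.

```python
import csv
import io

# Made-up sample rows (not from the real dataset), with one empty RoomService cell.
SAMPLE_CSV = """PassengerId,Age,RoomService
0001_01,39,0
0002_01,24,109
0003_01,,0
0004_01,33,
0005_01,16,0
"""


def column_summary(csv_text, column):
    """Return (percent missing, percent zeros) for a numeric column."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    values = [row[column] for row in rows]
    present = [v for v in values if v != ""]
    missing_pct = 100.0 * (len(values) - len(present)) / len(values)
    # Zeros are measured against the values that are present (assumption of this sketch).
    zeros_pct = 100.0 * sum(float(v) == 0.0 for v in present) / len(present)
    return missing_pct, zeros_pct


print(column_summary(SAMPLE_CSV, "RoomService"))  # (20.0, 75.0)
```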

Step 2: Inferring Schema from Training Data

Once all the issues have been satisfactorily resolved, we infer the schema using the infer_schema function.

schema=tfdv.infer_schema(statistics=train_stats)
tfdv.display_schema(schema=schema)

The schema is usually presented in two sections. The first section lists each feature's data type, presence, valency and domain. The second section lists the values that make up each domain.

Section 1: Details about Features
Section 2: Domain Values

This is the initial raw schema. We will refine it in the later steps.

Step 3: Generating Statistics for Evaluation Data and Comparing it with Training Data

Now we pick up the evaluation data and generate the statistics. We need to understand how anomalies need to be handled, so we are going to use ANOMALOUS_DATA as our evaluation data. We have manually introduced anomalies into this data.

After generating the statistics, we visualize the data. Visualization can be applied to the evaluation data alone (like we did for the training data). However, it makes more sense to compare the statistics of the evaluation data with the training statistics. This way we can understand how different the evaluation data is from the training data.

# Generate statistics for evaluation data

eval_stats=tfdv.generate_statistics_from_csv(ANOMALOUS_DATA)

tfdv.visualize_statistics(lhs_statistics=train_stats, rhs_statistics=eval_stats,
                          lhs_name="Training Data", rhs_name="Evaluation Data")

Comparison of Statistics of the Training data and the Evaluation data

Here we can see that the RoomService feature is absent from the evaluation data (Big Red Flag). The other features seem fairly OK, as they exhibit distributions similar to the training data.

However, eyeballing is not sufficient in a production environment, so we are going to ask TFDV to actually analyze and report if everything is OK.

Step 4: Identifying and Fixing Anomalies

Our next step is to validate the statistics obtained from the evaluation data. We are going to compare it with the schema that we had generated with the training data. The display_anomalies function will give us a tabulated view of the anomalies TFDV has identified and a description as well.

# Identifying Anomalies
anomalies = tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)

Anomaly List provided by TFDV

From the table, we see four kinds of anomalies. First, our evaluation data is missing 2 columns (Transported and RoomService). Second, the Destination feature has an additional value called ‘Anomaly’ in its domain, which was not present in the training data. Third, the CryoSleep and VIP features have the values ‘TRUE’ and ‘FALSE’, which are not present in the training data. Finally, 5 features contain integer values, while the schema expects floating point values.

That’s a handful. So let’s get to work.

There are two ways to fix anomalies: either process the evaluation data (manually) so that it fits the schema, or modify the schema so that these anomalies are accepted. Again, a domain expert has to decide which anomalies are acceptable and which mandate data processing.

Let us start with the ‘Destination’ feature. We found a new value, ‘Anomaly’, that was missing from the domain list inferred from the training data. Let us add it to the domain and say that it is also an acceptable value for the feature.

# Adding a new value for 'Destination'
destination_domain=tfdv.get_domain(schema, 'Destination')
destination_domain.value.append('Anomaly')

anomalies=tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)

We have removed this anomaly, and the anomaly list does not show it anymore. Let us move to the next one.

Destination Anomaly has been resolved
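
Conceptually, the domain check that flagged ‘Anomaly’ is a set difference between the observed values and the values the schema allows. The sketch below shows this idea in plain Python. It is not TFDV's implementation, and the domain values and observations are assumed for illustration.

```python
def unexpected_values(observed, domain):
    """Return the observed values that are not in the allowed domain, sorted."""
    return sorted(set(observed) - set(domain))


# Domain and observations assumed for illustration.
destination_domain = {"TRAPPIST-1e", "55 Cancri e", "PSO J318.5-22"}
observed = ["TRAPPIST-1e", "Anomaly", "55 Cancri e", "Anomaly"]

print(unexpected_values(observed, destination_domain))  # ['Anomaly']

# Relaxing the schema corresponds to adding the value to the domain.
destination_domain.add("Anomaly")
print(unexpected_values(observed, destination_domain))  # []
```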

Looking at the VIP and CryoSleep domains, we see that the training data has lowercase values while the evaluation data has the same values in uppercase. One option is to pre-process the data and convert all of it to a single case. However, we are going to add these values to the domain instead. Since VIP and CryoSleep use the same set of values (true and false), we set the domain of CryoSleep to use VIP’s domain.

# Adding data in CAPS to domain for VIP and CryoSleep

vip_domain=tfdv.get_domain(schema, 'VIP')
vip_domain.value.extend(['TRUE','FALSE'])

# Setting domain of one feature to another
tfdv.set_domain(schema, 'CryoSleep', vip_domain)

anomalies=tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)

Resolved anomalies from CryoSleep and VIP
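
The pre-processing alternative mentioned above, normalizing the case before validation, could look like the following sketch. It uses only the standard library; the helper name lowercase_columns and the sample rows are hypothetical.

```python
import csv
import io


def lowercase_columns(csv_text, columns):
    """Return CSV text with the given columns lowercased; other columns are untouched."""
    reader = csv.DictReader(io.StringIO(csv_text))
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=reader.fieldnames, lineterminator="\n")
    writer.writeheader()
    for row in reader:
        for col in columns:
            row[col] = row[col].lower()
        writer.writerow(row)
    return out.getvalue()


# Hypothetical sample rows with mixed-case boolean strings.
sample = "PassengerId,CryoSleep,VIP\n0001_01,TRUE,FALSE\n0002_01,False,True\n"
print(lowercase_columns(sample, ["CryoSleep", "VIP"]))
```

Normalizing keeps the schema's domain small, at the cost of an extra step that must also run on the serving data.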

It is fairly safe to convert integer features to float. So, when generating statistics for the evaluation data, we ask TFDV to take the data types from the schema of the training data instead of inferring them again. This solves the issue related to data types.

# INT can be safely converted to FLOAT. So we can safely ignore it and ask TFDV to use schema

options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)
eval_stats=tfdv.generate_statistics_from_csv(ANOMALOUS_DATA, stats_options=options)

anomalies=tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)

Resolved datatype issue

Finally, we end up with the last set of anomalies: 2 columns that are present in the training data are missing from the evaluation data.

‘Transported’ is the class label, and it will obviously not be available in the evaluation data. To handle cases where we know that training and evaluation features might differ from each other, we can create multiple environments. Here we create a Training and a Serving environment. We specify that the ‘Transported’ feature will be available in the Training environment but will not be available in the Serving environment.

# Transported is the class label and will not be available in Evaluation data.
# To indicate that we set two environments; Training and Serving

schema.default_environment.append('Training')
schema.default_environment.append('Serving')

tfdv.get_feature(schema, 'Transported').not_in_environment.append('Serving')

serving_anomalies_with_environment = tfdv.validate_statistics(
    statistics=eval_stats, schema=schema, environment='Serving')

tfdv.display_anomalies(serving_anomalies_with_environment)

‘RoomService’ is a required feature that is not available in the Serving environment. Such cases call for manual interventions by domain experts.
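
How environments change what the validator expects can be pictured with a small plain-Python model. This is only an illustration of the idea, not TFDV's data structures; the feature list is shortened and the dictionary layout is an assumption.

```python
# Feature name -> environments in which the feature is NOT expected (illustrative).
NOT_IN_ENVIRONMENT = {
    "Transported": {"Serving"},
    "RoomService": set(),
    "Destination": set(),
}


def missing_columns(present_columns, environment):
    """Return features expected in `environment` but absent from the data, sorted."""
    expected = {
        name
        for name, excluded in NOT_IN_ENVIRONMENT.items()
        if environment not in excluded
    }
    return sorted(expected - set(present_columns))


eval_columns = ["Destination"]  # neither Transported nor RoomService is present

print(missing_columns(eval_columns, "Training"))  # ['RoomService', 'Transported']
print(missing_columns(eval_columns, "Serving"))   # ['RoomService']
```

In the Serving environment the missing label is no longer reported, while the missing required feature still is.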

Keep resolving issues until you get this output.

All Anomalies Resolved

All the anomalies have been resolved

Step 5: Training-Serving Drift and Skew Detection

The next step is to check for drift and skew. Skew is a difference between the distribution of the training data and that of the serving data. Drift is a change in the data distribution over time: a model usually performs well right after training, but as the incoming data shifts, misclassification errors start to increase. Both issues typically call for model retraining.

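L-infinity distance is used to measure skew and drift, and a threshold is set on that distance. If the distance between a feature's distribution in the training data and in the serving data exceeds the threshold, the feature is considered to exhibit skew. Drift follows the same threshold-based approach, except that the current statistics are compared with the statistics of an earlier batch of data. TFDV's documentation describes L-infinity distance for categorical features and an approximate Jensen-Shannon divergence for numeric ones, so a numeric feature such as ‘Spa’ may need the latter to be checked meaningfully. For our example, we have set the threshold to 0.01 for both drift and skew.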
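
For a categorical feature, the L-infinity distance between two datasets is the largest absolute difference between the relative frequencies of any single value. The sketch below computes it in plain Python on made-up samples; it illustrates the measure and is not TFDV's implementation.

```python
from collections import Counter


def l_infinity_distance(values_a, values_b):
    """Largest absolute difference between the relative frequencies of any value."""
    freq_a, freq_b = Counter(values_a), Counter(values_b)
    total_a, total_b = len(values_a), len(values_b)
    return max(
        abs(freq_a[v] / total_a - freq_b[v] / total_b)
        for v in set(freq_a) | set(freq_b)
    )


# Made-up samples for illustration.
train = ["false"] * 60 + ["true"] * 40
serving = ["false"] * 75 + ["true"] * 25

THRESHOLD = 0.01
distance = l_infinity_distance(train, serving)  # roughly 0.15
print(distance > THRESHOLD)  # True
```

With a threshold as low as 0.01, even a shift of a few percentage points in a single category is enough to raise an anomaly.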

serving_stats = tfdv.generate_statistics_from_csv(TEST_DATA)

# Skew Comparator
spa_analyze=tfdv.get_feature(schema, 'Spa')
spa_analyze.skew_comparator.infinity_norm.threshold=0.01

# Drift Comparator
CryoSleep_analyze=tfdv.get_feature(schema, 'CryoSleep')
CryoSleep_analyze.drift_comparator.infinity_norm.threshold=0.01

skew_anomalies = tfdv.validate_statistics(statistics=train_stats, schema=schema,
                                          previous_statistics=eval_stats,
                                          serving_statistics=serving_stats)
tfdv.display_anomalies(skew_anomalies)

We can see that the skew level exhibited by ‘Spa’ is acceptable (it is not listed in the anomaly list). ‘CryoSleep’, however, exhibits high drift levels. When creating automated pipelines, these anomalies could be used as triggers for automated model retraining.

High Drift in CryoSleep
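
A retraining trigger of this kind can be as simple as checking whether any watched feature shows up among the reported anomalies. The sketch below works on plain lists of feature names so that it runs without TFDV; the function name and the watched features are hypothetical.

```python
def should_retrain(anomalous_features, watched_features):
    """Return True when any watched feature appears among the anomalous ones."""
    return bool(set(anomalous_features) & set(watched_features))


# With TFDV, the feature names would come from something like
# skew_anomalies.anomaly_info.keys(); a plain list stands in here (assumption).
reported = ["CryoSleep"]

if should_retrain(reported, watched_features=["Spa", "CryoSleep"]):
    print("Drift or skew detected: schedule retraining")
```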

Step 6: Save the Schema

After resolving all the anomalies, the schema can be saved as an artifact or stored in the metadata repository, and then reused in the ML pipeline.

# Saving the Schema
import os

# Create the output directory if it does not exist yet
os.makedirs('schema', exist_ok=True)
schema_file = os.path.join('schema', 'schema.pbtxt')
tfdv.write_schema_text(schema, schema_file)

# Loading the Schema
loaded_schema = tfdv.load_schema_text(schema_file)
loaded_schema

You can download the notebook and the data files from my GitHub repository using this link.

Other options to look into

You can read the following articles to learn what your choices are and how to select the right framework for your ML pipeline project.