Ensuring distributional integrity in splits of small datasets
We often need to sample from a population to perform statistical analysis and gain insights. When we do so, the aim is to ensure that the sample’s distribution closely matches that of the population.
For this, we have various methods: simple random sampling (where every member of the population has an equal chance of being selected), stratified sampling (which involves dividing the population into subgroups and sampling from each subgroup), cluster sampling (where the population is divided into clusters and entire clusters are randomly selected), systematic sampling (which involves selecting every nth member of the population), and so on. Each method has its advantages and is chosen based on the specific needs and characteristics of the study.
In this article, we won’t focus on sampling methods per se, but rather on using these concepts to split a dataset into Train-Test-Validation sets for machine learning. These approaches work for all kinds of tabular data. We will be working in Python here.
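The snippets below assume you already have a feature matrix X and a target vector y as NumPy arrays. Purely for illustration (this is not part of any real dataset), you could generate a small synthetic tabular dataset like this:
import numpy as np
from sklearn.datasets import make_classification
# Hypothetical small tabular dataset: 200 rows, 5 features, binary target
X, y = make_classification(n_samples=200, n_features=5, n_informative=3, random_state=42)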
Below are some approaches that you already might know:
1. Simple Train-Test-Val Split
This approach uses simple random sampling.
Example code:
from sklearn.model_selection import train_test_split
# Assuming X is your feature set and y is your target variable
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.4, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)
2. Stratified Train-Test-Val Split
This approach ensures that the splits maintain the same proportion of classes as the original dataset (again using random sampling, of course), which is useful for imbalanced datasets. It only works when your target variable is not continuous.
from sklearn.model_selection import train_test_split
# Stratified split to maintain class distribution
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.4, stratify=y, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, stratify=y_temp, random_state=42)
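To confirm that the stratification behaved as expected, a quick sanity check (not part of the split itself) is to compare the class proportions in each split; they should be nearly identical:
import numpy as np
# Class proportions per split should closely match the original dataset
for name, labels in [("train", y_train), ("val", y_val), ("test", y_test)]:
    values, counts = np.unique(labels, return_counts=True)
    print(name, dict(zip(values, np.round(counts / counts.sum(), 3))))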
3. K-Fold Cross-Validation
In K-Fold cross-validation, the dataset is split into k subsets (folds). The model is trained on k-1 folds and tested on the remaining fold. This process is repeated k times.
from sklearn.model_selection import KFold, train_test_split
# X and y are assumed to be NumPy arrays here (use .values / .iloc for pandas objects)
kf = KFold(n_splits=5, shuffle=True, random_state=42)
for train_index, test_index in kf.split(X):
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]
    # Further split X_train and y_train into train and validation sets
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)
    # Now you have X_train, X_val, X_test, y_train, y_val, y_test for each fold
    # You can now train and evaluate your model using these sets
4. Stratified K-Fold Cross-Validation
As the name suggests, this is a combination of Stratified sampling and K-fold cross-validation.
from sklearn.model_selection import StratifiedKFold, train_test_split
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
for train_index, test_index in skf.split(X, y):
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]
    # Further split X_train and y_train into train and validation sets
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, stratify=y_train, random_state=42)
    # Now you have X_train, X_val, X_test, y_train, y_val, y_test for each fold
    # You can now train and evaluate your model using these sets
Full example usage:
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
# Initialize lists to store the scores for each fold
accuracy_scores = []
precision_scores = []
recall_scores = []
f1_scores = []
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
for train_index, test_index in skf.split(X, y):  # y is a categorical target variable
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]
    # Further split X_train and y_train into train and validation sets
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, stratify=y_train, random_state=42)
    # Train the model
    model = LogisticRegression(random_state=42)
    model.fit(X_train, y_train)
    # Validate the model
    y_val_pred = model.predict(X_val)
    val_accuracy = accuracy_score(y_val, y_val_pred)
    val_precision = precision_score(y_val, y_val_pred, average='weighted')
    val_recall = recall_score(y_val, y_val_pred, average='weighted')
    val_f1 = f1_score(y_val, y_val_pred, average='weighted')
    print(f"Validation Scores - Accuracy: {val_accuracy}, Precision: {val_precision}, Recall: {val_recall}, F1 Score: {val_f1}")
    # Test the model
    y_test_pred = model.predict(X_test)
    test_accuracy = accuracy_score(y_test, y_test_pred)
    test_precision = precision_score(y_test, y_test_pred, average='weighted')
    test_recall = recall_score(y_test, y_test_pred, average='weighted')
    test_f1 = f1_score(y_test, y_test_pred, average='weighted')
    # Store the scores
    accuracy_scores.append(test_accuracy)
    precision_scores.append(test_precision)
    recall_scores.append(test_recall)
    f1_scores.append(test_f1)
    print(f"Test Scores - Accuracy: {test_accuracy}, Precision: {test_precision}, Recall: {test_recall}, F1 Score: {test_f1}")
# Calculate and print the average scores across all folds
print(f"\nAverage Test Scores across all folds - Accuracy: {sum(accuracy_scores) / len(accuracy_scores)}, Precision: {sum(precision_scores) / len(precision_scores)}, Recall: {sum(recall_scores) / len(recall_scores)}, F1 Score: {sum(f1_scores) / len(f1_scores)}")
Now, you can use these methods to split your dataset, but they have the following limitations:
- Random Train-Test-Val Split: This method can’t guarantee similar distributions among the splits, especially if the dataset is not large enough or if there is an imbalance in the target variable.
- Stratified Split: This method is useful only when you have a non-continuous target variable (y). Although there are workarounds for continuous target variables (such as converting the continuous variable into categorical through some conditions, e.g., if y ≥ quartile1 → 1, else 0), these approaches may still not always ensure similar distributions among the splits (a minimal binning sketch is shown after this list).
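As a minimal sketch of that workaround (assuming a continuous target y and using quartile bins purely for stratification; the bins themselves are discarded afterwards):
import pandas as pd
from sklearn.model_selection import train_test_split
# Bin the continuous target into quartiles so it can be passed to stratify=
y_bins = pd.qcut(y, q=4, labels=False, duplicates='drop')
X_train, X_temp, y_train, y_temp, bins_train, bins_temp = train_test_split(
    X, y, y_bins, test_size=0.4, stratify=y_bins, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, stratify=bins_temp, random_state=42)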
Now, suppose you have a small total number of observations in your dataset and it’s difficult to ensure similar distributions amongst your splits. In that case, you can combine clustering and random sampling (or stratified sampling).
Below is how I did it for my problem at hand:
5. Clustering-based Train-Test-Validation split
In this method, first, we cluster our dataset and then use sampling methods on each cluster to obtain our data splits.
For example, using HDBSCAN:
import hdbscan
import numpy as np
from sklearn.metrics import silhouette_score
from sklearn.model_selection import ParameterGrid
import random
random.seed(48)  # for regeneration of same results

def get_clusters(df):
    to_drop = ["cluster_", "ID"]
    req_cols = sorted(set(df.columns) - set(to_drop))
    X = df[req_cols]  # keep only required columns in X
    X_std = X.values  # no need of scaling the training set for HDBSCAN
    # Define parameter grid for HDBSCAN, you can play with this grid accordingly
    param_grid = {
        'min_cluster_size': list(range(2, 20))
        #'min_samples': [1, 2, 3]
    }
    best_score = -1
    best_params = None
    # Iterate over parameter grid
    for params in ParameterGrid(param_grid):
        model = hdbscan.HDBSCAN(**params, gen_min_span_tree=True)
        cluster_labels = model.fit_predict(X_std)
        unique_labels = np.unique(cluster_labels)
        if len(unique_labels) > 1:  # Check if more than one cluster is formed
            silhouette_avg = silhouette_score(X_std, cluster_labels)
            if silhouette_avg > best_score:
                best_score = silhouette_avg
                best_params = params
    if best_params is not None:
        print(best_params)
        best_model = hdbscan.HDBSCAN(**best_params, gen_min_span_tree=True)
        cluster_labels = best_model.fit_predict(X_std)  # get cluster labels from best model
        df["cluster_"] = [str(i) for i in cluster_labels]
    else:
        print("HDBSCAN produced only one cluster label. Unable to split the data.")
        df["cluster_"] = "0"  # when no clusters are found
    return df
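A minimal usage sketch (assuming df is a DataFrame of numeric features plus an "ID" column): after labeling, it is worth checking how many observations fall into each cluster, since very small clusters are hard to split further:
df = get_clusters(df)
print(df["cluster_"].value_counts())  # cluster sizes; "-1" marks HDBSCAN noise points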
You can also use other clustering methods suited to your problem, e.g., K-Means clustering:
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler
from yellowbrick.cluster import KElbowVisualizer

def get_clusters(df):
    to_drop = ["cluster_", "ID"]
    req_cols = sorted(set(df.columns) - set(to_drop))
    X = df[req_cols].values  # keep only required columns in X
    scaler = StandardScaler()
    X_std = scaler.fit_transform(X)  # scaling is needed in case of K-Means
    model = KMeans()
    visualizer = KElbowVisualizer(model, k=(2, 50))  # you can play with the range accordingly
    visualizer.fit(X_std)
    #visualizer.show()
    optimal_n_clusters = visualizer.elbow_value_  # using elbow method to get optimal no. of clusters
    kmeans = KMeans(n_clusters=optimal_n_clusters, random_state=42)
    kmeans.fit(X_std)
    clust_labels = [str(i) for i in kmeans.labels_]
    # Evaluate the clustering using silhouette score
    silhouette_avg = silhouette_score(X_std, clust_labels)
    df["cluster_"] = clust_labels
    return df
You can also add a level of granularity (any categorical variable) to your dataset to get more refined clusters, as follows:
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler
from yellowbrick.cluster import KElbowVisualizer

def get_clusters(df):
    # taking the "animal" categorical variable as a level of granularity to split on
    grp1 = df.loc[df['animal'] == 'cat'].copy()
    grp2 = df.loc[df['animal'] == 'dog'].copy()
    temps = []
    for num, temp in enumerate([grp1, grp2]):
        # also drop the grouping column, which is non-numeric and constant within each group
        to_drop = ["cluster_", "ID", "animal"]
        final_cols = sorted(set(temp.columns) - set(to_drop))
        X = temp[final_cols].values
        scaler = StandardScaler()
        X_std = scaler.fit_transform(X)  # scaling of variables is needed for K-Means clustering
        model = KMeans()
        visualizer = KElbowVisualizer(model, k=(2, 50))
        visualizer.fit(X_std)
        # visualizer.show()
        # get optimal no. of clusters, K, using the elbow method
        optimal_n_clusters = visualizer.elbow_value_
        kmeans = KMeans(n_clusters=optimal_n_clusters, random_state=42)
        kmeans.fit(X_std)
        # prefix labels with the group number so clusters stay distinct across groups
        clust_labels = [str(num) + "_" + str(i) for i in kmeans.labels_]
        # Evaluate the clustering using silhouette score
        silhouette_avg = silhouette_score(X_std, clust_labels)
        temp["cluster_"] = clust_labels
        temps.append(temp)
    df = pd.concat(temps, axis=0)  # stack the groups back together row-wise
    return df
Once you have obtained cluster labels from any clustering method, you can use random sampling or stratified sampling to select samples from each cluster.
We will select indices randomly and then use these indices to select our train-test-val sets as follows:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
# Assuming df is your DataFrame (with a default RangeIndex) and "cluster_" is the column with cluster labels
stratify_ = False  # set to True if you have a categorical target variable
unique_clusters = df["cluster_"].unique()
train_indices = []
val_indices = []
test_indices = []
for cluster in unique_clusters:
    cluster_data = df[df["cluster_"] == cluster]
    cluster_indices = cluster_data.index.values
    cluster_y = cluster_data['y'].values
    if stratify_ == True:  # if you have a categorical target variable
        train_idx, temp_idx, _, temp_y = train_test_split(cluster_indices, cluster_y, test_size=0.4, stratify=cluster_y, random_state=42)
        val_idx, test_idx = train_test_split(temp_idx, test_size=0.5, stratify=temp_y, random_state=42)
    else:
        # Split indices of the current cluster into train and temp (which will be further split into val and test)
        train_idx, temp_idx = train_test_split(cluster_indices, test_size=0.4, random_state=42)
        val_idx, test_idx = train_test_split(temp_idx, test_size=0.5, random_state=42)
    train_indices.extend(train_idx)
    val_indices.extend(val_idx)
    test_indices.extend(test_idx)
# Convert the indices lists to numpy arrays
train_indices = np.array(train_indices)
val_indices = np.array(val_indices)
test_indices = np.array(test_indices)
# Assuming 'X' are the features and 'y' is the target column
X = df.drop(columns=['y', 'cluster_']).values
y = df['y'].values
# Select the corresponding data for train, validation, and test sets
X_train, y_train = X[train_indices], y[train_indices]
X_val, y_val = X[val_indices], y[val_indices]
X_test, y_test = X[test_indices], y[test_indices]
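A simple check (an extra step I would add, not part of the split itself) is to compare summary statistics of the target across the resulting splits; with a cluster-based split on a small dataset they should stay close:
import numpy as np
# Summary statistics of the target in each split should be similar
for name, y_split in [("train", y_train), ("val", y_val), ("test", y_test)]:
    print(f"{name}: n={len(y_split)}, mean={np.mean(y_split):.3f}, std={np.std(y_split):.3f}")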
For my use case, it was useful to sort the target variable y and then select every 1st, 2nd, and 3rd index for the train, test, and validation sets respectively (all mutually exclusive), i.e., a form of systematic random sampling, as below:
def get_indices(df):
    np.random.seed(seed=48)
    total_length = len(df)
    sample1_length = int(0.60 * total_length)  # you can choose the proportions accordingly
    remaining_length = total_length - sample1_length
    sample2_length = int(remaining_length / 2)
    sample3_length = total_length - (sample1_length + sample2_length)
    # create an array with range 0 - length of the df
    all_indxs = np.array(range(total_length))
    # Create arrays of indices divisible by 2 and 3 exclusively
    indices_divisible_by_2 = np.array(list(set(np.where(all_indxs % 2 == 0)[0]) - set(np.where(all_indxs % 6 == 0)[0])))
    indices_divisible_by_3 = np.array(list(set(np.where(all_indxs % 3 == 0)[0]) - set([0])))
    # randomly choose indices divisible by 2 with sample2_length
    sample2_indices = sorted(indices_divisible_by_2[np.random.choice(len(indices_divisible_by_2), size=sample2_length, replace=False)])
    try:
        sample3_indices = sorted(indices_divisible_by_3[np.random.choice(len(indices_divisible_by_3), size=sample3_length, replace=False)])
    except ValueError:  # not enough indices divisible by 3 in a very small cluster
        sample3_indices = []
    sample1_indices = sorted(set(all_indxs) - set(sample2_indices) - set(sample3_indices))
    return sample1_indices, sample2_indices, sample3_indices
indices_train = []
indices_test = []
indices_val = []
for num, cluster in enumerate(df['cluster_'].unique()):
    temp_df = df[df['cluster_'] == cluster]  # assumed to be sorted by the target variable y (see above)
    sample1_indices, sample2_indices, sample3_indices = get_indices(temp_df)
    indices_train.append(list(temp_df.iloc[sample1_indices].index))
    indices_test.append(list(temp_df.iloc[sample2_indices].index))
    indices_val.append(list(temp_df.iloc[sample3_indices].index))
# flatten the lists of lists containing indices for the train, test, and val sets
indices_train = [x for xs in indices_train for x in xs]
indices_test = [x for xs in indices_test for x in xs]
indices_val = [x for xs in indices_val for x in xs]
def traintestvalsplit(df, id_col, cols_to_drop, cont_var, train_indices, test_indices, val_indices):
    # Split the data into train, validation, and test sets based on the indices
    train, test, val = df.loc[train_indices], df.loc[test_indices], df.loc[val_indices]
    X_train = train.drop(cols_to_drop + [cont_var], axis=1)  # add which columns to drop
    X_test = test.drop(cols_to_drop + [cont_var], axis=1)
    X_val = val.drop(cols_to_drop + [cont_var], axis=1)
    y_train = train[[cont_var]]  # target variable
    y_test = test[[cont_var]]
    y_val = val[[cont_var]]
    train_ids = train[[id_col]]  # to preserve the IDs
    test_ids = test[[id_col]]
    val_ids = val[[id_col]]
    print("Train set size:", X_train.shape, len(train_ids))
    print("Test set size:", X_test.shape, len(test_ids))
    print("Validation set size:", X_val.shape, len(val_ids))
    return X_train, X_val, X_test, y_train, y_val, y_test, train_ids, val_ids, test_ids

# id_col, cols_to_drop, and cont_var are defined according to your dataset
X_train, X_val, X_test, y_train, y_val, y_test, train_ids, val_ids, test_ids = traintestvalsplit(df, id_col, cols_to_drop, cont_var, indices_train, indices_test, indices_val)
The approaches discussed above, which combine clustering with different sampling methods, are very useful when you have a small number of observations in your dataset, as they help maintain similar distributions amongst the Train, Test, and Validation sets.
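One way to verify this for a continuous target (a sanity check I would suggest, not part of the original workflow) is a two-sample Kolmogorov-Smirnov test from SciPy between the train set and each of the other splits; similar distributions give small KS statistics and large p-values:
import numpy as np
from scipy.stats import ks_2samp
# Compare the target distribution of the validation and test sets against the train set
for name, y_split in [("val", y_val), ("test", y_test)]:
    stat, p_value = ks_2samp(np.ravel(y_train), np.ravel(y_split))
    print(f"train vs {name}: KS statistic = {stat:.3f}, p-value = {p_value:.3f}")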
Thanks for reading, and I hope you find this article helpful!