Sensor Framework Documentation

This document provides a detailed overview of the sensors available in the data quality monitoring framework. Each sensor is described below, including its main logic and a breakdown of every feature it generates.

Basic Sensors

BasicSensorFeatureExtractor

A comprehensive sensor that operates on defined cohorts within a DataFrame. It calculates features at multiple levels: global schema properties (like column name conventions and type diversity), file metadata (like row counts and size), cohort-level date characteristics (like day of the week), and a granular, per-column analysis of structural and numeric properties within each cohort.

Main Function Logic

CLASS BasicSensorFeatureExtractor:
  INITIALIZE with cohort_columns, numeric_columns, date_column

  FUNCTION fit_transform(data, last_file_date=None):
    // --- Phase 1: Global Schema & Metadata Features (calculated once) ---
    all_features = {}
    all_column_names = GET_COLUMN_NAMES(data)
    all_features['row_count'] = data.row_count
    all_features['col_count'] = data.column_count
    all_features['filesize_est'] = data.memory_usage_in_bytes
    all_features['schema_numeric_suffix_cols'] = COUNT(name FOR name in all_column_names IF name ends with '_<number>')
    all_features['schema_avg_underscores_per_col'] = MEAN(name.count('_') FOR name in all_column_names)
    all_features['schema_uppercase_cols'] = COUNT(name FOR name in all_column_names IF name is all uppercase)
    all_features['schema_lowercase_cols'] = COUNT(name FOR name in all_column_names IF name is all lowercase)
    all_features['schema_avg_col_name_length'] = MEAN(LENGTH(name) FOR name in all_column_names)
    all_features['schema_col_name_length_std'] = STD_DEV(LENGTH(name) FOR name in all_column_names)
    all_features['schema_dtype_diversity'] = COUNT_UNIQUE(data.get_dtypes())
    dominant_dtype_freq = MOST_FREQUENT(data.get_dtypes()).count
    all_features['schema_dominant_dtype_pct'] = dominant_dtype_freq / data.column_count

    // --- Phase 2: Create Cohorts ---
    CREATE cohort_id by concatenating values from cohort_columns
    cohort_groups = GROUP data by cohort_id

    // --- Phase 3: Per-Cohort Analysis ---
    results = []
    FOR each cohort_group in cohort_groups:
      cohort_features = all_features.copy()
      // A) Cohort-level Date Features (requires a date_column)
      IF date_column is not None:
        primary_date = cohort_group[date_column].mode() // Use the most frequent date
        cohort_features['day_of_week'] = primary_date.day_of_week
        cohort_features['month'] = primary_date.month
        cohort_features['is_weekend'] = 1 IF primary_date.day_of_week IN (Saturday, Sunday) ELSE 0
        cohort_features['is_month_end'] = 1 IF primary_date is the last day of its month ELSE 0
        cohort_features['is_quarter_end'] = 1 IF primary_date is the last day of a quarter ELSE 0
        IF last_file_date is not None:
          cohort_features['days_since_last_file'] = (primary_date - last_file_date).in_days

      // B) Per-Column, Per-Cohort Numeric Analysis
      FOR each col in numeric_columns:
        column_data = cohort_group[col]
        total_rows = LENGTH(column_data)
        total_nulls = column_data.count_nulls()
        cohort_features['{col}_basic_rows'] = total_rows
        cohort_features['{col}_basic_total_nulls'] = total_nulls
        cohort_features['{col}_basic_null_percentage'] = total_nulls / total_rows IF total_rows > 0 ELSE 0
        cohort_features['{col}_basic_avg_column_means'] = column_data.mean()
        cohort_features['{col}_basic_avg_column_stds'] = column_data.std_dev()
        cohort_features['{col}_basic_max_of_maxes'] = column_data.max()
        cohort_features['{col}_basic_total_zeros'] = column_data.count_where(value == 0)
        // NOTE: Some features are placeholders for schema consistency
        cohort_features['{col}_basic_std_column_means'] = 0.0
        cohort_features['{col}_basic_categorical_columns'] = 0
      ENDFOR
      results.APPEND(cohort_features)
    ENDFOR

    RETURN DataFrame(results)
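For concreteness, the Phase 1 schema and metadata features above could be computed with pandas roughly as sketched below. This is an illustration only, not the framework's implementation; the DataFrame df and the helper name are assumptions.

import re
import pandas as pd

def schema_features(df: pd.DataFrame) -> dict:
    """Sketch of the Phase 1 global schema and metadata features."""
    names = list(df.columns)
    dtypes = df.dtypes.astype(str)
    name_lengths = pd.Series([len(n) for n in names], dtype=float)
    underscores = pd.Series([n.count("_") for n in names], dtype=float)
    dominant_freq = dtypes.value_counts().iloc[0] if len(dtypes) else 0
    return {
        "row_count": len(df),
        "col_count": df.shape[1],
        "filesize_est": int(df.memory_usage(deep=True).sum()),
        "schema_numeric_suffix_cols": sum(bool(re.search(r"_\d+$", n)) for n in names),
        "schema_avg_underscores_per_col": float(underscores.mean()),
        "schema_uppercase_cols": sum(n.isupper() for n in names),
        "schema_lowercase_cols": sum(n.islower() for n in names),
        "schema_avg_col_name_length": float(name_lengths.mean()),
        "schema_col_name_length_std": float(name_lengths.std(ddof=0)),
        "schema_dtype_diversity": int(dtypes.nunique()),
        "schema_dominant_dtype_pct": float(dominant_freq / df.shape[1]) if df.shape[1] else 0.0,
    }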

Generated Features

Feature Name Description Data Type
row_count Total number of rows in the entire input file or DataFrame. Integer
FUNCTION GET_TOTAL_ROW_COUNT(data):
  RETURN data.shape[0]
ENDFUNCTION
col_count Total number of columns in the entire input file or DataFrame. Integer
FUNCTION GET_TOTAL_COLUMN_COUNT(data):
  RETURN data.shape[1]
ENDFUNCTION
filesize_est An estimate of the DataFrame's size in memory, in bytes. Integer
FUNCTION ESTIMATE_FILESIZE(data):
  RETURN data.memory_usage().sum()
ENDFUNCTION
schema_numeric_suffix_cols The count of columns whose names end with a numeric suffix, such as '_1' or '_2'. Integer
FUNCTION COUNT_NUMERIC_SUFFIX_COLS(column_names):
  count = 0
  FOR name in column_names:
    IF name matches regex '.*_\d+$':
      count += 1
  RETURN count
ENDFUNCTION
schema_avg_underscores_per_col The average number of underscores per column name across the entire schema. Float
FUNCTION AVG_UNDERSCORES(column_names):
  IF column_names is empty: RETURN 0.0
  total_underscores = SUM(name.count('_') FOR name in column_names)
  RETURN total_underscores / LENGTH(column_names)
ENDFUNCTION
schema_uppercase_cols The total count of column names that consist entirely of uppercase letters and underscores. Integer
FUNCTION COUNT_UPPERCASE_COLS(column_names):
  RETURN COUNT(name FOR name in column_names IF name.is_upper())
ENDFUNCTION
schema_lowercase_cols The total count of column names that consist entirely of lowercase letters and underscores. Integer
FUNCTION COUNT_LOWERCASE_COLS(column_names):
  RETURN COUNT(name FOR name in column_names IF name.is_lower())
ENDFUNCTION
schema_dtype_diversity The number of unique data types (e.g., int64, float64, object) present in the DataFrame. Integer
FUNCTION COUNT_UNIQUE_DTYPES(data):
  all_dtypes = data.get_column_dtypes()
  RETURN LENGTH(UNIQUE(all_dtypes))
ENDFUNCTION
schema_dominant_dtype_pct The percentage of columns that have the most frequent (dominant) data type. Float
FUNCTION CALCULATE_DOMINANT_DTYPE_PCT(data):
  all_dtypes = data.get_column_dtypes()
  IF data.column_count == 0: RETURN 0.0
  dominant_dtype_freq = COUNT_MOST_FREQUENT(all_dtypes)
  RETURN dominant_dtype_freq / data.column_count
ENDFUNCTION
schema_avg_col_name_length The average character length of column names. Float
FUNCTION AVG_COL_NAME_LENGTH(column_names):
  IF column_names is empty: RETURN 0.0
  total_length = SUM(LENGTH(name) FOR name in column_names)
  RETURN total_length / LENGTH(column_names)
ENDFUNCTION
schema_col_name_length_std The standard deviation of column name character lengths. Float
FUNCTION STD_COL_NAME_LENGTH(column_names):
  lengths = [LENGTH(name) FOR name in column_names]
  RETURN CALCULATE_STD_DEV(lengths)
ENDFUNCTION
day_of_week The day of the week for the cohort's primary date (e.g., 0 for Monday, 6 for Sunday). Integer
FUNCTION GET_DAY_OF_WEEK(date_value):
  // Assumes a standard library where Monday=0 and Sunday=6
  RETURN date_value.weekday()
ENDFUNCTION
month The month of the year for the cohort's primary date (1-12). Integer
FUNCTION GET_MONTH(date_value):
  RETURN date_value.month
ENDFUNCTION
is_weekend A binary flag (1 or 0) indicating if the cohort's primary date falls on a weekend. Integer
FUNCTION IS_WEEKEND(date_value):
  RETURN 1 IF date_value.weekday() >= 5 ELSE 0 // Saturday=5, Sunday=6
ENDFUNCTION
is_month_end A binary flag (1 or 0) indicating if the cohort's primary date is the last day of its month. Integer
FUNCTION IS_MONTH_END(date_value):
  RETURN 1 IF date_value.is_month_end() ELSE 0
ENDFUNCTION
is_quarter_end A binary flag (1 or 0) indicating if the cohort's primary date is the last day of a quarter. Integer
FUNCTION IS_QUARTER_END(date_value):
  RETURN 1 IF date_value.is_quarter_end() ELSE 0
ENDFUNCTION
days_since_last_file The number of days between the cohort's primary date and the date of the previously processed file. Requires state to be passed from a higher-level orchestrator. Integer
FUNCTION DAYS_SINCE(current_date, previous_date):
  IF previous_date is None: RETURN 0
  time_delta = current_date - previous_date
  RETURN time_delta.in_days()
ENDFUNCTION
{col}_basic_rows Total number of rows in the cohort. Integer
FUNCTION GET_COHORT_ROW_COUNT(cohort_data):
  // This is the total number of records belonging to the current group.
  RETURN cohort_data.row_count
ENDFUNCTION
{col}_basic_total_nulls The total count of null/missing values for this specific col within the cohort. Integer
FUNCTION COUNT_NULLS_IN_COLUMN(column_data):
  // column_data represents the values for this col within the current cohort.
  RETURN column_data.count_nulls()
ENDFUNCTION
{col}_basic_null_percentage The percentage of values for this col that are null within the cohort. Float
FUNCTION CALCULATE_NULL_PERCENTAGE(column_data):
  total_rows = LENGTH(column_data)
  IF total_rows == 0: RETURN 0.0
  null_count = column_data.count_nulls()
  RETURN null_count / total_rows
ENDFUNCTION
{col}_basic_columns_with_nulls A binary flag (1 or 0) indicating if this col contains at least one null value within the cohort. Integer
FUNCTION HAS_NULLS(column_data):
  RETURN 1 IF column_data.count_nulls() > 0 ELSE 0
ENDFUNCTION
{col}_basic_dtype_float64_count A static feature indicating that the col being processed is numeric (specifically, treated as float64). Always returns 1. Integer
FUNCTION GET_DTYPE_COUNT():
  // Placeholder indicating one numeric column was processed.
  RETURN 1
ENDFUNCTION
{col}_basic_avg_column_means The mean of the values for this specific col within the cohort. (Note: The feature name is for consistency; it represents the mean of a single column, not an average of means). Float
FUNCTION CALCULATE_MEAN(column_data):
  IF column_data is empty: RETURN 0.0
  RETURN column_data.mean()
ENDFUNCTION
{col}_basic_std_column_means A placeholder feature, always returns 0.0. Included for schema consistency. Float
FUNCTION GET_STD_OF_MEANS():
  // This feature is not calculated from data and is always a fixed value.
  RETURN 0.0
ENDFUNCTION
{col}_basic_avg_column_stds The standard deviation of the values for this specific col within the cohort. (Note: The feature name is for consistency; it represents the standard deviation of a single column). Float
FUNCTION CALCULATE_STD_DEV(column_data):
  IF LENGTH(column_data) < 2: RETURN 0.0
  RETURN column_data.std_dev()
ENDFUNCTION
{col}_basic_max_of_maxes The maximum value for this specific col within the cohort. (Note: The feature name is for consistency; it represents the max of a single column). Float
FUNCTION CALCULATE_MAX(column_data):
  IF column_data is empty: RETURN 0.0
  RETURN column_data.max()
ENDFUNCTION
{col}_basic_total_zeros The total count of zero values for this specific col within the cohort. Integer
FUNCTION COUNT_ZEROS(column_data):
  RETURN column_data.count_where(value == 0)
ENDFUNCTION
{col}_basic_cardinality The mean cardinality (average number of unique values) across the categorical columns within the cohort. Float
FUNCTION GET_MEAN_CARDINALITY(categorical_columns):
  cardinalities = [COUNT(UNIQUE(column)) FOR column in categorical_columns]
  RETURN MEAN(cardinalities)
ENDFUNCTION

Advanced Statistical Sensors

AdvancedStatisticalSensorExtractor

Goes beyond basic statistics to capture subtle distributional properties and outlier patterns for each specified col within defined cohorts. This sensor is essential for detecting sophisticated data quality anomalies that are invisible to simple mean and standard deviation checks, providing a granular, per-col view of the data's characteristics.

Main Function Logic

CLASS AdvancedStatisticalSensorExtractor:
  INITIALIZE with cohort_columns and col_columns

  FUNCTION fit_transform(data):
    // --- Phase 1: Create Cohorts ---
    CREATE cohort identifier by concatenating cohort_columns
    GROUP data by cohort

    // --- Phase 2: Per-Col, Per-Cohort Analysis ---
    // The following calculations are performed for EACH col within EACH cohort group.
    FOR each col:
      // Distribution Shape
      CALCULATE_SKEWNESS(of the col's data in the cohort)
      CALCULATE_KURTOSIS(of the col's data in the cohort)

      // Percentiles & Variability
      CALCULATE_5TH_AND_95TH_PERCENTILES()
      CALCULATE_PERCENTILE_RANGE(P95 - P5)
      CALCULATE_COEFFICIENT_OF_VARIATION(std / mean)

      // Outlier Counts
      CALCULATE_IQR_BOUNDS(Q1 - 1.5*IQR, Q3 + 1.5*IQR)
      COUNT_VALUES_OUTSIDE_BOUNDS() for IQR outliers
      CALCULATE_Z_SCORE(for each value)
      COUNT_VALUES_WHERE |Z-score| > 3 for Z-score outliers

      // Data Content
      CALCULATE_UNIQUENESS_RATIO(unique_count / total_count)
      CALCULATE_ZEROS_PERCENTAGE()
      CALCULATE_NEGATIVES_PERCENTAGE()

      // Optional Advanced Metrics
      IF enabled, CALCULATE_ENTROPY()
      IF enabled, CALCULATE_MONOTONIC_AND_CONSECUTIVE_PATTERNS()
    ENDFOR

    RETURN DataFrame with detailed statistical features for each col in each cohort
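As a rough illustration, a few of the statistics listed under Generated Features below could be computed per cohort with pandas and scipy. The function, the 'cohort_id' grouping key, and the 'value' column in the usage comment are assumptions for the sketch, not part of the framework.

import pandas as pd
from scipy import stats

def advanced_stats(cohort: pd.DataFrame, col: str) -> dict:
    """Sketch of a few per-column statistics for one cohort group."""
    s = cohort[col].dropna()
    if s.empty:
        return {}
    q1, q3 = s.quantile(0.25), s.quantile(0.75)
    iqr = q3 - q1
    iqr_outliers = ((s < q1 - 1.5 * iqr) | (s > q3 + 1.5 * iqr)).sum()
    std = s.std(ddof=0)
    z = (s - s.mean()) / std if std > 0 else s * 0.0
    return {
        f"{col}_stat_skewness": float(stats.skew(s)),
        f"{col}_stat_kurtosis": float(stats.kurtosis(s)),   # excess kurtosis, as in the feature table
        f"{col}_stat_percentile_5": float(s.quantile(0.05)),
        f"{col}_stat_percentile_95": float(s.quantile(0.95)),
        f"{col}_stat_outliers_iqr_count": int(iqr_outliers),
        f"{col}_stat_outliers_zscore_count": int((z.abs() > 3).sum()),
        f"{col}_stat_unique_ratio": float(s.nunique() / len(s)),
    }

# Usage sketch with a hypothetical 'cohort_id' and 'value' column:
# features = df.groupby("cohort_id").apply(lambda g: pd.Series(advanced_stats(g, "value")))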

Generated Features

Feature Name Description Data Type
{col}_stat_skewness Measures the asymmetry of the col's data distribution within the cohort. Positive values indicate a tail to the right; negative values indicate a tail to the left. Float
FUNCTION CALCULATE_SKEWNESS(column_data):
  // This is the standardized third central moment.
  mean = MEAN(column_data)
  std_dev = STANDARD_DEVIATION(column_data)
  IF std_dev == 0: RETURN 0.0
  third_moment = MEAN( (value - mean)^3 FOR value in column_data )
  RETURN third_moment / (std_dev^3)
ENDFUNCTION
{col}_stat_kurtosis Measures the 'tailedness' of the col's distribution (excess kurtosis). High values indicate the presence of significant outliers compared to a normal distribution. Float
FUNCTION CALCULATE_EXCESS_KURTOSIS(column_data):
  // This is the standardized fourth central moment, minus 3.
  mean = MEAN(column_data)
  std_dev = STANDARD_DEVIATION(column_data)
  IF std_dev == 0: RETURN 0.0
  fourth_moment = MEAN( (value - mean)^4 FOR value in column_data )
  kurtosis = fourth_moment / (std_dev^4)
  RETURN kurtosis - 3.0
ENDFUNCTION
{col}_stat_percentile_5 The 5th percentile value for the col within the cohort, representing the lower bound of the typical data range. Float
FUNCTION CALCULATE_PERCENTILE(column_data, percentile_q):
  // e.g., for 5th percentile, percentile_q = 5
  sorted_data = SORT(column_data)
  index = (percentile_q / 100) * (LENGTH(sorted_data) - 1)
  RETURN value at index in sorted_data (using interpolation if index is not an integer)
ENDFUNCTION
{col}_stat_percentile_95 The 95th percentile value for the col within the cohort, representing the upper bound of the typical data range. Float
// Uses the same logic as the percentile_5 calculation.
FUNCTION CALCULATE_PERCENTILE(column_data, percentile_q=95):
  sorted_data = SORT(column_data)
  index = (95 / 100) * (LENGTH(sorted_data) - 1)
  RETURN value at index in sorted_data
ENDFUNCTION
{col}_stat_percentile_range The range between the 95th and 5th percentiles for the col, measuring the spread of the bulk of the data. Float
FUNCTION CALCULATE_PERCENTILE_RANGE(column_data):
  p95 = CALCULATE_PERCENTILE(column_data, 95)
  p5 = CALCULATE_PERCENTILE(column_data, 5)
  RETURN p95 - p5
ENDFUNCTION
{col}_stat_cv The Coefficient of Variation (std/mean) for the col. Measures relative variability, allowing comparison of spread across cols with different scales. Float
FUNCTION CALCULATE_CV(column_data):
  mean = MEAN(column_data)
  std_dev = STANDARD_DEVIATION(column_data)
  IF ABS(mean) < 1e-9: RETURN 0.0
  RETURN ABS(std_dev / mean)
ENDFUNCTION
{col}_stat_zeros_pct Percentage of values for this col within the cohort that are exactly zero. Float
FUNCTION CALCULATE_ZEROS_PCT(column_data):
  total_count = LENGTH(column_data)
  IF total_count == 0: RETURN 0.0
  zero_count = COUNT(value == 0 FOR value in column_data)
  RETURN zero_count / total_count
ENDFUNCTION
{col}_stat_negatives_pct Percentage of values for this col within the cohort that are negative. Float
FUNCTION CALCULATE_NEGATIVES_PCT(column_data):
  total_count = LENGTH(column_data)
  IF total_count == 0: RETURN 0.0
  negative_count = COUNT(value < 0 FOR value in column_data)
  RETURN negative_count / total_count
ENDFUNCTION
{col}_stat_unique_ratio Ratio of unique values to total non-null values for the col. Near 1 suggests high cardinality; near 0 suggests repetitive data. Float
FUNCTION CALCULATE_UNIQUE_RATIO(column_data):
  non_null_data = REMOVE_NULLS(column_data)
  total_count = LENGTH(non_null_data)
  IF total_count == 0: RETURN 0.0
  unique_count = COUNT(UNIQUE(non_null_data))
  RETURN unique_count / total_count
ENDFUNCTION
{col}_stat_outliers_iqr_count Total count of outliers for the col detected using the robust Interquartile Range (IQR) method. Integer
FUNCTION COUNT_IQR_OUTLIERS(column_data):
  Q1 = PERCENTILE(column_data, 25)
  Q3 = PERCENTILE(column_data, 75)
  IQR = Q3 - Q1
  lower_bound = Q1 - (1.5 * IQR)
  upper_bound = Q3 + (1.5 * IQR)
  RETURN COUNT(value < lower_bound OR value > upper_bound FOR value in column_data)
ENDFUNCTION
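For intuition, here is a minimal numpy sketch of the IQR rule above; the sample values are made up.

import numpy as np

values = np.array([10, 12, 11, 13, 12, 95])           # 95 is an injected outlier
q1, q3 = np.percentile(values, [25, 75])
iqr = q3 - q1
lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
outlier_count = int(np.sum((values < lower) | (values > upper)))
print(lower, upper, outlier_count)                     # 9.0 15.0 1 -> only 95 falls outside the fences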
{col}_stat_outliers_iqr_pct Percentage of non-null values for the col that are outliers based on the IQR method. Float
FUNCTION CALCULATE_IQR_OUTLIER_PCT(column_data):
  non_null_count = COUNT_NON_NULLS(column_data)
  IF non_null_count == 0: RETURN 0.0
  outlier_count = COUNT_IQR_OUTLIERS(column_data)
  RETURN outlier_count / non_null_count
ENDFUNCTION
{col}_stat_outliers_zscore_count Total count of outliers for the col where the absolute Z-score is greater than 3. Integer
FUNCTION COUNT_ZSCORE_OUTLIERS(column_data):
  mean = MEAN(column_data)
  std_dev = STANDARD_DEVIATION(column_data)
  IF std_dev == 0: RETURN 0
  RETURN COUNT( ABS((value - mean) / std_dev) > 3 FOR value in column_data )
ENDFUNCTION
{col}_stat_outliers_zscore_pct Percentage of non-null values for the col that are outliers based on the Z-score method. Float
FUNCTION CALCULATE_ZSCORE_OUTLIER_PCT(column_data):
  non_null_count = COUNT_NON_NULLS(column_data)
  IF non_null_count == 0: RETURN 0.0
  outlier_count = COUNT_ZSCORE_OUTLIERS(column_data)
  RETURN outlier_count / non_null_count
ENDFUNCTION
{col}_stat_entropy Shannon entropy of the col's values, measuring unpredictability. Higher entropy means more diverse and less predictable data. Float
FUNCTION CALCULATE_ENTROPY(column_data):
  value_counts = GET_VALUE_COUNTS(column_data)
  total_count = LENGTH(column_data)
  entropy = 0.0
  FOR count in value_counts:
    probability = count / total_count
    entropy = entropy - (probability * LOG2(probability))
  ENDFOR
  RETURN entropy
ENDFUNCTION
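A brief Python sketch of the entropy calculation above, assuming numpy; scipy.stats.entropy on the value counts (with base=2) would give the same result.

import numpy as np
from collections import Counter

def shannon_entropy(values) -> float:
    """Entropy in bits over the empirical distribution of the values."""
    counts = np.array(list(Counter(values).values()), dtype=float)
    probs = counts / counts.sum()
    return max(0.0, float(-(probs * np.log2(probs)).sum()))

print(shannon_entropy(["a", "a", "b", "b"]))   # 1.0 bit: two equally likely values
print(shannon_entropy(["a", "a", "a", "a"]))   # 0.0: a constant column carries no information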
{col}_stat_consecutive_same_value_max The length of the longest run of consecutive identical values for the col within the cohort. Integer
FUNCTION GET_MAX_CONSECUTIVE_RUN(column_data):
  max_run = 0
  current_run = 0
  FOR i from 0 to LENGTH(column_data) - 1:
    IF i > 0 AND column_data[i] == column_data[i-1]:
      current_run = current_run + 1
    ELSE:
      current_run = 1
    ENDIF
    IF current_run > max_run: max_run = current_run
  ENDFOR
  RETURN max_run
ENDFUNCTION
{col}_stat_monotonic_increase_pct The percentage of transitions between consecutive data points that are increases (value > previous_value). Float
FUNCTION GET_MONOTONIC_INCREASE_PCT(column_data):
  increases = 0
  transitions = 0
  FOR i from 1 to LENGTH(column_data) - 1:
    transitions = transitions + 1
    IF column_data[i] > column_data[i-1]:
      increases = increases + 1
    ENDIF
  ENDFOR
  IF transitions == 0: RETURN 0.0
  RETURN increases / transitions
ENDFUNCTION
{col}_stat_monotonic_decrease_pct The percentage of transitions between consecutive data points that are decreases (value < previous_value). Float
FUNCTION GET_MONOTONIC_DECREASE_PCT(column_data):
  decreases = 0
  transitions = 0
  FOR i from 1 to LENGTH(column_data) - 1:
    transitions = transitions + 1
    IF column_data[i] < column_data[i-1]:
      decreases = decreases + 1
    ENDIF
  ENDFOR
  IF transitions == 0: RETURN 0.0
  RETURN decreases / transitions
ENDFUNCTION

Cross-Column Relationship Sensors

CrossColumnFeatureExtractor

A modular feature extractor that analyzes cross-column relationships within cohorts (groups) of data. It computes correlation statistics, interaction effects between categorical and numerical columns, and PCA-based dimensionality metrics. Works with any tabular dataset by grouping data into cohorts and computing features for each cohort.

Main Function Logic

CLASS CrossColumnFeatureExtractor:
  INITIALIZE with:
    - cohort_columns: columns to group data by (e.g., ['city', 'date'])
    - correlation_pairs: list of (col1, col2) pairs to correlate
    - interaction_pairs: list of (categorical, numeric) column pairs
    - pca_columns: columns for PCA analysis

  FUNCTION fit_transform(data):
    // --- Phase 1: Create Cohorts ---
    CREATE cohort identifier by concatenating cohort_columns
    GROUP data by cohort

    // --- Phase 2: Correlation Analysis per Cohort ---
    FOR each (col1, col2) pair in correlation_pairs:
      CALCULATE Pearson correlation
      CALCULATE ratio statistics (col1/col2 mean and std)
      CALCULATE difference statistics (|col1-col2| mean and std)
      COUNT valid pairs (non-null values)
      OPTIONALLY CALCULATE Spearman rank correlation
      OPTIONALLY CALCULATE p-values for correlations

    // --- Phase 3: Interaction Features per Cohort ---
    FOR each (categorical_col, numeric_col) pair:
      GROUP BY categorical values within each cohort
      CALCULATE variance of numeric column within each category
      AGGREGATE variances (mean and std)

    // --- Phase 4: PCA Features per Cohort ---
    FOR each cohort:
      STANDARDIZE selected numeric columns
      COMPUTE eigenvalues of covariance matrix
      CALCULATE explained variance ratios
      COUNT components needed for 90% variance

    RETURN DataFrame with features per cohort
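As an illustration only, the correlation, ratio, and difference features listed below could be computed per cohort with pandas as sketched here; the function name and the column names in the usage comment are placeholders, not the extractor's actual API.

import numpy as np
import pandas as pd

def correlation_features(cohort: pd.DataFrame, col1: str, col2: str) -> dict:
    """Sketch of the correlation, ratio, and difference features for one column pair."""
    pair = cohort[[col1, col2]].dropna()
    ratio = pair[col1] / pair[col2].replace(0, np.nan)      # excludes division by zero
    diff = (pair[col1] - pair[col2]).abs()
    return {
        f"corr_{col1}_{col2}_pearson": float(pair[col1].corr(pair[col2])) if len(pair) > 1 else 0.0,
        f"corr_{col1}_{col2}_spearman": float(pair[col1].corr(pair[col2], method="spearman")) if len(pair) > 1 else 0.0,
        f"corr_{col1}_{col2}_valid_pairs": int(len(pair)),
        f"corr_{col1}_{col2}_ratio_mean": float(ratio.mean()) if ratio.notna().any() else 0.0,
        f"corr_{col1}_{col2}_diff_mean": float(diff.mean()) if len(diff) else 0.0,
    }

# Usage sketch with hypothetical column names:
# features = df.groupby("cohort_id").apply(lambda g: pd.Series(correlation_features(g, "sales", "visits")))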

Generated Features

Feature Name Description Data Type
cohort_size Number of records in each cohort group. Integer
FUNCTION CALCULATE_COHORT_SIZE(cohort_data):
  // cohort_data is the subset of the full dataframe belonging to one group.
  RETURN COUNT_ROWS(cohort_data)
ENDFUNCTION
corr_{col1}_{col2}_pearson Pearson correlation coefficient between col1 and col2 within the cohort. Float
FUNCTION CALCULATE_PEARSON_CORRELATION(col1_data, col2_data):
  // Measures the linear relationship between two variables.
  covariance = COVARIANCE(col1_data, col2_data)
  std_dev1 = STANDARD_DEVIATION(col1_data)
  std_dev2 = STANDARD_DEVIATION(col2_data)
  IF std_dev1 == 0 OR std_dev2 == 0: RETURN 0.0
  RETURN covariance / (std_dev1 * std_dev2)
ENDFUNCTION
corr_{col1}_{col2}_spearman Spearman rank correlation coefficient between col1 and col2 within the cohort (optional). Float
FUNCTION CALCULATE_SPEARMAN_CORRELATION(col1_data, col2_data):
  // Measures the monotonic relationship using ranks.
  // 1. Convert each value in col1_data to its rank within the column.
  ranks1 = GET_RANKS(col1_data)
  // 2. Convert each value in col2_data to its rank within the column.
  ranks2 = GET_RANKS(col2_data)
  // 3. Calculate the Pearson correlation on the ranked data.
  RETURN CALCULATE_PEARSON_CORRELATION(ranks1, ranks2)
ENDFUNCTION
corr_{col1}_{col2}_valid_pairs Count of non-null value pairs used in correlation calculation. Integer
FUNCTION COUNT_VALID_PAIRS(col1_data, col2_data):
  count = 0
  FOR i from 0 to LENGTH(col1_data) - 1:
    IF col1_data[i] IS NOT NULL AND col2_data[i] IS NOT NULL:
      count = count + 1
    ENDIF
  ENDFOR
  RETURN count
ENDFUNCTION
corr_{col1}_{col2}_ratio_mean Mean of the ratio col1/col2 within the cohort (excludes division by zero). Float
FUNCTION CALCULATE_RATIO_MEAN(col1_data, col2_data):
  ratios = []
  FOR i from 0 to LENGTH(col1_data) - 1:
    IF col2_data[i] IS NOT NULL AND col2_data[i] != 0:
      ratios.APPEND(col1_data[i] / col2_data[i])
    ENDIF
  ENDFOR
  RETURN MEAN(ratios) IF ratios is not empty ELSE 0.0
ENDFUNCTION
corr_{col1}_{col2}_ratio_std Standard deviation of the ratio col1/col2 within the cohort. Float
FUNCTION CALCULATE_RATIO_STD(col1_data, col2_data):
  ratios = []
  FOR i from 0 to LENGTH(col1_data) - 1:
    IF col2_data[i] IS NOT NULL AND col2_data[i] != 0:
      ratios.APPEND(col1_data[i] / col2_data[i])
    ENDIF
  ENDFOR
  RETURN STANDARD_DEVIATION(ratios) IF LENGTH(ratios) > 1 ELSE 0.0
ENDFUNCTION
corr_{col1}_{col2}_diff_mean Mean of absolute difference |col1 - col2| within the cohort. Float
FUNCTION CALCULATE_DIFF_MEAN(col1_data, col2_data):
  differences = []
  FOR i from 0 to LENGTH(col1_data) - 1:
    IF col1_data[i] IS NOT NULL AND col2_data[i] IS NOT NULL:
      differences.APPEND(ABS(col1_data[i] - col2_data[i]))
    ENDIF
  ENDFOR
  RETURN MEAN(differences) IF differences is not empty ELSE 0.0
ENDFUNCTION
corr_{col1}_{col2}_diff_std Standard deviation of absolute difference |col1 - col2| within the cohort. Float
FUNCTION CALCULATE_DIFF_STD(col1_data, col2_data):
  differences = []
  FOR i from 0 to LENGTH(col1_data) - 1:
    IF col1_data[i] IS NOT NULL AND col2_data[i] IS NOT NULL:
      differences.APPEND(ABS(col1_data[i] - col2_data[i]))
    ENDIF
  ENDFOR
  RETURN STANDARD_DEVIATION(differences) IF LENGTH(differences) > 1 ELSE 0.0
ENDFUNCTION
corr_{col1}_{col2}_pearson_pvalue P-value for the Pearson correlation (optional, tests significance). Float
FUNCTION CALCULATE_CORRELATION_PVALUE(correlation_r, num_pairs_n):
  IF num_pairs_n <= 2: RETURN 1.0
  // Calculate t-statistic from the correlation coefficient.
  t_stat = correlation_r * SQRT((num_pairs_n - 2) / (1 - correlation_r^2))
  // Calculate two-tailed p-value from the t-distribution.
  degrees_of_freedom = num_pairs_n - 2
  p_value = 2 * (1 - CUMULATIVE_DISTRIBUTION_FUNCTION_T(ABS(t_stat), degrees_of_freedom))
  RETURN p_value
ENDFUNCTION
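In practice this p-value would typically come from a statistics library rather than the manual t-distribution route; a short sketch with scipy.stats follows, where the sample arrays are made up for illustration.

import numpy as np
from scipy import stats

x = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0])
y = np.array([1.1, 1.9, 3.2, 3.9, 5.2, 5.8])

r, p_value = stats.pearsonr(x, y)            # coefficient and two-tailed p-value in one call
print(round(r, 3), round(p_value, 5))

# Equivalent manual route via the t-distribution, mirroring the pseudocode above:
n = len(x)
t_stat = r * np.sqrt((n - 2) / (1 - r ** 2))
p_manual = 2 * stats.t.sf(abs(t_stat), df=n - 2)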
interact_mean_variance Average of variances: how much numeric columns vary within categorical groups, averaged across all interaction pairs. Float
FUNCTION CALCULATE_INTERACT_MEAN_VARIANCE(interaction_pairs, cohort_data):
  all_mean_variances = []
  FOR each pair (cat_col, num_col) in interaction_pairs:
    // 1. Group the cohort by the categorical column.
    sub_groups = GROUP_BY(cohort_data, cat_col)
    variances_for_this_pair = []
    // 2. Calculate variance of the numeric column for each sub-group.
    FOR each group in sub_groups:
      variances_for_this_pair.APPEND(VARIANCE(group[num_col]))
    ENDFOR
    // 3. Take the mean of those variances.
    all_mean_variances.APPEND(MEAN(variances_for_this_pair))
  ENDFOR
  // 4. Average the results across all interaction pairs.
  RETURN MEAN(all_mean_variances)
ENDFUNCTION
interact_std_variance Average of standard deviations of variances across interaction pairs. Float
FUNCTION CALCULATE_INTERACT_STD_VARIANCE(interaction_pairs, cohort_data):
  all_std_variances = []
  FOR each pair (cat_col, num_col) in interaction_pairs:
    sub_groups = GROUP_BY(cohort_data, cat_col)
    variances_for_this_pair = []
    FOR each group in sub_groups:
      variances_for_this_pair.APPEND(VARIANCE(group[num_col]))
    ENDFOR
    // 3. Take the standard deviation of those variances.
    all_std_variances.APPEND(STANDARD_DEVIATION(variances_for_this_pair))
  ENDFOR
  // 4. Average the results across all interaction pairs.
  RETURN MEAN(all_std_variances)
ENDFUNCTION
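A minimal pandas sketch of the interaction calculation above for a single (categorical, numeric) pair; averaging the two returned values across all configured pairs would give the cohort-level interact_mean_variance and interact_std_variance. The function name is an assumption for the example.

import pandas as pd

def interaction_variance(cohort: pd.DataFrame, cat_col: str, num_col: str):
    """Variance of the numeric column within each category, then the mean and std of those variances."""
    per_category_var = cohort.groupby(cat_col)[num_col].var().dropna()
    mean_var = float(per_category_var.mean()) if len(per_category_var) else 0.0
    std_var = float(per_category_var.std()) if len(per_category_var) > 1 else 0.0
    return mean_var, std_var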
pca_explained_variance_1 Eigenvalue (variance) of the first principal component. Float
FUNCTION GET_PCA_EIGENVALUES(pca_columns_data):
  // 1. Standardize the data (mean=0, std=1 for each column).
  standardized_data = STANDARDIZE(pca_columns_data)
  // 2. Compute the covariance matrix.
  covariance_matrix = COVARIANCE_MATRIX(standardized_data)
  // 3. Calculate eigenvalues and sort them in descending order.
  eigenvalues = EIGENVALUES(covariance_matrix)
  SORT(eigenvalues, descending=True)
  RETURN eigenvalues
ENDFUNCTION

// Feature calculation:
eigenvalues = GET_PCA_EIGENVALUES(cohort_pca_data)
RETURN eigenvalues[0] IF LENGTH(eigenvalues) > 0 ELSE 0.0
pca_explained_variance_2 Eigenvalue (variance) of the second principal component. Float
// Assumes eigenvalues are pre-calculated and sorted.
eigenvalues = GET_PCA_EIGENVALUES(cohort_pca_data)
RETURN eigenvalues[1] IF LENGTH(eigenvalues) > 1 ELSE 0.0
pca_explained_variance_ratio_1 Proportion of total variance explained by the first principal component. Float
eigenvalues = GET_PCA_EIGENVALUES(cohort_pca_data)
IF IS_EMPTY(eigenvalues): RETURN 0.0
total_variance = SUM(eigenvalues)
IF total_variance == 0: RETURN 0.0
RETURN eigenvalues[0] / total_variance
pca_explained_variance_ratio_2 Proportion of total variance explained by the second principal component. Float
eigenvalues = GET_PCA_EIGENVALUES(cohort_pca_data)
IF LENGTH(eigenvalues) < 2: RETURN 0.0
total_variance = SUM(eigenvalues)
IF total_variance == 0: RETURN 0.0
RETURN eigenvalues[1] / total_variance
pca_cumulative_variance_2 Cumulative proportion of variance explained by the first two principal components. Float
eigenvalues = GET_PCA_EIGENVALUES(cohort_pca_data)
IF LENGTH(eigenvalues) < 2: RETURN pca_explained_variance_ratio_1
total_variance = SUM(eigenvalues)
IF total_variance == 0: RETURN 0.0
RETURN (eigenvalues[0] + eigenvalues[1]) / total_variance
pca_n_components_90pct Number of principal components needed to explain at least 90% of total variance. Integer
eigenvalues = GET_PCA_EIGENVALUES(cohort_pca_data)
IF IS_EMPTY(eigenvalues): RETURN 0
total_variance = SUM(eigenvalues)
IF total_variance == 0: RETURN 0
cumulative_variance = 0.0
component_count = 0
FOR each eigenvalue in eigenvalues:
  cumulative_variance = cumulative_variance + (eigenvalue / total_variance)
  component_count = component_count + 1
  IF cumulative_variance >= 0.90: BREAK
ENDFOR
RETURN component_count
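A compact numpy sketch of the PCA feature calculations above; it assumes the cohort's selected numeric columns have already been assembled into a 2-D array and is not the extractor's exact implementation.

import numpy as np

def pca_eigenvalues(matrix: np.ndarray) -> np.ndarray:
    """Eigenvalues of the covariance matrix of standardized columns, sorted descending."""
    std = matrix.std(axis=0)
    std = np.where(std == 0, 1.0, std)          # guard against constant columns
    z = (matrix - matrix.mean(axis=0)) / std
    cov = np.atleast_2d(np.cov(z, rowvar=False))
    eig = np.linalg.eigvalsh(cov)               # symmetric matrix, so eigenvalues are real (ascending)
    return np.sort(eig)[::-1]

def n_components_90pct(eigenvalues: np.ndarray) -> int:
    total = eigenvalues.sum()
    if total <= 0:
        return 0
    cumulative = np.cumsum(eigenvalues / total)
    return int(np.searchsorted(cumulative, 0.90) + 1)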

Time Series Sensors

timeseries_features_polars_optimized

An optimized time-series feature extraction module using Polars. It calculates a variety of rolling window features for sensor data, grouped by specified cohort columns.

Main Function Logic

// --- Cohort-Based Rolling Window Analysis ---
CLASS TimeSeriesFeatureExtractor(cohort_columns, sensor_columns, lookback_window, min_history):

  FUNCTION fit_transform(dataframe):
    // 1. Preparation
    CREATE a single '_cohort' identifier from cohort_columns.
    SORT the dataframe by cohort and timestamp.
    ADD a row number '_row_num' for each cohort.

    // 2. Feature Expression Generation
    INITIALIZE an empty list of feature expressions.
    FOR each sensor_column in sensor_columns:
      GENERATE expressions for all feature groups (rolling, statistical, trend, etc.).
      // Each expression uses a rolling window over the cohort.
      // e.g., rolling_mean(sensor_column, window_size=lookback_window).over(cohort)
      ADD expressions to the list.

    // 3. Computation
    APPLY all generated expressions to the dataframe in a single pass.

    // 4. Filtering
    APPLY min_history filter:
      SET feature values to NULL where _row_num < min_history.

    // 5. Finalization
    SELECT final columns.
    RETURN dataframe with new feature columns.
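A minimal Polars sketch of the extraction flow above, assuming a recent Polars version and a 'timestamp' column; the column names, defaults, and feature subset are illustrative rather than the module's exact implementation.

import polars as pl

def timeseries_features(df: pl.DataFrame, cohort_cols, sensor_cols,
                        lookback_window: int = 7, min_history: int = 3) -> pl.DataFrame:
    """Sketch: a few rolling features per sensor column, computed within each cohort."""
    df = (
        df.with_columns(pl.concat_str([pl.col(c) for c in cohort_cols], separator="_").alias("_cohort"))
          .sort(["_cohort", "timestamp"])
          .with_columns(pl.int_range(pl.len()).over("_cohort").alias("_row_num"))
    )
    exprs = []
    for col in sensor_cols:
        exprs += [
            pl.col(col).rolling_mean(window_size=lookback_window).over("_cohort").alias(f"{col}_ts_rolling_mean"),
            pl.col(col).rolling_std(window_size=lookback_window).over("_cohort").alias(f"{col}_ts_rolling_std"),
            pl.col(col).rolling_min(window_size=lookback_window).over("_cohort").alias(f"{col}_ts_rolling_min"),
            pl.col(col).rolling_max(window_size=lookback_window).over("_cohort").alias(f"{col}_ts_rolling_max"),
        ]
    feature_cols = [f"{col}_ts_rolling_{stat}" for col in sensor_cols
                    for stat in ("mean", "std", "min", "max")]
    out = df.with_columns(exprs)
    # Null out feature values for rows with less history than min_history within their cohort.
    return out.with_columns([
        pl.when(pl.col("_row_num") >= min_history).then(pl.col(c)).otherwise(None).alias(c)
        for c in feature_cols
    ])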

Generated Features

Feature Name Description Data Type
{col}_ts_rolling_mean Rolling mean of col values over the lookback window. Float
FUNCTION CALCULATE_ROLLING_MEAN(window_data):
  IF window_data is empty: RETURN 0.0
  RETURN SUM(window_data) / LENGTH(window_data)
ENDFUNCTION
{col}_ts_rolling_std Rolling standard deviation over the lookback window. Float
FUNCTION CALCULATE_ROLLING_STD(window_data):
  IF LENGTH(window_data) < 2: RETURN 0.0
  mean = MEAN(window_data)
  variance = SUM((x - mean)^2 FOR x in window_data) / (LENGTH(window_data) - 1)
  RETURN SQRT(variance)
ENDFUNCTION
{col}_ts_rolling_min Minimum value in the lookback window. Float
FUNCTION CALCULATE_ROLLING_MIN(window_data):
  IF window_data is empty: RETURN 0.0
  RETURN MINIMUM(window_data)
ENDFUNCTION
{col}_ts_rolling_max Maximum value in the lookback window. Float
FUNCTION CALCULATE_ROLLING_MAX(window_data):
  IF window_data is empty: RETURN 0.0
  RETURN MAXIMUM(window_data)
ENDFUNCTION
{col}_ts_zscore Z-score of current value relative to window statistics. Float
FUNCTION CALCULATE_ZSCORE(current_value, window_data):
  mean = MEAN(window_data)
  std_dev = STANDARD_DEVIATION(window_data)
  IF std_dev < 1e-9: RETURN 0.0
  RETURN (current_value - mean) / std_dev
ENDFUNCTION
{col}_ts_null_ratio Proportion of null values in the lookback window. Float
FUNCTION CALCULATE_NULL_RATIO(window_data):
  IF IS_EMPTY(window_data): RETURN 0.0
  null_count = COUNT(value IS NULL in window_data)
  RETURN null_count / LENGTH(window_data)
ENDFUNCTION
{col}_ts_outlier_score Binary flag (0/1) indicating if current value is an outlier based on IQR. Integer
FUNCTION CALCULATE_OUTLIER_SCORE(current_value, window_data):
  IF LENGTH(window_data) < 4: RETURN 0
  Q1 = PERCENTILE(window_data, 25)
  Q3 = PERCENTILE(window_data, 75)
  IQR = Q3 - Q1
  lower_bound = Q1 - (1.5 * IQR)
  upper_bound = Q3 + (1.5 * IQR)
  RETURN 1 IF (current_value < lower_bound OR current_value > upper_bound) ELSE 0
ENDFUNCTION
{col}_ts_trend_slope Approximated trend slope, calculated as the difference between the current value and the value at the start of the window, divided by the window size. Float
FUNCTION CALCULATE_APPROX_TREND_SLOPE(current_value, window_data, window_size):
  IF LENGTH(window_data) < window_size: RETURN 0.0
  first_value = FIRST_VALUE(window_data)
  RETURN (current_value - first_value) / window_size
ENDFUNCTION
{col}_ts_volatility Coefficient of variation (std/mean) measuring relative variability. Float
FUNCTION CALCULATE_VOLATILITY(window_data):
  mean = MEAN(window_data)
  std_dev = STANDARD_DEVIATION(window_data)
  IF ABS(mean) < 1e-9: RETURN 0.0 IF std_dev == 0 ELSE 1.0
  RETURN std_dev / ABS(mean)
ENDFUNCTION
{col}_ts_stability_score Inverse measure of volatility (1/(1+volatility)). Float
FUNCTION CALCULATE_STABILITY_SCORE(window_data):
  volatility = CALCULATE_VOLATILITY(window_data)
  RETURN 1.0 / (1.0 + volatility)
ENDFUNCTION
{col}_ts_momentum Difference between current value and window mean. Float
FUNCTION CALCULATE_MOMENTUM(current_value, window_data):
  mean = MEAN(window_data)
  RETURN current_value - mean
ENDFUNCTION
{col}_ts_acceleration Rate of change of momentum (difference from previous momentum value). Float
FUNCTION CALCULATE_ACCELERATION(current_momentum, previous_momentum):
  RETURN current_momentum - previous_momentum
ENDFUNCTION
{col}_ts_mean_diff Absolute difference between current value and window mean. Float
FUNCTION CALCULATE_MEAN_DIFF(current_value, window_data):
  mean = MEAN(window_data)
  RETURN ABS(current_value - mean)
ENDFUNCTION
{col}_ts_std_diff The difference between the rolling standard deviation at the current step and the previous step. Float
FUNCTION CALCULATE_STD_DIFF(current_window_std, previous_window_std):
  RETURN current_window_std - previous_window_std
ENDFUNCTION
{col}_ts_range_ratio Position of current value within window range (0-1). Float
FUNCTION CALCULATE_RANGE_RATIO(current_value, window_data):
  min_val = MIN(window_data)
  max_val = MAX(window_data)
  range = max_val - min_val
  IF range < 1e-9: RETURN 0.5
  RETURN (current_value - min_val) / range
ENDFUNCTION
{col}_ts_percentile_rank Percentile rank of current value within window distribution. Float
FUNCTION CALCULATE_PERCENTILE_RANK(current_value, window_data):
  IF IS_EMPTY(window_data): RETURN 50.0
  count_less = COUNT(x < current_value FOR x in window_data)
  RETURN (count_less / LENGTH(window_data)) * 100.0
ENDFUNCTION
{col}_ts_iqr Interquartile range of window values. Float
FUNCTION CALCULATE_IQR(window_data):
  IF LENGTH(window_data) < 2: RETURN 0.0
  Q1 = PERCENTILE(window_data, 25)
  Q3 = PERCENTILE(window_data, 75)
  RETURN Q3 - Q1
ENDFUNCTION
{col}_ts_skewness Skewness of window distribution (third standardized moment). Float
FUNCTION CALCULATE_SKEWNESS(window_data):
  IF LENGTH(window_data) < 3: RETURN 0.0
  n = LENGTH(window_data)
  mean = MEAN(window_data)
  std_dev = STANDARD_DEVIATION(window_data)
  IF std_dev == 0: RETURN 0.0
  third_moment = SUM(((x - mean) / std_dev)^3 FOR x in window_data) / n
  RETURN third_moment
ENDFUNCTION
{col}_ts_kurtosis Excess kurtosis of window distribution (fourth standardized moment - 3). Float
FUNCTION CALCULATE_KURTOSIS(window_data):
  IF LENGTH(window_data) < 4: RETURN 0.0
  n = LENGTH(window_data)
  mean = MEAN(window_data)
  std_dev = STANDARD_DEVIATION(window_data)
  IF std_dev == 0: RETURN 0.0
  fourth_moment = SUM(((x - mean) / std_dev)^4 FOR x in window_data) / n
  RETURN fourth_moment - 3.0
ENDFUNCTION
{col}_ts_value_sum Sum of values in the lookback window. Float
FUNCTION CALCULATE_VALUE_SUM(window_data):
  RETURN SUM(window_data)
ENDFUNCTION
{col}_ts_value_mean_diff Signed difference between current value and window mean. Float
FUNCTION CALCULATE_VALUE_MEAN_DIFF(current_value, window_data):
  mean = MEAN(window_data)
  RETURN current_value - mean
ENDFUNCTION
{col}_ts_value_volatility_ratio Normalized distance from mean (|current-mean|/std). Float
FUNCTION CALCULATE_VOLATILITY_RATIO(current_value, window_data):
  mean = MEAN(window_data)
  std_dev = STANDARD_DEVIATION(window_data)
  IF std_dev < 1e-9: RETURN 0.0
  RETURN ABS(current_value - mean) / std_dev
ENDFUNCTION

Data Drift Detection Sensors

sensor_single_column_drift

Detects data drift by analyzing a single column of data over time, row by row. For each new data point, it compares the recent data window against a historical lookback window using a comprehensive suite of metrics. This includes geometric distances (Euclidean, Cosine), distribution comparison tests (Wasserstein, KS-test, Jensen-Shannon), statistical tests for variance (Levene), and scores that measure sudden changes, gradual trends, and volatility. This multi-faceted approach allows it to detect a wide range of drift types, from sudden spikes to gradual changes in distribution.

Main Function Logic

FUNCTION sensor_single_column_drift(data_column, lookback_window, min_history):
  // --- 1. Initialization ---
  results_dataframe = COPY(data_column)
  feature_names = GET_ALL_DRIFT_FEATURE_NAMES()
  FOR each feature_name in feature_names:
    results_dataframe[feature_name] = GET_DEFAULT_VALUE(feature_name)
  ENDFOR

  // --- 2. Iteration and Calculation ---
  FOR i from min_history to END of data_column:
    // --- 2a. Define Time Windows ---
    current_value = data_column[i]
    previous_value = data_column[i-1]
    recent_window_end = i + 1
    recent_window_start = MAX(0, i - lookback_window)
    recent_window = data_column from recent_window_start to recent_window_end
    historical_window_end = recent_window_start
    historical_window_start = MAX(0, historical_window_end - lookback_window)
    historical_window = data_column from historical_window_start to historical_window_end

    // --- 2b. Calculate All Drift Features for the current point 'i' ---
    current_features = INITIALIZE_EMPTY_DICTIONARY()
    current_features['euclidean_distance'] = CALCULATE_EUCLIDEAN_DISTANCE(recent_window, historical_window)
    current_features['cosine_distance'] = CALCULATE_COSINE_DISTANCE(recent_window, historical_window)
    current_features['mahalanobis_distance'] = CALCULATE_MAHALANOBIS_DISTANCE(current_value, historical_window)
    current_features['wasserstein_distance'] = CALCULATE_WASSERSTEIN_DISTANCE(recent_window, historical_window)
    current_features['jensen_shannon_divergence'] = CALCULATE_JENSEN_SHANNON_DIVERGENCE(recent_window, historical_window)
    current_features['ks_test_pvalue'] = PERFORM_KS_TEST(recent_window, historical_window)
    current_features['levene_test_pvalue'] = PERFORM_LEVENE_TEST(recent_window, historical_window)
    current_features['mannwhitney_test_pvalue'] = PERFORM_MANN_WHITNEY_U_TEST(recent_window, historical_window)
    current_features['sudden_change_score'] = CALCULATE_SUDDEN_CHANGE_SCORE(current_value, previous_value, recent_window)
    current_features['gradual_change_score'] = CALCULATE_GRADUAL_CHANGE_SCORE(recent_window)
    current_features['trend_deviation'] = CALCULATE_TREND_DEVIATION(current_value, historical_window)
    current_features['stability_index'] = CALCULATE_STABILITY_INDEX(recent_window)
    current_features['acceleration'] = CALCULATE_ACCELERATION(data_column, i)
    current_features['momentum'] = CALCULATE_MOMENTUM(recent_window)
    current_features['volatility_ratio'] = CALCULATE_VOLATILITY_RATIO(current_value, previous_value, historical_window)
    current_features['percentile_score'] = CALCULATE_PERCENTILE_SCORE(current_value, historical_window)
    current_features['regime_change_prob'] = CALCULATE_REGIME_CHANGE_PROB(current_value, historical_window)

    // --- 2c. Store Results ---
    FOR feature_name, value in current_features:
      results_dataframe[feature_name][i] = value
    ENDFOR
  ENDFOR

  // --- 3. Return ---
  RETURN results_dataframe
ENDFUNCTION
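Several of the window comparisons used above have standard scipy equivalents. The sketch below illustrates a few of them under that assumption; it is not the sensor's exact implementation, and recent and historical stand for the two numpy window arrays.

import numpy as np
from scipy import stats

def drift_metrics(recent: np.ndarray, historical: np.ndarray) -> dict:
    """Sketch of a few drift features via scipy; both windows are assumed non-empty."""
    out = {
        "wasserstein_distance": float(stats.wasserstein_distance(recent, historical)),
        "ks_test_pvalue": float(stats.ks_2samp(recent, historical).pvalue),
        "levene_test_pvalue": float(stats.levene(recent, historical, center="median").pvalue),
        "mannwhitney_test_pvalue": float(stats.mannwhitneyu(recent, historical).pvalue),
    }
    if len(recent) == len(historical):
        diff = recent - historical
        out["euclidean_distance"] = float(np.sqrt(np.sum(diff * diff)))
    return out

# Usage sketch inside the row-by-row loop (i is the current index, w the lookback window):
# recent = values[max(0, i - w):i + 1]
# historical = values[max(0, i - 2 * w):max(0, i - w)]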

Generated Features

Feature Name Description Data Type
drift_euclidean_distance The geometric distance between the vector of the most recent window of data points and the immediately preceding window. Sensitive to shifts in the mean and overall value levels. Float
FUNCTION CALCULATE_EUCLIDEAN_DISTANCE(recent_window, historical_window):
  IF LENGTH(recent_window) != LENGTH(historical_window): RETURN 0.0
  sum_of_squares = 0.0
  FOR i from 0 to LENGTH(recent_window) - 1:
    difference = recent_window[i] - historical_window[i]
    sum_of_squares = sum_of_squares + (difference * difference)
  ENDFOR
  RETURN SQRT(sum_of_squares)
ENDFUNCTION
drift_cosine_distance Measures the cosine of the angle between two data windows treated as vectors. It is sensitive to changes in the pattern or shape of the data, independent of its magnitude. Float
FUNCTION CALCULATE_COSINE_DISTANCE(recent_window, historical_window):
  IF LENGTH(recent_window) != LENGTH(historical_window): RETURN 1.0
  dot_product = 0.0
  magnitude_recent = 0.0
  magnitude_historical = 0.0
  FOR i from 0 to LENGTH(recent_window) - 1:
    dot_product = dot_product + (recent_window[i] * historical_window[i])
    magnitude_recent = magnitude_recent + (recent_window[i] * recent_window[i])
    magnitude_historical = magnitude_historical + (historical_window[i] * historical_window[i])
  ENDFOR
  magnitude_recent = SQRT(magnitude_recent)
  magnitude_historical = SQRT(magnitude_historical)
  IF magnitude_recent == 0.0 OR magnitude_historical == 0.0: RETURN 1.0
  cosine_similarity = dot_product / (magnitude_recent * magnitude_historical)
  RETURN 1.0 - cosine_similarity
ENDFUNCTION
drift_mahalanobis_distance Measures how far the current value lies from the center of the historical data distribution, scaled by the historical variance. For a single column this reduces to a standardized (z-score-like) distance, effective at flagging values far outside the historical norm. Float
FUNCTION CALCULATE_MAHALANOBIS_DISTANCE(current_point, historical_distribution):
  IF LENGTH(historical_distribution) < 2: RETURN 0.0
  mean_historical = MEAN(historical_distribution)
  variance_historical = VARIANCE(historical_distribution)
  IF variance_historical == 0.0:
    RETURN ABS(current_point - mean_historical)
  ENDIF
  distance = ABS(current_point - mean_historical) / SQRT(variance_historical)
  RETURN distance
ENDFUNCTION
drift_wasserstein_distance Also known as Earth Mover's Distance. Measures the 'work' required to transform the historical window's distribution into the recent window's distribution. Highly sensitive to changes in distribution shape. Float
FUNCTION CALCULATE_WASSERSTEIN_DISTANCE(recent_window, historical_window):
  IF IS_EMPTY(recent_window) OR IS_EMPTY(historical_window): RETURN 0.0
  sorted_recent = SORT(recent_window)
  sorted_historical = SORT(historical_window)
  all_points = UNIQUE(CONCATENATE(sorted_recent, sorted_historical))
  all_points = SORT(all_points)
  distance = 0.0
  cdf_recent = 0.0
  cdf_historical = 0.0
  FOR i from 0 to LENGTH(all_points) - 2:
    point1 = all_points[i]
    point2 = all_points[i+1]
    cdf_recent = COUNT(p <= point1 IN sorted_recent) / LENGTH(sorted_recent)
    cdf_historical = COUNT(p <= point1 IN sorted_historical) / LENGTH(sorted_historical)
    distance = distance + ABS(cdf_recent - cdf_historical) * (point2 - point1)
  ENDFOR
  RETURN distance
ENDFUNCTION
drift_jensen_shannon_divergence Measures the similarity between two probability distributions (recent vs. historical). It is a symmetrized version of Kullback-Leibler divergence, providing a score from 0 (identical) to 1 (maximal divergence). Float
FUNCTION CALCULATE_JENSEN_SHANNON_DIVERGENCE(recent_window, historical_window):
  min_val = MIN(MIN(recent_window), MIN(historical_window))
  max_val = MAX(MAX(recent_window), MAX(historical_window))
  bin_edges = CREATE_BINS(min_val, max_val, num_bins=10)
  hist_P = HISTOGRAM(recent_window, bin_edges)
  P = (hist_P + 1e-9) / SUM(hist_P + 1e-9)
  hist_Q = HISTOGRAM(historical_window, bin_edges)
  Q = (hist_Q + 1e-9) / SUM(hist_Q + 1e-9)
  M = 0.5 * (P + Q)
  kl_pm = SUM(P[i] * LOG2(P[i] / M[i]) FOR i in P)
  kl_qm = SUM(Q[i] * LOG2(Q[i] / M[i]) FOR i in Q)
  jsd = 0.5 * (kl_pm + kl_qm)
  RETURN jsd
ENDFUNCTION
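A short numpy sketch of the histogram-based divergence above, using a base-2 logarithm so the score is bounded by 1; note that scipy.spatial.distance.jensenshannon returns the square root of this quantity.

import numpy as np

def js_divergence(recent, historical, num_bins: int = 10) -> float:
    """Histogram both windows on shared bins, then compute the Jensen-Shannon divergence (base 2)."""
    lo = min(np.min(recent), np.min(historical))
    hi = max(np.max(recent), np.max(historical))
    if hi <= lo:
        return 0.0                               # both windows hold a single identical value
    bins = np.linspace(lo, hi, num_bins + 1)
    p, _ = np.histogram(recent, bins=bins)
    q, _ = np.histogram(historical, bins=bins)
    p = (p + 1e-9) / (p + 1e-9).sum()
    q = (q + 1e-9) / (q + 1e-9).sum()
    m = 0.5 * (p + q)
    return float(0.5 * np.sum(p * np.log2(p / m)) + 0.5 * np.sum(q * np.log2(q / m)))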
drift_ks_test_pvalue The p-value from a Kolmogorov-Smirnov test comparing the distribution of a recent window of data to an older, historical window. A low p-value (e.g., < 0.05) indicates a statistically significant change in distribution. Float
FUNCTION PERFORM_KS_TEST(sample1, sample2):
  sorted1 = SORT(sample1)
  sorted2 = SORT(sample2)
  n1 = LENGTH(sorted1)
  n2 = LENGTH(sorted2)
  max_diff = 0.0
  i = 0
  j = 0
  WHILE i < n1 AND j < n2:
    // Step past every occurrence of the smaller value, then compare the two empirical CDFs.
    value = MIN(sorted1[i], sorted2[j])
    WHILE i < n1 AND sorted1[i] == value: i = i + 1
    WHILE j < n2 AND sorted2[j] == value: j = j + 1
    diff = ABS(i / n1 - j / n2)
    IF diff > max_diff: max_diff = diff
  ENDWHILE
  p_value = CALCULATE_P_VALUE_FROM_D_STATISTIC(max_diff, n1, n2)
  RETURN p_value
ENDFUNCTION
drift_levene_test_pvalue The p-value from Levene's test, which checks whether two samples have equal variances (implemented here in its median-centered, Brown-Forsythe form). A low p-value suggests a significant change in the data's volatility or spread. Float
FUNCTION PERFORM_LEVENE_TEST(sample1, sample2):
  median1 = MEDIAN(sample1)
  median2 = MEDIAN(sample2)
  Z1 = [ABS(x - median1) FOR x in sample1]
  Z2 = [ABS(x - median2) FOR x in sample2]
  p_value = T_TEST(Z1, Z2)
  RETURN p_value
ENDFUNCTION
drift_mannwhitney_test_pvalue The p-value from the Mann-Whitney U test, a non-parametric test of whether values in one sample tend to be larger than values in the other (often read as a comparison of medians). A low p-value suggests that the distributions of the two windows are different. Float
FUNCTION PERFORM_MANN_WHITNEY_U_TEST(sample1, sample2):
  combined_list = CREATE_LIST_OF_PAIRS([(v, 's1') for v in sample1] + [(v, 's2') for v in sample2])
  sorted_combined = SORT(combined_list by value)
  ADD_RANKS_TO(sorted_combined)
  rank_sum_1 = SUM(rank for (value, origin, rank) in sorted_combined if origin == 's1')
  n1 = LENGTH(sample1)
  n2 = LENGTH(sample2)
  U_statistic = rank_sum_1 - (n1 * (n1 + 1) / 2)
  p_value = CALCULATE_P_VALUE_FROM_U(U_statistic, n1, n2)
  RETURN p_value
ENDFUNCTION
drift_sudden_change_score Measures the magnitude of the most recent change (current value vs. previous value) relative to the recent standard deviation. A high score indicates a spike or drop that is unusual compared to recent volatility. Float
FUNCTION CALCULATE_SUDDEN_CHANGE_SCORE(current_value, previous_value, recent_window):
  recent_std_dev = STANDARD_DEVIATION(recent_window)
  IF recent_std_dev < 1e-9: RETURN 0.0
  change = ABS(current_value - previous_value)
  score = change / recent_std_dev
  RETURN score
ENDFUNCTION
drift_gradual_change_score A score based on the slope and R-squared value of a linear regression over the recent data window. A high absolute value indicates a consistent and strong upward or downward trend. Float
FUNCTION CALCULATE_GRADUAL_CHANGE_SCORE(recent_window):
  IF LENGTH(recent_window) < 3: RETURN 0.0
  time_indices = [i for i from 0 to LENGTH(recent_window) - 1]
  slope, intercept, r_value = LINEAR_REGRESSION(x=time_indices, y=recent_window)
  r_squared = r_value * r_value
  mean_val = MEAN(recent_window)
  IF ABS(mean_val) < 1e-9: normalized_slope = slope
  ELSE: normalized_slope = slope / mean_val
  score = r_squared * normalized_slope
  RETURN ABS(score)
ENDFUNCTION
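A brief Python sketch of the same score using scipy.stats.linregress; the guard for constant windows is an assumption added for the example, not part of the sensor's specification.

import numpy as np
from scipy import stats

def gradual_change_score(recent_window) -> float:
    """Slope-times-fit score from a linear regression over the recent window."""
    y = np.asarray(recent_window, dtype=float)
    if len(y) < 3 or np.std(y) == 0:             # constant-window guard added for the sketch
        return 0.0
    x = np.arange(len(y))
    result = stats.linregress(x, y)              # slope, intercept, rvalue, pvalue, stderr
    r_squared = result.rvalue ** 2
    mean_val = y.mean()
    normalized_slope = result.slope if abs(mean_val) < 1e-9 else result.slope / mean_val
    return float(abs(r_squared * normalized_slope))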
drift_trend_deviation Measures how much the current value deviates from the value predicted by a linear trend fitted on the historical data. A high score indicates a break from the established trend. Float
FUNCTION CALCULATE_TREND_DEVIATION(current_value, historical_window):
  IF LENGTH(historical_window) < 3: RETURN 0.0
  time_indices = [i for i from 0 to LENGTH(historical_window) - 1]
  slope, intercept = LINEAR_REGRESSION(x=time_indices, y=historical_window)
  current_time_index = LENGTH(historical_window)
  predicted_value = (slope * current_time_index) + intercept
  deviation = ABS(current_value - predicted_value)
  historical_std_dev = STANDARD_DEVIATION(historical_window)
  IF historical_std_dev < 1e-9: RETURN deviation
  ELSE: RETURN deviation / historical_std_dev
ENDFUNCTION
drift_stability_index A score from 0 to 1 based on the inverse of the coefficient of variation. A value near 1 indicates very stable and predictable data, while a value near 0 indicates high relative volatility. Float
FUNCTION CALCULATE_STABILITY_INDEX(recent_window):
  IF LENGTH(recent_window) < 2: RETURN 1.0
  mean_val = MEAN(recent_window)
  std_dev = STANDARD_DEVIATION(recent_window)
  IF ABS(mean_val) < 1e-9: RETURN 1.0 / (1.0 + std_dev)
  coefficient_of_variation = std_dev / ABS(mean_val)
  stability = 1.0 / (1.0 + coefficient_of_variation)
  RETURN stability
ENDFUNCTION
drift_acceleration Measures the rate of change of the momentum (velocity). A large positive value indicates that the data is rising at an increasing rate, while a large negative value indicates it is falling at an increasing rate. Float
FUNCTION CALCULATE_ACCELERATION(data_column, current_index):
  IF current_index < 2: RETURN 0.0
  current_value = data_column[current_index]
  previous_value = data_column[current_index - 1]
  pre_previous_value = data_column[current_index - 2]
  velocity_recent = current_value - previous_value
  velocity_prior = previous_value - pre_previous_value
  acceleration = velocity_recent - velocity_prior
  value_range = MAX(ABS(current_value), ABS(previous_value), ABS(pre_previous_value))
  IF value_range == 0: RETURN 0.0
  RETURN acceleration / value_range
ENDFUNCTION
drift_momentum A weighted average of recent changes in the data. Captures the current direction and magnitude of movement in the time series. Float
FUNCTION CALCULATE_MOMENTUM(recent_window):
  IF LENGTH(recent_window) < 2: RETURN 0.0
  changes = []
  FOR i from 1 to LENGTH(recent_window) - 1:
    changes.APPEND(recent_window[i] - recent_window[i-1])
  ENDFOR
  weights = [i for i from 1 to LENGTH(changes)]
  weighted_sum = SUM(changes[i] * weights[i] FOR i in range(LENGTH(changes)))
  sum_of_weights = SUM(weights)
  IF sum_of_weights == 0: RETURN 0.0
  RETURN weighted_sum / sum_of_weights
ENDFUNCTION
drift_volatility_ratio The ratio of the most recent absolute change to the historical standard deviation of changes. A value > 1 suggests the latest change is more volatile than usual. Float
FUNCTION CALCULATE_VOLATILITY_RATIO(current_value, previous_value, historical_window):
  IF LENGTH(historical_window) < 2: RETURN 1.0
  recent_absolute_change = ABS(current_value - previous_value)
  historical_changes = [ABS(historical_window[i] - historical_window[i-1]) for i from 1 to LENGTH-1]
  historical_volatility = STANDARD_DEVIATION(historical_changes)
  IF historical_volatility < 1e-9:
    RETURN 1.0 if recent_absolute_change == 0 else 10.0
  ENDIF
  RETURN recent_absolute_change / historical_volatility
ENDFUNCTION
drift_percentile_score A score from 0 to 1 indicating how extreme the current value is relative to the historical distribution. A value near 1 means the point is close to the historical minimum or maximum. Float
FUNCTION CALCULATE_PERCENTILE_SCORE(current_value, historical_window):
  IF IS_EMPTY(historical_window): RETURN 0.0
  count_less = COUNT(p < current_value IN historical_window)
  percentile_rank = (count_less / LENGTH(historical_window)) * 100.0
  extremeness_score = ABS(percentile_rank - 50.0) / 50.0
  RETURN extremeness_score
ENDFUNCTION
drift_regime_change_prob A probability-like score (0 to 1) that indicates how likely the current value is to be part of a new data regime, based on how far it falls outside the historical 5th and 95th percentiles. Float
FUNCTION CALCULATE_REGIME_CHANGE_PROB(current_value, historical_window):
  IF LENGTH(historical_window) < 20: RETURN 0.0
  p5 = PERCENTILE(historical_window, 5)
  p95 = PERCENTILE(historical_window, 95)
  normal_range_width = p95 - p5
  IF normal_range_width < 1e-9: RETURN 0.0
  deviation = 0.0
  IF current_value > p95: deviation = (current_value - p95) / normal_range_width
  ELSE IF current_value < p5: deviation = (p5 - current_value) / normal_range_width
  probability = 1.0 - EXP(-deviation)
  RETURN probability
ENDFUNCTION

Sensor Framework Documentation - Version 1.0
Last Updated: September 2025