Éanna Ó Catháin | 145c88f | 2020-11-16 14:12:11 +0000 | [diff] [blame] | 1 | # Copyright © 2020 Arm Ltd and Contributors. All rights reserved. |
| 2 | # SPDX-License-Identifier: MIT |
| 3 | |
| 4 | """Class used to extract the Mel-frequency cepstral coefficients from a given audio frame.""" |
| 5 | |
| 6 | import numpy as np |
| 7 | |
| 8 | |
class MFCCParams:
    """Plain container for the settings consumed by the MFCC calculation.

    Every constructor argument is stored unchanged on the instance under
    the same name.

    Args:
        sampling_freq: Sampling frequency of the audio.
        num_fbank_bins: Number of mel filter bank bins.
        mel_lo_freq: Lower edge frequency of the mel filter bank.
        mel_hi_freq: Upper edge frequency of the mel filter bank.
        num_mfcc_feats: Number of MFCC features produced per frame.
        frame_len: Number of audio samples in one frame.
        use_htk_method: Flag selecting the HTK mel-scale formula.
        n_FFT: FFT length.
    """

    def __init__(self, sampling_freq, num_fbank_bins,
                 mel_lo_freq, mel_hi_freq, num_mfcc_feats, frame_len, use_htk_method, n_FFT):
        # Paired assignments; attribute creation order matches the signature.
        self.sampling_freq, self.num_fbank_bins = sampling_freq, num_fbank_bins
        self.mel_lo_freq, self.mel_hi_freq = mel_lo_freq, mel_hi_freq
        self.num_mfcc_feats, self.frame_len = num_mfcc_feats, frame_len
        self.use_htk_method, self.n_FFT = use_htk_method, n_FFT
| 20 | |
| 21 | |
class MFCC:
    """Computes Mel-frequency cepstral coefficients for single audio frames.

    The constructor precomputes everything that does not depend on the audio:
    the DCT matrix, the triangular mel filter bank (both as a list of
    per-filter weight vectors and as a dense matrix aligned with the
    ``n_FFT``-point real FFT) and the analysis window.

    ``mfcc_compute`` multiplies an ``n_FFT``-point window element-wise with
    the frame, so ``mfcc_params.frame_len`` is expected to equal
    ``mfcc_params.n_FFT``.

    Args:
        mfcc_params: Object exposing ``sampling_freq``, ``num_fbank_bins``,
            ``mel_lo_freq``, ``mel_hi_freq``, ``num_mfcc_feats``,
            ``frame_len``, ``use_htk_method`` and ``n_FFT``.
    """

    def __init__(self, mfcc_params):
        self.mfcc_params = mfcc_params

        # Constants of the non-HTK mel scale: linear below MIN_LOG_HZ,
        # logarithmic at and above it (see mel_scale / inv_mel_scale).
        self.FREQ_STEP = 200.0 / 3
        self.MIN_LOG_HZ = 1000.0
        self.MIN_LOG_MEL = self.MIN_LOG_HZ / self.FREQ_STEP
        self.LOG_STEP = 1.8562979903656 / 27.0

        num_fbank_bins = self.mfcc_params.num_fbank_bins
        n_fft = self.mfcc_params.n_FFT

        # First/last FFT bin touched by each filter; filled in by
        # create_mel_filter_bank(), -1 marks a filter that covers no bin.
        self.__filter_bank_filter_first = np.zeros(num_fbank_bins)
        self.__filter_bank_filter_last = np.zeros(num_fbank_bins)

        self.__dct_matrix = self.create_dct_matrix(num_fbank_bins, self.mfcc_params.num_mfcc_feats)
        self.__mel_filter_bank = self.create_mel_filter_bank()

        # Scatter the compact per-filter weights into a dense
        # (num_fbank_bins, n_FFT / 2 + 1) matrix matching the rfft output.
        dense_bank = np.zeros([num_fbank_bins, int(n_fft / 2) + 1])
        for i, weights in enumerate(self.__mel_filter_bank):
            first = int(self.__filter_bank_filter_first[i])
            last = int(self.__filter_bank_filter_last[i])
            if first < 0:
                # Filter covers no FFT bin: leave its row at zero.
                continue
            dense_bank[i, first:last + 1] = weights

        # Frame-invariant data is prepared once instead of on every frame.
        self.__np_mel_bank = dense_bank.astype(np.float32)
        self.__window = np.hanning(n_fft + 1)[0:n_fft]

    def mel_scale(self, freq, use_htk_method):
        """
        Converts a frequency in Hz to the mel scale.

        Args:
            freq: The frequency (scalar) to convert.
            use_htk_method: Boolean to set whether to use HTK method or not.

        Returns:
            the mel value corresponding to freq
        """
        if use_htk_method:
            return 1127.0 * np.log(1.0 + freq / 700.0)

        if freq >= self.MIN_LOG_HZ:
            # Logarithmic region
            return self.MIN_LOG_MEL + np.log(freq / self.MIN_LOG_HZ) / self.LOG_STEP
        # Linear region
        return freq / self.FREQ_STEP

    def inv_mel_scale(self, mel_freq, use_htk_method):
        """
        Converts a mel value back to a frequency in Hz (inverse of mel_scale).

        Args:
            mel_freq: The mel value (scalar) to convert.
            use_htk_method: Boolean to set whether to use HTK method or not.

        Returns:
            the frequency corresponding to mel_freq
        """
        if use_htk_method:
            return 700.0 * (np.exp(mel_freq / 1127.0) - 1.0)

        if mel_freq >= self.MIN_LOG_MEL:
            # Logarithmic region
            return self.MIN_LOG_HZ * np.exp(self.LOG_STEP * (mel_freq - self.MIN_LOG_MEL))
        # Linear region
        return self.FREQ_STEP * mel_freq

    def mfcc_compute(self, audio_data):
        """
        Extracts the MFCC for a single frame.

        The frame is windowed, transformed to a power spectrum, projected
        onto the mel filter bank, converted to decibels (clamped to 80 dB
        below the frame maximum) and finally multiplied by the DCT matrix.

        Args:
            audio_data: The audio data to process; must contain exactly
                mfcc_params.frame_len samples.

        Returns:
            the MFCC features, one value per row of the DCT matrix

        Raises:
            ValueError: if len(audio_data) differs from mfcc_params.frame_len.
        """
        if len(audio_data) != self.mfcc_params.frame_len:
            raise ValueError(
                f"audio_data buffer size {len(audio_data)} does not match the frame length {self.mfcc_params.frame_len}")

        audio_data = np.array(audio_data)
        spec = np.abs(np.fft.rfft(self.__window * audio_data, self.mfcc_params.n_FFT)) ** 2
        mel_energy = np.dot(self.__np_mel_bank, np.transpose(spec).astype(np.float32))

        # Small offset keeps log10 finite for silent bands.
        mel_energy += 1e-10
        log_mel_energy = 10.0 * np.log10(mel_energy)
        top_db = 80.0

        log_mel_energy = np.maximum(log_mel_energy, log_mel_energy.max() - top_db)

        return np.dot(self.__dct_matrix, log_mel_energy)

    def create_dct_matrix(self, num_fbank_bins, num_mfcc_feats):
        """
        Creates the Discrete Cosine Transform matrix to be used in the compute function.

        Row 0 is scaled by 2 * sqrt(1 / (4 * num_fbank_bins)) and every other
        row by 2 * sqrt(1 / (2 * num_fbank_bins)).

        Args:
            num_fbank_bins: The number of filter bank bins
            num_mfcc_feats: the number of MFCC features

        Returns:
            the DCT matrix of shape (num_mfcc_feats, num_fbank_bins)
        """
        bank_idx = np.arange(num_fbank_bins)
        coeff_idx = np.arange(num_mfcc_feats).reshape(-1, 1)
        dct_m = np.cos((np.pi / num_fbank_bins) * (bank_idx + 0.5) * coeff_idx)

        # Slices (rather than dct_m[0]) stay valid when num_mfcc_feats is 0.
        dct_m[:1] *= 2 * np.sqrt(1 / (4 * num_fbank_bins))
        dct_m[1:] *= 2 * np.sqrt(1 / (2 * num_fbank_bins))
        return dct_m

    def create_mel_filter_bank(self):
        """
        Creates the Mel filter bank.

        Filters are triangles equally spaced on the mel scale between
        mel_lo_freq and mel_hi_freq, each scaled by
        2 / (right edge in Hz - left edge in Hz). The FFT bin grid is the one
        of an n_FFT-point transform, and the mel formula is selected by
        mfcc_params.use_htk_method.

        As a side effect the first and last FFT bin index of every filter is
        recorded on the instance (-1 for a filter that covers no bin).

        Returns:
            the mel filter bank: a list holding, per filter, the weights of
            the FFT bins from its first to its last covered bin
        """
        params = self.mfcc_params
        use_htk = params.use_htk_method

        # Bin grid of the n_FFT-point spectrum produced by mfcc_compute.
        num_fft_bins = int(params.n_FFT / 2)
        fft_bin_width = params.sampling_freq / params.n_FFT

        mel_low_freq = self.mel_scale(params.mel_lo_freq, use_htk)
        mel_high_freq = self.mel_scale(params.mel_hi_freq, use_htk)
        mel_freq_delta = (mel_high_freq - mel_low_freq) / (params.num_fbank_bins + 1)

        # The mel value of each FFT bin is the same for every filter.
        bin_mels = [self.mel_scale(fft_bin_width * i, use_htk) for i in range(num_fft_bins)]

        mel_fbank = [0] * params.num_fbank_bins

        for bin_num in range(params.num_fbank_bins):
            left_mel = mel_low_freq + bin_num * mel_freq_delta
            center_mel = mel_low_freq + (bin_num + 1) * mel_freq_delta
            right_mel = mel_low_freq + (bin_num + 2) * mel_freq_delta

            # Per-filter normalisation factor, independent of the FFT bin.
            enorm = 2.0 / (self.inv_mel_scale(right_mel, use_htk) - self.inv_mel_scale(left_mel, use_htk))

            weights = np.zeros(num_fft_bins)
            first_index = last_index = -1

            for i, mel in enumerate(bin_mels):
                if (mel > left_mel) and (mel < right_mel):
                    if mel <= center_mel:
                        weight = (mel - left_mel) / (center_mel - left_mel)
                    else:
                        weight = (right_mel - mel) / (right_mel - center_mel)

                    weights[i] = weight * enorm

                    if first_index == -1:
                        first_index = i
                    last_index = i

            self.__filter_bank_filter_first[bin_num] = first_index
            self.__filter_bank_filter_last[bin_num] = last_index

            if first_index == -1:
                # No FFT bin falls inside this filter: single zero weight.
                mel_fbank[bin_num] = np.zeros(1)
            else:
                mel_fbank[bin_num] = weights[first_index:last_index + 1].copy()

        return mel_fbank
| 193 | |
| 194 | |
class Preprocessor:
    """Turns raw audio into the feature matrix fed to the model.

    For ``model_input_size`` consecutive frames (advancing ``stride`` samples
    each time) the MFCC vector is computed; first and second order
    derivatives are obtained with Savitzky-Golay differential filters; the
    three blocks are normalized separately and concatenated column-wise.

    Args:
        mfcc: Object providing ``mfcc_params.frame_len``,
            ``mfcc_params.num_mfcc_feats`` and ``mfcc_compute(frame)``.
        model_input_size: Number of frames (rows) in the returned matrix.
        stride: Number of samples between the starts of consecutive frames.
    """

    def __init__(self, mfcc, model_input_size, stride):
        self.model_input_size = model_input_size
        self.stride = stride

        # Savitzky - Golay differential filters
        self.__savgol_order1_coeffs = np.array([6.66666667e-02, 5.00000000e-02, 3.33333333e-02,
                                                1.66666667e-02, -3.46944695e-18, -1.66666667e-02,
                                                -3.33333333e-02, -5.00000000e-02, -6.66666667e-02])

        self.savgol_order2_coeffs = np.array([0.06060606, 0.01515152, -0.01731602,
                                              -0.03679654, -0.04329004, -0.03679654,
                                              -0.01731602, 0.01515152, 0.06060606])

        self.__mfcc_calc = mfcc

    def __normalize(self, values):
        """
        Normalize values to mean 0 and std 1.

        A block with zero standard deviation is only centred (giving zeros),
        which avoids a division by zero.
        """
        mean = np.mean(values)
        std = np.std(values)
        if std == 0:
            return values - mean
        return (values - mean) / std

    def __get_features(self, mfcc_instance, audio_data):
        """
        Collects MFCC values frame by frame into one flat list.

        Frames of frame_len samples are taken every `stride` samples until
        model_input_size * num_mfcc_feats values have been gathered.
        """
        frame_len = int(mfcc_instance.mfcc_params.frame_len)
        wanted = self.model_input_size * mfcc_instance.mfcc_params.num_mfcc_feats

        features = []
        idx = 0
        while len(features) < wanted:
            features.extend(mfcc_instance.mfcc_compute(audio_data[idx:idx + frame_len]))
            idx += self.stride
        return features

    def __savgol_filter(self, features, coeffs):
        """
        Convolves every column of `features` with `coeffs`.

        The centred slice of the 'full' convolution is used. It matches
        np.convolve(..., 'same') when a column is at least as long as the
        kernel, and still yields one value per row when it is shorter.
        """
        num_rows = features.shape[0]
        offset = (len(coeffs) - 1) // 2
        filtered = np.zeros(features.shape)
        for col in range(features.shape[1]):
            full = np.convolve(features[:, col], coeffs, 'full')
            filtered[:, col] = full[offset:offset + num_rows]
        return filtered

    def extract_features(self, audio_data):
        """
        Extracts the MFCC features, and calculates each features first and second order derivative.
        The matrix returned should be sized appropriately for input to the model, based
        on the model input size, the stride and the MFCC instance given at construction.

        Args:
            audio_data: the audio data to be used for this calculation; needs at least
                (model_input_size - 1) * stride + frame_len samples
        Returns:
            the derived MFCC feature vector as float32, with model_input_size rows and
            3 * num_mfcc_feats columns (features, first derivative, second derivative)
        Raises:
            ValueError: if audio_data holds fewer samples than one inference needs
        """
        mfcc_params = self.__mfcc_calc.mfcc_params

        num_samples_per_inference = ((self.model_input_size - 1)
                                     * self.stride) + mfcc_params.frame_len
        if len(audio_data) < num_samples_per_inference:
            raise ValueError("audio_data size for feature extraction is smaller than "
                             "the expected number of samples needed for inference")

        features = self.__get_features(self.__mfcc_calc, np.asarray(audio_data))
        features = np.reshape(np.array(features), (self.model_input_size, mfcc_params.num_mfcc_feats))

        mfcc_delta_np = self.__savgol_filter(features, self.__savgol_order1_coeffs)
        mfcc_delta2_np = self.__savgol_filter(features, self.savgol_order2_coeffs)

        features = np.concatenate((self.__normalize(features), self.__normalize(mfcc_delta_np),
                                   self.__normalize(mfcc_delta2_np)), axis=1)

        return np.float32(features)