Module: Muze::Core::STFT
- Defined in:
- lib/muze/core/stft.rb
Overview
Short-time Fourier transform and related utilities.
Constant Summary collapse
- EPSILON = 1.0e-12
- MAX_N_FFT = 262_144
- FREQUENCY_CACHE = Muze::Core::BoundedCache.new(max_size: 64)
Class Method Summary collapse
- .amplitude_to_db(s, ref: 1.0, amin: 1.0e-5, top_db: 80.0, abs: false) ⇒ Numo::SFloat
- .db_to_amplitude(s_db, ref: 1.0) ⇒ Numo::SFloat
- .db_to_power(s_db, ref: 1.0) ⇒ Numo::SFloat
- .fft_frequencies(sr:, n_fft:) ⇒ Numo::SFloat
- .frames_to_samples(frames, hop_length:) ⇒ Integer, Numo::Int64
- .frames_to_time(frames, sr:, hop_length:) ⇒ Float, Numo::SFloat
- .istft(stft_matrix, hop_length: 512, win_length: nil, window: :hann, center: true, length: nil, dtype: Numo::SFloat, periodic: false) ⇒ Numo::SFloat
- .magphase(stft_matrix, eps: EPSILON, dtype: Numo::SFloat) ⇒ Array<Numo::SFloat, Numo::DComplex>
- .power_to_db(s, ref: 1.0, amin: 1.0e-10, top_db: 80.0) ⇒ Numo::SFloat
- .samples_to_frames(samples, hop_length:) ⇒ Integer, Numo::Int64
- .samples_to_time(samples, sr:) ⇒ Object
- .stft(y, n_fft: 2048, hop_length: 512, win_length: nil, window: :hann, center: true, pad_mode: :reflect, pad_end: false, periodic: false) ⇒ Numo::DComplex
  Shape: [1 + n_fft/2, frames].
- .stft_stream(chunks, n_fft: 2048, hop_length: 512, win_length: nil, window: :hann, center: false, pad_mode: :reflect, periodic: false, flush: true) ⇒ Array<Numo::DComplex>
- .time_to_frames(times, sr:, hop_length:) ⇒ Integer, Numo::Int64
- .time_to_samples(times, sr:) ⇒ Object
Class Method Details
.amplitude_to_db(s, ref: 1.0, amin: 1.0e-5, top_db: 80.0, abs: false) ⇒ Numo::SFloat
160 161 162 163 164 165 166 167 168 |
# File 'lib/muze/core/stft.rb', line 160
# Converts amplitude values to decibels by delegating to +log_scale+ with a
# multiplier of 20.
#
# Complex input (+Numo::DComplex+ or +Numo::SComplex+) is reduced to its
# magnitude first. Any other input is cast to +Numo::SFloat+ and, when +abs+
# is true, replaced by its element-wise absolute value.
#
# @param s [Object] amplitude values; a complex NArray or anything
#   +Numo::SFloat.cast+ accepts
# @param ref [Object] forwarded unchanged to +log_scale+
# @param amin [Object] forwarded unchanged to +log_scale+
# @param top_db [Object] forwarded unchanged to +log_scale+
# @param abs [Boolean] take the absolute value of non-complex input
# @return [Numo::SFloat]
def amplitude_to_db(s, ref: 1.0, amin: 1.0e-5, top_db: 80.0, abs: false)
  magnitude =
    case s
    when Numo::DComplex, Numo::SComplex
      # Complex spectra of either precision must be reduced to magnitude
      # rather than handed to Numo::SFloat.cast.
      s.abs.cast_to(Numo::SFloat)
    else
      values = Numo::SFloat.cast(s)
      abs ? values.abs : values
    end

  log_scale(magnitude, ref:, amin:, top_db:, multiplier: 20.0)
end
.db_to_amplitude(s_db, ref: 1.0) ⇒ Numo::SFloat
183 184 185 186 |
# File 'lib/muze/core/stft.rb', line 183
# Maps decibel values back to amplitudes: ref * 10^(s_db / 20).
#
# Arguments are first checked by +validate_db_inverse_args!+.
#
# @param s_db [Object] decibel values; anything +Numo::SFloat.cast+ accepts
# @param ref [#to_f] reference amplitude used as the scale factor
# @return [Numo::SFloat]
def db_to_amplitude(s_db, ref: 1.0)
  validate_db_inverse_args!(s_db, ref)

  # 10**x is evaluated as exp(x * ln(10)).
  exponent = (Numo::SFloat.cast(s_db) / 20.0) * Math.log(10.0)
  scaled = ref.to_f * Numo::NMath.exp(exponent)
  Numo::SFloat.cast(scaled)
end
.db_to_power(s_db, ref: 1.0) ⇒ Numo::SFloat
191 192 193 194 |
# File 'lib/muze/core/stft.rb', line 191
# Maps decibel values back to power: ref * 10^(s_db / 10).
#
# Arguments are first checked by +validate_db_inverse_args!+.
#
# @param s_db [Object] decibel values; anything +Numo::SFloat.cast+ accepts
# @param ref [#to_f] reference power used as the scale factor
# @return [Numo::SFloat]
def db_to_power(s_db, ref: 1.0)
  validate_db_inverse_args!(s_db, ref)

  # 10**x is evaluated as exp(x * ln(10)).
  exponent = (Numo::SFloat.cast(s_db) / 10.0) * Math.log(10.0)
  scaled = ref.to_f * Numo::NMath.exp(exponent)
  Numo::SFloat.cast(scaled)
end
.fft_frequencies(sr:, n_fft:) ⇒ Numo::SFloat
199 200 201 202 203 204 205 |
# File 'lib/muze/core/stft.rb', line 199
# Returns the (n_fft / 2) + 1 bin frequencies index * sr / n_fft.
#
# Results are looked up in FREQUENCY_CACHE under the key [sr, n_fft]; a copy
# is returned so callers never hold the cached array itself.
#
# @param sr [Integer] positive sample rate
# @param n_fft [Integer] positive FFT size
# @return [Numo::SFloat]
# @raise [Muze::ParameterError] if sr or n_fft is not a positive Integer
def fft_frequencies(sr:, n_fft:)
  positive_integer = ->(value) { value.is_a?(Integer) && value.positive? }
  raise Muze::ParameterError, "sr must be a positive integer" unless positive_integer.call(sr)
  raise Muze::ParameterError, "n_fft must be a positive integer" unless positive_integer.call(n_fft)

  cached = FREQUENCY_CACHE.fetch([sr, n_fft]) do
    bin_count = (n_fft / 2) + 1
    Numo::SFloat.cast(Array.new(bin_count) { |bin| bin * sr.to_f / n_fft })
  end
  cached.dup
end
.frames_to_samples(frames, hop_length:) ⇒ Integer, Numo::Int64
226 227 228 229 230 |
# File 'lib/muze/core/stft.rb', line 226
# Converts frame indices to sample positions (frame * hop_length).
#
# Each value is truncated with +to_i+ before multiplying; the mapping over
# scalars or arrays is handled by +map_scalar_or_array+.
#
# @param frames [Object] frame index or collection of frame indices
# @param hop_length [Numeric] hop size in samples
# @return [Integer, Numo::Int64]
# @raise [Muze::ParameterError] if hop_length is not positive
def frames_to_samples(frames, hop_length:)
  unless hop_length.positive?
    raise Muze::ParameterError, "hop_length must be positive"
  end

  map_scalar_or_array(frames) do |frame|
    sample_position = frame.to_i * hop_length
    sample_position.to_i
  end
end
.frames_to_time(frames, sr:, hop_length:) ⇒ Float, Numo::SFloat
211 212 213 |
# File 'lib/muze/core/stft.rb', line 211
# Converts frame indices to times by chaining +frames_to_samples+ and
# +samples_to_time+.
#
# @param frames [Object] frame index or collection of frame indices
# @param sr [Numeric] sample rate, forwarded to +samples_to_time+
# @param hop_length [Numeric] hop size, forwarded to +frames_to_samples+
# @return [Float, Numo::SFloat]
def frames_to_time(frames, sr:, hop_length:)
  sample_positions = frames_to_samples(frames, hop_length:)
  samples_to_time(sample_positions, sr:)
end
.istft(stft_matrix, hop_length: 512, win_length: nil, window: :hann, center: true, length: nil, dtype: Numo::SFloat, periodic: false) ⇒ Numo::SFloat
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/muze/core/stft.rb', line 61
# Inverse short-time Fourier transform using windowed overlap-add.
#
# n_fft is derived from the matrix as (rows - 1) * 2. Every column is
# inverse-transformed with +ifft_real+, multiplied by the synthesis window and
# summed into the output; each output sample is then divided by the summed
# squared window wherever that sum exceeds EPSILON.
#
# @param stft_matrix [Object] complex matrix of shape [bins, frames]
# @param hop_length [Integer] hop between consecutive frames
# @param win_length [Integer, nil] window length; defaults to n_fft
# @param window [Object] window specification for +Muze::Core::Windows.resolve+
# @param center [Boolean] drop n_fft / 2 samples from both ends of the result
# @param length [Integer, nil] when given, result is passed through +adjust_length+
# @param dtype [Object] resolved through +dtype_class+ for the final cast
# @param periodic [Boolean] forwarded to +Muze::Core::Windows.resolve+
# @return [Numo::SFloat]
# @raise [Muze::ParameterError] if length is given but is not a non-negative Integer
def istft(stft_matrix, hop_length: 512, win_length: nil, window: :hann, center: true, length: nil, dtype: Numo::SFloat, periodic: false)
  spectrogram = cast_complex_matrix(stft_matrix, "stft_matrix")
  frequency_bins, frame_count = spectrogram.shape
  n_fft = (frequency_bins - 1) * 2
  win_length ||= n_fft
  validate_stft_params!(n_fft:, hop_length:, win_length:)
  if length
    usable_length = length.is_a?(Integer) && !length.negative?
    raise Muze::ParameterError, "length must be non-negative" unless usable_length
  end

  total_samples = n_fft + (hop_length * [frame_count - 1, 0].max)
  accumulated = Array.new(total_samples, 0.0)
  squared_window_sum = Array.new(total_samples, 0.0)
  taps = Muze::Core::Windows.resolve(window, win_length, periodic:).to_a
  # The window sits in the middle of each n_fft-long frame.
  offset = (n_fft - win_length) / 2

  frame_count.times do |frame|
    column = (0...frequency_bins).map { |bin| spectrogram[bin, frame] }
    frame_samples = ifft_real(column).to_a
    frame_start = frame * hop_length

    win_length.times do |tap|
      target = frame_start + tap + offset
      break if target >= total_samples

      weight = taps[tap]
      accumulated[target] += frame_samples[tap + offset] * weight
      squared_window_sum[target] += weight * weight
    end
  end

  # Undo the window weighting; samples with (near-)zero weight are left as is.
  signal = accumulated.each_with_index.map do |value, position|
    weight_sum = squared_window_sum[position]
    weight_sum > EPSILON ? value / weight_sum : value
  end

  if center
    trim = n_fft / 2
    signal = signal[trim...(signal.length - trim)] || []
  end
  signal = adjust_length(signal, length) if length

  dtype_class(dtype).cast(signal)
end
.magphase(stft_matrix, eps: EPSILON, dtype: Numo::SFloat) ⇒ Array<Numo::SFloat, Numo::DComplex>
108 109 110 111 112 113 114 115 116 117 |
# File 'lib/muze/core/stft.rb', line 108
# Splits a complex spectrogram into magnitude and phase factors.
#
# The phase is the input divided by (magnitude + eps), so eps guards against
# division by zero.
#
# @param stft_matrix [Object] complex matrix, normalised by +cast_complex_matrix+
# @param eps [Numeric] positive, finite stabiliser added to the divisor
# @param dtype [Object] resolved through +dtype_class+ for the magnitude
# @return [Array<Numo::SFloat, Numo::DComplex>] [magnitude, phase]
# @raise [Muze::ParameterError] if eps is not a positive, finite number
def magphase(stft_matrix, eps: EPSILON, dtype: Numo::SFloat)
  usable_eps = eps.respond_to?(:positive?) && eps.respond_to?(:finite?) && eps.positive? && eps.finite?
  raise Muze::ParameterError, "eps must be positive" unless usable_eps

  spectrum = cast_complex_matrix(stft_matrix, "stft_matrix")
  magnitude = spectrum.abs.cast_to(dtype_class(dtype))
  [magnitude, spectrum / (magnitude + eps)]
end
.power_to_db(s, ref: 1.0, amin: 1.0e-10, top_db: 80.0) ⇒ Numo::SFloat
175 176 177 178 |
# File 'lib/muze/core/stft.rb', line 175
# Converts power values to decibels by delegating to +log_scale+ with a
# multiplier of 10.
#
# @param s [Object] power values; anything +Numo::SFloat.cast+ accepts
# @param ref [Object] forwarded unchanged to +log_scale+
# @param amin [Object] forwarded unchanged to +log_scale+
# @param top_db [Object] forwarded unchanged to +log_scale+
# @return [Numo::SFloat]
def power_to_db(s, ref: 1.0, amin: 1.0e-10, top_db: 80.0)
  log_scale(Numo::SFloat.cast(s), ref:, amin:, top_db:, multiplier: 10.0)
end
.samples_to_frames(samples, hop_length:) ⇒ Integer, Numo::Int64
235 236 237 238 239 |
# File 'lib/muze/core/stft.rb', line 235
# Converts sample positions to frame indices: floor(sample / hop_length).
#
# Each value is truncated with +to_i+ before dividing; the mapping over
# scalars or arrays is handled by +map_scalar_or_array+.
#
# @param samples [Object] sample position or collection of positions
# @param hop_length [Numeric] hop size in samples
# @return [Integer, Numo::Int64]
# @raise [Muze::ParameterError] if hop_length is not positive
def samples_to_frames(samples, hop_length:)
  unless hop_length.positive?
    raise Muze::ParameterError, "hop_length must be positive"
  end

  hop = hop_length.to_f
  map_scalar_or_array(samples) do |sample|
    (sample.to_i / hop).floor
  end
end
.samples_to_time(samples, sr:) ⇒ Object
453 454 455 456 457 |
# File 'lib/muze/core/stft.rb', line 453
# Converts sample positions to times by dividing each by sr.
#
# @param samples [Object] sample position or collection of positions
# @param sr [Numeric] sample rate
# @return [Object] whatever +map_scalar_or_array+ produces for the input
# @raise [Muze::ParameterError] if sr is not positive
def samples_to_time(samples, sr:)
  unless sr.positive?
    raise Muze::ParameterError, "sr must be positive"
  end

  map_scalar_or_array(samples) do |sample|
    sample.to_f / sr
  end
end
.stft(y, n_fft: 2048, hop_length: 512, win_length: nil, window: :hann, center: true, pad_mode: :reflect, pad_end: false, periodic: false) ⇒ Numo::DComplex
Returns shape: [1 + n_fft/2, frames].
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/muze/core/stft.rb', line 21
# Short-time Fourier transform of a signal.
#
# The signal is optionally padded by n_fft / 2 on both sides (+center+), cut
# into frames hop_length apart, multiplied by the analysis window and
# transformed with +fft_real+. Reads past the end of the signal are treated as
# 0.0, and an empty signal is replaced by a single 0.0 sample.
#
# @param y [Object] input signal, converted with +signal_to_a+
# @param n_fft [Integer] FFT size
# @param hop_length [Integer] hop between consecutive frames
# @param win_length [Integer, nil] window length; defaults to n_fft
# @param window [Object] window specification for +Muze::Core::Windows.resolve+
# @param center [Boolean] pad the signal with +pad_signal+ before framing
# @param pad_mode [Object] padding mode, checked by +validate_pad_mode!+
# @param pad_end [Boolean] forwarded to +analysis_frame_count+
# @param periodic [Boolean] forwarded to +Muze::Core::Windows.resolve+
# @return [Numo::DComplex] shape [1 + n_fft/2, frames]
def stft(y, n_fft: 2048, hop_length: 512, win_length: nil, window: :hann, center: true, pad_mode: :reflect, pad_end: false, periodic: false)
  win_length ||= n_fft
  validate_stft_params!(n_fft:, hop_length:, win_length:)
  validate_pad_mode!(pad_mode)

  samples = signal_to_a(y)
  samples = pad_signal(samples, n_fft / 2, pad_mode) if center
  samples = [0.0] if samples.empty?
  sample_count = samples.length

  taps = Muze::Core::Windows.resolve(window, win_length, periodic:).to_a
  # The window sits in the middle of each n_fft-long frame.
  offset = (n_fft - win_length) / 2
  frame_count = analysis_frame_count(sample_count, n_fft:, hop_length:, pad_end:)
  bin_count = (n_fft / 2) + 1
  result = Numo::DComplex.zeros(bin_count, frame_count)

  frame_count.times do |frame|
    frame_start = frame * hop_length
    frame_buffer = Array.new(n_fft, 0.0)

    win_length.times do |tap|
      slot = tap + offset
      position = frame_start + slot
      value = position < sample_count ? samples[position] : 0.0
      frame_buffer[slot] = value * taps[tap]
    end

    spectrum = fft_real(frame_buffer)
    # Only the first bin_count entries of the spectrum are stored.
    bin_count.times do |bin|
      result[bin, frame] = spectrum[bin]
    end
  end

  result
end
.stft_stream(chunks, n_fft: 2048, hop_length: 512, win_length: nil, window: :hann, center: false, pad_mode: :reflect, periodic: false, flush: true) ⇒ Array<Numo::DComplex>
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/muze/core/stft.rb', line 121
# Computes STFT matrices for a sequence of signal chunks, one matrix per chunk.
#
# With +center+ true every chunk is transformed independently through +stft+
# with pad_end: true. Otherwise samples are accumulated in a buffer across
# chunks: each step emits the frames the buffer can supply (as reported by
# +stream_frame_count+) and keeps the unconsumed tail for the next chunk. On
# the last chunk, when +flush+ is true, the remaining buffer is transformed
# with pad_end: true and fully consumed.
#
# NOTE(review): on the +center+ path this method returns before validating
# +flush+ or the STFT parameters itself; only +stft+ is called per chunk.
#
# @param chunks [#each, #map] signal chunks
# @param n_fft [Integer] FFT size
# @param hop_length [Integer] hop between consecutive frames
# @param win_length [Integer, nil] window length; defaults to n_fft
# @param window [Object] window specification, forwarded to +stft+
# @param center [Boolean] transform each chunk independently with centering
# @param pad_mode [Object] forwarded to +stft+ on the +center+ path only
# @param periodic [Boolean] forwarded to +stft+
# @param flush [Boolean] emit the zero-padded remainder with the last chunk
# @return [Array<Numo::DComplex>]
# @raise [Muze::ParameterError] if flush is neither true nor false (non-center path)
def stft_stream(chunks, n_fft: 2048, hop_length: 512, win_length: nil, window: :hann, center: false, pad_mode: :reflect, periodic: false, flush: true)
  if center
    per_chunk = chunks.map do |chunk|
      stft(chunk, n_fft:, hop_length:, win_length:, window:, center:, pad_mode:, pad_end: true, periodic:)
    end
    return per_chunk
  end

  win_length ||= n_fft
  validate_stft_params!(n_fft:, hop_length:, win_length:)
  raise Muze::ParameterError, "flush must be true or false" unless [true, false].include?(flush)

  pending = []
  outputs = []
  # Unique marker object signalling that the enumerator is exhausted.
  sentinel = Object.new
  enumerator = chunks.each
  current = next_stream_chunk(enumerator, sentinel)

  until current.equal?(sentinel)
    # Look one chunk ahead so the last chunk can be recognised.
    upcoming = next_stream_chunk(enumerator, sentinel)
    flushing = upcoming.equal?(sentinel) && flush

    pending.concat(signal_to_a(current))
    available = stream_frame_count(pending.length, n_fft:, hop_length:, final: flushing)

    if available.zero?
      outputs << empty_stft_matrix(n_fft)
    else
      matrix = stft(pending, n_fft:, hop_length:, win_length:, window:, center: false, pad_end: flushing, periodic:)
      used = flushing ? pending.length : matrix.shape[1] * hop_length
      pending = pending[used..] || []
      outputs << matrix
    end

    current = upcoming
  end

  outputs
end
.time_to_frames(times, sr:, hop_length:) ⇒ Integer, Numo::Int64
219 220 221 |
# File 'lib/muze/core/stft.rb', line 219
# Converts times to frame indices by chaining +time_to_samples+ and
# +samples_to_frames+.
#
# @param times [Object] time or collection of times
# @param sr [Numeric] sample rate, forwarded to +time_to_samples+
# @param hop_length [Numeric] hop size, forwarded to +samples_to_frames+
# @return [Integer, Numo::Int64]
def time_to_frames(times, sr:, hop_length:)
  sample_positions = time_to_samples(times, sr:)
  samples_to_frames(sample_positions, hop_length:)
end
.time_to_samples(times, sr:) ⇒ Object
447 448 449 450 451 |
# File 'lib/muze/core/stft.rb', line 447
# Converts times to sample positions: (time * sr) rounded to the nearest
# integer.
#
# @param times [Object] time or collection of times
# @param sr [Numeric] sample rate
# @return [Object] whatever +map_scalar_or_array+ produces for the input
# @raise [Muze::ParameterError] if sr is not positive
def time_to_samples(times, sr:)
  unless sr.positive?
    raise Muze::ParameterError, "sr must be positive"
  end

  map_scalar_or_array(times) do |time|
    (time.to_f * sr).round
  end
end