Module: Muze::Core::STFT

Defined in:
lib/muze/core/stft.rb

Overview

Short-time Fourier transform and related utilities.

Constant Summary collapse

EPSILON =
1.0e-12
MAX_N_FFT =
262_144
FREQUENCY_CACHE =
Muze::Core::BoundedCache.new(max_size: 64)

Class Method Summary collapse

Class Method Details

.amplitude_to_db(s, ref: 1.0, amin: 1.0e-5, top_db: 80.0, abs: false) ⇒ Numo::SFloat

Parameters:

  • s (Numo::NArray)
  • ref (Float, Symbol, Proc) (defaults to: 1.0)
  • amin (Float) (defaults to: 1.0e-5)
  • top_db (Float, nil) (defaults to: 80.0)
  • abs (Boolean) (defaults to: false)

Returns:

  • (Numo::SFloat)


160
161
162
163
164
165
166
167
168
# File 'lib/muze/core/stft.rb', line 160

# Converts amplitude values to a decibel scale by delegating to +log_scale+
# with a multiplier of 20.0.
#
# Complex input (double or single precision) is reduced to its modulus first.
# Any other input is cast to single-precision floats and, when +abs+ is true,
# rectified with an element-wise absolute value.
#
# @param s [Numo::NArray] amplitude values, real or complex
# @param ref [Float, Symbol, Proc] reference value forwarded to +log_scale+
# @param amin [Float] lower bound forwarded to +log_scale+
# @param top_db [Float, nil] dynamic-range limit forwarded to +log_scale+
# @param abs [Boolean] take the absolute value of real-valued input
# @return [Numo::SFloat] per the documented signature
def amplitude_to_db(s, ref: 1.0, amin: 1.0e-5, top_db: 80.0, abs: false)
  magnitude = case s
              when Numo::DComplex, Numo::SComplex
                # Both complex precisions need the modulus; a single-precision
                # spectrogram would otherwise go through the real-valued cast below.
                s.abs.cast_to(Numo::SFloat)
              else
                values = Numo::SFloat.cast(s)
                abs ? values.abs : values
              end
  log_scale(magnitude, ref:, amin:, top_db:, multiplier: 20.0)
end

.db_to_amplitude(s_db, ref: 1.0) ⇒ Numo::SFloat

Parameters:

  • s_db (Numo::NArray)
  • ref (Float) (defaults to: 1.0)

Returns:

  • (Numo::SFloat)


183
184
185
186
# File 'lib/muze/core/stft.rb', line 183

# Maps decibel values back to amplitudes: ref * 10^(s_db / 20), evaluated as
# ref * exp((s_db / 20) * ln(10)).
#
# @param s_db [Numo::NArray] decibel values
# @param ref [Float] reference amplitude used as the scale factor
# @return [Numo::SFloat]
def db_to_amplitude(s_db, ref: 1.0)
  validate_db_inverse_args!(s_db, ref)

  decibels = Numo::SFloat.cast(s_db)
  exponent = (decibels / 20.0) * Math.log(10.0)
  amplitudes = ref.to_f * Numo::NMath.exp(exponent)
  Numo::SFloat.cast(amplitudes)
end

.db_to_power(s_db, ref: 1.0) ⇒ Numo::SFloat

Parameters:

  • s_db (Numo::NArray)
  • ref (Float) (defaults to: 1.0)

Returns:

  • (Numo::SFloat)


191
192
193
194
# File 'lib/muze/core/stft.rb', line 191

# Maps decibel values back to power: ref * 10^(s_db / 10), evaluated as
# ref * exp((s_db / 10) * ln(10)).
#
# @param s_db [Numo::NArray] decibel values
# @param ref [Float] reference power used as the scale factor
# @return [Numo::SFloat]
def db_to_power(s_db, ref: 1.0)
  validate_db_inverse_args!(s_db, ref)

  decibels = Numo::SFloat.cast(s_db)
  exponent = (decibels / 10.0) * Math.log(10.0)
  power = ref.to_f * Numo::NMath.exp(exponent)
  Numo::SFloat.cast(power)
end

.fft_frequencies(sr:, n_fft:) ⇒ Numo::SFloat

Parameters:

  • sr (Integer)
  • n_fft (Integer)

Returns:

  • (Numo::SFloat)

Raises:

  • (Muze::ParameterError)


199
200
201
202
203
204
205
# File 'lib/muze/core/stft.rb', line 199

# Returns the centre frequency of each of the (n_fft / 2) + 1 FFT bins, where
# bin k maps to k * sr / n_fft.
#
# Results are memoised in FREQUENCY_CACHE keyed by [sr, n_fft]; a copy is
# returned so callers cannot mutate the cached array.
#
# @param sr [Integer] sample rate
# @param n_fft [Integer] FFT size
# @return [Numo::SFloat]
# @raise [Muze::ParameterError] if sr or n_fft is not a positive Integer
def fft_frequencies(sr:, n_fft:)
  positive_integer = ->(value) { value.is_a?(Integer) && value.positive? }
  raise Muze::ParameterError, "sr must be a positive integer" unless positive_integer.call(sr)
  raise Muze::ParameterError, "n_fft must be a positive integer" unless positive_integer.call(n_fft)

  cached = FREQUENCY_CACHE.fetch([sr, n_fft]) do
    sample_rate = sr.to_f
    bin_count = (n_fft / 2) + 1
    Numo::SFloat.cast(Array.new(bin_count) { |bin| bin * sample_rate / n_fft })
  end
  cached.dup
end

.frames_to_samples(frames, hop_length:) ⇒ Integer, Numo::Int64

Parameters:

  • frames (Integer, Array<Integer>, Numo::NArray)
  • hop_length (Integer)

Returns:

  • (Integer, Numo::Int64)

Raises:

  • (Muze::ParameterError)


226
227
228
229
230
# File 'lib/muze/core/stft.rb', line 226

# Converts frame indices to sample indices by multiplying by +hop_length+.
#
# @param frames [Integer, Array<Integer>, Numo::NArray] frame index or indices
# @param hop_length [Integer] samples between successive frames
# @return [Integer, Numo::Int64] per the documented signature
# @raise [Muze::ParameterError] if hop_length is nil, non-numeric, or not positive
def frames_to_samples(frames, hop_length:)
  # Guard with respond_to? so nil / non-numeric input surfaces as a
  # ParameterError rather than a NoMethodError from +positive?+.
  unless hop_length.respond_to?(:positive?) && hop_length.positive?
    raise Muze::ParameterError, "hop_length must be positive"
  end

  map_scalar_or_array(frames) { |frame| (frame.to_i * hop_length).to_i }
end

.frames_to_time(frames, sr:, hop_length:) ⇒ Float, Numo::SFloat

Parameters:

  • frames (Integer, Array<Integer>, Numo::NArray)
  • sr (Integer)
  • hop_length (Integer)

Returns:

  • (Float, Numo::SFloat)


211
212
213
# File 'lib/muze/core/stft.rb', line 211

# Converts frame indices to times by chaining +frames_to_samples+ and
# +samples_to_time+.
#
# @param frames [Integer, Array<Integer>, Numo::NArray] frame index or indices
# @param sr [Integer] sample rate
# @param hop_length [Integer] samples between successive frames
# @return [Float, Numo::SFloat] per the documented signature
def frames_to_time(frames, sr:, hop_length:)
  sample_positions = frames_to_samples(frames, hop_length:)
  samples_to_time(sample_positions, sr:)
end

.istft(stft_matrix, hop_length: 512, win_length: nil, window: :hann, center: true, length: nil, dtype: Numo::SFloat, periodic: false) ⇒ Numo::SFloat

Parameters:

  • stft_matrix (Numo::DComplex)
  • hop_length (Integer) (defaults to: 512)
  • win_length (Integer, nil) (defaults to: nil)
  • window (Symbol) (defaults to: :hann)
  • center (Boolean) (defaults to: true)
  • length (Integer, nil) (defaults to: nil)

Returns:

  • (Numo::SFloat)

Raises:

  • (Muze::ParameterError)


61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/muze/core/stft.rb', line 61

# Reconstructs a time-domain signal from a half-spectrum STFT matrix using
# windowed overlap-add, normalised by the accumulated squared window.
#
# n_fft is derived from the matrix as (frequency_bins - 1) * 2.
#
# @param stft_matrix [Numo::DComplex] matrix indexed as [frequency_bin, frame]
# @param hop_length [Integer] samples between successive frames
# @param win_length [Integer, nil] window length; defaults to n_fft
# @param window [Symbol] window specification passed to Windows.resolve
# @param center [Boolean] when true, n_fft / 2 samples are trimmed from each end
# @param length [Integer, nil] when given, the output is passed through adjust_length
# @param dtype [Class, Symbol] output element type resolved through dtype_class
# @param periodic [Boolean] forwarded to Windows.resolve
# @return [Numo::SFloat] with the default dtype
# @raise [Muze::ParameterError] if length is given and is not a non-negative Integer
def istft(stft_matrix, hop_length: 512, win_length: nil, window: :hann, center: true, length: nil, dtype: Numo::SFloat, periodic: false)
  stft_matrix = cast_complex_matrix(stft_matrix, "stft_matrix")
  frequency_bins, frame_count = stft_matrix.shape
  n_fft = (frequency_bins - 1) * 2
  win_length ||= n_fft
  validate_stft_params!(n_fft:, hop_length:, win_length:)
  length_ok = !length || (length.is_a?(Integer) && !length.negative?)
  raise Muze::ParameterError, "length must be non-negative" unless length_ok

  last_frame = [frame_count - 1, 0].max
  signal_length = n_fft + (hop_length * last_frame)
  overlap_added = Array.new(signal_length, 0.0)
  weight_sums = Array.new(signal_length, 0.0)
  taper = Muze::Core::Windows.resolve(window, win_length, periodic:).to_a
  # The squared window does not depend on the frame, so compute it once.
  taper_squared = taper.map { |weight| weight * weight }
  # Offset of the window inside the n_fft-long frame.
  window_offset = (n_fft - win_length) / 2

  frame_count.times do |frame_index|
    half_spectrum = (0...frequency_bins).map { |bin| stft_matrix[bin, frame_index] }
    frame_samples = ifft_real(half_spectrum).to_a
    frame_start = frame_index * hop_length

    win_length.times do |tap|
      target = frame_start + tap + window_offset
      break if target >= signal_length

      overlap_added[target] += frame_samples[tap + window_offset] * taper[tap]
      weight_sums[target] += taper_squared[tap]
    end
  end

  # Divide out the window overlap; samples with negligible weight are left as-is.
  signal = overlap_added.zip(weight_sums).map do |value, weight|
    weight > EPSILON ? (value / weight) : value
  end

  if center
    trim = n_fft / 2
    signal = signal[trim...(signal.length - trim)] || []
  end

  signal = length ? adjust_length(signal, length) : signal
  dtype_class(dtype).cast(signal)
end

.magphase(stft_matrix, eps: EPSILON, dtype: Numo::SFloat) ⇒ Array<Numo::SFloat, Numo::DComplex>

Parameters:

  • stft_matrix (Numo::DComplex)
  • eps (Float) (defaults to: EPSILON)
  • dtype (Class, Symbol) (defaults to: Numo::SFloat)

Returns:

  • (Array<Numo::SFloat, Numo::DComplex>)


108
109
110
111
112
113
114
115
116
117
# File 'lib/muze/core/stft.rb', line 108

# Splits a complex matrix into a magnitude array and a phase array, where the
# phase is the input divided by (magnitude + eps).
#
# @param stft_matrix [Numo::DComplex] complex spectrogram
# @param eps [Float] positive, finite offset added to the magnitude before dividing
# @param dtype [Class, Symbol] magnitude element type resolved through dtype_class
# @return [Array<Numo::SFloat, Numo::DComplex>] [magnitude, phase] per the documented signature
# @raise [Muze::ParameterError] if eps is not a positive, finite number
def magphase(stft_matrix, eps: EPSILON, dtype: Numo::SFloat)
  eps_is_number = %i[positive? finite?].all? { |predicate| eps.respond_to?(predicate) }
  valid_eps = eps_is_number && eps.positive? && eps.finite?
  raise Muze::ParameterError, "eps must be positive" unless valid_eps

  spectrum = cast_complex_matrix(stft_matrix, "stft_matrix")
  magnitude = spectrum.abs.cast_to(dtype_class(dtype))
  [magnitude, spectrum / (magnitude + eps)]
end

.power_to_db(s, ref: 1.0, amin: 1.0e-10, top_db: 80.0) ⇒ Numo::SFloat

Parameters:

  • s (Numo::NArray)
  • ref (Float, Symbol, Proc) (defaults to: 1.0)
  • amin (Float) (defaults to: 1.0e-10)
  • top_db (Float, nil) (defaults to: 80.0)

Returns:

  • (Numo::SFloat)


175
176
177
178
# File 'lib/muze/core/stft.rb', line 175

# Converts power values to a decibel scale by casting to single precision and
# delegating to +log_scale+ with a multiplier of 10.0.
#
# @param s [Numo::NArray] power values
# @param ref [Float, Symbol, Proc] reference value forwarded to +log_scale+
# @param amin [Float] lower bound forwarded to +log_scale+
# @param top_db [Float, nil] dynamic-range limit forwarded to +log_scale+
# @return [Numo::SFloat] per the documented signature
def power_to_db(s, ref: 1.0, amin: 1.0e-10, top_db: 80.0)
  log_scale(Numo::SFloat.cast(s), ref:, amin:, top_db:, multiplier: 10.0)
end

.samples_to_frames(samples, hop_length:) ⇒ Integer, Numo::Int64

Parameters:

  • samples (Integer, Array<Integer>, Numo::NArray)
  • hop_length (Integer)

Returns:

  • (Integer, Numo::Int64)

Raises:

  • (Muze::ParameterError)


235
236
237
238
239
# File 'lib/muze/core/stft.rb', line 235

# Converts sample indices to frame indices: floor(sample / hop_length).
#
# @param samples [Integer, Array<Integer>, Numo::NArray] sample index or indices
# @param hop_length [Integer] samples between successive frames
# @return [Integer, Numo::Int64] per the documented signature
# @raise [Muze::ParameterError] if hop_length is nil, non-numeric, or not positive
def samples_to_frames(samples, hop_length:)
  # Guard with respond_to? so nil / non-numeric input surfaces as a
  # ParameterError rather than a NoMethodError from +positive?+.
  unless hop_length.respond_to?(:positive?) && hop_length.positive?
    raise Muze::ParameterError, "hop_length must be positive"
  end

  map_scalar_or_array(samples) { |sample| (sample.to_i / hop_length.to_f).floor }
end

.samples_to_time(samples, sr:) ⇒ Object



453
454
455
456
457
# File 'lib/muze/core/stft.rb', line 453

# Converts sample indices to times by dividing by the sample rate.
#
# @param samples [Integer, Array<Integer>, Numo::NArray] sample index or indices
# @param sr [Numeric] sample rate
# @return [Object] result of map_scalar_or_array over sample.to_f / sr
# @raise [Muze::ParameterError] if sr is nil, non-numeric, or not positive
def samples_to_time(samples, sr:)
  # Guard with respond_to? so nil / non-numeric input surfaces as a
  # ParameterError rather than a NoMethodError from +positive?+.
  raise Muze::ParameterError, "sr must be positive" unless sr.respond_to?(:positive?) && sr.positive?

  map_scalar_or_array(samples) { |sample| sample.to_f / sr }
end

.stft(y, n_fft: 2048, hop_length: 512, win_length: nil, window: :hann, center: true, pad_mode: :reflect, pad_end: false, periodic: false) ⇒ Numo::DComplex

Returns shape: [1 + n_fft/2, frames].

Parameters:

  • y (Numo::SFloat, Array<Float>)

    waveform signal

  • n_fft (Integer) (defaults to: 2048)
  • hop_length (Integer) (defaults to: 512)
  • win_length (Integer, nil) (defaults to: nil)
  • window (Symbol) (defaults to: :hann)
  • center (Boolean) (defaults to: true)
  • pad_mode (Symbol) (defaults to: :reflect)
  • pad_end (Boolean) (defaults to: false)

Returns:

  • (Numo::DComplex)

    shape: [1 + n_fft/2, frames]



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/muze/core/stft.rb', line 21

# Computes the short-time Fourier transform of a waveform.
#
# Each frame is n_fft samples long; the window is placed in the middle of the
# frame when win_length is shorter than n_fft, and reads past the end of the
# signal are treated as zeros.
#
# @param y [Numo::SFloat, Array<Float>] waveform signal
# @param n_fft [Integer] FFT size
# @param hop_length [Integer] samples between successive frames
# @param win_length [Integer, nil] window length; defaults to n_fft
# @param window [Symbol] window specification passed to Windows.resolve
# @param center [Boolean] when true, the signal is padded by n_fft / 2 via pad_signal
# @param pad_mode [Symbol] padding mode used when center is true
# @param pad_end [Boolean] forwarded to analysis_frame_count
# @param periodic [Boolean] forwarded to Windows.resolve
# @return [Numo::DComplex] shape: [1 + n_fft/2, frames]
def stft(y, n_fft: 2048, hop_length: 512, win_length: nil, window: :hann, center: true, pad_mode: :reflect, pad_end: false, periodic: false)
  win_length ||= n_fft
  validate_stft_params!(n_fft:, hop_length:, win_length:)
  validate_pad_mode!(pad_mode)

  samples = signal_to_a(y)
  samples = pad_signal(samples, n_fft / 2, pad_mode) if center
  # An empty signal is replaced by a single zero sample.
  samples = [0.0] if samples.empty?
  sample_count = samples.length

  taper = Muze::Core::Windows.resolve(window, win_length, periodic:).to_a
  # Offset of the window inside the n_fft-long frame.
  window_offset = (n_fft - win_length) / 2
  frame_count = analysis_frame_count(sample_count, n_fft:, hop_length:, pad_end:)

  frequency_bins = (n_fft / 2) + 1
  spectrogram = Numo::DComplex.zeros(frequency_bins, frame_count)

  frame_count.times do |frame_index|
    frame_start = frame_index * hop_length
    frame = Array.new(n_fft, 0.0)

    win_length.times do |tap|
      slot = tap + window_offset
      position = frame_start + slot
      value = position < sample_count ? samples[position] : 0.0
      frame[slot] = value * taper[tap]
    end

    spectrum = fft_real(frame)
    # Keep only the first frequency_bins entries of the spectrum.
    frequency_bins.times { |bin| spectrogram[bin, frame_index] = spectrum[bin] }
  end

  spectrogram
end

.stft_stream(chunks, n_fft: 2048, hop_length: 512, win_length: nil, window: :hann, center: false, pad_mode: :reflect, periodic: false, flush: true) ⇒ Array<Numo::DComplex>

Parameters:

  • chunks (Enumerable<Array<Float>, Numo::NArray>)

Returns:

  • (Array<Numo::DComplex>)

Raises:

  • (Muze::ParameterError)


121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/muze/core/stft.rb', line 121

# Computes STFT matrices for a sequence of chunks, producing one matrix per chunk.
#
# With center: true every chunk is transformed independently (pad_end: true).
# Otherwise samples are accumulated in a buffer across chunks; after each chunk
# the frames that can be produced are emitted and the consumed samples are
# dropped. One chunk is read ahead so the last chunk can be recognised; when
# +flush+ is true the last chunk is transformed with pad_end and the whole
# buffer is consumed.
#
# @param chunks [Enumerable<Array<Float>, Numo::NArray>] successive signal chunks
# @param n_fft [Integer] FFT size
# @param hop_length [Integer] samples between successive frames
# @param win_length [Integer, nil] window length; defaults to n_fft
# @param window [Symbol] window specification
# @param center [Boolean] transform each chunk independently with centering
# @param pad_mode [Symbol] padding mode, used only when center is true
# @param periodic [Boolean] forwarded to stft
# @param flush [Boolean] pad and emit the remaining samples on the last chunk
# @return [Array<Numo::DComplex>] one matrix per chunk
# @raise [Muze::ParameterError] if flush is neither true nor false (non-centered path)
def stft_stream(chunks, n_fft: 2048, hop_length: 512, win_length: nil, window: :hann, center: false, pad_mode: :reflect, periodic: false, flush: true)
  if center
    independent = chunks.map do |chunk|
      stft(chunk, n_fft:, hop_length:, win_length:, window:, center:, pad_mode:, pad_end: true, periodic:)
    end
    return independent
  end

  win_length ||= n_fft
  validate_stft_params!(n_fft:, hop_length:, win_length:)
  raise Muze::ParameterError, "flush must be true or false" unless flush.equal?(true) || flush.equal?(false)

  pending = []
  matrices = []
  sentinel = Object.new
  source = chunks.each
  current = next_stream_chunk(source, sentinel)

  until current.equal?(sentinel)
    # Read one chunk ahead so we know whether +current+ is the last one.
    upcoming = next_stream_chunk(source, sentinel)
    flushing = upcoming.equal?(sentinel) && flush
    pending.concat(signal_to_a(current))
    available = stream_frame_count(pending.length, n_fft:, hop_length:, final: flushing)

    if available.zero?
      matrices << empty_stft_matrix(n_fft)
    else
      matrix = stft(pending, n_fft:, hop_length:, win_length:, window:, center: false, pad_end: flushing, periodic:)
      # When flushing, everything is consumed; otherwise advance by one hop per emitted frame.
      consumed = flushing ? pending.length : matrix.shape[1] * hop_length
      pending = pending[consumed..] || []
      matrices << matrix
    end

    current = upcoming
  end

  matrices
end

.time_to_frames(times, sr:, hop_length:) ⇒ Integer, Numo::Int64

Parameters:

  • times (Float, Array<Float>, Numo::NArray)
  • sr (Integer)
  • hop_length (Integer)

Returns:

  • (Integer, Numo::Int64)


219
220
221
# File 'lib/muze/core/stft.rb', line 219

# Converts times to frame indices by chaining +time_to_samples+ and
# +samples_to_frames+.
#
# @param times [Float, Array<Float>, Numo::NArray] time value or values
# @param sr [Integer] sample rate
# @param hop_length [Integer] samples between successive frames
# @return [Integer, Numo::Int64] per the documented signature
def time_to_frames(times, sr:, hop_length:)
  sample_positions = time_to_samples(times, sr:)
  samples_to_frames(sample_positions, hop_length:)
end

.time_to_samples(times, sr:) ⇒ Object



447
448
449
450
451
# File 'lib/muze/core/stft.rb', line 447

# Converts times to sample indices: round(time * sr).
#
# @param times [Float, Array<Float>, Numo::NArray] time value or values
# @param sr [Numeric] sample rate
# @return [Object] result of map_scalar_or_array over (time.to_f * sr).round
# @raise [Muze::ParameterError] if sr is nil, non-numeric, or not positive
def time_to_samples(times, sr:)
  # Guard with respond_to? so nil / non-numeric input surfaces as a
  # ParameterError rather than a NoMethodError from +positive?+.
  raise Muze::ParameterError, "sr must be positive" unless sr.respond_to?(:positive?) && sr.positive?

  map_scalar_or_array(times) { |time| (time.to_f * sr).round }
end