Module: Muze::Effects
- Defined in:
- lib/muze/effects/streaming.rb,
lib/muze/effects/time_stretch.rb,
lib/muze/effects/harmonic_percussive.rb
Constant Summary
- MIN_PHASE_VOCODER_SAMPLES =
Use smaller FFTs for short clips so phase vocoding remains practical.
32_768
- MIN_TIME_STRETCH_RATE =
1.0 / 32.0
- MAX_TIME_STRETCH_RATE =
32.0
Class Method Summary
- .deemphasis(y, coef: 0.97) ⇒ Object
- .hpss(y, kernel_size: 31, power: 2.0, margin: 1.0, n_fft: 2048, hop_length: 512, return_masks: false) ⇒ Array(Numo::SFloat, Numo::SFloat)
Returns harmonic and percussive waveforms.
- .hpss_stream(chunks, kernel_size: 31, power: 2.0, margin: 1.0, n_fft: 2048, hop_length: 512, overlap: n_fft) ⇒ Object
- .pitch_shift(y, sr: 22_050, n_steps: 0, bins_per_octave: 12, res_type: :auto, normalize: false, clip: nil) ⇒ Numo::SFloat
- .pitch_shift_stream(chunks, sr: 22_050, n_steps: 0, bins_per_octave: 12, res_type: :auto, normalize: false, clip: nil, overlap: 2048) ⇒ Object
- .preemphasis(y, coef: 0.97) ⇒ Object
- .time_stretch(y, rate: 1.0, n_fft: nil, hop_length: nil, method: :phase_vocoder, phase_lock: false, force_phase_vocoder: false) ⇒ Numo::SFloat
- .time_stretch_stream(chunks, rate: 1.0, n_fft: nil, hop_length: nil, method: :phase_vocoder, phase_lock: false, force_phase_vocoder: false, overlap: 2048) ⇒ Object
- .trim(y, top_db: 60, frame_length: 2048, hop_length: 512, ref: :max, aggregate: :mean, units: :samples, sr: nil) ⇒ Array(Numo::SFloat, Array<Integer>)
Returns the trimmed signal and [start, end].
Class Method Details
.deemphasis(y, coef: 0.97) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/muze/effects/time_stretch.rb', line 116

# Applies the recursive filter out[n] = y[n] + coef * out[n - 1], with
# out[0] = y[0]. This is the inverse recurrence of the first-order difference
# computed by preemphasis.
#
# Two-dimensional input is processed one channel at a time through
# apply_channels.
#
# @param y audio accepted by Muze::Core::Audio.validate_audio! (empty allowed)
# @param coef [Numeric] feedback coefficient, checked by validate_finite_number!
# @return [Object] a Numo::SFloat for one-dimensional input; for
#   two-dimensional input, whatever apply_channels returns
def deemphasis(y, coef: 0.97)
  validate_finite_number!(coef, "coef")
  audio = Muze::Core::Audio.validate_audio!(y, allow_empty: true)
  if audio.ndim == 2
    return apply_channels(audio) { |channel| deemphasis(channel, coef:) }
  end

  samples = audio.to_a
  return Numo::SFloat.cast(samples) if samples.empty?

  # Each output sample feeds back into the next one, so carry the running
  # result forward instead of indexing into a pre-sized buffer.
  restored = []
  samples.each do |sample|
    restored << (restored.empty? ? sample : sample + (coef * restored.last))
  end
  Numo::SFloat.cast(restored)
end
.hpss(y, kernel_size: 31, power: 2.0, margin: 1.0, n_fft: 2048, hop_length: 512, return_masks: false) ⇒ Array(Numo::SFloat, Numo::SFloat)
Returns harmonic and percussive waveforms.
14 15 16 17 18 19 20 21 22 |
# File 'lib/muze/effects/harmonic_percussive.rb', line 14

# Splits audio into harmonic and percussive parts.
#
# Parameters are validated first; the work is then delegated to
# hpss_channels for two-dimensional input or hpss_mono otherwise.
#
# @param y audio accepted by Muze::Core::Audio.validate_audio! (empty allowed)
# @param kernel_size checked by validate_hpss_params!
# @param power checked by validate_hpss_params!
# @param margin checked by validate_hpss_params!
# @param n_fft forwarded unchanged to the mono/channel implementation
# @param hop_length forwarded unchanged to the mono/channel implementation
# @param return_masks [Boolean] must be exactly true or false
# @return [Array(Numo::SFloat, Numo::SFloat)] harmonic and percussive waveforms
# @raise [Muze::ParameterError] when return_masks is not true or false
def hpss(y, kernel_size: 31, power: 2.0, margin: 1.0, n_fft: 2048, hop_length: 512, return_masks: false)
  validate_hpss_params!(kernel_size:, power:, margin:)
  case return_masks
  when true, false
    # accepted as-is
  else
    raise Muze::ParameterError, "return_masks must be true or false"
  end

  audio = Muze::Core::Audio.validate_audio!(y, allow_empty: true)
  options = { kernel_size:, power:, margin:, n_fft:, hop_length:, return_masks: }

  if audio.ndim == 2
    hpss_channels(audio, **options)
  else
    hpss_mono(audio, **options)
  end
end
.hpss_stream(chunks, kernel_size: 31, power: 2.0, margin: 1.0, n_fft: 2048, hop_length: 512, overlap: n_fft) ⇒ Object
31 32 33 34 35 36 37 38 39 40 |
# File 'lib/muze/effects/streaming.rb', line 31

# Runs hpss over a sequence of chunks and yields one harmonic/percussive pair
# per working buffer produced by stream_effect_chunks.
#
# Without a block, returns an enumerator over the same call. With a block,
# the leading prefix_frames reported by stream_effect_chunks are dropped from
# both outputs before they are yielded.
#
# @param chunks chunk source handed to stream_effect_chunks
# @param overlap checked by validate_stream_overlap!; defaults to n_fft
# @yield [harmonic, percussive] the two hpss outputs with the prefix removed
# @return [Enumerator, nil] an enumerator when no block is given, otherwise nil
def hpss_stream(chunks, kernel_size: 31, power: 2.0, margin: 1.0, n_fft: 2048, hop_length: 512, overlap: n_fft)
  unless block_given?
    return enum_for(__method__, chunks, kernel_size:, power:, margin:, n_fft:, hop_length:, overlap:)
  end

  validate_stream_overlap!(overlap)

  stream_effect_chunks(chunks, overlap:) do |window, lead_in|
    harmonic, percussive = hpss(window, kernel_size:, power:, margin:, n_fft:, hop_length:)
    harmonic_tail = drop_audio_frames(harmonic, lead_in)
    percussive_tail = drop_audio_frames(percussive, lead_in)
    yield harmonic_tail, percussive_tail
  end

  nil
end
.pitch_shift(y, sr: 22_050, n_steps: 0, bins_per_octave: 12, res_type: :auto, normalize: false, clip: nil) ⇒ Numo::SFloat
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/muze/effects/time_stretch.rb', line 45

# Shifts pitch by n_steps steps of an octave divided into bins_per_octave
# parts. The signal is time-stretched by rate = 2 ** (-n_steps /
# bins_per_octave) and then resampled back to the original sample count.
#
# Two-dimensional input is processed one channel at a time through
# apply_channels.
#
# The optional post-processing (normalize, clip) is applied on every path,
# including n_steps == 0, where the audio itself is left untouched. With the
# default options a zero shift still returns the validated signal unchanged.
#
# @param y audio accepted by Muze::Core::Audio.validate_audio! (empty allowed)
# @param sr [Integer] sample rate, checked by validate_positive_integer!
# @param n_steps [Numeric] finite number of steps to shift
# @param bins_per_octave [Numeric] positive number of steps per octave
# @param res_type [Symbol] resampler passed to resample_for_pitch_shift;
#   :auto picks :sinc when the signal has at least MIN_PHASE_VOCODER_SAMPLES
#   samples and :linear otherwise
# @param normalize [Boolean] when true, the result goes through normalize_peak
# @param clip [Numeric, nil] when set, the result is clipped to [-clip, clip]
# @return [Numo::SFloat]
# @raise [Muze::ParameterError] when n_steps is not finite or normalize is
#   not true or false
def pitch_shift(y, sr: 22_050, n_steps: 0, bins_per_octave: 12, res_type: :auto, normalize: false, clip: nil)
  validate_positive_integer!(sr, "sr")
  validate_positive_number!(bins_per_octave, "bins_per_octave")
  raise Muze::ParameterError, "n_steps must be finite" unless n_steps.respond_to?(:finite?) && n_steps.finite?
  raise Muze::ParameterError, "normalize must be true or false" unless [true, false].include?(normalize)

  validate_positive_number!(clip, "clip") if clip

  signal = Muze::Core::Audio.validate_audio!(y, allow_empty: true)
  if signal.ndim == 2
    return apply_channels(signal) do |channel|
      pitch_shift(channel, sr:, n_steps:, bins_per_octave:, res_type:, normalize:, clip:)
    end
  end

  output =
    if n_steps.zero?
      # Nothing to shift; skip the stretch/resample round trip but still fall
      # through to the post-processing below.
      signal
    else
      rate = 2.0**(-n_steps.to_f / bins_per_octave)
      stretched = time_stretch(signal, rate:)

      effective_res_type = res_type
      if res_type == :auto
        effective_res_type = signal.size >= MIN_PHASE_VOCODER_SAMPLES ? :sinc : :linear
      end

      restored = resample_for_pitch_shift(stretched, target_size: signal.size, sr:, rate:, res_type: effective_res_type)
      Numo::SFloat.cast(restored[0...signal.size])
    end

  output = normalize_peak(output) if normalize
  output = output.clip(-clip, clip) if clip
  output
end
.pitch_shift_stream(chunks, sr: 22_050, n_steps: 0, bins_per_octave: 12, res_type: :auto, normalize: false, clip: nil, overlap: 2048) ⇒ Object
20 21 22 23 24 25 26 27 28 29 |
# File 'lib/muze/effects/streaming.rb', line 20

# Runs pitch_shift over a sequence of chunks and yields one shifted buffer
# per working buffer produced by stream_effect_chunks.
#
# Without a block, returns an enumerator over the same call. With a block,
# the leading prefix_frames reported by stream_effect_chunks are dropped from
# each shifted buffer before it is yielded.
#
# @param chunks chunk source handed to stream_effect_chunks
# @param overlap checked by validate_stream_overlap!
# @yield [shifted] pitch-shifted audio with the prefix removed
# @return [Enumerator, nil] an enumerator when no block is given, otherwise nil
def pitch_shift_stream(chunks, sr: 22_050, n_steps: 0, bins_per_octave: 12, res_type: :auto, normalize: false, clip: nil, overlap: 2048)
  unless block_given?
    return enum_for(__method__, chunks, sr:, n_steps:, bins_per_octave:, res_type:, normalize:, clip:, overlap:)
  end

  validate_stream_overlap!(overlap)

  stream_effect_chunks(chunks, overlap:) do |window, lead_in|
    processed = pitch_shift(window, sr:, n_steps:, bins_per_octave:, res_type:, normalize:, clip:)
    yield drop_audio_frames(processed, lead_in)
  end

  nil
end
.preemphasis(y, coef: 0.97) ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/muze/effects/time_stretch.rb', line 102

# Applies the first-order difference out[n] = y[n] - coef * y[n - 1], with
# out[0] = y[0].
#
# Two-dimensional input is processed one channel at a time through
# apply_channels.
#
# @param y audio accepted by Muze::Core::Audio.validate_audio! (empty allowed)
# @param coef [Numeric] filter coefficient, checked by validate_finite_number!
# @return [Object] a Numo::SFloat for one-dimensional input; for
#   two-dimensional input, whatever apply_channels returns
def preemphasis(y, coef: 0.97)
  validate_finite_number!(coef, "coef")
  audio = Muze::Core::Audio.validate_audio!(y, allow_empty: true)
  if audio.ndim == 2
    return apply_channels(audio) { |channel| preemphasis(channel, coef:) }
  end

  samples = audio.to_a
  return Numo::SFloat.cast(samples) if samples.empty?

  # The first sample has no predecessor and passes through unchanged; every
  # later sample is paired with the input sample just before it.
  filtered = [samples.first]
  samples.each_cons(2) do |previous, current|
    filtered << (current - (coef * previous))
  end
  Numo::SFloat.cast(filtered)
end
.time_stretch(y, rate: 1.0, n_fft: nil, hop_length: nil, method: :phase_vocoder, phase_lock: false, force_phase_vocoder: false) ⇒ Numo::SFloat
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/muze/effects/time_stretch.rb', line 15

# Changes the length of an audio signal by a factor of 1 / rate.
#
# Two-dimensional input is processed one channel at a time through
# apply_channels. Empty input and rate == 1.0 return the validated signal
# unchanged. The :linear, :ola and :wsola methods delegate to their helper
# with the samples as a plain array. The :phase_vocoder method runs STFT ->
# phase_vocoder -> ISTFT and asks the ISTFT for round(size / rate) samples
# (at least 1).
#
# @param y audio accepted by Muze::Core::Audio.validate_audio! (empty allowed)
# @param rate [Numeric] stretch factor between MIN_TIME_STRETCH_RATE and
#   MAX_TIME_STRETCH_RATE; Integer and Float values are treated alike
# @param n_fft [Integer, nil] FFT size; nil picks phase_vocoder_fft_size(size)
# @param hop_length [Integer, nil] hop size; nil picks max(n_fft / 4, 1)
# @param method [Symbol] one of :phase_vocoder, :ola, :wsola, :linear
# @param phase_lock forwarded to phase_vocoder
# @param force_phase_vocoder accepted and forwarded to per-channel calls; not
#   otherwise read by this method
# @return [Numo::SFloat]
# @raise [Muze::ParameterError] when rate is out of range or method is unknown
def time_stretch(y, rate: 1.0, n_fft: nil, hop_length: nil, method: :phase_vocoder, phase_lock: false, force_phase_vocoder: false)
  validate_positive_number!(rate, "rate")
  unless rate.between?(MIN_TIME_STRETCH_RATE, MAX_TIME_STRETCH_RATE)
    raise Muze::ParameterError, "rate must be between #{MIN_TIME_STRETCH_RATE} and #{MAX_TIME_STRETCH_RATE}"
  end

  validate_optional_positive_integer!(n_fft, "n_fft")
  validate_optional_positive_integer!(hop_length, "hop_length")
  raise Muze::ParameterError, "method must be :phase_vocoder, :ola, :wsola, or :linear" unless %i[phase_vocoder ola wsola linear].include?(method)

  signal = Muze::Core::Audio.validate_audio!(y, allow_empty: true)
  if signal.ndim == 2
    return apply_channels(signal) do |channel|
      time_stretch(channel, rate:, n_fft:, hop_length:, method:, phase_lock:, force_phase_vocoder:)
    end
  end
  return signal if signal.empty? || rate == 1.0

  return linear_time_stretch(signal.to_a, rate) if method == :linear
  return ola_time_stretch(signal.to_a, rate) if method == :ola
  return wsola_time_stretch(signal.to_a, rate) if method == :wsola

  n_fft ||= phase_vocoder_fft_size(signal.size)
  hop_length ||= [n_fft / 4, 1].max
  stft_matrix = Muze::Core::STFT.stft(signal, n_fft:, hop_length:, center: true)
  stretched_stft = phase_vocoder(stft_matrix, rate:, hop_length:, n_fft:, phase_lock:)

  # Divide in floating point: with an Integer rate, `size / rate` would floor
  # before rounding and give a different length than the equivalent Float.
  target_length = [(signal.size / rate.to_f).round, 1].max
  Muze::Core::STFT.istft(stretched_stft, hop_length:, center: true, length: target_length)
end
.time_stretch_stream(chunks, rate: 1.0, n_fft: nil, hop_length: nil, method: :phase_vocoder, phase_lock: false, force_phase_vocoder: false, overlap: 2048) ⇒ Object
7 8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/muze/effects/streaming.rb', line 7

# Runs time_stretch over a sequence of chunks and yields one stretched buffer
# per working buffer produced by stream_effect_chunks.
#
# Without a block, returns an enumerator over the same call. With a block,
# the prefix reported by stream_effect_chunks is scaled by 1 / rate, rounded,
# capped at the stretched buffer's frame count, and dropped from the front of
# each stretched buffer before it is yielded.
#
# @param chunks chunk source handed to stream_effect_chunks
# @param rate [Numeric] stretch factor, checked by validate_positive_number!;
#   Integer and Float values are treated alike
# @param overlap checked by validate_stream_overlap!
# @yield [stretched] time-stretched audio with the scaled prefix removed
# @return [Enumerator, nil] an enumerator when no block is given, otherwise nil
def time_stretch_stream(chunks, rate: 1.0, n_fft: nil, hop_length: nil, method: :phase_vocoder, phase_lock: false, force_phase_vocoder: false, overlap: 2048)
  unless block_given?
    return enum_for(__method__, chunks, rate:, n_fft:, hop_length:, method:, phase_lock:, force_phase_vocoder:, overlap:)
  end

  validate_positive_number!(rate, "rate")
  validate_stream_overlap!(overlap)

  stream_effect_chunks(chunks, overlap:) do |working, prefix_frames|
    stretched = time_stretch(working, rate:, n_fft:, hop_length:, method:, phase_lock:, force_phase_vocoder:)

    # Divide in floating point: with an Integer rate, `prefix_frames / rate`
    # would floor before rounding and drop too few frames.
    scaled_prefix = (prefix_frames / rate.to_f).round
    drop = [scaled_prefix, audio_frame_count(stretched)].min
    yield drop_audio_frames(stretched, drop)
  end

  nil
end
.trim(y, top_db: 60, frame_length: 2048, hop_length: 512, ref: :max, aggregate: :mean, units: :samples, sr: nil) ⇒ Array(Numo::SFloat, Array<Integer>)
Returns trimmed signal and [start, end].
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/muze/effects/time_stretch.rb', line 71

# Trims leading and trailing low-level audio.
#
# Per-sample amplitude (from sample_amplitude) is split into frames and each
# frame is reduced to sqrt(max(x^2)) for aggregate :max or sqrt(mean(x^2))
# for :mean. Frames at or above
# max(reference, 1e-12) * 10 ** (-top_db / 20) are active. The kept region is
# then narrowed to the first and last individual samples, inside the active
# frames, whose amplitude reaches the same threshold.
#
# When nothing is active the result is an empty array paired with [0, 0]. The
# empty array matches the rank of the input: one-dimensional for
# one-dimensional input, and zero rows by signal.shape[1] columns for
# two-dimensional input.
#
# @param y audio accepted by Muze::Core::Audio.validate_audio! (empty allowed)
# @param top_db [Numeric] non-negative threshold below the reference, in dB
# @param frame_length [Integer] positive frame size in samples
# @param hop_length [Integer] positive hop between frames in samples
# @param ref reference selector passed to trim_reference
# @param aggregate [Symbol] :mean or :max
# @param units passed to validate_trim_units! and convert_trim_interval
# @param sr passed to validate_trim_units! and convert_trim_interval
# @return [Array(Numo::SFloat, Array<Integer>)] trimmed signal and [start, end]
# @raise [Muze::ParameterError] on negative top_db, non-positive
#   frame_length/hop_length, or an unknown aggregate
def trim(y, top_db: 60, frame_length: 2048, hop_length: 512, ref: :max, aggregate: :mean, units: :samples, sr: nil)
  raise Muze::ParameterError, "top_db must be non-negative" if top_db.negative?
  raise Muze::ParameterError, "frame_length and hop_length must be positive" unless frame_length.positive? && hop_length.positive?
  raise Muze::ParameterError, "aggregate must be :mean or :max" unless %i[mean max].include?(aggregate)

  validate_trim_units!(units:, sr:, hop_length:)

  signal = Muze::Core::Audio.validate_audio!(y, allow_empty: true)
  amplitude = sample_amplitude(signal, aggregate:)
  frames = Muze::Core::Frames.slice(amplitude, frame_length:, hop_length:, pad_end: true)
  energies = frames.map do |frame|
    values = frame.map { |value| value * value }
    aggregate == :max ? Math.sqrt(values.max || 0.0) : Math.sqrt(values.sum(0.0) / frame.length)
  end

  reference = trim_reference(energies, ref:)
  # Floor the reference so an all-silent input still gets a positive threshold.
  threshold = [reference, 1.0e-12].max * (10.0**(-top_db / 20.0))
  active_frames = energies.each_index.select { |index| energies[index] >= threshold }

  # Shared by both "nothing active" exits so they agree on the result's rank.
  empty = signal.ndim == 2 ? Numo::SFloat.zeros(0, signal.shape[1]) : Numo::SFloat[]
  return [empty, [0, 0]] if active_frames.empty?

  search_start = active_frames.first * hop_length
  sample_count = amplitude.length
  # The last active frame may extend past the signal because of end padding.
  search_end = [(active_frames.last * hop_length) + frame_length, sample_count].min
  active_samples = (search_start...search_end).select { |index| amplitude[index] >= threshold }
  return [empty, [0, 0]] if active_samples.empty?

  start_sample = active_samples.first
  end_sample = active_samples.last + 1
  trimmed = signal.ndim == 2 ? signal[start_sample...end_sample, true] : signal[start_sample...end_sample]
  [trimmed, convert_trim_interval(start_sample, end_sample, units:, sr:, hop_length:)]
end