The "delossyfiers" advent

Discussion in 'Ai for Music' started by forart.it, Apr 24, 2025.

  1. forart.it

    forart.it Kapellmeister

    Joined:
    May 5, 2023
    Messages:
    137
    Likes Received:
    68
    It's not my claim but the author's (Qu Lefan); anyway, since its Python source is available, you can run/compile it on Linux too (but I think it should also work under Wine).

    GH code explanation:
    However, it really needs an English GUI:
    [​IMG]

    Last but not least, it would be really interesting to set up a comparative test (scientific - a null test - not perceptual) to understand the real effectiveness of those solutions.
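
    For reference, a null test is basically just a sample-aligned subtraction; here's a minimal sketch (file names are placeholders, and it assumes the two files are already time-aligned, which dedicated null-test tools handle automatically):
    Code:
    import numpy as np
    import soundfile as sf

    # Placeholders: the lossless reference vs. the "delossyfied" output.
    ref, sr_ref = sf.read("original.wav", always_2d=True, dtype="float32")
    test, sr_test = sf.read("delossyfied.wav", always_2d=True, dtype="float32")
    assert sr_ref == sr_test, "resample one of the files first"

    # Trim to the shorter length so the subtraction stays sample-aligned.
    n = min(len(ref), len(test))
    null = ref[:n] - test[:n]

    # The more negative the residual, the closer the two files are.
    rms = np.sqrt(np.mean(null ** 2))
    print(f"Null residual: {20 * np.log10(rms + 1e-12):.1f} dBFS (RMS)")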
     
    Last edited: Sep 11, 2025
  2. curtified

    curtified Audiosexual

    Joined:
    Feb 3, 2015
    Messages:
    994
    Likes Received:
    561
    Have you tried running it on Paperspace or RunPod?
     
  3. ceo54

    ceo54 Producer

    Joined:
    Jan 28, 2019
    Messages:
    299
    Likes Received:
    90
    Results:
    [​IMG]
    I don't understand why a ceiling appears. Apparently the effect has a hard cutoff for the bands it works with? I don't know.

    Second test with the following settings:

    Modulation times: 8

    Attenuation amplitude: 1.00

    Preprocessing high-pass filter cutoff frequency (Hz): 500

    Post-processing high-pass filter cutoff frequency (Hz): 10000

    Filter order: 11

    Sample rate (Hz): 44100

    [​IMG]

    Translation:
    [​IMG]

    1 - Open audio file (ALAC, MP3, WAV, FLAC, Ogg, Aiff, Aif, AAC, WMA, mka)

    2 - Clean selected audio

    3 - Change output path

    4 - Start process

    5 - Modulation times

    6 - Attenuation amplitude

    7 - Preprocessing high-pass filter cutoff frequency (Hz)

    8 - Post-processing high-pass filter cutoff frequency (Hz)

    9 - Filter order

    10 - Select sample rate

    11 - Status Bar

    12 - Select output format ALAC or FLAC

    13 - Cancel process
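
    For context, here's a rough SciPy sketch of what settings 7-10 above describe on their own, assuming plain Butterworth high-pass filters (the tool doesn't document its actual filter design); the modulation/attenuation steps (5 and 6) are left as a placeholder:
    Code:
    import numpy as np
    import scipy.signal as signal

    def highpass(x, cutoff_hz, sr, order):
        """Zero-phase Butterworth high-pass along the time axis."""
        sos = signal.butter(order, cutoff_hz / (sr / 2), btype='high', output='sos')
        return signal.sosfiltfilt(sos, x, axis=0)

    sr = 44100                                    # 10 - sample rate
    pre_hp, post_hp, order = 500.0, 10000.0, 11   # 7, 8 and 9 from the test above

    x = np.random.randn(5 * sr, 2).astype(np.float32)  # stand-in for the loaded audio
    pre = highpass(x, pre_hp, sr, order)      # pre-processing high-pass
    # ... modulation / attenuation (settings 5 and 6) would happen here ...
    out = highpass(pre, post_hp, sr, order)   # post-processing high-pass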

    So, what do the experts think about the quality relative to other tools like Stereo Tool, DSEE-HX and Nvidia's diffusion audio restoration?
     
  4. ceo54

    ceo54 Producer

    Joined:
    Jan 28, 2019
    Messages:
    299
    Likes Received:
    90
    I really dug into Stereo Tool this time after reading your posts and created three presets that seem to work fine, except that I've configured something incorrectly that affects the transients and makes the result sound punchier. If anyone reading this uses Stereo Tool and has any idea what I might be doing wrong, please help.
     
  5. forart.it

    forart.it Kapellmeister

    Joined:
    May 5, 2023
    Messages:
    137
    Likes Received:
    68
    Here we go, here's another (dated?) contender on the scene:
    Emiya Engine (MATLAB/VST2 plugin)
    [​IMG]

    Since their sources are open, I've also asked GH-Copilot (GPT-5) whether it's possible to combine those solutions (GH account needed):

    https://github.com/copilot/share/42064124-4804-8431-9101-0840c4484919

    Last but not least, deton24 shared his (crazy!) "Make your own remaster" document/guide, where he reviewed most of these tools.

    I am increasingly convinced that an objective (scientific, not perceptual) comparison between the various solutions would be truly interesting and useful.
     
  6. canbi

    canbi Kapellmeister

    Joined:
    Jun 12, 2023
    Messages:
    195
    Likes Received:
    64
    Not a delossifier, but while we're at it:

    thoughts on Axiom?
     
  7. forart.it

    forart.it Kapellmeister

    Joined:
    May 5, 2023
    Messages:
    137
    Likes Received:
    68
    Nice tool; anyway, in this specific field I would love to evaluate DeltaWave Audio Null Comparator results (ground truth, i.e. the source file, vs. the delossyfied output).
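
    For example, besides a full null, one could simply measure how much energy a tool puts back above the lossy cutoff; a minimal sketch (file names and the 16 kHz band edge are placeholders):
    Code:
    import numpy as np
    import scipy.signal as signal
    import soundfile as sf

    # Placeholders: ground truth vs. delossyfied output, same sample rate and length.
    ref, sr = sf.read("source.wav", always_2d=True, dtype="float32")
    enh, _ = sf.read("delossyfied.wav", always_2d=True, dtype="float32")

    # Average power spectra of the mono sums.
    f, p_ref = signal.welch(ref.mean(axis=1), fs=sr, nperseg=8192)
    _, p_enh = signal.welch(enh.mean(axis=1), fs=sr, nperseg=8192)

    # Energy above a typical MP3 cutoff, relative to the ground truth.
    band = f >= 16000
    ratio_db = 10 * np.log10((p_enh[band].mean() + 1e-20) / (p_ref[band].mean() + 1e-20))
    print(f"Energy above 16 kHz vs. ground truth: {ratio_db:+.1f} dB")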
     
  8. ceo54

    ceo54 Producer

    Joined:
    Jan 28, 2019
    Messages:
    299
    Likes Received:
    90
    Thanks. I tried the Python script that you got Gemini to create for you, but it errors out. I tried to get Gemini to sort it out, but that wasn't successful either. Gemini is a moron, so maybe it's incompetence that it couldn't resolve the error, or maybe there's something inherently wrong with the core of the script.

    [​IMG]

    Would it be possible for you to refine the script so I can give it another try?

    Thanks.
     
  9. forart.it

    forart.it Kapellmeister

    Joined:
    May 5, 2023
    Messages:
    137
    Likes Received:
    68
    As stated, it's not "mine": it was GH-Copilot-generated (for fun).

    If you want better results with so-called "vibe-coding", try Claude instead.
     
  10. forart.it

    forart.it Kapellmeister

    Joined:
    May 5, 2023
    Messages:
    137
    Likes Received:
    68
    I've played a bit with Emiya Engine's VST2 plugin...

    Original WAV:
    [​IMG]

    MP3 from original:
    [​IMG]

    Enhanced from MP3:
    [​IMG]

    MP3 - Enhanced null:
    [​IMG]
     
  11. curtified

    curtified Audiosexual

    Joined:
    Feb 3, 2015
    Messages:
    994
    Likes Received:
    561
    I vibe-coded a version of this plugin for Mac if anyone wants to test it. No GUI, just the algorithm.

    https://pixeldrain.com/u/C3PdLA8P

    Screenshot 2025-09-19 at 10.40.20 AM.png
    (not the sexiest right now)


    I noticed that AkkoMode works better but adds some noise. I also added a dry/wet knob to blend it with the original signal.
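
    For what it's worth, that kind of dry/wet control is just a linear crossfade between the unprocessed and processed buffers; a minimal sketch, assuming both buffers are already time-aligned:
    Code:
    import numpy as np

    def dry_wet(dry: np.ndarray, wet: np.ndarray, mix: float) -> np.ndarray:
        """Linear dry/wet blend: mix = 0.0 is fully dry, 1.0 is fully wet."""
        mix = float(np.clip(mix, 0.0, 1.0))
        return (1.0 - mix) * dry + mix * wet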
     
  12. forart.it

    forart.it Kapellmeister

    Joined:
    May 5, 2023
    Messages:
    137
    Likes Received:
    68
  13. ceo54

    ceo54 Producer

    Joined:
    Jan 28, 2019
    Messages:
    299
    Likes Received:
    90
  14. curtified

    curtified Audiosexual

    Joined:
    Feb 3, 2015
    Messages:
    994
    Likes Received:
    561
    You can get the Windows one here:


     
  15. curtified

    curtified Audiosexual

    Joined:
    Feb 3, 2015
    Messages:
    994
    Likes Received:
    561
  16. ceo54

    ceo54 Producer

    Joined:
    Jan 28, 2019
    Messages:
    299
    Likes Received:
    90
    No, I meant the one that combines the three like forart suggested.
     
  17. forart.it

    forart.it Kapellmeister

    Joined:
    May 5, 2023
    Messages:
    137
    Likes Received:
    68
    ...that's why Python - at least at the experimental stage - is the way to go, IMHO...

    Here's a GPT-5 optimization of the GH-Copilot "mixup" code: https://chatgpt.com/share/68ceeeef-3d70-800f-9074-4a45cd77cf2e
    Code:
    # enhanced_pipeline.py
    import numpy as np
    import resampy
    import librosa
    import scipy.signal as signal
    import pyfftw
    from typing import Optional, Dict, Tuple
    
    # Ensure float32 for speed by default
    DTYPE = np.float32
    
    # -------------------------
    # Utilities: shape, norm, filters
    # -------------------------
    def _ensure_2d(x: np.ndarray) -> Tuple[np.ndarray, bool]:
        """Return (x2d, was_1d). We use shape (n_samples, n_channels)."""
        x = np.asarray(x)
        if x.ndim == 1:
            return x.reshape(-1, 1).astype(DTYPE), True
        elif x.ndim == 2:
            # either (n,) or (channels, n)? assume (n, ch) per pipeline contract
            return x.astype(DTYPE), False
        else:
            raise ValueError("Input must be 1D or 2D with shape (n_samples, n_channels).")
    
    def normalize_peak(x: np.ndarray, eps=1e-12) -> np.ndarray:
        """Peak normalization per-channel to ±1."""
        peak = np.max(np.abs(x), axis=0, keepdims=True)
        return x / (peak + eps)
    
    def rms(x: np.ndarray, axis=0, eps=1e-12):
        return np.sqrt(np.mean(x**2, axis=axis) + eps)
    
    # Stable highpass via SOS
    def highpass_sos(x: np.ndarray, cutoff_hz: float, sr: float, order: int = 8) -> np.ndarray:
        if cutoff_hz <= 0:
            return x
        sos = signal.butter(order, cutoff_hz / (sr / 2), btype='high', output='sos')
        # axis=0 (time axis)
        return signal.sosfiltfilt(sos, x, axis=0).astype(DTYPE)
    
    # -------------------------
    # DSRE: Single Sideband (vectorized)
    # -------------------------
    def dsre_enhance(x: np.ndarray, sr: float, m=8, decay=1.25,
                     pre_hp=3000.0, post_hp=16000.0, hp_order=8,
                     mix=1.0) -> np.ndarray:
        """
        Vectorized DSRE: compute analytic signal for all channels (hilbert axis=0)
        and modulate with a bank of offsets; then mix back with original.
        mix: wet/dry mix of enhancement (0..1).
        """
        x2d, was_1d = _ensure_2d(x)
        x_hp = highpass_sos(x2d, pre_hp, sr, order=hp_order)
    
        N = x_hp.shape[0]
        t = np.arange(N, dtype=DTYPE) / float(sr)  # time axis
        analytic = signal.hilbert(x_hp, axis=0)  # shape (N, ch), complex
    
        # prepare summed modulation
        d_res = np.zeros_like(x_hp, dtype=DTYPE)
        for i in range(m):
            shift_hz = (i + 1) * sr / (m * 2.0)  # original intent preserved
            phase = np.exp(2j * np.pi * shift_hz * t)[:, None]
            comp = analytic * phase
            weight = np.exp(-(i + 1) * decay)
            d_res += comp.real * weight
    
        # post highpass
        d_res = highpass_sos(d_res, post_hp, sr, order=hp_order)
    
        # mix safely using RMS matching
        target_rms = rms(x2d, axis=0)
        res_rms = rms(d_res, axis=0)
        gain = (target_rms / (res_rms + 1e-12))[None, :]
        d_res *= gain  # match energy so enhancement isn't too loud
    
        out = (1.0 - mix) * x2d + mix * (x2d + d_res)  # allow configurable mix
        return out if not was_1d else out[:, 0]
    
    # -------------------------
    # FFT-based spectral upsampling (zero-pad in freq domain) using pyfftw
    # -------------------------
    def spectral_upsample_fft(x: np.ndarray, upscale: int = 2, threads: int = 1) -> np.ndarray:
        """
        Upsample by integer factor using frequency-domain zero-padding.
        For real input signals: use rfft/irfft.
        x: (N, channels)
        Returns: (N*upscale, channels)
        """
        x2d, was_1d = _ensure_2d(x)
        N, C = x2d.shape
        newN = N * upscale
    
        # prepare output
        y = np.zeros((newN, C), dtype=DTYPE)
    
        # use rfft/irfft for each channel; build FFTW plans reuse per channel
        for ch in range(C):
            real_in = x2d[:, ch].astype(DTYPE)
            # rfft length
            rlen = N // 2 + 1
            # forward rfft using fftw
            a = pyfftw.empty_aligned(N, dtype='float32')
            fft = pyfftw.builders.rfft(a, threads=threads)

            # copy into aligned array and transform
            a[:] = real_in
            spec = fft()  # shape (rlen,)

            # create zero-padded spectrum for newN; the inverse-transform buffer
            # must have the *new* half-spectrum length, not the original one
            new_rlen = newN // 2 + 1
            b = pyfftw.empty_aligned(new_rlen, dtype='complex64')
            ifft = pyfftw.builders.irfft(b, n=newN, threads=threads)
            spec_padded = np.zeros(new_rlen, dtype=np.complex64)
            # copy low frequencies
            copy_len = min(rlen, new_rlen)
            spec_padded[:copy_len] = spec[:copy_len]
            # compensate for irfft's 1/n normalization so the amplitude is preserved
            spec_padded *= upscale

            b[:] = spec_padded
            y[:, ch] = ifft().astype(DTYPE)
    
        return y if not was_1d else y[:, 0]
    
    # -------------------------
    # AkkoMode jitter (smoothed envelope)
    # -------------------------
    def akkomode_jitter(x: np.ndarray, sr: float, depth_low=0.02, depth_high=0.12,
                        env_cutoff_hz=6.0) -> np.ndarray:
        """
        Generate a smooth random envelope per channel (lowpass filtered noise)
        and apply small amplitude modulation for 'jitter' that is musical and not high-frequency noise.
        depth_low/high in fractional amplitude (0..1).
        """
        x2d, was_1d = _ensure_2d(x)
        N, C = x2d.shape
        # generate per-channel random noise and lowpass filter it to make an envelope
        env = np.zeros((N, C), dtype=DTYPE)
        # lowpass SOS
        sos = signal.butter(4, env_cutoff_hz / (sr / 2), btype='low', output='sos')
        for ch in range(C):
            noise = np.random.randn(N).astype(DTYPE)
            # smooth envelope
            env[:, ch] = signal.sosfiltfilt(sos, noise).astype(DTYPE)
            # normalize envelope to [-1,1]
            env[:, ch] /= (np.max(np.abs(env[:, ch])) + 1e-12)
    
        # map envelope to depth range
        depths = depth_low + (depth_high - depth_low) * ((env + 1.0) / 2.0)  # [depth_low..depth_high]
        out = x2d * (1.0 + depths)  # gentle amplitude modulation
        return out if not was_1d else out[:, 0]
    
    # -------------------------
    # CopyBand: correct STFT band shift (no wrap by default)
    # -------------------------
    def copyband_stft(x: np.ndarray, sr: float,
                      band_hpfc=6000.0, band_sft=16000.0,
                      band_gain=1.2, n_fft: Optional[int] = 2048,
                      hop_length: Optional[int] = None,
                      zero_fill: bool = True) -> np.ndarray:
        """
        Shift a band upward: extract frequency band [band_hpfc, band_sft),
        shift it upward by (band_sft - band_hpfc) and add gain.
        zero_fill=True -> zero-fill shifted-in bins (no wraparound).
        """
        x2d, was_1d = _ensure_2d(x)
        if hop_length is None:
            hop_length = n_fft // 4
    
        N, C = x2d.shape
        out = np.zeros_like(x2d, dtype=DTYPE)
        for ch in range(C):
            channel = x2d[:, ch]
            S = librosa.stft(channel, n_fft=n_fft, hop_length=hop_length)
            # S shape: (n_fft//2+1, n_frames)
            freqs = np.linspace(0, sr/2, S.shape[0])
            # compute bin indices
            low_bin = int(np.searchsorted(freqs, band_hpfc, side='left'))
            high_bin = int(np.searchsorted(freqs, band_sft, side='left'))
            shift_bins = high_bin - low_bin  # shift upward
    
            # create copy
            S2 = S.copy()
            if shift_bins == 0:
                S2[low_bin:high_bin, :] *= band_gain
            else:
                # zero out original band or attenuate
                S2[low_bin:high_bin, :] *= 0.0
                target_start = low_bin + shift_bins
                if target_start < S.shape[0]:
                    # fill target band with shifted content (zero-fill if needed)
                    end = min(target_start + (high_bin - low_bin), S.shape[0])
                    insert_len = end - target_start
                    S2[target_start:end, :] += S[low_bin:low_bin+insert_len, :] * band_gain
                    # if not zero_fill, allow wrap-around (rarely desired)
                    if not zero_fill and insert_len < (high_bin - low_bin):
                        wrap_len = (high_bin - low_bin) - insert_len
                        S2[:wrap_len, :] += S[low_bin+insert_len:high_bin, :] * band_gain
                else:
                    # shifted beyond Nyquist -> discard or optionally wrap
                    pass
            ych = librosa.istft(S2, hop_length=hop_length, length=len(channel))
            out[:, ch] = ych.astype(DTYPE)
    
        return out if not was_1d else out[:, 0]
    
    # -------------------------
    # Resampling helper (safe axis)
    # -------------------------
    def resample_audio(x: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
        x2d, was_1d = _ensure_2d(x)
        # resampy resamples along axis=-1 by default; process each channel
        # separately (explicit, memory-friendly), let resampy decide the exact
        # output length, then stack the channels back into (n_samples, n_channels).
        channels = [resampy.resample(x2d[:, ch], orig_sr, target_sr).astype(DTYPE)
                    for ch in range(x2d.shape[1])]
        out = np.column_stack(channels)
        return out[:, 0] if was_1d else out
    
    # -------------------------
    # Pipeline
    # -------------------------
    def enhance_audio(x: np.ndarray, sr: int,
                      dsre_params: Optional[Dict] = None,
                      fft_params: Optional[Dict] = None,
                      akko_params: Optional[Dict] = None,
                      copyband_params: Optional[Dict] = None,
                      target_sr: Optional[int] = None,
                      upscale: int = 1) -> Tuple[np.ndarray, int]:
        """Full chain: optional resample -> DSRE -> AkkoMode jitter -> CopyBand
        shift -> optional spectral upsampling -> peak normalization.
        Returns (audio, sample_rate)."""
        out = x
        if target_sr is not None and target_sr != sr:
            out = resample_audio(out, sr, target_sr)
            sr = target_sr
        out = dsre_enhance(out, sr, **(dsre_params or {}))
        out = akkomode_jitter(out, sr, **(akko_params or {}))
        out = copyband_stft(out, sr, **(copyband_params or {}))
        if upscale > 1:
            out = spectral_upsample_fft(out, upscale=upscale, **(fft_params or {}))
            sr = int(sr * upscale)
        return normalize_peak(out), sr
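
    A minimal usage sketch for the module above (file names and parameter values are placeholders, just to show the call shape):
    Code:
    import soundfile as sf
    from enhanced_pipeline import enhance_audio  # the module above

    audio, sr = sf.read("input.wav", always_2d=True, dtype="float32")
    enhanced, out_sr = enhance_audio(
        audio, sr,
        dsre_params={"m": 8, "decay": 1.25, "mix": 0.8},
        copyband_params={"band_hpfc": 6000.0, "band_sft": 16000.0, "band_gain": 1.2},
        target_sr=48000,
    )
    sf.write("input_enhanced.wav", enhanced, out_sr, subtype="FLOAT")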
     
    Last edited: Sep 20, 2025 at 11:23 PM
  18. forart.it

    forart.it Kapellmeister

    Joined:
    May 5, 2023
    Messages:
    137
    Likes Received:
    68
    Here's a Claude-5 optimized (both for output quality and performance) command-line Python parametric audio enhancer that integrates them all:
    Code:
    #!/usr/bin/env python3
    """
    audio_enhancer.py (Optimized Version)
    
    Command-line audio enhancement tool with performance and quality optimizations.
    
    Usage:
        python audio_enhancer.py input.wav [--mix 100] [--threads auto]
    
    Produces: input_enhanced.wav in the same folder (32-bit float WAV).
    
    Key Optimizations:
    - Full CPU multithreading support with optimal thread allocation
    - Vectorized operations and memory-efficient processing
    - Quality improvements: better filters, window functions, overlap-add
    - Smart caching and pre-allocation to minimize memory allocations
    - Optional GPU acceleration support
    - Advanced spectral processing with better phase preservation
    
    Dependencies:
        numpy, scipy, soundfile, librosa, resampy, pyfftw (recommended), numba (optional)
    """
    
    import argparse
    import os
    import sys
    import warnings
    from dataclasses import dataclass
    from typing import Tuple, Optional
    from concurrent.futures import ThreadPoolExecutor
    import multiprocessing as mp
    
    import numpy as np
    import soundfile as sf
    import librosa
    import resampy
    import scipy.signal as signal
    from scipy.ndimage import uniform_filter1d
    
    # Optional: use pyfftw for faster FFTs if available
    try:
        import pyfftw
        pyfftw.config.NUM_THREADS = mp.cpu_count()
        pyfftw.config.PLANNER_EFFORT = 'FFTW_MEASURE'
        _HAS_PYFFTW = True
    except ImportError:
        pyfftw = None
        _HAS_PYFFTW = False
    
    # Optional: use numba for JIT compilation
    try:
        from numba import jit, prange
        _HAS_NUMBA = True
    except ImportError:
        # Create dummy decorator if numba is not available
        def jit(*args, **kwargs):
            def decorator(func):
                return func
            return decorator
        prange = range
        _HAS_NUMBA = False
    
    # Global configuration
    DTYPE = np.float32
    DEFAULT_CHUNK_SIZE = 32768  # Process in chunks for memory efficiency
    _fft_cache = {}  # Cache for FFT planning
    
    @dataclass
    class AutoParams:
        target_sr: int
        dsre_m: int
        dsre_decay: float
        dsre_pre_hp: float
        dsre_post_hp: float
        dsre_mix: float
        akko_depth_low: float
        akko_depth_high: float
        akko_env_hz: float
        copyband_hpfc: float
        copyband_sft: float
        copyband_gain: float
        upsample_factor: int
        n_threads: int
    
    
    def get_optimal_threads() -> int:
        """Determine optimal number of threads for processing."""
        cpu_count = mp.cpu_count()
        # Leave one core free for system tasks, but use at least 1
        return max(1, cpu_count - 1)
    
    
    def _ensure_2d(x: np.ndarray) -> Tuple[np.ndarray, bool]:
        """Ensure input is 2D with optimized memory layout."""
        x = np.asarray(x, dtype=DTYPE, order='C')  # Ensure C-contiguous for better performance
        if x.ndim == 1:
            return x.reshape(-1, 1), True
        elif x.ndim == 2:
            return x, False
        else:
            raise ValueError("Input must be 1D or 2D (n_samples, n_channels)")
    
    
    @jit(nopython=True if _HAS_NUMBA else False, parallel=True)
    def normalize_peak_numba(x: np.ndarray, eps: float = 1e-12) -> np.ndarray:
        """Optimized peak normalization with numba."""
        if x.ndim == 1:
            peak = np.max(np.abs(x))
            return x / (peak + eps)
        else:
            for ch in prange(x.shape[1]):
                peak = np.max(np.abs(x[:, ch]))
                x[:, ch] = x[:, ch] / (peak + eps)
            return x
    
    
    def normalize_peak(x: np.ndarray, eps: float = 1e-12) -> np.ndarray:
        """Peak normalization with fallback."""
        if _HAS_NUMBA:
            return normalize_peak_numba(x.copy(), eps)
        else:
            if x.ndim == 1:
                peak = np.max(np.abs(x))
                return x / (peak + eps)
            else:
                peak = np.max(np.abs(x), axis=0, keepdims=True)
                return x / (peak + eps)
    
    
    def get_sos_filter(cutoff_hz: float, sr: int, filter_type: str, order: int = 8) -> Optional[np.ndarray]:
        """Get SOS filter coefficients with caching."""
        cache_key = (cutoff_hz, sr, filter_type, order)
        if cache_key not in _fft_cache:
            if cutoff_hz <= 0 or cutoff_hz >= sr/2 - 1:
                return None
            try:
                sos = signal.butter(order, cutoff_hz / (sr / 2), btype=filter_type, output='sos')
                _fft_cache[cache_key] = sos
            except Exception:
                return None
        return _fft_cache[cache_key]
    
    
    def apply_sos_filter(x: np.ndarray, sos: np.ndarray, n_threads: int = 1) -> np.ndarray:
        """Apply SOS filter with optional threading for multi-channel audio."""
        if x.ndim == 1:
            return signal.sosfiltfilt(sos, x)
      
        if n_threads == 1 or x.shape[1] == 1:
            return signal.sosfiltfilt(sos, x, axis=0)
      
        # Multi-threaded processing for multi-channel audio
        def filter_channel(ch_data):
            return signal.sosfiltfilt(sos, ch_data)
      
        with ThreadPoolExecutor(max_workers=min(n_threads, x.shape[1])) as executor:
            results = list(executor.map(filter_channel, x.T))
      
        return np.column_stack(results)
    
    
    def highpass_sos(x: np.ndarray, cutoff_hz: float, sr: int, order: int = 8, n_threads: int = 1) -> np.ndarray:
        """High-pass filter with threading support."""
        sos = get_sos_filter(cutoff_hz, sr, 'high', order)
        if sos is None:
            return x
        return apply_sos_filter(x, sos, n_threads)
    
    
    def lowpass_sos(x: np.ndarray, cutoff_hz: float, sr: int, order: int = 6, n_threads: int = 1) -> np.ndarray:
        """Low-pass filter with threading support."""
        sos = get_sos_filter(cutoff_hz, sr, 'low', order)
        if sos is None:
            return x
        return apply_sos_filter(x, sos, n_threads)
    
    
    def get_fft_planner(n: int, dtype: np.dtype = np.complex64, threads: int = 1):
        """Get cached FFT planner for better performance."""
        cache_key = (n, dtype, threads, 'rfft')
        if cache_key not in _fft_cache and _HAS_PYFFTW:
            try:
                a = pyfftw.empty_aligned(n, dtype='float32')
                b = pyfftw.empty_aligned(n//2+1, dtype='complex64')
                fft_forward = pyfftw.builders.rfft(a, threads=threads)
                fft_backward = pyfftw.builders.irfft(b, n=n, threads=threads)
                _fft_cache[cache_key] = (fft_forward, fft_backward, a, b)
            except Exception:
                _fft_cache[cache_key] = None
        return _fft_cache.get(cache_key)
    
    
    @jit(nopython=True if _HAS_NUMBA else False, parallel=True)
    def dsre_core_computation(analytic: np.ndarray, t: np.ndarray, m: int, sr: int, decay: float) -> np.ndarray:
        """Core DSRE computation optimized with numba."""
        N, C = analytic.shape
        d_res = np.zeros_like(analytic, dtype=np.complex64)
      
        for i in prange(m):
            shift_hz = (i + 1) * sr / (m * 2.0)
            weight = np.exp(-(i + 1) * decay)
          
            for ch in prange(C):
                phase = np.exp(2j * np.pi * shift_hz * t)
                d_res[:, ch] += analytic[:, ch] * phase * weight
      
        return d_res.real
    
    
    def dsre_enhance(x: np.ndarray, sr: int, m: int = 6, decay: float = 1.1,
                    pre_hp: float = 300.0, post_hp: float = 10000.0, mix: float = 0.6,
                    hp_order: int = 6, n_threads: int = 1) -> np.ndarray:
        """Enhanced DSRE with optimizations."""
        x2d, was_1d = _ensure_2d(x)
      
        # Pre-filter
        x_hp = highpass_sos(x2d, pre_hp, sr, order=hp_order, n_threads=n_threads)
      
        N = x_hp.shape[0]
        t = np.arange(N, dtype=DTYPE) / float(sr)
      
        # Compute analytic signal with threading for multi-channel
        if x_hp.shape[1] == 1 or n_threads == 1:
            analytic = signal.hilbert(x_hp, axis=0)
        else:
            def compute_hilbert(ch_data):
                return signal.hilbert(ch_data)
          
            with ThreadPoolExecutor(max_workers=min(n_threads, x_hp.shape[1])) as executor:
                hilbert_results = list(executor.map(compute_hilbert, x_hp.T))
            analytic = np.column_stack(hilbert_results)
      
        # Core DSRE computation
        if _HAS_NUMBA:
            d_res = dsre_core_computation(analytic, t, m, sr, decay)
        else:
            d_res = np.zeros_like(x_hp)
            for i in range(m):
                shift_hz = (i + 1) * sr / (m * 2.0)
                phase = np.exp(2j * np.pi * shift_hz * t)[:, None]
                weight = np.exp(-(i + 1) * decay)
                d_res += (analytic * phase).real * weight
      
        # Post-filter
        d_res = highpass_sos(d_res, post_hp, sr, order=hp_order, n_threads=n_threads)
      
        # Energy matching with improved RMS calculation
        def rms_stable(a):
            return np.sqrt(np.mean(a**2, axis=0) + 1e-12)
      
        target_rms = rms_stable(x2d)
        res_rms = rms_stable(d_res)
        gain = np.where(res_rms > 1e-12, target_rms / res_rms, 1.0)[None, :]
        d_res *= gain
      
        # Mix with improved blending
        out = (1.0 - mix) * x2d + mix * (x2d + d_res)
        return out if not was_1d else out[:, 0]
    
    
    def akkomode_jitter(x: np.ndarray, sr: int, depth_low: float = 0.02, depth_high: float = 0.12,
                       env_cutoff_hz: float = 6.0, n_threads: int = 1) -> np.ndarray:
        """Optimized akko-mode jitter with better envelope smoothing."""
        x2d, was_1d = _ensure_2d(x)
        N, C = x2d.shape
      
        # Pre-compute filter once
        sos = get_sos_filter(env_cutoff_hz, sr, 'low', 4)  # get_sos_filter already normalizes the cutoff by sr/2
        if sos is None:
            return x
      
        # Generate smooth envelopes for each channel
        def generate_envelope():
            # Use better random seed for reproducible but varied results
            np.random.seed(None)  # Reset to time-based seed
            noise = np.random.randn(N).astype(DTYPE)
            # Apply smoothing filter
            smoothed = signal.sosfiltfilt(sos, noise)
            # Normalize to [-1, 1] range more stably
            peak = np.max(np.abs(smoothed))
            if peak > 1e-12:
                smoothed /= peak
            return smoothed
      
        if n_threads == 1 or C == 1:
            envelopes = np.column_stack([generate_envelope() for _ in range(C)])
        else:
            with ThreadPoolExecutor(max_workers=min(n_threads, C)) as executor:
                envelopes = np.column_stack(list(executor.map(lambda _: generate_envelope(), range(C))))
      
        # Apply depth modulation
        depths = depth_low + (depth_high - depth_low) * ((envelopes + 1.0) / 2.0)
        out = x2d * (1.0 + depths)
      
        return out if not was_1d else out[:, 0]
    
    
    def copyband_stft_optimized(x: np.ndarray, sr: int, band_hpfc: float = 6000.0,
                              band_sft: float = 16000.0, band_gain: float = 1.2,
                              n_fft: int = 2048, hop_length: Optional[int] = None,
                              zero_fill: bool = True, n_threads: int = 1) -> np.ndarray:
        """Optimized copyband STFT with better windowing and phase preservation."""
        x2d, was_1d = _ensure_2d(x)
      
        if hop_length is None:
            hop_length = n_fft // 4  # 75% overlap for better quality
      
        N, C = x2d.shape
      
        # Use Hann window for better spectral properties
        window = 'hann'
      
        def process_channel(ch_idx):
            channel = x2d[:, ch_idx]
          
            # Use librosa with optimized parameters
            S = librosa.stft(channel, n_fft=n_fft, hop_length=hop_length, window=window)
            freqs = np.linspace(0, sr/2, S.shape[0])
          
            # More precise frequency bin calculation
            low_bin = np.searchsorted(freqs, band_hpfc, side='left')
            high_bin = np.searchsorted(freqs, band_sft, side='left')
            shift_bins = high_bin - low_bin
          
            S2 = S.copy()
          
            if shift_bins > 0:
                # Clear original band
                S2[low_bin:high_bin, :] = 0
              
                # Copy to new location with gain
                target_start = low_bin + shift_bins
                if target_start < S.shape[0]:
                    end = min(target_start + shift_bins, S.shape[0])
                    insert_len = end - target_start
                    S2[target_start:end, :] = S[low_bin:low_bin+insert_len, :] * band_gain
                  
                    # Handle wrapping if requested and needed
                    if not zero_fill and insert_len < shift_bins:
                        wrap_len = shift_bins - insert_len
                        wrap_end = min(wrap_len, S.shape[0])
                        S2[:wrap_end, :] = S[low_bin+insert_len:low_bin+insert_len+wrap_end, :] * band_gain
            else:
                # Just apply gain if no shifting
                S2[low_bin:high_bin, :] *= band_gain
          
            # Reconstruct with same parameters
            return librosa.istft(S2, hop_length=hop_length, window=window, length=len(channel))
      
        # Process channels
        if n_threads == 1 or C == 1:
            results = [process_channel(ch) for ch in range(C)]
        else:
            with ThreadPoolExecutor(max_workers=min(n_threads, C)) as executor:
                results = list(executor.map(process_channel, range(C)))
      
        out = np.column_stack(results).astype(DTYPE)
        return out if not was_1d else out[:, 0]
    
    
    def spectral_upsample_fft_optimized(x: np.ndarray, upscale: int = 2, n_threads: int = 1) -> np.ndarray:
        """Optimized spectral upsampling with caching and threading."""
        x2d, was_1d = _ensure_2d(x)
        N, C = x2d.shape
        newN = N * upscale
      
        def upsample_channel(ch_idx):
            real_in = x2d[:, ch_idx]
          
            # Try to use cached FFT planner
            planner_result = get_fft_planner(N, threads=1) if _HAS_PYFFTW else None
          
            if planner_result and _HAS_PYFFTW:
                fft_forward, fft_backward, a, b = planner_result
              
                # Check if planner dimensions match
                if a.shape[0] == N:
                    a[:] = real_in
                    spec = fft_forward()
                else:
                    spec = np.fft.rfft(real_in)
            else:
                spec = np.fft.rfft(real_in)
          
            # Zero-pad in frequency domain
            new_spec = np.zeros(newN//2+1, dtype=spec.dtype)
            copy_len = min(spec.shape[0], new_spec.shape[0])
            new_spec[:copy_len] = spec[:copy_len]
          
            # IFFT back to time domain
            if planner_result and _HAS_PYFFTW and newN == b.shape[0]:
                b[:] = new_spec
                return fft_backward().astype(DTYPE)
            else:
                return np.fft.irfft(new_spec, n=newN).astype(DTYPE)
      
        # Process channels
        if n_threads == 1 or C == 1:
            results = [upsample_channel(ch) for ch in range(C)]
        else:
            with ThreadPoolExecutor(max_workers=min(n_threads, C)) as executor:
                results = list(executor.map(upsample_channel, range(C)))
      
        y = np.column_stack(results)
        return y if not was_1d else y[:, 0]
    
    
    def analyze_and_select_params(x: np.ndarray, sr: int, n_threads: Optional[int] = None) -> AutoParams:
        """Enhanced parameter selection with better analysis."""
        if n_threads is None:
            n_threads = get_optimal_threads()
      
        x2d, _ = _ensure_2d(x)
        mono = np.mean(x2d, axis=1)
      
        # More robust spectral analysis
        if len(mono) < 1024:
            centroid = 2000.0
            spectral_rolloff = 8000.0
        else:
            try:
                # Compute multiple spectral features
                cent = librosa.feature.spectral_centroid(y=mono.astype(float), sr=sr)
                rolloff = librosa.feature.spectral_rolloff(y=mono.astype(float), sr=sr, roll_percent=0.85)
              
                centroid = float(np.median(cent)) if cent.size else 2000.0
                spectral_rolloff = float(np.median(rolloff)) if rolloff.size else 8000.0
            except Exception:
                centroid = 2000.0
                spectral_rolloff = 8000.0
      
        # Better parameter scaling based on audio content
        pre_hp = max(200.0, min(500.0, centroid * 0.25))
        post_hp = min(max(6000.0, spectral_rolloff * 0.8), sr/2 * 0.95)
      
        # Enhanced DSRE parameters
        if sr >= 96000:
            m, decay, mix = 10, 1.3, 0.5
        elif sr >= 48000:
            m, decay, mix = 8, 1.2, 0.55
        elif sr >= 44100:
            m, decay, mix = 6, 1.1, 0.6
        else:
            m, decay, mix = 4, 1.0, 0.65
      
        # Dynamic range aware jitter
        rms_val = np.sqrt(np.mean(mono.astype(float)**2) + 1e-12)
        dynamic_range = np.max(np.abs(mono)) / (rms_val + 1e-12)
      
        if dynamic_range > 10:  # High dynamic range
            depth_low, depth_high = 0.005, 0.02
        elif dynamic_range > 5:  # Medium dynamic range
            depth_low, depth_high = 0.01, 0.04
        else:  # Compressed audio
            depth_low, depth_high = 0.02, 0.06
      
        # Smarter copyband parameters
        copyband_hpfc = max(4000.0, min(8000.0, centroid * 0.9))
        copyband_sft = min(copyband_hpfc * 2.2, sr/2 * 0.9)
      
        # Intelligent target sample rate selection
        if sr < 44100:
            target_sr = 48000
        elif sr == 44100:
            target_sr = 48000  # Slight upsample for better processing
        else:
            target_sr = sr
      
        return AutoParams(
            target_sr=int(target_sr),
            dsre_m=m,
            dsre_decay=decay,
            dsre_pre_hp=float(pre_hp),
            dsre_post_hp=float(post_hp),
            dsre_mix=float(mix),
            akko_depth_low=float(depth_low),
            akko_depth_high=float(depth_high),
            akko_env_hz=4.0,  # Slightly higher for better envelope
            copyband_hpfc=float(copyband_hpfc),
            copyband_sft=float(copyband_sft),
            copyband_gain=1.15,  # Slightly more conservative
            upsample_factor=1,
            n_threads=n_threads,
        )
    
    
    def enhance_pipeline(x: np.ndarray, sr: int, params: AutoParams,
                        do_copyband: bool = True, do_akko: bool = True,
                        do_dsre: bool = True, do_upsample: bool = False) -> Tuple[np.ndarray, int]:
        """Main enhancement pipeline with optimizations."""
        print(f"Processing with {params.n_threads} threads...")
      
        # Resample if necessary
        if sr != params.target_sr:
            print(f"Resampling {sr} -> {params.target_sr}")
            # Use high-quality resampling
            # x is (n_samples, n_channels): resample along the time axis (axis=0)
            x = resampy.resample(x, sr, params.target_sr, axis=0, filter='kaiser_best')
            sr = params.target_sr
      
        # Apply enhancements in optimal order
        if do_dsre:
            print("Applying DSRE enhancement...")
            x = dsre_enhance(x, sr, m=params.dsre_m, decay=params.dsre_decay,
                            pre_hp=params.dsre_pre_hp, post_hp=params.dsre_post_hp,
                            mix=params.dsre_mix, n_threads=params.n_threads)
      
        if do_akko:
            print("Applying Akko-mode jitter...")
            x = akkomode_jitter(x, sr, depth_low=params.akko_depth_low,
                               depth_high=params.akko_depth_high,
                               env_cutoff_hz=params.akko_env_hz, n_threads=params.n_threads)
      
        if do_copyband:
            print("Applying copyband STFT enhancement...")
            x = copyband_stft_optimized(x, sr, band_hpfc=params.copyband_hpfc,
                                      band_sft=params.copyband_sft,
                                      band_gain=params.copyband_gain, n_threads=params.n_threads)
      
        if do_upsample and params.upsample_factor > 1:
            print(f"Spectral upsampling by factor {params.upsample_factor}...")
            x = spectral_upsample_fft_optimized(x, upscale=params.upsample_factor, n_threads=params.n_threads)
            sr = int(sr * params.upsample_factor)
      
        # Final normalization
        x = normalize_peak(x)
        return x, sr
    
    
    def build_output_path(inp_path: str) -> str:
        """Build output path with _enhanced suffix."""
        base, ext = os.path.splitext(inp_path)
        return f"{base}_enhanced{ext}"
    
    
    def main():
        parser = argparse.ArgumentParser(
            description='Enhanced audio processing tool with performance optimizations',
            formatter_class=argparse.ArgumentDefaultsHelpFormatter
        )
        parser.add_argument('input', help='Input audio file')
        parser.add_argument('--no-copyband', action='store_true', help='Disable copyband STFT enhancement')
        parser.add_argument('--no-akko', action='store_true', help='Disable akkomode jitter')
        parser.add_argument('--no-dsre', action='store_true', help='Disable DSRE enhancement')
        parser.add_argument('--upsample', type=int, default=0, help='Spectral upsample factor (0=disabled)')
        parser.add_argument('--out', default=None, help='Output path (auto-generated if not specified)')
        parser.add_argument('--mix', type=float, default=100.0, help='Wet percentage (0-100)')
        parser.add_argument('--threads', default='auto', help='Number of threads (auto, or integer)')
      
        args = parser.parse_args()
      
        # Validate input file
        if not os.path.exists(args.input):
            print(f"Error: Input file not found: {args.input}")
            sys.exit(1)
      
        # Determine thread count
        if args.threads == 'auto':
            n_threads = get_optimal_threads()
        else:
            try:
                n_threads = max(1, int(args.threads))
            except ValueError:
                print("Warning: Invalid thread count, using auto detection")
                n_threads = get_optimal_threads()
      
        # Validate and clamp mix parameter
        mix_pct = np.clip(float(args.mix), 0.0, 100.0)
        wet = mix_pct / 100.0
        dry = 1.0 - wet
      
        # Load audio file
        print(f"Loading {args.input}...")
        try:
            data, sr = sf.read(args.input, always_2d=True, dtype='float32')
            print(f"Loaded: {data.shape[0]} samples, {data.shape[1]} channels, {sr} Hz")
        except Exception as e:
            print(f"Error loading audio file: {e}")
            sys.exit(1)
      
        # Analyze and select parameters
        print("Analyzing audio characteristics...")
        params = analyze_and_select_params(data, sr, n_threads)
      
        # Override upsample factor if specified
        if args.upsample >= 2:
            params.upsample_factor = int(args.upsample)
      
        print("\nSelected parameters:")
        for field_name, value in params.__dict__.items():
            print(f"  {field_name}: {value}")
        print(f"Mix ratio: {dry*100:.1f}% dry / {wet*100:.1f}% wet")
      
        # Prepare original for mixing
        working = data
        working_sr = sr
        if sr != params.target_sr:
            print(f"Pre-resampling original for mixing...")
            working = resampy.resample(data, sr, params.target_sr, axis=0, filter='kaiser_best')
            working_sr = params.target_sr
      
        # Run enhancement pipeline
        print("\n" + "="*50)
        print("Running enhancement pipeline...")
        print("="*50)
      
        enhanced, out_sr = enhance_pipeline(
            working, working_sr, params,
            do_copyband=not args.no_copyband,
            do_akko=not args.no_akko,
            do_dsre=not args.no_dsre,
            do_upsample=(params.upsample_factor > 1)
        )
      
        # Handle upsampling for mixing
        if out_sr != working_sr:
            print(f"Upsampling original {working_sr} -> {out_sr} for mixing...")
            working_up = resampy.resample(working, working_sr, out_sr, axis=0, filter='kaiser_best')
        else:
            working_up = working
      
        # Ensure compatible shapes for mixing
        min_len = min(working_up.shape[0], enhanced.shape[0])
        if working_up.shape[0] != enhanced.shape[0]:
            print(f"Length mismatch: trimming to {min_len} samples for mixing")
            working_up = working_up[:min_len]
            enhanced = enhanced[:min_len]
      
        # Mix dry and wet signals
        print("Mixing dry/wet signals...")
        final = (dry * working_up) + (wet * enhanced)
      
        # Safe clipping prevention
        peak = np.max(np.abs(final))
        if peak > 0.999:
            print(f"Peak level: {peak:.3f} - applying safety limiter")
            final = final / peak * 0.995
      
        # Prepare output
        out_path = args.out if args.out else build_output_path(args.input)
      
        # Handle mono output correctly
        if final.ndim == 2 and final.shape[1] == 1:
            final_out = final[:, 0]
        else:
            final_out = final
      
        # Write output file
        print(f"Writing enhanced audio to: {out_path}")
        print(f"Output format: {out_sr} Hz, 32-bit float")
      
        try:
            sf.write(out_path, final_out.astype('float32'), out_sr, subtype='FLOAT')
            print("✓ Enhancement complete!")
          
            # Performance summary
            print(f"\nPerformance summary:")
            print(f"  Threads used: {params.n_threads}")
            print(f"  FFTw acceleration: {'✓' if _HAS_PYFFTW else '✗'}")
            print(f"  Numba JIT: {'✓' if _HAS_NUMBA else '✗'}")
          
        except Exception as e:
            print(f"Error writing output file: {e}")
            sys.exit(1)
    
    
    if __name__ == '__main__':
        # Set up multiprocessing for Windows compatibility
        if sys.platform == 'win32':
            mp.set_start_method('spawn', force=True)
      
        # Configure numpy for better threading
        try:
            import mkl
            mkl.set_num_threads(get_optimal_threads())
        except ImportError:
            pass
      
        # Suppress common warnings for cleaner output
        warnings.filterwarnings('ignore', category=UserWarning, module='librosa')
        warnings.filterwarnings('ignore', category=RuntimeWarning, module='numpy')
      
        main()
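
    For reference, with the flags defined above a typical invocation is "python audio_enhancer.py song.wav --mix 80 --threads auto --no-akko" (the file name is just an example); the result is written next to the input as song_enhanced.wav, 32-bit float, at the automatically selected target sample rate.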
    MP3:
    [​IMG]

    Enhanced (100% mix):
    [​IMG]

    Null:
    [​IMG]
     
  19. forart.it

    forart.it Kapellmeister

    Joined:
    May 5, 2023
    Messages:
    137
    Likes Received:
    68
    ...here's the result from another (recently patched) interesting open-source competitor on the scene:

    [​IMG]

    Check it out: HRAudioWizard
     