]> OzVa Git service - audio-over-stft/commitdiff
Added .venv
authorwill <greenwoodw50@gmail.com>
Sat, 7 Sep 2024 13:53:14 +0000 (14:53 +0100)
committerwill <greenwoodw50@gmail.com>
Mon, 9 Sep 2024 19:35:56 +0000 (20:35 +0100)
- removed unused loop.py dependencies
- removed unused files
+ modified .gitignore to not include the calibration image
+ split loop.py into multiple functions
+ camera.py debugging tools
+ fixed persistent issue with scaling in fft.py
+ KeyboardInterrupt exception handling in multithreaded code
+ added main loop function to loop.py
+ added silence error correction in loop.py

.gitignore
camera.py [changed mode: 0644->0755]
fft.py
loop.py [changed mode: 0644->0755]
mum.wav [deleted file]

index 0d86721f82d79d0de1c698c79be683535a681255..7d6f707271f79d0a51844d6e2ac5664a937750bd 100644 (file)
@@ -1,4 +1,6 @@
 *.npy
 *.jpg
+!calibration.jpg
 *.wav
 __pycache__/
+.venv/
old mode 100644 (file)
new mode 100755 (executable)
index 4dc634f..d1684d0
--- a/camera.py
+++ b/camera.py
@@ -1,10 +1,10 @@
+#!./.venv/bin/python
+
 import cv2 as cv
 import numpy as np
 import queue
-import time
 import threading
-import random
-import matplotlib.pyplot as plt
+import cProfile
 
 class VideoCapture:
        def __init__(self, device_id, backend):
@@ -45,7 +45,7 @@ class camera():
                window_height: int,
                display_size: tuple,
                device_id: int = 0,
-               debug: bool = True,
+               debug: bool = False,
                dummy: bool = False
        ):
 
@@ -69,8 +69,8 @@ class camera():
                self.last_recovered = None
 
                cv.namedWindow("display", cv.WINDOW_NORMAL + cv.WINDOW_GUI_NORMAL)
-               if self.show_debug == True:
-                       cv.namedWindow("debug", cv.WINDOW_NORMAL)
+               if debug:
+                       cv.namedWindow("debug", cv.WINDOW_NORMAL + cv.WINDOW_GUI_NORMAL)
 
        def calibrate(
                self
@@ -114,14 +114,18 @@ class camera():
 
        def display(
                self,
-               image: np.ndarray
+               image: np.ndarray,
+               debug: bool = False
        ) -> None:
 
                self.last_display = image
+               image = cv.cvtColor(image, cv.COLOR_HSV2BGR)
                image = cv.resize(image, self.display_size, interpolation=cv.INTER_NEAREST_EXACT)
-               image[...,0] = np.round(image[...,0] * 0.9)
 
-               cv.imshow("display", image)
+               if debug:
+                       cv.imshow("debug", image)
+               else:
+                       cv.imshow("display", image)
                cv.waitKey(1)
 
        def capture(
@@ -139,6 +143,8 @@ class camera():
                                image = cv.warpPerspective(image, self.homography, self.display_size)
                                image = cv.resize(image, (self.window_size, self.window_height))
 
+                               image = cv.cvtColor(image, cv.COLOR_BGR2HSV)
+
                self.last_recovered = image
 
                if self.show_debug == True:
@@ -164,3 +170,17 @@ class camera():
                        cv.imshow("debug", debug_image)
                        cv.waitKey(1)
 
+if __name__ == "__main__":
+       camera = camera(
+               160,
+               90,
+               (1920, 1080),
+               device_id = 0,
+               debug = False,
+               dummy = False
+       )
+       #camera.calibrate()
+
+       cProfile.run("camera.display(np.zeros((90, 160, 3), dtype=np.uint8))")
+       cProfile.run("camera.capture()")
+
diff --git a/fft.py b/fft.py
index c45456b4c6ca4e9e8f12eb0605a506d08d99b913..fc14e258a6c7b0b542a62103bebaa72db73b71d2 100644 (file)
--- a/fft.py
+++ b/fft.py
@@ -1,6 +1,7 @@
+#!./.venv/bin/python
+
 import numpy as np
 import cv2 as cv
-import matplotlib.pyplot as plt
 
 """
 Notes:
@@ -19,20 +20,21 @@ class fft():
        def __init__(
                self,
                window_size: int,
-               hop_size: int
        ):
                # calculate the window and hop size, use to calulate the cosine window
                self.window_size = window_size
-               self.hop_size = hop_size
+               self.hop_size = window_size // 2
                self.window = np.hanning(window_size)
 
+               self.placeholder = np.full((window_size,), fill_value=60)
 
                # set the max and min numerical values for amplitude and angle to allow for easier combinations of them both
                self.amplitude_max = 254
                self.amplitude_min = 0
+
                self.angle_max = 254
                self.angle_min = 0
-               # set the upper and lower limits (in dB) that are to be displayed on the screen
+
                self.volume_max = 100
                self.volume_min = -40
 
@@ -42,11 +44,12 @@ class fft():
                self.volume_relative = self.volume_max - self.volume_min
 
                # generate lookup table for the converstion from decibels to power
-               a = self.volume_min
-               b = self.volume_max / self.amplitude_relative
+               self.a = a = self.volume_min
+               self.b = b = self.volume_relative / self.amplitude_relative
+               self.c = c = self.amplitude_min
 
-               # this is the parameterized inverted function of y = (20 * log10(x) - 40) * (255/140)
-               log_lookup = [10 ** (((x * b) + a) / 20) for x in range(0, 256)]
+               # this is the parameterized inverted function of (e.g.) y = (20 * log10(x) - 40) * (255/140)
+               log_lookup = [10 ** ((((x - c) * b) + a) / 20) for x in range(0, 256)]
                self.log_lookup = np.array(log_lookup)
 
        def stft(
@@ -54,61 +57,58 @@ class fft():
                data: np.ndarray
        ) -> np.ndarray:
 
-               # apply window and perform the fft
-               segment = data * self.window
-               spectrum = np.fft.fft(segment) / self.window_size
-
-               # convert the vector length to decimals and confine
-               amplitude = np.abs(spectrum)
-
-               amplitude = 20*np.log10(amplitude)
-               amplitude = np.clip(amplitude, self.volume_min, self.volume_max)
+               try:
+                       # apply window and perform the fft
+                       segment = data * self.window
+                       spectrum = np.fft.fft(segment) / self.window_size
 
-               # confine the amplitude within the limits specified
-               a = self.volume_min
-               b = self.amplitude_relative / self.volume_relative # possibly change the vol_max to vol_rel ?? see [2]
-               c = self.amplitude_min
-               amplitude = ((amplitude - a) * b) + c
+                       # convert the vector length to decimals and confine
+                       amplitude = np.abs(spectrum)
+                       amplitude = 20*np.log10(amplitude)
+                       amplitude = np.clip(amplitude, self.volume_min, self.volume_max)
+                       amplitude = ((amplitude - self.a) / self.b) + self.c
 
-               # convert x and y to the angle and confine
-               angle = np.angle(spectrum)
+                       # convert x and y to the angle and confine
+                       angle = np.angle(spectrum)
+                       angle = ((angle + np.pi) * (self.angle_relative / (2 * np.pi))) + self.angle_min
+                       angle = np.clip(angle, self.angle_min, self.angle_max)
 
-               # confine the angle within the limits specified
-               angle = ((angle + np.pi) * (self.angle_relative / (2 * np.pi))) + self.angle_min
-               angle = np.clip(angle, self.angle_min, self.angle_max)
+                       # rearrange to image format
+                       image = np.stack((amplitude * (180/255), angle, amplitude), axis=-1)
+                       image = np.array([image], dtype=np.uint8)
 
-               # rearrange to image format
-               full = np.full(angle.shape, fill_value=60)
-               image = np.stack((amplitude * (180/255), angle, amplitude), axis=-1)
-               image = np.array([image], dtype=np.uint8)
-               image = cv.cvtColor(image, cv.COLOR_HSV2BGR)
+                       return image
 
-               return image
+               except KeyboardInterrupt:
+                       return None
 
        def istft(
                self,
                image: np.ndarray
        ) -> np.ndarray:
 
-               # split the image into constituant parts
-               image = cv.cvtColor(image, cv.COLOR_BGR2HSV)
-               amplitude = image[0][...,2]
-               angle = image[0][...,1].astype(np.float64)
-               #hue = image[0][...,0].astype(np.float64) * (255/180)
+               try:
+                       # split the image into constituant parts
+                       amplitude = image[0][...,2]
+                       angle = image[0][...,1].astype(np.float64)
 
-               #amplitude = np.mean( np.array([ amplitude, hue ]), axis=0 ).astype(np.uint8)
+                       # Use hue as seperate data point
+                       #hue = image[0][...,0].astype(np.float64) * (255/180)
+                       #amplitude = np.mean( np.array([ amplitude, hue ]), axis=0 ).astype(np.uint8)
 
-               # convert amplitude back into vector length
-               amplitude = self.log_lookup[amplitude]
+                       # convert amplitude back into vector length
+                       amplitude = self.log_lookup[amplitude]
 
-               # convert angle back into x and y
-               angle = ((angle - self.angle_min) / (self.angle_relative / (2 * np.pi))) - np.pi
+                       # convert angle back into x and y
+                       angle = ((angle - self.angle_min) / (self.angle_relative / (2 * np.pi))) - np.pi
 
-               # rearrange back into fft result
-               real = np.cos(angle) * amplitude
-               imag = np.sin(angle) * amplitude
-               segment = real + (1j * imag)
+                       # rearrange back into fft result
+                       real = np.cos(angle) * amplitude
+                       imag = np.sin(angle) * amplitude
+                       segment = real + (1j * imag)
+                       data = np.fft.ifft(segment * self.window_size).real
 
-               data = np.fft.ifft(segment * self.window_size).real
+                       return data.astype(np.int16)
 
-               return data.astype(np.int16)
+               except KeyboardInterrupt:
+                       return None
diff --git a/loop.py b/loop.py
old mode 100644 (file)
new mode 100755 (executable)
index b59d7f6..e0c534a
--- a/loop.py
+++ b/loop.py
+#!./.venv/bin/python
+
 import cv2 as cv
 import numpy as np
 from scipy.io import wavfile
 import scipy.signal as sps
-import matplotlib.pyplot as plt
 from multiprocessing import Pool
 from camera import camera
-from struct import pack
 from fft import fft
 import time
 import pyaudio
 import os
 import sys
 import wave
-import matplotlib.pyplot as plt
-import gi
-from gi.repository import Gtk
+from tqdm import tqdm
 
-"""
-notes:
-- window size
-       the time to generate the spectrum is logaritmically related to the window size
-       bigger windows are exponentially better so you should prefer this if possible
-       obviously the biggest you can use is the size of your display unless you have
-       some way of arranging the pixles independant of the orrigional spectrogram
-"""
+def normalize(array: np.ndarray):
+       array = array.astype(np.float64)
+       array -= np.min(array)
+       array *= 255 / np.max(array)
 
-sample_rate, data = wavfile.read("/home/will/Downloads/birdsong.wav")
-#data = data[...,0]
+       return array.astype(np.uint8)
 
-new_rate = 10_000.
-sample_count = round(len(data) * new_rate / sample_rate)
-data = sps.resample(data, sample_count)
-sample_rate = int(new_rate)
+def get_audio(
+       uri: str,
+       window_size: int,
+       window_height: int,
+       new_rate: int = 11_050
+) -> np.ndarray:
 
-window_size = 170
-window_height = 80
+       sample_rate, data = wavfile.read(uri)
 
-hop_size = window_size // 2
-camera = camera(
-       window_size,
-       window_height,
-       (1920, 1080),
-       device_id = 2,
-       debug = True,
-       dummy = False
-)
-
-file = wave.open("out.wav", "wb")
-file.setparams((
-       1,                      # channels
-       2,                      # sample width
-       sample_rate,
-       0,
-       "NONE",         # compression type
-       "NONE"          # compression name
-))
-
-camera.calibrate()
-
-transform = fft(window_size, hop_size)
-
-segment_samples = window_height * hop_size
-overflow_samples = segment_samples - (len(data) % segment_samples) + window_size
-data = np.concatenate((data, data[0:overflow_samples]))
-
-segment_count = round(len(data) / segment_samples)
-segment_index = 0
-audio = np.zeros((hop_size,), dtype=np.int16)
+       # ensure 1 channel
+       if len(data.shape) > 1:
+               data = data[...,0]
 
-def callback(in_data, frame_count, time_info, status):
+       # resample
+       sample_count = round(len(data) * float(new_rate) / sample_rate)
+       data = sps.resample(data, sample_count)
+
+       # make divisible into screens
+       segment_samples = window_height * (window_height // 2)
+       overflow_samples = segment_samples - (len(data) % segment_samples) + window_size
+       data = np.concatenate((data, data[0:overflow_samples]))
+
+       return data
 
+def callback(in_data, frame_count, time_info, status):
        global audio
+       global caching
 
+       # handle not enough frames being available
        data = audio[:frame_count]
        if len(data) < frame_count:
                data = np.pad(data, [(0, frame_count - len(data))], mode='constant')
                audio = np.zeros((hop_size,), dtype=np.int16)
+               if not caching: print("Dropped frames!")
+
        else:
                audio = np.delete(audio, np.s_[:frame_count])
 
+               #handle buffer minimum
+               if len(audio) < hop_size:
+                       audio = np.zeros((hop_size,), dtype=np.int16)
+                       print("buffer minimum exceeded ")
+
        return (data, pyaudio.paContinue)
 
-pyaudio_object = pyaudio.PyAudio()
-stream = pyaudio_object.open(
-       format = pyaudio.paInt16,
-       channels = 1,
-       rate = sample_rate,
-       frames_per_buffer = 1024,
-       output = True,
-       stream_callback = callback
-)
-
-try:
-       while stream.is_active():
+def process_loop(
+       data: np.ndarray,
+       transform,
+       camera,
+       window_size: int,
+       window_height: int,
+       loop: bool = True,
+       correction: bool = False,
+       correction_array: np.ndarray = None
+):
+       global audio
+       global caching
+
+       hop_size = window_size // 2
+       segment_samples = window_height * hop_size
+
+       segment_count = round(len(data) / segment_samples)
+
+       error_array = np.zeros((5, window_size))
+       error_spectrum = np.zeros((window_height, window_size, 3))
+
+       print("caching data...")
+       caching = True
+
+       for segment_index in tqdm(range(segment_count)):
+               # get the spectrum of the current sample
                segment_start = segment_index * segment_samples
-               rows = [data[segment_start + i:segment_start + i + window_size] for i in range(0, segment_samples, hop_size)]
+               segment_rows = [data[segment_start + i:segment_start + i + window_size] for i in range(0, segment_samples, hop_size)]
                with Pool() as p:
-                       mapping = p.map(transform.stft, rows)
+                       mapping = p.map(transform.stft, segment_rows)
 
                spectrum = np.array(mapping)[:,0,...]
+               if correction_array is not None:
+                       spectrum -= correction_array
+                       spectrum = np.clip(spectrum, 0, 255)
+
+               spectrum = spectrum.astype(np.uint8)
 
-               if segment_index == 10: cv.imwrite("sample.jpg", spectrum)
+               # print sample image
+               if segment_index == 10:
+                       cv.imwrite("spectrum_sample.jpg", spectrum)
 
+               np.save(f"cache/frame{segment_index}.npy", spectrum)
+
+       segment_index = 0
+       print("cached!")
+       caching = False
+
+       while segment_index < segment_count:
+
+               spectrum = np.load(f"cache/frame{segment_index}.npy")
+
+               # display and capture
                camera.display(spectrum)
                capture = camera.capture()
 
-               # plt.clf()
-               # plt.plot(rows[0])
-
-               rows = [np.array([i]) for i in capture]
+               # get the recovered sample data
+               recovered_rows = [np.array([i]) for i in capture]
                with Pool() as p:
-                       recovered = np.array(p.map(transform.istft, rows))
-
-               if len(audio) < hop_size:
-                       audio = np.zeros((hop_size,), dtype=np.int16)
+                       recovered = np.array(p.map(transform.istft, recovered_rows))
 
-               # plt.plot(recovered[0])
-               # plt.pause(0.05)
+               if correction:
+                       error_array[1:] = error_array[0:-1]
+                       error_array[0] = np.mean(np.array(segment_rows) - recovered, axis=0)
 
+               # write the recovered data to the file and soundcard
                for row in recovered:
                        audio[-hop_size:] += row[:hop_size]
                        audio = np.append(audio, row[hop_size:])
 
-                       file.writeframes(row[hop_size:])
-
-               segment_index += 1
-               if segment_index == segment_count: segment_index = 0
+                       # file.writeframes(row[hop_size:])
 
-               slept = False
+               # sleep condition
                time.sleep(0.1)
                while len(audio) > 1 * segment_samples:
-                       slept = True
                        cv.waitKey(1)
 
-               if not slept:
-                       print("Dropped frames!")
+               # move to next segment
+               segment_index += 1
+               if loop and segment_index == segment_count:
+                       segment_index = 0
+
+       error_spectrum = transform.stft(np.mean(error_array, axis=0))
+       error_spectrum = np.tile(error_spectrum, (window_height, 1, 1))
+       error_spectrum[...,2] *= 0 # correction of phase seems to cause stability issues
+
+       return error_spectrum
+
+"""
+notes:
+- sample rate
+       due to Nyquist, the maximum possible frequency that can be recovered is half this
+       number. Lower can be dramatically better from a speed standpoint.
+- window size
+       the time to generate the spectrum is logarithmically related to the window size
+       bigger windows are exponentially better so you should prefer this if possible
+       obviously the biggest you can use is the size of your display unless you have
+       some way of arranging the pixels independent of the original spectrogram.
+- Window height
+       This is the height of the image, or, the number of ffts performed in one "batch".
+       This batching is multithreaded and therefore has some effect on process speed.
+"""
+
+# define parameters
+sample_rate = 22_050
+window_size = 150
+window_height = 80
 
-except KeyboardInterrupt:
-       stream.stop_stream()
-       stream.close()
-       pyaudio_object.terminate()
-       file.close()
+caching = False
+
+if __name__ == "__main__":
+
+       # get audio data
+       data = get_audio("/home/will/Downloads/Adducci - Around the Horn.wav", window_size, window_height, sample_rate)
+
+       # setup fft
+       transform = fft(window_size)
+
+       # setup and calibrate camera
+       camera = camera(
+               window_size,
+               window_height,
+               (1920, 1080),
+               device_id = 2,
+               debug = False,
+               dummy = True
+       )
+       camera.calibrate()
+
+       # setup output file
+       file = wave.open("out.wav", "wb")
+       file.setparams((
+               1,                      # channels
+               2,                      # sample width
+               sample_rate,
+               0,
+               "NONE",         # compression type
+               "NONE"          # compression name
+       ))
+
+       # setup stream output
+       audio = np.zeros((window_size // 2,), dtype=np.int16)
+       pyaudio_object = pyaudio.PyAudio()
+       stream = pyaudio_object.open(
+               format = pyaudio.paInt16,
+               channels = 1,
+               rate = sample_rate,
+               frames_per_buffer = 1024,
+               output = True,
+               stream_callback = callback
+       )
 
        try:
-               sys.exit()
-       except SystemExit:
-               os._exit(130)
+               print("performing error correction...")
+               silence = np.full(((10 * window_size * window_height) + window_size,), fill_value=1, dtype=np.int16)
+               correction_array = process_loop(silence, transform, camera, window_size, window_height, loop = False, correction = True)
+
+               cv.imwrite("error.jpg", correction_array)
+               cv.imwrite("error_norm.jpg", normalize(correction_array))
+               print("error correction complete!")
+
+               process_loop(data, transform, camera, window_size, window_height, loop = True, correction = False, correction_array = correction_array)
+
+       # elegantly handle interrupt (intended but not succeeded)
+       except KeyboardInterrupt:
+               stream.stop_stream()
+               stream.close()
+               pyaudio_object.terminate()
+               file.close()
+
+               try:
+                       sys.exit()
+               except SystemExit:
+                       os._exit(130)
diff --git a/mum.wav b/mum.wav
deleted file mode 100644 (file)
index 0140a2b..0000000
Binary files a/mum.wav and /dev/null differ