OzVa Git service - audio-over-stft/commitdiff
added live audio capability
author: will <greenwoodw50@gmail.com>
Wed, 3 Jul 2024 22:46:20 +0000 (22:46 +0000)
committer: will <greenwoodw50@gmail.com>
Wed, 3 Jul 2024 22:46:20 +0000 (22:46 +0000)
12 files changed:
__pycache__/camera.cpython-312.pyc [new file with mode: 0644]
__pycache__/fft.cpython-312.pyc [new file with mode: 0644]
camera.py [new file with mode: 0644]
fft.py [new file with mode: 0644]
file.py [new file with mode: 0644]
main.py [deleted file]
out.wav
rec.wav [new file with mode: 0644]
stream.py [new file with mode: 0644]
test.py [deleted file]
test.wav [new file with mode: 0644]
test2.wav [new file with mode: 0644]

diff --git a/__pycache__/camera.cpython-312.pyc b/__pycache__/camera.cpython-312.pyc
new file mode 100644 (file)
index 0000000..f47db0f
Binary files /dev/null and b/__pycache__/camera.cpython-312.pyc differ
diff --git a/__pycache__/fft.cpython-312.pyc b/__pycache__/fft.cpython-312.pyc
new file mode 100644 (file)
index 0000000..75fb418
Binary files /dev/null and b/__pycache__/fft.cpython-312.pyc differ
diff --git a/camera.py b/camera.py
new file mode 100644 (file)
index 0000000..d222678
--- /dev/null
+++ b/camera.py
@@ -0,0 +1,126 @@
+import cv2 as cv
+import numpy as np
+
+class camera():
+       """Show images in an OpenCV window and read frames back from a capture device.
+
+       Captured frames are resized to (window_size, window_height); displayed
+       images are resized to display_size. device_id is passed to cv.VideoCapture.
+       """
+
+       def __init__(
+               self,
+               window_size: int,
+               window_height: int,
+               display_size: tuple,
+               device_id: int = 0
+       ):
+
+               self.window_size = window_size
+               self.window_height = window_height
+               self.display_size = display_size
+
+               self.camera = cv.VideoCapture(device_id)
+               # stays None until calibrate() succeeds; capture() only warps frames once it is set
+               self.homography = None
+
+               cv.namedWindow("display", cv.WINDOW_NORMAL)
+
+       def calibrate(
+               self
+       ):
+               """Estimate the homography that maps camera frames onto the displayed image.
+
+               Shows calibration/calibration.jpg, waits for a key press, grabs one frame
+               and matches SIFT features between the two. self.homography is set only
+               when more than 10 matches pass the ratio test and a homography is found;
+               in every other case "calibration failed" is printed and it is left unchanged.
+               """
+               calibration_image = cv.imread("calibration/calibration.jpg")
+               if calibration_image is None:
+                       # cv.imread returns None instead of raising when the file cannot be read
+                       print("calibration failed")
+                       return
+               # interpolation has to be passed by keyword: the third positional argument of cv.resize is dst
+               calibration_image = cv.resize(calibration_image, self.display_size, interpolation=cv.INTER_NEAREST)
+
+               cv.imshow("display", calibration_image)
+               cv.waitKey(0)
+               # read from this instance's device; the bare name `camera` is the class itself
+               success, capture = self.camera.read()
+               if not success:
+                       print("calibration failed")
+                       return
+
+               # detect SIFT keypoints
+               sift = cv.SIFT_create()
+               kp1, des1 = sift.detectAndCompute(calibration_image, None)
+               kp2, des2 = sift.detectAndCompute(capture, None)
+               if des1 is None or des2 is None:
+                       # no descriptors in one of the images, nothing to match
+                       print("calibration failed")
+                       return
+
+               # get good matches between calibration image and the captured image
+               flann = cv.FlannBasedMatcher(
+                       {"algorithm": 1, "trees": 5},
+                       {"checks": 50}
+               )
+               matches = flann.knnMatch(des1, des2, k=2)
+
+               # get good matches via ratio test (pairs with fewer than two neighbours are skipped)
+               good = []
+               for pair in matches:
+                       if len(pair) < 2:
+                               continue
+                       m, n = pair
+                       if m.distance < 0.7*n.distance:
+                               good.append(m)
+
+               homography = None
+               if len(good)>10:
+                       src_pts = np.float32([ kp1[m.queryIdx].pt for m in good ]).reshape(-1,1,2)
+                       dst_pts = np.float32([ kp2[m.trainIdx].pt for m in good ]).reshape(-1,1,2)
+                       homography, mask = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0)
+
+               if homography is None:
+                       print("calibration failed")
+               else:
+                       self.homography = homography
+
+       def display(
+               self,
+               image: np.ndarray
+       ) -> None:
+               """Resize image to display_size and show it in the "display" window."""
+
+               image = cv.resize(image, self.display_size, interpolation=cv.INTER_NEAREST)
+               cv.imshow("display", image)
+               cv.waitKey(1)
+
+       def capture(
+               self
+       ) -> np.ndarray:
+               """Read one frame from the capture device.
+
+               Once a homography is available the frame is warped to display_size and
+               resized to (window_size, window_height). Raises RuntimeError when no
+               frame could be read.
+               """
+
+               # VideoCapture.read() returns a (success, frame) pair, not just the frame
+               success, image = self.camera.read()
+               if not success:
+                       raise RuntimeError("failed to read a frame from the camera")
+
+               if self.homography is not None:
+                       image = cv.warpPerspective(image, self.homography, self.display_size)
+                       image = cv.resize(image, (self.window_size, self.window_height), interpolation=cv.INTER_NEAREST)
+                       # NOTE(review): `match_histograms` and `display` are not defined or imported
+                       # in this file, so this call raises NameError as written - confirm the intended
+                       # import and reference image
+                       image = match_histograms(image, display, channel_axis=-1)
+
+               return image
+
+
diff --git a/fft.py b/fft.py
new file mode 100644 (file)
index 0000000..2950929
--- /dev/null
+++ b/fft.py
@@ -0,0 +1,94 @@
+import numpy as np
+import cv2 as cv
+
+class fft():
+       """Turn windows of audio samples into one-row colour images and back.
+
+       Every FFT bin becomes one HSV pixel: the first channel holds the clipped
+       magnitude in dB, the second the phase and the third is fixed at 255. The
+       row is handed to OpenCV as HSV and returned as BGR.
+       """
+
+       def __init__(
+               self,
+               window_size: int,
+               hop_size: int
+       ):
+               self.window_size = window_size
+               self.hop_size = hop_size
+               self.window = np.hanning(window_size)
+
+               # dB range that is kept; magnitudes outside it are clipped in stft()
+               self.lower_limit = -40
+               self.upper_limit = 100
+
+               # channel value ranges the magnitude and the phase are scaled into
+               # NOTE(review): OpenCV documents 8-bit hue as 0..179, so a magnitude at upper_limit (180) may not round-trip - confirm
+               self.amplitude_max = 180
+               self.amplitude_min = 0
+               self.angle_max = 255
+               self.angle_min = 100
+
+               self.amplitude_relative = self.amplitude_max - self.amplitude_min
+               self.angle_relative = self.angle_max - self.angle_min
+               # width of the dB range, i.e. the largest magnitude once lower_limit is subtracted
+               self.limit_relative = self.upper_limit - self.lower_limit
+
+
+       def stft(
+               self,
+               data: np.ndarray
+       ) -> np.ndarray:
+               """Encode one window of samples as a uint8 BGR image of shape (1, len(data), 3)."""
+
+               segment = data * self.window
+               spectrum = np.fft.fft(segment) / self.window_size
+
+               amplitude = np.abs(spectrum)
+               # floor the magnitude so empty bins do not hit log10(0); the result is clipped to lower_limit anyway
+               amplitude = 20*np.log10(np.maximum(amplitude, np.finfo(np.float64).tiny))
+               amplitude = np.clip(amplitude, self.lower_limit, self.upper_limit)
+               amplitude -= self.lower_limit
+               # scale [0, limit_relative] onto [amplitude_min, amplitude_max]; the offset is added, not multiplied in
+               amplitude = (amplitude * (self.amplitude_relative / self.limit_relative)) + self.amplitude_min
+
+               angle = np.angle(spectrum)
+               angle = ((angle + np.pi) * (self.angle_relative / (2 * np.pi))) + self.angle_min
+
+               full = np.full(angle.shape, fill_value=255)
+
+               image = np.stack((amplitude, angle, full), axis=-1)
+               image = np.array([image], dtype=np.uint8)
+
+               image = cv.cvtColor(image, cv.COLOR_HSV2BGR)
+
+               return image
+
+       def istft(
+               self,
+               image: np.ndarray
+       ) -> np.ndarray:
+               """Decode a one-row BGR image produced by stft() back into real-valued samples."""
+
+               image = cv.cvtColor(image, cv.COLOR_BGR2HSV)
+
+               amplitude = image[0][...,0].astype(np.float64)
+               angle = image[0][...,1].astype(np.float64)
+
+               # inverse of the magnitude mapping applied in stft()
+               amplitude -= self.amplitude_min
+               amplitude /= (self.amplitude_relative / self.limit_relative)
+               amplitude += self.lower_limit
+               amplitude = np.power(10, amplitude / 20)
+
+               angle = ((angle - self.angle_min) / (self.angle_relative / (2 * np.pi))) - np.pi
+
+               real = np.cos(angle) * amplitude
+               imag = np.sin(angle) * amplitude
+               segment = real + (1j * imag)
+
+               data = np.fft.ifft(segment * self.window_size).real
+
+               return data
+
+
diff --git a/file.py b/file.py
new file mode 100644 (file)
index 0000000..1ca680c
--- /dev/null
+++ b/file.py
@@ -0,0 +1,77 @@
+import cv2 as cv
+import numpy as np
+from scipy.io import wavfile
+import scipy.signal as sps
+import matplotlib.pyplot as plt
+from multiprocessing import Pool
+from camera import camera
+from fft import fft
+
+"""
+notes:
+- window size
+       the time to generate the spectrum is logarithmically related to the window size
+       bigger windows are exponentially better so you should prefer this if possible
+       obviously the biggest you can use is the size of your display unless you have
+       some way of arranging the pixels independent of the original spectrogram
+"""
+
+sample_rate, data = wavfile.read("/home/will/Downloads/number-station.wav")
+new_rate = 22_050.
+
+# resample the recording to new_rate
+sample_count = round(len(data) * new_rate / sample_rate)
+data = sps.resample(data, sample_count)
+sample_rate = int(new_rate)
+
+# keep every second sample, halving the sample rate again
+data = data[::2]
+sample_rate = sample_rate // 2
+
+window_size = 250
+window_height = 125
+
+hop_size = window_size // 2
+camera = camera(window_size, window_height, (1000, 1000))
+transform = fft(window_size, hop_size)
+
+# samples covered by one displayed image: window_height rows, each advancing by hop_size
+segment_samples = window_height * hop_size
+
+# pad to whole segments plus the part of the final window that reaches past the last segment
+overlap = window_size - hop_size
+padding = np.full((segment_samples - (len(data) % segment_samples) + overlap), fill_value=.1)
+data = np.concatenate((data, padding))
+
+recovered_data = np.zeros(data.shape)
+
+segment_count = len(data) // segment_samples
+
+# one worker pool for the whole run instead of two new pools per segment
+with Pool() as p:
+       for segment_index in range(segment_count):
+               segment_start = segment_index * segment_samples
+               rows = [data[segment_start + i:segment_start + i + window_size] for i in range(0, segment_samples, hop_size)]
+               mapping = p.map(transform.stft, rows)
+
+               spectrum = np.array(mapping)[:,0,...]
+
+               camera.display(spectrum)
+
+               rows = [np.array([i]) for i in spectrum]
+               recovered = np.array(p.map(transform.istft, rows))
+
+               for i, row in enumerate(recovered):
+                       row_start = i * hop_size
+                       recovered_data[segment_start + row_start:segment_start + row_start + window_size] += row
+
+# clip before the cast so out-of-range samples saturate instead of wrapping around
+int16_range = np.iinfo(np.int16)
+recovered_data_int16 = np.clip(recovered_data, int16_range.min, int16_range.max).astype(np.int16)
+wavfile.write("out.wav", sample_rate, recovered_data_int16)
+
+difference = (data - recovered_data)[1000:251000]
+
+
+
+
diff --git a/main.py b/main.py
deleted file mode 100644 (file)
index 229a797..0000000
--- a/main.py
+++ /dev/null
@@ -1,220 +0,0 @@
-import cv2 as cv
-import numpy as np
-from scipy.io import wavfile
-import scipy.signal as sps
-import matplotlib.pyplot as plt
-from multiprocessing import Pool
-
-"""
-notes:
-- window size
-       the time to generate the spectrum is logaritmically related to the window size
-       bigger windows are exponentially better so you should prefer this if possible
-       obviously the biggest you can use is the size of your display unless you have
-       some way of arranging the pixles independant of the orrigional spectrogram
-"""
-
-class camera():
-       def __init__(
-               self,
-               window_size: int,
-               window_height: int,
-               device_id: int = 0
-       ):
-
-               self.window_size = window_size
-               self.window_height = window_height
-
-               self.camera = cv.VideoCapture(device_id)
-               self.homography = None
-
-               cv.namedWindow("display", cv.WINDOW_NORMAL)
-
-       def calibrate(
-               self
-       ):
-               calibration_image = cv.imread("calibration/calibration.jpg")
-               calibration_image = cv.resize(calibration_image, (self.window_size, self.window_height), cv.INTER_NEAREST)
-
-               cv.imshow("display", calibration_image)
-               cv.waitKey(0)
-               _, capture = camera.read()
-
-               # detect SIFT keypoints
-               sift = cv.SIFT_create()
-               kp1, des1 = sift.detectAndCompute(calibration_image, None)
-               kp2, des2 = sift.detectAndCompute(capture, None)
-
-               # get good matches between calibration image and the captured image
-               flann = cv.FlannBasedMatcher(
-                       {"algorithm": 1, "trees": 5},
-                       {"checks": 50}
-               )
-               matches = flann.knnMatch(des1, des2, k=2)
-
-               #get good matches via ratio test
-               good = []
-               for m,n in matches:
-                       if m.distance < 0.7*n.distance:
-                               good.append(m)
-
-               if len(good)>10:
-                       src_pts = np.float32([ kp1[m.queryIdx].pt for m in good ]).reshape(-1,1,2)
-                       dst_pts = np.float32([ kp2[m.trainIdx].pt for m in good ]).reshape(-1,1,2)
-
-                       self.homography, mask = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0)
-
-               else:
-                       print("calibration failed")
-
-       def display(
-               self,
-               image: np.ndarray
-       ) -> None:
-
-               cv.imshow("display", image)
-               cv.waitKey(1)
-
-       def capture(
-               self
-       ) -> np.ndarray:
-
-               image = self.camera.read()
-               if self.homography is not None:
-                       image = cv.warpPerspective(image, self.homography, (self.display_size, self.display_height))
-                       image = match_histograms(image, display, channel_axis=-1)
-
-               return image
-
-class fft():
-       def __init__(
-               self,
-               window_size: int,
-               hop_size: int
-       ):
-               self.window_size = window_size
-               self.hop_size = hop_size
-               self.window = np.hanning(window_size)
-
-               self.lower_limit = -40
-               self.upper_limit = 100
-
-               self.amplitude_max = 180
-               self.amplitude_min = 0
-               self.angle_max = 255
-               self.angle_min = 100
-
-               self.amplitude_relative = self.amplitude_max - self.amplitude_min
-               self.angle_relative = self.angle_max - self.angle_min
-
-
-       def stft(
-               self,
-               data: np.ndarray
-       ) -> np.ndarray:
-
-               segment = data * self.window
-               spectrum = np.fft.fft(segment) / self.window_size
-
-               amplitude = np.abs(spectrum)
-               amplitude = 20*np.log10(amplitude)
-               amplitude = np.clip(amplitude, self.lower_limit, self.upper_limit)
-               amplitude -= self.lower_limit
-               amplitude *= (self.amplitude_relative / self.upper_limit) + self.amplitude_min
-
-               angle = np.angle(spectrum)
-               angle = ((angle + np.pi) * (self.angle_relative / (2 * np.pi))) + self.angle_min
-
-               full = np.full(angle.shape, fill_value=255)
-
-               image = np.stack((amplitude, angle, full), axis=-1)
-               image = np.array([image], dtype=np.uint8)
-
-               image = cv.cvtColor(image, cv.COLOR_HSV2BGR)
-
-               return image
-
-       def istft(
-               self,
-               image: np.ndarray
-       ) -> np.ndarray:
-
-               image = cv.cvtColor(image, cv.COLOR_BGR2HSV)
-
-               amplitude = image[0][...,0].astype(np.float64)
-               angle = image[0][...,1].astype(np.float64)
-
-               amplitude -= self.amplitude_min
-               amplitude /= (self.amplitude_relative / self.upper_limit)
-               amplitude += self.lower_limit
-               amplitude = np.power(10, amplitude / 20)
-
-               angle = ((angle - self.angle_min) / (self.angle_relative / (2 * np.pi))) - np.pi
-
-               real = np.cos(angle) * amplitude
-               imag = np.sin(angle) * amplitude
-               segment = real + (1j * imag)
-
-               data = np.fft.ifft(segment * self.window_size).real
-
-               return data
-
-sample_rate, data = wavfile.read("/home/will/Music/George Michael - Careless Whisper.wav")
-new_rate = 22_050.
-
-sample_count = round(len(data) * new_rate / sample_rate)
-data = sps.resample(data, sample_count)
-sample_rate = int(new_rate)
-
-data = [data[i] for i in range(0, len(data), 2)]
-sample_rate = sample_rate // 2
-
-window_size = 1_000
-window_height = 500
-
-hop_size = window_size // 2
-camera = camera(window_size, window_height)
-transform = fft(window_size, hop_size)
-
-segment_samples = window_height * hop_size
-
-padding = np.full((segment_samples - (len(data) % segment_samples) + window_height), fill_value=.1)
-data = np.concatenate((data, padding))
-
-recovered_data = np.zeros(data.shape)
-
-segment_count = round(len(data) / segment_samples)
-
-for segment_index in range(segment_count):
-       segment_start = segment_index * segment_samples
-       rows = [data[segment_start + i:segment_start + i + window_size] for i in range(0, segment_samples, hop_size)]
-       with Pool() as p:
-               mapping = p.map(transform.stft, rows)
-
-       spectrum = np.array(mapping)[:,0,...]
-
-       # do the silly capture thing here
-       cv.imshow("display", spectrum)
-       cv.waitKey(1)
-
-       rows = [np.array([i]) for i in spectrum]
-       with Pool() as p:
-               recovered = np.array(p.map(transform.istft, rows))
-
-       for i, row in enumerate(recovered):
-               row_start = i * hop_size
-               recovered_data[segment_start + row_start:segment_start + row_start + window_size] += row
-
-wavfile.write("out.wav", sample_rate, recovered_data.astype(np.int16))
-
-difference = (data - recovered_data)[1000:251000]
-
-plt.style.use('dark_background')
-fig, (ax1, ax2) = plt.subplots(nrows=2)
-ax1.plot(difference)
-Pxx, freqs, bins, im = ax2.specgram(difference, NFFT=1014, Fs=1/0.0005)
-plt.show()
-
-
-
-
diff --git a/out.wav b/out.wav
index baf075e524d1e50868d1215e43557b5c46fb1ae6..b56a8b7086bf34b6a0577d41de3f509773bb7721 100644 (file)
Binary files a/out.wav and b/out.wav differ
diff --git a/rec.wav b/rec.wav
new file mode 100644 (file)
index 0000000..d365d58
Binary files /dev/null and b/rec.wav differ
diff --git a/stream.py b/stream.py
new file mode 100644 (file)
index 0000000..18daf7b
--- /dev/null
+++ b/stream.py
@@ -0,0 +1,112 @@
+from struct import unpack
+import numpy as np
+from scipy.io import wavfile
+import matplotlib.pyplot as plt
+from multiprocessing import Pool
+from camera import camera
+from fft import fft
+import pyaudio
+import sys
+import os
+import wave
+
+"""
+notes:
+- window size
+       the time to generate the spectrum is logarithmically related to the window size
+       bigger windows are exponentially better so you should prefer this if possible
+       obviously the biggest you can use is the size of your display unless you have
+       some way of arranging the pixels independent of the original spectrogram
+- read size (the window size)
+       this is the amount of data that is read from the audio device at one time
+       i believe the maximum for this specific device is 990? it's something to do with
+       the number of channels and the sample rate...
+
+every time the window width / 2 number of samples is available to read from the audio
+device, the program puts that chunk of audio into the buffer. each chunk is then
+appended to the last chunk. the last chunk (with no later chunk to append onto it) is
+left in the buffer to provide a smooth transition between the images
+"""
+
+window_width = 750
+window_height = 500
+sample_rate = 22_050
+channels = 1
+
+hop_size = window_width // 2
+camera = camera(window_width, window_height, (1000, 1000))
+transform = fft(window_width, hop_size)
+
+pyaudio_object = pyaudio.PyAudio()
+stream = pyaudio_object.open(
+       format = pyaudio.paInt16,
+       channels = channels,
+       rate = sample_rate,
+       input = True
+)
+
+buffer = []
+spectrum = np.zeros((window_height, window_width, 3), dtype=np.uint8)
+spectrum_index = 0
+# overlap-add tail: the not yet written end of the last decoded row, kept as float until written
+audio = np.zeros((hop_size,), dtype=np.float64)
+int16_range = np.iinfo(np.int16)
+
+# bound up front so the cleanup below works even when wave.open() itself fails
+file = None
+interrupted = False
+
+try:
+       file = wave.open("out.wav", "wb")
+       file.setparams((
+               channels,
+               2,              # sample width
+               sample_rate,
+               0,
+               "NONE",         # compression type
+               "NONE"          # compression name
+       ))
+
+       # one worker pool for the whole session instead of a new one per full image
+       with Pool(3) as p:
+               while stream.is_active():
+                       data = stream.read(hop_size, exception_on_overflow = False)
+                       data = unpack(f"<{hop_size}h", data)
+                       buffer.append(list(data))
+
+                       if len(buffer) == 2:
+                               spectrum[spectrum_index] = transform.stft(buffer[0] + buffer[1])
+                               spectrum_index += 1
+                               del buffer[0]
+
+                               camera.display(spectrum)
+
+                       if spectrum_index == window_height:
+                               spectrum_index = 0
+
+                               rows = [np.array([i]) for i in spectrum]
+                               recovered = np.array(p.map(transform.istft, rows))
+
+                               for row in recovered:
+                                       # first part of the row completes the pending tail, the rest follows it
+                                       combined = np.concatenate((audio + row[:hop_size], row[hop_size:]))
+                                       finished = np.clip(combined[:-hop_size], int16_range.min, int16_range.max).astype(np.int16)
+                                       file.writeframes(finished.tobytes())
+                                       audio = combined[-hop_size:]
+
+except KeyboardInterrupt:
+       interrupted = True
+
+finally:
+       # release the audio device and close the wav file however the loop ended
+       stream.stop_stream()
+       stream.close()
+       pyaudio_object.terminate()
+       if file is not None:
+               file.close()
+
+if interrupted:
+       # same hard exit with status 130 that the Ctrl-C path has always used
+       os._exit(130)
+
+
diff --git a/test.py b/test.py
deleted file mode 100644 (file)
index 89e73be..0000000
--- a/test.py
+++ /dev/null
@@ -1,26 +0,0 @@
-import wave
-
-import pyaudio
-
-def handler (in_data, frame_count, time_info, status):
-       print(in_data)
-       print(frame_count)
-       print(time_info)
-       print(status)
-
-       (in_data, pyaudio.paContinue)
-
-CHUNK = 1024
-
-p = pyaudio.PyAudio()
-
-stream = p.open(
-       format=pyaudio.paInt8,
-       channels=1,
-       rate=22_050,
-       input=True,
-       stream_callback=handler
-)
-
-stream.close()
-p.terminate()
diff --git a/test.wav b/test.wav
new file mode 100644 (file)
index 0000000..3dcd159
Binary files /dev/null and b/test.wav differ
diff --git a/test2.wav b/test2.wav
new file mode 100644 (file)
index 0000000..ea303ee
Binary files /dev/null and b/test2.wav differ