From: will Date: Wed, 3 Jul 2024 22:46:20 +0000 (+0000) Subject: added live audio capability X-Git-Url: https://git.ozva.co.uk/?a=commitdiff_plain;h=e91b29123a5e249b656c5ed8b0ac4936b38346b7;p=audio-over-stft added live audio capability --- diff --git a/__pycache__/camera.cpython-312.pyc b/__pycache__/camera.cpython-312.pyc new file mode 100644 index 0000000..f47db0f Binary files /dev/null and b/__pycache__/camera.cpython-312.pyc differ diff --git a/__pycache__/fft.cpython-312.pyc b/__pycache__/fft.cpython-312.pyc new file mode 100644 index 0000000..75fb418 Binary files /dev/null and b/__pycache__/fft.cpython-312.pyc differ diff --git a/camera.py b/camera.py new file mode 100644 index 0000000..d222678 --- /dev/null +++ b/camera.py @@ -0,0 +1,79 @@ +import cv2 as cv +import numpy as np + +class camera(): + def __init__( + self, + window_size: int, + window_height: int, + display_size: tuple, + device_id: int = 0 + ): + + self.window_size = window_size + self.window_height = window_height + self.display_size = display_size + + self.camera = cv.VideoCapture(device_id) + self.homography = None + + cv.namedWindow("display", cv.WINDOW_NORMAL) + + def calibrate( + self + ): + calibration_image = cv.imread("calibration/calibration.jpg") + calibration_image = cv.resize(calibration_image, self.display_size, cv.INTER_NEAREST) + + cv.imshow("display", calibration_image) + cv.waitKey(0) + _, capture = camera.read() + + # detect SIFT keypoints + sift = cv.SIFT_create() + kp1, des1 = sift.detectAndCompute(calibration_image, None) + kp2, des2 = sift.detectAndCompute(capture, None) + + # get good matches between calibration image and the captured image + flann = cv.FlannBasedMatcher( + {"algorithm": 1, "trees": 5}, + {"checks": 50} + ) + matches = flann.knnMatch(des1, des2, k=2) + + #get good matches via ratio test + good = [] + for m,n in matches: + if m.distance < 0.7*n.distance: + good.append(m) + + if len(good)>10: + src_pts = np.float32([ kp1[m.queryIdx].pt for m in good ]).reshape(-1,1,2) + dst_pts = np.float32([ kp2[m.trainIdx].pt for m in good ]).reshape(-1,1,2) + self.homography, mask = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0) + + else: + print("calibration failed") + + def display( + self, + image: np.ndarray + ) -> None: + + image = cv.resize(image, self.display_size, cv.INTER_NEAREST) + cv.imshow("display", image) + cv.waitKey(1) + + def capture( + self + ) -> np.ndarray: + + image = self.camera.read() + if self.homography is not None: + image = cv.warpPerspective(image, self.homography, self.display_size) + image = cv.resize(image, (self.window_size, self.window_height), cv.INTER_NEAREST) + image = match_histograms(image, display, channel_axis=-1) + + return image + + diff --git a/fft.py b/fft.py new file mode 100644 index 0000000..2950929 --- /dev/null +++ b/fft.py @@ -0,0 +1,77 @@ +import numpy as np +import cv2 as cv + +class fft(): + def __init__( + self, + window_size: int, + hop_size: int + ): + self.window_size = window_size + self.hop_size = hop_size + self.window = np.hanning(window_size) + + self.lower_limit = -40 + self.upper_limit = 100 + + self.amplitude_max = 180 + self.amplitude_min = 0 + self.angle_max = 255 + self.angle_min = 100 + + self.amplitude_relative = self.amplitude_max - self.amplitude_min + self.angle_relative = self.angle_max - self.angle_min + + + def stft( + self, + data: np.ndarray + ) -> np.ndarray: + + segment = data * self.window + spectrum = np.fft.fft(segment) / self.window_size + + amplitude = np.abs(spectrum) + amplitude = 20*np.log10(amplitude) + amplitude = np.clip(amplitude, self.lower_limit, self.upper_limit) + amplitude -= self.lower_limit + amplitude *= (self.amplitude_relative / self.upper_limit) + self.amplitude_min + + angle = np.angle(spectrum) + angle = ((angle + np.pi) * (self.angle_relative / (2 * np.pi))) + self.angle_min + + full = np.full(angle.shape, fill_value=255) + + image = np.stack((amplitude, angle, full), axis=-1) + image = np.array([image], dtype=np.uint8) + + image = cv.cvtColor(image, cv.COLOR_HSV2BGR) + + return image + + def istft( + self, + image: np.ndarray + ) -> np.ndarray: + + image = cv.cvtColor(image, cv.COLOR_BGR2HSV) + + amplitude = image[0][...,0].astype(np.float64) + angle = image[0][...,1].astype(np.float64) + + amplitude -= self.amplitude_min + amplitude /= (self.amplitude_relative / self.upper_limit) + amplitude += self.lower_limit + amplitude = np.power(10, amplitude / 20) + + angle = ((angle - self.angle_min) / (self.angle_relative / (2 * np.pi))) - np.pi + + real = np.cos(angle) * amplitude + imag = np.sin(angle) * amplitude + segment = real + (1j * imag) + + data = np.fft.ifft(segment * self.window_size).real + + return data + + diff --git a/file.py b/file.py new file mode 100644 index 0000000..1ca680c --- /dev/null +++ b/file.py @@ -0,0 +1,69 @@ +import cv2 as cv +import numpy as np +from scipy.io import wavfile +import scipy.signal as sps +import matplotlib.pyplot as plt +from multiprocessing import Pool +from camera import camera +from fft import fft + +""" +notes: +- window size + the time to generate the spectrum is logaritmically related to the window size + bigger windows are exponentially better so you should prefer this if possible + obviously the biggest you can use is the size of your display unless you have + some way of arranging the pixles independant of the orrigional spectrogram +""" + +sample_rate, data = wavfile.read("/home/will/Downloads/number-station.wav") +new_rate = 22_050. + +sample_count = round(len(data) * new_rate / sample_rate) +data = sps.resample(data, sample_count) +sample_rate = int(new_rate) + +data = [data[i] for i in range(0, len(data), 2)] +sample_rate = sample_rate // 2 + +window_size = 250 +window_height = 125 + +hop_size = window_size // 2 +camera = camera(window_size, window_height, (1000, 1000)) +transform = fft(window_size, hop_size) + +segment_samples = window_height * hop_size + +padding = np.full((segment_samples - (len(data) % segment_samples) + window_height), fill_value=.1) +data = np.concatenate((data, padding)) + +recovered_data = np.zeros(data.shape) + +segment_count = round(len(data) / segment_samples) + +for segment_index in range(segment_count): + segment_start = segment_index * segment_samples + rows = [data[segment_start + i:segment_start + i + window_size] for i in range(0, segment_samples, hop_size)] + with Pool() as p: + mapping = p.map(transform.stft, rows) + + spectrum = np.array(mapping)[:,0,...] + + camera.display(spectrum) + + rows = [np.array([i]) for i in spectrum] + with Pool() as p: + recovered = np.array(p.map(transform.istft, rows)) + + for i, row in enumerate(recovered): + row_start = i * hop_size + recovered_data[segment_start + row_start:segment_start + row_start + window_size] += row + +wavfile.write("out.wav", sample_rate, recovered_data.astype(np.int16)) + +difference = (data - recovered_data)[1000:251000] + + + + diff --git a/main.py b/main.py deleted file mode 100644 index 229a797..0000000 --- a/main.py +++ /dev/null @@ -1,220 +0,0 @@ -import cv2 as cv -import numpy as np -from scipy.io import wavfile -import scipy.signal as sps -import matplotlib.pyplot as plt -from multiprocessing import Pool - -""" -notes: -- window size - the time to generate the spectrum is logaritmically related to the window size - bigger windows are exponentially better so you should prefer this if possible - obviously the biggest you can use is the size of your display unless you have - some way of arranging the pixles independant of the orrigional spectrogram -""" - -class camera(): - def __init__( - self, - window_size: int, - window_height: int, - device_id: int = 0 - ): - - self.window_size = window_size - self.window_height = window_height - - self.camera = cv.VideoCapture(device_id) - self.homography = None - - cv.namedWindow("display", cv.WINDOW_NORMAL) - - def calibrate( - self - ): - calibration_image = cv.imread("calibration/calibration.jpg") - calibration_image = cv.resize(calibration_image, (self.window_size, self.window_height), cv.INTER_NEAREST) - - cv.imshow("display", calibration_image) - cv.waitKey(0) - _, capture = camera.read() - - # detect SIFT keypoints - sift = cv.SIFT_create() - kp1, des1 = sift.detectAndCompute(calibration_image, None) - kp2, des2 = sift.detectAndCompute(capture, None) - - # get good matches between calibration image and the captured image - flann = cv.FlannBasedMatcher( - {"algorithm": 1, "trees": 5}, - {"checks": 50} - ) - matches = flann.knnMatch(des1, des2, k=2) - - #get good matches via ratio test - good = [] - for m,n in matches: - if m.distance < 0.7*n.distance: - good.append(m) - - if len(good)>10: - src_pts = np.float32([ kp1[m.queryIdx].pt for m in good ]).reshape(-1,1,2) - dst_pts = np.float32([ kp2[m.trainIdx].pt for m in good ]).reshape(-1,1,2) - - self.homography, mask = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0) - - else: - print("calibration failed") - - def display( - self, - image: np.ndarray - ) -> None: - - cv.imshow("display", image) - cv.waitKey(1) - - def capture( - self - ) -> np.ndarray: - - image = self.camera.read() - if self.homography is not None: - image = cv.warpPerspective(image, self.homography, (self.display_size, self.display_height)) - image = match_histograms(image, display, channel_axis=-1) - - return image - -class fft(): - def __init__( - self, - window_size: int, - hop_size: int - ): - self.window_size = window_size - self.hop_size = hop_size - self.window = np.hanning(window_size) - - self.lower_limit = -40 - self.upper_limit = 100 - - self.amplitude_max = 180 - self.amplitude_min = 0 - self.angle_max = 255 - self.angle_min = 100 - - self.amplitude_relative = self.amplitude_max - self.amplitude_min - self.angle_relative = self.angle_max - self.angle_min - - - def stft( - self, - data: np.ndarray - ) -> np.ndarray: - - segment = data * self.window - spectrum = np.fft.fft(segment) / self.window_size - - amplitude = np.abs(spectrum) - amplitude = 20*np.log10(amplitude) - amplitude = np.clip(amplitude, self.lower_limit, self.upper_limit) - amplitude -= self.lower_limit - amplitude *= (self.amplitude_relative / self.upper_limit) + self.amplitude_min - - angle = np.angle(spectrum) - angle = ((angle + np.pi) * (self.angle_relative / (2 * np.pi))) + self.angle_min - - full = np.full(angle.shape, fill_value=255) - - image = np.stack((amplitude, angle, full), axis=-1) - image = np.array([image], dtype=np.uint8) - - image = cv.cvtColor(image, cv.COLOR_HSV2BGR) - - return image - - def istft( - self, - image: np.ndarray - ) -> np.ndarray: - - image = cv.cvtColor(image, cv.COLOR_BGR2HSV) - - amplitude = image[0][...,0].astype(np.float64) - angle = image[0][...,1].astype(np.float64) - - amplitude -= self.amplitude_min - amplitude /= (self.amplitude_relative / self.upper_limit) - amplitude += self.lower_limit - amplitude = np.power(10, amplitude / 20) - - angle = ((angle - self.angle_min) / (self.angle_relative / (2 * np.pi))) - np.pi - - real = np.cos(angle) * amplitude - imag = np.sin(angle) * amplitude - segment = real + (1j * imag) - - data = np.fft.ifft(segment * self.window_size).real - - return data - -sample_rate, data = wavfile.read("/home/will/Music/George Michael - Careless Whisper.wav") -new_rate = 22_050. - -sample_count = round(len(data) * new_rate / sample_rate) -data = sps.resample(data, sample_count) -sample_rate = int(new_rate) - -data = [data[i] for i in range(0, len(data), 2)] -sample_rate = sample_rate // 2 - -window_size = 1_000 -window_height = 500 - -hop_size = window_size // 2 -camera = camera(window_size, window_height) -transform = fft(window_size, hop_size) - -segment_samples = window_height * hop_size - -padding = np.full((segment_samples - (len(data) % segment_samples) + window_height), fill_value=.1) -data = np.concatenate((data, padding)) - -recovered_data = np.zeros(data.shape) - -segment_count = round(len(data) / segment_samples) - -for segment_index in range(segment_count): - segment_start = segment_index * segment_samples - rows = [data[segment_start + i:segment_start + i + window_size] for i in range(0, segment_samples, hop_size)] - with Pool() as p: - mapping = p.map(transform.stft, rows) - - spectrum = np.array(mapping)[:,0,...] - - # do the silly capture thing here - cv.imshow("display", spectrum) - cv.waitKey(1) - - rows = [np.array([i]) for i in spectrum] - with Pool() as p: - recovered = np.array(p.map(transform.istft, rows)) - - for i, row in enumerate(recovered): - row_start = i * hop_size - recovered_data[segment_start + row_start:segment_start + row_start + window_size] += row - -wavfile.write("out.wav", sample_rate, recovered_data.astype(np.int16)) - -difference = (data - recovered_data)[1000:251000] - -plt.style.use('dark_background') -fig, (ax1, ax2) = plt.subplots(nrows=2) -ax1.plot(difference) -Pxx, freqs, bins, im = ax2.specgram(difference, NFFT=1014, Fs=1/0.0005) -plt.show() - - - - diff --git a/out.wav b/out.wav index baf075e..b56a8b7 100644 Binary files a/out.wav and b/out.wav differ diff --git a/rec.wav b/rec.wav new file mode 100644 index 0000000..d365d58 Binary files /dev/null and b/rec.wav differ diff --git a/stream.py b/stream.py new file mode 100644 index 0000000..18daf7b --- /dev/null +++ b/stream.py @@ -0,0 +1,102 @@ +from struct import unpack +import numpy as np +from scipy.io import wavfile +import matplotlib.pyplot as plt +from multiprocessing import Pool +from camera import camera +from fft import fft +import pyaudio +import sys +import os +import wave + +""" +notes: +- window size + the time to generate the spectrum is logaritmically related to the window size + bigger windows are exponentially better so you should prefer this if possible + obviously the biggest you can use is the size of your display unless you have + some way of arranging the pixles independant of the orrigional spectrogram +- read size (the window size) + this is the amount of data that is read from the audio device at one time + i belive the maximum for this specific device is 990? its something to do with + the number of channels and the sample rate... + +every time the window width / 2 number of samples is available to read from the audio +device. the program puts that chunk of audio into the biffer. each chunk is then +appended to the last chunk. the last chunk (with no later chunk to append onto it) is +left in the buffer to provide a smooth transition between the images +""" + +window_width = 750 +window_height = 500 +sample_rate = 22_050 +channels = 1 + +hop_size = window_width // 2 +camera = camera(window_width, window_height, (1000, 1000)) +transform = fft(window_width, hop_size) + +pyaudio_object = pyaudio.PyAudio() +stream = pyaudio_object.open( + format = pyaudio.paInt16, + channels = channels, + rate = sample_rate, + input = True +) + +buffer = [] +spectrum = np.zeros((window_height, window_width, 3), dtype=np.uint8) +spectrum_index = 0 +audio = np.zeros((hop_size,), dtype=np.int16) + +try: + file = wave.open("out.wav", "wb") + file.setparams(( + channels, + 2, # sample width + sample_rate, + 0, + "NONE", # compression type + "NONE" # compression name + )) + + while stream.is_active(): + data = stream.read(hop_size, exception_on_overflow = False) + data = unpack(f"<{hop_size}h", data) + buffer.append(list(data)) + + if len(buffer) == 2: + spectrum[spectrum_index] = transform.stft(buffer[0] + buffer[1]) + spectrum_index += 1 + del buffer[0] + + camera.display(spectrum) + + if spectrum_index == window_height: + spectrum_index = 0 + + rows = [np.array([i]) for i in spectrum] + with Pool(3) as p: + recovered = np.array(p.map(transform.istft, rows), dtype=np.int16) + + for row in recovered: + audio[-hop_size:] += row[:hop_size] + audio = np.append(audio, row[hop_size:]) + + file.writeframes(audio[:-hop_size]) + audio = np.delete(audio, np.s_[:-hop_size]) + +except KeyboardInterrupt: + + stream.stop_stream() + stream.close() + pyaudio_object.terminate() + file.close() + + try: + sys.exit() + except SystemExit: + os._exit(130) + + diff --git a/test.py b/test.py deleted file mode 100644 index 89e73be..0000000 --- a/test.py +++ /dev/null @@ -1,26 +0,0 @@ -import wave - -import pyaudio - -def handler (in_data, frame_count, time_info, status): - print(in_data) - print(frame_count) - print(time_info) - print(status) - - (in_data, pyaudio.paContinue) - -CHUNK = 1024 - -p = pyaudio.PyAudio() - -stream = p.open( - format=pyaudio.paInt8, - channels=1, - rate=22_050, - input=True, - stream_callback=handler -) - -stream.close() -p.terminate() diff --git a/test.wav b/test.wav new file mode 100644 index 0000000..3dcd159 Binary files /dev/null and b/test.wav differ diff --git a/test2.wav b/test2.wav new file mode 100644 index 0000000..ea303ee Binary files /dev/null and b/test2.wav differ