From: will Date: Wed, 26 Jun 2024 00:11:54 +0000 (+0100) Subject: Version 3 complete rewrite X-Git-Url: https://git.ozva.co.uk/?a=commitdiff_plain;h=e00f63780e7ee49e76fcd345c18bc6429da89d16;p=audio-over-stft Version 3 complete rewrite --- diff --git a/data/data.wav b/data/data.wav deleted file mode 100644 index c78f1b9..0000000 Binary files a/data/data.wav and /dev/null differ diff --git a/main.py b/main.py index 06df698..83a73cd 100644 --- a/main.py +++ b/main.py @@ -1,201 +1,195 @@ -import os -import time import cv2 as cv -import screeninfo import numpy as np -from PIL import ImageGrab from scipy.io import wavfile -from skimage.exposure import match_histograms - -sample_rate, data = wavfile.read("./data/data.wav") -camera = cv.VideoCapture(0) - -window_size = 12_000 # the window size is the number of frequency bins -hop_size = 6_000 # size of each jump of the window -display_size = (900, 900) # SHOULD be greater than segment size otherwise youll get information loss -segment_size = 160 # the ((window_size * 2) / segment_size) * segment_parity should not be more than the display size -segment_parity = 1 # number of parity copies in the display -screen_id = 0 - -dummy = True -time_skip = False - -data = np.concatenate( - (data, np.zeros(( - # add empty samples to bring the size up to a multiple of hop + the window width - window_size + (hop_size - (len(data) % hop_size)) - ))) -) - -segment_count = round(len(data) / hop_size) - 1 # get the number of jumps required - -window = np.hanning(window_size) # window is half cosine so the overlap produces constant power - -result_array = np.empty((segment_count, window_size), dtype=np.complex128) # result array - -for i in range(segment_count): - segment_offset = hop_size * i - segment = data[segment_offset:segment_offset+window_size] # current segment of data - - window_segment = segment * window # multiply by the window - spectrum = np.fft.fft(window_segment) / window_size # take the Fourier Transform and scale by the number of samples - - result_array[i, :] = spectrum[:window_size] # append to the results array - - os.system("clear") - print(f"1/2 {round((i / segment_count) * 100)}%") - -result_array = np.transpose(result_array) - -result_real = np.concatenate(( # get the positive and negative (top and bottom) real arrays - np.where(result_array.real > 0., result_array.real, 0.1), - np.where(result_array.real < 0., result_array.real * -1, 0.1) -), axis=0) -result_imag = np.concatenate(( # get the positive and negative (top and bottom) imaginary arrays - np.where(result_array.imag > 0., result_array.imag, 0.1), - np.where(result_array.imag < 0., result_array.imag * -1, 0.1) -), axis=0) - -result = np.stack((result_real, result_imag, np.flip(result_imag, axis=(0,1))), axis=-1) - -result = 20*np.log10(result) # scale to db -result = np.clip(result, -40, 200) # clip values +import matplotlib.pyplot as plt +from multiprocessing import Pool + +""" +notes: +- window size + the time to generate the spectrum is logaritmically related to the window size + bigger windows are exponentially better so you should prefer this if possible + obviously the biggest you can use is the size of your display unless you have + some way of arranging the pixles independant of the orrigional spectrogram +""" + +class camera(): + def __init__( + self, + window_size: int, + window_height: int, + device_id: int = 0 + ): + + self.window_size = window_size + self.window_height = window_height + + self.camera = cv.VideoCapture(device_id) + self.homography = None + + cv.namedWindow("display", cv.WINDOW_NORMAL) + + def calibrate( + self + ): + calibration_image = cv.imread("calibration/calibration.jpg") + calibration_image = cv.resize(calibration_image, (self.window_size, self.window_height), cv.INTER_NEAREST) + + cv.imshow("display", calibration_image) + cv.waitKey(0) + _, capture = camera.read() + + # detect SIFT keypoints + sift = cv.SIFT_create() + kp1, des1 = sift.detectAndCompute(calibration_image, None) + kp2, des2 = sift.detectAndCompute(capture, None) + + # get good matches between calibration image and the captured image + flann = cv.FlannBasedMatcher( + {"algorithm": 1, "trees": 5}, + {"checks": 50} + ) + matches = flann.knnMatch(des1, des2, k=2) + + #get good matches via ratio test + good = [] + for m,n in matches: + if m.distance < 0.7*n.distance: + good.append(m) + + if len(good)>10: + src_pts = np.float32([ kp1[m.queryIdx].pt for m in good ]).reshape(-1,1,2) + dst_pts = np.float32([ kp2[m.trainIdx].pt for m in good ]).reshape(-1,1,2) + + self.homography, mask = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0) + + else: + print("calibration failed") + + def display( + self, + image: np.ndarray + ) -> None: + + cv.imshow("display", image) + cv.waitKey(1) -image = (result + 40) * 1.275 # put the data in range for an image + def capture( + self + ) -> np.ndarray: -image = np.array(np.rint(image), dtype=np.uint8) -recovered = np.zeros((image.shape), dtype=np.uint8) + image = self.camera.read() + if self.homography is not None: + image = cv.warpPerspective(image, self.homography, (self.display_size, self.display_height)) + image = match_histograms(image, display, channel_axis=-1) -cv.namedWindow("display") -cv.namedWindow("debug1") -cv.namedWindow("debug2") + return image -calibrated = False -while not calibrated and not dummy: - calibration_image = cv.imread("calibration/calibration.jpg") - calibration_image = cv.resize(calibration_image, display_size, cv.INTER_NEAREST) - cv.imshow("display", calibration_image) - cv.waitKey(0) - _, capture = camera.read() +class fft(): + def __init__( + self, + window_size: int, + hop_size: int + ): + self.window_size = window_size + self.hop_size = hop_size + self.window = np.hanning(window_size) - # detect SIFT keypoints - sift = cv.SIFT_create() - kp1, des1 = sift.detectAndCompute(calibration_image,None) - kp2, des2 = sift.detectAndCompute(capture,None) + self.lower_limit = -40 + self.upper_limit = 100 - # get good matches between calibration image and the captured image - FLANN_INDEX_KDTREE = 1 - index_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5) - search_params = dict(checks = 50) - flann = cv.FlannBasedMatcher(index_params, search_params) - matches = flann.knnMatch(des1,des2,k=2) - #get good matches via ratio test - good = [] - for m,n in matches: - if m.distance < 0.7*n.distance: - good.append(m) + def stft( + self, + data: np.ndarray + ) -> np.ndarray: - if len(good)>10: - src_pts = np.float32([ kp1[m.queryIdx].pt for m in good ]).reshape(-1,1,2) - dst_pts = np.float32([ kp2[m.trainIdx].pt for m in good ]).reshape(-1,1,2) - homography, mask = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0) + segment = data * self.window + spectrum = np.fft.fft(segment) / self.window_size - img3 = cv.warpPerspective(capture, homography, display_size) + amplitude = np.abs(spectrum) + amplitude = 20*np.log10(amplitude) + amplitude = np.clip(amplitude, self.lower_limit, self.upper_limit) + amplitude -= self.lower_limit + amplitude *= (255 / self.upper_limit) - calibrated = True - print("calibrated") + angle = np.angle(spectrum) + angle = (angle + np.pi) * (255 / (2 * np.pi)) - cv.imshow("display", img3) - cv.waitKey(1) - else: - print("retrying calibration") + image = np.stack((amplitude, angle, amplitude), axis=-1) + image = np.array([image], dtype=np.uint8) - calibrated = True + return image -frame_time = (window_size * 2) / (sample_rate / window_size) + def istft( + self, + image: np.ndarray + ) -> np.ndarray: -for i in range(image.shape[1]): - time_start = time.time() - segment = np.copy(image[:,i]) # get the column of the image we are to work on + amplitude = image[0][...,0].astype(np.float64) + angle = image[0][...,1].astype(np.float64) - columns = round(len(segment) / segment_size) # get the number of columns to split the single column into - segment = np.reshape(segment, (segment_size, columns, 3)) # reshape the column into a 3d array + amplitude /= (255 / self.upper_limit) + amplitude += self.lower_limit + amplitude = np.power(10, amplitude / 20) - segment = np.concatenate((segment,)*segment_parity, axis=1) # affix the object with 5 parity copies + angle = (angle / (255 / (2 * np.pi))) - np.pi - display = cv.resize(segment, display_size, cv.INTER_NEAREST) # resize the array for display + real = np.cos(angle) * amplitude + imag = np.sin(angle) * amplitude + segment = real + (1j * imag) - cv.imshow("display", display) # show image to display - cv.waitKey(1) # wait till capture + data = np.fft.ifft(segment * self.window_size).real - if dummy: capture = display # pass the image to display straight to the capture - else: - good, capture = camera.read() # send the capture to the buffer - if not good: - print("capture failed") - print("diverting to dummy output") - dummy = True - capture = display + return data - cv.imshow("debug1", capture) # show image to display - capture = cv.warpPerspective(capture, homography, display_size) # fix distorition in the captured image and crop - capture = match_histograms(capture, display, channel_axis=-1) - cv.imshow("debug2", capture) # show image to display +sample_rate, data = wavfile.read("/home/will/Music/George Michael - Careless Whisper.wav") - capture = cv.resize(capture, (columns * segment_parity, segment_size), cv.INTER_NEAREST) # resize back to the segment size +window_size = 1_000 +window_height = 500 - recovered_segment = np.array_split(capture, segment_parity, axis=1) # split to list of parity copies - recovered_segment = np.array([a for a in recovered_segment if a.shape[1] == columns]) # get the array of the parity copies - recovered_segment = np.mean(recovered_segment, axis=0) # get the mean of the parity copies +hop_size = window_size // 2 +camera = camera(window_size, window_height) +transform = fft(window_size, hop_size) - recovered_segment = np.reshape(recovered_segment, (2*window_size, 3)) # reshape to origional column +segment_samples = window_height * hop_size - recovered[:, i] = recovered_segment # insert into recovered data +padding = np.full((segment_samples - (len(data) % segment_samples) + window_height), fill_value=.1) +data = np.concatenate((data, padding)) - real_time = time.time() - time_start - wait_time = frame_time - real_time - if wait_time > 0 and time_skip: time.sleep(wait_time) - else: - os.system("clear") - print(f"running @ {round(real_time / frame_time, 2)}x realtime") +recovered_data = np.zeros(data.shape) -recovered = np.where(np.isreal(recovered), recovered, 0.1) # remove the nans introduces through transformation of a blank spectrogram -recovered = np.power(10, ((recovered / 1.275) - 40) / 20) # unscale from dB +segment_count = round(len(data) / segment_samples) -recovered[...,1] = (recovered[...,1] + np.flip(recovered[...,2], axis=(1,0))) / 2 # revert the parity copy flipped in the red channel +for segment_index in range(segment_count): + segment_start = segment_index * segment_samples + rows = [data[segment_start + i:segment_start + i + window_size] for i in range(0, segment_samples, hop_size)] + with Pool() as p: + mapping = p.map(transform.stft, rows) -recovered_real = np.array_split(recovered[...,0], 2) # split into two arrays each of the positive and negative component (top and bottom) -recovered_imag = np.array_split(recovered[...,1], 2) + spectrum = np.array(mapping)[:,0,...] -recovered_real = recovered_real[0] + (recovered_real[1] * -1) # revert the negative array and combine -recovered_imag = recovered_imag[0] + (recovered_imag[1] * -1) -recovered_array = np.transpose(recovered_real * -1 + 1j * recovered_imag) # transpose and make complex again -# because of the two transposes, the array might be (in total) flipped horisonally? -# this is not an issue for the audio signal + # do the silly capture thing here + cv.imshow("display", spectrum) + cv.waitKey(1) -recovered_signal = np.zeros(((recovered_array.shape[0] + 1) * hop_size)) # empty array for the recovered signal + rows = [[i] for i in spectrum] + with Pool() as p: + recovered = np.array(p.map(transform.istft, rows)) -for i in range(recovered_array.shape[0]): - signal_offset = i * hop_size # get the sample offset of the range we insert - signal_segment = np.fft.ifft(recovered_array[i] * window_size, n=window_size).real # get the istft of the data - recovered_signal[signal_offset:signal_offset + window_size] += signal_segment #/ 2 # add the data to the recovered signal + for i, row in enumerate(recovered): + row_start = i * hop_size + recovered_data[segment_start + row_start:segment_start + row_start + window_size] += row - os.system("clear") - print(f"2/2 {round((i / recovered_array.shape[0]) * 100)}%") +wavfile.write("out.wav", sample_rate, recovered_data.astype(np.int16)) -recovered_signal = np.where(np.isreal(recovered_signal), recovered_signal, 0) # remove the nans introduces through transformation of a blank spectrogram -# this is a quick fix for a bug introduced elsewhere +difference = data - recovered_data -recovered_signal = np.clip( # constrain the data to the max and min for the datatype - recovered_signal, - np.iinfo(np.int16).min, # max and min are system dependant so check dynamically - np.iinfo(np.int16).max -) +plt.style.use('dark_background') +fig, (ax1, ax2) = plt.subplots(nrows=2) +ax1.plot(difference) +Pxx, freqs, bins, im = ax2.specgram(difference, NFFT=1014, Fs=1/0.0005) +plt.show() -recovered_signal *= np.average(data) / np.average(recovered_signal) # normalize via the average of both of the audio signals -recovered_signal = np.array(recovered_signal, dtype=np.int16) # covert the data to the required data type -wavfile.write("out.wav", sample_rate, recovered_signal) -print("complete") diff --git a/old.py b/old.py new file mode 100644 index 0000000..06df698 --- /dev/null +++ b/old.py @@ -0,0 +1,201 @@ +import os +import time +import cv2 as cv +import screeninfo +import numpy as np +from PIL import ImageGrab +from scipy.io import wavfile +from skimage.exposure import match_histograms + +sample_rate, data = wavfile.read("./data/data.wav") +camera = cv.VideoCapture(0) + +window_size = 12_000 # the window size is the number of frequency bins +hop_size = 6_000 # size of each jump of the window +display_size = (900, 900) # SHOULD be greater than segment size otherwise youll get information loss +segment_size = 160 # the ((window_size * 2) / segment_size) * segment_parity should not be more than the display size +segment_parity = 1 # number of parity copies in the display +screen_id = 0 + +dummy = True +time_skip = False + +data = np.concatenate( + (data, np.zeros(( + # add empty samples to bring the size up to a multiple of hop + the window width + window_size + (hop_size - (len(data) % hop_size)) + ))) +) + +segment_count = round(len(data) / hop_size) - 1 # get the number of jumps required + +window = np.hanning(window_size) # window is half cosine so the overlap produces constant power + +result_array = np.empty((segment_count, window_size), dtype=np.complex128) # result array + +for i in range(segment_count): + segment_offset = hop_size * i + segment = data[segment_offset:segment_offset+window_size] # current segment of data + + window_segment = segment * window # multiply by the window + spectrum = np.fft.fft(window_segment) / window_size # take the Fourier Transform and scale by the number of samples + + result_array[i, :] = spectrum[:window_size] # append to the results array + + os.system("clear") + print(f"1/2 {round((i / segment_count) * 100)}%") + +result_array = np.transpose(result_array) + +result_real = np.concatenate(( # get the positive and negative (top and bottom) real arrays + np.where(result_array.real > 0., result_array.real, 0.1), + np.where(result_array.real < 0., result_array.real * -1, 0.1) +), axis=0) +result_imag = np.concatenate(( # get the positive and negative (top and bottom) imaginary arrays + np.where(result_array.imag > 0., result_array.imag, 0.1), + np.where(result_array.imag < 0., result_array.imag * -1, 0.1) +), axis=0) + +result = np.stack((result_real, result_imag, np.flip(result_imag, axis=(0,1))), axis=-1) + +result = 20*np.log10(result) # scale to db +result = np.clip(result, -40, 200) # clip values + +image = (result + 40) * 1.275 # put the data in range for an image + +image = np.array(np.rint(image), dtype=np.uint8) +recovered = np.zeros((image.shape), dtype=np.uint8) + +cv.namedWindow("display") +cv.namedWindow("debug1") +cv.namedWindow("debug2") + +calibrated = False +while not calibrated and not dummy: + calibration_image = cv.imread("calibration/calibration.jpg") + calibration_image = cv.resize(calibration_image, display_size, cv.INTER_NEAREST) + cv.imshow("display", calibration_image) + cv.waitKey(0) + _, capture = camera.read() + + # detect SIFT keypoints + sift = cv.SIFT_create() + kp1, des1 = sift.detectAndCompute(calibration_image,None) + kp2, des2 = sift.detectAndCompute(capture,None) + + # get good matches between calibration image and the captured image + FLANN_INDEX_KDTREE = 1 + index_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5) + search_params = dict(checks = 50) + flann = cv.FlannBasedMatcher(index_params, search_params) + matches = flann.knnMatch(des1,des2,k=2) + #get good matches via ratio test + good = [] + for m,n in matches: + if m.distance < 0.7*n.distance: + good.append(m) + + if len(good)>10: + src_pts = np.float32([ kp1[m.queryIdx].pt for m in good ]).reshape(-1,1,2) + dst_pts = np.float32([ kp2[m.trainIdx].pt for m in good ]).reshape(-1,1,2) + homography, mask = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0) + + img3 = cv.warpPerspective(capture, homography, display_size) + + calibrated = True + print("calibrated") + + cv.imshow("display", img3) + cv.waitKey(1) + else: + print("retrying calibration") + + calibrated = True + +frame_time = (window_size * 2) / (sample_rate / window_size) + +for i in range(image.shape[1]): + time_start = time.time() + segment = np.copy(image[:,i]) # get the column of the image we are to work on + + columns = round(len(segment) / segment_size) # get the number of columns to split the single column into + segment = np.reshape(segment, (segment_size, columns, 3)) # reshape the column into a 3d array + + segment = np.concatenate((segment,)*segment_parity, axis=1) # affix the object with 5 parity copies + + display = cv.resize(segment, display_size, cv.INTER_NEAREST) # resize the array for display + + cv.imshow("display", display) # show image to display + cv.waitKey(1) # wait till capture + + if dummy: capture = display # pass the image to display straight to the capture + else: + good, capture = camera.read() # send the capture to the buffer + if not good: + print("capture failed") + print("diverting to dummy output") + dummy = True + capture = display + + cv.imshow("debug1", capture) # show image to display + capture = cv.warpPerspective(capture, homography, display_size) # fix distorition in the captured image and crop + capture = match_histograms(capture, display, channel_axis=-1) + cv.imshow("debug2", capture) # show image to display + + capture = cv.resize(capture, (columns * segment_parity, segment_size), cv.INTER_NEAREST) # resize back to the segment size + + recovered_segment = np.array_split(capture, segment_parity, axis=1) # split to list of parity copies + recovered_segment = np.array([a for a in recovered_segment if a.shape[1] == columns]) # get the array of the parity copies + recovered_segment = np.mean(recovered_segment, axis=0) # get the mean of the parity copies + + recovered_segment = np.reshape(recovered_segment, (2*window_size, 3)) # reshape to origional column + + recovered[:, i] = recovered_segment # insert into recovered data + + real_time = time.time() - time_start + wait_time = frame_time - real_time + if wait_time > 0 and time_skip: time.sleep(wait_time) + else: + os.system("clear") + print(f"running @ {round(real_time / frame_time, 2)}x realtime") + +recovered = np.where(np.isreal(recovered), recovered, 0.1) # remove the nans introduces through transformation of a blank spectrogram +recovered = np.power(10, ((recovered / 1.275) - 40) / 20) # unscale from dB + +recovered[...,1] = (recovered[...,1] + np.flip(recovered[...,2], axis=(1,0))) / 2 # revert the parity copy flipped in the red channel + +recovered_real = np.array_split(recovered[...,0], 2) # split into two arrays each of the positive and negative component (top and bottom) +recovered_imag = np.array_split(recovered[...,1], 2) + +recovered_real = recovered_real[0] + (recovered_real[1] * -1) # revert the negative array and combine +recovered_imag = recovered_imag[0] + (recovered_imag[1] * -1) +recovered_array = np.transpose(recovered_real * -1 + 1j * recovered_imag) # transpose and make complex again +# because of the two transposes, the array might be (in total) flipped horisonally? +# this is not an issue for the audio signal + +recovered_signal = np.zeros(((recovered_array.shape[0] + 1) * hop_size)) # empty array for the recovered signal + +for i in range(recovered_array.shape[0]): + signal_offset = i * hop_size # get the sample offset of the range we insert + signal_segment = np.fft.ifft(recovered_array[i] * window_size, n=window_size).real # get the istft of the data + recovered_signal[signal_offset:signal_offset + window_size] += signal_segment #/ 2 # add the data to the recovered signal + + os.system("clear") + print(f"2/2 {round((i / recovered_array.shape[0]) * 100)}%") + +recovered_signal = np.where(np.isreal(recovered_signal), recovered_signal, 0) # remove the nans introduces through transformation of a blank spectrogram +# this is a quick fix for a bug introduced elsewhere + +recovered_signal = np.clip( # constrain the data to the max and min for the datatype + recovered_signal, + np.iinfo(np.int16).min, # max and min are system dependant so check dynamically + np.iinfo(np.int16).max +) + +recovered_signal *= np.average(data) / np.average(recovered_signal) # normalize via the average of both of the audio signals + +recovered_signal = np.array(recovered_signal, dtype=np.int16) # covert the data to the required data type + +wavfile.write("out.wav", sample_rate, recovered_signal) + +print("complete") diff --git a/out.gif b/out.gif deleted file mode 100644 index b45b639..0000000 Binary files a/out.gif and /dev/null differ diff --git a/out.wav b/out.wav index d03eef7..232ced3 100644 Binary files a/out.wav and b/out.wav differ