From: will Date: Thu, 20 Jun 2024 14:03:52 +0000 (+0100) Subject: done (: X-Git-Url: https://git.ozva.co.uk/?a=commitdiff_plain;h=58a297f0203bef84f8212dcd7230cbda97ae5d7e;p=audio-over-stft done (: --- diff --git a/AOSTFT.py b/AOSTFT.py deleted file mode 100755 index a954048..0000000 --- a/AOSTFT.py +++ /dev/null @@ -1,141 +0,0 @@ -import wave -import random -import time -import cv2 as cv -import numpy as np -from scipy.signal import ShortTimeFFT -from scipy.signal.windows import cosine -from scipy.io import wavfile -import os - - -import matplotlib.pyplot as plt - -def calibrate(windowsize): - print("Attempting calibration") - calibrated2 = False - while not calibrated2: - - calibrationimage = cv.imread("calibration.png") - cv.imshow("display", calibrationimage) - cv.waitKey(1) - - cameraimage = cv.imread("test.jpg") #replace with taking a picture from the camera - - #detect SIFT keypoints - sift = cv.SIFT_create() - kp1, des1 = sift.detectAndCompute(calibrationimage,None) - kp2, des2 = sift.detectAndCompute(cameraimage,None) - - #cv2 bullshit - FLANN_INDEX_KDTREE = 1 - index_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5) - search_params = dict(checks = 50) - flann = cv.FlannBasedMatcher(index_params, search_params) - matches = flann.knnMatch(des1,des2,k=2) - #get good matches via ratio test - good = [] - for m,n in matches: - if m.distance < 0.7*n.distance: - good.append(m) - - #if theres enough matches - if len(good)>10: - src_pts = np.float32([ kp1[m.queryIdx].pt for m in good ]).reshape(-1,1,2) - dst_pts = np.float32([ kp2[m.trainIdx].pt for m in good ]).reshape(-1,1,2) - M, mask = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0) - - img3 = cv.warpPerspective(cameraimage, M, windowsize) - - calibrated2 = True - print("Calibration sucessfull") - - cv.imshow("display", img3) - cv.waitKey(1) - else: - print("calibration unsucessfull - retrying...") - - return M - -def getSFT(): - width = 719 - w = cosine(width, sym=True) - SFT = ShortTimeFFT(w, hop=1, fs=16_000, scale_to='magnitude') # hop was og width *2 but then it started shouting at me - - return SFT - -def getSTFT(data, start, stop): - - diff = (stop-start)-len(data[start:stop]) - if diff > 0: - data = np.pad(data, (0, diff), 'constant') - - SFT = getSFT() - Sx = SFT.stft(data, p0=start, p1=stop) - - Sx = 20*np.log10(Sx) - real, imag = Sx.real, Sx.imag - - img = np.stack((real, imag, [real, imag][random.randint(0,1)]), axis=-1) - img = np.array(img+128, dtype=np.uint8) - - return img - -def getISIFT(img, predata, step): - - img = np.array(img, dtype=np.float64)-128 - - real, imag = img[...,0], img[...,1] - - Sx = np.vectorize(complex)(real, imag) - Sx = np.power(10, Sx/20) # i think this bit is wrong - - SFT = getSFT() - data = SFT.istft(Sx, k1=step) - data = np.array(data, dtype=np.int16) - - return data - -def transmit(img, homo, windowsize): - - img = cv.resize(img, (1080, 720), cv.INTER_NEAREST) - - cv.imshow("display", img) - cv.waitKey(1) - - #cap = cv.warpPerspective(cameraimage, homo, windowsize) - - img = cv.resize(img, (1080, 360), cv.INTER_NEAREST) - - return img - -if __name__ == "__main__": - - windowsize = (1080, 720) - - cv.namedWindow("display") - homo = None - #homo = calibrate() - - sr, data = wavfile.read("audio2.wav") - data = np.array(data, dtype=np.int16) - SFT = getSFT() - - step = 360 - newdata = np.zeros((1,), dtype=np.int16) - try: - for i in range(0, len(data), step): - img = getSTFT(data, i, i+step) - img = transmit(img, homo, windowsize) - recovered = getISIFT(img, data[i:i+step], step) - newdata = np.concatenate((newdata, recovered), axis=0) - os.system('cls') - print(f"total difference: {np.sum(abs(recovered-data[i:i+step]))}") - print(f"origional data limits: {np.max(data), np.min(data)}") - print(f"current data limits: {np.max(newdata), np.min(newdata)}") - print(f"current data factor: {np.max(data)//np.max(newdata), np.min(data)//np.min(newdata)}") - print(f"{round(i/data.shape[0], 2)*100}% done") - except: - print("errored out!") - - wavfile.write("out.wav", sr, newdata) \ No newline at end of file diff --git a/calibration/calibration.jpg b/calibration/calibration.jpg new file mode 100644 index 0000000..54d79ba Binary files /dev/null and b/calibration/calibration.jpg differ diff --git a/calibration/calibration.png b/calibration/calibration.png deleted file mode 100755 index 40576b8..0000000 Binary files a/calibration/calibration.png and /dev/null differ diff --git a/calibration/calibration1.png b/calibration/calibration1.png deleted file mode 100755 index 83bb076..0000000 Binary files a/calibration/calibration1.png and /dev/null differ diff --git a/data/audio1.wav b/data/audio1.wav deleted file mode 100755 index e53bc92..0000000 Binary files a/data/audio1.wav and /dev/null differ diff --git a/data/audio2.wav b/data/audio2.wav deleted file mode 100755 index 70ac56b..0000000 Binary files a/data/audio2.wav and /dev/null differ diff --git a/data/data.wav b/data/data.wav new file mode 100644 index 0000000..c78f1b9 Binary files /dev/null and b/data/data.wav differ diff --git a/main.py b/main.py new file mode 100644 index 0000000..06df698 --- /dev/null +++ b/main.py @@ -0,0 +1,201 @@ +import os +import time +import cv2 as cv +import screeninfo +import numpy as np +from PIL import ImageGrab +from scipy.io import wavfile +from skimage.exposure import match_histograms + +sample_rate, data = wavfile.read("./data/data.wav") +camera = cv.VideoCapture(0) + +window_size = 12_000 # the window size is the number of frequency bins +hop_size = 6_000 # size of each jump of the window +display_size = (900, 900) # SHOULD be greater than segment size otherwise youll get information loss +segment_size = 160 # the ((window_size * 2) / segment_size) * segment_parity should not be more than the display size +segment_parity = 1 # number of parity copies in the display +screen_id = 0 + +dummy = True +time_skip = False + +data = np.concatenate( + (data, np.zeros(( + # add empty samples to bring the size up to a multiple of hop + the window width + window_size + (hop_size - (len(data) % hop_size)) + ))) +) + +segment_count = round(len(data) / hop_size) - 1 # get the number of jumps required + +window = np.hanning(window_size) # window is half cosine so the overlap produces constant power + +result_array = np.empty((segment_count, window_size), dtype=np.complex128) # result array + +for i in range(segment_count): + segment_offset = hop_size * i + segment = data[segment_offset:segment_offset+window_size] # current segment of data + + window_segment = segment * window # multiply by the window + spectrum = np.fft.fft(window_segment) / window_size # take the Fourier Transform and scale by the number of samples + + result_array[i, :] = spectrum[:window_size] # append to the results array + + os.system("clear") + print(f"1/2 {round((i / segment_count) * 100)}%") + +result_array = np.transpose(result_array) + +result_real = np.concatenate(( # get the positive and negative (top and bottom) real arrays + np.where(result_array.real > 0., result_array.real, 0.1), + np.where(result_array.real < 0., result_array.real * -1, 0.1) +), axis=0) +result_imag = np.concatenate(( # get the positive and negative (top and bottom) imaginary arrays + np.where(result_array.imag > 0., result_array.imag, 0.1), + np.where(result_array.imag < 0., result_array.imag * -1, 0.1) +), axis=0) + +result = np.stack((result_real, result_imag, np.flip(result_imag, axis=(0,1))), axis=-1) + +result = 20*np.log10(result) # scale to db +result = np.clip(result, -40, 200) # clip values + +image = (result + 40) * 1.275 # put the data in range for an image + +image = np.array(np.rint(image), dtype=np.uint8) +recovered = np.zeros((image.shape), dtype=np.uint8) + +cv.namedWindow("display") +cv.namedWindow("debug1") +cv.namedWindow("debug2") + +calibrated = False +while not calibrated and not dummy: + calibration_image = cv.imread("calibration/calibration.jpg") + calibration_image = cv.resize(calibration_image, display_size, cv.INTER_NEAREST) + cv.imshow("display", calibration_image) + cv.waitKey(0) + _, capture = camera.read() + + # detect SIFT keypoints + sift = cv.SIFT_create() + kp1, des1 = sift.detectAndCompute(calibration_image,None) + kp2, des2 = sift.detectAndCompute(capture,None) + + # get good matches between calibration image and the captured image + FLANN_INDEX_KDTREE = 1 + index_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5) + search_params = dict(checks = 50) + flann = cv.FlannBasedMatcher(index_params, search_params) + matches = flann.knnMatch(des1,des2,k=2) + #get good matches via ratio test + good = [] + for m,n in matches: + if m.distance < 0.7*n.distance: + good.append(m) + + if len(good)>10: + src_pts = np.float32([ kp1[m.queryIdx].pt for m in good ]).reshape(-1,1,2) + dst_pts = np.float32([ kp2[m.trainIdx].pt for m in good ]).reshape(-1,1,2) + homography, mask = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0) + + img3 = cv.warpPerspective(capture, homography, display_size) + + calibrated = True + print("calibrated") + + cv.imshow("display", img3) + cv.waitKey(1) + else: + print("retrying calibration") + + calibrated = True + +frame_time = (window_size * 2) / (sample_rate / window_size) + +for i in range(image.shape[1]): + time_start = time.time() + segment = np.copy(image[:,i]) # get the column of the image we are to work on + + columns = round(len(segment) / segment_size) # get the number of columns to split the single column into + segment = np.reshape(segment, (segment_size, columns, 3)) # reshape the column into a 3d array + + segment = np.concatenate((segment,)*segment_parity, axis=1) # affix the object with 5 parity copies + + display = cv.resize(segment, display_size, cv.INTER_NEAREST) # resize the array for display + + cv.imshow("display", display) # show image to display + cv.waitKey(1) # wait till capture + + if dummy: capture = display # pass the image to display straight to the capture + else: + good, capture = camera.read() # send the capture to the buffer + if not good: + print("capture failed") + print("diverting to dummy output") + dummy = True + capture = display + + cv.imshow("debug1", capture) # show image to display + capture = cv.warpPerspective(capture, homography, display_size) # fix distorition in the captured image and crop + capture = match_histograms(capture, display, channel_axis=-1) + cv.imshow("debug2", capture) # show image to display + + capture = cv.resize(capture, (columns * segment_parity, segment_size), cv.INTER_NEAREST) # resize back to the segment size + + recovered_segment = np.array_split(capture, segment_parity, axis=1) # split to list of parity copies + recovered_segment = np.array([a for a in recovered_segment if a.shape[1] == columns]) # get the array of the parity copies + recovered_segment = np.mean(recovered_segment, axis=0) # get the mean of the parity copies + + recovered_segment = np.reshape(recovered_segment, (2*window_size, 3)) # reshape to origional column + + recovered[:, i] = recovered_segment # insert into recovered data + + real_time = time.time() - time_start + wait_time = frame_time - real_time + if wait_time > 0 and time_skip: time.sleep(wait_time) + else: + os.system("clear") + print(f"running @ {round(real_time / frame_time, 2)}x realtime") + +recovered = np.where(np.isreal(recovered), recovered, 0.1) # remove the nans introduces through transformation of a blank spectrogram +recovered = np.power(10, ((recovered / 1.275) - 40) / 20) # unscale from dB + +recovered[...,1] = (recovered[...,1] + np.flip(recovered[...,2], axis=(1,0))) / 2 # revert the parity copy flipped in the red channel + +recovered_real = np.array_split(recovered[...,0], 2) # split into two arrays each of the positive and negative component (top and bottom) +recovered_imag = np.array_split(recovered[...,1], 2) + +recovered_real = recovered_real[0] + (recovered_real[1] * -1) # revert the negative array and combine +recovered_imag = recovered_imag[0] + (recovered_imag[1] * -1) +recovered_array = np.transpose(recovered_real * -1 + 1j * recovered_imag) # transpose and make complex again +# because of the two transposes, the array might be (in total) flipped horisonally? +# this is not an issue for the audio signal + +recovered_signal = np.zeros(((recovered_array.shape[0] + 1) * hop_size)) # empty array for the recovered signal + +for i in range(recovered_array.shape[0]): + signal_offset = i * hop_size # get the sample offset of the range we insert + signal_segment = np.fft.ifft(recovered_array[i] * window_size, n=window_size).real # get the istft of the data + recovered_signal[signal_offset:signal_offset + window_size] += signal_segment #/ 2 # add the data to the recovered signal + + os.system("clear") + print(f"2/2 {round((i / recovered_array.shape[0]) * 100)}%") + +recovered_signal = np.where(np.isreal(recovered_signal), recovered_signal, 0) # remove the nans introduces through transformation of a blank spectrogram +# this is a quick fix for a bug introduced elsewhere + +recovered_signal = np.clip( # constrain the data to the max and min for the datatype + recovered_signal, + np.iinfo(np.int16).min, # max and min are system dependant so check dynamically + np.iinfo(np.int16).max +) + +recovered_signal *= np.average(data) / np.average(recovered_signal) # normalize via the average of both of the audio signals + +recovered_signal = np.array(recovered_signal, dtype=np.int16) # covert the data to the required data type + +wavfile.write("out.wav", sample_rate, recovered_signal) + +print("complete") diff --git a/out.gif b/out.gif new file mode 100644 index 0000000..b45b639 Binary files /dev/null and b/out.gif differ diff --git a/out.wav b/out.wav old mode 100755 new mode 100644 index 60f58a3..d03eef7 Binary files a/out.wav and b/out.wav differ diff --git a/test.py b/test.py new file mode 100644 index 0000000..bce9ae7 --- /dev/null +++ b/test.py @@ -0,0 +1,5 @@ +import cv2 as cv + +image = cv.imread("calibration/calibration.jpg") + +cv.imwrite("calibration/calibration.jpg")