+++ /dev/null
-import wave\r
-import random\r
-import time\r
-import cv2 as cv\r
-import numpy as np\r
-from scipy.signal import ShortTimeFFT\r
-from scipy.signal.windows import cosine\r
-from scipy.io import wavfile\r
-import os\r
-\r
-\r
-import matplotlib.pyplot as plt\r
-\r
-def calibrate(windowsize):\r
- print("Attempting calibration")\r
- calibrated2 = False\r
- while not calibrated2:\r
-\r
- calibrationimage = cv.imread("calibration.png")\r
- cv.imshow("display", calibrationimage)\r
- cv.waitKey(1)\r
- \r
- cameraimage = cv.imread("test.jpg") #replace with taking a picture from the camera\r
-\r
- #detect SIFT keypoints\r
- sift = cv.SIFT_create()\r
- kp1, des1 = sift.detectAndCompute(calibrationimage,None)\r
- kp2, des2 = sift.detectAndCompute(cameraimage,None)\r
-\r
- #cv2 bullshit\r
- FLANN_INDEX_KDTREE = 1\r
- index_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5)\r
- search_params = dict(checks = 50)\r
- flann = cv.FlannBasedMatcher(index_params, search_params)\r
- matches = flann.knnMatch(des1,des2,k=2)\r
- #get good matches via ratio test\r
- good = []\r
- for m,n in matches:\r
- if m.distance < 0.7*n.distance:\r
- good.append(m)\r
- \r
- #if theres enough matches\r
- if len(good)>10:\r
- src_pts = np.float32([ kp1[m.queryIdx].pt for m in good ]).reshape(-1,1,2)\r
- dst_pts = np.float32([ kp2[m.trainIdx].pt for m in good ]).reshape(-1,1,2)\r
- M, mask = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0)\r
- \r
- img3 = cv.warpPerspective(cameraimage, M, windowsize)\r
-\r
- calibrated2 = True\r
- print("Calibration sucessfull")\r
-\r
- cv.imshow("display", img3)\r
- cv.waitKey(1)\r
- else:\r
- print("calibration unsucessfull - retrying...")\r
-\r
- return M\r
-\r
-def getSFT():\r
- width = 719\r
- w = cosine(width, sym=True)\r
- SFT = ShortTimeFFT(w, hop=1, fs=16_000, scale_to='magnitude') # hop was og width *2 but then it started shouting at me\r
-\r
- return SFT\r
-\r
-def getSTFT(data, start, stop):\r
-\r
- diff = (stop-start)-len(data[start:stop])\r
- if diff > 0:\r
- data = np.pad(data, (0, diff), 'constant')\r
-\r
- SFT = getSFT()\r
- Sx = SFT.stft(data, p0=start, p1=stop)\r
-\r
- Sx = 20*np.log10(Sx)\r
- real, imag = Sx.real, Sx.imag\r
-\r
- img = np.stack((real, imag, [real, imag][random.randint(0,1)]), axis=-1)\r
- img = np.array(img+128, dtype=np.uint8)\r
-\r
- return img\r
-\r
-def getISIFT(img, predata, step):\r
-\r
- img = np.array(img, dtype=np.float64)-128\r
-\r
- real, imag = img[...,0], img[...,1]\r
-\r
- Sx = np.vectorize(complex)(real, imag)\r
- Sx = np.power(10, Sx/20) # i think this bit is wrong\r
-\r
- SFT = getSFT()\r
- data = SFT.istft(Sx, k1=step)\r
- data = np.array(data, dtype=np.int16)\r
-\r
- return data\r
-\r
-def transmit(img, homo, windowsize):\r
-\r
- img = cv.resize(img, (1080, 720), cv.INTER_NEAREST)\r
-\r
- cv.imshow("display", img)\r
- cv.waitKey(1)\r
-\r
- #cap = cv.warpPerspective(cameraimage, homo, windowsize)\r
- \r
- img = cv.resize(img, (1080, 360), cv.INTER_NEAREST)\r
-\r
- return img\r
-\r
-if __name__ == "__main__":\r
-\r
- windowsize = (1080, 720)\r
-\r
- cv.namedWindow("display")\r
- homo = None\r
- #homo = calibrate()\r
-\r
- sr, data = wavfile.read("audio2.wav")\r
- data = np.array(data, dtype=np.int16)\r
- SFT = getSFT()\r
-\r
- step = 360\r
- newdata = np.zeros((1,), dtype=np.int16)\r
- try:\r
- for i in range(0, len(data), step):\r
- img = getSTFT(data, i, i+step)\r
- img = transmit(img, homo, windowsize)\r
- recovered = getISIFT(img, data[i:i+step], step)\r
- newdata = np.concatenate((newdata, recovered), axis=0)\r
- os.system('cls')\r
- print(f"total difference: {np.sum(abs(recovered-data[i:i+step]))}")\r
- print(f"origional data limits: {np.max(data), np.min(data)}")\r
- print(f"current data limits: {np.max(newdata), np.min(newdata)}")\r
- print(f"current data factor: {np.max(data)//np.max(newdata), np.min(data)//np.min(newdata)}")\r
- print(f"{round(i/data.shape[0], 2)*100}% done")\r
- except:\r
- print("errored out!")\r
-\r
- wavfile.write("out.wav", sr, newdata)
\ No newline at end of file
--- /dev/null
+import os
+import time
+import cv2 as cv
+import screeninfo
+import numpy as np
+from PIL import ImageGrab
+from scipy.io import wavfile
+from skimage.exposure import match_histograms
+
+# Source audio and the capture device used to photograph the display.
+sample_rate, data = wavfile.read("./data/data.wav")
+camera = cv.VideoCapture(0)
+
+# Transform and display geometry.
+window_size = 12_000  # samples per analysis window (also the FFT length)
+hop_size = 6_000  # samples the window advances between frames
+display_size = (900, 900)  # size each frame is scaled to before it is shown
+segment_size = 160  # rows each spectrogram column is folded into; (2 * window_size / segment_size) * segment_parity should not exceed the display size
+segment_parity = 1  # number of side-by-side copies of each folded frame
+screen_id = 0
+
+# Run-mode switches.
+dummy = True  # True: skip calibration and feed the displayed image straight back as the capture
+time_skip = False  # True: sleep after each frame so a frame takes at least frame_time
+
+# Zero-pad the tail: round the length up to the next multiple of hop_size and
+# add one further window_size of silence.
+tail_length = window_size + (hop_size - (len(data) % hop_size))
+data = np.concatenate((data, np.zeros(tail_length)))
+
+# Number of window positions visited by the forward transform.
+segment_count = round(len(data) / hop_size) - 1
+
+# Hann window applied to every frame before the FFT.
+window = np.hanning(window_size)
+
+# One row per frame, one column per FFT bin (transposed after the loop).
+result_array = np.empty((segment_count, window_size), dtype=np.complex128)
+
+for i in range(segment_count):
+    frame_start = hop_size * i
+    frame = data[frame_start:frame_start + window_size]
+
+    # Windowed FFT of this frame, scaled by the number of samples.
+    result_array[i] = np.fft.fft(frame * window) / window_size
+
+    os.system("clear")
+    print(f"1/2 {round((i / segment_count) * 100)}%")
+
+# After this, rows are FFT bins and columns are frames.
+result_array = result_array.T
+
+spectrum_real = result_array.real
+spectrum_imag = result_array.imag
+
+# Split each component by sign and stack the two halves vertically: the top
+# half holds the positive values, the bottom half the magnitudes of the
+# negative values. Cells whose sign does not match the half get 0.1.
+result_real = np.concatenate(
+    (
+        np.where(spectrum_real > 0., spectrum_real, 0.1),
+        np.where(spectrum_real < 0., -spectrum_real, 0.1),
+    ),
+    axis=0,
+)
+result_imag = np.concatenate(
+    (
+        np.where(spectrum_imag > 0., spectrum_imag, 0.1),
+        np.where(spectrum_imag < 0., -spectrum_imag, 0.1),
+    ),
+    axis=0,
+)
+
+# Three channels: real, imaginary, and a copy of the imaginary plane flipped
+# along both axes.
+flipped_imag = np.flip(result_imag, axis=(0, 1))
+result = np.stack((result_real, result_imag, flipped_imag), axis=-1)
+
+# Convert to dB and limit the range.
+result = np.clip(20 * np.log10(result), -40, 200)
+
+# Map dB to pixel values: -40 dB -> 0 and 160 dB -> 255.
+# NOTE(review): the clip above allows up to 200 dB, which would map to 306 and
+# not fit in uint8 - confirm such magnitudes cannot occur.
+image = np.rint((result + 40) * 1.275).astype(np.uint8)
+
+# Buffer for the frames read back from the display, same shape as image.
+recovered = np.zeros_like(image)
+
+# Windows used below: "display" carries the transmitted image, the two debug
+# windows show the capture before and after correction.
+cv.namedWindow("display")
+cv.namedWindow("debug1")
+cv.namedWindow("debug2")
+
+# Camera calibration: show a known image, photograph it, and estimate the
+# homography that maps the camera view back onto the displayed image.
+# The loop condition skips this entirely when `dummy` is True.
+# NOTE(review): `homography` is assigned only inside this loop, and only when
+# more than 10 good matches are found - confirm every later use of it is
+# reachable only after a successful calibration.
+calibrated = False
+while not calibrated and not dummy:
+ calibration_image = cv.imread("calibration/calibration.jpg")
+ # NOTE(review): cv.resize's third positional parameter is `dst`, not
+ # `interpolation` - confirm whether interpolation=cv.INTER_NEAREST was intended.
+ calibration_image = cv.resize(calibration_image, display_size, cv.INTER_NEAREST)
+ cv.imshow("display", calibration_image)
+ # waitKey(0) blocks until a key is pressed; the frame is grabbed afterwards
+ cv.waitKey(0)
+ _, capture = camera.read()
+
+ # detect SIFT keypoints and descriptors in both images
+ sift = cv.SIFT_create()
+ kp1, des1 = sift.detectAndCompute(calibration_image,None)
+ kp2, des2 = sift.detectAndCompute(capture,None)
+
+ # find the two nearest captured-image descriptors for each calibration descriptor
+ FLANN_INDEX_KDTREE = 1
+ index_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5)
+ search_params = dict(checks = 50)
+ flann = cv.FlannBasedMatcher(index_params, search_params)
+ matches = flann.knnMatch(des1,des2,k=2)
+ # ratio test: keep a match only if it is clearly better than the runner-up
+ good = []
+ for m,n in matches:
+ if m.distance < 0.7*n.distance:
+ good.append(m)
+
+ # only estimate the homography when there are more than 10 good matches
+ if len(good)>10:
+ src_pts = np.float32([ kp1[m.queryIdx].pt for m in good ]).reshape(-1,1,2)
+ dst_pts = np.float32([ kp2[m.trainIdx].pt for m in good ]).reshape(-1,1,2)
+ # maps captured-image points (dst_pts) onto calibration-image points (src_pts)
+ homography, mask = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0)
+
+ # show the rectified capture so the result can be checked by eye
+ img3 = cv.warpPerspective(capture, homography, display_size)
+
+ calibrated = True
+ print("calibrated")
+
+ cv.imshow("display", img3)
+ cv.waitKey(1)
+ else:
+ print("retrying calibration")
+
+ # NOTE(review): the nesting of this line is not recoverable from this view.
+ # If it sits at loop level, the loop always exits after one attempt, even
+ # when matching failed and `homography` was never assigned - confirm.
+ calibrated = True
+
+# Time budget per frame, compared against the measured loop time below.
+# NOTE(review): the units of this expression are unclear - confirm it is meant
+# to be seconds per frame.
+frame_time = (window_size * 2) / (sample_rate / window_size)
+
+# Send every spectrogram column through the display -> capture path and store
+# what comes back in `recovered`.
+for i in range(image.shape[1]):
+ time_start = time.time()
+ segment = np.copy(image[:,i]) # get the column of the image we are to work on
+
+ columns = round(len(segment) / segment_size) # get the number of columns to split the single column into
+ segment = np.reshape(segment, (segment_size, columns, 3)) # fold the column into a (segment_size, columns, 3) image
+
+ segment = np.concatenate((segment,)*segment_parity, axis=1) # tile segment_parity copies side by side
+
+ # NOTE(review): cv.resize's third positional parameter is `dst`, not
+ # `interpolation` - confirm whether interpolation=cv.INTER_NEAREST was intended.
+ display = cv.resize(segment, display_size, cv.INTER_NEAREST) # resize the array for display
+
+ cv.imshow("display", display) # show image to display
+ cv.waitKey(1) # let the window refresh before capturing
+
+ if dummy: capture = display # pass the image to display straight to the capture
+ else:
+ good, capture = camera.read() # grab a frame from the camera
+ # on a failed read, switch to dummy mode for this and all later frames
+ if not good:
+ print("capture failed")
+ print("diverting to dummy output")
+ dummy = True
+ capture = display
+
+ # NOTE(review): the nesting of the next four lines is not recoverable from
+ # this view. `homography` is only assigned during calibration, which is
+ # skipped when `dummy` is True - confirm these lines cannot run without it.
+ cv.imshow("debug1", capture) # show the raw capture
+ capture = cv.warpPerspective(capture, homography, display_size) # fix distortion in the captured image and crop
+ capture = match_histograms(capture, display, channel_axis=-1) # match each channel's histogram to the displayed image
+ cv.imshow("debug2", capture) # show the corrected capture
+
+ # NOTE(review): same positional-argument concern as the resize above.
+ capture = cv.resize(capture, (columns * segment_parity, segment_size), cv.INTER_NEAREST) # resize back to the segment size
+
+ recovered_segment = np.array_split(capture, segment_parity, axis=1) # split to list of parity copies
+ recovered_segment = np.array([a for a in recovered_segment if a.shape[1] == columns]) # keep only the copies with the expected width
+ recovered_segment = np.mean(recovered_segment, axis=0) # get the mean of the parity copies
+
+ recovered_segment = np.reshape(recovered_segment, (2*window_size, 3)) # reshape to original column
+
+ # `recovered` was created as uint8, so the mean is cast to uint8 on assignment
+ recovered[:, i] = recovered_segment # insert into recovered data
+
+ # pace the loop: sleep off the remainder when time_skip is set, otherwise
+ # report how the measured frame time compares with frame_time
+ real_time = time.time() - time_start
+ wait_time = frame_time - real_time
+ if wait_time > 0 and time_skip: time.sleep(wait_time)
+ else:
+ os.system("clear")
+ print(f"running @ {round(real_time / frame_time, 2)}x realtime")
+
+# np.isreal() is True for every element of a non-complex array, so this
+# np.where() keeps every value.
+# NOTE(review): the original comment said this removes NaNs; np.isreal() does
+# not test for NaN - confirm what was intended.
+recovered = np.where(np.isreal(recovered), recovered, 0.1)
+
+# Undo the pixel mapping and the dB scaling.
+recovered = 10 ** (((recovered / 1.275) - 40) / 20)
+
+# The third channel holds the imaginary plane flipped along both axes; flip it
+# back and average it with the second channel.
+unflipped_parity = np.flip(recovered[..., 2], axis=(1, 0))
+recovered[..., 1] = (recovered[..., 1] + unflipped_parity) / 2
+
+# Top half = positive values, bottom half = magnitudes of negative values;
+# subtract the bottom half from the top half to restore signed values.
+real_top, real_bottom = np.array_split(recovered[..., 0], 2)
+imag_top, imag_bottom = np.array_split(recovered[..., 1], 2)
+
+recovered_real = real_top - real_bottom
+recovered_imag = imag_top - imag_bottom
+
+# Rebuild the complex spectrogram with one row per frame.
+# NOTE(review): the real part is negated here; the original comment suggests
+# the two transposes may leave the array flipped horizontally and that this is
+# not an issue for the audio signal - confirm.
+recovered_array = (recovered_real * -1 + 1j * recovered_imag).T
+
+# Output buffer: one hop per frame plus one extra hop so the last window fits.
+frame_total = recovered_array.shape[0]
+recovered_signal = np.zeros((frame_total + 1) * hop_size)
+
+# Overlap-add: inverse-transform each frame and add it at its hop offset.
+for i, frame_spectrum in enumerate(recovered_array):
+    start = i * hop_size
+    # multiply by window_size to undo the scaling applied after the forward FFT
+    frame_samples = np.fft.ifft(frame_spectrum * window_size, n=window_size).real
+    recovered_signal[start:start + window_size] += frame_samples
+
+    os.system("clear")
+    print(f"2/2 {round((i / recovered_array.shape[0]) * 100)}%")
+
+# Replace NaNs with silence before conversion. The previous np.isreal() test
+# is True for every element of a float array, so it never filtered anything;
+# np.nan_to_num() performs the substitution that was intended.
+recovered_signal = np.nan_to_num(recovered_signal, nan=0.0)
+
+int16_limits = np.iinfo(np.int16)
+
+# Constrain the samples to the int16 range.
+recovered_signal = np.clip(recovered_signal, int16_limits.min, int16_limits.max)
+
+# Match the level of the original by the ratio of the two signal means.
+# Skip the scaling when the recovered mean is zero; dividing by it would give
+# an infinite or NaN gain.
+# NOTE(review): the mean of an audio signal is its DC offset, which is usually
+# close to zero - consider matching RMS or mean absolute level instead.
+recovered_mean = np.average(recovered_signal)
+if recovered_mean != 0:
+    recovered_signal *= np.average(data) / recovered_mean
+
+# The gain can push samples back outside the int16 range, so clip again before
+# the cast; out-of-range floats do not convert to int16 reliably.
+recovered_signal = np.clip(recovered_signal, int16_limits.min, int16_limits.max)
+
+recovered_signal = recovered_signal.astype(np.int16)  # convert to the sample type written to the file
+
+wavfile.write("out.wav", sample_rate, recovered_signal)
+
+print("complete")