-import os
-import time
import cv2 as cv
-import screeninfo
import numpy as np
-from PIL import ImageGrab
from scipy.io import wavfile
-from skimage.exposure import match_histograms
-
-sample_rate, data = wavfile.read("./data/data.wav")
-camera = cv.VideoCapture(0)
-
-window_size = 12_000 # the window size is the number of frequency bins
-hop_size = 6_000 # size of each jump of the window
-display_size = (900, 900) # SHOULD be greater than segment size otherwise youll get information loss
-segment_size = 160 # the ((window_size * 2) / segment_size) * segment_parity should not be more than the display size
-segment_parity = 1 # number of parity copies in the display
-screen_id = 0
-
-dummy = True
-time_skip = False
-
-data = np.concatenate(
- (data, np.zeros((
- # add empty samples to bring the size up to a multiple of hop + the window width
- window_size + (hop_size - (len(data) % hop_size))
- )))
-)
-
-segment_count = round(len(data) / hop_size) - 1 # get the number of jumps required
-
-window = np.hanning(window_size) # window is half cosine so the overlap produces constant power
-
-result_array = np.empty((segment_count, window_size), dtype=np.complex128) # result array
-
-for i in range(segment_count):
- segment_offset = hop_size * i
- segment = data[segment_offset:segment_offset+window_size] # current segment of data
-
- window_segment = segment * window # multiply by the window
- spectrum = np.fft.fft(window_segment) / window_size # take the Fourier Transform and scale by the number of samples
-
- result_array[i, :] = spectrum[:window_size] # append to the results array
-
- os.system("clear")
- print(f"1/2 {round((i / segment_count) * 100)}%")
-
-result_array = np.transpose(result_array)
-
-result_real = np.concatenate(( # get the positive and negative (top and bottom) real arrays
- np.where(result_array.real > 0., result_array.real, 0.1),
- np.where(result_array.real < 0., result_array.real * -1, 0.1)
-), axis=0)
-result_imag = np.concatenate(( # get the positive and negative (top and bottom) imaginary arrays
- np.where(result_array.imag > 0., result_array.imag, 0.1),
- np.where(result_array.imag < 0., result_array.imag * -1, 0.1)
-), axis=0)
-
-result = np.stack((result_real, result_imag, np.flip(result_imag, axis=(0,1))), axis=-1)
-
-result = 20*np.log10(result) # scale to db
-result = np.clip(result, -40, 200) # clip values
+import matplotlib.pyplot as plt
+from multiprocessing import Pool
+
+"""
+notes:
+- window size
+ the time to generate the spectrum is logaritmically related to the window size
+ bigger windows are exponentially better so you should prefer this if possible
+ obviously the biggest you can use is the size of your display unless you have
+ some way of arranging the pixles independant of the orrigional spectrogram
+"""
+
+class camera():
+ def __init__(
+ self,
+ window_size: int,
+ window_height: int,
+ device_id: int = 0
+ ):
+
+ self.window_size = window_size
+ self.window_height = window_height
+
+ self.camera = cv.VideoCapture(device_id)
+ self.homography = None
+
+ cv.namedWindow("display", cv.WINDOW_NORMAL)
+
+ def calibrate(
+ self
+ ):
+ calibration_image = cv.imread("calibration/calibration.jpg")
+ calibration_image = cv.resize(calibration_image, (self.window_size, self.window_height), cv.INTER_NEAREST)
+
+ cv.imshow("display", calibration_image)
+ cv.waitKey(0)
+ _, capture = camera.read()
+
+ # detect SIFT keypoints
+ sift = cv.SIFT_create()
+ kp1, des1 = sift.detectAndCompute(calibration_image, None)
+ kp2, des2 = sift.detectAndCompute(capture, None)
+
+ # get good matches between calibration image and the captured image
+ flann = cv.FlannBasedMatcher(
+ {"algorithm": 1, "trees": 5},
+ {"checks": 50}
+ )
+ matches = flann.knnMatch(des1, des2, k=2)
+
+ #get good matches via ratio test
+ good = []
+ for m,n in matches:
+ if m.distance < 0.7*n.distance:
+ good.append(m)
+
+ if len(good)>10:
+ src_pts = np.float32([ kp1[m.queryIdx].pt for m in good ]).reshape(-1,1,2)
+ dst_pts = np.float32([ kp2[m.trainIdx].pt for m in good ]).reshape(-1,1,2)
+
+ self.homography, mask = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0)
+
+ else:
+ print("calibration failed")
+
+ def display(
+ self,
+ image: np.ndarray
+ ) -> None:
+
+ cv.imshow("display", image)
+ cv.waitKey(1)
-image = (result + 40) * 1.275 # put the data in range for an image
+ def capture(
+ self
+ ) -> np.ndarray:
-image = np.array(np.rint(image), dtype=np.uint8)
-recovered = np.zeros((image.shape), dtype=np.uint8)
+ image = self.camera.read()
+ if self.homography is not None:
+ image = cv.warpPerspective(image, self.homography, (self.display_size, self.display_height))
+ image = match_histograms(image, display, channel_axis=-1)
-cv.namedWindow("display")
-cv.namedWindow("debug1")
-cv.namedWindow("debug2")
+ return image
-calibrated = False
-while not calibrated and not dummy:
- calibration_image = cv.imread("calibration/calibration.jpg")
- calibration_image = cv.resize(calibration_image, display_size, cv.INTER_NEAREST)
- cv.imshow("display", calibration_image)
- cv.waitKey(0)
- _, capture = camera.read()
+class fft():
+ def __init__(
+ self,
+ window_size: int,
+ hop_size: int
+ ):
+ self.window_size = window_size
+ self.hop_size = hop_size
+ self.window = np.hanning(window_size)
- # detect SIFT keypoints
- sift = cv.SIFT_create()
- kp1, des1 = sift.detectAndCompute(calibration_image,None)
- kp2, des2 = sift.detectAndCompute(capture,None)
+ self.lower_limit = -40
+ self.upper_limit = 100
- # get good matches between calibration image and the captured image
- FLANN_INDEX_KDTREE = 1
- index_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5)
- search_params = dict(checks = 50)
- flann = cv.FlannBasedMatcher(index_params, search_params)
- matches = flann.knnMatch(des1,des2,k=2)
- #get good matches via ratio test
- good = []
- for m,n in matches:
- if m.distance < 0.7*n.distance:
- good.append(m)
+ def stft(
+ self,
+ data: np.ndarray
+ ) -> np.ndarray:
- if len(good)>10:
- src_pts = np.float32([ kp1[m.queryIdx].pt for m in good ]).reshape(-1,1,2)
- dst_pts = np.float32([ kp2[m.trainIdx].pt for m in good ]).reshape(-1,1,2)
- homography, mask = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0)
+ segment = data * self.window
+ spectrum = np.fft.fft(segment) / self.window_size
- img3 = cv.warpPerspective(capture, homography, display_size)
+ amplitude = np.abs(spectrum)
+ amplitude = 20*np.log10(amplitude)
+ amplitude = np.clip(amplitude, self.lower_limit, self.upper_limit)
+ amplitude -= self.lower_limit
+ amplitude *= (255 / self.upper_limit)
- calibrated = True
- print("calibrated")
+ angle = np.angle(spectrum)
+ angle = (angle + np.pi) * (255 / (2 * np.pi))
- cv.imshow("display", img3)
- cv.waitKey(1)
- else:
- print("retrying calibration")
+ image = np.stack((amplitude, angle, amplitude), axis=-1)
+ image = np.array([image], dtype=np.uint8)
- calibrated = True
+ return image
-frame_time = (window_size * 2) / (sample_rate / window_size)
+ def istft(
+ self,
+ image: np.ndarray
+ ) -> np.ndarray:
-for i in range(image.shape[1]):
- time_start = time.time()
- segment = np.copy(image[:,i]) # get the column of the image we are to work on
+ amplitude = image[0][...,0].astype(np.float64)
+ angle = image[0][...,1].astype(np.float64)
- columns = round(len(segment) / segment_size) # get the number of columns to split the single column into
- segment = np.reshape(segment, (segment_size, columns, 3)) # reshape the column into a 3d array
+ amplitude /= (255 / self.upper_limit)
+ amplitude += self.lower_limit
+ amplitude = np.power(10, amplitude / 20)
- segment = np.concatenate((segment,)*segment_parity, axis=1) # affix the object with 5 parity copies
+ angle = (angle / (255 / (2 * np.pi))) - np.pi
- display = cv.resize(segment, display_size, cv.INTER_NEAREST) # resize the array for display
+ real = np.cos(angle) * amplitude
+ imag = np.sin(angle) * amplitude
+ segment = real + (1j * imag)
- cv.imshow("display", display) # show image to display
- cv.waitKey(1) # wait till capture
+ data = np.fft.ifft(segment * self.window_size).real
- if dummy: capture = display # pass the image to display straight to the capture
- else:
- good, capture = camera.read() # send the capture to the buffer
- if not good:
- print("capture failed")
- print("diverting to dummy output")
- dummy = True
- capture = display
+ return data
- cv.imshow("debug1", capture) # show image to display
- capture = cv.warpPerspective(capture, homography, display_size) # fix distorition in the captured image and crop
- capture = match_histograms(capture, display, channel_axis=-1)
- cv.imshow("debug2", capture) # show image to display
+sample_rate, data = wavfile.read("/home/will/Music/George Michael - Careless Whisper.wav")
- capture = cv.resize(capture, (columns * segment_parity, segment_size), cv.INTER_NEAREST) # resize back to the segment size
+window_size = 1_000
+window_height = 500
- recovered_segment = np.array_split(capture, segment_parity, axis=1) # split to list of parity copies
- recovered_segment = np.array([a for a in recovered_segment if a.shape[1] == columns]) # get the array of the parity copies
- recovered_segment = np.mean(recovered_segment, axis=0) # get the mean of the parity copies
+hop_size = window_size // 2
+camera = camera(window_size, window_height)
+transform = fft(window_size, hop_size)
- recovered_segment = np.reshape(recovered_segment, (2*window_size, 3)) # reshape to origional column
+segment_samples = window_height * hop_size
- recovered[:, i] = recovered_segment # insert into recovered data
+padding = np.full((segment_samples - (len(data) % segment_samples) + window_height), fill_value=.1)
+data = np.concatenate((data, padding))
- real_time = time.time() - time_start
- wait_time = frame_time - real_time
- if wait_time > 0 and time_skip: time.sleep(wait_time)
- else:
- os.system("clear")
- print(f"running @ {round(real_time / frame_time, 2)}x realtime")
+recovered_data = np.zeros(data.shape)
-recovered = np.where(np.isreal(recovered), recovered, 0.1) # remove the nans introduces through transformation of a blank spectrogram
-recovered = np.power(10, ((recovered / 1.275) - 40) / 20) # unscale from dB
+segment_count = round(len(data) / segment_samples)
-recovered[...,1] = (recovered[...,1] + np.flip(recovered[...,2], axis=(1,0))) / 2 # revert the parity copy flipped in the red channel
+for segment_index in range(segment_count):
+ segment_start = segment_index * segment_samples
+ rows = [data[segment_start + i:segment_start + i + window_size] for i in range(0, segment_samples, hop_size)]
+ with Pool() as p:
+ mapping = p.map(transform.stft, rows)
-recovered_real = np.array_split(recovered[...,0], 2) # split into two arrays each of the positive and negative component (top and bottom)
-recovered_imag = np.array_split(recovered[...,1], 2)
+ spectrum = np.array(mapping)[:,0,...]
-recovered_real = recovered_real[0] + (recovered_real[1] * -1) # revert the negative array and combine
-recovered_imag = recovered_imag[0] + (recovered_imag[1] * -1)
-recovered_array = np.transpose(recovered_real * -1 + 1j * recovered_imag) # transpose and make complex again
-# because of the two transposes, the array might be (in total) flipped horisonally?
-# this is not an issue for the audio signal
+ # do the silly capture thing here
+ cv.imshow("display", spectrum)
+ cv.waitKey(1)
-recovered_signal = np.zeros(((recovered_array.shape[0] + 1) * hop_size)) # empty array for the recovered signal
+ rows = [[i] for i in spectrum]
+ with Pool() as p:
+ recovered = np.array(p.map(transform.istft, rows))
-for i in range(recovered_array.shape[0]):
- signal_offset = i * hop_size # get the sample offset of the range we insert
- signal_segment = np.fft.ifft(recovered_array[i] * window_size, n=window_size).real # get the istft of the data
- recovered_signal[signal_offset:signal_offset + window_size] += signal_segment #/ 2 # add the data to the recovered signal
+ for i, row in enumerate(recovered):
+ row_start = i * hop_size
+ recovered_data[segment_start + row_start:segment_start + row_start + window_size] += row
- os.system("clear")
- print(f"2/2 {round((i / recovered_array.shape[0]) * 100)}%")
+wavfile.write("out.wav", sample_rate, recovered_data.astype(np.int16))
-recovered_signal = np.where(np.isreal(recovered_signal), recovered_signal, 0) # remove the nans introduces through transformation of a blank spectrogram
-# this is a quick fix for a bug introduced elsewhere
+difference = data - recovered_data
-recovered_signal = np.clip( # constrain the data to the max and min for the datatype
- recovered_signal,
- np.iinfo(np.int16).min, # max and min are system dependant so check dynamically
- np.iinfo(np.int16).max
-)
+plt.style.use('dark_background')
+fig, (ax1, ax2) = plt.subplots(nrows=2)
+ax1.plot(difference)
+Pxx, freqs, bins, im = ax2.specgram(difference, NFFT=1014, Fs=1/0.0005)
+plt.show()
-recovered_signal *= np.average(data) / np.average(recovered_signal) # normalize via the average of both of the audio signals
-recovered_signal = np.array(recovered_signal, dtype=np.int16) # covert the data to the required data type
-wavfile.write("out.wav", sample_rate, recovered_signal)
-print("complete")
--- /dev/null
+import os
+import time
+import cv2 as cv
+import screeninfo
+import numpy as np
+from PIL import ImageGrab
+from scipy.io import wavfile
+from skimage.exposure import match_histograms
+
+sample_rate, data = wavfile.read("./data/data.wav")
+camera = cv.VideoCapture(0)
+
+window_size = 12_000 # the window size is the number of frequency bins
+hop_size = 6_000 # size of each jump of the window
+display_size = (900, 900) # SHOULD be greater than segment size otherwise youll get information loss
+segment_size = 160 # the ((window_size * 2) / segment_size) * segment_parity should not be more than the display size
+segment_parity = 1 # number of parity copies in the display
+screen_id = 0
+
+dummy = True
+time_skip = False
+
+data = np.concatenate(
+ (data, np.zeros((
+ # add empty samples to bring the size up to a multiple of hop + the window width
+ window_size + (hop_size - (len(data) % hop_size))
+ )))
+)
+
+segment_count = round(len(data) / hop_size) - 1 # get the number of jumps required
+
+window = np.hanning(window_size) # window is half cosine so the overlap produces constant power
+
+result_array = np.empty((segment_count, window_size), dtype=np.complex128) # result array
+
+for i in range(segment_count):
+ segment_offset = hop_size * i
+ segment = data[segment_offset:segment_offset+window_size] # current segment of data
+
+ window_segment = segment * window # multiply by the window
+ spectrum = np.fft.fft(window_segment) / window_size # take the Fourier Transform and scale by the number of samples
+
+ result_array[i, :] = spectrum[:window_size] # append to the results array
+
+ os.system("clear")
+ print(f"1/2 {round((i / segment_count) * 100)}%")
+
+result_array = np.transpose(result_array)
+
+result_real = np.concatenate(( # get the positive and negative (top and bottom) real arrays
+ np.where(result_array.real > 0., result_array.real, 0.1),
+ np.where(result_array.real < 0., result_array.real * -1, 0.1)
+), axis=0)
+result_imag = np.concatenate(( # get the positive and negative (top and bottom) imaginary arrays
+ np.where(result_array.imag > 0., result_array.imag, 0.1),
+ np.where(result_array.imag < 0., result_array.imag * -1, 0.1)
+), axis=0)
+
+result = np.stack((result_real, result_imag, np.flip(result_imag, axis=(0,1))), axis=-1)
+
+result = 20*np.log10(result) # scale to db
+result = np.clip(result, -40, 200) # clip values
+
+image = (result + 40) * 1.275 # put the data in range for an image
+
+image = np.array(np.rint(image), dtype=np.uint8)
+recovered = np.zeros((image.shape), dtype=np.uint8)
+
+cv.namedWindow("display")
+cv.namedWindow("debug1")
+cv.namedWindow("debug2")
+
+calibrated = False
+while not calibrated and not dummy:
+ calibration_image = cv.imread("calibration/calibration.jpg")
+ calibration_image = cv.resize(calibration_image, display_size, cv.INTER_NEAREST)
+ cv.imshow("display", calibration_image)
+ cv.waitKey(0)
+ _, capture = camera.read()
+
+ # detect SIFT keypoints
+ sift = cv.SIFT_create()
+ kp1, des1 = sift.detectAndCompute(calibration_image,None)
+ kp2, des2 = sift.detectAndCompute(capture,None)
+
+ # get good matches between calibration image and the captured image
+ FLANN_INDEX_KDTREE = 1
+ index_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5)
+ search_params = dict(checks = 50)
+ flann = cv.FlannBasedMatcher(index_params, search_params)
+ matches = flann.knnMatch(des1,des2,k=2)
+ #get good matches via ratio test
+ good = []
+ for m,n in matches:
+ if m.distance < 0.7*n.distance:
+ good.append(m)
+
+ if len(good)>10:
+ src_pts = np.float32([ kp1[m.queryIdx].pt for m in good ]).reshape(-1,1,2)
+ dst_pts = np.float32([ kp2[m.trainIdx].pt for m in good ]).reshape(-1,1,2)
+ homography, mask = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0)
+
+ img3 = cv.warpPerspective(capture, homography, display_size)
+
+ calibrated = True
+ print("calibrated")
+
+ cv.imshow("display", img3)
+ cv.waitKey(1)
+ else:
+ print("retrying calibration")
+
+ calibrated = True
+
+frame_time = (window_size * 2) / (sample_rate / window_size)
+
+for i in range(image.shape[1]):
+ time_start = time.time()
+ segment = np.copy(image[:,i]) # get the column of the image we are to work on
+
+ columns = round(len(segment) / segment_size) # get the number of columns to split the single column into
+ segment = np.reshape(segment, (segment_size, columns, 3)) # reshape the column into a 3d array
+
+ segment = np.concatenate((segment,)*segment_parity, axis=1) # affix the object with 5 parity copies
+
+ display = cv.resize(segment, display_size, cv.INTER_NEAREST) # resize the array for display
+
+ cv.imshow("display", display) # show image to display
+ cv.waitKey(1) # wait till capture
+
+ if dummy: capture = display # pass the image to display straight to the capture
+ else:
+ good, capture = camera.read() # send the capture to the buffer
+ if not good:
+ print("capture failed")
+ print("diverting to dummy output")
+ dummy = True
+ capture = display
+
+ cv.imshow("debug1", capture) # show image to display
+ capture = cv.warpPerspective(capture, homography, display_size) # fix distorition in the captured image and crop
+ capture = match_histograms(capture, display, channel_axis=-1)
+ cv.imshow("debug2", capture) # show image to display
+
+ capture = cv.resize(capture, (columns * segment_parity, segment_size), cv.INTER_NEAREST) # resize back to the segment size
+
+ recovered_segment = np.array_split(capture, segment_parity, axis=1) # split to list of parity copies
+ recovered_segment = np.array([a for a in recovered_segment if a.shape[1] == columns]) # get the array of the parity copies
+ recovered_segment = np.mean(recovered_segment, axis=0) # get the mean of the parity copies
+
+ recovered_segment = np.reshape(recovered_segment, (2*window_size, 3)) # reshape to origional column
+
+ recovered[:, i] = recovered_segment # insert into recovered data
+
+ real_time = time.time() - time_start
+ wait_time = frame_time - real_time
+ if wait_time > 0 and time_skip: time.sleep(wait_time)
+ else:
+ os.system("clear")
+ print(f"running @ {round(real_time / frame_time, 2)}x realtime")
+
+recovered = np.where(np.isreal(recovered), recovered, 0.1) # remove the nans introduces through transformation of a blank spectrogram
+recovered = np.power(10, ((recovered / 1.275) - 40) / 20) # unscale from dB
+
+recovered[...,1] = (recovered[...,1] + np.flip(recovered[...,2], axis=(1,0))) / 2 # revert the parity copy flipped in the red channel
+
+recovered_real = np.array_split(recovered[...,0], 2) # split into two arrays each of the positive and negative component (top and bottom)
+recovered_imag = np.array_split(recovered[...,1], 2)
+
+recovered_real = recovered_real[0] + (recovered_real[1] * -1) # revert the negative array and combine
+recovered_imag = recovered_imag[0] + (recovered_imag[1] * -1)
+recovered_array = np.transpose(recovered_real * -1 + 1j * recovered_imag) # transpose and make complex again
+# because of the two transposes, the array might be (in total) flipped horisonally?
+# this is not an issue for the audio signal
+
+recovered_signal = np.zeros(((recovered_array.shape[0] + 1) * hop_size)) # empty array for the recovered signal
+
+for i in range(recovered_array.shape[0]):
+ signal_offset = i * hop_size # get the sample offset of the range we insert
+ signal_segment = np.fft.ifft(recovered_array[i] * window_size, n=window_size).real # get the istft of the data
+ recovered_signal[signal_offset:signal_offset + window_size] += signal_segment #/ 2 # add the data to the recovered signal
+
+ os.system("clear")
+ print(f"2/2 {round((i / recovered_array.shape[0]) * 100)}%")
+
+recovered_signal = np.where(np.isreal(recovered_signal), recovered_signal, 0) # remove the nans introduces through transformation of a blank spectrogram
+# this is a quick fix for a bug introduced elsewhere
+
+recovered_signal = np.clip( # constrain the data to the max and min for the datatype
+ recovered_signal,
+ np.iinfo(np.int16).min, # max and min are system dependant so check dynamically
+ np.iinfo(np.int16).max
+)
+
+recovered_signal *= np.average(data) / np.average(recovered_signal) # normalize via the average of both of the audio signals
+
+recovered_signal = np.array(recovered_signal, dtype=np.int16) # covert the data to the required data type
+
+wavfile.write("out.wav", sample_rate, recovered_signal)
+
+print("complete")