--- /dev/null
+import cv2 as cv
+import numpy as np
+class camera():
+    """Pair an OpenCV window named "display" with a video capture device.
+
+    Images are shown with ``display()`` and read back with ``capture()``.
+    Once ``calibrate()`` has estimated a homography, captured frames are
+    warped with it before being resized and returned.
+    """
+
+    def __init__(
+        self,
+        window_size: int,
+        window_height: int,
+        display_size: tuple,
+        device_id: int = 0
+    ):
+        """Open the capture device and create the display window.
+
+        Args:
+            window_size: width, in pixels, of the frames returned by ``capture()``.
+            window_height: height, in pixels, of the frames returned by ``capture()``.
+            display_size: size every displayed image is resized to; it is passed
+                straight to ``cv.resize`` / ``cv.warpPerspective`` as their size
+                argument.
+            device_id: index handed to ``cv.VideoCapture``.
+        """
+        self.window_size = window_size
+        self.window_height = window_height
+        self.display_size = display_size
+        self.camera = cv.VideoCapture(device_id)
+        self.homography = None
+        # Most recent frame handed to display(); capture() uses it as the
+        # reference for histogram matching.
+        self._last_displayed = None
+        cv.namedWindow("display", cv.WINDOW_NORMAL)
+
+    def calibrate(
+        self
+    ) -> None:
+        """Estimate the camera-to-display homography from a calibration image.
+
+        Shows ``calibration/calibration.jpg``, waits for a key press, grabs one
+        frame and matches SIFT features between the two. On success the result
+        is stored in ``self.homography``; otherwise "calibration failed" is
+        printed and ``self.homography`` is left unchanged.
+
+        Raises:
+            FileNotFoundError: if the calibration image cannot be read.
+        """
+        calibration_image = cv.imread("calibration/calibration.jpg")
+        if calibration_image is None:
+            # cv.imread signals failure by returning None rather than raising.
+            raise FileNotFoundError("calibration/calibration.jpg")
+        # interpolation must be passed by keyword: the third positional
+        # parameter of cv.resize is dst, not the interpolation flag.
+        calibration_image = cv.resize(
+            calibration_image, self.display_size, interpolation=cv.INTER_NEAREST
+        )
+        cv.imshow("display", calibration_image)
+        cv.waitKey(0)
+        ok, capture = self.camera.read()
+        if not ok:
+            print("calibration failed")
+            return
+        # detect SIFT keypoints
+        sift = cv.SIFT_create()
+        kp1, des1 = sift.detectAndCompute(calibration_image, None)
+        kp2, des2 = sift.detectAndCompute(capture, None)
+        # knnMatch with k=2 needs at least two descriptors on each side
+        if des1 is None or des2 is None or len(des1) < 2 or len(des2) < 2:
+            print("calibration failed")
+            return
+        # get matches between the calibration image and the captured image
+        flann = cv.FlannBasedMatcher(
+            {"algorithm": 1, "trees": 5},
+            {"checks": 50}
+        )
+        matches = flann.knnMatch(des1, des2, k=2)
+        # keep good matches via the ratio test; skip entries that did not
+        # come back with two neighbours
+        good = [
+            pair[0]
+            for pair in matches
+            if len(pair) == 2 and pair[0].distance < 0.7 * pair[1].distance
+        ]
+        if len(good) > 10:
+            src_pts = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)
+            dst_pts = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)
+            # camera points -> calibration-image points, so that capture()
+            # can warp camera frames into display coordinates
+            homography, _ = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0)
+            if homography is None:
+                print("calibration failed")
+                return
+            self.homography = homography
+        else:
+            print("calibration failed")
+
+    def display(
+        self,
+        image: np.ndarray
+    ) -> None:
+        """Resize ``image`` to ``display_size`` and show it in the "display" window."""
+        image = cv.resize(image, self.display_size, interpolation=cv.INTER_NEAREST)
+        self._last_displayed = image
+        cv.imshow("display", image)
+        cv.waitKey(1)
+
+    def capture(
+        self
+    ) -> np.ndarray:
+        """Grab one frame, rectify it and resize it to the window dimensions.
+
+        If a homography is available the frame is warped with it first. If an
+        image has been displayed, the frame's per-channel histogram is matched
+        to that image.
+
+        Raises:
+            RuntimeError: if the capture device does not deliver a frame.
+        """
+        # VideoCapture.read returns a (success, frame) pair
+        ok, image = self.camera.read()
+        if not ok:
+            raise RuntimeError("failed to read a frame from the capture device")
+        if self.homography is not None:
+            image = cv.warpPerspective(image, self.homography, self.display_size)
+        image = cv.resize(
+            image, (self.window_size, self.window_height), interpolation=cv.INTER_NEAREST
+        )
+        reference = getattr(self, "_last_displayed", None)
+        if reference is not None:
+            image = self._match_histograms(image, reference)
+        return image
+
+    @staticmethod
+    def _match_histograms(
+        image: np.ndarray,
+        reference: np.ndarray
+    ) -> np.ndarray:
+        """Match each channel's histogram of ``image`` to that of ``reference``.
+
+        Both arrays must carry their channels on the last axis and have the
+        same number of channels; their spatial sizes may differ. The result
+        has the shape and dtype of ``image``.
+
+        Raises:
+            ValueError: if the channel counts differ.
+        """
+        if image.shape[-1] != reference.shape[-1]:
+            raise ValueError("image and reference must have the same number of channels")
+        matched = np.empty_like(image)
+        for channel in range(image.shape[-1]):
+            source = image[..., channel].ravel()
+            target = reference[..., channel].ravel()
+            _, inverse, source_counts = np.unique(
+                source, return_inverse=True, return_counts=True
+            )
+            target_values, target_counts = np.unique(target, return_counts=True)
+            # map every source value to the reference value that sits at the
+            # same cumulative-distribution quantile
+            source_cdf = np.cumsum(source_counts) / source.size
+            target_cdf = np.cumsum(target_counts) / target.size
+            mapped = np.interp(source_cdf, target_cdf, target_values)
+            matched[..., channel] = mapped[inverse.ravel()].reshape(image.shape[:-1])
+        return matched
--- /dev/null
+import numpy as np
+import cv2 as cv
+class fft():
+    """Encode one windowed audio frame as a row of BGR pixels and decode it back.
+
+    The magnitude (in dB) of each FFT bin is stored in the hue channel and its
+    phase in the saturation channel of an HSV image, which is then converted
+    to BGR with OpenCV.
+    """
+
+    def __init__(
+        self,
+        window_size: int,
+        hop_size: int
+    ):
+        """Store the frame geometry and the value ranges used for the encoding.
+
+        Args:
+            window_size: number of samples per frame (length of the Hann window).
+            hop_size: stored on the instance; not used by this class itself.
+        """
+        self.window_size = window_size
+        self.hop_size = hop_size
+        self.window = np.hanning(window_size)
+        # dB range the magnitudes are clipped to before encoding
+        self.lower_limit = -40
+        self.upper_limit = 100
+        # channel value range the dB range is mapped onto (hue channel)
+        self.amplitude_max = 180
+        self.amplitude_min = 0
+        # channel value range the phase is mapped onto (saturation channel)
+        self.angle_max = 255
+        self.angle_min = 100
+        self.amplitude_relative = self.amplitude_max - self.amplitude_min
+        self.angle_relative = self.angle_max - self.angle_min
+
+    def _amplitude_scale(self) -> float:
+        """Return channel units per dB for the amplitude encoding.
+
+        The divisor is the full clipped dB span (upper - lower), so the mapped
+        values cannot exceed ``amplitude_max``.
+        """
+        return self.amplitude_relative / (self.upper_limit - self.lower_limit)
+
+    def _amplitude_to_hue(
+        self,
+        magnitude: np.ndarray
+    ) -> np.ndarray:
+        """Map linear magnitudes to [amplitude_min, amplitude_max] via clipped dB."""
+        # silent bins give log10(0) = -inf, which the clip below turns into
+        # lower_limit; suppress the divide warning for that expected case
+        with np.errstate(divide="ignore"):
+            decibels = 20 * np.log10(magnitude)
+        decibels = np.clip(decibels, self.lower_limit, self.upper_limit)
+        # scale first, then offset: exact inverse of _hue_to_amplitude
+        return (decibels - self.lower_limit) * self._amplitude_scale() + self.amplitude_min
+
+    def _hue_to_amplitude(
+        self,
+        hue: np.ndarray
+    ) -> np.ndarray:
+        """Invert ``_amplitude_to_hue``: channel values back to linear magnitudes."""
+        decibels = (hue - self.amplitude_min) / self._amplitude_scale() + self.lower_limit
+        return np.power(10, decibels / 20)
+
+    def stft(
+        self,
+        data: np.ndarray
+    ) -> np.ndarray:
+        """Encode one frame of ``window_size`` samples as a (1, window_size, 3) BGR image.
+
+        Args:
+            data: the samples of one frame; multiplied element-wise by the
+                Hann window, so it must have ``window_size`` entries.
+
+        Returns:
+            A uint8 array of shape (1, window_size, 3) in BGR order.
+        """
+        segment = data * self.window
+        spectrum = np.fft.fft(segment) / self.window_size
+        hue = self._amplitude_to_hue(np.abs(spectrum))
+        # phase in [-pi, pi] mapped onto [angle_min, angle_max]
+        saturation = (
+            (np.angle(spectrum) + np.pi) * (self.angle_relative / (2 * np.pi))
+        ) + self.angle_min
+        value = np.full(saturation.shape, fill_value=255)
+        image = np.stack((hue, saturation, value), axis=-1)
+        # the uint8 conversion truncates the fractional part of each channel
+        image = np.array([image], dtype=np.uint8)
+        # NOTE(review): OpenCV's 8-bit hue range is [0, 180); a bin clipped at
+        # exactly upper_limit encodes as 180 - confirm whether that case matters.
+        return cv.cvtColor(image, cv.COLOR_HSV2BGR)
+
+    def istft(
+        self,
+        image: np.ndarray
+    ) -> np.ndarray:
+        """Decode a (1, window_size, 3) BGR image produced by ``stft`` back to samples.
+
+        Returns:
+            The real part of the inverse FFT of the decoded spectrum.
+        """
+        image = cv.cvtColor(image, cv.COLOR_BGR2HSV)
+        hue = image[0][..., 0].astype(np.float64)
+        saturation = image[0][..., 1].astype(np.float64)
+        amplitude = self._hue_to_amplitude(hue)
+        angle = ((saturation - self.angle_min) / (self.angle_relative / (2 * np.pi))) - np.pi
+        segment = amplitude * (np.cos(angle) + 1j * np.sin(angle))
+        # undo the 1 / window_size normalisation applied in stft
+        return np.fft.ifft(segment * self.window_size).real
--- /dev/null
+import cv2 as cv
+import numpy as np
+from scipy.io import wavfile
+import scipy.signal as sps
+import matplotlib.pyplot as plt
+from multiprocessing import Pool
+from camera import camera
+from fft import fft
+"""
+- window size
+    the time to generate the spectrum is logarithmically related to the window size
+    bigger windows are exponentially better so you should prefer this if possible
+    obviously the biggest you can use is the size of your display unless you have
+    some way of arranging the pixels independent of the original spectrogram
+"""
+# The guard keeps worker processes started by multiprocessing from re-running
+# the whole script when they import the main module (spawn start method).
+if __name__ == "__main__":
+    # Load the recording, resample it to 22.05 kHz, then keep every second sample.
+    sample_rate, data = wavfile.read("/home/will/Downloads/number-station.wav")
+    new_rate = 22_050.
+    sample_count = round(len(data) * new_rate / sample_rate)
+    data = sps.resample(data, sample_count)
+    sample_rate = int(new_rate)
+    data = data[::2]
+    sample_rate = sample_rate // 2
+
+    window_size = 250
+    window_height = 125
+    hop_size = window_size // 2
+    projector = camera(window_size, window_height, (1000, 1000))
+    transform = fft(window_size, hop_size)
+
+    # One displayed image holds window_height overlapping frames.
+    segment_samples = window_height * hop_size
+    # The last frame of a segment reaches window_size - hop_size samples past
+    # the end of the segment, so pad by that overlap (not by window_height,
+    # which only happened to have the same value).
+    overlap = window_size - hop_size
+    padding = np.full(
+        segment_samples - (len(data) % segment_samples) + overlap,
+        fill_value=.1,
+    )
+    data = np.concatenate((data, padding))
+    recovered_data = np.zeros(data.shape)
+    # The padded length is a whole number of segments plus the overlap tail.
+    segment_count = len(data) // segment_samples
+
+    # One pool for the whole run instead of two new pools per segment.
+    with Pool() as pool:
+        for segment_index in range(segment_count):
+            segment_start = segment_index * segment_samples
+            rows = [
+                data[segment_start + offset:segment_start + offset + window_size]
+                for offset in range(0, segment_samples, hop_size)
+            ]
+            # each stft result has shape (1, window_size, 3); drop the leading axis
+            spectrum = np.array(pool.map(transform.stft, rows))[:, 0, ...]
+            projector.display(spectrum)
+            rows = [np.array([row]) for row in spectrum]
+            recovered = np.array(pool.map(transform.istft, rows))
+            # overlap-add the decoded frames back into the output signal
+            for i, row in enumerate(recovered):
+                row_start = segment_start + i * hop_size
+                recovered_data[row_start:row_start + window_size] += row
+
+    # Clip before the cast so out-of-range samples saturate instead of wrapping.
+    int16_range = np.iinfo(np.int16)
+    wavfile.write(
+        "out.wav",
+        sample_rate,
+        np.clip(recovered_data, int16_range.min, int16_range.max).astype(np.int16),
+    )
+    difference = (data - recovered_data)[1000:251000]
+from struct import unpack
+import numpy as np
+from scipy.io import wavfile
+import matplotlib.pyplot as plt
+from multiprocessing import Pool
+from camera import camera
+from fft import fft
+import pyaudio
+import sys
+import os
+import wave
+"""
+- window size
+    the time to generate the spectrum is logarithmically related to the window size
+    bigger windows are exponentially better so you should prefer this if possible
+    obviously the biggest you can use is the size of your display unless you have
+    some way of arranging the pixels independent of the original spectrogram
+- read size (the window size)
+    this is the amount of data that is read from the audio device at one time
+    I believe the maximum for this specific device is 990? it's something to do with
+    the number of channels and the sample rate...
+every time window width / 2 samples are available to read from the audio
+device, the program puts that chunk of audio into the buffer. each chunk is then
+appended to the last chunk. the last chunk (with no later chunk to append onto it) is
+left in the buffer to provide a smooth transition between the images
+"""
+# The guard keeps worker processes started by multiprocessing from re-running
+# the capture loop when they import the main module (spawn start method).
+if __name__ == "__main__":
+    window_width = 750
+    window_height = 500
+    sample_rate = 22_050
+    channels = 1
+    # Frames overlap by half a window; the overlap-add below relies on
+    # window_width being exactly 2 * hop_size (i.e. an even window width).
+    hop_size = window_width // 2
+    projector = camera(window_width, window_height, (1000, 1000))
+    transform = fft(window_width, hop_size)
+    pyaudio_object = pyaudio.PyAudio()
+    stream = pyaudio_object.open(
+        format=pyaudio.paInt16,
+        channels=channels,
+        rate=sample_rate,
+        input=True,
+    )
+    spectrum = np.zeros((window_height, window_width, 3), dtype=np.uint8)
+    spectrum_index = 0
+    # Previous half-window of input; each frame is previous chunk + new chunk.
+    previous_chunk = None
+    # Second half of the last decoded frame, carried into the next batch so the
+    # overlap-add stays continuous across images.
+    tail = np.zeros(hop_size)
+    int16_range = np.iinfo(np.int16)
+    interrupted = False
+    try:
+        # One pool for the whole run; the wave file is closed by the context
+        # manager on every exit path.
+        with wave.open("out.wav", "wb") as wav_out, Pool(3) as pool:
+            wav_out.setparams((
+                channels,
+                2,  # sample width
+                sample_rate,
+                0,
+                "NONE",  # compression type
+                "NONE"  # compression name
+            ))
+            while stream.is_active():
+                raw = stream.read(hop_size, exception_on_overflow=False)
+                chunk = np.frombuffer(raw, dtype="<i2")
+                if previous_chunk is not None:
+                    frame = np.concatenate((previous_chunk, chunk))
+                    # stft returns shape (1, window_width, 3); store the single row
+                    spectrum[spectrum_index] = transform.stft(frame)[0]
+                    spectrum_index += 1
+                    projector.display(spectrum)
+                previous_chunk = chunk
+                if spectrum_index == window_height:
+                    spectrum_index = 0
+                    rows = [np.array([row]) for row in spectrum]
+                    recovered = np.array(pool.map(transform.istft, rows))
+                    # Overlap-add all decoded frames into one preallocated
+                    # buffer, starting from the tail of the previous batch.
+                    audio = np.zeros((window_height + 1) * hop_size)
+                    audio[:hop_size] = tail
+                    for i, row in enumerate(recovered):
+                        start = i * hop_size
+                        audio[start:start + window_width] += row
+                    # Everything except the last half-window is final. Clip so
+                    # loud samples saturate instead of wrapping, and write
+                    # little-endian 16-bit frames.
+                    finished = np.clip(
+                        audio[:-hop_size], int16_range.min, int16_range.max
+                    )
+                    wav_out.writeframes(finished.astype("<i2").tobytes())
+                    tail = audio[-hop_size:].copy()
+    except KeyboardInterrupt:
+        interrupted = True
+    finally:
+        # Release the audio device on every exit path, not only on Ctrl-C.
+        stream.stop_stream()
+        stream.close()
+        pyaudio_object.terminate()
+    if interrupted:
+        # Same exit status as before (130), leaving immediately without
+        # running further interpreter shutdown.
+        os._exit(130)
