--- /dev/null
+import cv2 as cv
+import numpy as np
+from skimage.exposure import match_histograms
+
+class camera:
+ def __init__(
+ self,
+ window_size: int,
+ window_height: int,
+ display_size: tuple,
+ device_id: int = 0
+ ):
+
+ self.window_size = window_size
+ self.window_height = window_height
+ self.display_size = display_size
+
+ self.camera = cv.VideoCapture(device_id)
+        self.homography = None
+        self.displayed = None
+
+ cv.namedWindow("display", cv.WINDOW_NORMAL)
+
+ def calibrate(
+ self
+ ):
+        calibration_image = cv.imread("calibration/calibration.jpg")
+        calibration_image = cv.resize(calibration_image, self.display_size, interpolation=cv.INTER_NEAREST)
+
+        # show the calibration image, wait for a key press, then grab a frame
+        cv.imshow("display", calibration_image)
+        cv.waitKey(0)
+        _, capture = self.camera.read()
+
+        # detect SIFT keypoints and descriptors in both images
+ sift = cv.SIFT_create()
+ kp1, des1 = sift.detectAndCompute(calibration_image, None)
+ kp2, des2 = sift.detectAndCompute(capture, None)
+
+        # match descriptors between the calibration image and the captured frame
+        flann = cv.FlannBasedMatcher(
+            {"algorithm": 1, "trees": 5},  # FLANN_INDEX_KDTREE
+            {"checks": 50}
+        )
+        matches = flann.knnMatch(des1, des2, k=2)
+
+        # keep only matches that pass the ratio test
+        good = []
+        for m, n in matches:
+            if m.distance < 0.7 * n.distance:
+                good.append(m)
+
+        if len(good) > 10:
+            src_pts = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)
+            dst_pts = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)
+            # homography mapping captured-frame coordinates back onto the display
+            self.homography, _ = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0)
+
+        else:
+            print(f"calibration failed: only {len(good)} good matches")
+
+ def display(
+ self,
+ image: np.ndarray
+ ) -> None:
+
+        image = cv.resize(image, self.display_size, interpolation=cv.INTER_NEAREST)
+        self.displayed = image  # kept as the reference for histogram matching in capture()
+        cv.imshow("display", image)
+        cv.waitKey(1)
+
+ def capture(
+ self
+ ) -> np.ndarray:
+
+        _, image = self.camera.read()
+        if self.homography is not None:
+            # warp the capture back into display coordinates and shrink it to window size
+            image = cv.warpPerspective(image, self.homography, self.display_size)
+            image = cv.resize(image, (self.window_size, self.window_height), interpolation=cv.INTER_NEAREST)
+            # match the capture's colour distribution to the last displayed image
+            image = match_histograms(image, self.displayed, channel_axis=-1)
+
+ return image
+
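+# intended usage, as a sketch (the parameter values here are illustrative):
+#   cam = camera(window_size=250, window_height=125, display_size=(1000, 1000))
+#   cam.calibrate()                  # project the pattern and capture it once
+#   cam.display(spectrum_image)      # show an image on the display
+#   frame = cam.capture()            # read it back, warped and colour-matched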
+
--- /dev/null
+import numpy as np
+import cv2 as cv
+
+class fft:
+ def __init__(
+ self,
+ window_size: int,
+ hop_size: int
+ ):
+ self.window_size = window_size
+ self.hop_size = hop_size
+ self.window = np.hanning(window_size)
+
+        # clipping limits for the log-amplitude, in dB
+        self.lower_limit = -40
+        self.upper_limit = 100
+
+        # hue range used to encode amplitude (OpenCV 8-bit hue runs 0-179)
+        self.amplitude_max = 180
+        self.amplitude_min = 0
+        # saturation range used to encode phase
+        self.angle_max = 255
+        self.angle_min = 100
+
+        self.amplitude_relative = self.amplitude_max - self.amplitude_min
+        self.angle_relative = self.angle_max - self.angle_min
+        self.limit_relative = self.upper_limit - self.lower_limit
+
+
+ def stft(
+ self,
+ data: np.ndarray
+ ) -> np.ndarray:
+
+        # window the segment and take a normalised FFT
+        segment = data * self.window
+        spectrum = np.fft.fft(segment) / self.window_size
+
+        # log-amplitude, clipped and mapped onto [amplitude_min, amplitude_max]
+        # so that even the loudest bins stay inside the hue range
+        amplitude = np.abs(spectrum)
+        amplitude = 20 * np.log10(amplitude)
+        amplitude = np.clip(amplitude, self.lower_limit, self.upper_limit)
+        amplitude -= self.lower_limit
+        amplitude = amplitude * (self.amplitude_relative / self.limit_relative) + self.amplitude_min
+
+        # phase mapped from [-pi, pi] onto [angle_min, angle_max]
+        angle = np.angle(spectrum)
+        angle = ((angle + np.pi) * (self.angle_relative / (2 * np.pi))) + self.angle_min
+
+        full = np.full(angle.shape, fill_value=255)  # constant value channel
+
+        # stack into a one-row HSV image: hue = amplitude, saturation = phase
+        image = np.stack((amplitude, angle, full), axis=-1)
+        image = np.array([image], dtype=np.uint8)
+
+        image = cv.cvtColor(image, cv.COLOR_HSV2BGR)
+
+ return image
+
+ def istft(
+ self,
+ image: np.ndarray
+ ) -> np.ndarray:
+
+        image = cv.cvtColor(image, cv.COLOR_BGR2HSV)
+
+        # hue carries amplitude, saturation carries phase
+        amplitude = image[0][..., 0].astype(np.float64)
+        angle = image[0][..., 1].astype(np.float64)
+
+        # undo the amplitude scaling and convert from dB back to linear
+        amplitude -= self.amplitude_min
+        amplitude /= (self.amplitude_relative / self.limit_relative)
+        amplitude += self.lower_limit
+        amplitude = np.power(10, amplitude / 20)
+
+        # undo the phase scaling back to [-pi, pi]
+        angle = ((angle - self.angle_min) / (self.angle_relative / (2 * np.pi))) - np.pi
+
+        # rebuild the complex spectrum and invert, undoing the forward normalisation
+        real = np.cos(angle) * amplitude
+        imag = np.sin(angle) * amplitude
+        segment = real + (1j * imag)
+
+        data = np.fft.ifft(segment * self.window_size).real
+
+ return data
+
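+# a minimal round-trip sanity check, as a sketch (not part of the pipeline; the
+# values are illustrative): encode one window of a sine wave to an image and
+# decode it back. the recovered samples should approximate the windowed input
+# to within the error introduced by the 8-bit quantisation.
+if __name__ == "__main__":
+    transform = fft(window_size=250, hop_size=125)
+    tone = np.sin(np.linspace(0, 2 * np.pi * 10, 250)) * 1_000
+    recovered = transform.istft(transform.stft(tone))
+    print(np.max(np.abs(recovered - tone * transform.window)))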
+
--- /dev/null
+import numpy as np
+from scipy.io import wavfile
+import scipy.signal as sps
+import matplotlib.pyplot as plt
+from multiprocessing import Pool
+from camera import camera
+from fft import fft
+
+"""
+notes:
+- window size
+ the time to generate the spectrum is logaritmically related to the window size
+ bigger windows are exponentially better so you should prefer this if possible
+ obviously the biggest you can use is the size of your display unless you have
+ some way of arranging the pixles independant of the orrigional spectrogram
+"""
+
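+# a minimal sketch backing up the note above (fft_time_per_sample is a made-up
+# helper for illustration; nothing below calls it): time np.fft.fft at a few
+# sizes and print the cost per sample, which should grow only logarithmically.
+def fft_time_per_sample(n: int, repeats: int = 200) -> float:
+    import timeit
+    segment = np.random.rand(n)
+    return timeit.timeit(lambda: np.fft.fft(segment), number=repeats) / (repeats * n)
+
+# for n in (250, 1_000, 4_000):
+#     print(n, fft_time_per_sample(n))
+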
+sample_rate, data = wavfile.read("/home/will/Downloads/number-station.wav")
+new_rate = 22_050.
+
+# resample to the new rate
+sample_count = round(len(data) * new_rate / sample_rate)
+data = sps.resample(data, sample_count)
+sample_rate = int(new_rate)
+
+# naive decimation by two (no anti-alias filter), halving the rate again
+data = data[::2]
+sample_rate = sample_rate // 2
+
+window_size = 250
+window_height = 125
+
+hop_size = window_size // 2
+camera = camera(window_size, window_height, (1000, 1000))
+transform = fft(window_size, hop_size)
+
+# samples consumed per full spectrogram image
+segment_samples = window_height * hop_size
+
+# pad so the data divides into whole segments, plus one extra row's worth for
+# the final overlapping window
+padding = np.full((segment_samples - (len(data) % segment_samples) + window_height), fill_value=.1)
+data = np.concatenate((data, padding))
+
+recovered_data = np.zeros(data.shape)
+
+segment_count = len(data) // segment_samples
+
+for segment_index in range(segment_count):
+    segment_start = segment_index * segment_samples
+
+    # slice the segment into overlapping windows, one per spectrogram row
+    rows = [data[segment_start + i:segment_start + i + window_size] for i in range(0, segment_samples, hop_size)]
+    with Pool() as p:
+        mapping = p.map(transform.stft, rows)
+
+    # each stft returns a one-row image; drop the singleton axis to stack them
+    spectrum = np.array(mapping)[:, 0, ...]
+
+    camera.display(spectrum)
+
+    # re-wrap each row as a one-row image and invert it back to audio
+    rows = [np.array([i]) for i in spectrum]
+    with Pool() as p:
+        recovered = np.array(p.map(transform.istft, rows))
+
+    # overlap-add the recovered windows back into the output buffer
+    for i, row in enumerate(recovered):
+        row_start = i * hop_size
+        recovered_data[segment_start + row_start:segment_start + row_start + window_size] += row
+
+wavfile.write("out.wav", sample_rate, recovered_data.astype(np.int16))
+
+difference = (data - recovered_data)[1000:251000]
+
+# plot the reconstruction error and its spectrogram
+plt.style.use('dark_background')
+fig, (ax1, ax2) = plt.subplots(nrows=2)
+ax1.plot(difference)
+Pxx, freqs, bins, im = ax2.specgram(difference, NFFT=1014, Fs=1/0.0005)
+plt.show()
+
+++ /dev/null
-import cv2 as cv
-import numpy as np
-from scipy.io import wavfile
-import scipy.signal as sps
-import matplotlib.pyplot as plt
-from multiprocessing import Pool
-
-"""
-notes:
-- window size
- the time to generate the spectrum is logaritmically related to the window size
- bigger windows are exponentially better so you should prefer this if possible
- obviously the biggest you can use is the size of your display unless you have
- some way of arranging the pixles independant of the orrigional spectrogram
-"""
-
-class camera():
- def __init__(
- self,
- window_size: int,
- window_height: int,
- device_id: int = 0
- ):
-
- self.window_size = window_size
- self.window_height = window_height
-
- self.camera = cv.VideoCapture(device_id)
- self.homography = None
-
- cv.namedWindow("display", cv.WINDOW_NORMAL)
-
- def calibrate(
- self
- ):
- calibration_image = cv.imread("calibration/calibration.jpg")
- calibration_image = cv.resize(calibration_image, (self.window_size, self.window_height), cv.INTER_NEAREST)
-
- cv.imshow("display", calibration_image)
- cv.waitKey(0)
- _, capture = camera.read()
-
- # detect SIFT keypoints
- sift = cv.SIFT_create()
- kp1, des1 = sift.detectAndCompute(calibration_image, None)
- kp2, des2 = sift.detectAndCompute(capture, None)
-
- # get good matches between calibration image and the captured image
- flann = cv.FlannBasedMatcher(
- {"algorithm": 1, "trees": 5},
- {"checks": 50}
- )
- matches = flann.knnMatch(des1, des2, k=2)
-
- #get good matches via ratio test
- good = []
- for m,n in matches:
- if m.distance < 0.7*n.distance:
- good.append(m)
-
- if len(good)>10:
- src_pts = np.float32([ kp1[m.queryIdx].pt for m in good ]).reshape(-1,1,2)
- dst_pts = np.float32([ kp2[m.trainIdx].pt for m in good ]).reshape(-1,1,2)
-
- self.homography, mask = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0)
-
- else:
- print("calibration failed")
-
- def display(
- self,
- image: np.ndarray
- ) -> None:
-
- cv.imshow("display", image)
- cv.waitKey(1)
-
- def capture(
- self
- ) -> np.ndarray:
-
- image = self.camera.read()
- if self.homography is not None:
- image = cv.warpPerspective(image, self.homography, (self.display_size, self.display_height))
- image = match_histograms(image, display, channel_axis=-1)
-
- return image
-
-class fft():
- def __init__(
- self,
- window_size: int,
- hop_size: int
- ):
- self.window_size = window_size
- self.hop_size = hop_size
- self.window = np.hanning(window_size)
-
- self.lower_limit = -40
- self.upper_limit = 100
-
- self.amplitude_max = 180
- self.amplitude_min = 0
- self.angle_max = 255
- self.angle_min = 100
-
- self.amplitude_relative = self.amplitude_max - self.amplitude_min
- self.angle_relative = self.angle_max - self.angle_min
-
-
- def stft(
- self,
- data: np.ndarray
- ) -> np.ndarray:
-
- segment = data * self.window
- spectrum = np.fft.fft(segment) / self.window_size
-
- amplitude = np.abs(spectrum)
- amplitude = 20*np.log10(amplitude)
- amplitude = np.clip(amplitude, self.lower_limit, self.upper_limit)
- amplitude -= self.lower_limit
- amplitude *= (self.amplitude_relative / self.upper_limit) + self.amplitude_min
-
- angle = np.angle(spectrum)
- angle = ((angle + np.pi) * (self.angle_relative / (2 * np.pi))) + self.angle_min
-
- full = np.full(angle.shape, fill_value=255)
-
- image = np.stack((amplitude, angle, full), axis=-1)
- image = np.array([image], dtype=np.uint8)
-
- image = cv.cvtColor(image, cv.COLOR_HSV2BGR)
-
- return image
-
- def istft(
- self,
- image: np.ndarray
- ) -> np.ndarray:
-
- image = cv.cvtColor(image, cv.COLOR_BGR2HSV)
-
- amplitude = image[0][...,0].astype(np.float64)
- angle = image[0][...,1].astype(np.float64)
-
- amplitude -= self.amplitude_min
- amplitude /= (self.amplitude_relative / self.upper_limit)
- amplitude += self.lower_limit
- amplitude = np.power(10, amplitude / 20)
-
- angle = ((angle - self.angle_min) / (self.angle_relative / (2 * np.pi))) - np.pi
-
- real = np.cos(angle) * amplitude
- imag = np.sin(angle) * amplitude
- segment = real + (1j * imag)
-
- data = np.fft.ifft(segment * self.window_size).real
-
- return data
-
-sample_rate, data = wavfile.read("/home/will/Music/George Michael - Careless Whisper.wav")
-new_rate = 22_050.
-
-sample_count = round(len(data) * new_rate / sample_rate)
-data = sps.resample(data, sample_count)
-sample_rate = int(new_rate)
-
-data = [data[i] for i in range(0, len(data), 2)]
-sample_rate = sample_rate // 2
-
-window_size = 1_000
-window_height = 500
-
-hop_size = window_size // 2
-camera = camera(window_size, window_height)
-transform = fft(window_size, hop_size)
-
-segment_samples = window_height * hop_size
-
-padding = np.full((segment_samples - (len(data) % segment_samples) + window_height), fill_value=.1)
-data = np.concatenate((data, padding))
-
-recovered_data = np.zeros(data.shape)
-
-segment_count = round(len(data) / segment_samples)
-
-for segment_index in range(segment_count):
- segment_start = segment_index * segment_samples
- rows = [data[segment_start + i:segment_start + i + window_size] for i in range(0, segment_samples, hop_size)]
- with Pool() as p:
- mapping = p.map(transform.stft, rows)
-
- spectrum = np.array(mapping)[:,0,...]
-
- # do the silly capture thing here
- cv.imshow("display", spectrum)
- cv.waitKey(1)
-
- rows = [np.array([i]) for i in spectrum]
- with Pool() as p:
- recovered = np.array(p.map(transform.istft, rows))
-
- for i, row in enumerate(recovered):
- row_start = i * hop_size
- recovered_data[segment_start + row_start:segment_start + row_start + window_size] += row
-
-wavfile.write("out.wav", sample_rate, recovered_data.astype(np.int16))
-
-difference = (data - recovered_data)[1000:251000]
-
-plt.style.use('dark_background')
-fig, (ax1, ax2) = plt.subplots(nrows=2)
-ax1.plot(difference)
-Pxx, freqs, bins, im = ax2.specgram(difference, NFFT=1014, Fs=1/0.0005)
-plt.show()
-
-
-
-
--- /dev/null
+from struct import unpack
+import numpy as np
+from multiprocessing import Pool
+from camera import camera
+from fft import fft
+import pyaudio
+import sys
+import os
+import wave
+
+"""
+notes:
+- window size
+ the time to generate the spectrum is logaritmically related to the window size
+ bigger windows are exponentially better so you should prefer this if possible
+ obviously the biggest you can use is the size of your display unless you have
+ some way of arranging the pixles independant of the orrigional spectrogram
+- read size (the window size)
+ this is the amount of data that is read from the audio device at one time
+ i belive the maximum for this specific device is 990? its something to do with
+ the number of channels and the sample rate...
+
+every time the window width / 2 number of samples is available to read from the audio
+device. the program puts that chunk of audio into the biffer. each chunk is then
+appended to the last chunk. the last chunk (with no later chunk to append onto it) is
+left in the buffer to provide a smooth transition between the images
+"""
+
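+# a minimal sketch of the two-chunk buffering described above (hypothetical
+# helper, nothing below calls it): each hop-sized chunk is paired with the
+# previous one to form a full window, and the newest chunk is held back to
+# overlap with the next read.
+def sliding_windows(chunks):
+    buffer = []
+    for chunk in chunks:
+        buffer.append(chunk)
+        if len(buffer) == 2:
+            yield buffer[0] + buffer[1]  # one full window: two overlapping hops
+            del buffer[0]                # keep the newest chunk for the next window
+
+# list(sliding_windows([[1, 2], [3, 4], [5, 6]])) -> [[1, 2, 3, 4], [3, 4, 5, 6]]
+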
+window_width = 750
+window_height = 500
+sample_rate = 22_050
+channels = 1
+
+hop_size = window_width // 2
+camera = camera(window_width, window_height, (1000, 1000))
+transform = fft(window_width, hop_size)
+
+pyaudio_object = pyaudio.PyAudio()
+stream = pyaudio_object.open(
+ format = pyaudio.paInt16,
+ channels = channels,
+ rate = sample_rate,
+ input = True
+)
+
+buffer = []  # most recent hop-sized chunk(s) of samples
+spectrum = np.zeros((window_height, window_width, 3), dtype=np.uint8)
+spectrum_index = 0  # next spectrogram row to fill
+audio = np.zeros((hop_size,), dtype=np.int16)  # overlap-add tail carried between images
+
+try:
+ file = wave.open("out.wav", "wb")
+ file.setparams((
+ channels,
+ 2, # sample width
+ sample_rate,
+        0, # frame count; patched as frames are written
+ "NONE", # compression type
+ "NONE" # compression name
+ ))
+
+    while stream.is_active():
+        data = stream.read(hop_size, exception_on_overflow = False)
+        data = unpack(f"<{hop_size}h", data)  # little-endian signed 16-bit samples
+        buffer.append(list(data))
+
+        if len(buffer) == 2:
+            # two hop-sized chunks form one full window; the newest chunk stays
+            # behind to overlap with the next read
+            spectrum[spectrum_index] = transform.stft(buffer[0] + buffer[1])
+            spectrum_index += 1
+            del buffer[0]
+
+ camera.display(spectrum)
+
+        if spectrum_index == window_height:
+            spectrum_index = 0
+
+            # invert every row of the finished spectrogram image in parallel
+            rows = [np.array([i]) for i in spectrum]
+            with Pool(3) as p:
+                recovered = np.array(p.map(transform.istft, rows), dtype=np.int16)
+
+            # overlap-add each recovered row onto the running audio buffer
+            for row in recovered:
+                audio[-hop_size:] += row[:hop_size]
+                audio = np.append(audio, row[hop_size:])
+
+            # flush everything except the still-overlapping tail to disk
+            file.writeframes(audio[:-hop_size])
+            audio = np.delete(audio, np.s_[:-hop_size])
+
+except KeyboardInterrupt:
+
+ stream.stop_stream()
+ stream.close()
+ pyaudio_object.terminate()
+ file.close()
+
+    try:
+        sys.exit()
+    except SystemExit:
+        os._exit(130)  # 128 + SIGINT, the conventional ctrl-c exit code
+
+
+++ /dev/null
-import wave
-
-import pyaudio
-
-def handler (in_data, frame_count, time_info, status):
- print(in_data)
- print(frame_count)
- print(time_info)
- print(status)
-
- (in_data, pyaudio.paContinue)
-
-CHUNK = 1024
-
-p = pyaudio.PyAudio()
-
-stream = p.open(
- format=pyaudio.paInt8,
- channels=1,
- rate=22_050,
- input=True,
- stream_callback=handler
-)
-
-stream.close()
-p.terminate()