import cv2 as cv
import numpy as np
+import queue
+import threading
+
class VideoCapture:
    """Camera wrapper whose ``read`` returns the most recently captured frame.

    A daemon thread reads frames from the device continuously and keeps at
    most one unconsumed frame in ``self.queue``; an older unread frame is
    discarded whenever a newer one arrives.
    """

    def __init__(self, device_id):
        """Open ``device_id``, request 1920x1080 and start the reader thread."""
        # NOTE(review): the original had a bare `cv.CAP_GSTREAMER` expression
        # here. It had no effect, so it was dropped; pass it as the backend
        # argument to cv.VideoCapture if GStreamer is actually required.
        self.camera = cv.VideoCapture(device_id)
        self.camera.set(cv.CAP_PROP_FRAME_WIDTH, 1920.0)
        self.camera.set(cv.CAP_PROP_FRAME_HEIGHT, 1080.0)

        self.queue = queue.Queue()
        read_thread = threading.Thread(target=self.reader)
        read_thread.daemon = True
        read_thread.start()

    def reader(self):
        """Read frames as they arrive, keeping only the most recent one.

        The loop ends on the first failed read. Nothing is queued after that,
        so a later ``read`` call blocks once the last frame has been consumed.
        """
        while True:
            ret, frame = self.camera.read()
            if not ret:
                break
            try:
                # Discard the previous frame if nobody has consumed it yet.
                self.queue.get_nowait()
            except queue.Empty:
                pass
            self.queue.put(frame)

    def read(self):
        """Block until a frame is available and return it."""
        return self.queue.get()
class camera():
def __init__(
window_size: int,
window_height: int,
display_size: tuple,
- device_id: int = 0
+ device_id: int = 0,
+ debug: bool = True,
+ dummy: bool = False
):
self.window_size = window_size
self.window_height = window_height
self.display_size = display_size
+ self.match_histograms = False
+ self.show_debug = debug
+ self.dummy = dummy
+
+ cv.CAP_GSTREAMER
self.camera = cv.VideoCapture(device_id)
+
+ self.camera.set(cv.CAP_PROP_BUFFERSIZE, 1)
+ self.camera.set(cv.CAP_PROP_FRAME_WIDTH, 1920.0)
+ self.camera.set(cv.CAP_PROP_FRAME_HEIGHT, 1080.0)
+
self.homography = None
+ self.lookup_color = None
+ self.lookup_vingette = None
+ self.lookup_compression = None
+ self.last_display = None
+ self.last_capture = None
+ self.last_recovered = None
cv.namedWindow("display", cv.WINDOW_NORMAL)
+ if self.show_debug == True:
+ cv.namedWindow("debug", cv.WINDOW_NORMAL)
+
+ def capture_raw(
+ self
+ ) -> np.ndarray:
+
+ _, capture = self.camera.read()
+
+ return capture
def calibrate(
self
):
+ if self.dummy == True:
+ return
+
calibration_image = cv.imread("calibration/calibration.jpg")
- calibration_image = cv.resize(calibration_image, self.display_size, cv.INTER_NEAREST)
+ calibration_image = cv.resize(calibration_image, self.display_size)
cv.imshow("display", calibration_image)
cv.waitKey(0)
- _, capture = camera.read()
+ capture = self.camera.read()
# detect SIFT keypoints
sift = cv.SIFT_create()
else:
print("calibration failed")
+ def get_lookup(
+ self
+ ) -> None:
+
+ if self.dummy == True:
+ return
+
+ vingette_compression = 50
+
+ self.lookup_vingette = np.zeros((
+ 255 // vingette_compression + 1, # potentially +1
+ self.window_height // vingette_compression + 1,
+ self.window_size // vingette_compression + 1
+ ), dtype=np.uint8)
+
+ for v in range(0, 255, vingette_compression):
+ pixel = np.array([[[0, 0, v]]], dtype=np.uint8)
+ pixel = cv.cvtColor(pixel, cv.COLOR_HSV2BGR)
+
+ self.display(pixel)
+ capture = self.capture()
+
+ capture = cv.cvtColor(capture, cv.COLOR_BGR2HSV)
+
+ for y in range(0, self.window_height, vingette_compression):
+ for x in range(0, self.window_size, vingette_compression):
+ self.lookup_vingette[v, y, x] = capture[y, x, 2] - v
+
+ color_compression = 90
+
+ self.lookup_color = np.array((
+ 180 // color_compression + 1,
+ 255 // color_compression + 1,
+ 255 // color_compression + 1,
+ 3
+ ))
+
+ for h in range(0, 180, color_compression):
+ for s in range(0, 255, color_compression):
+ for v in range(0, 255, color_compression):
+ pixel = np.array([[[h, s, v]]], dtype=np.uint8)
+ pixel = cv.cvtColor(pixel, cv.COLOR_HSV2BGR)
+
+ self.display(pixel)
+ capture = self.capture()
+
+ capture = cv.cvtColor(capture, cv.COLOR_BGR2HSV)
+
+ color = capture[self.window_height // 2, self.window_size // 2]
+
+ self.lookup_color[h // color_compression, s // color_compression, v // color_compression] = color - [h, s, v]
+
+ np.save("lookup_vingette", self.lookup_vingette)
+ np.save("lookup_color", self.lookup_color)
+
def display(
self,
image: np.ndarray
) -> None:
-
- image = cv.resize(image, self.display_size, cv.INTER_NEAREST)
+
+ self.last_display = image
+ image = cv.resize(image, self.display_size, interpolation=cv.INTER_NEAREST_EXACT)
cv.imshow("display", image)
cv.waitKey(1)
+ def debug(
+ self
+ ) -> None:
+
+ if self.last_display is not None and self.last_capture is not None and self.last_recovered is not None:
+ height = round(self.last_capture.shape[0] / 2)
+ width = round((self.display_size[0] / self.display_size[1]) * height)
+ last_display = cv.resize(self.last_display, (width, height))
+ last_recovered = cv.resize(self.last_recovered, (width, height))
+ comparison = np.concatenate((last_display, last_recovered), axis=0)
+ debug_image = np.concatenate((self.last_capture, comparison), axis=1)
+ cv.imshow("debug", debug_image)
+ cv.waitKey(1)
+
def capture(
self
) -> np.ndarray:
- image = self.camera.read()
- if self.homography is not None:
- image = cv.warpPerspective(image, self.homography, self.display_size)
- image = cv.resize(image, (self.window_size, self.window_height), cv.INTER_NEAREST)
- image = match_histograms(image, display, channel_axis=-1)
+ if self.dummy == True:
+ image = self.last_display
+
+ else:
+ image = self.camera.read()
+ self.last_capture = image
+ if self.homography is not None:
+ image = cv.warpPerspective(image, self.homography, self.display_size)
+ image = cv.resize(image, (self.window_size, self.window_height))
+
+ if self.lookup_vingette is not None and self.lookup_color is not None:
+ for row in image:
+ for pixel in row:
+ pixel = self.lookup[pixel[0], pixel[1], pixel[2]]
+
+ self.last_recovered = image
+
+ if self.show_debug == True:
+ self.debug()
return image
--- /dev/null
+from struct import unpack
+import numpy as np
+from scipy.io import wavfile
+import matplotlib.pyplot as plt
+from multiprocessing import Pool
+from camera import camera
+from fft import fft
+import pyaudio
+import sys
+import os
+import wave
+
+"""
+notes:
+- window size
+ the time to generate the spectrum is logarithmically related to the window size
+ bigger windows are exponentially better so you should prefer this if possible
+ obviously the biggest you can use is the size of your display unless you have
+ some way of arranging the pixels independent of the original spectrogram
+- read size (the window size)
+ this is the amount of data that is read from the audio device at one time
+ I believe the maximum for this specific device is 990? it's something to do with
+ the number of channels and the sample rate...
+
+every time window width / 2 samples are available to read from the audio
+device, the program puts that chunk of audio into the buffer. each chunk is then
+appended to the last chunk. the last chunk (with no later chunk to append onto it) is
+left in the buffer to provide a smooth transition between the images
+"""
+
+window_width = 100
+window_height = 300
+sample_rate = 22_050
+channels = 1
+
+hop_size = window_width // 2
+camera = camera(window_width, window_height, (1000, 1000), device_id=2)
+transform = fft(window_width, hop_size)
+
+pyaudio_object = pyaudio.PyAudio()
+stream = pyaudio_object.open(
+ format = pyaudio.paInt16,
+ channels = channels,
+ rate = sample_rate,
+ input = True
+)
+
+buffer = []
+spectrum = np.zeros((window_height, window_width, 3), dtype=np.uint8)
+spectrum_index = 0
+audio = np.zeros((hop_size,), dtype=np.int16)
+
+try:
+ file = wave.open("out.wav", "wb")
+ file.setparams((
+ channels,
+ 2, # sample width
+ sample_rate,
+ 0,
+ "NONE", # compression type
+ "NONE" # compression name
+ ))
+
+ while stream.is_active():
+ data = stream.read(hop_size, exception_on_overflow = False)
+ data = unpack(f"<{hop_size}h", data)
+ buffer.append(list(data))
+
+ if len(buffer) == 2:
+ spectrum[spectrum_index] = transform.stft(buffer[0] + buffer[1])
+ spectrum_index += 1
+ del buffer[0]
+
+ camera.display(spectrum)
+
+ if spectrum_index == window_height:
+ spectrum_index = 0
+
+ rows = [np.array([i]) for i in spectrum]
+ with Pool(3) as p:
+ recovered = np.array(p.map(transform.istft, rows), dtype=np.int16)
+
+ for row in recovered:
+ audio[-hop_size:] += row[:hop_size]
+ audio = np.append(audio, row[hop_size:])
+
+ file.writeframes(audio[:-hop_size])
+ audio = np.delete(audio, np.s_[:-hop_size])
+
+except KeyboardInterrupt:
+
+ stream.stop_stream()
+ stream.close()
+ pyaudio_object.terminate()
+ file.close()
+
+ try:
+ sys.exit()
+ except SystemExit:
+ os._exit(130)
+
+
self.lower_limit = -40
self.upper_limit = 100
- self.amplitude_max = 180
+ self.amplitude_max = 254
self.amplitude_min = 0
- self.angle_max = 255
- self.angle_min = 100
+ self.angle_max = 179
+ self.angle_min = 0
self.amplitude_relative = self.amplitude_max - self.amplitude_min
self.angle_relative = self.angle_max - self.angle_min
amplitude = np.clip(amplitude, self.lower_limit, self.upper_limit)
amplitude -= self.lower_limit
amplitude *= (self.amplitude_relative / self.upper_limit) + self.amplitude_min
+ amplitude = np.clip(amplitude, self.amplitude_min, self.amplitude_max)
angle = np.angle(spectrum)
angle = ((angle + np.pi) * (self.angle_relative / (2 * np.pi))) + self.angle_min
+ angle = np.clip(angle, self.angle_min, self.angle_max)
full = np.full(angle.shape, fill_value=255)
- image = np.stack((amplitude, angle, full), axis=-1)
+ image = np.stack((angle, full, amplitude), axis=-1)
image = np.array([image], dtype=np.uint8)
image = cv.cvtColor(image, cv.COLOR_HSV2BGR)
image = cv.cvtColor(image, cv.COLOR_BGR2HSV)
- amplitude = image[0][...,0].astype(np.float64)
- angle = image[0][...,1].astype(np.float64)
+ amplitude = image[0][...,2].astype(np.float64)
+ angle = image[0][...,0].astype(np.float64)
amplitude -= self.amplitude_min
amplitude /= (self.amplitude_relative / self.upper_limit)
from multiprocessing import Pool
from camera import camera
from fft import fft
+import time
"""
notes:
some way of arranging the pixels independent of the original spectrogram
"""
-sample_rate, data = wavfile.read("/home/will/Downloads/number-station.wav")
-new_rate = 22_050.
+sample_rate, data = wavfile.read("/home/will/Downloads/Adducci - Around the Horn.wav")
+
+
+new_rate = 22050.
sample_count = round(len(data) * new_rate / sample_rate)
data = sps.resample(data, sample_count)
data = [data[i] for i in range(0, len(data), 2)]
sample_rate = sample_rate // 2
-window_size = 250
-window_height = 125
+window_size = 80
+window_height = 45
hop_size = window_size // 2
-camera = camera(window_size, window_height, (1000, 1000))
+camera = camera(window_size, window_height, (1840, 1000), device_id=2, debug=False, dummy=True)
+
transform = fft(window_size, hop_size)
segment_samples = window_height * hop_size
segment_count = round(len(data) / segment_samples)
+camera.calibrate()
+
for segment_index in range(segment_count):
segment_start = segment_index * segment_samples
rows = [data[segment_start + i:segment_start + i + window_size] for i in range(0, segment_samples, hop_size)]
spectrum = np.array(mapping)[:,0,...]
+ if segment_index == 10: cv.imwrite("sample.jpg", spectrum)
+
camera.display(spectrum)
- rows = [np.array([i]) for i in spectrum]
+ time.sleep(0.5)
+
+ capture = camera.capture()
+
+ rows = [np.array([i]) for i in capture]
with Pool() as p:
recovered = np.array(p.map(transform.istft, rows))
--- /dev/null
+import cv2 as cv
+import numpy as np
+from scipy.io import wavfile
+import scipy.signal as sps
+import matplotlib.pyplot as plt
+from multiprocessing import Pool
+from camera import camera
+from struct import pack
+from fft import fft
+import time
+import pyaudio
+import os
+import sys
+import matplotlib.pyplot as plt
+
+"""
+notes:
+- window size
+ the time to generate the spectrum is logarithmically related to the window size
+ bigger windows are exponentially better so you should prefer this if possible
+ obviously the biggest you can use is the size of your display unless you have
+ some way of arranging the pixels independent of the original spectrogram
+"""
+
+sample_rate, data = wavfile.read("/home/will/Downloads/Adducci - Around the Horn.wav")
+# data = data[...,0]
+
+new_rate = 22050.
+sample_count = round(len(data) * new_rate / sample_rate)
+data = sps.resample(data, sample_count)
+sample_rate = int(new_rate)
+
+window_size = 176
+window_height = 99
+
+hop_size = window_size // 2
+camera = camera(
+ window_size,
+ window_height,
+ (1840, 1000),
+ device_id=2,
+ debug=False,
+ dummy=True
+)
+camera.calibrate()
+camera.get_lookup()
+
+print(camera.lookup_vingette)
+print(camera.lookup_color)
+
+transform = fft(window_size, hop_size)
+
+segment_samples = window_height * hop_size
+overflow_samples = segment_samples - (len(data) % segment_samples) + window_size
+data = np.concatenate((data, data[0:overflow_samples]))
+
+segment_count = round(len(data) / segment_samples)
+segment_index = 0
+audio = np.zeros((hop_size,), dtype=np.int16)
+
def callback(in_data, frame_count, time_info, status):
    """Stream callback: hand out the next ``frame_count`` samples of ``audio``.

    When enough samples are buffered they are removed from the front of the
    module-level ``audio`` array. Otherwise the available samples are
    zero-padded to ``frame_count`` and ``audio`` is reset to ``hop_size``
    zeros. Always returns ``pyaudio.paContinue`` as the status flag.
    """
    global audio

    chunk = audio[:frame_count]
    missing = frame_count - len(chunk)

    if missing <= 0:
        # Enough samples buffered: consume them from the front.
        audio = np.delete(audio, np.s_[:frame_count])
        return (chunk, pyaudio.paContinue)

    # Underrun: pad the tail with zeros and start from an empty buffer.
    chunk = np.pad(chunk, [(0, missing)], mode='constant')
    audio = np.zeros((hop_size,), dtype=np.int16)
    return (chunk, pyaudio.paContinue)
+
+pyaudio_object = pyaudio.PyAudio()
+stream = pyaudio_object.open(
+ format = pyaudio.paInt16,
+ channels = 1,
+ rate = sample_rate,
+ frames_per_buffer = 2048,
+ output = True,
+ stream_callback = callback
+)
+
+try:
+ while stream.is_active():
+ segment_start = segment_index * segment_samples
+ rows = [data[segment_start + i:segment_start + i + window_size] for i in range(0, segment_samples, hop_size)]
+ with Pool() as p:
+ mapping = p.map(transform.stft, rows)
+
+ spectrum = np.array(mapping)[:,0,...]
+
+ if segment_index == 10: cv.imwrite("sample.jpg", spectrum)
+
+ camera.display(spectrum)
+ time.sleep(0.1)
+ capture = camera.capture()
+
+ rows = [np.array([i]) for i in capture]
+ with Pool() as p:
+ recovered = np.array(p.map(transform.istft, rows))
+
+ if len(audio) < hop_size:
+ audio = np.zeros((hop_size,), dtype=np.int16)
+
+ for row in recovered:
+ row = row.astype(np.int16)
+
+ audio[-hop_size:] += row[:hop_size]
+ audio = np.append(audio, row[hop_size:])
+
+ segment_index += 1
+ if segment_index == segment_count: segment_index = 0
+
+ slept = 0
+ while len(audio) > 2 * segment_samples:
+ time.sleep(0.01)
+ slept += 1
+ print(f"slept {slept} times")
+
+except KeyboardInterrupt:
+ stream.stop_stream()
+ stream.close()
+ pyaudio_object.terminate()
+
+ try:
+ sys.exit()
+ except SystemExit:
+ os._exit(130)
left in the buffer to provide a smooth transition between the images
"""
-window_width = 750
-window_height = 500
+window_width = 100
+window_height = 300
sample_rate = 22_050
channels = 1
hop_size = window_width // 2
-camera = camera(window_width, window_height, (1000, 1000))
+camera = camera(window_width, window_height, (1000, 1000), device_id=0, debug=True)
+camera.calibrate()
transform = fft(window_width, hop_size)
pyaudio_object = pyaudio.PyAudio()
if spectrum_index == window_height:
spectrum_index = 0
- rows = [np.array([i]) for i in spectrum]
+ capture = camera.capture()
+
+ rows = [np.array([i]) for i in capture]
with Pool(3) as p:
recovered = np.array(p.map(transform.istft, rows), dtype=np.int16)