+#!./.venv/bin/python
+
import numpy as np
import cv2 as cv
-import matplotlib.pyplot as plt
"""
Notes:
def __init__(
self,
window_size: int,
- hop_size: int
):
# calculate the window and hop size, use to calulate the cosine window
self.window_size = window_size
- self.hop_size = hop_size
+ self.hop_size = window_size // 2
self.window = np.hanning(window_size)
+ self.placeholder = np.full((window_size,), fill_value=60)
# set the max and min numerical values for amplitude and angle to allow for easier combinations of them both
self.amplitude_max = 254
self.amplitude_min = 0
+
self.angle_max = 254
self.angle_min = 0
- # set the upper and lower limits (in dB) that are to be displayed on the screen
+
self.volume_max = 100
self.volume_min = -40
self.volume_relative = self.volume_max - self.volume_min
# generate lookup table for the converstion from decibels to power
- a = self.volume_min
- b = self.volume_max / self.amplitude_relative
+ self.a = a = self.volume_min
+ self.b = b = self.volume_relative / self.amplitude_relative
+ self.c = c = self.amplitude_min
- # this is the parameterized inverted function of y = (20 * log10(x) - 40) * (255/140)
- log_lookup = [10 ** (((x * b) + a) / 20) for x in range(0, 256)]
+ # this is the parameterized inverted function of (e.g.) y = (20 * log10(x) - 40) * (255/140)
+ log_lookup = [10 ** ((((x - c) * b) + a) / 20) for x in range(0, 256)]
self.log_lookup = np.array(log_lookup)
def stft(
data: np.ndarray
) -> np.ndarray:
- # apply window and perform the fft
- segment = data * self.window
- spectrum = np.fft.fft(segment) / self.window_size
-
- # convert the vector length to decimals and confine
- amplitude = np.abs(spectrum)
-
- amplitude = 20*np.log10(amplitude)
- amplitude = np.clip(amplitude, self.volume_min, self.volume_max)
+ try:
+ # apply window and perform the fft
+ segment = data * self.window
+ spectrum = np.fft.fft(segment) / self.window_size
- # confine the amplitude within the limits specified
- a = self.volume_min
- b = self.amplitude_relative / self.volume_relative # possibly change the vol_max to vol_rel ?? see [2]
- c = self.amplitude_min
- amplitude = ((amplitude - a) * b) + c
+ # convert the vector length to decibels and confine
+ amplitude = np.abs(spectrum)
+ amplitude = 20*np.log10(amplitude)
+ amplitude = np.clip(amplitude, self.volume_min, self.volume_max)
+ amplitude = ((amplitude - self.a) / self.b) + self.c
- # convert x and y to the angle and confine
- angle = np.angle(spectrum)
+ # convert x and y to the angle and confine
+ angle = np.angle(spectrum)
+ angle = ((angle + np.pi) * (self.angle_relative / (2 * np.pi))) + self.angle_min
+ angle = np.clip(angle, self.angle_min, self.angle_max)
- # confine the angle within the limits specified
- angle = ((angle + np.pi) * (self.angle_relative / (2 * np.pi))) + self.angle_min
- angle = np.clip(angle, self.angle_min, self.angle_max)
+ # rearrange to image format
+ image = np.stack((amplitude * (180/255), angle, amplitude), axis=-1)
+ image = np.array([image], dtype=np.uint8)
- # rearrange to image format
- full = np.full(angle.shape, fill_value=60)
- image = np.stack((amplitude * (180/255), angle, amplitude), axis=-1)
- image = np.array([image], dtype=np.uint8)
- image = cv.cvtColor(image, cv.COLOR_HSV2BGR)
+ return image
- return image
+ except KeyboardInterrupt:
+ return None
def istft(
self,
image: np.ndarray
) -> np.ndarray:
- # split the image into constituant parts
- image = cv.cvtColor(image, cv.COLOR_BGR2HSV)
- amplitude = image[0][...,2]
- angle = image[0][...,1].astype(np.float64)
- #hue = image[0][...,0].astype(np.float64) * (255/180)
+ try:
+ # split the image into constituent parts
+ amplitude = image[0][...,2]
+ angle = image[0][...,1].astype(np.float64)
- #amplitude = np.mean( np.array([ amplitude, hue ]), axis=0 ).astype(np.uint8)
+ # Use hue as separate data point
+ #hue = image[0][...,0].astype(np.float64) * (255/180)
+ #amplitude = np.mean( np.array([ amplitude, hue ]), axis=0 ).astype(np.uint8)
- # convert amplitude back into vector length
- amplitude = self.log_lookup[amplitude]
+ # convert amplitude back into vector length
+ amplitude = self.log_lookup[amplitude]
- # convert angle back into x and y
- angle = ((angle - self.angle_min) / (self.angle_relative / (2 * np.pi))) - np.pi
+ # convert angle back into x and y
+ angle = ((angle - self.angle_min) / (self.angle_relative / (2 * np.pi))) - np.pi
- # rearrange back into fft result
- real = np.cos(angle) * amplitude
- imag = np.sin(angle) * amplitude
- segment = real + (1j * imag)
+ # rearrange back into fft result
+ real = np.cos(angle) * amplitude
+ imag = np.sin(angle) * amplitude
+ segment = real + (1j * imag)
+ data = np.fft.ifft(segment * self.window_size).real
- data = np.fft.ifft(segment * self.window_size).real
+ return data.astype(np.int16)
- return data.astype(np.int16)
+ except KeyboardInterrupt:
+ return None
+#!./.venv/bin/python
+
import cv2 as cv
import numpy as np
from scipy.io import wavfile
import scipy.signal as sps
-import matplotlib.pyplot as plt
from multiprocessing import Pool
from camera import camera
-from struct import pack
from fft import fft
import time
import pyaudio
import os
import sys
import wave
-import matplotlib.pyplot as plt
-import gi
-from gi.repository import Gtk
+from tqdm import tqdm
-"""
-notes:
-- window size
- the time to generate the spectrum is logaritmically related to the window size
- bigger windows are exponentially better so you should prefer this if possible
- obviously the biggest you can use is the size of your display unless you have
- some way of arranging the pixles independant of the orrigional spectrogram
-"""
+def normalize(array: np.ndarray):
+ array = array.astype(np.float64)
+ array -= np.min(array)
+ array *= 255 / np.max(array)
-sample_rate, data = wavfile.read("/home/will/Downloads/birdsong.wav")
-#data = data[...,0]
+ return array.astype(np.uint8)
-new_rate = 10_000.
-sample_count = round(len(data) * new_rate / sample_rate)
-data = sps.resample(data, sample_count)
-sample_rate = int(new_rate)
+def get_audio(
+ uri: str,
+ window_size: int,
+ window_height: int,
+ new_rate: int = 11_050
+) -> np.ndarray:
-window_size = 170
-window_height = 80
+ sample_rate, data = wavfile.read(uri)
-hop_size = window_size // 2
-camera = camera(
- window_size,
- window_height,
- (1920, 1080),
- device_id = 2,
- debug = True,
- dummy = False
-)
-
-file = wave.open("out.wav", "wb")
-file.setparams((
- 1, # channels
- 2, # sample width
- sample_rate,
- 0,
- "NONE", # compression type
- "NONE" # compression name
-))
-
-camera.calibrate()
-
-transform = fft(window_size, hop_size)
-
-segment_samples = window_height * hop_size
-overflow_samples = segment_samples - (len(data) % segment_samples) + window_size
-data = np.concatenate((data, data[0:overflow_samples]))
-
-segment_count = round(len(data) / segment_samples)
-segment_index = 0
-audio = np.zeros((hop_size,), dtype=np.int16)
+ # ensure 1 channel
+ if len(data.shape) > 1:
+ data = data[...,0]
-def callback(in_data, frame_count, time_info, status):
+ # resample
+ sample_count = round(len(data) * float(new_rate) / sample_rate)
+ data = sps.resample(data, sample_count)
+
+ # make divisible into screens
+ segment_samples = window_height * (window_height // 2)
+ overflow_samples = segment_samples - (len(data) % segment_samples) + window_size
+ data = np.concatenate((data, data[0:overflow_samples]))
+
+ return data
+def callback(in_data, frame_count, time_info, status):
global audio
+ global caching
+ # handle not enough frames being available
data = audio[:frame_count]
if len(data) < frame_count:
data = np.pad(data, [(0, frame_count - len(data))], mode='constant')
audio = np.zeros((hop_size,), dtype=np.int16)
+ if not caching: print("Dropped frames!")
+
else:
audio = np.delete(audio, np.s_[:frame_count])
+ #handle buffer minimum
+ if len(audio) < hop_size:
+ audio = np.zeros((hop_size,), dtype=np.int16)
+ print("buffer minimum exceeded ")
+
return (data, pyaudio.paContinue)
-pyaudio_object = pyaudio.PyAudio()
-stream = pyaudio_object.open(
- format = pyaudio.paInt16,
- channels = 1,
- rate = sample_rate,
- frames_per_buffer = 1024,
- output = True,
- stream_callback = callback
-)
-
-try:
- while stream.is_active():
+def process_loop(
+ data: np.ndarray,
+ transform,
+ camera,
+ window_size: int,
+ window_height: int,
+ loop: bool = True,
+ correction: bool = False,
+ correction_array: np.ndarray = None
+):
+ global audio
+ global caching
+
+ hop_size = window_size // 2
+ segment_samples = window_height * hop_size
+
+ segment_count = round(len(data) / segment_samples)
+
+ error_array = np.zeros((5, window_size))
+ error_spectrum = np.zeros((window_height, window_size, 3))
+
+ print("caching data...")
+ caching = True
+
+ for segment_index in tqdm(range(segment_count)):
+ # get the spectrum of the current sample
segment_start = segment_index * segment_samples
- rows = [data[segment_start + i:segment_start + i + window_size] for i in range(0, segment_samples, hop_size)]
+ segment_rows = [data[segment_start + i:segment_start + i + window_size] for i in range(0, segment_samples, hop_size)]
with Pool() as p:
- mapping = p.map(transform.stft, rows)
+ mapping = p.map(transform.stft, segment_rows)
spectrum = np.array(mapping)[:,0,...]
+ if correction_array is not None:
+ spectrum -= correction_array
+ spectrum = np.clip(spectrum, 0, 255)
+
+ spectrum = spectrum.astype(np.uint8)
- if segment_index == 10: cv.imwrite("sample.jpg", spectrum)
+ # save a sample image to disk
+ if segment_index == 10:
+ cv.imwrite("spectrum_sample.jpg", spectrum)
+ np.save(f"cache/frame{segment_index}.npy", spectrum)
+
+ segment_index = 0
+ print("cached!")
+ caching = False
+
+ while segment_index < segment_count:
+
+ spectrum = np.load(f"cache/frame{segment_index}.npy")
+
+ # display and capture
camera.display(spectrum)
capture = camera.capture()
- # plt.clf()
- # plt.plot(rows[0])
-
- rows = [np.array([i]) for i in capture]
+ # get the recovered sample data
+ recovered_rows = [np.array([i]) for i in capture]
with Pool() as p:
- recovered = np.array(p.map(transform.istft, rows))
-
- if len(audio) < hop_size:
- audio = np.zeros((hop_size,), dtype=np.int16)
+ recovered = np.array(p.map(transform.istft, recovered_rows))
- # plt.plot(recovered[0])
- # plt.pause(0.05)
+ if correction:
+ error_array[1:] = error_array[0:-1]
+ error_array[0] = np.mean(np.array(segment_rows) - recovered, axis=0)
+ # write the recovered data to the file and soundcard
for row in recovered:
audio[-hop_size:] += row[:hop_size]
audio = np.append(audio, row[hop_size:])
- file.writeframes(row[hop_size:])
-
- segment_index += 1
- if segment_index == segment_count: segment_index = 0
+ # file.writeframes(row[hop_size:])
- slept = False
+ # sleep condition
time.sleep(0.1)
while len(audio) > 1 * segment_samples:
- slept = True
cv.waitKey(1)
- if not slept:
- print("Dropped frames!")
+ # move to next segment
+ segment_index += 1
+ if loop and segment_index == segment_count:
+ segment_index = 0
+
+ error_spectrum = transform.stft(np.mean(error_array, axis=0))
+ error_spectrum = np.tile(error_spectrum, (window_height, 1, 1))
+ error_spectrum[...,2] *= 0 # correction of phase seems to cause stability issues
+
+ return error_spectrum
+
+"""
+notes:
+- Sample rate
+ Due to the Nyquist limit, the maximum frequency that can be recovered is half this
+ number. A lower rate can be dramatically better from a speed standpoint.
+- Window size
+ The time to generate the spectrum is logarithmically related to the window size,
+ so bigger windows are exponentially better and should be preferred where possible.
+ The biggest window you can use is the size of your display, unless you have
+ some way of arranging the pixels independently of the original spectrogram.
+- Window height
+ This is the height of the image, or, the number of FFTs performed in one "batch".
+ Each batch is run in parallel across worker processes, so it has some effect on process speed.
+"""
+
+# define parameters
+sample_rate = 22_050
+window_size = 150
+window_height = 80
-except KeyboardInterrupt:
- stream.stop_stream()
- stream.close()
- pyaudio_object.terminate()
- file.close()
+caching = False
+
+if __name__ == "__main__":
+
+ # get audio data
+ data = get_audio("/home/will/Downloads/Adducci - Around the Horn.wav", window_size, window_height, sample_rate)
+
+ # setup fft
+ transform = fft(window_size)
+
+ # setup and calibrate camera
+ camera = camera(
+ window_size,
+ window_height,
+ (1920, 1080),
+ device_id = 2,
+ debug = False,
+ dummy = True
+ )
+ camera.calibrate()
+
+ # setup output file
+ file = wave.open("out.wav", "wb")
+ file.setparams((
+ 1, # channels
+ 2, # sample width
+ sample_rate,
+ 0,
+ "NONE", # compression type
+ "NONE" # compression name
+ ))
+
+ # setup stream output
+ audio = np.zeros((window_size // 2,), dtype=np.int16)
+ pyaudio_object = pyaudio.PyAudio()
+ stream = pyaudio_object.open(
+ format = pyaudio.paInt16,
+ channels = 1,
+ rate = sample_rate,
+ frames_per_buffer = 1024,
+ output = True,
+ stream_callback = callback
+ )
try:
- sys.exit()
- except SystemExit:
- os._exit(130)
+ print("performing error correction...")
+ silence = np.full(((10 * window_size * window_height) + window_size,), fill_value=1, dtype=np.int16)
+ correction_array = process_loop(silence, transform, camera, window_size, window_height, loop = False, correction = True)
+
+ cv.imwrite("error.jpg", correction_array)
+ cv.imwrite("error_norm.jpg", normalize(correction_array))
+ print("error correction complete!")
+
+ process_loop(data, transform, camera, window_size, window_height, loop = True, correction = False, correction_array = correction_array)
+
+ # elegantly handle interrupt (intended, but not yet successful)
+ except KeyboardInterrupt:
+ stream.stop_stream()
+ stream.close()
+ pyaudio_object.terminate()
+ file.close()
+
+ try:
+ sys.exit()
+ except SystemExit:
+ os._exit(130)