OzVa Git service - audio-over-stft/commitdiff
done (:
author will <greenwoodw50@gmail.com>
Thu, 20 Jun 2024 14:03:52 +0000 (15:03 +0100)
committer will <greenwoodw50@gmail.com>
Thu, 20 Jun 2024 14:03:52 +0000 (15:03 +0100)
AOSTFT.py [deleted file]
calibration/calibration.jpg [new file with mode: 0644]
calibration/calibration.png [deleted file]
calibration/calibration1.png [deleted file]
data/audio1.wav [deleted file]
data/audio2.wav [deleted file]
data/data.wav [new file with mode: 0644]
main.py [new file with mode: 0644]
out.gif [new file with mode: 0644]
out.wav [changed mode: 0755->0644]
test.py [new file with mode: 0644]

diff --git a/AOSTFT.py b/AOSTFT.py
deleted file mode 100755 (executable)
index a954048..0000000
--- a/AOSTFT.py
+++ /dev/null
@@ -1,141 +0,0 @@
-import wave\r
-import random\r
-import time\r
-import cv2 as cv\r
-import numpy as np\r
-from scipy.signal import ShortTimeFFT\r
-from scipy.signal.windows import cosine\r
-from scipy.io import wavfile\r
-import os\r
-\r
-\r
-import matplotlib.pyplot as plt\r
-\r
-def calibrate(windowsize):\r
-    print("Attempting calibration")\r
-    calibrated2 = False\r
-    while not calibrated2:\r
-\r
-        calibrationimage = cv.imread("calibration.png")\r
-        cv.imshow("display", calibrationimage)\r
-        cv.waitKey(1)\r
-        \r
-        cameraimage = cv.imread("test.jpg") #replace with taking a picture from the camera\r
-\r
-        #detect SIFT keypoints\r
-        sift = cv.SIFT_create()\r
-        kp1, des1 = sift.detectAndCompute(calibrationimage,None)\r
-        kp2, des2 = sift.detectAndCompute(cameraimage,None)\r
-\r
-        # configure FLANN-based descriptor matching\r
-        FLANN_INDEX_KDTREE = 1\r
-        index_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5)\r
-        search_params = dict(checks = 50)\r
-        flann = cv.FlannBasedMatcher(index_params, search_params)\r
-        matches = flann.knnMatch(des1,des2,k=2)\r
-        #get good matches via ratio test\r
-        good = []\r
-        for m,n in matches:\r
-            if m.distance < 0.7*n.distance:\r
-                good.append(m)\r
-        \r
-        # if there are enough matches\r
-        if len(good)>10:\r
-            src_pts = np.float32([ kp1[m.queryIdx].pt for m in good ]).reshape(-1,1,2)\r
-            dst_pts = np.float32([ kp2[m.trainIdx].pt for m in good ]).reshape(-1,1,2)\r
-            M, mask = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0)\r
-            \r
-            img3 = cv.warpPerspective(cameraimage, M, windowsize)\r
-\r
-            calibrated2 = True\r
-            print("Calibration sucessfull")\r
-\r
-            cv.imshow("display", img3)\r
-            cv.waitKey(1)\r
-        else:\r
-            print("calibration unsucessfull - retrying...")\r
-\r
-        return M\r
-\r
-def getSFT():\r
-    width = 719\r
-    w = cosine(width, sym=True)\r
-    SFT = ShortTimeFFT(w, hop=1, fs=16_000, scale_to='magnitude') # hop was originally width * 2, but that raised errors\r
-\r
-    return SFT\r
-\r
-def getSTFT(data, start, stop):\r
-\r
-    diff = (stop-start)-len(data[start:stop])\r
-    if diff > 0:\r
-        data = np.pad(data, (0, diff), 'constant')\r
-\r
-    SFT = getSFT()\r
-    Sx = SFT.stft(data, p0=start, p1=stop)\r
-\r
-    Sx = 20*np.log10(Sx)\r
-    real, imag = Sx.real, Sx.imag\r
-\r
-    img = np.stack((real, imag, [real, imag][random.randint(0,1)]), axis=-1)\r
-    img = np.array(img+128, dtype=np.uint8)\r
-\r
-    return img\r
-\r
-def getISIFT(img, predata, step):\r
-\r
-    img = np.array(img, dtype=np.float64)-128\r
-\r
-    real, imag = img[...,0], img[...,1]\r
-\r
-    Sx = np.vectorize(complex)(real, imag)\r
-    Sx = np.power(10, Sx/20) # note: this inverse dB scaling is likely incorrect\r
-\r
-    SFT = getSFT()\r
-    data = SFT.istft(Sx, k1=step)\r
-    data = np.array(data, dtype=np.int16)\r
-\r
-    return data\r
-\r
-def transmit(img, homo, windowsize):\r
-\r
-    img = cv.resize(img, (1080, 720), cv.INTER_NEAREST)\r
-\r
-    cv.imshow("display", img)\r
-    cv.waitKey(1)\r
-\r
-    #cap = cv.warpPerspective(cameraimage, homo, windowsize)\r
-    \r
-    img = cv.resize(img, (1080, 360), cv.INTER_NEAREST)\r
-\r
-    return img\r
-\r
-if __name__ == "__main__":\r
-\r
-    windowsize = (1080, 720)\r
-\r
-    cv.namedWindow("display")\r
-    homo = None\r
-    #homo = calibrate()\r
-\r
-    sr, data = wavfile.read("audio2.wav")\r
-    data = np.array(data, dtype=np.int16)\r
-    SFT = getSFT()\r
-\r
-    step = 360\r
-    newdata = np.zeros((1,), dtype=np.int16)\r
-    try:\r
-        for i in range(0, len(data), step):\r
-            img = getSTFT(data, i, i+step)\r
-            img = transmit(img, homo, windowsize)\r
-            recovered = getISIFT(img, data[i:i+step], step)\r
-            newdata = np.concatenate((newdata, recovered), axis=0)\r
-            os.system('cls')\r
-            print(f"total difference: {np.sum(abs(recovered-data[i:i+step]))}")\r
-            print(f"origional data limits: {np.max(data), np.min(data)}")\r
-            print(f"current data limits: {np.max(newdata), np.min(newdata)}")\r
-            print(f"current data factor: {np.max(data)//np.max(newdata), np.min(data)//np.min(newdata)}")\r
-            print(f"{round(i/data.shape[0], 2)*100}% done")\r
-    except:\r
-        print("errored out!")\r
-\r
-    wavfile.write("out.wav", sr, newdata)
\ No newline at end of file
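
For reference, the deleted AOSTFT.py was built on scipy.signal.ShortTimeFFT. A minimal round-trip sketch with that API, assuming SciPy >= 1.12; the window and sample rate mirror the deleted code, while the hop of 360 (the old step value) is an assumption:

import numpy as np
from scipy.signal import ShortTimeFFT
from scipy.signal.windows import cosine

w = cosine(719, sym=True)                                  # same analysis window as the deleted code
SFT = ShortTimeFFT(w, hop=360, fs=16_000, scale_to='magnitude')

x = np.sin(2 * np.pi * 440 * np.arange(16_000) / 16_000)   # one second of a 440 Hz tone
Sx = SFT.stft(x)                                           # complex spectrogram (freq bins x time slices)
x_back = SFT.istft(Sx, k1=len(x))                          # istft undoes the magnitude scaling
print(np.allclose(x, x_back[:len(x)]))                     # True: the transform pair is invertible
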
diff --git a/calibration/calibration.jpg b/calibration/calibration.jpg
new file mode 100644 (file)
index 0000000..54d79ba
Binary files /dev/null and b/calibration/calibration.jpg differ
diff --git a/calibration/calibration.png b/calibration/calibration.png
deleted file mode 100755 (executable)
index 40576b8..0000000
Binary files a/calibration/calibration.png and /dev/null differ
diff --git a/calibration/calibration1.png b/calibration/calibration1.png
deleted file mode 100755 (executable)
index 83bb076..0000000
Binary files a/calibration/calibration1.png and /dev/null differ
diff --git a/data/audio1.wav b/data/audio1.wav
deleted file mode 100755 (executable)
index e53bc92..0000000
Binary files a/data/audio1.wav and /dev/null differ
diff --git a/data/audio2.wav b/data/audio2.wav
deleted file mode 100755 (executable)
index 70ac56b..0000000
Binary files a/data/audio2.wav and /dev/null differ
diff --git a/data/data.wav b/data/data.wav
new file mode 100644 (file)
index 0000000..c78f1b9
Binary files /dev/null and b/data/data.wav differ
diff --git a/main.py b/main.py
new file mode 100644 (file)
index 0000000..06df698
--- /dev/null
+++ b/main.py
@@ -0,0 +1,201 @@
+import os
+import time
+import cv2 as cv
+import screeninfo
+import numpy as np
+from PIL import ImageGrab
+from scipy.io import wavfile
+from skimage.exposure import match_histograms
+
+sample_rate, data = wavfile.read("./data/data.wav")
+camera = cv.VideoCapture(0)
+
+window_size = 12_000                           # the window size is the number of frequency bins
+hop_size = 6_000                                       # size of each jump of the window
+display_size = (900, 900)                      # should be greater than the segment size, otherwise you'll lose information
+segment_size = 160                                     # ((window_size * 2) / segment_size) * segment_parity should not exceed the display size
+segment_parity = 1                                     # number of parity copies in the display
+screen_id = 0
+
+dummy = True
+time_skip = False
+
+data = np.concatenate(
+       (data, np.zeros((
+               # add empty samples to bring the size up to a multiple of hop + the window width
+               window_size + (hop_size - (len(data) % hop_size))
+       )))
+)
+
+segment_count = round(len(data) / hop_size) - 1        # get the number of jumps required
+
+window = np.hanning(window_size)                               # Hann window: 50% overlap sums to a near-constant gain
+
+result_array = np.empty((segment_count, window_size), dtype=np.complex128)     # result array
+
+for i in range(segment_count):
+       segment_offset = hop_size * i
+       segment = data[segment_offset:segment_offset+window_size]       # current segment of data
+
+       window_segment = segment * window                                                       # multiply by the window
+       spectrum = np.fft.fft(window_segment) / window_size                     # take the Fourier Transform and scale by the number of samples
+
+       result_array[i, :] = spectrum[:window_size]                                     # append to the results array
+
+       os.system("clear")
+       print(f"1/2 {round((i / segment_count) * 100)}%")
+
+result_array = np.transpose(result_array)
+
+result_real = np.concatenate((                                                                 # get the positive and negative (top and bottom) real arrays
+       np.where(result_array.real > 0., result_array.real, 0.1),
+       np.where(result_array.real < 0., result_array.real * -1, 0.1)
+), axis=0)
+result_imag = np.concatenate((                                                                 # get the positive and negative (top and bottom) imaginary arrays
+       np.where(result_array.imag > 0., result_array.imag, 0.1),
+       np.where(result_array.imag < 0., result_array.imag * -1, 0.1)
+), axis=0)
+
+result = np.stack((result_real, result_imag, np.flip(result_imag, axis=(0,1))), axis=-1)
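+# channel layout (OpenCV is BGR): channel 0 is the real part stacked positive/negative,
+# channel 1 is the imaginary part stacked the same way, and channel 2 is a flipped
+# copy of channel 1 kept as a parity reference (undone again during recovery)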
+
+result = 20*np.log10(result)           # scale to dB
+result = np.clip(result, -40, 200)     # clip values
+
+image = (result + 40) * 1.275                          # put the data in range for an image
+
+image = np.array(np.rint(image), dtype=np.uint8)
+recovered = np.zeros((image.shape), dtype=np.uint8)
+
+cv.namedWindow("display")
+cv.namedWindow("debug1")
+cv.namedWindow("debug2")
+
+calibrated = False
+while not calibrated and not dummy:
+       calibration_image = cv.imread("calibration/calibration.jpg")
+       calibration_image = cv.resize(calibration_image, display_size, cv.INTER_NEAREST)
+       cv.imshow("display", calibration_image)
+       cv.waitKey(0)
+       _, capture = camera.read()
+
+       # detect SIFT keypoints
+       sift = cv.SIFT_create()
+       kp1, des1 = sift.detectAndCompute(calibration_image,None)
+       kp2, des2 = sift.detectAndCompute(capture,None)
+
+       # get good matches between calibration image and the captured image
+       FLANN_INDEX_KDTREE = 1
+       index_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5)
+       search_params = dict(checks = 50)
+       flann = cv.FlannBasedMatcher(index_params, search_params)
+       matches = flann.knnMatch(des1,des2,k=2)
+       #get good matches via ratio test
+       good = []
+       for m,n in matches:
+               if m.distance < 0.7*n.distance:
+                       good.append(m)
+
+       if len(good)>10:
+               src_pts = np.float32([ kp1[m.queryIdx].pt for m in good ]).reshape(-1,1,2)
+               dst_pts = np.float32([ kp2[m.trainIdx].pt for m in good ]).reshape(-1,1,2)
+               homography, mask = cv.findHomography(dst_pts, src_pts, cv.RANSAC, 5.0)
+
+               img3 = cv.warpPerspective(capture, homography, display_size)
+
+               calibrated = True
+               print("calibrated")
+
+               cv.imshow("display", img3)
+               cv.waitKey(1)
+       else:
+               print("retrying calibration")
+
+               calibrated = True
+
+frame_time = (window_size * 2) / (sample_rate / window_size)
+
+for i in range(image.shape[1]):
+       time_start = time.time()
+       segment = np.copy(image[:,i])                                                           # get the column of the image we are to work on
+
+       columns = round(len(segment) / segment_size)                            # get the number of columns to split the single column into
+       segment = np.reshape(segment, (segment_size, columns, 3))       # reshape the column into a 3d array
+
+       segment = np.concatenate((segment,)*segment_parity, axis=1)                             # tile segment_parity copies of the segment side by side
+
+       display = cv.resize(segment, display_size, cv.INTER_NEAREST)                    # resize the array for display
+
+       cv.imshow("display", display)           # show image to display
+       cv.waitKey(1)                                   # wait till capture
+
+       if dummy: capture = display             # pass the image to display straight to the capture
+       else:
+               good, capture = camera.read()   # read a frame from the camera
+               if not good:
+                       print("capture failed")
+                       print("diverting to dummy output")
+                       dummy = True
+                       capture = display
+
+               cv.imshow("debug1", capture)            # show image to display
+               capture = cv.warpPerspective(capture, homography, display_size)         # fix distortion in the captured image and crop
+               capture = match_histograms(capture, display, channel_axis=-1)
+               cv.imshow("debug2", capture)            # show image to display
+
+       capture = cv.resize(capture, (columns * segment_parity, segment_size), cv.INTER_NEAREST)                                # resize back to the segment size
+
+       recovered_segment = np.array_split(capture, segment_parity, axis=1)                                                                     # split to list of parity copies
+       recovered_segment = np.array([a for a in recovered_segment if a.shape[1] == columns])   # keep only the full-width parity copies
+       recovered_segment = np.mean(recovered_segment, axis=0)                                                                  # get the mean of the parity copies
+
+       recovered_segment = np.reshape(recovered_segment, (2*window_size, 3))                                   # reshape to the original column
+
+       recovered[:, i] = recovered_segment     # insert into recovered data
+
+       real_time = time.time() - time_start
+       wait_time = frame_time - real_time
+       if wait_time > 0 and time_skip: time.sleep(wait_time)
+       else:
+               os.system("clear")
+               print(f"running @ {round(real_time / frame_time, 2)}x realtime")
+
+recovered = np.where(np.isreal(recovered), recovered, 0.1)     # remove the NaNs introduced by transforming a blank spectrogram
+recovered = np.power(10, ((recovered / 1.275) - 40) / 20)      # unscale from dB
+
+recovered[...,1] = (recovered[...,1] + np.flip(recovered[...,2], axis=(1,0))) / 2                              # revert the parity copy flipped in the red channel
+
+recovered_real = np.array_split(recovered[...,0], 2)                                                                           # split into two arrays each of the positive and negative component (top and bottom)
+recovered_imag = np.array_split(recovered[...,1], 2)
+
+recovered_real = recovered_real[0] + (recovered_real[1] * -1)                                                          # revert the negative array and combine
+recovered_imag = recovered_imag[0] + (recovered_imag[1] * -1)
+recovered_array = np.transpose(recovered_real * -1 + 1j * recovered_imag)                                      # transpose and make complex again
+# because of the two transposes, the array may end up flipped horizontally overall;
+# this is not an issue for the audio signal
+
+recovered_signal = np.zeros(((recovered_array.shape[0] + 1) * hop_size))                                       # empty array for the recovered signal
+
+for i in range(recovered_array.shape[0]):
+       signal_offset = i * hop_size                                                                                                                    # get the sample offset of the range we insert
+       signal_segment = np.fft.ifft(recovered_array[i] * window_size, n=window_size).real              # get the istft of the data
+       recovered_signal[signal_offset:signal_offset + window_size] += signal_segment #/ 2              # add the data to the recovered signal
+
+       os.system("clear")
+       print(f"2/2 {round((i / recovered_array.shape[0]) * 100)}%")
+
+recovered_signal = np.where(np.isreal(recovered_signal), recovered_signal, 0) # remove the NaNs introduced by transforming a blank spectrogram
+# this is a quick fix for a bug introduced elsewhere
+
+recovered_signal = np.clip(            # constrain the data to the max and min for the datatype
+       recovered_signal,
+       np.iinfo(np.int16).min,         # int16 limits looked up dynamically rather than hard-coded
+       np.iinfo(np.int16).max
+)
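+# note: the clip happens before the normalisation below, so the rescaled
+# signal can leave the int16 range again and wrap when converted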
+
+recovered_signal *= np.average(data) / np.average(recovered_signal)    # normalize via the average of both of the audio signals
+
+recovered_signal = np.array(recovered_signal, dtype=np.int16)          # convert the data to the required data type
+
+wavfile.write("out.wav", sample_rate, recovered_signal)
+
+print("complete")
diff --git a/out.gif b/out.gif
new file mode 100644 (file)
index 0000000..b45b639
Binary files /dev/null and b/out.gif differ
diff --git a/out.wav b/out.wav
old mode 100755 (executable)
new mode 100644 (file)
index 60f58a3..d03eef7
Binary files a/out.wav and b/out.wav differ
diff --git a/test.py b/test.py
new file mode 100644 (file)
index 0000000..bce9ae7
--- /dev/null
+++ b/test.py
@@ -0,0 +1,5 @@
+import cv2 as cv
+
+image = cv.imread("calibration/calibration.jpg")
+
+cv.imwrite("calibration/calibration.jpg", image)
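
Both the deleted AOSTFT.py and the new main.py share the same calibration idea: match SIFT features between the displayed calibration image and the camera capture, then estimate a homography with RANSAC so the capture can be unwarped back into display space. A condensed sketch of that step; the capture filename and output size are assumptions, while the thresholds mirror the code above:

import cv2 as cv
import numpy as np

reference = cv.imread("calibration/calibration.jpg")       # the image shown on the display
captured = cv.imread("capture.jpg")                        # hypothetical saved camera frame

sift = cv.SIFT_create()
kp1, des1 = sift.detectAndCompute(reference, None)
kp2, des2 = sift.detectAndCompute(captured, None)

# FLANN k-nearest-neighbour matching with Lowe's ratio test
flann = cv.FlannBasedMatcher(dict(algorithm=1, trees=5), dict(checks=50))
good = [m for m, n in flann.knnMatch(des1, des2, k=2) if m.distance < 0.7 * n.distance]

if len(good) > 10:
    src = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)
    dst = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)
    homography, _ = cv.findHomography(dst, src, cv.RANSAC, 5.0)
    aligned = cv.warpPerspective(captured, homography, (900, 900))   # undo the projection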