blog-acoustic-fingerprinting
ARCHIVED - acoustic fingerprinting television shows with python
git clone https://git.vogt.world/blog-acoustic-fingerprinting.git
Log | Files | README.md
← All files
name: src/python/fingerprinter/fingerprint.py
-rw-r--r--
3135
 1import numpy
 2import matplotlib.mlab as mlab
 3import matplotlib.pyplot as plt
 4from scipy.ndimage.filters import maximum_filter
 5from scipy.ndimage.morphology import (generate_binary_structure, iterate_structure, binary_erosion)
 6from mmh3 import hash
 7from operator import itemgetter
 8
 9
# Tuning constants for peak pairing. Time values are spectrogram column
# (time-bin) indices, not seconds.

# Smallest time gap allowed between the two peaks of a hashed pair.
MINIMUM_TIME_BETWEEN_PEAKS = 0
# Largest time gap allowed between the two peaks of a hashed pair.
MAX_TIME_BETWEEN_PEAKS = 200
# Number of dilation iterations passed to iterate_structure when building the
# neighborhood used for local-maximum (peak) detection.
PEAK_NEIGHBOR_COUNT = 20
# Each peak is paired with up to HASH_PEAK_FAN_OUT - 1 of the peaks that
# follow it once peaks are sorted by time.
HASH_PEAK_FAN_OUT = 4

# Samples per FFT segment (the NFFT argument of mlab.specgram).
FFT_WINDOW_SIZE = 4096
# Fraction of FFT_WINDOW_SIZE shared by consecutive segments (noverlap).
OVERLAP_RATIO = 0.5
# Peaks whose value after the 10 * log10 conversion is not greater than this
# are discarded.
MINIMUM_AMPLITUDE = 20
# Default for whether fingerprint() displays the spectrogram with its peaks.
PLOT_FINGERPRINT = False
19
20
class Fingerprinter(object):
  """Turns one channel of audio samples into (time, hash) fingerprints.

  A spectrogram is computed, its local maxima ("peaks") are located, and
  peaks that are close in time are paired and hashed.

  Args:
    plot_fingerprint: when true, fingerprint() shows the spectrogram with the
      retained peaks drawn on top (blocks on plt.show()).
    min_between_peaks: smallest time-bin gap allowed between paired peaks.
    max_between_peaks: largest time-bin gap allowed between paired peaks.
    peak_neighbor_count: dilation iterations used to build the neighborhood
      for local-maximum detection.
    hash_peak_fan_out: each peak is paired with up to hash_peak_fan_out - 1
      of the peaks that follow it in time.
  """

  def __init__(self, plot_fingerprint=PLOT_FINGERPRINT,
      min_between_peaks=MINIMUM_TIME_BETWEEN_PEAKS,
      max_between_peaks=MAX_TIME_BETWEEN_PEAKS,
      peak_neighbor_count=PEAK_NEIGHBOR_COUNT,
      hash_peak_fan_out=HASH_PEAK_FAN_OUT):
    self.plot_fingerprint = plot_fingerprint
    self.max_between_peaks = max_between_peaks
    self.min_between_peaks = min_between_peaks
    self.peak_neighbor_count = peak_neighbor_count
    self.hash_peak_fan_out = hash_peak_fan_out

  def fingerprint(self, channel_samples, sample_rate):
    """Fingerprint a single audio channel.

    Args:
      channel_samples: sequence of samples for one channel, handed to
        mlab.specgram unchanged.
      sample_rate: sampling frequency, passed to mlab.specgram as Fs.

    Returns:
      A generator of (t1, hash) tuples as produced by generate_hashes(),
      configured with this instance's pairing settings. t1 is a spectrogram
      time-bin index (a plain Python int).

    Side effects:
      Prints the number of retained peaks; optionally shows a plot.
    """
    # Element [0] of specgram's result is the 2-D spectrum
    # (rows: frequency bins, columns: time segments).
    frequency_bins = mlab.specgram(channel_samples,
         NFFT=FFT_WINDOW_SIZE,
         Fs=sample_rate,
         window=mlab.window_hanning,
         noverlap=int(FFT_WINDOW_SIZE * OVERLAP_RATIO))[0]
    # Convert power to decibels. log10(0) yields -inf, which is replaced by 0
    # right below, so the divide-by-zero warning is silenced on purpose.
    with numpy.errstate(divide='ignore'):
      frequency_bins = 10 * numpy.log10(frequency_bins)
    frequency_bins[frequency_bins == -numpy.inf] = 0

    # A point is a local maximum when it equals the maximum of its
    # neighborhood; the neighborhood grows with peak_neighbor_count.
    binary_structure = generate_binary_structure(2, 1)
    neighborhood = iterate_structure(binary_structure, self.peak_neighbor_count)
    local_max = maximum_filter(frequency_bins, footprint=neighborhood) == frequency_bins
    # Flat zero-valued regions also pass the local-max test; XOR with the
    # eroded background mask removes them.
    background = (frequency_bins == 0)
    eroded_background = binary_erosion(background, structure=neighborhood, border_value=1)
    detected_peaks = local_max ^ eroded_background

    # numpy.where and boolean-mask indexing both walk the array in row-major
    # order, so coordinates and amplitudes line up element by element.
    freq_idx, time_idx = numpy.where(detected_peaks)
    amplitudes = frequency_bins[detected_peaks]
    loud_enough = amplitudes > MINIMUM_AMPLITUDE
    frequency_component = freq_idx[loud_enough].tolist()
    time_component = time_idx[loud_enough].tolist()

    if self.plot_fingerprint:
      fig, ax = plt.subplots()
      ax.imshow(frequency_bins)
      ax.scatter(time_component, frequency_component)
      ax.set_xlabel('Time')
      ax.set_ylabel('Frequency')
      ax.set_title("Spectrogram")
      ax.invert_yaxis()
      plt.show()

    # Materialize the pairs: on Python 3 zip() is a one-shot iterator that
    # supports neither len() nor sorting.
    peaks = list(zip(frequency_component, time_component))
    print("peak count: {}".format(len(peaks)))
    return self.generate_hashes(peaks,
        fan_out=self.hash_peak_fan_out,
        min_between_peaks=self.min_between_peaks,
        max_between_peaks=self.max_between_peaks)

  @staticmethod
  def generate_hashes(peaks, fan_out=HASH_PEAK_FAN_OUT,
      min_between_peaks=MINIMUM_TIME_BETWEEN_PEAKS,
      max_between_peaks=MAX_TIME_BETWEEN_PEAKS):
    """Pair up peaks that are close in time and hash each pair.

    Args:
      peaks: iterable of sequences whose item 0 is a frequency and item 1 is
        a time. The input is not modified.
      fan_out: each peak is paired with up to fan_out - 1 following peaks.
      min_between_peaks: smallest accepted t2 - t1 (inclusive).
      max_between_peaks: largest accepted t2 - t1 (inclusive).

    Yields:
      (t1, h) where t1 is the time of the earlier peak and h is the hash of
      the string "freq1:freq2:t_delta".
    """
    # sorted() is stable like list.sort(), accepts any iterable, and leaves
    # the caller's data untouched.
    ordered = sorted(peaks, key=itemgetter(1))
    # A fan_out below 1 must yield no pairs rather than a wrapped-around slice.
    reach = max(fan_out, 1)
    for index, anchor in enumerate(ordered):
      freq1, t1 = anchor[0], anchor[1]
      for target in ordered[index + 1:index + reach]:
        freq2, t2 = target[0], target[1]
        t_delta = t2 - t1
        if min_between_peaks <= t_delta <= max_between_peaks:
          # `hash` is the mmh3 hash imported at file level, not the builtin.
          # The explicit str() calls keep the hashed text identical to what
          # earlier versions produced.
          h = hash("{}:{}:{}".format(str(freq1), str(freq2), str(t_delta)))
          yield (t1, h)