だいぶ一般的になっているBLEについて。すごく概念的なことはあちこちに書いてある。が、下から上まで(むしろ下だけ)知りたいのに、資料少なすぎ。なので自力で調べる無茶な挑戦。第5回=CH37に飛んでるアドバタイズをじゃんじゃん見てみる
Adalm-plutoを操作するPythonコードを変更してたっくさんのデータをとれるように
サンプリングレートを8MS/sにして、取得データ長を10Mptsにする。
-
import numpy as np
-
import adi
-
import matplotlib.pyplot as plt
-
sample_rate = 8e6 # Hz
-
center_freq = 2402e6 # Hz
-
num_samps = 10000000 # number of samples per call to rx()
-
sdr = adi.Pluto("ip:192.168.2.1")
-
sdr.sample_rate = int(sample_rate)
-
# Config Rx
-
sdr.rx_lo = int(center_freq)
-
#sdr.rx_rf_bandwidth = int(sample_rate)
-
sdr.rx_rf_bandwidth = int(2e6)
-
sdr.rx_buffer_size = num_samps
-
sdr.gain_control_mode_chan0 = 'manual'
-
sdr.rx_hardwaregain_chan0 = 6.0 # dB, increase to increase the receive gain, but be careful not to saturate the ADC
-
# Clear buffer just to be safe
-
for i in range (0, 10):
-
raw_data = sdr.rx()
-
# Receive samples
-
rx_samples = sdr.rx()
-
print(rx_samples)
-
t=np.linspace(0,1/sample_rate*num_samps,num_samps)
-
np.save('o_t',t)
-
np.save('o_rx_t',rx_samples)
-
# Calculate power spectral density (frequency domain version of signal)
-
psd = np.abs(np.fft.fftshift(np.fft.fft(rx_samples)))**2
-
psd_dB = 10*np.log10(psd)
-
f = np.linspace(sample_rate/-2, sample_rate/2, len(psd))
-
# Plot time domain
-
plt.figure(0)
-
plt.plot(t,np.real(rx_samples))
-
plt.plot(t,np.imag(rx_samples))
-
plt.xlabel("Time")
-
# Plot freq domain
-
plt.figure(1)
-
plt.plot(f/1e6, psd_dB)
-
plt.xlabel("Frequency [MHz]")
-
plt.ylabel("PSD")
-
plt.xlim(-2,2)
-
plt.show()
で、こうなる。
nRF52840モジュールをAdalm-plutoにピタピタに置いているんで一番振幅が大きいのがきっとそれで、それ以外にも色々いるよね。大きいのも小さいのもできるだけ見ていくことにする。
で、全部復調復号してみると、エラーが出まくるので、データがありそうなところだけ復調復号していくことにする。で、最初は人力で範囲を指定して、、、って思ったけど、まぁまぁメンドクサイ。振幅とパタンで自動判定しよう!いやーこの辺の判断が難しいよねー。自動化に要する時間と人力で要する時間と一体どっちが早いのか。ある程度人力でやってみないと自動化の必要性がわからないってのは間違いない。今回はもう何回かやってたので、こりゃ人力じゃしんどいってわかったので、自動化や!-
import numpy as np
-
import matplotlib.pyplot as plt
-
from scipy import signal
-
-
def search_index_by_pattern(data,pattern):
-
ds=data.size
-
ps=pattern.size
-
cyc=ds-ps+1
-
ret=-1
-
if cyc>0:
-
for i in range(cyc):
-
data_part=data[i:i+ps]
-
if np.all(data_part==pattern):
-
ret=i
-
break
-
return ret
-
-
f_sym=1e6 #
-
rx_all=np.load('o_rx_t.npy')
-
t_all=np.load('o_t.npy')
-
t_all=t_all-t_all[0]
-
t_all_0=t_all[:-1:]
-
t_all_1=t_all[1::]
-
t_samp=np.mean(t_all_1-t_all_0)
-
f_samp=1/t_samp
-
print(f_samp/1e6) # should be 8MHz
-
-
# フィルター係数生成
-
freq_cutoff=f_sym*2
-
w_cutoff=freq_cutoff/(f_samp/2)
-
b,a=signal.butter(1,w_cutoff,'lowpass')
-
-
# フィルター適用
-
rx_all=signal.lfilter(b,a,rx_all)
-
-
rx_all_amp=np.abs(rx_all)
-
rx_all_amp_dig=np.where(rx_all_amp>5,1,0)
-
rx_all_amp_dig_0=rx_all_amp_dig[:-1:]
-
rx_all_amp_dig_1=rx_all_amp_dig[1::]
-
rx_all_amp_dig_10=np.append(rx_all_amp_dig_1-rx_all_amp_dig_0,0)
-
rx_all_amp_dig_rising_edge=np.where(rx_all_amp_dig_10>0)[0]
-
rx_all_amp_dig_falling_edge=np.where(rx_all_amp_dig_10<0)[0]
-
print(rx_all_amp_dig_rising_edge.size)
-
print(rx_all_amp_dig_falling_edge.size)
-
-
pa_and_aa=np.array([0,1,0,1,0,1,0,1,0,1,1,0,1,0,1,1,0,1,1,1,1,1,0,1,1,0,0,1,0,0,0,1,0,1,1,1,0,0,0,1])
-
-
for i in range(rx_all_amp_dig_rising_edge.size):
-
if rx_all_amp_dig_falling_edge[i]-rx_all_amp_dig_rising_edge[i]>40:
-
rx=rx_all[rx_all_amp_dig_rising_edge[i]:rx_all_amp_dig_falling_edge[i]]
-
t=t_all[rx_all_amp_dig_rising_edge[i]:rx_all_amp_dig_falling_edge[i]]
-
# demod
-
rx_t0=rx[:-1:]
-
rx_t1=rx[1::]
-
rx_dem=-1*np.angle(rx_t1/rx_t0)
-
rx_dem=np.append(0,rx_dem)
-
# normalization
-
rx_dem=rx_dem/(f_sym/f_samp*np.pi*1)
-
# digitization
-
rx_dig=np.where(rx_dem>0,0,1)
-
# counter top value
-
counter_top=8-1
-
counter_half=int(counter_top/2)
-
cnt_list=[]
-
cnt_list.append(int(0))
-
for i in range(rx_dig.size-1):
-
dig_pres=rx_dig[i+1]
-
dig_prev=rx_dig[i]
-
if (dig_pres!=dig_prev):
-
cnt_list.append(int(0))
-
else:
-
if (cnt_list[i]<counter_top):
-
cnt_list.append(cnt_list[i]+int(1))
-
else:
-
cnt_list.append(int(0))
-
cnt=np.array(cnt_list)
-
smp_pos=np.where(cnt==counter_half)[0]
-
rd_dig_smp=rx_dig[smp_pos]
-
pp_and_aa_pos=search_index_by_pattern(rd_dig_smp,pa_and_aa)
-
if(pp_and_aa_pos>0):
-
print("t_start=",end="")
-
print(t[0])
-
print("t_end=",end="")
-
print(t[-1])
-
print(pp_and_aa_pos)
-
rd_dig_smp=rd_dig_smp[pp_and_aa_pos:]
-
pos=0
-
while pos<rd_dig_smp.size:
-
for n in range(8):
-
print(rd_dig_smp[pos],end='')
-
pos=pos+1
-
if pos==rd_dig_smp.size:
-
break
-
print("")
-
振幅が5以上の範囲で、復調復号して、プリアンブルとAccess Addressを既知のパタンとして検索して存在すれば結果を表示する。ってしている。ndarrayからパタンマッチするインデックスを探すってのが機能としてすでにありそうなのに見つからなかったので自力で作成している。まぁ絶妙にローテク。それと、ndarrayを直接printすると、"["がついてたり、勝手に改行したり後で面倒なので、これまたローテクで、ついでに8シンボルごとに改行してprintしている。ちなみに今度は行数が多すぎて、VSCodeのコンソールの行バッファが足りなくなるので、そこは都度、よしなに。ちなみに個人的にはリダイレクトでファイルに出力するって手段を好んで使っている。
で、こんな感じで出力される。
で、数えると24フレームとれたようだ。実際の受信波形を見ると、振幅大と振幅中を数えると24個ある。ちなみに振幅小部分は拡大すると、
って感じで振幅が動いているので、きっと隣のチャンネルだね。で、隣はアドバタイズじゃないからAccess Addressが見つからなかったってことでしょ(そもそもFSK復調できたのかどうかもわからんが)。
で、解析してみると、こんなんで、
結局、
6024DE30CD58ABDF0201061AFF590002150112233445566778899AABBCCDDEEFF0DEADBEEFCA54545E
ってのと、
4225E6BDBB8333081EFF060001092022A0DA83015D56DBE5C18DB0E031F8F60DC69DA1F4E008C09DAD78
ってのが繰り返し来てるってのがわかった。
で、1つ目はSeeed XIAO BLE nRF52840のbeaconで2つ目は何かわからん(いや、いつかわかるようになりたい)。ところで、これをどう解釈するのかってので、Android用にnRF Connectっていうツールがある(アイホン用ももちろんあるだろうけど林檎法人は商品戦略が好きじゃないので敢えて知らんふうを装っている)ので利用してみると、
RAWを押すと、
Header : 60
PDU Length : 24 (36bytes)
AdvA : DE30CD58ABDF
AdvData :
02 01 06
1A FF 590002150112233445566778899AABBCCDDEEFF0DEADBEEFCA
CRC : 54545E
で、AdvDataについて、Assigned Numbers(https://www.bluetooth.com/ja-jp/specifications/assigned-numbers/)を見ると、
ということらしい。で、電波からデータを取り出すってのは一部できたんだけど、Complete local Nameって飛んできてないんだけど、どうなってるん?って思ってる。
コメントをお書きください