废物再利用计划:使用 RTL-SDR 来收听 FM 广播
某天 @moycat 翻出来一个 RTL-SDR 的接收器(R820T2),天线已经烂掉了,但是粘一下还能用(
用 SDRSharp 调试了一晚上可以收到调频广播,但是比较可惜的是没有收到机场(估计是太远了)和附近出租车司机的对讲。眼看接收器没啥用了,不如用来做点东西
当然是用来搭个 webserver 坠吼啦,说干咱就开始找有没有现成的库。nodejs 有两个 sdr 的库,一个叫 node-rtlsdr,包了一层 librtlsdr;另一个叫 rtl-sdr,也是包了一层 librtlsdr,但是很可惜的是这两个库都很背时,完全跑不起来(其实还有一个极为申必的用 node-usb / WebUSB 做的库,但发现的时候已经太晚了 XD)
要说还是 python 的轮子最全最圆了,我找到一个叫 pyrtlsdr 的库,里面竟然用上了极为先进的协程,可以 async 处理数据流,就是它了
让我们先把 example 粘到 VSCode 里面,好耶!跑起来了,我们可以成功打印出来 sdr 的数据了
import asyncio
from rtlsdr import RtlSdr
async def streaming():
sdr = RtlSdr()
async for samples in sdr.stream():
# do something with samples
print(samples)
# to stop streaming:
await sdr.stop()
# done
sdr.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(streaming())
import asyncio
from rtlsdr import RtlSdr
async def streaming():
sdr = RtlSdr()
async for samples in sdr.stream():
# do something with samples
print(samples)
# to stop streaming:
await sdr.stop()
# done
sdr.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(streaming())
但是有一点点问题,这里的 samples 都是很奇怪的数据,似乎不是某种音频信号。经过一番搜索发现这里的采样数据都是调制后的信号,需要一番变换才能得到原始音频(PCM 数据)
关于 IQ Sampling 可以参考这些文章,当然以博主贫瘠的数学知识暂时还无法参透为什么要这样
万能的 StackOverflow 上面有一份实现,我们只要稍加改造一下就可以给后端使用
Fs = sdr.sample_rate
bwFM = 200000 # approx. 170 kHz for a single channel
decRate = int(Fs/bwFM)
newFs = Fs/decRate
# Find a decimation rate to achieve audio sampling rate between 44-48 kHz
audioFreq = 44100.0
dec_audio = int(newFs/audioFreq)
amp = 10000 / 32768
def demod(samples):
# Convert sampled data into numpy array
x1 = np.array(samples).astype("complex64")
# Downmixed Baseband Signal (Adjust offset to be centered)
offsetFreq = 0 # already centered
fc1 = np.exp(-1.0j * 2.0 * np.pi * offsetFreq/Fs * np.arange(len(x1)))
x2 = x1 * fc1
# Filter and downsample the FM Radio Signal
x3 = signal.decimate(x2, decRate)
### Demodulate 200kHz FM Signal
# Polar discriminator
y4 = x3[1:] * np.conj(x3[:-1])
x4 = np.angle(y4)
# The de-emphasis filter
# Given a signal 'x4' (in a numpy array) with sampling rate newFS
d = newFs * 75e-6 # Calculate the # of samples to hit the -3dB point
x = np.exp(-1/d) # Calculate the decay between each sample
b = [1-x] # Create the filter coefficients
a = [1,-x]
x5 = signal.lfilter(b,a,x4)
audioFs = newFs/dec_audio
x6 = signal.decimate(x5, dec_audio)
# Scale audio to adjust volume
x6 *= amp / np.max(np.abs(x6))
return np.float32(x6)
Fs = sdr.sample_rate
bwFM = 200000 # approx. 170 kHz for a single channel
decRate = int(Fs/bwFM)
newFs = Fs/decRate
# Find a decimation rate to achieve audio sampling rate between 44-48 kHz
audioFreq = 44100.0
dec_audio = int(newFs/audioFreq)
amp = 10000 / 32768
def demod(samples):
# Convert sampled data into numpy array
x1 = np.array(samples).astype("complex64")
# Downmixed Baseband Signal (Adjust offset to be centered)
offsetFreq = 0 # already centered
fc1 = np.exp(-1.0j * 2.0 * np.pi * offsetFreq/Fs * np.arange(len(x1)))
x2 = x1 * fc1
# Filter and downsample the FM Radio Signal
x3 = signal.decimate(x2, decRate)
### Demodulate 200kHz FM Signal
# Polar discriminator
y4 = x3[1:] * np.conj(x3[:-1])
x4 = np.angle(y4)
# The de-emphasis filter
# Given a signal 'x4' (in a numpy array) with sampling rate newFS
d = newFs * 75e-6 # Calculate the # of samples to hit the -3dB point
x = np.exp(-1/d) # Calculate the decay between each sample
b = [1-x] # Create the filter coefficients
a = [1,-x]
x5 = signal.lfilter(b,a,x4)
audioFs = newFs/dec_audio
x6 = signal.decimate(x5, dec_audio)
# Scale audio to adjust volume
x6 *= amp / np.max(np.abs(x6))
return np.float32(x6)
有了 PCM 数据当然是要用 WebSocket 丢给前端啦(至于为什么不是 WebRTC?👴 不会),随便找一个现代化 http server 库把解调后的 buffer 吐给当前连接到的 WebSocket Client,前端这边只需要用个 AudioContext
把 buffer 塞进去就可以在浏览器开始放广播 🌶。
let ws = new WebSocket(sound)
ws.binaryType = 'arraybuffer'
ws.onmessage = function (event) {
let pcm_chunks = new Float32Array(event.data)
let node = audioCtx.createBufferSource()
let buffer = audioCtx.createBuffer(1, pcm_chunks.byteLength, 44100)
let data = buffer.getChannelData(0)
buffer.copyToChannel(pcm_chunks, 0, 0)
node.buffer = buffer
node.loop = false
node.connect(audioCtx.destination)
node.start()
}
let ws = new WebSocket(sound)
ws.binaryType = 'arraybuffer'
ws.onmessage = function (event) {
let pcm_chunks = new Float32Array(event.data)
let node = audioCtx.createBufferSource()
let buffer = audioCtx.createBuffer(1, pcm_chunks.byteLength, 44100)
let data = buffer.getChannelData(0)
buffer.copyToChannel(pcm_chunks, 0, 0)
node.buffer = buffer
node.loop = false
node.connect(audioCtx.destination)
node.start()
}
理想当然是美好的,现实是每次前端收到 buffer 的时间不太确定,从局域网访问还好,如果延迟高一点就会由于每次收到 buffer 之间的延迟产生短暂的卡顿(大概每秒二三十次的样子),会很难受。聪明的 @moycat 提出可以先采集到足够时长的 buffer 再一次性发出去,这个样子理论上就只有每段之间才会卡一下。取 sample_rate 个样本的 chunk(也就是 1s)的话基本就足够了
另外一个措施是用 AudioBufferSourceNode
的 start(when)
指定下一次播放的时间让它精准地在当前 buffer 结束之后播放。如果收到 buffer 的时候已经太迟了,就把这个 buffer 再推迟一点播放。这样即使延迟高一些也基本听不出来卡顿了。
async def sending(queue: asyncio.Queue):
to_send = np.array([])
while True:
samples = await queue.get()
to_send = np.concatenate((to_send, samples))
if len(to_send) >= sdr.sample_rate:
data = demod(to_send[:int(sdr.sample_rate)])[0]
await websocket.send(data)
to_send = to_send[int(sdr.sample_rate):]
async def sending(queue: asyncio.Queue):
to_send = np.array([])
while True:
samples = await queue.get()
to_send = np.concatenate((to_send, samples))
if len(to_send) >= sdr.sample_rate:
data = demod(to_send[:int(sdr.sample_rate)])[0]
await websocket.send(data)
to_send = to_send[int(sdr.sample_rate):]
let wait_time = 1
let next_play = 0
ws.binaryType = 'arraybuffer'
ws.onmessage = function (event) {
let pcm_chunks = new Float32Array(event.data)
let node = audioCtx.createBufferSource()
let buffer = audioCtx.createBuffer(1, pcm_chunks.byteLength, SAMPLE_RATE)
buffer.copyToChannel(pcm_chunks, 0, 0)
node.buffer = buffer
node.loop = false
node.connect(audioCtx.destination)
now = audioCtx.currentTime
if (now >= next_play) {
next_play = parseInt(now + wait_time)
wait_time *= EXP_RATIO
if (wait_time > MAX_WAIT_TIME) wait_time = MAX_WAIT_TIME
} else {
next_play += CHUNK_MS / 1000
}
node.start(next_play, 0, CHUNK_MS)
}
let wait_time = 1
let next_play = 0
ws.binaryType = 'arraybuffer'
ws.onmessage = function (event) {
let pcm_chunks = new Float32Array(event.data)
let node = audioCtx.createBufferSource()
let buffer = audioCtx.createBuffer(1, pcm_chunks.byteLength, SAMPLE_RATE)
buffer.copyToChannel(pcm_chunks, 0, 0)
node.buffer = buffer
node.loop = false
node.connect(audioCtx.destination)
now = audioCtx.currentTime
if (now >= next_play) {
next_play = parseInt(now + wait_time)
wait_time *= EXP_RATIO
if (wait_time > MAX_WAIT_TIME) wait_time = MAX_WAIT_TIME
} else {
next_play += CHUNK_MS / 1000
}
node.start(next_play, 0, CHUNK_MS)
}
做到这里还有一个问题就是多个设备访问只有最后一个设备能收到音频,要支持多设备的话就得稍微改一下,给每个 ws client 建立一个 queue,每次收集到采样数据之后就往所有 client 的 queue 里面塞(
def collect_websocket(func):
@wraps(func)
async def wrapper(*args, **kwargs):
global connected_websockets
queue = asyncio.Queue()
connected_websockets.add(queue)
try:
return await func(queue, *args, **kwargs)
finally:
connected_websockets.remove(queue)
return wrapper
async def broadcast(data):
for queue in connected_websockets:
queue.put_nowait(data)
async def streaming():
async for samples in sdr.stream():
await broadcast(samples)
await sdr.stop()
sdr.close()
async def sending(queue: asyncio.Queue):
to_send = np.array([])
while True:
samples = await queue.get()
to_send = np.concatenate((to_send, samples))
if len(to_send) >= sdr.sample_rate:
data = demod(to_send[:int(sdr.sample_rate)])[0]
await websocket.send(data)
to_send = to_send[int(sdr.sample_rate):]
@app.websocket("/sound")
@collect_websocket
async def sound(queue: asyncio.Queue):
await sending(queue)
def collect_websocket(func):
@wraps(func)
async def wrapper(*args, **kwargs):
global connected_websockets
queue = asyncio.Queue()
connected_websockets.add(queue)
try:
return await func(queue, *args, **kwargs)
finally:
connected_websockets.remove(queue)
return wrapper
async def broadcast(data):
for queue in connected_websockets:
queue.put_nowait(data)
async def streaming():
async for samples in sdr.stream():
await broadcast(samples)
await sdr.stop()
sdr.close()
async def sending(queue: asyncio.Queue):
to_send = np.array([])
while True:
samples = await queue.get()
to_send = np.concatenate((to_send, samples))
if len(to_send) >= sdr.sample_rate:
data = demod(to_send[:int(sdr.sample_rate)])[0]
await websocket.send(data)
to_send = to_send[int(sdr.sample_rate):]
@app.websocket("/sound")
@collect_websocket
async def sound(queue: asyncio.Queue):
await sending(queue)
不过现在界面还是相当单调,加一点可视化应该会好一点。AudioContext
是由一堆 AudioNode
组成的,每个 AudioNode
具有 0 或 1 个输入和 0 或多个输出。比如 SourceNode
只有输出,DestinationNode
只有输入,GainNode
有一个输入和一个输出,用来给音频添加增益。此外还有 DelayNode
, BiquadFilterNode
等甚至包括 AudioWorkletNode
(用自定义的 Worker)来对音频进行各种处理的 Node。
我找到一个效果还行的可视化库 vudio.js,不过人家只支持 HTMLAudioElement
| MediaStream
,只能稍微魔改一下源码了(另外吐槽一下这个库也没有处理 resize 事件)。这里要做的就是让 Vudio 的 AnalyzerNode
接到 BufferSourceNode
的输出上,然后把 AnalyzerNode
的输出接到 AudioContext
的 DestinationNode
上,最终就这么成了
RTL-SDR (R820T2+RTL2832U) -> IQ samples -> PCM buffer -> BufferSourceNode -> AnalyzerNode -> DestinationNode -> 🎵
当然还有切换频率之类的东西过于简单就不在这里讲了。~~想要体验可以到 fm.semesse.me 。~~天线和 rtlsdr 插在 nuc 上,所以只能通过 cloudflare tunnel 从外部访问,如果访问不了那可能是 nuc 挂了(
中秋快乐~
-- 2022 Update --
NUC 的 USB 口插满了,WebFM 暂时下线(