Subscriber Example
This tool is intended as an example of subscribing to a Nimble data stream. You can find it under tools/zmq_subscriber in the release package.
First, make sure that you have the latest Docker images:
user@host:~$ docker-compose pull
To run the example:
user@host:~$ docker-compose up
Code Breakdown
First, we need to establish a connection to the configuration socket:
import json
import zmq

context = zmq.Context()
config_socket = context.socket(zmq.REQ)
config_socket.connect("tcp://nimble:22951")
Make a request for the configuration data:
config_socket.send_string("config")
recv_message = config_socket.recv_json()
The configuration data contains the subscriber port. Extract it and establish a connection to the ZMQ data socket:
subscriber_port = recv_message["dataPort"]
data_socket = context.socket(zmq.SUB)
data_socket.subscribe("")
data_socket.connect(f"tcp://nimble:{subscriber_port}")
data_socket.setsockopt(zmq.RCVTIMEO, 1000)
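If the configuration reply is malformed, the lines above fail with a KeyError or connect to a bad address. A minimal sketch of checking the reply before connecting might look like the following. The helper name data_endpoint and its host parameter are hypothetical, and the validation rules are assumptions; only the "dataPort" field comes from the example.

```python
def data_endpoint(config: dict, host: str = "nimble") -> str:
    """Build the data socket address from a configuration reply.

    Hypothetical helper: only the "dataPort" key is taken from the
    example above; the checks below are assumptions.
    """
    if "dataPort" not in config:
        raise ValueError("configuration reply has no 'dataPort' field")
    port = int(config["dataPort"])  # accepts an int or a numeric string
    if not 0 < port < 65536:
        raise ValueError(f"dataPort out of range: {port}")
    return f"tcp://{host}:{port}"
```

With such a helper, the connect call could be written as data_socket.connect(data_endpoint(recv_message)).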
With the connection established, you can start receiving data and parsing the topic.
There are currently three topics: channel/id, fps and latency.
data = data_socket.recv_multipart()
topic = data[0].decode('utf-8')
The fps and latency topics are simple reporting streams on the overall performance health of the pipeline. The majority of the data is contained within the channel/id topic(s).
Extract the channel ID, and load the metadata as a Python dictionary:
channel_id = int(topic.split("/")[1])
metadata = json.loads(data[1].decode('utf-8'))
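The split above only works for channel/id topics; on an fps or latency topic, indexing element [1] raises an IndexError. A small sketch of separating the two cases before extracting the ID is shown below. The function name parse_topic is hypothetical, and the literal "channel" prefix in the docstring is an assumption about how the topic is spelled on the wire.

```python
from typing import Optional, Tuple


def parse_topic(topic: str) -> Tuple[str, Optional[int]]:
    """Split a topic string into (kind, channel_id).

    Topics with an "/id" suffix yield an integer channel ID; topics
    without one (the reporting streams) yield None.
    Assumed example spelling: "channel/7".
    """
    kind, sep, suffix = topic.partition("/")
    if sep:
        # int() raises ValueError if the suffix is not a number
        return kind, int(suffix)
    return kind, None
```

A receive loop could then branch on whether channel_id is None before decoding the remaining data items.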
Parsing the metadata is outside the scope of this document; please refer to the metadata schemas.
Each channel/id topic can have 2, 3 or 4 data items depending on the configuration. The code below checks whether MP4 proxy streaming is enabled for that channel by referencing the "streams" field in the configuration data. It then uses the length of the data packet to determine whether JPEG data has been attached.
proxy_streaming = str(channel_id) in recv_message["streams"]
jpeg_exists = len(data) == 4 if proxy_streaming else len(data) == 3
jpeg_data_idx = 3 if proxy_streaming else 2
Finally, the Presentation Timestamp and JPEG data are extracted if they exist:
pts = data[2].decode('utf-8') if proxy_streaming else None
jpeg_data = data[jpeg_data_idx] if jpeg_exists else None
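The channel steps above can be gathered into one function that takes the list returned by recv_multipart() and the configuration reply. This is a sketch only: ChannelMessage and parse_channel_frames are hypothetical names, and the "streams" field is assumed to be a container of channel IDs as strings, as the membership test in the example implies.

```python
import json
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ChannelMessage:
    channel_id: int
    metadata: dict
    pts: Optional[str]     # presentation timestamp, only with proxy streaming
    jpeg: Optional[bytes]  # raw JPEG bytes, only if attached


def parse_channel_frames(frames: List[bytes], config: dict) -> ChannelMessage:
    """Decode one multipart message received on a channel/id topic."""
    channel_id = int(frames[0].decode("utf-8").split("/")[1])
    metadata = json.loads(frames[1].decode("utf-8"))

    # Proxy streaming adds a timestamp item, shifting the JPEG index by one.
    proxy_streaming = str(channel_id) in config["streams"]
    jpeg_idx = 3 if proxy_streaming else 2

    pts = frames[2].decode("utf-8") if proxy_streaming else None
    # The JPEG is present only when the message has exactly one more item.
    jpeg = frames[jpeg_idx] if len(frames) == jpeg_idx + 1 else None
    return ChannelMessage(channel_id, metadata, pts, jpeg)
```

Because the function takes plain bytes, it can be exercised without a running Nimble instance, for example parse_channel_frames(data, recv_message) inside the receive loop or with hand-built frames in a unit test.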
For a detailed description of the ZMQ publishing protocol please refer to: Data Publishing Endpoint (PUB-SUB) in the ZMQ API.