Subscriber Example
This tool is an example of subscribing to a Nimble data stream. You can find it under tools/zmq_subscriber.
First, make sure that your license.json is in the directory and that you have the latest Docker images:
user@host:~$ docker-compose pull
To run the example:
user@host:~$ docker-compose up
Code Breakdown
First we need to establish a connection to the configuration socket:
import json  # used further down to decode the metadata frame
import zmq
context = zmq.Context()
config_socket = context.socket(zmq.REQ)
config_socket.connect("tcp://nimble:22951")
Make a request for the configuration data:
config_socket.send_string("config")
recv_message = config_socket.recv_json()
The configuration data contains the subscriber port. Extract it and establish a connection to the ZMQ data socket:
subscriber_port = recv_message["dataPort"]
data_socket = context.socket(zmq.SUB)
data_socket.subscribe("")
data_socket.connect(f"tcp://nimble:{subscriber_port}")
data_socket.setsockopt(zmq.RCVTIMEO, 1000)
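Before connecting, it can be worth checking the configuration reply rather than indexing into it blindly. The following is a minimal sketch, not part of the shipped tool: the helper name data_endpoint is hypothetical, and it assumes that "dataPort" arrives as an integer or a numeric string. The host name nimble is taken from the connection strings above.

```python
from typing import Any, Mapping


def data_endpoint(config: Mapping[str, Any], host: str = "nimble") -> str:
    """Build the tcp:// endpoint for the data socket from the config reply.

    Hypothetical helper: only the "dataPort" field name comes from the
    example above; the validation rules are assumptions.
    """
    if "dataPort" not in config:
        raise KeyError("configuration reply has no 'dataPort' field")
    port = int(config["dataPort"])  # assumption: an int or a numeric string
    if not 0 < port < 65536:
        raise ValueError(f"dataPort out of range: {port}")
    return f"tcp://{host}:{port}"


# Made-up reply for illustration; a real reply comes from recv_json().
example_config = {"dataPort": 22952}
endpoint = data_endpoint(example_config)
print(endpoint)
```

The returned string could then be passed to data_socket.connect() in place of the inline f-string.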
With the connection established, you can start receiving data and parsing the topic. There are currently three topics: channel/id, fps and latency.
data = data_socket.recv_multipart()
topic = data[0].decode('utf-8')
The fps and latency topics are simple reporting streams on the overall performance health of the pipeline; the majority of the data is contained within the channel/id topic(s). Extract the channel ID, and load the metadata as a Python dictionary:
channel_id = int(topic.split("/")[1])
metadata = json.loads(data[1].decode('utf-8'))
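Because all three topics arrive on the same socket, the topic frame has to be classified before the channel ID is split out of it. A possible sketch of that step is shown below. The function name parse_topic is hypothetical, and it assumes that channel topics are spelled literally as channel/ followed by a decimal ID.

```python
from typing import Optional, Tuple


def parse_topic(topic_frame: bytes) -> Tuple[str, Optional[int]]:
    """Return (kind, channel_id); channel_id is None for fps and latency."""
    topic = topic_frame.decode("utf-8")
    if topic in ("fps", "latency"):
        return topic, None
    # Assumption: channel topics look like "channel/<decimal id>".
    prefix, sep, rest = topic.partition("/")
    if prefix == "channel" and sep and rest.isdecimal():
        return "channel", int(rest)
    raise ValueError(f"unrecognised topic: {topic!r}")


# Made-up topic frames for illustration.
print(parse_topic(b"channel/3"))
print(parse_topic(b"latency"))
```

Raising on an unknown topic is one design choice; a long-running subscriber might prefer to log and skip such messages instead.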
Parsing the metadata is outside the scope of this document; please refer to the metadata schemas.
Each channel/id topic can have 2, 3 or 4 data items depending on the configuration. The code below checks whether MP4 proxy streaming is enabled for that channel by referencing the “streams” field in the configuration data. It then uses the length of the data packet to determine whether JPEG data has been attached.
proxy_streaming = str(channel_id) in recv_message["streams"]
jpeg_exists = len(data) == 4 if proxy_streaming else len(data) == 3
jpeg_data_idx = 3 if proxy_streaming else 2
Finally, the presentation timestamp (PTS) and the JPEG data are extracted if they exist.
pts = data[2].decode('utf-8') if proxy_streaming else None
jpeg_data = data[jpeg_data_idx] if jpeg_exists else None
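The frame-handling steps above can be gathered into a single function, which also makes them easy to exercise without a running pipeline. This is a sketch under the same rules as the snippets above, not the tool's actual code: the names ChannelMessage and unpack_channel_message are hypothetical, the sample frames are made up, and the only assumption about "streams" is that it supports membership tests on the channel ID as a string.

```python
import json
from dataclasses import dataclass
from typing import Any, Container, Dict, List, Optional


@dataclass
class ChannelMessage:
    channel_id: int
    metadata: Dict[str, Any]
    pts: Optional[str]      # only present when proxy streaming is enabled
    jpeg: Optional[bytes]   # only present when a JPEG frame was attached


def unpack_channel_message(frames: List[bytes],
                           streams: Container[str]) -> ChannelMessage:
    """Split one channel/id multipart message into its parts."""
    channel_id = int(frames[0].decode("utf-8").split("/")[1])
    metadata = json.loads(frames[1].decode("utf-8"))
    proxy_streaming = str(channel_id) in streams
    # Frames before the optional JPEG: topic, metadata and, if proxied, PTS.
    base = 3 if proxy_streaming else 2
    if len(frames) not in (base, base + 1):
        raise ValueError(f"unexpected frame count: {len(frames)}")
    pts = frames[2].decode("utf-8") if proxy_streaming else None
    jpeg = frames[base] if len(frames) == base + 1 else None
    return ChannelMessage(channel_id, metadata, pts, jpeg)


# Made-up frames: channel 7 with proxy streaming and an attached JPEG.
sample = [b"channel/7", b'{"objects": []}', b"12345", b"\xff\xd8fake"]
message = unpack_channel_message(sample, streams=["7"])
print(message.channel_id, message.pts, len(message.jpeg))
```

In the example tool, frames would be the result of data_socket.recv_multipart() and streams would be recv_message["streams"].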
For a detailed description of the ZMQ publishing protocol please refer to: Data Publishing Endpoint (PUB-SUB) in the ZMQ API.