Skip to main content

Subscriber Example

This tool is intended as an example of subscribing to a Nimble data stream. You can find it under tools/zmq_subscriber in the release package.

First, you will need to make sure that your license manager is configured, and that you have the latest docker images:

user@host:~$ docker-compose pull

To run the example:

user@host:~$ docker-compose up

Code Breakdown

First, we need to establish a connection to the configuration socket:

context = zmq.Context()
config_socket = context.socket(zmq.REQ)
config_socket.connect("tcp://nimble:22951")

Make a request for the configuration data:

config_socket.send_string("config")
recv_message = config_socket.recv_json()

The configuration data contains the subscriber port, extract it and establish as connection to the zmq data socket:

subscriber_port = recv_message["dataPort"]
data_socket = context.socket(zmq.SUB)
data_socket.subscribe("")
data_socket.connect(f"tcp://nimble:{subscriber_port}")
data_socket.setsockopt(zmq.RCVTIMEO, 1000)

With the connection established you can start requesting data, and parsing the topic. There are currently 3 topics: channel/id, fps and latency.

data = data_socket.recv_multipart()
topic = data[0].decode('utf-8')

The fps and latency topic are simple reporting streams on overall performance health of the pipeline, the majority of the data is contained within the channel/id topic(s). Extract out of the channel id and metadata as a python dictionary:

channel_id = int(topic.split("/")[1])
metadata = json.loads(data[1].decode('utf-8'))

Parsing the metadata is outside the scope of this document please refer to the metadata schemas.

Each channel/id topic can have either 2, 3 or 4 data items depending on the configuration. The code below check if mp4 proxy streaming is enabled for that channel by referencing the "streams" field in the configuration data. If then uses the length of data packet to determine if JPEG data has been attached.

proxy_streaming = str(channel_id) in recv_message["streams"]
jpeg_exists = len(data) == 4 if proxy_streaming else len(data) == 3
jpeg_data_idx = 3 if proxy_streaming else 2

Finally, the Presentation Timestamp and JPEG data are extracted if they exist:

pts = data[2].decode('utf-8') if proxy_streaming else None
jpeg_data = data[jpeg_data_idx] if jpeg_exists else None

For a detailed description of the ZMQ publishing protocol please refer to: Data Publishing Endpoint (PUB-SUB) in the ZMQ API.