Add Custom Pipeline Stage
All currently supported pipeline stages (other than inference models, sources, and sinks) are maintained under the ./deploy/pipelines directory.

The following steps explain how to add a custom pipeline stage. The pipeline stage support class:
- should derive from the Pipeline base class.
- should declare and initialize the member variable name to a unique string value. The custom pipeline stage is referenced in the configuration JSON by this name. For example, the image-crop pipeline stage is incorporated into a pipeline as shown:

    "pipeline": [ "image-crop:100:100:200:200", "person-detection-0200:a:0.3:0.3", "social-distancing:0.2" ]
- should override the extract_params method. The extract_params method accepts the variable-length argument list from the configuration JSON parser, validates it, and returns the argument list to be passed to the __init__ method.
- should implement an __init__ method that accepts an input parameter list corresponding to the output parameter list of the extract_params method.
- should implement __str__ and __repr__ methods as shown in the sample implementations (refer to the ./deploy/pipelines directory).
- should implement the pipeline stage logic in the run method. The run method receives a single parameter, meta, which is a frame-level dict with the following basic structure:

    {
        "sid": <preset stream id>,
        "cid": <preset channel id>,
        "timestamp": <frame timestamp>,
        "image": <raw ndarray of the image frame>,
        "jpeg": <jpeg encoded image>,
        "auxiliary": {<dictionary maintaining frame level auxiliary information>},
        "latency": <pipeline latency pertaining to this frame>,
    }
The meta dict additionally contains a key:val pair for every inference type supported. For example:

    {
        "detections": [<list of any o/p bbox detected by a prev inf stage>],
        "pose": [<list of any pose coordinates detected by a prev inf stage>],
        ...
    }
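As a rough illustration of the structure described above, the sketch below builds a meta dict by hand and reads the inference keys from it. All field values are placeholders, and the helper name count_detections is hypothetical; in a real pipeline the dict is produced by the upstream stages. The use of .get() is a defensive assumption for the case where a key is absent.

```python
def count_detections(meta):
    """Return the number of detections recorded by previous inference stages."""
    # .get() keeps the helper usable even if no detection stage has run yet
    # (a defensive assumption; the source says the key is normally present).
    return len(meta.get("detections", []))


# Hypothetical frame-level dict; values are placeholders, not real pipeline output.
sample_meta = {
    "sid": 0,
    "cid": 0,
    "timestamp": 1700000000.0,
    "image": None,      # would be the raw ndarray of the frame
    "jpeg": None,       # would be the JPEG-encoded frame
    "auxiliary": {},
    "latency": 0.0,
    "detections": [{"xmin": 10, "ymin": 20, "xmax": 110, "ymax": 220}],
    "pose": [],
}

print(count_detections(sample_meta))
```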
The run method can manipulate any of the key:val pairs of the meta dict. Some example snippets:

The following snippet manipulates the detections value, adding additional analytics info per detection:

    import numpy as np

    def run(self, meta):
        detections = meta.pop("detections")
        if len(detections) != 0:
            # Transform dict to np array, preferring any "analytics" overrides
            inp = np.array(
                [
                    [
                        det.get("analytics", {}).get("xmin", det["xmin"]),
                        det.get("analytics", {}).get("ymin", det["ymin"]),
                        det.get("analytics", {}).get("xmax", det["xmax"]),
                        det.get("analytics", {}).get("ymax", det["ymax"]),
                    ]
                    for det in detections
                ]
            )
            ret = self._dist_violation.eval(inp)
            # Apply the violation mask (last column of the result)
            for did, det in enumerate(detections):
                analytics = det.pop("analytics", {})
                # bool() replaces the np.bool alias, which NumPy 1.24 removed
                analytics.update({"violation": bool(ret[did, -1])})
                det["analytics"] = analytics
        meta["detections"] = detections
        return meta
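The pop, update, reassign pattern used for detections can be tried in isolation, without NumPy or the distance evaluator. In the minimal sketch below, flag_large_boxes and min_area are hypothetical stand-ins for self._dist_violation.eval and are not part of the project; only the handling of the detections and analytics keys mirrors the snippet above.

```python
def flag_large_boxes(boxes, min_area):
    """Hypothetical evaluator: True for every box whose area exceeds min_area."""
    return [(xmax - xmin) * (ymax - ymin) > min_area
            for xmin, ymin, xmax, ymax in boxes]


def run(meta, min_area=5000):
    detections = meta.pop("detections")
    if detections:
        boxes = [(d["xmin"], d["ymin"], d["xmax"], d["ymax"]) for d in detections]
        flags = flag_large_boxes(boxes, min_area)
        # Attach one flag per detection, keeping any analytics already present.
        for det, flag in zip(detections, flags):
            analytics = det.pop("analytics", {})
            analytics.update({"violation": bool(flag)})
            det["analytics"] = analytics
    meta["detections"] = detections
    return meta


meta = run({"detections": [
    {"xmin": 0, "ymin": 0, "xmax": 100, "ymax": 100},  # area 10000
    {"xmin": 0, "ymin": 0, "xmax": 10, "ymax": 10},    # area 100
]})
print([d["analytics"]["violation"] for d in meta["detections"]])
```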
The following snippet manipulates the image and crops it to a desired ROI:

    image = meta.pop("image")
    image = np.ascontiguousarray(
        image[
            self._start_coords[0]:self._shape[0],
            self._start_coords[1]:self._shape[1],
        ]
    )
    meta["image"] = image
    return meta
In some cases the auxiliary value is updated along with other updates. The following snippet demonstrates this:

    auxiliary = meta.pop("auxiliary")
    auxiliary.update(self._get_line())
    meta["auxiliary"] = auxiliary

In this example, the auxiliary info is updated with a JSON dict defining a boundary line at the frame level, which is passed to the downstream ZMQ subscriber.
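Putting the steps together, a custom stage might look roughly like the sketch below. It is an illustration under stated assumptions, not the project's implementation: the Pipeline class here is a local stand-in for the real base class, the frame-tag stage and its label argument are hypothetical, and declaring extract_params as a static method is a guess at the real signature. The sample implementations under ./deploy/pipelines remain the authoritative reference.

```python
class Pipeline:
    """Local stand-in for the project's Pipeline base class (assumption:
    the real class may define more behaviour than shown here)."""

    name = None

    @staticmethod
    def extract_params(*args):
        raise NotImplementedError

    def run(self, meta):
        raise NotImplementedError


class FrameTag(Pipeline):
    """Hypothetical stage that tags every frame with a label and a counter."""

    # Unique name used to reference the stage in the configuration JSON,
    # for example "frame-tag:lobby".
    name = "frame-tag"

    @staticmethod
    def extract_params(*args):
        # Validate the arguments parsed from the configuration entry and
        # return the list that is handed to __init__.
        if len(args) != 1 or not args[0]:
            raise ValueError("frame-tag expects exactly one non-empty label")
        return [str(args[0])]

    def __init__(self, label):
        self._label = label
        self._count = 0

    def __str__(self):
        return f"{self.name}:{self._label}"

    def __repr__(self):
        return f"FrameTag(label={self._label!r})"

    def run(self, meta):
        # Pop, update, and reassign the auxiliary dict.
        self._count += 1
        auxiliary = meta.pop("auxiliary", {})
        auxiliary.update({"tag": self._label, "frame_index": self._count})
        meta["auxiliary"] = auxiliary
        return meta


# Wiring up a config entry such as "frame-tag:lobby" by hand.
stage_name, *raw_args = "frame-tag:lobby".split(":")
stage = FrameTag(*FrameTag.extract_params(*raw_args))
result = stage.run({"sid": 0, "cid": 0, "auxiliary": {}})
print(stage, result["auxiliary"])
```

Keeping validation in extract_params and state in __init__ means a malformed configuration entry fails before the stage is constructed, which matches the division of responsibilities described in the steps.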