Add Custom Pipeline Stage
All currently supported pipeline stages (other than inference models, sources and sinks) are maintained under the `./deploy/pipelines` directory. The following steps explain how to add a custom pipeline stage.

The pipeline stage support class:

- should derive from the `Pipeline` base class.
- should declare and initialize the member variable `name` to a unique string value. This custom pipeline stage is referenced in the deployment configuration by this name. For example, the `image-crop` pipeline stage is incorporated into a pipeline as shown:

```
pipeline:
- image-crop:100:100:200:200
- person-detection-0200:a:0.3:0.3
- social-distancing:0.2
```

- should override the `extract_params` method.
  - The `extract_params` method accepts the short-form of a pipeline element as a list of strings, validates it, and returns the arg list to be passed to the `__init__` method.
- should override the `extract_params_to_kwargs` method.
  - The `extract_params_to_kwargs` method accepts the long-form of a pipeline element as a `dict` object, validates it, and returns the kwargs to be passed to the `__init__` method.
- should implement an `__init__` method whose input parameters correspond to the arg list returned by the `extract_params` method and the kwargs returned by the `extract_params_to_kwargs` method.
- should implement `__str__` and `__repr__` methods as shown in the sample implementations (refer to the `./deploy/pipelines` directory).
- should implement the pipeline stage logic in the `process` method (a minimal class skeleton covering these requirements is sketched at the end of this section).

The `process` method receives a single param `meta`, which is a frame-level `dict` with the following basic structure:

```
{
    "plid": <preset pipeline id>,
    "sidx": <preset source index>,
    "timestamp": <frame timestamp>,
    "image": <raw ndarray of the image frame>,
    "jpeg": <jpeg encoded image>,
    "auxiliary": {<dictionary maintaining frame level auxiliary information>},
    "latency": <pipeline latency pertaining to this frame>,
}
```

The `meta` dict additionally contains a key:val pair for every inference type supported. For example:

```
{
    "detections": [<list of any o/p bbox detected by a prev inf stage>],
    "pose": [<list of any pose coordinates detected by a prev inf stage>],
    ...
}
```
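As a purely illustrative example (not taken from the reference stages), a custom stage that only consumes such an inference result, rather than modifying it, might count the detections for the frame and record the count in the frame's auxiliary info; the `object_count` key used here is hypothetical:

```python
def process(self, meta):
    # Read an inference key populated by an upstream detection stage.
    detections = meta.pop("detections")
    auxiliary = meta.pop("auxiliary")
    # "object_count" is an illustrative auxiliary key, not a predefined one.
    auxiliary["object_count"] = len(detections)
    # Put the (unmodified) detections and the updated auxiliary info back.
    meta["detections"] = detections
    meta["auxiliary"] = auxiliary
    return meta
```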
The `process` method can manipulate any of the above key:val pairs of the `meta` dict. Some example snippets:

The following snippet manipulates the `detections` value, adding additional analytics info per detection:

```python
def process(self, meta):
    detections = meta.pop("detections")
    if len(detections) != 0:
        # Transform dict to np array
        inp = np.array(
            [
                [
                    det.get("analytics", {}).get("xmin", det["xmin"]),
                    det.get("analytics", {}).get("ymin", det["ymin"]),
                    det.get("analytics", {}).get("xmax", det["xmax"]),
                    det.get("analytics", {}).get("ymax", det["ymax"]),
                ]
                for det in detections
            ]
        )
        ret = self._dist_violation.eval(inp)
        # Apply the violation mask
        for did, det in enumerate(detections):
            analytics = det.pop("analytics", {})
            analytics.update({"violation": bool(ret[did, -1])})
            det["analytics"] = analytics
    meta["detections"] = detections
    return meta
```

The following snippet manipulates the `image` value and crops it to a desired ROI:

```python
image = meta.pop("image")
image = np.ascontiguousarray(image[self._start_coords[0]:self._shape[0], self._start_coords[1]:self._shape[1]])
meta["image"] = image
return meta
```

In some cases the `auxiliary` value is updated along with other changes. The following snippet demonstrates this:

```python
auxiliary = meta.pop("auxiliary")
auxiliary.update(self._get_line())
meta["auxiliary"] = auxiliary
```

In this example, the auxiliary info is updated with a JSON dict defining a boundary line at the frame level, which gets passed to the downstream zmq subscriber.