Add Custom Pipeline Stage

  • All currently supported pipeline stages (other than inference models, sources, and sinks) are maintained under the ./deploy/pipelines dir

  • The following steps explain how to add a custom pipeline stage. The pipeline stage support class:

    • should derive from the Pipeline base class.

    • should declare and initialize the member variable name to a unique string value. The custom pipeline stage is referenced by this name in the configuration json. For example,

      • The image-crop pipeline stage is incorporated into a pipeline as shown:

        "pipeline": [
            "image-crop:100:100:200:200","person-detection-0200:a:0.3:0.3", "social-distancing:0.2"
    • should override the extract_params method.
      • The extract_params method accepts the variable-length argument list from the configuration json parser, validates it, and returns the argument list to be passed to the __init__ method

    • should implement an __init__ method whose input parameter list corresponds to the output parameter list of the extract_params method
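      A sketch of this extract_params/__init__ pairing for a hypothetical image-crop stage follows. Whether extract_params is a static method, and what the four numeric arguments mean, are assumptions for illustration; the real stages under ./deploy/pipelines may differ:

      ```python
      class ImageCrop:
          name = "image-crop"

          @staticmethod
          def extract_params(*args):
              # Validate the raw string args from the configuration json
              # parser and return the argument list forwarded to __init__.
              if len(args) != 4:
                  raise ValueError("image-crop expects 4 args")
              return [int(a) for a in args]

          def __init__(self, start_y, start_x, end_y, end_x):
              # Input params correspond one-to-one with the output list
              # of extract_params.
              self._start_coords = (start_y, start_x)
              self._shape = (end_y, end_x)
      ```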

    • should implement __str__ and __repr__ methods as shown in the sample implementations (refer to the ./deploy/pipelines dir).
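      For example, a stage might render its name and parameters for logging. This is a sketch of one possible format, mirroring the configuration syntax; the exact format used by the sample implementations may differ:

      ```python
      # Hypothetical image-crop stage; field names are assumptions.
      class ImageCrop:
          name = "image-crop"

          def __init__(self, start_y, start_x, end_y, end_x):
              self._start_coords = (start_y, start_x)
              self._shape = (end_y, end_x)

          def __str__(self):
              # Human-readable form mirroring the configuration syntax
              return "{}:{}:{}:{}:{}".format(
                  self.name, *self._start_coords, *self._shape
              )

          def __repr__(self):
              return "<{} {}>".format(self.__class__.__name__, self)
      ```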

    • should implement the pipeline stage logic in the run method
      • The run method receives a single parameter, meta, which is a frame-level dict with the following basic structure:

            "sid": <preset stream id>,
            "cid": <preset channel id>,
            "timestamp": <frame timestamp>,
            "image": <raw ndarray of the image frame>,
            "jpeg": <jpeg encoded image>,
            "auxiliary": {<dictionary maintaining frame level auxiliary information>},
            "latency": <pipeline latency pertaining to this frame>,
        • The meta dict additionally contains a key:val pair for every inference type supported, for example:

              "detections": [<list of any o/p bbox detected by a prev inf stage>],
              "pose": [<list of any pose coordinates detected by a prev inf stage>],
      • The run method can manipulate any of these key:val pairs of the meta dict. Some example snippets:

        • The following snippet manipulates the detection value, adding additional analytics info per detection

          def run(self, meta):
              detections = meta.pop("detections")
              if len(detections) != 0:
                  # Transform the list of detection dicts to an np array
                  inp = np.array(
                      [
                          [
                              det.get("analytics", {}).get("xmin", det["xmin"]),
                              det.get("analytics", {}).get("ymin", det["ymin"]),
                              det.get("analytics", {}).get("xmax", det["xmax"]),
                              det.get("analytics", {}).get("ymax", det["ymax"]),
                          ]
                          for det in detections
                      ]
                  )
                  ret = self._dist_violation.eval(inp)
                  # Apply the violation mask to each detection's analytics
                  for did, det in enumerate(detections):
                      analytics = det.pop("analytics", {})
                      analytics.update({"violation": bool(ret[did, -1])})
                      det["analytics"] = analytics
              meta["detections"] = detections
              return meta
        • The following snippet manipulates the image and crops it to a desired ROI

          image = meta.pop("image")
          # Crop to the configured ROI and make the slice contiguous in memory
          image = np.ascontiguousarray(
              image[self._start_coords[0]:self._shape[0],
                    self._start_coords[1]:self._shape[1]]
          )
          meta["image"] = image
          return meta
        • In some cases the auxiliary value is updated along with other updates. The following snippet demonstrates this:

          auxiliary = meta.pop("auxiliary")
          # (stage-specific updates to auxiliary go here)
          meta["auxiliary"] = auxiliary

          In this example, the auxiliary info is updated with a json dict defining a boundary line at the frame level, which is passed on to the downstream zmq subscriber
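          A fuller sketch of such an update follows. The key names and the structure of the boundary-line dict are illustrative assumptions, not the project's actual schema:

          ```python
          def run(self, meta):
              auxiliary = meta.pop("auxiliary")
              # Illustrative: attach a frame-level boundary line that the
              # downstream zmq subscriber can consume. Key names and the
              # line structure are assumptions.
              auxiliary["boundary"] = {
                  "start": {"x": 0, "y": 540},
                  "end": {"x": 1920, "y": 540},
              }
              meta["auxiliary"] = auxiliary
              return meta
          ```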