Skip to main content

Add Custom Pipeline Stage

  • All pipeline stages (other than inference models, sources and sinks) currently supported are maintained under ./deploy/pipelines dir

  • Following steps explain how to add a custom pipeline stage.
    The pipeline stage support class:

    • should derive from the Pipeline base class.

    • should declare and initialize the member variable: name to a unique string value. This custom pipeline stage is referenced in the deployment configuration by this name.
      For example, the image-crop pipeline stage is incorporated into a pipeline as shown:

      pipeline:
      - image-crop:100:100:200:200
      - person-detection-0200:a:0.3:0.3
      - social-distancing:0.2
    • should override the extract_params method.

      • The extract_params method accepts the short-form of a pipeline element as a list of strings, validates and returns the arg list to be passed to the __init__ method
    • should override the extract_params_to_kwargs method.

      • The extract_params_to_kwargs method accepts the long-form of a pipeline element as a dict object, validates and returns the kwargs to be passed to the __init__ method
    • should implement an __init__ method that accepts an input param list corresponding to the o/p params list of the extract_params method and extract_params_to_kwargs method.

    • should implement __str__ and __repr__ methods as shown in the sample implementations (refer ./deploy/pipelines dir).

    • should implement the pipeline stage logic in the process method

      • The process method receives a single param meta, which is a frame level dict, with the following basic structure:

        {
        "plid": <preset pipeline id>,
        "sidx": <preset source index>,
        "timestamp": <frame timestamp>,
        "image": <raw ndarray of the image frame>,
        "jpeg": <jpeg encoded image>,
        "auxiliary": {<dictionary maintaining frame level auxiliary information>},
        "latency": <pipeline latency pertaining to this frame>,
        }
        • The meta dict additionally contains a key:val pair for every inference type supported. For example:
          {
          "detections": [<list of any o/p bbox detected by a prev inf stage>],
          "pose": [<list of any pose coordinates detected by a prev inf stage>],
          ...
          }
      • The process method can manipulate any of the following key:val of the meta dict. Some example snippets:

        • The following snippet manipulates the detection value, adding additional analytics info per detection:

          def process(self, meta):
          detections = meta.pop("detections")

          if len(detections) != 0:
          # Transform dict to np array
          inp = np.array(
          [
          [
          det.get("analytics", {}).get("xmin", det["xmin"]),
          det.get("analytics", {}).get("ymin", det["ymin"]),
          det.get("analytics", {}).get("xmax", det["xmax"]),
          det.get("analytics", {}).get("ymax", det["ymax"]),
          ]
          for det in detections
          ]
          )
          ret = self._dist_violation.eval(inp)
          # Apply the violation mask
          for did, det in enumerate(detections):
          analytics = det.pop("analytics", {})
          analytics.update({"violation": np.bool(ret[did, -1])})
          det["analytics"] = analytics

          meta["detections"] = detections
          return meta
        • The following snippet manipulates the image and crops it to a desired ROI:

          image = meta.pop("image")
          image = np.ascontiguousarray(image[self._start_coords[0]:self._shape[0], self._start_coords[1]:self._shape[1]])
          meta["image"] = image
          return meta
        • In some cases the auxiliary value is updated along with other updates. The following snippet demonstrates the same:

          auxiliary = meta.pop("auxiliary")
          auxiliary.update(self._get_line())
          meta["auxiliary"] = auxiliary

          In this example, the auxiliary info is updated with a json dict defining a boundary line at the frame level, which gets passed to the downstream zmq subscriber.