Kinect Interface

We will introduce how to record samples from the kinect and explain the process that read depth images from the camera and send them as inputs to the Loihi chip

Kinect recording

In order to use the process, you first need to install the pykinect_azure library :

#source your virtualenv
source neuro/bin/activate
pip install pykinect_azure

Then you can record a sample by runing this command :

k4arecorder -d WFOV_UNBINNED -c 1080p -r 15 -l 10 wfov.mkv

WFOV_UNBINNED is the option to choose the wide angle of the depth camera. The option -r 15 means that we record at 15fps and -l 10 indicates the length in seconds of the recording. More information on this command can be found here

Kinect process

The class can take several parameters such as the size of the depth (1024x1024 for WFOV option). More information about camera parameters are available here. The mode parameter indicate how you want to process the depth images : depth, smooth depth, tranformed (transform depth to color frame - removing the fisheye distortion). Here is an example : test test

The other parameters are min_depth and max_depth, which defines a depth you want to monitor. Everything outside this interval will by excluded from the depth image.

import sys
import numpy as np
import time
from threading import Thread
import cv2
from lava.magma.core.decorator import implements, requires, tag
from lava.magma.core.model.py.model import PyLoihiProcessModel
from lava.magma.core.model.py.ports import PyOutPort
from lava.magma.core.model.py.type import LavaPyType
from lava.magma.core.process.ports.ports import OutPort
from lava.magma.core.process.process import AbstractProcess
from lava.magma.core.resources import CPU
from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol
import pykinect_azure as pykinect
import ctypes
from pykinect_azure.k4a import _k4a
from pykinect_azure.k4a.image import Image

class KinectReader(AbstractProcess):
   """
   Process that read depth images from ros bag file and send them through outport 

   Parameters
   ----------
   depth_shape: tuple
      shape of the depth image.
   file_name: str
      name of the recording.
   mode: str
      type of depth image. "depth","smooth","transformed"
   min_depth: int
      minimal depth to process.
   max_depth: int
      maximal depth to process.
   resize: tuple
      resizing the images if the parameter is set.
   """
   def __init__(
            self,
            depth_shape: tuple,
            filename: str = "",
            mode: str = "depth",
            min_depth: int = 0,
            max_depth: int = 10000,
            resize: tuple = None) -> None:
      if filename == "":
         raise ValueError(
            "you must provide the path of a bag"
         )
      if mode != "":
         self.mode = mode
      self.filename = filename
      self.min_depth = min_depth
      self.max_depth = max_depth
      self.resize = resize
      if resize is not None:
         self.depth_shape = self.resize
      else:
         self.depth_shape = depth_shape
      self.depth_out_port = OutPort(shape=self.depth_shape)
      super().__init__(
         depth_shape=self.depth_shape,
         filename=self.filename,
         mode=self.mode,
         min_depth=self.min_depth,
         max_depth=self.max_depth,
         resize=self.resize
      )

#class implementing the bag reader
@implements(proc=KinectReader, protocol=LoihiProtocol)
@requires(CPU)
class LoihiDensePyKinectReader(PyLoihiProcessModel):
   depth_out_port: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, int)

   def __init__(self, proc_params: dict) -> None:
      super().__init__(proc_params)
      self._file_name = proc_params["filename"] 
      self._depth_shape = proc_params["depth_shape"]
      self._mode = proc_params["mode"]
      self._min_depth = proc_params["min_depth"]
      self._max_depth = proc_params["max_depth"]
      self._resize = proc_params["resize"]
      pykinect.initialize_libraries()
      self._playback = pykinect.start_playback(self._file_name)
      self._playback_config = self._playback.get_record_configuration()

   def run_spk(self):
      #print("get image")
      depth_image = self._get_kinect_frame()
      self.depth_out_port.send(depth_image)

   def _get_kinect_frame(self):
      #get next frame
      ret, capture = self._playback.update()
      depth_image = None
      cv_image_resized = None
      #setup mode
      if ret:
         if self._mode == "depth":
            ret_depth, depth_image = capture.get_depth_image()
         if self._mode == "smooth":
            ret_depth, depth_image = capture.get_smooth_depth_image()
         if self._mode == "transformed":
            ret_depth, depth_image = capture.get_transformed_depth_image()
         #resize to param size
         cv_image_resized = cv2.resize(depth_image, (self._resize[1],self._resize[0]), interpolation = cv2.INTER_AREA)
         #exclude depth values not between min_depth and max_depth
         for i in range(cv_image_resized.shape[0]):
            for j in range(cv_image_resized.shape[1]):
               if cv_image_resized[i,j] > self._max_depth or cv_image_resized[i,j] < self._min_depth:
                  cv_image_resized[i,j] = 0
      else:
         cv_image_resized = np.zeros((self._resize))

      return cv_image_resized
   
   def _pause(self):
      """Pause was called by the runtime"""
      super()._pause()
      self.t_pause = time.time_ns()

   def _stop(self) -> None:
      """Stop was called by the runtime"""
      super()._stop()   

The use of this process will be demonstrated in the tutorial section.