# Source code for depth_processing

__author__ = "Andrea Rizzo, Matteo Bruni"
__copyright__ = "Copyright 2014, Dining Engineers"
__license__ = "GPLv2"

"""
This module contains class for depth processing.
This class handles the depth camera status and its methods ensure proper updates to the background models
and the bounding boxes extraction.
"""

import numpy as np
import bg_models
from const import *
from utils import *


class DepthProcessing:
    """Track the depth-camera foreground over time.

    Holds the running-average background model, the per-pixel foreground
    accumulator and two alternative rectangle accumulators used to extract
    bounding boxes that stay stable across frames.
    """

    # Tracking methods accepted by extract_proposal_bbox.
    ACCUMULATOR = 0
    RECT_MATCHING = 1
    RECT_MATCHING2 = 2

    def __init__(self, image_shape=(640, 480)):
        """Allocate zeroed state buffers for frames of ``image_shape``."""
        self.current_frame = np.zeros(shape=image_shape, dtype=np.uint16)
        # Per-pixel markers of depth holes (invalid depth measurements).
        self.current_frame_holes = np.zeros(shape=image_shape, dtype=np.uint64)
        self.background_holes = np.zeros(shape=image_shape, dtype=np.uint64)
        # Per-pixel count of consecutive foreground detections (ACCUMULATOR method).
        self.accumulator = np.zeros(shape=image_shape, dtype=np.uint8)
        self.background_model = np.zeros(shape=image_shape, dtype=np.float32)
        self.foreground_mask = np.zeros(shape=image_shape)
        # Rectangle accumulators: list of (rect, counter) pairs for
        # RECT_MATCHING; ndarray whose rows end with a counter column for
        # RECT_MATCHING2.
        self.rect_accum = []
        self.rect_accum2 = np.array([], dtype=int)

    def update_background_model(self, current_frame, holes_frame=None):
        """Update the depth background model by running average.

        :param current_frame: current frame whereby to update the bg model
        :param holes_frame: optional per-pixel hole markers; defaults to an
            all-zero frame of shape IMAGE_SHAPE (i.e. no holes)
        :return: background model
        :rtype: np.float32
        """
        # BUGFIX: the default used to be np.zeros(shape=IMAGE_SHAPE, ...)
        # evaluated once at function-definition time — a shared mutable
        # default that also required IMAGE_SHAPE at import time. Build the
        # default per call instead.
        if holes_frame is None:
            holes_frame = np.zeros(shape=IMAGE_SHAPE, dtype=np.uint64)
        self.background_model, self.background_holes = \
            bg_models.compute_background_running_average(
                current_frame, self.background_model, BG_RUN_AVG_LRATE,
                holes_frame)
        return self.background_model

    def extract_foreground_mask_from_run_avg(self, current_frame):
        """Extract the depth foreground mask from the running average.

        Computed by subtracting ``current_frame`` from the background model:
        pixels whose absolute difference is at least BG_MASK_THRESHOLD are
        foreground, except where the current frame has depth holes.

        :param current_frame: current frame from which to extract foreground
        :return: binary mask with 1 for foreground and 0 for background
        :rtype: np.int64
        """
        diff = current_frame - self.background_model
        # Threshold the difference, then zero out pixels that are holes in
        # the current frame (their depth values are unreliable).
        self.foreground_mask = np.where(np.abs(diff) >= BG_MASK_THRESHOLD, 1, 0) \
            * np.logical_not(self.current_frame_holes)
        return self.foreground_mask

    def extract_proposal_bbox(self, method=ACCUMULATOR):
        """Compute bounding boxes for connected components of the foreground
        mask that remain constant for AGG_DEPTH_MAX_E frames.

        :param method: how bounding-box history is tracked over time:

            - *ACCUMULATOR*: per-pixel image accumulator (fastest). Boxes are
              extracted from pixels accumulated AGG_DEPTH_MAX_E times.
            - *RECT_MATCHING* / *RECT_MATCHING2*: count how often a given box
              recurs over time (slower but more accurate). Boxes in different
              frames are considered the same when placement and size stay
              within a tolerance threshold.

        :return: list of bounding boxes as (x, y, width, height) with (x, y)
            the top-left corner
        :rtype: List
        :raise: *NotImplementedError*: for any other ``method`` value
        """
        if method == self.ACCUMULATOR:
            return self._proposals_from_accumulator()
        if method == self.RECT_MATCHING:
            return self._proposals_rect_matching()
        if method == self.RECT_MATCHING2:
            return self._proposals_rect_matching2()
        raise NotImplementedError("Not implemented")

    def _proposals_from_accumulator(self):
        # Fold the current foreground into the per-pixel accumulator and
        # return boxes around pixels that stayed foreground long enough
        # (the accumulator saturates at AGG_DEPTH_MAX_E).
        self.accumulator = update_depth_detection_aggregator(
            self.accumulator, self.foreground_mask)
        return bg_models.get_bounding_boxes(np.uint8(self.accumulator))

    def _proposals_rect_matching(self):
        # Quicker than RECT_MATCHING2 but less accurate: accepts *any*
        # match between accumulator and current rects, not the best one.
        results = []
        bbox = bg_models.get_bounding_boxes(self.foreground_mask)
        # Flags set to True once the corresponding accumulator/current rect
        # has found a match.
        bool_accum = [False] * len(self.rect_accum)
        bool_curr = [False] * len(bbox)
        if self.rect_accum:
            for i, accum_entry in enumerate(self.rect_accum):
                for j, curr_entry in enumerate(bbox):
                    if rect_similarity(accum_entry[0], curr_entry):
                        # Matched: keep the new coordinates and bump the
                        # counter, saturating at AGG_DEPTH_MAX_E.
                        if accum_entry[1] < AGG_DEPTH_MAX_E:
                            results.append((curr_entry, accum_entry[1] + 1))
                        else:
                            results.append((curr_entry, accum_entry[1]))
                        bool_accum[i] = bool_curr[j] = True
            # Unmatched current rects enter the accumulator with counter 1.
            for j, matched in enumerate(bool_curr):
                if not matched:
                    results.append((bbox[j], 1))
            # Unmatched accumulator rects decay; entries at counter <= 0
            # are silently dropped.
            for i, matched in enumerate(bool_accum):
                if not matched:
                    if self.rect_accum[i][1] > 0:
                        results.append((self.rect_accum[i][0],
                                        self.rect_accum[i][1] - AGG_DEPTH_PENALTY))
        else:
            # Empty accumulator: seed it with the current rects.
            # (Was `if len(bbox) is not 0:` — identity test against an int
            # literal; plain truthiness is correct and warning-free.)
            for rect in bbox:
                results.append((rect, 1))
        self.rect_accum = results
        # Report only boxes seen at least AGG_DEPTH_BBOX times.
        return [box[0] for box in self.rect_accum if box[1] >= AGG_DEPTH_BBOX]

    def _proposals_rect_matching2(self):
        rect_current = bg_models.get_bounding_boxes2(self.foreground_mask)
        if self.rect_accum2.size != 0:
            # Index in the accumulator of the best match for each current
            # rect; -1 means no match.
            current_best_match_idx = [-1] * rect_current.shape[0]
            # Whether each accumulator rect matched any current rect.
            accumulator_match = [False] * self.rect_accum2.shape[0]
            for i, r_curr in enumerate(rect_current):
                max_value = 0
                max_idx = -1
                for j, r_acc in enumerate(self.rect_accum2):
                    distance = similarity_measure_rect(r_curr, r_acc)
                    if distance > 0.5:
                        accumulator_match[j] = True
                        if max_value < distance:
                            max_idx = j
                            # BUGFIX: this assignment was commented out, so
                            # the loop kept the *last* match above 0.5
                            # instead of the best one.
                            max_value = distance
                current_best_match_idx[i] = max_idx
            # Bump the counter of matched rects; append the new ones.
            for i, idx in enumerate(current_best_match_idx):
                if idx == -1:
                    # No match in the accumulator: add the current rect.
                    self.rect_accum2 = np.concatenate(
                        (self.rect_accum2, [rect_current[i]]))
                else:
                    # Matched: increment the counter (last column),
                    # saturating at AGG_DEPTH_MAX_E, and refresh the coords.
                    if self.rect_accum2[idx][-1] < AGG_DEPTH_MAX_E:
                        self.rect_accum2[idx][-1] += 1
                    self.rect_accum2[idx][0:4] = rect_current[i][0:4]
            # Decay unmatched accumulator rects; delete those whose counter
            # would reach 0.
            element_to_delete = []
            for j, matched in enumerate(accumulator_match):
                if not matched:
                    if self.rect_accum2[j][-1] > 1:
                        self.rect_accum2[j][-1] -= 1
                    else:
                        element_to_delete.append(j)
            self.rect_accum2 = np.delete(self.rect_accum2, element_to_delete, 0)
        else:
            # Accumulator is empty: fill it with the current rects.
            self.rect_accum2 = np.copy(rect_current)
        # Report boxes whose counter exceeds the display threshold.
        return [box[0:4] for box in self.rect_accum2 if box[-1] > AGG_DEPTH_BBOX]
def update_depth_detection_aggregator(aggregator, foreground_current,
                                      penalty=None, max_count=None):
    """Update the aggregator with the provided foreground.

    Each pixel of the aggregator counts how many times it has been seen as
    foreground: foreground pixels gain 1, background pixels lose ``penalty``,
    and the result is clamped to ``[0, max_count]``.

    :param aggregator: an image of uint8 counters
    :param foreground_current: mask of the current foreground
    :param penalty: decrement applied to pixels not currently foreground
        (defaults to AGG_DEPTH_PENALTY)
    :param max_count: saturation bound for the counters
        (defaults to AGG_DEPTH_MAX_E)
    :return: updated accumulator
    """
    # Resolve defaults at call time so the module-level constants are only
    # required when the caller does not override them.
    if penalty is None:
        penalty = AGG_DEPTH_PENALTY
    if max_count is None:
        max_count = AGG_DEPTH_MAX_E
    not_in_current_foreground = np.int8(np.logical_not(foreground_current))
    # Increment foreground pixels, penalise background pixels...
    result = aggregator + foreground_current - not_in_current_foreground * penalty
    # ...then clamp counters to the allowed range.
    return np.clip(result, 0, max_count)