Optasia: A Relational Platform for Efficient Large-Scale Video Analytics
Yao Lu, Aakanksha Chowdhery, Srikanth Kandula
Microsoft
ACM SoCC 2016. PDF

The publicly available version of Optasia is now coming via Azure Data Lake.
A tutorial can be found here.


Camera deployments are ubiquitous; however, existing methods to analyze video
feeds from many cameras do not scale and are error-prone. For example, the Spark
script below extracts SIFT keypoints from many video frames.

Spark script to extract SIFT keypoints
    import logging
    import sys

    import cv2
    import numpy as np
    from pyspark import SparkContext
    from pyspark.sql import Row, SQLContext


    def extract_sift_features():
        """Return a function that maps (filename, image bytes) to SIFT descriptors."""

        def extract_sift_features_nested(imgfile_imgbytes):
            try:
                imgfilename, imgbytes = imgfile_imgbytes
                # Decode the raw bytes into a grayscale image (flag 0).
                nparr = np.frombuffer(imgbytes, np.uint8)
                img = cv2.imdecode(nparr, 0)
                # OpenCV 2.4 exposes this as cv2.SIFT(); newer releases use SIFT_create().
                extractor = cv2.SIFT_create()
                kp, descriptors = extractor.detectAndCompute(img, None)
                return [(imgfilename, descriptors)]
            except Exception as e:
                # Log and skip frames that fail to decode or process.
                logging.exception(e)
                return []

        return extract_sift_features_nested


    if __name__ == "__main__":
        sc = SparkContext(appName="sift_extractor")
        sqlContext = SQLContext(sc)

        try:
            image_seqfile_path = sys.argv[1]
            feature_parquet_path = sys.argv[2]
            partitions = int(sys.argv[3])
        except (IndexError, ValueError):
            print("Usage: spark-submit sift_extraction.py "
                  "<image_input_path> <feature_output_path> <partitions>")
            sys.exit(1)

        images = sc.sequenceFile(image_seqfile_path, minSplits=partitions)

        # Extract descriptors, drop frames with no keypoints, and store as Parquet.
        features = images.flatMap(extract_sift_features())
        features = features.filter(lambda x: x[1] is not None)
        features = features.map(lambda x: Row(fileName=x[0], features=x[1].tolist()))
        featuresSchema = sqlContext.createDataFrame(features)
        featuresSchema.registerTempTable("images")
        featuresSchema.write.parquet(feature_parquet_path)

To run this script, one must upload the images to Hadoop-compatible storage
(Swift), specify the degree of parallelism (e.g., 100), and run:
       spark-submit --executor-memory 8g sift.py swift://spark.swift1/images.hseq ...
       swift://spark.swift1/images.parquet 100
Even then, optimal execution cannot be guaranteed.


We present Optasia, a system that is friendly to end users and executes the
script efficiently on the cluster. End users do not need to worry about the degree
of parallelism, task scheduling, and so on. The analytic units are highly modular,
decoupling the roles of algorithm engineers and application engineers.

Optasia script to extract SIFT keypoints
    USING Optasia;

    images =
        EXTRACT id : int,
                frame : string
        FROM SPARSE STREAMSET @"images/"
        USING ImageExtractor();
    feat =
        PROCESS images
        USING SIFTProcessor()
        PRODUCE id,
                feature;

    OUTPUT feat
    TO @"features.txt"
    USING DefaultTextOutputter();
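
To make the modular structure of the script concrete, the three stages (EXTRACT, PROCESS, OUTPUT) can be mimicked in plain Python as independent row-level functions. This is only an illustrative sketch and not Optasia's implementation or API: the function names `extract`, `process`, `output`, and `toy_feature_processor` are hypothetical, and the toy processor emits the frame length as a stand-in for real SIFT descriptors.

```python
from typing import Callable, Dict, Iterable, Iterator, List

Row = Dict[str, object]


def extract(frames: Iterable[str]) -> Iterator[Row]:
    """Stand-in for EXTRACT: assign an integer id to each frame."""
    for frame_id, frame in enumerate(frames):
        yield {"id": frame_id, "frame": frame}


def process(rows: Iterable[Row], processor: Callable[[Row], Row]) -> Iterator[Row]:
    """Stand-in for PROCESS ... USING: apply a row-level processor to each row."""
    for row in rows:
        yield processor(row)


def toy_feature_processor(row: Row) -> Row:
    """Hypothetical placeholder for a feature processor (not real SIFT)."""
    return {"id": row["id"], "feature": len(str(row["frame"]))}


def output(rows: Iterable[Row]) -> List[str]:
    """Stand-in for OUTPUT: render each row as a tab-separated text line."""
    return ["{}\t{}".format(row["id"], row["feature"]) for row in rows]


# The application engineer only composes stages; the processor is swappable.
lines = output(process(extract(["aaaa", "bb"]), toy_feature_processor))
```

Because each stage only consumes and produces rows, an algorithm engineer could replace `toy_feature_processor` without touching the composition, and nothing in the composition names a degree of parallelism.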

A query optimizer generates efficient execution plans, so that resource usage is
greatly reduced. More details are described in the paper.



Click here for example dataflow and queries used in the paper.
Click here for example SCOPE wrappers in C#.