blob: f98b2acda1f76de9ca04be89165be80892f815c2 [file] [log] [blame]
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright 2024 Arm Limited and/or its affiliates <open-source-office@arm.com>
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import ipaddress
import logging
import os
from multiprocessing.connection import Listener
import cv2
import numpy as np
## Set verbosity level
verbosity = logging.ERROR

# [debugging] Map of logging levels to their human-readable names
level = {10: "DEBUG", 20: "INFO", 30: "WARNING", 40: "ERROR"}

logging.basicConfig(format='VSI Server: [%(levelname)s]\t%(message)s', level=verbosity)
logging.info(f"Verbosity level is set to {level[verbosity]}")

# Default Server configuration (address tuple is (host, tcp_port))
default_address = ('127.0.0.1', 6000)
default_authkey = 'vsi_video'

# Supported file extensions (lowercase, without the leading dot)
video_file_extensions = ('wmv', 'avi', 'mp4')
image_file_extensions = ('bmp', 'png', 'jpg')

# FourCC codec identifier used by cv2.VideoWriter for each video container
video_fourcc = dict(wmv='WMV1', avi='MJPG', mp4='mp4v')

# Mode Input/Output bit definitions
MODE_IO_Msk = 1 << 0
MODE_Input  = 0 << 0
MODE_Output = 1 << 0
class VideoServer:
    """VSI video server.

    Accepts a single client over a ``multiprocessing.connection.Listener``
    and services a small command protocol: select a source/sink file (or
    camera/display when no file is given), configure a stream, enable and
    disable it, and read or write frames using OpenCV.
    """

    def __init__(self, address, authkey):
        # Server commands (opcode carried in the first element of every message)
        self.SET_FILENAME = 1
        self.STREAM_CONFIGURE = 2
        self.STREAM_ENABLE = 3
        self.STREAM_DISABLE = 4
        self.FRAME_READ = 5
        self.FRAME_WRITE = 6
        self.CLOSE_SERVER = 7
        # Color space identifiers (values match the VSI client encoding)
        self.GRAYSCALE8 = 1
        self.RGB888 = 2
        self.BGR565 = 3
        self.YUV420 = 4
        self.NV12 = 5
        self.NV21 = 6
        # Variables
        self.listener = Listener(address, authkey=authkey.encode('utf-8'))
        self.filename = ""            # "" selects camera (input) / display window (output)
        self.mode = None              # MODE_Input or MODE_Output
        self.active = False           # True while a stream is enabled
        self.video = True             # False when the selected file is a still image
        self.stream = None            # cv2.VideoCapture or cv2.VideoWriter
        self.frame_ratio = 0          # source fps / requested fps (input video only)
        self.frame_drop = 0           # fractional count of frames pending drop
        self.frame_index = 0          # frame position preserved across enable cycles
        self.eos = False              # end-of-stream flag
        # Stream configuration
        self.resolution = (None, None)
        self.color_format = None
        self.frame_rate = None

    # Set filename
    def _setFilename(self, base_dir, filename, mode):
        """Validate and store the stream file path.

        Returns True when the filename is acceptable for the requested mode.
        Rejected while a stream is active. For input mode the file must
        already exist; for output mode any existing file is removed first.
        """
        filename_valid = False
        if self.active:
            # Cannot change the file while streaming
            return filename_valid
        self.filename = ""
        self.frame_index = 0
        file_extension = str(filename).split('.')[-1].lower()
        # A recognized video extension selects video handling; everything
        # else (including still images) is handled as a single image.
        if file_extension in video_file_extensions:
            self.video = True
        else:
            self.video = False
        file_path = os.path.join(base_dir, filename)
        logging.debug(f"File path: {file_path}")
        if (mode & MODE_IO_Msk) == MODE_Input:
            self.mode = MODE_Input
            if os.path.isfile(file_path):
                if file_extension in (video_file_extensions + image_file_extensions):
                    self.filename = file_path
                    filename_valid = True
        else:
            self.mode = MODE_Output
            if file_extension in (video_file_extensions + image_file_extensions):
                if os.path.isfile(file_path):
                    # Start output from scratch
                    os.remove(file_path)
                self.filename = file_path
                filename_valid = True
        return filename_valid

    # Configure video stream
    def _configureStream(self, frame_width, frame_height, color_format, frame_rate):
        """Store stream parameters; returns False for zero width/height/rate."""
        if (frame_width == 0) or (frame_height == 0) or (frame_rate == 0):
            return False
        self.resolution = (frame_width, frame_height)
        self.color_format = color_format
        self.frame_rate = frame_rate
        return True

    # Enable video stream
    def _enableStream(self, mode):
        """Open the capture/writer stream for the configured file or device."""
        if self.active:
            return
        self.eos = False
        self.frame_ratio = 0
        self.frame_drop = 0
        if self.stream is not None:
            self.stream.release()
            self.stream = None
        if self.filename == "":
            # No file selected: stream directly from camera / to display
            self.video = True
            if (mode & MODE_IO_Msk) == MODE_Input:
                # Device mode: camera
                self.mode = MODE_Input
            else:
                # Device mode: display
                self.mode = MODE_Output
        if self.video:
            if self.mode == MODE_Input:
                if self.filename == "":
                    self.stream = cv2.VideoCapture(0)
                    if not self.stream.isOpened():
                        # Leave the stream inactive on camera failure
                        logging.error("Failed to open Camera interface")
                        return
                else:
                    self.stream = cv2.VideoCapture(self.filename)
                    # Resume from where the previous enable cycle stopped
                    self.stream.set(cv2.CAP_PROP_POS_FRAMES, self.frame_index)
                    video_fps = self.stream.get(cv2.CAP_PROP_FPS)
                    if video_fps > self.frame_rate:
                        # Source is faster than requested rate: drop frames on read
                        self.frame_ratio = video_fps / self.frame_rate
                        logging.debug(f"Frame ratio: {self.frame_ratio}")
            else:
                if self.filename != "":
                    extension = str(self.filename).split('.')[-1].lower()
                    fourcc = cv2.VideoWriter_fourcc(*f'{video_fourcc[extension]}')
                    if os.path.isfile(self.filename) and (self.frame_index != 0):
                        # Re-enabling an existing output mid-stream: rename the
                        # old file and copy its frames into a fresh writer so
                        # new frames are effectively appended.
                        # FIX: use splitext instead of rstrip(f".{extension}") -
                        # rstrip removes a character SET, which could also strip
                        # trailing filename characters (e.g. "demo4.mp4" -> "demo").
                        base_name, _ = os.path.splitext(self.filename)
                        tmp_filename = f'{base_name}_tmp.{extension}'
                        os.rename(self.filename, tmp_filename)
                        cap = cv2.VideoCapture(tmp_filename)
                        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                        self.resolution = (width, height)
                        self.frame_rate = cap.get(cv2.CAP_PROP_FPS)
                        self.stream = cv2.VideoWriter(self.filename, fourcc, self.frame_rate, self.resolution)
                        while cap.isOpened():
                            ret, frame = cap.read()
                            if not ret:
                                cap.release()
                                os.remove(tmp_filename)
                                break
                            self.stream.write(frame)
                            del frame
                    else:
                        self.stream = cv2.VideoWriter(self.filename, fourcc, self.frame_rate, self.resolution)
        self.active = True
        logging.info("Stream enabled")

    # Disable Video Server
    def _disableStream(self):
        """Release the stream, remembering the read position for input video."""
        self.active = False
        if self.stream is not None:
            if self.mode == MODE_Input:
                # Remember position so a later enable can resume
                self.frame_index = self.stream.get(cv2.CAP_PROP_POS_FRAMES)
            self.stream.release()
            self.stream = None
        logging.info("Stream disabled")

    # Resize frame to requested resolution in pixels
    def __resizeFrame(self, frame, resolution):
        """Center-crop *frame* to the requested aspect ratio, then resize it."""
        frame_h = frame.shape[0]
        frame_w = frame.shape[1]
        # Calculate requested aspect ratio (width/height):
        crop_aspect_ratio = resolution[0] / resolution[1]
        if crop_aspect_ratio != (frame_w / frame_h):
            # Crop into image with resize aspect ratio
            crop_w = int(frame_h * crop_aspect_ratio)
            crop_h = int(frame_w / crop_aspect_ratio)
            if crop_w > frame_w:
                # Crop top and bottom part of the image
                top = (frame_h - crop_h) // 2
                bottom = top + crop_h
                frame = frame[top : bottom, 0 : frame_w]
            elif crop_h > frame_h:
                # Crop left and right side of the image
                left = (frame_w - crop_w) // 2
                right = left + crop_w
                frame = frame[0 : frame_h, left : right]
            else:
                # Crop to the center of the image
                left = (frame_w - crop_w) // 2
                right = left + crop_w
                top = (frame_h - crop_h) // 2
                bottom = top + crop_h
                frame = frame[top : bottom, left : right]
            logging.debug(f"Frame cropped from ({frame_w}, {frame_h}) to ({frame.shape[1]}, {frame.shape[0]})")
        logging.debug(f"Resize frame from ({frame.shape[1]}, {frame.shape[0]}) to ({resolution[0]}, {resolution[1]})")
        try:
            frame = cv2.resize(frame, resolution)
        except Exception as e:
            logging.error(f"Error in resizeFrame(): {e}")
        return frame

    # Change color space of a frame from BGR to selected profile
    def __changeColorSpace(self, frame, color_space):
        """Convert *frame* between OpenCV's native BGR and the client profile.

        Input mode converts BGR -> client color space; output mode converts
        client color space -> BGR. Returns the frame unchanged for unknown
        color space identifiers.
        """
        color_format = None
        # Default OpenCV color profile: BGR
        if self.mode == MODE_Input:
            if color_space == self.GRAYSCALE8:
                color_format = cv2.COLOR_BGR2GRAY
            elif color_space == self.RGB888:
                color_format = cv2.COLOR_BGR2RGB
            elif color_space == self.BGR565:
                color_format = cv2.COLOR_BGR2BGR565
            elif color_space == self.YUV420:
                color_format = cv2.COLOR_BGR2YUV_I420
            elif color_space == self.NV12:
                # Convert to planar YUV first, then reinterpret as NV12
                frame = self.__changeColorSpace(frame, self.YUV420)
                color_format = cv2.COLOR_YUV2RGB_NV12
            elif color_space == self.NV21:
                frame = self.__changeColorSpace(frame, self.YUV420)
                color_format = cv2.COLOR_YUV2RGB_NV21
        else:
            if color_space == self.GRAYSCALE8:
                color_format = cv2.COLOR_GRAY2BGR
            elif color_space == self.RGB888:
                color_format = cv2.COLOR_RGB2BGR
            elif color_space == self.BGR565:
                color_format = cv2.COLOR_BGR5652BGR
            elif color_space == self.YUV420:
                color_format = cv2.COLOR_YUV2BGR_I420
            elif color_space == self.NV12:
                # NOTE(review): NV12/NV21 output paths reuse the I420
                # conversion - presumably the client delivers I420-packed
                # data for all YUV variants; confirm against the VSI client.
                color_format = cv2.COLOR_YUV2BGR_I420
            elif color_space == self.NV21:
                color_format = cv2.COLOR_YUV2BGR_I420
        # FIX: identity comparison (is not None) instead of != None
        if color_format is not None:
            logging.debug(f"Change color space to {color_format}")
            try:
                frame = cv2.cvtColor(frame, color_format)
            except Exception as e:
                logging.error(f"Error in changeColorSpace(): {e}")
        return frame

    # Read frame from source
    def _readFrame(self):
        """Read one frame and return it as a bytearray (empty on EOS/inactive)."""
        frame = bytearray()
        if not self.active:
            return frame
        if self.eos:
            return frame
        if self.video:
            if self.frame_ratio > 1:
                _, tmp_frame = self.stream.read()
                # Accumulate the fractional drop debt and skip whole frames
                # once it exceeds one frame.
                self.frame_drop += (self.frame_ratio - 1)
                if self.frame_drop > 1:
                    logging.debug(f"Frames to drop: {self.frame_drop}")
                    drop = int(self.frame_drop // 1)
                    for i in range(drop):
                        _, _ = self.stream.read()
                    logging.debug(f"Frames dropped: {drop}")
                    self.frame_drop -= drop
                    logging.debug(f"Frames left to drop: {self.frame_drop}")
            else:
                _, tmp_frame = self.stream.read()
            if tmp_frame is None:
                self.eos = True
                logging.debug("End of stream.")
        else:
            # Still image: a single read exhausts the stream
            tmp_frame = cv2.imread(self.filename)
            self.eos = True
            logging.debug("End of stream.")
        if tmp_frame is not None:
            tmp_frame = self.__resizeFrame(tmp_frame, self.resolution)
            tmp_frame = self.__changeColorSpace(tmp_frame, self.color_format)
            frame = bytearray(tmp_frame.tobytes())
        return frame

    # Write frame to destination
    def _writeFrame(self, frame):
        """Decode *frame* bytes and write to display window, video, or image file."""
        if not self.active:
            return
        try:
            decoded_frame = np.frombuffer(frame, dtype=np.uint8)
            # NOTE(review): resolution is (width, height) while a numpy image
            # is conventionally (height, width, channels); this reshape is only
            # correct if the client packs rows to match this ordering or the
            # frame is square - TODO confirm against the VSI client.
            decoded_frame = decoded_frame.reshape((self.resolution[0], self.resolution[1], 3))
            bgr_frame = self.__changeColorSpace(decoded_frame, self.RGB888)
            if self.filename == "":
                # Display mode: show the frame in a window
                cv2.imshow(self.filename, bgr_frame)
                cv2.waitKey(10)
            else:
                if self.video:
                    self.stream.write(np.uint8(bgr_frame))
                    self.frame_index += 1
                else:
                    cv2.imwrite(self.filename, bgr_frame)
        except Exception as e:
            # FIX: previously all errors were silently swallowed; keep the
            # best-effort behavior but log the failure for diagnosis.
            logging.error(f"Error in writeFrame(): {e}")

    # Run Video Server
    def run(self):
        """Accept one client connection and service commands until EOF."""
        logging.info("Video server started")
        try:
            conn = self.listener.accept()
            logging.info(f'Connection accepted {self.listener.address}')
        except Exception:
            logging.error("Connection not accepted")
            return
        while True:
            try:
                recv = conn.recv()
            except EOFError:
                # Client closed the connection (also reached after CLOSE_SERVER)
                return
            cmd = recv[0]       # Command
            payload = recv[1:]  # Payload
            if cmd == self.SET_FILENAME:
                logging.info("Set filename called")
                filename_valid = self._setFilename(payload[0], payload[1], payload[2])
                conn.send(filename_valid)
            elif cmd == self.STREAM_CONFIGURE:
                logging.info("Stream configure called")
                configuration_valid = self._configureStream(payload[0], payload[1], payload[2], payload[3])
                conn.send(configuration_valid)
            elif cmd == self.STREAM_ENABLE:
                logging.info("Enable stream called")
                self._enableStream(payload[0])
                conn.send(self.active)
            elif cmd == self.STREAM_DISABLE:
                logging.info("Disable stream called")
                self._disableStream()
                conn.send(self.active)
            elif cmd == self.FRAME_READ:
                logging.info("Read frame called")
                frame = self._readFrame()
                conn.send_bytes(frame)
                conn.send(self.eos)
            elif cmd == self.FRAME_WRITE:
                logging.info("Write frame called")
                frame = conn.recv_bytes()
                self._writeFrame(frame)
            elif cmd == self.CLOSE_SERVER:
                logging.info("Close server connection")
                self.stop()

    # Stop Video Server
    def stop(self):
        """Disable the stream, close any display window and the listener."""
        self._disableStream()
        if (self.mode == MODE_Output) and (self.filename == ""):
            try:
                cv2.destroyAllWindows()
            except Exception:
                pass
        self.listener.close()
        logging.info("Video server stopped")
# Validate IP address
def ip(ip):
    """argparse type validator: return *ip* unchanged if it is a valid
    IPv4/IPv6 address, otherwise raise argparse.ArgumentTypeError."""
    try:
        _ = ipaddress.ip_address(ip)
        return ip
    except ValueError:
        # FIX: ipaddress.ip_address raises ValueError for invalid input;
        # the previous bare `except:` also swallowed SystemExit and
        # KeyboardInterrupt.
        raise argparse.ArgumentTypeError(f"Invalid IP address: {ip}!")
def parse_arguments():
    """Parse the server command line (--ip, --port, --authkey) and return the namespace."""
    # Widen the help column so option descriptions line up nicely
    help_formatter = lambda prog: argparse.HelpFormatter(prog, max_help_position=41)
    parser = argparse.ArgumentParser(formatter_class=help_formatter,
                                     description="VSI Video Server")
    options = parser.add_argument_group("optional")
    options.add_argument("--ip", dest="ip", metavar="<IP>",
                         type=ip, default=default_address[0],
                         help=f"Server IP address (default: {default_address[0]})")
    options.add_argument("--port", dest="port", metavar="<TCP Port>",
                         type=int, default=default_address[1],
                         help=f"TCP port (default: {default_address[1]})")
    options.add_argument("--authkey", dest="authkey", metavar="<Auth Key>",
                         type=str, default=default_authkey,
                         help=f"Authorization key (default: {default_authkey})")
    return parser.parse_args()
# Script entry point: start the VSI Video Server with command-line settings
if __name__ == '__main__':
    args = parse_arguments()
    server = VideoServer((args.ip, args.port), args.authkey)
    try:
        server.run()
    except KeyboardInterrupt:
        # Ctrl-C: shut the server down cleanly
        server.stop()