blob: 88f44fb859b1d17afe9a578b946f4fcdc8eb19d0 [file] [log] [blame]
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright 2024 Arm Limited and/or its affiliates <open-source-office@arm.com>
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import atexit
import logging
import subprocess
from multiprocessing.connection import Client, Connection
from os import path, getcwd
from os import name as os_name
class VideoClient:
def __init__(self):
# Server commands
self.SET_FILENAME = 1
self.STREAM_CONFIGURE = 2
self.STREAM_ENABLE = 3
self.STREAM_DISABLE = 4
self.FRAME_READ = 5
self.FRAME_WRITE = 6
self.CLOSE_SERVER = 7
# Color space
self.GRAYSCALE8 = 1
self.RGB888 = 2
self.BGR565 = 3
self.YUV420 = 4
self.NV12 = 5
self.NV21 = 6
# Variables
self.conn = None
def connectToServer(self, address, authkey):
for _ in range(50):
try:
self.conn = Client(address, authkey=authkey.encode('utf-8'))
if isinstance(self.conn, Connection):
break
else:
self.conn = None
except Exception:
self.conn = None
time.sleep(0.01)
def setFilename(self, filename, mode):
self.conn.send([self.SET_FILENAME, getcwd(), filename, mode])
filename_valid = self.conn.recv()
return filename_valid
def configureStream(self, frame_width, frame_height, color_format, frame_rate):
self.conn.send([self.STREAM_CONFIGURE, frame_width, frame_height, color_format, frame_rate])
configuration_valid = self.conn.recv()
return configuration_valid
def enableStream(self, mode):
self.conn.send([self.STREAM_ENABLE, mode])
stream_active = self.conn.recv()
return stream_active
def disableStream(self):
self.conn.send([self.STREAM_DISABLE])
stream_active = self.conn.recv()
return stream_active
def readFrame(self):
self.conn.send([self.FRAME_READ])
data = self.conn.recv_bytes()
eos = self.conn.recv()
return data, eos
def writeFrame(self, data):
self.conn.send([self.FRAME_WRITE])
self.conn.send_bytes(data)
def closeServer(self):
try:
if isinstance(self.conn, Connection):
self.conn.send([self.CLOSE_SERVER])
self.conn.close()
except Exception as e:
logging.error(f'Exception occurred on cleanup: {e}')
# User registers
REG_IDX_MAX = 12 # Maximum user register index used in VSI
MODE = 0 # Regs[0] // Mode: 0=Input, 1=Output
CONTROL = 0 # Regs[1] // Control: enable, flush
STATUS = 0 # Regs[2] // Status: active, buf_empty, buf_full, overflow, underflow, eos
FILENAME_LEN = 0 # Regs[3] // Filename length
FILENAME_CHAR = 0 # Regs[4] // Filename character
FILENAME_VALID = 0 # Regs[5] // Filename valid flag
FRAME_WIDTH = 300 # Regs[6] // Requested frame width
FRAME_HEIGHT = 300 # Regs[7] // Requested frame height
COLOR_FORMAT = 0 # Regs[8] // Color format
FRAME_RATE = 0 # Regs[9] // Frame rate
FRAME_INDEX = 0 # Regs[10] // Frame index
FRAME_COUNT = 0 # Regs[11] // Frame count
FRAME_COUNT_MAX = 0 # Regs[12] // Frame count maximum
# MODE register definitions
MODE_IO_Msk = 1<<0
MODE_Input = 0<<0
MODE_Output = 1<<0
# CONTROL register definitions
CONTROL_ENABLE_Msk = 1<<0
CONTROL_CONTINUOS_Msk = 1<<1
CONTROL_BUF_FLUSH_Msk = 1<<2
# STATUS register definitions
STATUS_ACTIVE_Msk = 1<<0
STATUS_BUF_EMPTY_Msk = 1<<1
STATUS_BUF_FULL_Msk = 1<<2
STATUS_OVERFLOW_Msk = 1<<3
STATUS_UNDERFLOW_Msk = 1<<4
STATUS_EOS_Msk = 1<<5
# IRQ Status register definitions
IRQ_Status_FRAME_Msk = 1<<0
IRQ_Status_OVERFLOW_Msk = 1<<1
IRQ_Status_UNDERFLOW_Msk = 1<<2
IRQ_Status_EOS_Msk = 1<<3
# Variables
Video = VideoClient()
Filename = ""
FilenameIdx = 0
# Close VSI Video Server on exit
def cleanup():
Video.closeServer()
# Client connection to VSI Video Server
def init(address, authkey):
global FILENAME_VALID
base_dir = path.dirname(__file__)
server_path = path.join(base_dir, 'vsi_video_server.py')
logging.info("Start video server")
if path.isfile(server_path):
# Start Video Server
if os_name == 'nt':
py_cmd = 'python'
else:
py_cmd = 'python3'
cmd = f"{py_cmd} {server_path} " \
f"--ip {address[0]} " \
f"--port {address[1]} " \
f"--authkey {authkey}"
subprocess.Popen(cmd, shell=True)
# Connect to Video Server
Video.connectToServer(address, authkey)
if Video.conn == None:
logging.error("Server not connected")
else:
logging.error(f"Server script not found: {server_path}")
# Register clean-up function
atexit.register(cleanup)
## Flush Stream buffer
def flushBuffer():
global STATUS, FRAME_INDEX, FRAME_COUNT
STATUS |= STATUS_BUF_EMPTY_Msk
STATUS &= ~STATUS_BUF_FULL_Msk
FRAME_INDEX = 0
FRAME_COUNT = 0
## VSI IRQ Status register
# @param IRQ_Status IRQ status register to update
# @param value status bits to clear
# @return IRQ_Status return updated register
def wrIRQ(IRQ_Status, value):
IRQ_Status_Clear = IRQ_Status & ~value
IRQ_Status &= ~IRQ_Status_Clear
return IRQ_Status
## Timer Event
# @param IRQ_Status IRQ status register to update
# @return IRQ_Status return updated register
def timerEvent(IRQ_Status):
IRQ_Status |= IRQ_Status_FRAME_Msk
if (STATUS & STATUS_OVERFLOW_Msk) != 0:
IRQ_Status |= IRQ_Status_OVERFLOW_Msk
if (STATUS & STATUS_UNDERFLOW_Msk) != 0:
IRQ_Status |= IRQ_Status_UNDERFLOW_Msk
if (STATUS & STATUS_EOS_Msk) != 0:
IRQ_Status |= IRQ_Status_EOS_Msk
if (CONTROL & CONTROL_CONTINUOS_Msk) == 0:
wrCONTROL(CONTROL & ~(CONTROL_ENABLE_Msk | CONTROL_CONTINUOS_Msk))
return IRQ_Status
## Read data from peripheral for DMA P2M transfer (VSI DMA)
# @param size size of data to read (in bytes, multiple of 4)
# @return data data read (bytearray)
def rdDataDMA(size):
global STATUS, FRAME_COUNT
if (STATUS & STATUS_ACTIVE_Msk) != 0:
if Video.conn != None:
data, eos = Video.readFrame()
if eos:
STATUS |= STATUS_EOS_Msk
if FRAME_COUNT < FRAME_COUNT_MAX:
FRAME_COUNT += 1
else:
STATUS |= STATUS_OVERFLOW_Msk
if FRAME_COUNT == FRAME_COUNT_MAX:
STATUS |= STATUS_BUF_FULL_Msk
STATUS &= ~STATUS_BUF_EMPTY_Msk
else:
data = bytearray()
else:
data = bytearray()
return data
## Write data to peripheral for DMA M2P transfer (VSI DMA)
# @param data data to write (bytearray)
# @param size size of data to write (in bytes, multiple of 4)
def wrDataDMA(data, size):
global STATUS, FRAME_COUNT
if (STATUS & STATUS_ACTIVE_Msk) != 0:
if Video.conn != None:
Video.writeFrame(data)
if FRAME_COUNT > 0:
FRAME_COUNT -= 1
else:
STATUS |= STATUS_UNDERFLOW_Msk
if FRAME_COUNT == 0:
STATUS |= STATUS_BUF_EMPTY_Msk
STATUS &= ~STATUS_BUF_FULL_Msk
## Write CONTROL register (user register)
# @param value value to write (32-bit)
def wrCONTROL(value):
global CONTROL, STATUS
if ((value ^ CONTROL) & CONTROL_ENABLE_Msk) != 0:
STATUS &= ~STATUS_ACTIVE_Msk
if (value & CONTROL_ENABLE_Msk) != 0:
logging.info("Start video stream")
if Video.conn != None:
logging.info("Configure video stream")
configuration_valid = Video.configureStream(FRAME_WIDTH, FRAME_HEIGHT, COLOR_FORMAT, FRAME_RATE)
if configuration_valid:
logging.info("Enable video stream")
server_active = Video.enableStream(MODE)
if server_active:
STATUS |= STATUS_ACTIVE_Msk
STATUS &= ~(STATUS_OVERFLOW_Msk | STATUS_UNDERFLOW_Msk | STATUS_EOS_Msk)
else:
logging.error("Enable video stream failed")
else:
logging.error("Configure video stream failed")
else:
logging.error("Server not connected")
else:
logging.info("Stop video stream")
if Video.conn != None:
logging.info("Disable video stream")
Video.disableStream()
else:
logging.error("Server not connected")
if (value & CONTROL_BUF_FLUSH_Msk) != 0:
value &= ~CONTROL_BUF_FLUSH_Msk
flushBuffer()
CONTROL = value
## Read STATUS register (user register)
# @return status current STATUS User register (32-bit)
def rdSTATUS():
global STATUS
status = STATUS
STATUS &= ~(STATUS_OVERFLOW_Msk | STATUS_UNDERFLOW_Msk | STATUS_EOS_Msk)
return status
## Write FILENAME_LEN register (user register)
# @param value value to write (32-bit)
def wrFILENAME_LEN(value):
global STATUS, FILENAME_LEN, FILENAME_VALID, Filename, FilenameIdx
logging.info("Set new source name length and reset filename and valid flag")
FilenameIdx = 0
Filename = ""
FILENAME_VALID = 0
FILENAME_LEN = value
## Write FILENAME_CHAR register (user register)
# @param value value to write (32-bit)
def wrFILENAME_CHAR(value):
global FILENAME_VALID, Filename, FilenameIdx
if FilenameIdx < FILENAME_LEN:
logging.info(f"Append {value} to filename")
Filename += f"{value}"
FilenameIdx += 1
logging.debug(f"Received {FilenameIdx} of {FILENAME_LEN} characters")
if FilenameIdx == FILENAME_LEN:
logging.info("Check if file exists on Server side and set VALID flag")
logging.debug(f"Filename: {Filename}")
if Video.conn != None:
FILENAME_VALID = Video.setFilename(Filename, MODE)
else:
logging.error("Server not connected")
logging.debug(f"Filename VALID: {FILENAME_VALID}")
## Write FRAME_INDEX register (user register)
# @param value value to write (32-bit)
# @return value value written (32-bit)
def wrFRAME_INDEX(value):
global STATUS, FRAME_INDEX, FRAME_COUNT
FRAME_INDEX += 1
if FRAME_INDEX == FRAME_COUNT_MAX:
FRAME_INDEX = 0
if (MODE & MODE_IO_Msk) == MODE_Input:
# Input
if FRAME_COUNT > 0:
FRAME_COUNT -= 1
if FRAME_COUNT == 0:
STATUS |= STATUS_BUF_EMPTY_Msk
STATUS &= ~STATUS_BUF_FULL_Msk
else:
# Output
if FRAME_COUNT < FRAME_COUNT_MAX:
FRAME_COUNT += 1
if FRAME_COUNT == FRAME_COUNT_MAX:
STATUS |= STATUS_BUF_FULL_Msk
STATUS &= ~STATUS_BUF_EMPTY_Msk
return FRAME_INDEX
## Read user registers (the VSI User Registers)
# @param index user register index (zero based)
# @return value value read (32-bit)
def rdRegs(index):
value = 0
if index == 0:
value = MODE
elif index == 1:
value = CONTROL
elif index == 2:
value = rdSTATUS()
elif index == 3:
value = FILENAME_LEN
elif index == 4:
value = FILENAME_CHAR
elif index == 5:
value = FILENAME_VALID
elif index == 6:
value = FRAME_WIDTH
elif index == 7:
value = FRAME_HEIGHT
elif index == 8:
value = COLOR_FORMAT
elif index == 9:
value = FRAME_RATE
elif index == 10:
value = FRAME_INDEX
elif index == 11:
value = FRAME_COUNT
elif index == 12:
value = FRAME_COUNT_MAX
return value
## Write user registers (the VSI User Registers)
# @param index user register index (zero based)
# @param value value to write (32-bit)
# @return value value written (32-bit)
def wrRegs(index, value):
global MODE, FRAME_WIDTH, FRAME_HEIGHT, COLOR_FORMAT, FRAME_RATE, FRAME_COUNT_MAX
if index == 0:
MODE = value
elif index == 1:
wrCONTROL(value)
elif index == 2:
value = STATUS
elif index == 3:
wrFILENAME_LEN(value)
elif index == 4:
wrFILENAME_CHAR(chr(value))
elif index == 5:
value = FILENAME_VALID
elif index == 6:
if value != 0:
FRAME_WIDTH = value
elif index == 7:
if value != 0:
FRAME_HEIGHT = value
elif index == 8:
COLOR_FORMAT = value
elif index == 9:
FRAME_RATE = value
elif index == 10:
value = wrFRAME_INDEX(value)
elif index == 11:
value = FRAME_COUNT
elif index == 12:
FRAME_COUNT_MAX = value
flushBuffer()
return value