Initial commit

This commit is contained in:
2025-02-12 07:16:30 +00:00
commit 3542cdf84d
9 changed files with 522 additions and 0 deletions
+4
View File
@@ -0,0 +1,4 @@
venv/
test/
__pycache__/
*.pyc
+37
View File
@@ -0,0 +1,37 @@
# SUV-Helper
## What is it
This is a project to get security camera stream and apply smart assistant to it.
## Design
Producer (multiple) -> Server -> Processor (multiple)
## Current status
Scuffolded.
Tested on 145 devices.
## How to play it
```
$ apt install ffmpeg
# Create venv
$ virtualenv venv
$ source venv/bin/activate
# Install requirements
$ pip install -r requirements.txt
# Start server
$ python src/server/server.py
# Start processor
$ python src/processor/processor.py
# Start producer
RTSP_URL="rtsp://user:password@ip_address:554/"
$ python src/producer/producer.py --rtsp_url "${RTSP_URL}" --output_dir test --capture_interval 0.1
```
+4
View File
@@ -0,0 +1,4 @@
numpy==2.2.1
opencv-python==4.10.0.84
pillow==11.1.0
rtsp==1.1.12
View File
+140
View File
@@ -0,0 +1,140 @@
#!/usr/bin/env python3
"""
processor.py
A processor client that connects to the central server, receives image processing tasks,
processes the images using OpenCV, and notifies the server upon completion.
"""
import socket
import time
import cv2
import logging
import os
# Configure logging format and level
logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s")
class ProcessorClient:
"""
ProcessorClient connects to the central server as a processor.
It listens for tasks (i.e. image paths), processes images using OpenCV,
and sends a "DONE" message to the server once processing is complete.
"""
def __init__(self, server_ip: str, server_port: int, buffer_size: int = 1024):
"""
Initialize the ProcessorClient with server connection details.
"""
self.server_ip = server_ip
self.server_port = server_port
self.buffer_size = buffer_size
self.socket = None
def connect(self) -> None:
"""
Establish a connection with the server and register as a processor.
"""
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.socket.connect((self.server_ip, self.server_port))
logging.info(f"Connected to server at {self.server_ip}:{self.server_port}")
# Send registration message
self.socket.sendall("ROLE:processor".encode())
except Exception as e:
logging.error(f"Failed to connect or register with the server: {e}")
raise
def process_image(self, image_path: str) -> None:
"""
Process the image using OpenCV.
For demonstration, this method converts the image to grayscale and saves it.
"""
logging.info(f"Processing image: {image_path}")
if not os.path.exists(image_path):
logging.error(f"Image file not found: {image_path}")
return
# Load the image using OpenCV
image = cv2.imread(image_path)
if image is None:
logging.error(f"Failed to load image: {image_path}")
return
# Convert to grayscale as an example processing step
processed_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Generate the processed image path and save the processed image
processed_path = self._get_processed_path(image_path)
cv2.imwrite(processed_path, processed_image)
logging.info(f"Processed image saved to: {processed_path}")
# (Optional) Simulate additional processing delay
time.sleep(1)
def _get_processed_path(self, image_path: str) -> str:
"""
Generate a processed image path based on the original image path.
For example, '/path/to/image.jpg' becomes '/path/to/processed_image.jpg'.
"""
directory, filename = os.path.split(image_path)
processed_filename = f"processed_{filename}"
return os.path.join(directory, processed_filename)
def listen_for_tasks(self) -> None:
"""
Listen for image processing tasks from the server.
Each received message is assumed to be an image file path.
"""
try:
while True:
data = self.socket.recv(self.buffer_size).decode().strip()
if not data:
logging.info("No data received; the server may have closed the connection.")
break
# For this design, we assume 'data' is the image path.
image_path = data
logging.info(f"Received task: {image_path}")
# Process the image
self.process_image(image_path)
# Notify the server that processing is complete
self.socket.sendall("DONE".encode())
logging.info("Notified server: DONE")
except Exception as e:
logging.error(f"Error during task processing: {e}")
finally:
self.disconnect()
def disconnect(self) -> None:
"""
Close the connection with the server.
"""
if self.socket:
self.socket.close()
logging.info("Disconnected from server.")
def run(self) -> None:
"""
Connect to the server and start listening for tasks.
"""
try:
self.connect()
self.listen_for_tasks()
except Exception as e:
logging.error(f"Processor client encountered an error: {e}")
self.disconnect()
def main():
SERVER_IP = "127.0.0.1"
SERVER_PORT = 5000
client = ProcessorClient(SERVER_IP, SERVER_PORT)
client.run()
if __name__ == "__main__":
main()
View File
+193
View File
@@ -0,0 +1,193 @@
#!/usr/bin/env python3
"""
producer.py
A ProducerClient that connects to a central server, reads frames from an RTSP stream using OpenCV,
saves them to disk, and notifies the server when a new image is produced.
Sensitive data (e.g. the RTSP URL) is passed via command-line arguments.
"""
import cv2
import socket
import time
import logging
import os
import argparse
# Configure logging for the module.
logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s")
class ProducerClient:
"""
ProducerClient connects to the central server as a producer.
It reads frames from an RTSP stream, saves them as images, and notifies the server
of newly produced images.
"""
def __init__(
self,
rtsp_url: str,
server_ip: str,
server_port: int,
num_frames: int = 10,
output_dir: str = "./",
capture_interval: float = 2.0,
):
"""
Initialize the ProducerClient.
Args:
rtsp_url (str): The RTSP stream URL (sensitive data).
server_ip (str): The IP address of the central server.
server_port (int): The port number of the central server.
num_frames (int): The number of frames to capture.
output_dir (str): Directory to save the captured images.
capture_interval (float): Time (in seconds) to wait between captures.
"""
self.rtsp_url = rtsp_url
self.server_ip = server_ip
self.server_port = server_port
self.num_frames = num_frames
self.output_dir = output_dir
self.capture_interval = capture_interval
self.socket = None
def connect(self) -> None:
"""
Establish a connection to the central server and register as a producer.
"""
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.server_ip, self.server_port))
logging.info(f"Connected to server at {self.server_ip}:{self.server_port}")
# Register with the server as a producer.
self.socket.sendall("ROLE:producer".encode())
except Exception as e:
logging.error(f"Failed to connect or register with the server: {e}")
raise
def send_message(self, message: str) -> None:
"""
Send a message to the server over the established socket connection.
Args:
message (str): The message to send.
"""
try:
self.socket.sendall(message.encode())
logging.info(f"Sent message to server: {message}")
except Exception as e:
logging.error(f"Error sending message to server: {e}")
def produce_frames(self) -> None:
"""
Capture frames from the RTSP stream, save them as images, and notify the server.
"""
cap = cv2.VideoCapture(self.rtsp_url)
if not cap.isOpened():
logging.error(f"Error: Could not open RTSP stream: {self.rtsp_url}")
return
os.makedirs(self.output_dir, exist_ok=True)
frame_count = 0
try:
while frame_count < self.num_frames:
ret, frame = cap.read()
if not ret:
logging.error("Error: Could not read frame from stream.")
break
filename = f"frame_{frame_count:03d}.jpg"
filepath = os.path.join(self.output_dir, filename)
if cv2.imwrite(filepath, frame):
logging.info(f"Saved frame {frame_count} to {filepath}")
else:
logging.error(f"Failed to save frame {frame_count} to {filepath}")
continue # Optionally, decide if you want to break or try again
# Send a message to the server notifying a new image is produced.
message = f"{filepath}"
self.send_message(message)
frame_count += 1
time.sleep(self.capture_interval)
logging.info(f"Finished capturing. Total frames saved: {frame_count}")
except Exception as e:
logging.error(f"An error occurred during frame production: {e}")
finally:
cap.release()
def disconnect(self) -> None:
"""
Disconnect from the central server.
"""
if self.socket:
self.socket.close()
logging.info("Disconnected from server.")
def run(self) -> None:
"""
Connect to the server, capture frames, and then disconnect.
"""
try:
self.connect()
self.produce_frames()
except Exception as e:
logging.error(f"Error in ProducerClient: {e}")
finally:
self.disconnect()
def main() -> None:
parser = argparse.ArgumentParser(
description="Producer Client for capturing RTSP stream frames and notifying a central server."
)
parser.add_argument(
"--rtsp_url",
type=str,
required=True,
help="RTSP stream URL (sensitive data; do not hard code)",
)
parser.add_argument(
"--server_ip", type=str, default="127.0.0.1", help="Central server IP address"
)
parser.add_argument(
"--server_port", type=int, default=5000, help="Central server port number"
)
parser.add_argument(
"--num_frames", type=int, default=10, help="Number of frames to capture"
)
parser.add_argument(
"--output_dir",
type=str,
default="./",
help="Directory to save captured frames",
)
parser.add_argument(
"--capture_interval",
type=float,
default=2.0,
help="Interval (in seconds) between frame captures",
)
args = parser.parse_args()
producer = ProducerClient(
rtsp_url=args.rtsp_url,
server_ip=args.server_ip,
server_port=args.server_port,
num_frames=args.num_frames,
output_dir=args.output_dir,
capture_interval=args.capture_interval,
)
producer.run()
if __name__ == "__main__":
main()
View File
+144
View File
@@ -0,0 +1,144 @@
# server.py
import socket
import threading
import queue
import time
class ImageServer:
def __init__(self, host='127.0.0.1', port=5000):
self.server_ip = host
self.server_port = port
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.bind((self.server_ip, self.server_port))
self.server_socket.listen()
print(f"Server listening on {self.server_ip}:{self.server_port}")
# Queue for new image tasks (the image paths)
self.image_queue = queue.Queue()
# Registry for connected clients.
# Processors are stored as: {client_socket: {'address': addr, 'status': 'idle'/'busy'}}
self.processors = {}
# (Optional) keep track of producers if needed.
self.producers = []
# A lock to protect shared data structures
self.lock = threading.Lock()
def start(self):
# Start a thread that continuously dispatches image tasks to idle processors.
dispatcher_thread = threading.Thread(target=self.dispatcher, daemon=True)
dispatcher_thread.start()
while True:
client_socket, addr = self.server_socket.accept()
print(f"[Server] New connection from {addr}")
threading.Thread(target=self.handle_client, args=(client_socket, addr), daemon=True).start()
def handle_client(self, client_socket, addr):
"""
First message from the client must be its registration,
for example: "ROLE:producer" or "ROLE:processor"
"""
try:
reg_data = client_socket.recv(1024).decode().strip()
if reg_data.startswith("ROLE:"):
role = reg_data.split(":", 1)[1].strip()
if role == "producer":
with self.lock:
self.producers.append(client_socket)
print(f"[Server] Registered producer from {addr}")
self.handle_producer(client_socket, addr)
elif role == "processor":
with self.lock:
self.processors[client_socket] = {'address': addr, 'status': 'idle'}
print(f"[Server] Registered processor from {addr}")
self.handle_processor(client_socket, addr)
else:
print(f"[Server] Unknown role '{role}' from {addr}. Disconnecting.")
client_socket.close()
else:
print(f"[Server] No valid registration from {addr}. Disconnecting.")
client_socket.close()
except Exception as e:
print(f"[Server] Error during registration from {addr}: {e}")
client_socket.close()
def handle_producer(self, client_socket, addr):
"""
Receives image paths from the producer and enqueues them.
"""
try:
while True:
data = client_socket.recv(1024).decode().strip()
if not data:
break # connection closed
# Expect data to be an image path (or any image-notification)
print(f"[Server] Received image path from producer {addr}: {data}")
self.image_queue.put(data)
except Exception as e:
print(f"[Server] Producer {addr} error: {e}")
finally:
print(f"[Server] Producer {addr} disconnected.")
with self.lock:
if client_socket in self.producers:
self.producers.remove(client_socket)
client_socket.close()
def handle_processor(self, client_socket, addr):
"""
Listens for processor messages (e.g. a "DONE" notification after processing).
"""
try:
while True:
data = client_socket.recv(1024).decode().strip()
if not data:
break
print(f"[Server] Received from processor {addr}: {data}")
if data.upper() == "DONE":
with self.lock:
if client_socket in self.processors:
self.processors[client_socket]['status'] = 'idle'
print(f"[Server] Processor {addr} set to idle.")
except Exception as e:
print(f"[Server] Processor {addr} error: {e}")
finally:
print(f"[Server] Processor {addr} disconnected.")
with self.lock:
if client_socket in self.processors:
del self.processors[client_socket]
client_socket.close()
def dispatcher(self):
"""
Waits for new image tasks and assigns them to an idle processor.
If no idle processor is available, waits (polling every 0.5 sec).
"""
while True:
image_path = self.image_queue.get() # Wait for a new image task
assigned = False
while not assigned:
with self.lock:
# Find any idle processor
idle_processors = [
sock for sock, info in self.processors.items()
if info['status'] == 'idle'
]
if idle_processors:
processor_socket = idle_processors[0]
try:
processor_socket.sendall(image_path.encode())
self.processors[processor_socket]['status'] = 'busy'
print(f"[Server] Dispatched image '{image_path}' to processor {self.processors[processor_socket]['address']}")
assigned = True
except Exception as e:
print(f"[Server] Error sending image to processor {self.processors[processor_socket]['address']}: {e}")
# Remove this processor if there's an error
del self.processors[processor_socket]
# If no processor is idle, we simply wait a little before trying again.
if not assigned:
time.sleep(0.5)
if __name__ == "__main__":
server = ImageServer(host="127.0.0.1", port=5000)
server.start()