commit 3542cdf84dcd85fa54d9ae355c636d96fa135a54 Author: haopengzhan Date: Wed Feb 12 07:16:30 2025 +0000 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..55c2381 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +venv/ +test/ +__pycache__/ +*.pyc \ No newline at end of file diff --git a/Readme.md b/Readme.md new file mode 100644 index 0000000..88128b1 --- /dev/null +++ b/Readme.md @@ -0,0 +1,37 @@ +# SUV-Helper + +## What is it + +This is a project to get security camera stream and apply smart assistant to it. + +## Design + +Producer (multiple) -> Server -> Processor (multiple) + +## Current status + +Scuffolded. + +Tested on 145 devices. + +## How to play it +``` +$ apt install ffmpeg + +# Create venv +$ virtualenv venv +$ source venv/bin/activate + +# Install requirements +$ pip install -r requirements.txt + +# Start server +$ python src/server/server.py + +# Start processor +$ python src/processor/processor.py + +# Start producer +RTSP_URL="rtsp://user:password@ip_address:554/" +$ python src/producer/producer.py --rtsp_url "${RTSP_URL}" --output_dir test --capture_interval 0.1 +``` \ No newline at end of file diff --git a/requirement.txt b/requirement.txt new file mode 100644 index 0000000..a8e9d5a --- /dev/null +++ b/requirement.txt @@ -0,0 +1,4 @@ +numpy==2.2.1 +opencv-python==4.10.0.84 +pillow==11.1.0 +rtsp==1.1.12 diff --git a/src/processor/__init__.py b/src/processor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/processor/processor.py b/src/processor/processor.py new file mode 100644 index 0000000..8e92862 --- /dev/null +++ b/src/processor/processor.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python3 +""" +processor.py + +A processor client that connects to the central server, receives image processing tasks, +processes the images using OpenCV, and notifies the server upon completion. +""" + +import socket +import time +import cv2 +import logging +import os + +# Configure logging format and level +logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s") + + +class ProcessorClient: + """ + ProcessorClient connects to the central server as a processor. + It listens for tasks (i.e. image paths), processes images using OpenCV, + and sends a "DONE" message to the server once processing is complete. + """ + + def __init__(self, server_ip: str, server_port: int, buffer_size: int = 1024): + """ + Initialize the ProcessorClient with server connection details. + """ + self.server_ip = server_ip + self.server_port = server_port + self.buffer_size = buffer_size + self.socket = None + + def connect(self) -> None: + """ + Establish a connection with the server and register as a processor. + """ + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + self.socket.connect((self.server_ip, self.server_port)) + logging.info(f"Connected to server at {self.server_ip}:{self.server_port}") + # Send registration message + self.socket.sendall("ROLE:processor".encode()) + except Exception as e: + logging.error(f"Failed to connect or register with the server: {e}") + raise + + def process_image(self, image_path: str) -> None: + """ + Process the image using OpenCV. + For demonstration, this method converts the image to grayscale and saves it. + """ + logging.info(f"Processing image: {image_path}") + if not os.path.exists(image_path): + logging.error(f"Image file not found: {image_path}") + return + + # Load the image using OpenCV + image = cv2.imread(image_path) + if image is None: + logging.error(f"Failed to load image: {image_path}") + return + + # Convert to grayscale as an example processing step + processed_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) + + # Generate the processed image path and save the processed image + processed_path = self._get_processed_path(image_path) + cv2.imwrite(processed_path, processed_image) + logging.info(f"Processed image saved to: {processed_path}") + + # (Optional) Simulate additional processing delay + time.sleep(1) + + def _get_processed_path(self, image_path: str) -> str: + """ + Generate a processed image path based on the original image path. + For example, '/path/to/image.jpg' becomes '/path/to/processed_image.jpg'. + """ + directory, filename = os.path.split(image_path) + processed_filename = f"processed_{filename}" + return os.path.join(directory, processed_filename) + + def listen_for_tasks(self) -> None: + """ + Listen for image processing tasks from the server. + Each received message is assumed to be an image file path. + """ + try: + while True: + data = self.socket.recv(self.buffer_size).decode().strip() + if not data: + logging.info("No data received; the server may have closed the connection.") + break + + # For this design, we assume 'data' is the image path. + image_path = data + logging.info(f"Received task: {image_path}") + + # Process the image + self.process_image(image_path) + + # Notify the server that processing is complete + self.socket.sendall("DONE".encode()) + logging.info("Notified server: DONE") + except Exception as e: + logging.error(f"Error during task processing: {e}") + finally: + self.disconnect() + + def disconnect(self) -> None: + """ + Close the connection with the server. + """ + if self.socket: + self.socket.close() + logging.info("Disconnected from server.") + + def run(self) -> None: + """ + Connect to the server and start listening for tasks. + """ + try: + self.connect() + self.listen_for_tasks() + except Exception as e: + logging.error(f"Processor client encountered an error: {e}") + self.disconnect() + + +def main(): + SERVER_IP = "127.0.0.1" + SERVER_PORT = 5000 + client = ProcessorClient(SERVER_IP, SERVER_PORT) + client.run() + + +if __name__ == "__main__": + main() diff --git a/src/producer/__init__.py b/src/producer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/producer/producer.py b/src/producer/producer.py new file mode 100644 index 0000000..acd9074 --- /dev/null +++ b/src/producer/producer.py @@ -0,0 +1,193 @@ +#!/usr/bin/env python3 +""" +producer.py + +A ProducerClient that connects to a central server, reads frames from an RTSP stream using OpenCV, +saves them to disk, and notifies the server when a new image is produced. + +Sensitive data (e.g. the RTSP URL) is passed via command-line arguments. +""" + +import cv2 +import socket +import time +import logging +import os +import argparse + + +# Configure logging for the module. +logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s") + + +class ProducerClient: + """ + ProducerClient connects to the central server as a producer. + It reads frames from an RTSP stream, saves them as images, and notifies the server + of newly produced images. + """ + + def __init__( + self, + rtsp_url: str, + server_ip: str, + server_port: int, + num_frames: int = 10, + output_dir: str = "./", + capture_interval: float = 2.0, + ): + """ + Initialize the ProducerClient. + + Args: + rtsp_url (str): The RTSP stream URL (sensitive data). + server_ip (str): The IP address of the central server. + server_port (int): The port number of the central server. + num_frames (int): The number of frames to capture. + output_dir (str): Directory to save the captured images. + capture_interval (float): Time (in seconds) to wait between captures. + """ + self.rtsp_url = rtsp_url + self.server_ip = server_ip + self.server_port = server_port + self.num_frames = num_frames + self.output_dir = output_dir + self.capture_interval = capture_interval + self.socket = None + + def connect(self) -> None: + """ + Establish a connection to the central server and register as a producer. + """ + try: + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.connect((self.server_ip, self.server_port)) + logging.info(f"Connected to server at {self.server_ip}:{self.server_port}") + # Register with the server as a producer. + self.socket.sendall("ROLE:producer".encode()) + except Exception as e: + logging.error(f"Failed to connect or register with the server: {e}") + raise + + def send_message(self, message: str) -> None: + """ + Send a message to the server over the established socket connection. + + Args: + message (str): The message to send. + """ + try: + self.socket.sendall(message.encode()) + logging.info(f"Sent message to server: {message}") + except Exception as e: + logging.error(f"Error sending message to server: {e}") + + def produce_frames(self) -> None: + """ + Capture frames from the RTSP stream, save them as images, and notify the server. + """ + cap = cv2.VideoCapture(self.rtsp_url) + if not cap.isOpened(): + logging.error(f"Error: Could not open RTSP stream: {self.rtsp_url}") + return + + os.makedirs(self.output_dir, exist_ok=True) + frame_count = 0 + + try: + while frame_count < self.num_frames: + ret, frame = cap.read() + if not ret: + logging.error("Error: Could not read frame from stream.") + break + + filename = f"frame_{frame_count:03d}.jpg" + filepath = os.path.join(self.output_dir, filename) + + if cv2.imwrite(filepath, frame): + logging.info(f"Saved frame {frame_count} to {filepath}") + else: + logging.error(f"Failed to save frame {frame_count} to {filepath}") + continue # Optionally, decide if you want to break or try again + + # Send a message to the server notifying a new image is produced. + message = f"{filepath}" + self.send_message(message) + + frame_count += 1 + time.sleep(self.capture_interval) + + logging.info(f"Finished capturing. Total frames saved: {frame_count}") + except Exception as e: + logging.error(f"An error occurred during frame production: {e}") + finally: + cap.release() + + def disconnect(self) -> None: + """ + Disconnect from the central server. + """ + if self.socket: + self.socket.close() + logging.info("Disconnected from server.") + + def run(self) -> None: + """ + Connect to the server, capture frames, and then disconnect. + """ + try: + self.connect() + self.produce_frames() + except Exception as e: + logging.error(f"Error in ProducerClient: {e}") + finally: + self.disconnect() + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Producer Client for capturing RTSP stream frames and notifying a central server." + ) + parser.add_argument( + "--rtsp_url", + type=str, + required=True, + help="RTSP stream URL (sensitive data; do not hard code)", + ) + parser.add_argument( + "--server_ip", type=str, default="127.0.0.1", help="Central server IP address" + ) + parser.add_argument( + "--server_port", type=int, default=5000, help="Central server port number" + ) + parser.add_argument( + "--num_frames", type=int, default=10, help="Number of frames to capture" + ) + parser.add_argument( + "--output_dir", + type=str, + default="./", + help="Directory to save captured frames", + ) + parser.add_argument( + "--capture_interval", + type=float, + default=2.0, + help="Interval (in seconds) between frame captures", + ) + + args = parser.parse_args() + + producer = ProducerClient( + rtsp_url=args.rtsp_url, + server_ip=args.server_ip, + server_port=args.server_port, + num_frames=args.num_frames, + output_dir=args.output_dir, + capture_interval=args.capture_interval, + ) + producer.run() + + +if __name__ == "__main__": + main() diff --git a/src/server/__init__.py b/src/server/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/server/server.py b/src/server/server.py new file mode 100644 index 0000000..9ed1d22 --- /dev/null +++ b/src/server/server.py @@ -0,0 +1,144 @@ +# server.py +import socket +import threading +import queue +import time + +class ImageServer: + def __init__(self, host='127.0.0.1', port=5000): + self.server_ip = host + self.server_port = port + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_socket.bind((self.server_ip, self.server_port)) + self.server_socket.listen() + print(f"Server listening on {self.server_ip}:{self.server_port}") + + # Queue for new image tasks (the image paths) + self.image_queue = queue.Queue() + + # Registry for connected clients. + # Processors are stored as: {client_socket: {'address': addr, 'status': 'idle'/'busy'}} + self.processors = {} + # (Optional) keep track of producers if needed. + self.producers = [] + + # A lock to protect shared data structures + self.lock = threading.Lock() + + def start(self): + # Start a thread that continuously dispatches image tasks to idle processors. + dispatcher_thread = threading.Thread(target=self.dispatcher, daemon=True) + dispatcher_thread.start() + + while True: + client_socket, addr = self.server_socket.accept() + print(f"[Server] New connection from {addr}") + threading.Thread(target=self.handle_client, args=(client_socket, addr), daemon=True).start() + + def handle_client(self, client_socket, addr): + """ + First message from the client must be its registration, + for example: "ROLE:producer" or "ROLE:processor" + """ + try: + reg_data = client_socket.recv(1024).decode().strip() + if reg_data.startswith("ROLE:"): + role = reg_data.split(":", 1)[1].strip() + if role == "producer": + with self.lock: + self.producers.append(client_socket) + print(f"[Server] Registered producer from {addr}") + self.handle_producer(client_socket, addr) + elif role == "processor": + with self.lock: + self.processors[client_socket] = {'address': addr, 'status': 'idle'} + print(f"[Server] Registered processor from {addr}") + self.handle_processor(client_socket, addr) + else: + print(f"[Server] Unknown role '{role}' from {addr}. Disconnecting.") + client_socket.close() + else: + print(f"[Server] No valid registration from {addr}. Disconnecting.") + client_socket.close() + except Exception as e: + print(f"[Server] Error during registration from {addr}: {e}") + client_socket.close() + + def handle_producer(self, client_socket, addr): + """ + Receives image paths from the producer and enqueues them. + """ + try: + while True: + data = client_socket.recv(1024).decode().strip() + if not data: + break # connection closed + # Expect data to be an image path (or any image-notification) + print(f"[Server] Received image path from producer {addr}: {data}") + self.image_queue.put(data) + except Exception as e: + print(f"[Server] Producer {addr} error: {e}") + finally: + print(f"[Server] Producer {addr} disconnected.") + with self.lock: + if client_socket in self.producers: + self.producers.remove(client_socket) + client_socket.close() + + def handle_processor(self, client_socket, addr): + """ + Listens for processor messages (e.g. a "DONE" notification after processing). + """ + try: + while True: + data = client_socket.recv(1024).decode().strip() + if not data: + break + print(f"[Server] Received from processor {addr}: {data}") + if data.upper() == "DONE": + with self.lock: + if client_socket in self.processors: + self.processors[client_socket]['status'] = 'idle' + print(f"[Server] Processor {addr} set to idle.") + except Exception as e: + print(f"[Server] Processor {addr} error: {e}") + finally: + print(f"[Server] Processor {addr} disconnected.") + with self.lock: + if client_socket in self.processors: + del self.processors[client_socket] + client_socket.close() + + def dispatcher(self): + """ + Waits for new image tasks and assigns them to an idle processor. + If no idle processor is available, waits (polling every 0.5 sec). + """ + while True: + image_path = self.image_queue.get() # Wait for a new image task + assigned = False + while not assigned: + with self.lock: + # Find any idle processor + idle_processors = [ + sock for sock, info in self.processors.items() + if info['status'] == 'idle' + ] + if idle_processors: + processor_socket = idle_processors[0] + try: + processor_socket.sendall(image_path.encode()) + self.processors[processor_socket]['status'] = 'busy' + print(f"[Server] Dispatched image '{image_path}' to processor {self.processors[processor_socket]['address']}") + assigned = True + except Exception as e: + print(f"[Server] Error sending image to processor {self.processors[processor_socket]['address']}: {e}") + # Remove this processor if there's an error + del self.processors[processor_socket] + # If no processor is idle, we simply wait a little before trying again. + if not assigned: + time.sleep(0.5) + +if __name__ == "__main__": + server = ImageServer(host="127.0.0.1", port=5000) + server.start()