Building an RTSP MP4 Streaming Server with ffmpeg

Dockerfile.ffmpegrtsp
FROM alpine:3.8

RUN apk add --no-cache ffmpeg
EXPOSE 8554 8090
ENTRYPOINT ["ffserver"]
docker-compose.yml
version: '3'

services:
    rtspserver-ffmpeg:
        build: .
        container_name: rtspserver-ffmpeg
        image: rtspserver-ffmpeg:latest
        ports:
            - "8554:8554"
            - "8090:8090"
        volumes:
            - ./ffserver.conf:/etc/ffserver.conf
            # mount the host working directory and set it as the working
            # directory so video.mp4 and the feed file resolve in-container
            - .:/home/user/
        working_dir: /home/user
        restart: always
ffserver.conf
HTTPPort 8090
HTTPBindAddress 0.0.0.0

RTSPPort 8554
RTSPBindAddress 0.0.0.0

MaxClients 100
MaxBandwidth 100000
CustomLog -

<Feed feed.ffm>
File /home/user/feed.ffm
</Feed>

<Stream unicast>
Format rtp
Feed feed.ffm
VideoCodec libx264
VideoFrameRate 24
VideoBitRate 100
VideoSize 640x480
AVPresetVideo default
AVPresetVideo baseline
AVOptionVideo flags +global_header
NoAudio
ACL allow 127.0.0.1
ACL allow 192.168.0.0 192.168.255.255
</Stream>

<Stream status.html>
Format status
ACL allow 127.0.0.1
ACL allow 192.168.0.0 192.168.255.255
</Stream>
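With this config, ffserver exposes the RTSP stream at rtsp://<host>:8554/unicast and a status page at http://<host>:8090/status.html. A minimal sketch (assuming the server runs on localhost) to confirm it is reachable:

from urllib.request import urlopen

# Fetch the status page defined by the <Stream status.html> block above;
# it lists the configured feeds and streams while the server is running.
with urlopen('http://localhost:8090/status.html', timeout=5) as resp:
    print(resp.status, resp.read(500).decode('utf-8', errors='replace'))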
Build the image and push it to Docker Hub:
docker build -t pinto0309/rtspserver-ffmpeg:latest -f Dockerfile.ffmpegrtsp .
docker push pinto0309/rtspserver-ffmpeg:latest

1. docker compose pattern

  • Start the container via docker compose
docker compose up -d
  • After the container is up, use docker compose exec to start the ffmpeg push that feeds the RTSP stream; -stream_loop -1 loops the input indefinitely (a Python variant is sketched after this list)
docker compose exec rtspserver-ffmpeg \
ffmpeg -re -stream_loop -1 -i video.mp4 http://localhost:8090/feed.ffm
  • When finished, shut down the streaming container
docker compose down
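If you prefer to drive the push from a script, a minimal Python sketch wrapping the same docker compose exec call (assumes the container and video.mp4 shown above):

import subprocess

# Start the looped ffmpeg push into the ffserver feed; -T disables
# pseudo-TTY allocation so the call also works outside an interactive shell.
subprocess.run(
    [
        'docker', 'compose', 'exec', '-T', 'rtspserver-ffmpeg',
        'ffmpeg', '-re', '-stream_loop', '-1',
        '-i', 'video.mp4', 'http://localhost:8090/feed.ffm',
    ],
    check=True,
)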

2. docker run pattern

  • Start the container in the background as a daemon via docker run (a port-readiness check is sketched at the end of this section)
docker run --rm -d \
-p 8554:8554 \
-p 8090:8090 \
-v ${PWD}/ffserver.conf:/etc/ffserver.conf \
-v ${PWD}:/home/user/ \
-w /home/user \
--net=host \
--name rtspserver-ffmpeg \
pinto0309/rtspserver-ffmpeg:latest
  • After the container is up, use docker exec to start the ffmpeg push that feeds the RTSP stream; -stream_loop -1 loops the input indefinitely
docker exec rtspserver-ffmpeg \
ffmpeg -re -stream_loop -1 -i video.mp4 http://localhost:8090/feed.ffm
  • When finished, stop the streaming container
docker stop rtspserver-ffmpeg
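Before attaching a client, it can help to wait until the RTSP port actually accepts connections. A small sketch (assumes the server is reachable on localhost:8554 as configured above):

import socket
import time

def wait_for_port(host: str, port: int, timeout: float = 30.0) -> None:
    # Poll until a TCP connection succeeds or the deadline passes.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1):
                return
        except OSError:
            time.sleep(0.5)
    raise TimeoutError(f'{host}:{port} not reachable')

wait_for_port('localhost', 8554)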

3. Checking the stream

  • Check the RTSP stream with vlc
vlc rtsp://0.0.0.0:8554/unicast
  • Check the RTSP stream with opencv (a quick sanity check is sketched below, followed by the full video.py helper)
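A quick plain-OpenCV sanity check first (a sketch; assumes OpenCV was built with FFmpeg support), before wiring up the full VideoIO helper:

import cv2

# Grab a single frame from the unicast stream defined in ffserver.conf.
cap = cv2.VideoCapture('rtsp://0.0.0.0:8554/unicast')
ret, frame = cap.read()
print('opened:', cap.isOpened(), '| first frame:', frame.shape if ret else None)
cap.release()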
video.py
from pathlib import Path
from enum import Enum
from collections import deque
from urllib.parse import urlparse
import subprocess
import threading
import logging
import cv2
from typing import Tuple


LOGGER = logging.getLogger(__name__)
WITH_GSTREAMER = False  # set to True to use the GStreamer capture/writer pipelines


class Protocol(Enum):
    IMAGE = 0
    VIDEO = 1
    CSI   = 2
    V4L2  = 3
    RTSP  = 4
    HTTP  = 5


class VideoIO:
    def __init__(
        self,
        output_size: Tuple,
        input_uri: str,
        output_uri: str=None,
        output_fps: int=30,
        input_resolution: Tuple=(640, 480),
        frame_rate: int=30,
        buffer_size: int=10,
        proc_fps: int=30
    ):
        """Class for video capturing and output saving.
        Encoding, decoding, and scaling can be accelerated using the GStreamer backend.

        Parameters
        ----------
        output_size : tuple
            Width and height of each frame to output.
        input_uri : str
            URI to input stream. It could be image sequence (e.g. '%06d.jpg'), video file (e.g. 'file.mp4'),
            MIPI CSI camera (e.g. 'csi://0'), USB/V4L2 camera (e.g. '/dev/video0'),
            RTSP stream (e.g. 'rtsp://<user>:<password>@<ip>:<port>/<path>'),
            or HTTP live stream (e.g. 'http://<user>:<password>@<ip>:<port>/<path>')
        output_uri : str, optional
            URI to an output video file.
        output_fps : int, optional
            Output video recording frame rate. Specify a value less than 30.
        input_resolution : tuple, optional
            Original resolution of the input source.
            Useful to set a certain capture mode of a USB/CSI camera.
        frame_rate : int, optional
            Frame rate of the input source.
            Required if frame rate cannot be deduced, e.g. image sequence and/or RTSP.
            Useful to set a certain capture mode of a USB/CSI camera.
        buffer_size : int, optional
            Number of frames to buffer.
            For live sources, a larger buffer drops fewer frames but increases latency.
        proc_fps : int, optional
            Estimated processing speed that may limit the capture interval `cap_dt`.
            This depends on hardware and processing complexity.
        """
        self.size = output_size
        self.input_uri = input_uri
        self.output_uri = output_uri
        self.output_fps = output_fps
        self.resolution = input_resolution
        assert frame_rate > 0
        self.frame_rate = frame_rate
        assert buffer_size >= 1
        self.buffer_size = buffer_size
        assert proc_fps > 0
        self.proc_fps = proc_fps

        self.protocol = self._parse_uri(self.input_uri)
        if self.protocol == Protocol.V4L2:
            # Relax permissions on /dev/video0 and /dev/video1 so the capture
            # device can be opened without running this script as root.
            for index in ('0', '1'):
                subprocess.check_output(
                    ['sudo', 'chmod', '777', fr'{self.input_uri[:-1]}{index}'],
                    stderr=subprocess.PIPE
                ).decode('utf-8')
        self.is_live = self.protocol != Protocol.IMAGE and self.protocol != Protocol.VIDEO
        if WITH_GSTREAMER:
            self.source = cv2.VideoCapture(self._gst_cap_pipeline(), cv2.CAP_GSTREAMER)
        else:
            self.source = cv2.VideoCapture(self.input_uri)

        self.frame_queue: deque = deque([], maxlen=self.buffer_size)
        self.cond = threading.Condition()
        self.exit_event = threading.Event()
        self.cap_thread = threading.Thread(target=self._capture_frames)

        ret, frame = self.source.read()
        if not ret:
            raise RuntimeError(f'Unable to read video stream: {self.input_uri}')
        self.frame_queue.append(frame)

        width = self.source.get(cv2.CAP_PROP_FRAME_WIDTH)
        height = self.source.get(cv2.CAP_PROP_FRAME_HEIGHT)
        self.cap_fps = self.source.get(cv2.CAP_PROP_FPS)
        self.do_resize = (width, height) != self.size
        if self.cap_fps == 0:
            self.cap_fps = self.frame_rate # fallback to config if unknown
        LOGGER.info('%dx%d stream @ %d FPS', width, height, self.cap_fps)

        if self.output_uri is not None:
            Path(self.output_uri).parent.mkdir(parents=True, exist_ok=True)
            # derive the nominal recording rate from the capture interval;
            # the writer uses the smaller of this and self.output_fps
            output_fps = 1 / self.cap_dt
            if WITH_GSTREAMER:
                self.writer = cv2.VideoWriter(
                    self._gst_write_pipeline(),
                    cv2.CAP_GSTREAMER,
                    0,
                    fps=output_fps,
                    frameSize=self.size,
                    isColor=True
                )
            else:
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                self.writer = cv2.VideoWriter(
                    filename=self.output_uri,
                    fourcc=fourcc,
                    fps=output_fps if self.output_fps >= output_fps else self.output_fps,
                    frameSize=self.size,
                    isColor=True,
                )

    @property
    def cap_dt(self):
        # limit capture interval at processing latency for live sources
        return 1 / min(self.cap_fps, self.proc_fps) if self.is_live else 1 / self.cap_fps

    def start_capture(self):
        """Start capturing from file or device."""
        if not self.source.isOpened():
            if WITH_GSTREAMER:
                self.source.open(self._gst_cap_pipeline(), cv2.CAP_GSTREAMER)
            else:
                self.source.open(self.input_uri)
        if not self.cap_thread.is_alive():
            self.cap_thread.start()

    def stop_capture(self):
        """Stop capturing from file or device."""
        with self.cond:
            self.exit_event.set()
            self.cond.notify()
        self.frame_queue.clear()
        self.cap_thread.join()

    def read(self):
        """Reads the next video frame.

        Returns
        -------
        ndarray
            Returns None if there are no more frames.
        """
        with self.cond:
            while len(self.frame_queue) == 0 and not self.exit_event.is_set():
                self.cond.wait()
            if len(self.frame_queue) == 0 and self.exit_event.is_set():
                return None
            frame = self.frame_queue.popleft()
            self.cond.notify()
        if self.do_resize:
            frame = cv2.resize(frame, self.size)
        return frame

    def write(self, frame):
        """Writes the next video frame."""
        assert hasattr(self, 'writer')
        self.writer.write(frame)

    def release(self):
        """Cleans up input and output sources."""
        self.stop_capture()
        if hasattr(self, 'writer'):
            self.writer.release()
        self.source.release()

    def _gst_cap_pipeline(self):
        gst_elements = str(subprocess.check_output('gst-inspect-1.0'))
        if 'nvvidconv' in gst_elements and self.protocol != Protocol.V4L2:
            # format conversion for hardware decoder
            cvt_pipeline = (
                'nvvidconv interpolation-method=5 ! '
                'video/x-raw, width=%d, height=%d, format=BGRx ! '
                'videoconvert ! appsink sync=false'
                % self.size
            )
        else:
            cvt_pipeline = (
                'videoscale ! '
                'video/x-raw, width=%d, height=%d ! '
                'videoconvert ! appsink sync=false'
                % self.size
            )

        if self.protocol == Protocol.IMAGE:
            pipeline = (
                'multifilesrc location=%s index=1 caps="image/%s,framerate=%d/1" ! decodebin ! '
                % (
                    self.input_uri,
                    self._img_format(self.input_uri),
                    self.frame_rate
                )
            )
        elif self.protocol == Protocol.VIDEO:
            pipeline = 'filesrc location=%s ! decodebin ! ' % self.input_uri
        elif self.protocol == Protocol.CSI:
            if 'nvarguscamerasrc' in gst_elements:
                pipeline = (
                    'nvarguscamerasrc sensor_id=%s ! '
                    'video/x-raw(memory:NVMM), width=%d, height=%d, '
                    'format=NV12, framerate=%d/1 ! '
                    % (
                        self.input_uri[6:],
                        *self.resolution,
                        self.frame_rate
                    )
                )
            else:
                raise RuntimeError('GStreamer CSI plugin not found')
        elif self.protocol == Protocol.V4L2:
            if 'v4l2src' in gst_elements:
                pipeline = (
                    'v4l2src device=%s ! '
                    'video/x-raw, width=%d, height=%d, '
                    'format=YUY2, framerate=%d/1 ! '
                    % (
                        self.input_uri,
                        *self.resolution,
                        self.frame_rate
                    )
                )
            else:
                raise RuntimeError('GStreamer V4L2 plugin not found')
        elif self.protocol == Protocol.RTSP:
            pipeline = (
                'rtspsrc location=%s latency=0 ! '
                'capsfilter caps=application/x-rtp,media=video ! decodebin ! ' % self.input_uri
            )
        elif self.protocol == Protocol.HTTP:
            pipeline = 'souphttpsrc location=%s is-live=true ! decodebin ! ' % self.input_uri

        """
        'v4l2src device=/dev/video0 ! video/x-raw, width=640, height=480, format=YUY2, framerate=30/1 ! videoscale ! video/x-raw, width=640, height=480 !videoconvert ! appsink sync=false'
        """
        return pipeline + cvt_pipeline

    def _gst_write_pipeline(self):
        gst_elements = str(subprocess.check_output('gst-inspect-1.0'))
        # use hardware encoder if found
        if 'omxh264enc' in gst_elements:
            h264_encoder = 'omxh264enc preset-level=2'
        elif 'x264enc' in gst_elements:
            h264_encoder = 'x264enc pass=4'
        else:
            raise RuntimeError('GStreamer H.264 encoder not found')
        pipeline = (
            'appsrc ! autovideoconvert ! %s ! qtmux ! filesink location=%s '
            % (
                h264_encoder,
                self.output_uri
            )
        )
        return pipeline

    def _capture_frames(self):
        while not self.exit_event.is_set():
            ret, frame = self.source.read()
            with self.cond:
                if not ret:
                    self.exit_event.set()
                    self.cond.notify()
                    break
                # keep unprocessed frames in the buffer for file
                if not self.is_live:
                    while (len(self.frame_queue) == self.buffer_size and
                           not self.exit_event.is_set()):
                        self.cond.wait()
                self.frame_queue.append(frame)
                self.cond.notify()

    @staticmethod
    def _parse_uri(uri):
        result = urlparse(uri)
        if result.scheme == 'csi':
            protocol = Protocol.CSI
        elif result.scheme == 'rtsp':
            protocol = Protocol.RTSP
        elif result.scheme == 'http':
            protocol = Protocol.HTTP
        else:
            if '/dev/video' in result.path:
                protocol = Protocol.V4L2
            elif '%' in result.path:
                protocol = Protocol.IMAGE
            else:
                protocol = Protocol.VIDEO
        return protocol

    @staticmethod
    def _img_format(uri):
        img_format = Path(uri).suffix[1:]
        return 'jpeg' if img_format == 'jpg' else img_format
test_rtsp_recv.py
import cv2
from video import VideoIO  # the VideoIO helper defined in video.py above

stream = VideoIO(
    output_size=(
        640,
        480
    ),
    input_uri='rtsp://0.0.0.0:8554/unicast',
    output_uri=None,
    output_fps=30,
    input_resolution=(
        640,
        480
    ),
    frame_rate=30,
    buffer_size=10,
)
stream.start_capture()

try:
    while True:
        frame = stream.read()
        key = cv2.waitKey(1)
        if key == 27:  # ESC
            break
        if frame is None:
            break  # read() returns None only when the stream has ended
        cv2.imshow('test', frame)
except Exception:
    pass
finally:
    if stream:
        stream.release()
    cv2.destroyAllWindows()
python test_rtsp_recv.py