#!/usr/bin/python3 ''' VideoRelay - Copy/transcode live stream to http stream. (c) 2012-2022 Jan ONDREJ (SAL) This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. Examples: http://localhost:9090/raw/mark http://localhost:9090/x264/ch=joj/b=3000 ''' from __future__ import absolute_import from __future__ import print_function import sys, socket, os, time, re, threading, htmlpage from httpserver import AuthHTTPRequestHandler, ThreadingHTTPServer, wrap_ssl from config import allowed_ips, auth if sys.version_info[0]>2: from io import BytesIO as StringIO from urllib.parse import unquote_plus else: from StringIO import StringIO from urllib import unquote_plus # Import gstreamer libraries import gi gi.require_version('Gst', '1.0') from gi.repository import GObject, Gio, Gst #GObject.threads_init() Gst.init(None) __version__ = "0.1" HOST = "" PORT = 9090 SSL = None GST_SOURCES = dict( file = """ filesrc location=%(filename)s ! queue """, multicast = """ udpsrc name=src address=%(ip)s port=%(port)s ! application/x-rtp, media=(string)video, clock-rate=90000, encoding-name=MP2T-ES, payload=33 #! queue """, http = """ curlhttpsrc location=%(filename)s ! queue """, udpxy = """ curlhttpsrc location=http://localhost:4022/udp/%(ip)s:%(port)s/ ! queue """, udpxy_rtp = """ curlhttpsrc location=http://localhost:4022/rtp/%(ip)s:%(port)s/ ! queue """ ) GST_DECODER = """ ! decodebin name=dec ! videoconvert ! avdeinterlace ! videoscale ! video/x-raw, width=%(w)s, height=%(h)s #! queue """ GST_VAAPI_DECODER = """ ! vaapidecode name=dec ! videoconvert ! avdeinterlace ! videoscale #! queue """ GST_SINK = """ #! progressreport ! multisocketsink name=sink #! multifdsink name=sink """ GST_ENCODERS = dict( ogv = GST_DECODER + """ ! theoraenc bitrate=%(kb)s ! mux. dec. ! audioconvert ! vorbisenc ! oggmux name=mux skeleton=true """ + GST_SINK,# + "max-lateness=1000000", vp8 = GST_DECODER + """ ! vp8enc target-bitrate=%(b)s cpu-used=4 threads=8 ! mux. dec. ! audioconvert ! vorbisenc ! webmmux name=mux streamable=true """ + GST_SINK,# + "max-lateness=1000000", vp9 = GST_DECODER + """ ! vp9enc target-bitrate=%(b)s cpu-used=4 threads=8 ! mux. dec. ! audioconvert ! vorbisenc ! webmmux name=mux streamable=true """ + GST_SINK,# + "max-lateness=1000000", x264 = GST_DECODER + """ ! x264enc bitrate=%(kb)s ! video/x-h264, profile=main, width=%(w)s, height=%(h)s ! mux. dec. ! audioconvert ! lamemp3enc #! mpegtsmux name=mux #! avmux_mp4 name=mux ! mp4mux name=mux streamable=true fragment-duration=10 presentation-time=true """ + GST_SINK, x264va = GST_DECODER + """ ! vaapiencode_h264 bitrate=%(kb)s ! video/x-h264, profile=main, width=%(w)s, height=%(h)s ! mux. dec. ! audioconvert ! lamemp3enc ! mpegtsmux name=mux """ + GST_SINK, mpeg4 = GST_DECODER + """ ! avenc_mpeg4 bitrate=%(b)s ! video/mpeg, profile=main, width=%(w)s, height=%(h)s ! mux. dec. ! audioconvert ! twolamemp2enc ! mpegtsmux name=mux """ + GST_SINK, mpeg2va = GST_VAAPI_DECODER + """ ! vaapiencode_mpeg2 bitrate=%(kb)s ! video/mpeg, profile=main, width=%(w)s, height=%(h)s ! mux. dec. ! audioconvert ! twolamemp2enc ! mpegtsmux name=mux """ + GST_SINK, mp3 = """ ! decodebin name=dec ! audioconvert ! lamemp3enc bitrate=%(kb)s name=mux """ + GST_SINK, oga = """ ! decodebin name=dec ! audioconvert ! vorbisenc bitrate=%(b)s ! oggmux name=mux """ + GST_SINK, raw = """ ! rtpmp2tdepay #! queue """ + GST_SINK, rawaudio = """ ! rtpmp2tdepay #! queue """ + GST_SINK, rawvideo = """ ! rtpmp2tdepay #! queue """ + GST_SINK ) GST_ENCODERS["auto"] = GST_ENCODERS["ogv"] def kmg(bytes, unit="B"): sis = [[3, 'G'], [2, 'M'], [1, 'k']] for size, si in sis: if bytes>1024**size: return "%4.2f%s%s" % (float(bytes)/(1024**size), si, unit) return str(bytes)+unit def dhm(seconds): if seconds>3600*24: return "%4.2f days" % (seconds/3600/24) elif seconds>3600: return "%4.2f hours" % (seconds/3600) return "%4.2f minutes" % (seconds/60) def hms(seconds): return "%d:%02d:%02d" % ( seconds/3600, seconds/60%60, seconds % 60 ) class player_class: reg_strip_comments = re.compile("^ *(#.*)\n", re.M) reg_strip_spaces = re.compile("[ \t\n]+") bufsize = 65536 def __init__(self, t='raw', ip=None, port=1234, file=None, udpxy=None, w=720, h=576, b=1000): if ip is None: raise ValueError("No channel selected!") # start stream self.args = dict( t=t, ip=ip, port=port, filename=file, w=w, h=h, b=int(b)*1024, kb=int(b) ) print(self.args) self.player_type = self.args['t'] self.source_type = "multicast" self.channel_name = "" if file: from config import file_browser_path if os.path.abspath(file).startswith(file_browser_path): self.source_type = "file" if udpxy: if udpxy=="rtp": self.source_type = "udpxy_rtp" else: self.source_type = "udpxy" self.pipeline = Gst.Pipeline() self.cmd = ( GST_SOURCES[self.source_type] + GST_ENCODERS[self.player_type] ) % self.args self.cmd = self.reg_strip_comments.sub("", self.cmd) print(self.cmd.strip()) self.playbin = Gst.parse_launch(self.cmd) self.src = self.playbin.get_by_name("src") self.sink = self.playbin.get_by_name("sink") self.sink_factory = self.sink.get_factory().get_name() self.clients = {} self.pipeline.add(self.playbin) self.pipeline.set_state(Gst.State.PAUSED) self.connect() def connect(self): self.sink.connect("client-added", self.add_client) if self.sink_factory=="multisocketsink": self.sink.connect("client-socket-removed", self.remove_client) else: self.sink.connect("client-fd-removed", self.remove_client) def handles(self): return self.sink.get_property("num-handles") def add_client(self, sink, gsock): print("Adding client:", gsock) self.pipeline.set_state(Gst.State.PLAYING) def remove_client(self, sink, gsock): print("Removing socket:", gsock) sink.emit("remove-flush", gsock) if gsock in self.clients: del self.clients[gsock] if self.handles()==0: print("Last client, stopping stream ...") sink.emit("clear") self.src.set_state(Gst.State.NULL) if self.pipeline: #self.playbin.set_state(Gst.State.NULL) #self.pipeline.set_state(Gst.State.NULL) self.pipeline.remove(self.playbin) self.playbin.unref() self.src.unref() self.pipeline.unref() self.pipeline = None #print self.src.get_state(10L**9) # 0.1 ns def remove_all_clients(self): for client in self.clients.keys(): self.remove_client(self.sink, client) def handle(self, wfile, args={}): if self.sink_factory=="multisocketsink": client_id = Gio.Socket().new_from_fd(wfile.fileno()) else: # multifdsink pipe = os.pipe() client_id = pipe[1] args["pipe"] = pipe self.sink.emit("add", client_id) self.clients[client_id] = args self.clients[client_id]["start_time"] = time.time() if self.channel_name is None and "channel_name" in args: self.channel_name = unquote_plus(args["channel_name"]) if self.sink_factory=="multifdsink": pipe = self.clients[client_id]["pipe"] while client_id in self.clients: data = '' while len(data)