diff --git a/.gitignore b/.gitignore index b1efb15..8ef98ae 100644 --- a/.gitignore +++ b/.gitignore @@ -119,3 +119,5 @@ pip-selfcheck.json # Other MANIFEST +*.env +.vscode/* \ No newline at end of file diff --git a/ffprobe/ffprobe.py b/ffprobe/ffprobe.py index f1eb3dc..bd86de2 100644 --- a/ffprobe/ffprobe.py +++ b/ffprobe/ffprobe.py @@ -8,6 +8,7 @@ import platform import re import subprocess +from typing import Optional from ffprobe.exceptions import FFProbeError @@ -29,38 +30,37 @@ def __init__(self, path_to_video): if os.path.isfile(self.path_to_video) or self.path_to_video.startswith('http'): if platform.system() == 'Windows': - cmd = ["ffprobe", "-show_streams", self.path_to_video] + cmd = ["ffprobe", "-show_streams", "-show_format", self.path_to_video] else: cmd = ["ffprobe -show_streams " + pipes.quote(self.path_to_video)] p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) - stream = False - ignoreLine = False + aggregate_lines = False self.streams = [] self.video = [] self.audio = [] self.subtitle = [] self.attachment = [] + self.timecode = None for line in iter(p.stdout.readline, b''): line = line.decode('UTF-8', 'ignore') if '[STREAM]' in line: - stream = True - ignoreLine = False + aggregate_lines = True data_lines = [] - elif '[/STREAM]' in line and stream: + elif '[/STREAM]' in line and aggregate_lines: stream = False - ignoreLine = False # noinspection PyUnboundLocalVariable self.streams.append(FFStream(data_lines)) - elif stream: - if '[SIDE_DATA]' in line: - ignoreLine = True - elif '[/SIDE_DATA]' in line: - ignoreLine = False - elif ignoreLine == False: + elif '[FORMAT]' in line: + aggregate_lines = True + data_lines = [] + elif '[/FORMAT]' in line and aggregate_lines: + aggregate_lines = False + self.format = FFFormat(data_lines) + elif aggregate_lines and '=' in line: data_lines.append(line) self.metadata = {} @@ -84,14 +84,25 @@ def __init__(self, path_to_video): self.metadata[m.groups()[0]] = m.groups()[1].strip() if '[STREAM]' in line: - stream = True + aggregate_lines = True data_lines = [] elif '[/STREAM]' in line and stream: - stream = False + aggregate_lines = False self.streams.append(FFStream(data_lines)) - elif stream: + elif '[FORMAT]' in line: + aggregate_lines = True + data_lines = [] + elif '[/FORMAT]' in line and aggregate_lines: + aggregate_lines = False + self.format = FFFormat(data_lines) + elif aggregate_lines and '=' in line: data_lines.append(line) + if 'timecode' in line: + if match := re.search('(\d\d:\d\d:\d\d:\d\d)', line, re.IGNORECASE): + self.timecode = match.group(1) + + p.stdout.close() p.stderr.close() @@ -108,9 +119,17 @@ def __init__(self, path_to_video): raise IOError('No such media file or stream is not responding: ' + self.path_to_video) def __repr__(self): - return "".format(**vars(self)) + return "".format(**vars(self)) +class FFFormat: + """ + An object representation of the overall container format of a multimedia file. + """ + def __init__(self, data_lines): + for line in data_lines: + self.__dict__.update({key: value for key, value, *_ in [line.strip().split('=')]}) + class FFStream: """ An object representation of an individual stream in a multimedia file. @@ -120,17 +139,11 @@ def __init__(self, data_lines): for line in data_lines: self.__dict__.update({key: value for key, value, *_ in [line.strip().split('=')]}) - try: - self.__dict__['framerate'] = round( - functools.reduce( - operator.truediv, map(int, self.__dict__.get('avg_frame_rate', '').split('/')) - ) - ) + frame_rate = self._frame_rate_from('avg_frame_rate') + if frame_rate is None or frame_rate == 0 or frame_rate == "0/0": + frame_rate = self._frame_rate_from('r_frame_rate') - except ValueError: - self.__dict__['framerate'] = None - except ZeroDivisionError: - self.__dict__['framerate'] = 0 + self.__dict__['framerate'] = frame_rate def __repr__(self): if self.is_video(): @@ -148,6 +161,17 @@ def __repr__(self): return template.format(**self.__dict__) + def _frame_rate_from(self, metadata: str) -> Optional[float]: + try: + return round( + functools.reduce(operator.truediv, map(int, self.__dict__.get(metadata, '').split('/'))), + 2 + ) + except ValueError: + return None + except ZeroDivisionError: + return 0 + def is_audio(self): """ Is this stream labelled as an audio stream? @@ -217,6 +241,12 @@ def frames(self): return frame_count + def audio_channel_layout(self): + """ + Display a string of the audio channel, ie: mono, stereo, quad + """ + return self.__dict__.get('channel_layout', None) + def duration_seconds(self): """ Returns the runtime duration of the video stream as a floating point number of seconds. @@ -264,3 +294,13 @@ def bit_rate(self): return int(self.__dict__.get('bit_rate', '')) except ValueError: raise FFProbeError('None integer bit_rate') + + def is_progressive(self): + if self.is_video() and self.__dict__.get('field_order', "") == "progressive": + return True + return False + + def is_interlaced(self): + if self.is_video() and self.__dict__.get('field_order', "") in ["tt", "bb", "tb", "bt"]: + return True + return False diff --git a/setup.py b/setup.py index 8b496ab..7a0374b 100644 --- a/setup.py +++ b/setup.py @@ -9,9 +9,7 @@ setup( name='ffprobe-python', version='1.0.3', - description=""" - A wrapper around ffprobe command to extract metadata from media files. - """, + description="A wrapper around ffprobe command to extract metadata from media files.", long_description=long_description, long_description_content_type="text/markdown", author='Simon Hargreaves', diff --git a/tests/ffprobe_test.py b/tests/ffprobe_test.py index 9078f76..78d0e8d 100644 --- a/tests/ffprobe_test.py +++ b/tests/ffprobe_test.py @@ -27,15 +27,22 @@ def test_video (): try: if stream.is_video(): frame_rate = stream.frames() / stream.duration_seconds() - print('\t\tFrame Rate:', frame_rate) - print('\t\tFrame Size:', stream.frame_size()) + print('\t\tFrame Rate :', frame_rate) + print('\t\tFrame Size :', stream.frame_size()) + print('\t\tField order:', stream.field_order) + print('\t\tProgressive:', stream.is_progressive()) + print('\t\tInterlaced :', stream.is_interlaced()) print('\t\tDuration:', stream.duration_seconds()) print('\t\tFrames:', stream.frames()) print('\t\tIs video:', stream.is_video()) + if stream.is_audio(): + print('\t\tAudio channel layout:', stream.audio_channel_layout()) except FFProbeError as e: print(e) except Exception as e: print(e) + print(f"\tFormat:\n\t\tDuration: {media.format.duration}") + def test_stream (): for test_stream in test_streams: @@ -49,9 +56,14 @@ def test_stream (): frame_rate = stream.frames() / stream.duration_seconds() print('\t\tFrame Rate:', frame_rate) print('\t\tFrame Size:', stream.frame_size()) + print('\t\tField order:', stream.field_order) + print('\t\tProgressive:', stream.is_progressive()) + print('\t\tInterlaced :', stream.is_interlaced()) print('\t\tDuration:', stream.duration_seconds()) print('\t\tFrames:', stream.frames()) print('\t\tIs video:', stream.is_video()) + if stream.is_audio(): + print('\t\tAudio channel layout:', stream.audio_channel_layout()) except FFProbeError as e: print(e) except Exception as e: