Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,5 @@ pip-selfcheck.json

# Other
MANIFEST
*.env
.vscode/*
94 changes: 67 additions & 27 deletions ffprobe/ffprobe.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import platform
import re
import subprocess
from typing import Optional

from ffprobe.exceptions import FFProbeError

Expand All @@ -29,38 +30,37 @@ def __init__(self, path_to_video):

if os.path.isfile(self.path_to_video) or self.path_to_video.startswith('http'):
if platform.system() == 'Windows':
cmd = ["ffprobe", "-show_streams", self.path_to_video]
cmd = ["ffprobe", "-show_streams", "-show_format", self.path_to_video]
else:
cmd = ["ffprobe -show_streams " + pipes.quote(self.path_to_video)]

p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)

stream = False
ignoreLine = False
aggregate_lines = False
self.streams = []
self.video = []
self.audio = []
self.subtitle = []
self.attachment = []
self.timecode = None

for line in iter(p.stdout.readline, b''):
line = line.decode('UTF-8', 'ignore')

if '[STREAM]' in line:
stream = True
ignoreLine = False
aggregate_lines = True
data_lines = []
elif '[/STREAM]' in line and stream:
elif '[/STREAM]' in line and aggregate_lines:
stream = False
ignoreLine = False
# noinspection PyUnboundLocalVariable
self.streams.append(FFStream(data_lines))
elif stream:
if '[SIDE_DATA]' in line:
ignoreLine = True
elif '[/SIDE_DATA]' in line:
ignoreLine = False
elif ignoreLine == False:
elif '[FORMAT]' in line:
aggregate_lines = True
data_lines = []
elif '[/FORMAT]' in line and aggregate_lines:
aggregate_lines = False
self.format = FFFormat(data_lines)
elif aggregate_lines and '=' in line:
data_lines.append(line)

self.metadata = {}
Expand All @@ -84,14 +84,25 @@ def __init__(self, path_to_video):
self.metadata[m.groups()[0]] = m.groups()[1].strip()

if '[STREAM]' in line:
stream = True
aggregate_lines = True
data_lines = []
elif '[/STREAM]' in line and stream:
stream = False
aggregate_lines = False
self.streams.append(FFStream(data_lines))
elif stream:
elif '[FORMAT]' in line:
aggregate_lines = True
data_lines = []
elif '[/FORMAT]' in line and aggregate_lines:
aggregate_lines = False
self.format = FFFormat(data_lines)
elif aggregate_lines and '=' in line:
data_lines.append(line)

if 'timecode' in line:
if match := re.search('(\d\d:\d\d:\d\d:\d\d)', line, re.IGNORECASE):
self.timecode = match.group(1)


p.stdout.close()
p.stderr.close()

Expand All @@ -108,9 +119,17 @@ def __init__(self, path_to_video):
raise IOError('No such media file or stream is not responding: ' + self.path_to_video)

def __repr__(self):
return "<FFprobe: {metadata}, {video}, {audio}, {subtitle}, {attachment}>".format(**vars(self))
return "<FFprobe: {metadata}, {video}, {audio}, {subtitle}, {attachment}, {timecode}>".format(**vars(self))


class FFFormat:
"""
An object representation of the overall container format of a multimedia file.
"""
def __init__(self, data_lines):
for line in data_lines:
self.__dict__.update({key: value for key, value, *_ in [line.strip().split('=')]})

class FFStream:
"""
An object representation of an individual stream in a multimedia file.
Expand All @@ -120,17 +139,11 @@ def __init__(self, data_lines):
for line in data_lines:
self.__dict__.update({key: value for key, value, *_ in [line.strip().split('=')]})

try:
self.__dict__['framerate'] = round(
functools.reduce(
operator.truediv, map(int, self.__dict__.get('avg_frame_rate', '').split('/'))
)
)
frame_rate = self._frame_rate_from('avg_frame_rate')
if frame_rate is None or frame_rate == 0 or frame_rate == "0/0":
frame_rate = self._frame_rate_from('r_frame_rate')

except ValueError:
self.__dict__['framerate'] = None
except ZeroDivisionError:
self.__dict__['framerate'] = 0
self.__dict__['framerate'] = frame_rate

def __repr__(self):
if self.is_video():
Expand All @@ -148,6 +161,17 @@ def __repr__(self):

return template.format(**self.__dict__)

def _frame_rate_from(self, metadata: str) -> Optional[float]:
try:
return round(
functools.reduce(operator.truediv, map(int, self.__dict__.get(metadata, '').split('/'))),
2
)
except ValueError:
return None
except ZeroDivisionError:
return 0

def is_audio(self):
"""
Is this stream labelled as an audio stream?
Expand Down Expand Up @@ -217,6 +241,12 @@ def frames(self):

return frame_count

def audio_channel_layout(self):
"""
Display a string of the audio channel, ie: mono, stereo, quad
"""
return self.__dict__.get('channel_layout', None)

def duration_seconds(self):
"""
Returns the runtime duration of the video stream as a floating point number of seconds.
Expand Down Expand Up @@ -264,3 +294,13 @@ def bit_rate(self):
return int(self.__dict__.get('bit_rate', ''))
except ValueError:
raise FFProbeError('None integer bit_rate')

def is_progressive(self):
if self.is_video() and self.__dict__.get('field_order', "") == "progressive":
return True
return False

def is_interlaced(self):
if self.is_video() and self.__dict__.get('field_order', "") in ["tt", "bb", "tb", "bt"]:
return True
return False
4 changes: 1 addition & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@
setup(
name='ffprobe-python',
version='1.0.3',
description="""
A wrapper around ffprobe command to extract metadata from media files.
""",
description="A wrapper around ffprobe command to extract metadata from media files.",
long_description=long_description,
long_description_content_type="text/markdown",
author='Simon Hargreaves',
Expand Down
16 changes: 14 additions & 2 deletions tests/ffprobe_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,22 @@ def test_video ():
try:
if stream.is_video():
frame_rate = stream.frames() / stream.duration_seconds()
print('\t\tFrame Rate:', frame_rate)
print('\t\tFrame Size:', stream.frame_size())
print('\t\tFrame Rate :', frame_rate)
print('\t\tFrame Size :', stream.frame_size())
print('\t\tField order:', stream.field_order)
print('\t\tProgressive:', stream.is_progressive())
print('\t\tInterlaced :', stream.is_interlaced())
print('\t\tDuration:', stream.duration_seconds())
print('\t\tFrames:', stream.frames())
print('\t\tIs video:', stream.is_video())
if stream.is_audio():
print('\t\tAudio channel layout:', stream.audio_channel_layout())
except FFProbeError as e:
print(e)
except Exception as e:
print(e)
print(f"\tFormat:\n\t\tDuration: {media.format.duration}")


def test_stream ():
for test_stream in test_streams:
Expand All @@ -49,9 +56,14 @@ def test_stream ():
frame_rate = stream.frames() / stream.duration_seconds()
print('\t\tFrame Rate:', frame_rate)
print('\t\tFrame Size:', stream.frame_size())
print('\t\tField order:', stream.field_order)
print('\t\tProgressive:', stream.is_progressive())
print('\t\tInterlaced :', stream.is_interlaced())
print('\t\tDuration:', stream.duration_seconds())
print('\t\tFrames:', stream.frames())
print('\t\tIs video:', stream.is_video())
if stream.is_audio():
print('\t\tAudio channel layout:', stream.audio_channel_layout())
except FFProbeError as e:
print(e)
except Exception as e:
Expand Down