#!/usr/bin/env python
# encoding: utf-8
# The MIT License (MIT)
# Copyright (c) 2014-2021 CNRS
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# AUTHORS
# Hervé BREDIN - http://herve.niderb.fr
# Paul LERNER
"""
##########
Annotation
##########
.. plot:: pyplots/annotation.py
:class:`pyannote.core.Annotation` instances are ordered sets of non-empty
tracks:
- ordered, because segments are sorted by start time (and by end time in case of a tie)
- set, because the same track cannot be added twice
- non-empty, because an empty track cannot be added
A track is a (support, name) pair where `support` is a Segment instance,
and `name` is an additional identifier so that it is possible to add multiple
tracks with the same support.
To define the annotation depicted above:
.. code-block:: ipython
In [1]: from pyannote.core import Annotation, Segment
In [6]: annotation = Annotation()
...: annotation[Segment(1, 5)] = 'Carol'
...: annotation[Segment(6, 8)] = 'Bob'
...: annotation[Segment(12, 18)] = 'Carol'
...: annotation[Segment(7, 20)] = 'Alice'
...:
which is actually a shortcut for
.. code-block:: ipython
In [6]: annotation = Annotation()
...: annotation[Segment(1, 5), '_'] = 'Carol'
...: annotation[Segment(6, 8), '_'] = 'Bob'
...: annotation[Segment(12, 18), '_'] = 'Carol'
...: annotation[Segment(7, 20), '_'] = 'Alice'
...:
where all tracks share the same (default) name ``'_'``.
In case two tracks share the same support, use a different track name:
.. code-block:: ipython
In [6]: annotation = Annotation(uri='my_video_file', modality='speaker')
...: annotation[Segment(1, 5), 1] = 'Carol' # track name = 1
...: annotation[Segment(1, 5), 2] = 'Bob' # track name = 2
...: annotation[Segment(12, 18)] = 'Carol'
...:
The track name does not have to be unique over the whole set of tracks.
.. note::
The optional *uri* and *modality* keyword arguments can be used to record
which document and modality (e.g. speaker or face) the annotation describes.
Several convenient methods are available. Here are a few examples:
.. code-block:: ipython
In [9]: annotation.labels() # sorted list of labels
Out[9]: ['Bob', 'Carol']
In [10]: annotation.chart() # label duration chart
Out[10]: [('Carol', 10), ('Bob', 4)]
In [11]: list(annotation.itertracks())
Out[11]: [(<Segment(1, 5)>, 1), (<Segment(1, 5)>, 2), (<Segment(12, 18)>, u'_')]
In [12]: annotation.label_timeline('Carol')
Out[12]: <Timeline(uri=my_video_file, segments=[<Segment(1, 5)>, <Segment(12, 18)>])>
See :class:`pyannote.core.Annotation` for the complete reference.
"""
import itertools
import warnings
from collections import defaultdict
from typing import (
Hashable,
Optional,
Dict,
Union,
Iterable,
List,
Set,
TextIO,
Tuple,
Iterator,
Text,
TYPE_CHECKING,
)
import numpy as np
from sortedcontainers import SortedDict
from . import (
PYANNOTE_URI,
PYANNOTE_MODALITY,
PYANNOTE_SEGMENT,
PYANNOTE_TRACK,
PYANNOTE_LABEL,
)
from .json import PYANNOTE_JSON, PYANNOTE_JSON_CONTENT
from .segment import Segment, SlidingWindow
from .timeline import Timeline
from .feature import SlidingWindowFeature
from .utils.generators import string_generator, int_generator
from .utils.types import Label, Key, Support, LabelGenerator, TrackName, CropMode
if TYPE_CHECKING:
import pandas as pd
[docs]class Annotation:
"""Annotation
Parameters
----------
uri : string, optional
name of annotated resource (e.g. audio or video file)
modality : string, optional
name of annotated modality
Returns
-------
annotation : Annotation
New annotation
"""
@classmethod
def from_df(
    cls,
    df: "pd.DataFrame",
    uri: Optional[str] = None,
    modality: Optional[str] = None,
) -> "Annotation":
    """Build an annotation from a pandas DataFrame

    Parameters
    ----------
    df : pandas.DataFrame
        Must provide the PYANNOTE_SEGMENT, PYANNOTE_TRACK and PYANNOTE_LABEL
        columns. Any other column is ignored.
    uri : string, optional
        name of annotated resource (e.g. audio or video file)
    modality : string, optional
        name of annotated modality

    Returns
    -------
    annotation : Annotation
        New annotation (instance of `cls`) with one track per row.
    """
    # keep (and order) only the three columns consumed by `from_records`
    df = df[[PYANNOTE_SEGMENT, PYANNOTE_TRACK, PYANNOTE_LABEL]]
    # go through `cls` rather than `Annotation` so that subclasses
    # get instances of their own type
    return cls.from_records(df.itertuples(index=False), uri, modality)
def __init__(self, uri: Optional[str] = None, modality: Optional[str] = None):
    """Create an empty annotation

    Parameters
    ----------
    uri : string, optional
        name of annotated resource (e.g. audio or video file)
    modality : string, optional
        name of annotated modality
    """
    self._uri: Optional[str] = uri
    self.modality: Optional[str] = modality

    # sorted dictionary
    # keys: annotated segments
    # values: {track: label} dictionary
    self._tracks: Dict[Segment, Dict[TrackName, Label]] = SortedDict()

    # dictionary
    # key: label
    # value: timeline (None until lazily rebuilt by `_updateLabels`)
    self._labels: Dict[Label, Optional[Timeline]] = {}
    # key: label
    # value: True when the corresponding entry of `_labels` is stale
    self._labelNeedsUpdate: Dict[Label, bool] = {}

    # timeline meant to store all annotated segments
    # (None until lazily built by `_updateTimeline`)
    self._timeline: Optional[Timeline] = None
    self._timelineNeedsUpdate: bool = True
@property
def uri(self):
return self._uri
@uri.setter
def uri(self, uri: str):
# update uri for all internal timelines
for label in self.labels():
timeline = self.label_timeline(label, copy=False)
timeline.uri = uri
timeline = self.get_timeline(copy=False)
timeline.uri = uri
self._uri = uri
def _updateLabels(self):
# list of labels that needs to be updated
update = set(
label for label, update in self._labelNeedsUpdate.items() if update
)
# accumulate segments for updated labels
_segments = {label: [] for label in update}
for segment, track, label in self.itertracks(yield_label=True):
if label in update:
_segments[label].append(segment)
# create timeline with accumulated segments for updated labels
for label in update:
if _segments[label]:
self._labels[label] = Timeline(segments=_segments[label], uri=self.uri)
self._labelNeedsUpdate[label] = False
else:
self._labels.pop(label, None)
self._labelNeedsUpdate.pop(label, None)
[docs] def __len__(self):
"""Number of segments
>>> len(annotation) # annotation contains three segments
3
"""
return len(self._tracks)
def __nonzero__(self):
return self.__bool__()
[docs] def __bool__(self):
"""Emptiness
>>> if annotation:
... # annotation is not empty
... else:
... # annotation is empty
"""
return len(self._tracks) > 0
[docs] def itersegments(self):
"""Iterate over segments (in chronological order)
>>> for segment in annotation.itersegments():
... # do something with the segment
See also
--------
:class:`pyannote.core.Segment` describes how segments are sorted.
"""
return iter(self._tracks)
[docs] def itertracks(
self, yield_label: bool = False
) -> Iterator[Union[Tuple[Segment, TrackName], Tuple[Segment, TrackName, Label]]]:
"""Iterate over tracks (in chronological order)
Parameters
----------
yield_label : bool, optional
When True, yield (segment, track, label) tuples, such that
annotation[segment, track] == label. Defaults to yielding
(segment, track) tuple.
Examples
--------
>>> for segment, track in annotation.itertracks():
... # do something with the track
>>> for segment, track, label in annotation.itertracks(yield_label=True):
... # do something with the track and its label
"""
for segment, tracks in self._tracks.items():
for track, lbl in sorted(
tracks.items(), key=lambda tl: (str(tl[0]), str(tl[1]))
):
if yield_label:
yield segment, track, lbl
else:
yield segment, track
def _updateTimeline(self):
    """Rebuild the internal timeline of all segments and clear its stale flag"""
    rebuilt = Timeline(segments=self._tracks, uri=self.uri)
    self._timeline = rebuilt
    self._timelineNeedsUpdate = False
[docs] def get_timeline(self, copy: bool = True) -> Timeline:
"""Get timeline made of all annotated segments
Parameters
----------
copy : bool, optional
Defaults (True) to returning a copy of the internal timeline.
Set to False to return the actual internal timeline (faster).
Returns
-------
timeline : Timeline
Timeline made of all annotated segments.
Note
----
In case copy is set to False, be careful **not** to modify the returned
timeline, as it may lead to weird subsequent behavior of the annotation
instance.
"""
if self._timelineNeedsUpdate:
self._updateTimeline()
if copy:
return self._timeline.copy()
return self._timeline
[docs] def __eq__(self, other: "Annotation"):
"""Equality
>>> annotation == other
Two annotations are equal if and only if their tracks and associated
labels are equal.
"""
pairOfTracks = itertools.zip_longest(
self.itertracks(yield_label=True), other.itertracks(yield_label=True)
)
return all(t1 == t2 for t1, t2 in pairOfTracks)
[docs] def __ne__(self, other: "Annotation"):
"""Inequality"""
pairOfTracks = itertools.zip_longest(
self.itertracks(yield_label=True), other.itertracks(yield_label=True)
)
return any(t1 != t2 for t1, t2 in pairOfTracks)
[docs] def __contains__(self, included: Union[Segment, Timeline]):
"""Inclusion
Check whether every segment of `included` does exist in annotation.
Parameters
----------
included : Segment or Timeline
Segment or timeline being checked for inclusion
Returns
-------
contains : bool
True if every segment in `included` exists in timeline,
False otherwise
"""
return included in self.get_timeline(copy=False)
[docs] def write_rttm(self, file: TextIO):
"""Dump annotation to file using RTTM format
Parameters
----------
file : file object
Usage
-----
>>> with open('file.rttm', 'w') as file:
... annotation.write_rttm(file)
"""
uri = self.uri if self.uri else "<NA>"
if isinstance(uri, Text) and " " in uri:
msg = (
f"Space-separated RTTM file format does not allow file URIs "
f'containing spaces (got: "{uri}").'
)
raise ValueError(msg)
for segment, _, label in self.itertracks(yield_label=True):
if isinstance(label, Text) and " " in label:
msg = (
f"Space-separated RTTM file format does not allow labels "
f'containing spaces (got: "{label}").'
)
raise ValueError(msg)
line = (
f"SPEAKER {uri} 1 {segment.start:.3f} {segment.duration:.3f} "
f"<NA> <NA> {label} <NA> <NA>\n"
)
file.write(line)
def crop(self, support: Support, mode: CropMode = "intersection") -> "Annotation":
    """Crop annotation to new support

    Parameters
    ----------
    support : Segment or Timeline
        If `support` is a `Timeline`, its support is used.
    mode : {'strict', 'loose', 'intersection'}, optional
        Controls how segments that are not fully included in `support` are
        handled. 'strict' mode only keeps fully included segments. 'loose'
        mode keeps any intersecting segment. 'intersection' mode keeps any
        intersecting segment but replace them by their actual intersection.

    Returns
    -------
    cropped : Annotation
        Cropped annotation

    Raises
    ------
    TypeError
        If `support` is neither a Segment nor a Timeline.
    NotImplementedError
        If `mode` is not one of the supported modes.

    Note
    ----
    In 'intersection' mode, the best is done to keep the track names
    unchanged. However, in some cases where two original segments are
    cropped into the same resulting segments, conflicting track names are
    modified to make sure no track is lost.
    """
    # TODO speed things up by working directly with annotation internals

    if isinstance(support, Segment):
        support = Timeline(segments=[support], uri=self.uri)
    elif not isinstance(support, Timeline):
        # fail loudly instead of silently returning None
        raise TypeError(
            "support must be a Segment or a Timeline (got: %s)."
            % type(support).__name__
        )

    if mode not in ("strict", "loose", "intersection"):
        raise NotImplementedError("unsupported mode: '%s'" % mode)

    # overlapping/contiguous segments of the requested support are merged
    support = support.support()
    cropped = self.__class__(uri=self.uri, modality=self.modality)
    pairs = self.get_timeline(copy=False).co_iter(support)

    if mode == "intersection":
        for segment, other_segment in pairs:
            intersection = segment & other_segment
            for track, label in self._tracks[segment].items():
                # two segments may be cropped to the same intersection:
                # pick another track name rather than overwrite
                track = cropped.new_track(intersection, candidate=track)
                cropped[intersection, track] = label
        return cropped

    # 'loose' and 'strict' modes keep original segments untouched and only
    # differ in which of them are selected
    _tracks = {}
    _labels = set()
    for segment, other_segment in pairs:
        if mode == "strict" and segment not in other_segment:
            continue
        tracks = dict(self._tracks[segment])
        _tracks[segment] = tracks
        _labels.update(tracks.values())

    # fill internals directly and mark every cache as stale
    cropped._tracks = SortedDict(_tracks)
    cropped._labelNeedsUpdate = {label: True for label in _labels}
    cropped._labels = {label: None for label in _labels}
    cropped._timelineNeedsUpdate = True
    cropped._timeline = None
    return cropped
def extrude(
    self, removed: Support, mode: CropMode = "intersection"
) -> "Annotation":
    """Remove segments that overlap `removed` support.

    This is implemented as a `crop` on the gaps of `removed` (computed
    within the extent of the annotation).

    A simple illustration:

        annotation
        A |------|           |----|
        B              |------------|
        C |--------------|                |------|

        removed `Timeline`
               |----------| |--------|

        extruded Annotation with mode="intersection"
        A |----|
        B                 |-|
        C |----|                          |------|

        extruded Annotation with mode="loose"
        C                                 |------|

        extruded Annotation with mode="strict"
        A |------|
        B              |------------|
        C |--------------|                |------|

    Parameters
    ----------
    removed : Segment or Timeline
        Part of the annotation to remove.
    mode : {'strict', 'loose', 'intersection'}, optional
        Controls how segments that are not fully included in `removed` are
        handled. 'strict' mode only removes fully included segments. 'loose'
        mode removes any intersecting segment. 'intersection' mode removes
        the overlapping part of any intersecting segment.

    Returns
    -------
    extruded : Annotation
        Extruded annotation

    Note
    ----
    In 'intersection' mode, the best is done to keep the track names
    unchanged. However, in some cases where two original segments are
    cropped into the same resulting segments, conflicting track names are
    modified to make sure no track is lost.
    """
    if isinstance(removed, Segment):
        removed = Timeline([removed])

    # what remains is the part of the annotation extent not covered by `removed`
    extent = self.get_timeline().extent()
    kept_support = removed.gaps(support=Timeline([extent], uri=self.uri))

    # removing "loosely" amounts to keeping "strictly", and vice-versa
    if mode == "loose":
        crop_mode = "strict"
    elif mode == "strict":
        crop_mode = "loose"
    else:
        crop_mode = mode

    return self.crop(kept_support, mode=crop_mode)
def get_overlap(self, labels: Optional[Iterable[Label]] = None) -> "Timeline":
    """Get overlapping parts of the annotation.

    Only intersections between tracks with *different* labels are
    considered as overlap.

    A simple illustration:

        annotation
        A |------|    |------|      |----|
        B  |--|    |-----|      |----------|
        C |--------------|      |------|

        annotation.get_overlap()
          |------| |-----|      |--------|

        annotation.get_overlap(labels=["A", "B"])
           |--|       |--|          |----|

    Parameters
    ----------
    labels : optional list of labels
        Labels for which to consider the overlap. When omitted (or empty),
        all labels are considered.

    Returns
    -------
    overlap : `pyannote.core.Timeline`
       Timeline of the overlaps.
    """
    annotation = self.subset(labels) if labels else self

    overlaps_tl = Timeline(uri=annotation.uri)
    for (segment, track), (other_segment, other_track) in annotation.co_iter(
        annotation
    ):
        same_label = self[segment, track] == self[other_segment, other_track]
        if not same_label:
            overlaps_tl.add(segment & other_segment)

    # merge the accumulated intersections
    return overlaps_tl.support()
[docs] def get_tracks(self, segment: Segment) -> Set[TrackName]:
"""Query tracks by segment
Parameters
----------
segment : Segment
Query
Returns
-------
tracks : set
Set of tracks
Note
----
This will return an empty set if segment does not exist.
"""
return set(self._tracks.get(segment, {}).keys())
[docs] def has_track(self, segment: Segment, track: TrackName) -> bool:
"""Check whether a given track exists
Parameters
----------
segment : Segment
Query segment
track :
Query track
Returns
-------
exists : bool
True if track exists for segment
"""
return track in self._tracks.get(segment, {})
def copy(self) -> "Annotation":
    """Get a copy of the annotation

    The {track: label} dictionary of each segment is duplicated, so that
    adding or removing tracks in the copy leaves the original untouched.
    Cached timelines are not copied: they are rebuilt lazily.

    Returns
    -------
    annotation : Annotation
        Copy of the annotation
    """
    # new empty annotation sharing uri and modality
    copied = self.__class__(uri=self.uri, modality=self.modality)

    # duplicate the per-segment track dictionaries
    pairs = [(segment, dict(tracks)) for segment, tracks in self._tracks.items()]
    labels = {label for _, tracks in pairs for label in tracks.values()}

    copied._tracks = SortedDict(pairs)
    # every cache starts stale
    copied._labels = dict.fromkeys(labels)
    copied._labelNeedsUpdate = dict.fromkeys(labels, True)
    copied._timeline = None
    copied._timelineNeedsUpdate = True
    return copied
[docs] def new_track(
self,
segment: Segment,
candidate: Optional[TrackName] = None,
prefix: Optional[str] = None,
) -> TrackName:
"""Generate a new track name for given segment
Ensures that the returned track name does not already
exist for the given segment.
Parameters
----------
segment : Segment
Segment for which a new track name is generated.
candidate : any valid track name, optional
When provided, try this candidate name first.
prefix : str, optional
Track name prefix. Defaults to the empty string ''.
Returns
-------
name : str
New track name
"""
# obtain list of existing tracks for segment
existing_tracks = set(self._tracks.get(segment, {}))
# if candidate is provided, check whether it already exists
# in case it does not, use it
if (candidate is not None) and (candidate not in existing_tracks):
return candidate
# no candidate was provided or the provided candidate already exists
# we need to create a brand new one
# by default (if prefix is not provided), use ''
if prefix is None:
prefix = ""
# find first non-existing track name for segment
# eg. if '0' exists, try '1', then '2', ...
count = 0
while ("%s%d" % (prefix, count)) in existing_tracks:
count += 1
# return first non-existing track name
return "%s%d" % (prefix, count)
[docs] def __str__(self):
"""Human-friendly representation"""
# TODO: use pandas.DataFrame
return "\n".join(
["%s %s %s" % (s, t, l) for s, t, l in self.itertracks(yield_label=True)]
)
def __delitem__(self, key: Key):
    """Delete one track

    >>> del annotation[segment, track]

    Delete all tracks of a segment

    >>> del annotation[segment]

    Raises
    ------
    KeyError
        If the segment (or the track) does not exist.
    NotImplementedError
        If `key` is neither a Segment nor a (Segment, track) pair.
    """
    # del annotation[segment]
    if isinstance(key, Segment):
        # raises KeyError if segment does not exist
        removed = self._tracks.pop(key)
        self._timelineNeedsUpdate = True
        # timelines of all labels carried by the segment are now stale
        for label in removed.values():
            self._labelNeedsUpdate[label] = True
        return

    if not (isinstance(key, tuple) and len(key) == 2):
        raise NotImplementedError(
            "Deletion only works with Segment or (Segment, track) keys."
        )

    # del annotation[segment, track]
    segment, track = key
    # raises KeyError if segment does not exist
    tracks = self._tracks[segment]
    # raises KeyError if track does not exist
    label = tracks.pop(track)
    self._labelNeedsUpdate[label] = True

    # a segment left without any track is removed as well
    if not tracks:
        self._tracks.pop(segment)
        self._timelineNeedsUpdate = True
# label = annotation[segment, track]
def __getitem__(self, key: Key) -> Label:
    """Get track label

    >>> label = annotation[segment, track]

    Raises KeyError when the segment or the track does not exist.

    Note
    ----
    ``annotation[segment]`` is equivalent to ``annotation[segment, '_']``
    """
    if isinstance(key, Segment):
        # bare segment: fall back to the default track name
        segment, track = key, "_"
    else:
        segment, track = key[0], key[1]
    return self._tracks[segment][track]
# annotation[segment, track] = label
def __setitem__(self, key: Key, label: Label):
    """Add new or update existing track

    >>> annotation[segment, track] = label

    If (segment, track) does not exist, it is added.
    If (segment, track) already exists, it is updated.

    Note
    ----
    ``annotation[segment] = label`` is equivalent to ``annotation[segment, '_'] = label``

    Note
    ----
    If `segment` is empty, it does nothing.
    """
    # bare segment: fall back to the default track name
    segment, track = (key, "_") if isinstance(key, Segment) else key

    # do not add empty track
    if not segment:
        return

    tracks = self._tracks.get(segment)
    if tracks is None:
        # brand new segment: the timeline of all segments becomes stale
        tracks = self._tracks[segment] = {}
        self._timelineNeedsUpdate = True
    elif track in tracks:
        # existing track: the timeline of its previous label becomes stale
        self._labelNeedsUpdate[tracks[track]] = True

    tracks[track] = label
    # the timeline of the new label becomes stale
    self._labelNeedsUpdate[label] = True
def empty(self) -> "Annotation":
    """Return an empty copy

    Returns
    -------
    empty : Annotation
        New annotation without any track, of the same class and using the
        same 'uri' and 'modality' attributes.
    """
    klass = self.__class__
    return klass(uri=self.uri, modality=self.modality)
[docs] def labels(self) -> List[Label]:
"""Get sorted list of labels
Returns
-------
labels : list
Sorted list of labels
"""
if any([lnu for lnu in self._labelNeedsUpdate.values()]):
self._updateLabels()
return sorted(self._labels, key=str)
[docs] def get_labels(
self, segment: Segment, unique: bool = True
) -> Union[Set[Label], List[Label]]:
"""Query labels by segment
Parameters
----------
segment : Segment
Query
unique : bool, optional
When False, return the list of (possibly repeated) labels.
Defaults to returning the set of labels.
Returns
-------
labels : set or list
Set (resp. list) of labels for `segment` if it exists, empty set (resp. list) otherwise
if unique (resp. if not unique).
Examples
--------
>>> annotation = Annotation()
>>> segment = Segment(0, 2)
>>> annotation[segment, 'speaker1'] = 'Bernard'
>>> annotation[segment, 'speaker2'] = 'John'
>>> print sorted(annotation.get_labels(segment))
set(['Bernard', 'John'])
>>> print annotation.get_labels(Segment(1, 2))
set([])
"""
labels = self._tracks.get(segment, {}).values()
if unique:
return set(labels)
return list(labels)
def subset(self, labels: Iterable[Label], invert: bool = False) -> "Annotation":
    """Filter annotation by labels

    Parameters
    ----------
    labels : iterable
        List of filtered labels
    invert : bool, optional
        If invert is True, extract all but requested labels

    Returns
    -------
    filtered : Annotation
        New annotation made of the tracks whose label is selected.
        Segments left without any track are dropped.
    """
    requested = set(labels)
    existing = set(self.labels())
    selected = existing - requested if invert else requested & existing

    kept_tracks = {}
    kept_labels = set()
    for segment, tracks in self._tracks.items():
        matching = {
            track: label for track, label in tracks.items() if label in selected
        }
        if not matching:
            continue
        kept_tracks[segment] = matching
        kept_labels.update(matching.values())

    # fill internals directly and mark every cache as stale
    sub = self.__class__(uri=self.uri, modality=self.modality)
    sub._tracks = SortedDict(kept_tracks)
    sub._labelNeedsUpdate = dict.fromkeys(kept_labels, True)
    sub._labels = dict.fromkeys(kept_labels)
    sub._timelineNeedsUpdate = True
    sub._timeline = None
    return sub
def update(self, annotation: "Annotation", copy: bool = False) -> "Annotation":
    """Add every track of an existing annotation (in place)

    Parameters
    ----------
    annotation : Annotation
        Annotation whose tracks are being added
    copy : bool, optional
        Return a copy of the annotation. Defaults to updating the
        annotation in-place.

    Returns
    -------
    self : Annotation
        Updated annotation (`self`, unless `copy` is True)

    Note
    ----
    Existing tracks are updated with the new label.
    """
    target = self.copy() if copy else self

    # TODO speed things up by working directly with annotation internals
    for added in annotation.itertracks(yield_label=True):
        segment, track, label = added
        target[segment, track] = label

    return target
[docs] def label_timeline(self, label: Label, copy: bool = True) -> Timeline:
"""Query segments by label
Parameters
----------
label : object
Query
copy : bool, optional
Defaults (True) to returning a copy of the internal timeline.
Set to False to return the actual internal timeline (faster).
Returns
-------
timeline : Timeline
Timeline made of all segments for which at least one track is
annotated as label
Note
----
If label does not exist, this will return an empty timeline.
Note
----
In case copy is set to False, be careful **not** to modify the returned
timeline, as it may lead to weird subsequent behavior of the annotation
instance.
"""
if label not in self.labels():
return Timeline(uri=self.uri)
if self._labelNeedsUpdate[label]:
self._updateLabels()
if copy:
return self._labels[label].copy()
return self._labels[label]
[docs] def label_support(self, label: Label) -> Timeline:
"""Label support
Equivalent to ``Annotation.label_timeline(label).support()``
Parameters
----------
label : object
Query
Returns
-------
support : Timeline
Label support
See also
--------
:func:`~pyannote.core.Annotation.label_timeline`
:func:`~pyannote.core.Timeline.support`
"""
return self.label_timeline(label, copy=False).support()
[docs] def label_duration(self, label: Label) -> float:
"""Label duration
Equivalent to ``Annotation.label_timeline(label).duration()``
Parameters
----------
label : object
Query
Returns
-------
duration : float
Duration, in seconds.
See also
--------
:func:`~pyannote.core.Annotation.label_timeline`
:func:`~pyannote.core.Timeline.duration`
"""
return self.label_timeline(label, copy=False).duration()
[docs] def chart(self, percent: bool = False) -> List[Tuple[Label, float]]:
"""Get labels chart (from longest to shortest duration)
Parameters
----------
percent : bool, optional
Return list of (label, percentage) tuples.
Defaults to returning list of (label, duration) tuples.
Returns
-------
chart : list
List of (label, duration), sorted by duration in decreasing order.
"""
chart = sorted(
((L, self.label_duration(L)) for L in self.labels()),
key=lambda x: x[1],
reverse=True,
)
if percent:
total = np.sum([duration for _, duration in chart])
chart = [(label, duration / total) for (label, duration) in chart]
return chart
[docs] def argmax(self, support: Optional[Support] = None) -> Optional[Label]:
"""Get label with longest duration
Parameters
----------
support : Segment or Timeline, optional
Find label with longest duration within provided support.
Defaults to whole extent.
Returns
-------
label : any existing label or None
Label with longest intersection
Examples
--------
>>> annotation = Annotation(modality='speaker')
>>> annotation[Segment(0, 10), 'speaker1'] = 'Alice'
>>> annotation[Segment(8, 20), 'speaker1'] = 'Bob'
>>> print "%s is such a talker!" % annotation.argmax()
Bob is such a talker!
>>> segment = Segment(22, 23)
>>> if not annotation.argmax(support):
... print "No label intersecting %s" % segment
No label intersection [22 --> 23]
"""
cropped = self
if support is not None:
cropped = cropped.crop(support, mode="intersection")
if not cropped:
return None
return max(
((_, cropped.label_duration(_)) for _ in cropped.labels()),
key=lambda x: x[1],
)[0]
def rename_tracks(self, generator: LabelGenerator = "string") -> "Annotation":
    """Rename all tracks

    Parameters
    ----------
    generator : 'string', 'int', or iterable, optional
        If 'string' (default) rename tracks to 'A', 'B', 'C', etc.
        If 'int', rename tracks to 0, 1, 2, etc.
        If iterable, use it to generate track names. It must provide at
        least as many names as there are tracks.

    Returns
    -------
    renamed : Annotation
        Copy of the original annotation where tracks are renamed.

    Example
    -------
    >>> annotation = Annotation()
    >>> annotation[Segment(0, 1), 'a'] = 'a'
    >>> annotation[Segment(0, 1), 'b'] = 'b'
    >>> annotation[Segment(1, 2), 'a'] = 'a'
    >>> annotation[Segment(1, 3), 'c'] = 'c'
    >>> print(annotation)
    [ 00:00:00.000 -->  00:00:01.000] a a
    [ 00:00:00.000 -->  00:00:01.000] b b
    [ 00:00:01.000 -->  00:00:02.000] a a
    [ 00:00:01.000 -->  00:00:03.000] c c
    >>> print(annotation.rename_tracks(generator='int'))
    [ 00:00:00.000 -->  00:00:01.000] 0 a
    [ 00:00:00.000 -->  00:00:01.000] 1 b
    [ 00:00:01.000 -->  00:00:02.000] 2 a
    [ 00:00:01.000 -->  00:00:03.000] 3 c
    """
    renamed = self.__class__(uri=self.uri, modality=self.modality)

    if generator == "string":
        generator = string_generator()
    elif generator == "int":
        generator = int_generator()

    # accept any iterable (e.g. a list of names), not only iterators;
    # iter() returns iterators unchanged
    names = iter(generator)

    # TODO speed things up by working directly with annotation internals
    for s, _, label in self.itertracks(yield_label=True):
        renamed[s, next(names)] = label
    return renamed
[docs] def rename_labels(
self,
mapping: Optional[Dict] = None,
generator: LabelGenerator = "string",
copy: bool = True,
) -> "Annotation":
"""Rename labels
Parameters
----------
mapping : dict, optional
{old_name: new_name} mapping dictionary.
generator : 'string', 'int' or iterable, optional
If 'string' (default) rename label to 'A', 'B', 'C', ... If 'int',
rename to 0, 1, 2, etc. If iterable, use it to generate labels.
copy : bool, optional
Set to True to return a copy of the annotation. Set to False to
update the annotation in-place. Defaults to True.
Returns
-------
renamed : Annotation
Annotation where labels have been renamed
Note
----
Unmapped labels are kept unchanged.
Note
----
Parameter `generator` has no effect when `mapping` is provided.
"""
if mapping is None:
if generator == "string":
generator = string_generator()
elif generator == "int":
generator = int_generator()
# generate mapping
mapping = {label: next(generator) for label in self.labels()}
renamed = self.copy() if copy else self
for old_label, new_label in mapping.items():
renamed._labelNeedsUpdate[old_label] = True
renamed._labelNeedsUpdate[new_label] = True
for segment, tracks in self._tracks.items():
new_tracks = {
track: mapping.get(label, label) for track, label in tracks.items()
}
renamed._tracks[segment] = new_tracks
return renamed
def relabel_tracks(self, generator: LabelGenerator = "string") -> "Annotation":
    """Relabel tracks

    Create a new annotation where each track has a unique label.

    Parameters
    ----------
    generator : 'string', 'int' or iterable, optional
        If 'string' (default) relabel tracks to 'A', 'B', 'C', ... If 'int'
        relabel to 0, 1, 2, ... If iterable, use it to generate labels. It
        must provide at least as many labels as there are tracks.

    Returns
    -------
    renamed : Annotation
        New annotation with relabeled tracks. Segments and track names
        are kept unchanged.
    """
    if generator == "string":
        generator = string_generator()
    elif generator == "int":
        generator = int_generator()

    # accept any iterable (e.g. a list of labels), not only iterators;
    # iter() returns iterators unchanged
    new_labels = iter(generator)

    relabeled = self.empty()
    for s, t, _ in self.itertracks(yield_label=True):
        relabeled[s, t] = next(new_labels)

    return relabeled
def support(self, collar: float = 0.0) -> "Annotation":
    """Annotation support

    The support of an annotation is an annotation where contiguous tracks
    with same label are merged into one unique covering track.

    A picture is worth a thousand words::

        collar
        |---|

        annotation
        |--A--| |--A--|     |-B-|
          |-B-|    |--C--|     |----B-----|

        annotation.support(collar)
        |------A------|     |------B------|
          |-B-|    |--C--|

    Parameters
    ----------
    collar : float, optional
        Merge tracks with same label and separated by less than `collar`
        seconds. This is why 'A' tracks are merged in above figure.
        Defaults to 0.

    Returns
    -------
    support : Annotation
        Annotation support

    Note
    ----
    Track names are lost in the process.
    """
    # fresh track names, shared by all labels
    names = string_generator()

    # empty annotation with same uri and modality as original
    result = self.empty()

    for label in self.labels():
        # timeline of current label, with gaps shorter than collar filled
        merged = self.label_timeline(label, copy=True).support(collar)

        # one track per merged segment
        for segment in merged.support():
            result[segment, next(names)] = label

    return result
[docs] def co_iter(
self, other: "Annotation"
) -> Iterator[Tuple[Tuple[Segment, TrackName], Tuple[Segment, TrackName]]]:
"""Iterate over pairs of intersecting tracks
Parameters
----------
other : Annotation
Second annotation
Returns
-------
iterable : (Segment, object), (Segment, object) iterable
Yields pairs of intersecting tracks, in chronological (then
alphabetical) order.
See also
--------
:func:`~pyannote.core.Timeline.co_iter`
"""
timeline = self.get_timeline(copy=False)
other_timeline = other.get_timeline(copy=False)
for s, S in timeline.co_iter(other_timeline):
tracks = sorted(self.get_tracks(s), key=str)
other_tracks = sorted(other.get_tracks(S), key=str)
for t, T in itertools.product(tracks, other_tracks):
yield (s, t), (S, T)
[docs] def __mul__(self, other: "Annotation") -> np.ndarray:
"""Cooccurrence (or confusion) matrix
>>> matrix = annotation * other
Parameters
----------
other : Annotation
Second annotation
Returns
-------
cooccurrence : (n_self, n_other) np.ndarray
Cooccurrence matrix where `n_self` (resp. `n_other`) is the number
of labels in `self` (resp. `other`).
"""
if not isinstance(other, Annotation):
raise TypeError(
"computing cooccurrence matrix only works with Annotation " "instances."
)
i_labels = self.labels()
j_labels = other.labels()
I = {label: i for i, label in enumerate(i_labels)}
J = {label: j for j, label in enumerate(j_labels)}
matrix = np.zeros((len(I), len(J)))
# iterate over intersecting tracks and accumulate durations
for (segment, track), (other_segment, other_track) in self.co_iter(other):
i = I[self[segment, track]]
j = J[other[other_segment, other_track]]
duration = (segment & other_segment).duration
matrix[i, j] += duration
return matrix
def discretize(
    self,
    support: Optional[Segment] = None,
    resolution: Union[float, SlidingWindow] = 0.01,
    labels: Optional[List[Hashable]] = None,
    duration: Optional[float] = None,
):
    """Discretize

    Parameters
    ----------
    support : Segment, optional
        Part of annotation to discretize.
        Defaults to annotation full extent.
    resolution : float or SlidingWindow, optional
        Defaults to 10ms frames. When a SlidingWindow is provided, only its
        step and duration are used (frames always start at `support` start).
    labels : list of labels, optional
        Defaults to the labels of the annotation cropped to `support`.
    duration : float, optional
        Overrides support duration and ensures that the number of
        returned frames is fixed (which might otherwise not be the case
        because of rounding errors).

    Returns
    -------
    discretized : SlidingWindowFeature
        (num_frames, num_labels)-shaped binary features.
    """
    if support is None:
        support = self.get_timeline().extent()
    start_time, end_time = support

    cropped = self.crop(support, mode="intersection")

    if labels is None:
        labels = cropped.labels()

    # frames are always aligned with the start of the support
    if isinstance(resolution, SlidingWindow):
        step, width = resolution.step, resolution.duration
    else:
        step = width = resolution
    resolution = SlidingWindow(start=start_time, step=step, duration=width)

    start_frame = resolution.closest_frame(start_time)
    if duration is None:
        num_frames = resolution.closest_frame(end_time) - start_frame
    else:
        num_frames = int(round(duration / resolution.step))

    data = np.zeros((num_frames, len(labels)), dtype=np.uint8)
    for k, label in enumerate(labels):
        segments = cropped.label_timeline(label)
        frame_ranges = resolution.crop(segments, mode="center", return_ranges=True)
        for start, stop in frame_ranges:
            # clip frame ranges to [0, num_frames)
            data[max(0, start) : min(stop, num_frames), k] += 1

    # overlapping segments of a same label still count as 1
    data = np.minimum(data, 1, out=data)

    return SlidingWindowFeature(data, resolution, labels=labels)
def for_json(self) -> Dict:
    """Serialization

    Returns
    -------
    data : dict
        Dictionary with the class name, one {segment, track, label} entry
        per track and, when they are set, the uri and modality.

    See also
    --------
    :mod:`pyannote.core.json`
    """
    content = []
    for segment, track, label in self.itertracks(yield_label=True):
        content.append(
            {
                PYANNOTE_SEGMENT: segment.for_json(),
                PYANNOTE_TRACK: track,
                PYANNOTE_LABEL: label,
            }
        )

    data = {
        PYANNOTE_JSON: self.__class__.__name__,
        PYANNOTE_JSON_CONTENT: content,
    }

    # uri and modality are only serialized when set
    if self.uri:
        data[PYANNOTE_URI] = self.uri
    if self.modality:
        data[PYANNOTE_MODALITY] = self.modality

    return data
@classmethod
def from_json(cls, data: Dict) -> "Annotation":
    """Deserialization

    Parameters
    ----------
    data : dict
        Dictionary as returned by `for_json`.

    Returns
    -------
    annotation : Annotation
        New annotation (instance of `cls`).

    See also
    --------
    :mod:`pyannote.core.json`
    """
    # uri and modality are optional in the serialized form
    uri = data.get(PYANNOTE_URI, None)
    modality = data.get(PYANNOTE_MODALITY, None)

    records = []
    for record_dict in data[PYANNOTE_JSON_CONTENT]:
        segment = Segment.from_json(record_dict[PYANNOTE_SEGMENT])
        track = record_dict[PYANNOTE_TRACK]
        label = record_dict[PYANNOTE_LABEL]
        records.append((segment, track, label))

    # go through `cls` rather than `Annotation` so that subclasses
    # get instances of their own type
    return cls.from_records(records, uri, modality)
@classmethod
def from_records(
    cls,
    records: Iterator[Tuple[Segment, TrackName, Label]],
    uri: Optional[str] = None,
    modality: Optional[str] = None,
) -> "Annotation":
    """Annotation

    Parameters
    ----------
    records : iterator of tuples
        (segment, track, label) tuples. When the same (segment, track)
        appears several times, the last label wins.
    uri : string, optional
        name of annotated resource (e.g. audio or video file)
    modality : string, optional
        name of annotated modality

    Returns
    -------
    annotation : Annotation
        New annotation
    """
    annotation = cls(uri=uri, modality=modality)

    # group records by segment
    by_segment = {}
    seen_labels = set()
    for segment, track, label in records:
        by_segment.setdefault(segment, {})[track] = label
        seen_labels.add(label)

    # fill internals directly and mark every cache as stale
    annotation._tracks = SortedDict(by_segment)
    annotation._labels = dict.fromkeys(seen_labels)
    annotation._labelNeedsUpdate = dict.fromkeys(annotation._labels, True)
    annotation._timeline = None
    annotation._timelineNeedsUpdate = True

    return annotation
def _repr_png_(self):
    """IPython notebook support

    Returns the result of `repr_annotation`, or None (after emitting a
    warning) when matplotlib is not available.

    See also
    --------
    :mod:`pyannote.core.notebook`
    """
    from .notebook import MATPLOTLIB_IS_AVAILABLE, MATPLOTLIB_WARNING

    if MATPLOTLIB_IS_AVAILABLE:
        # only imported when matplotlib is known to be available
        from .notebook import repr_annotation

        return repr_annotation(self)

    warnings.warn(MATPLOTLIB_WARNING.format(klass=self.__class__.__name__))
    return None