Source code for pyannote.metrics.diarization

#!/usr/bin/env python
# encoding: utf-8

# The MIT License (MIT)

# Copyright (c) 2012-2019 CNRS

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

# AUTHORS
# Hervé BREDIN - http://herve.niderb.fr

"""Metrics for diarization"""

import numpy as np

from .matcher import HungarianMapper
from .matcher import GreedyMapper

from .base import BaseMetric, f_measure
from .utils import UEMSupportMixin
from .identification import IdentificationErrorRate

DER_NAME = 'diarization error rate'


[docs]class DiarizationErrorRate(IdentificationErrorRate): """Diarization error rate First, the optimal mapping between reference and hypothesis labels is obtained using the Hungarian algorithm. Then, the actual diarization error rate is computed as the identification error rate with each hypothesis label translated into the corresponding reference label. Parameters ---------- collar : float, optional Duration (in seconds) of collars removed from evaluation around boundaries of reference segments. skip_overlap : bool, optional Set to True to not evaluate overlap regions. Defaults to False (i.e. keep overlap regions). Usage ----- * Diarization error rate between `reference` and `hypothesis` annotations >>> metric = DiarizationErrorRate() >>> reference = Annotation(...) # doctest: +SKIP >>> hypothesis = Annotation(...) # doctest: +SKIP >>> value = metric(reference, hypothesis) # doctest: +SKIP * Compute global diarization error rate and confidence interval over multiple documents >>> for reference, hypothesis in ... # doctest: +SKIP ... metric(reference, hypothesis) # doctest: +SKIP >>> global_value = abs(metric) # doctest: +SKIP >>> mean, (lower, upper) = metric.confidence_interval() # doctest: +SKIP * Get diarization error rate detailed components >>> components = metric(reference, hypothesis, detailed=True) #doctest +SKIP * Get accumulated components >>> components = metric[:] # doctest: +SKIP >>> metric['confusion'] # doctest: +SKIP See Also -------- :class:`pyannote.metric.base.BaseMetric`: details on accumulation :class:`pyannote.metric.identification.IdentificationErrorRate`: identification error rate """ @classmethod def metric_name(cls): return DER_NAME def __init__(self, collar=0.0, skip_overlap=False, **kwargs): super(DiarizationErrorRate, self).__init__( collar=collar, skip_overlap=skip_overlap, **kwargs) self.mapper_ = HungarianMapper()
[docs] def optimal_mapping(self, reference, hypothesis, uem=None): """Optimal label mapping Parameters ---------- reference : Annotation hypothesis : Annotation Reference and hypothesis diarization uem : Timeline Evaluation map Returns ------- mapping : dict Mapping between hypothesis (key) and reference (value) labels """ # NOTE that this 'uemification' will not be called when # 'optimal_mapping' is called from 'compute_components' as it # has already been done in 'compute_components' if uem: reference, hypothesis = self.uemify(reference, hypothesis, uem=uem) # call hungarian mapper return self.mapper_(hypothesis, reference)
[docs] def compute_components(self, reference, hypothesis, uem=None, **kwargs): # crop reference and hypothesis to evaluated regions (uem) # remove collars around reference segment boundaries # remove overlap regions (if requested) reference, hypothesis, uem = self.uemify( reference, hypothesis, uem=uem, collar=self.collar, skip_overlap=self.skip_overlap, returns_uem=True) # NOTE that this 'uemification' must be done here because it # might have an impact on the search for the optimal mapping. # make sure reference only contains string labels ('A', 'B', ...) reference = reference.rename_labels(generator='string') # make sure hypothesis only contains integer labels (1, 2, ...) hypothesis = hypothesis.rename_labels(generator='int') # optimal (int --> str) mapping mapping = self.optimal_mapping(reference, hypothesis) # compute identification error rate based on mapped hypothesis # NOTE that collar is set to 0.0 because 'uemify' has already # been applied (same reason for setting skip_overlap to False) mapped = hypothesis.rename_labels(mapping=mapping) return super(DiarizationErrorRate, self)\ .compute_components(reference, mapped, uem=uem, collar=0.0, skip_overlap=False, **kwargs)
[docs]class GreedyDiarizationErrorRate(IdentificationErrorRate): """Greedy diarization error rate First, the greedy mapping between reference and hypothesis labels is obtained. Then, the actual diarization error rate is computed as the identification error rate with each hypothesis label translated into the corresponding reference label. Parameters ---------- collar : float, optional Duration (in seconds) of collars removed from evaluation around boundaries of reference segments. skip_overlap : bool, optional Set to True to not evaluate overlap regions. Defaults to False (i.e. keep overlap regions). Usage ----- * Greedy diarization error rate between `reference` and `hypothesis` annotations >>> metric = GreedyDiarizationErrorRate() >>> reference = Annotation(...) # doctest: +SKIP >>> hypothesis = Annotation(...) # doctest: +SKIP >>> value = metric(reference, hypothesis) # doctest: +SKIP * Compute global greedy diarization error rate and confidence interval over multiple documents >>> for reference, hypothesis in ... # doctest: +SKIP ... metric(reference, hypothesis) # doctest: +SKIP >>> global_value = abs(metric) # doctest: +SKIP >>> mean, (lower, upper) = metric.confidence_interval() # doctest: +SKIP * Get greedy diarization error rate detailed components >>> components = metric(reference, hypothesis, detailed=True) #doctest +SKIP * Get accumulated components >>> components = metric[:] # doctest: +SKIP >>> metric['confusion'] # doctest: +SKIP See Also -------- :class:`pyannote.metric.base.BaseMetric`: details on accumulation """ @classmethod def metric_name(cls): return DER_NAME def __init__(self, collar=0.0, skip_overlap=False, **kwargs): super(GreedyDiarizationErrorRate, self).__init__( collar=collar, skip_overlap=skip_overlap, **kwargs) self.mapper_ = GreedyMapper()
[docs] def greedy_mapping(self, reference, hypothesis, uem=None): """Greedy label mapping Parameters ---------- reference : Annotation hypothesis : Annotation Reference and hypothesis diarization uem : Timeline Evaluation map Returns ------- mapping : dict Mapping between hypothesis (key) and reference (value) labels """ if uem: reference, hypothesis = self.uemify(reference, hypothesis, uem=uem) return self.mapper_(hypothesis, reference)
[docs] def compute_components(self, reference, hypothesis, uem=None, **kwargs): # crop reference and hypothesis to evaluated regions (uem) # remove collars around reference segment boundaries # remove overlap regions (if requested) reference, hypothesis, uem = self.uemify( reference, hypothesis, uem=uem, collar=self.collar, skip_overlap=self.skip_overlap, returns_uem=True) # NOTE that this 'uemification' must be done here because it # might have an impact on the search for the greedy mapping. # make sure reference only contains string labels ('A', 'B', ...) reference = reference.rename_labels(generator='string') # make sure hypothesis only contains integer labels (1, 2, ...) hypothesis = hypothesis.rename_labels(generator='int') # greedy (int --> str) mapping mapping = self.greedy_mapping(reference, hypothesis) # compute identification error rate based on mapped hypothesis # NOTE that collar is set to 0.0 because 'uemify' has already # been applied (same reason for setting skip_overlap to False) mapped = hypothesis.rename_labels(mapping=mapping) return super(GreedyDiarizationErrorRate, self)\ .compute_components(reference, mapped, uem=uem, collar=0.0, skip_overlap=False, **kwargs)
JER_NAME = 'jaccard error rate' JER_SPEAKER_ERROR = 'speaker error' JER_SPEAKER_COUNT = 'speaker count'
[docs]class JaccardErrorRate(DiarizationErrorRate): """Jaccard error rate Reference --------- Second DIHARD Challenge Evaluation Plan. Version 1.1 N. Ryant, K. Church, C. Cieri, A. Cristia, J. Du, S. Ganapathy, M. Liberman https://coml.lscp.ens.fr/dihard/2019/second_dihard_eval_plan_v1.1.pdf "The Jaccard error rate is based on the Jaccard index, a similarity measure used to evaluate the output of image segmentation systems. An optimal mapping between reference and system speakers is determined and for each pair the Jaccard index is computed. The Jaccard error rate is then defined as 1 minus the average of these scores. While similar to DER, it weights every speaker’s contribution equally, regardless of how much speech they actually produced. More concretely, assume we have N reference speakers and M system speakers. An optimal mapping between speakers is determined using the Hungarian algorithm so that each reference speaker is paired with at most one system speaker and each system speaker with at most one reference speaker. Then, for each reference speaker ref the speaker-specific Jaccard error rate JERref is computed as JERref = (FA + MISS) / TOTAL where * TOTAL is the duration of the union of reference and system speaker segments; if the reference speaker was not paired with a system speaker, it is the duration of all reference speaker segments * FA is the total system speaker time not attributed to the reference speaker; if the reference speaker was not paired with a system speaker, it is 0 * MISS is the total reference speaker time not attributed to the system speaker; if the reference speaker was not paired with a system speaker, it is equal to TOTAL The Jaccard error rate then is the average of the speaker specific Jaccard error rates. JER and DER are highly correlated with JER typically being higher, especially in recordings where one or more speakers is particularly dominant. Where it tends to track DER is in outliers where the diarization is especially bad, resulting in one or more unmapped system speakers whose speech is not then penalized. In these cases, where DER can easily exceed 500%, JER will never exceed 100% and may be far lower if the reference speakers are handled correctly." Parameters ---------- collar : float, optional Duration (in seconds) of collars removed from evaluation around boundaries of reference segments. skip_overlap : bool, optional Set to True to not evaluate overlap regions. Defaults to False (i.e. keep overlap regions). Usage ----- >>> metric = JaccardErrorRate() >>> reference = Annotation(...) # doctest: +SKIP >>> hypothesis = Annotation(...) # doctest: +SKIP >>> jer = metric(reference, hypothesis) # doctest: +SKIP """ @classmethod def metric_name(cls): return JER_NAME @classmethod def metric_components(cls): return [ JER_SPEAKER_COUNT, JER_SPEAKER_ERROR, ] def __init__(self, collar=0.0, skip_overlap=False, **kwargs): super().__init__( collar=collar, skip_overlap=skip_overlap, **kwargs) self.mapper_ = HungarianMapper()
[docs] def compute_components(self, reference, hypothesis, uem=None, **kwargs): # crop reference and hypothesis to evaluated regions (uem) # remove collars around reference segment boundaries # remove overlap regions (if requested) reference, hypothesis, uem = self.uemify( reference, hypothesis, uem=uem, collar=self.collar, skip_overlap=self.skip_overlap, returns_uem=True) # NOTE that this 'uemification' must be done here because it # might have an impact on the search for the optimal mapping. # make sure reference only contains string labels ('A', 'B', ...) reference = reference.rename_labels(generator='string') # make sure hypothesis only contains integer labels (1, 2, ...) hypothesis = hypothesis.rename_labels(generator='int') # optimal (str --> int) mapping mapping = self.optimal_mapping(hypothesis, reference) detail = self.init_components() for ref_speaker in reference.labels(): hyp_speaker = mapping.get(ref_speaker, None) if hyp_speaker is None: # if the reference speaker was not paired with a system speaker # [total] is the duration of all reference speaker segments # if the reference speaker was not paired with a system speaker # [fa] is 0 # if the reference speaker was not paired with a system speaker # [miss] is equal to total # overall: jer = (fa + miss) / total = (0 + total) / total = 1 jer = 1. else: # total is the duration of the union of reference and system # speaker segments r = reference.label_timeline(ref_speaker) h = hypothesis.label_timeline(hyp_speaker) total = r.union(h).support().duration() # fa is the total system speaker time not attributed to the # reference speaker fa = h.duration() - h.crop(r).duration() # miss is the total reference speaker time not attributed to # the system speaker miss = r.duration() - r.crop(h).duration() jer = (fa + miss) / total detail[JER_SPEAKER_COUNT] += 1 detail[JER_SPEAKER_ERROR] += jer return detail
[docs] def compute_metric(self, detail): return detail[JER_SPEAKER_ERROR] / detail[JER_SPEAKER_COUNT]
PURITY_NAME = 'purity' PURITY_TOTAL = 'total' PURITY_CORRECT = 'correct'
[docs]class DiarizationPurity(UEMSupportMixin, BaseMetric): """Cluster purity A hypothesized annotation has perfect purity if all of its labels overlap only segments which are members of a single reference label. Parameters ---------- weighted : bool, optional When True (default), each cluster is weighted by its overall duration. collar : float, optional Duration (in seconds) of collars removed from evaluation around boundaries of reference segments. skip_overlap : bool, optional Set to True to not evaluate overlap regions. Defaults to False (i.e. keep overlap regions). """ @classmethod def metric_name(cls): return PURITY_NAME @classmethod def metric_components(cls): return [PURITY_TOTAL, PURITY_CORRECT] def __init__(self, collar=0.0, skip_overlap=False, weighted=True, **kwargs): super(DiarizationPurity, self).__init__(**kwargs) self.weighted = weighted self.collar = collar self.skip_overlap = skip_overlap
[docs] def compute_components(self, reference, hypothesis, uem=None, **kwargs): detail = self.init_components() # crop reference and hypothesis to evaluated regions (uem) reference, hypothesis = self.uemify( reference, hypothesis, uem=uem, collar=self.collar, skip_overlap=self.skip_overlap) if not reference: return detail # cooccurrence matrix matrix = reference * hypothesis # duration of largest class in each cluster largest = matrix.max(axis=0) duration = matrix.sum(axis=0) if self.weighted: detail[PURITY_CORRECT] = 0. if np.prod(matrix.shape): detail[PURITY_CORRECT] = largest.sum() detail[PURITY_TOTAL] = duration.sum() else: detail[PURITY_CORRECT] = (largest / duration).sum() detail[PURITY_TOTAL] = len(largest) return detail
[docs] def compute_metric(self, detail): if detail[PURITY_TOTAL] > 0.: return detail[PURITY_CORRECT] / detail[PURITY_TOTAL] return 1.
COVERAGE_NAME = 'coverage'
[docs]class DiarizationCoverage(DiarizationPurity): """Cluster coverage A hypothesized annotation has perfect coverage if all segments from a given reference label are clustered in the same cluster. Parameters ---------- weighted : bool, optional When True (default), each cluster is weighted by its overall duration. collar : float, optional Duration (in seconds) of collars removed from evaluation around boundaries of reference segments. skip_overlap : bool, optional Set to True to not evaluate overlap regions. Defaults to False (i.e. keep overlap regions). """ @classmethod def metric_name(cls): return COVERAGE_NAME def __init__(self, collar=0.0, skip_overlap=False, weighted=True, **kwargs): super(DiarizationCoverage, self).__init__( collar=collar, skip_overlap=skip_overlap, weighted=weighted, **kwargs)
[docs] def compute_components(self, reference, hypothesis, uem=None, **kwargs): return super(DiarizationCoverage, self)\ .compute_components(hypothesis, reference, uem=uem, **kwargs)
PURITY_COVERAGE_NAME = 'F[purity|coverage]' PURITY_COVERAGE_LARGEST_CLASS = 'largest_class' PURITY_COVERAGE_TOTAL_CLUSTER = 'total_cluster' PURITY_COVERAGE_LARGEST_CLUSTER = 'largest_cluster' PURITY_COVERAGE_TOTAL_CLASS = 'total_class'
[docs]class DiarizationPurityCoverageFMeasure(UEMSupportMixin, BaseMetric): """Compute diarization purity and coverage, and return their F-score. Parameters ---------- weighted : bool, optional When True (default), each cluster/class is weighted by its overall duration. collar : float, optional Duration (in seconds) of collars removed from evaluation around boundaries of reference segments. skip_overlap : bool, optional Set to True to not evaluate overlap regions. Defaults to False (i.e. keep overlap regions). beta : float, optional When beta > 1, greater importance is given to coverage. When beta < 1, greater importance is given to purity. Defaults to 1. See also -------- pyannote.metrics.diarization.DiarizationPurity pyannote.metrics.diarization.DiarizationCoverage pyannote.metrics.base.f_measure """ @classmethod def metric_name(cls): return PURITY_COVERAGE_NAME @classmethod def metric_components(cls): return [PURITY_COVERAGE_LARGEST_CLASS, PURITY_COVERAGE_TOTAL_CLUSTER, PURITY_COVERAGE_LARGEST_CLUSTER, PURITY_COVERAGE_TOTAL_CLASS] def __init__(self, collar=0.0, skip_overlap=False, weighted=True, beta=1., **kwargs): super(DiarizationPurityCoverageFMeasure, self).__init__(**kwargs) self.collar = collar self.skip_overlap = skip_overlap self.weighted = weighted self.beta = beta
[docs] def compute_components(self, reference, hypothesis, uem=None, **kwargs): detail = self.init_components() # crop reference and hypothesis to evaluated regions (uem) reference, hypothesis = self.uemify( reference, hypothesis, uem=uem, collar=self.collar, skip_overlap=self.skip_overlap) # cooccurrence matrix matrix = reference * hypothesis # duration of largest class in each cluster largest_class = matrix.max(axis=0) # duration of clusters duration_cluster = matrix.sum(axis=0) # duration of largest cluster in each class largest_cluster = matrix.max(axis=1) # duration of classes duration_class = matrix.sum(axis=1) if self.weighted: # compute purity components detail[PURITY_COVERAGE_LARGEST_CLASS] = 0. if np.prod(matrix.shape): detail[PURITY_COVERAGE_LARGEST_CLASS] = largest_class.sum() detail[PURITY_COVERAGE_TOTAL_CLUSTER] = duration_cluster.sum() # compute coverage components detail[PURITY_COVERAGE_LARGEST_CLUSTER] = 0. if np.prod(matrix.shape): detail[PURITY_COVERAGE_LARGEST_CLUSTER] = largest_cluster.sum() detail[PURITY_COVERAGE_TOTAL_CLASS] = duration_class.sum() else: # compute purity components detail[PURITY_COVERAGE_LARGEST_CLASS] = (largest_class / duration_cluster).sum() detail[PURITY_COVERAGE_TOTAL_CLUSTER] = len(largest_class) # compute coverage components detail[PURITY_COVERAGE_LARGEST_CLUSTER] = (largest_cluster / duration_class).sum() detail[PURITY_COVERAGE_TOTAL_CLASS] = len(largest_cluster) # compute purity detail[PURITY_NAME] = \ 1. if detail[PURITY_COVERAGE_TOTAL_CLUSTER] == 0. \ else detail[PURITY_COVERAGE_LARGEST_CLASS] / detail[PURITY_COVERAGE_TOTAL_CLUSTER] # compute coverage detail[COVERAGE_NAME] = \ 1. if detail[PURITY_COVERAGE_TOTAL_CLASS] == 0. \ else detail[PURITY_COVERAGE_LARGEST_CLUSTER] / detail[PURITY_COVERAGE_TOTAL_CLASS] return detail
[docs] def compute_metric(self, detail): _, _, value = self.compute_metrics(detail=detail) return value
def compute_metrics(self, detail=None): detail = self.accumulated_ if detail is None else detail purity = \ 1. if detail[PURITY_COVERAGE_TOTAL_CLUSTER] == 0. \ else detail[PURITY_COVERAGE_LARGEST_CLASS] / detail[PURITY_COVERAGE_TOTAL_CLUSTER] coverage = \ 1. if detail[PURITY_COVERAGE_TOTAL_CLASS] == 0. \ else detail[PURITY_COVERAGE_LARGEST_CLUSTER] / detail[PURITY_COVERAGE_TOTAL_CLASS] return purity, coverage, f_measure(purity, coverage, beta=self.beta)
HOMOGENEITY_NAME = 'homogeneity' HOMOGENEITY_ENTROPY = 'entropy' HOMOGENEITY_CROSS_ENTROPY = 'cross-entropy'
[docs]class DiarizationHomogeneity(UEMSupportMixin, BaseMetric): """Cluster homogeneity Parameters ---------- collar : float, optional Duration (in seconds) of collars removed from evaluation around boundaries of reference segments. skip_overlap : bool, optional Set to True to not evaluate overlap regions. Defaults to False (i.e. keep overlap regions). """ @classmethod def metric_name(cls): return HOMOGENEITY_NAME @classmethod def metric_components(cls): return [HOMOGENEITY_ENTROPY, HOMOGENEITY_CROSS_ENTROPY] def __init__(self, collar=0.0, skip_overlap=False, **kwargs): super(DiarizationHomogeneity, self).__init__(**kwargs) self.collar = collar self.skip_overlap = skip_overlap
[docs] def compute_components(self, reference, hypothesis, uem=None, **kwargs): detail = self.init_components() # crop reference and hypothesis to evaluated regions (uem) reference, hypothesis = self.uemify( reference, hypothesis, uem=uem, collar=self.collar, skip_overlap=self.skip_overlap) # cooccurrence matrix matrix = reference * hypothesis duration = np.sum(matrix) rduration = np.sum(matrix, axis=1) hduration = np.sum(matrix, axis=0) # reference entropy and reference/hypothesis cross-entropy ratio = np.ma.divide(rduration, duration).filled(0.) detail[HOMOGENEITY_ENTROPY] = \ -np.sum(ratio * np.ma.log(ratio).filled(0.)) ratio = np.ma.divide(matrix, duration).filled(0.) hratio = np.ma.divide(matrix, hduration).filled(0.) detail[HOMOGENEITY_CROSS_ENTROPY] = \ -np.sum(ratio * np.ma.log(hratio).filled(0.)) return detail
[docs] def compute_metric(self, detail): numerator = 1. * detail[HOMOGENEITY_CROSS_ENTROPY] denominator = 1. * detail[HOMOGENEITY_ENTROPY] if denominator == 0.: if numerator == 0: return 1. else: return 0. else: return 1. - numerator / denominator
COMPLETENESS_NAME = 'completeness'
[docs]class DiarizationCompleteness(DiarizationHomogeneity): """Cluster completeness Parameters ---------- collar : float, optional Duration (in seconds) of collars removed from evaluation around boundaries of reference segments. skip_overlap : bool, optional Set to True to not evaluate overlap regions. Defaults to False (i.e. keep overlap regions). """ @classmethod def metric_name(cls): return COMPLETENESS_NAME
[docs] def compute_components(self, reference, hypothesis, uem=None, **kwargs): return super(DiarizationCompleteness, self)\ .compute_components(hypothesis, reference, uem=uem, **kwargs)