# meu_framework.py
# Copyright (C) 2006, 2007 Martin Jansche
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, distribute with modifications, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE ABOVE COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
# THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# Except as contained in this notice, the name(s) of the above copyright
# holders shall not be used in advertising or otherwise to promote the
# sale, use or other dealings in this Software without prior written
# authorization.
"""
A proof-of-concept implementation of the algorithms in the paper
"A Maximum Expected Utility Framework for Binary Sequence Labeling"
(ACL 2007).
"""
from __future__ import division
from itertools import izip
from math import exp, log, floor, ceil
import double
import select
######################################################################
##
## Decoding algorithms
##
# auxiliary method for exact decoding
def exact_expectF(z, p, beta=1):
    """Auxiliary method for exact decoding: the exact expected
    F_beta score of the hypothesis z under the marginals p."""
    assert len(z) == len(p)
    return evaluate_Eg(F_Score(z, beta), p)
# auxiliary method for inexact decoding
def inexact_expectF(z, p, beta=1, L=lambda n: 2*n+1):
    """Auxiliary method for inexact decoding: approximate expected
    F_beta score of z under p, expanding at most L(len(z)) states per
    position (but never fewer than one)."""
    assert len(z) == len(p)
    cap = max(1, L(len(z)))
    approx, upper, err, expanded = evaluate_inexact_Eg(F_Score(z, beta), p, cap)
    return approx
#
# This is Algorithm 1 in the paper:
#
def max_EF_decode(p, expectF=exact_expectF):
    """Algorithm 1 in the paper: maximum-expected-F-score decoding.

    p is a sequence of marginal probabilities; expectF(z, p) evaluates
    the expected utility of hypothesis z.  Returns (z, v) where z is
    the maximizing bit vector and v its expected utility.

    Bits are switched on in order of decreasing probability; the best
    prefix length found is kept.  The cmp-based sort and xrange of the
    original were Python-2-only; a stable key sort with reverse=True
    yields the identical ordering (ties keep their original index
    order) and also runs under Python 3.
    """
    I = list(enumerate(p))
    I.sort(key=lambda item: item[1], reverse=True)
    z = [0 for _ in p]
    a = -1  # the paper uses 1-based indexing, but Python is 0-based
    v = expectF(z, p)
    for j in range(len(p)):
        i, pi = I[j]
        assert p[i] == pi
        assert z[i] == 0
        z[i] = 1  # switch on the i-th bit
        u = expectF(z, p)
        if u > v:
            a = j
            v = u
    for zi in z:
        assert zi == 1
    for j in range(a + 1, len(p)):
        i, pi = I[j]
        z[i] = 0  # switch off the i-th bit
    return (z, v)
######################################################################
##
## Simple utility/loss functions
##
#
# This is Algorithm 2 in the paper:
#
class F_Score:
    """Algorithm 2 in the paper: the F_beta score as a utility
    function in the present framework.

    The automaton state is the pair (A, T), where A counts positions
    that are 1 in both z and the observed sequence y (matches against
    the hypothesis), and T counts the 1s observed in y so far.

    The Python-2-only tuple parameters of transition() and a() have
    been replaced by explicit unpacking (PEP 3113 removed them in
    Python 3); callers still pass the state tuple positionally.
    """
    def __init__(self, z, beta=1):
        # z: the hypothesis bit vector; beta: the F-score weight.
        self.z = z
        self.beta = beta
        return
    def start(self):
        """Initial state: nothing observed yet."""
        (A, T) = (0, 0)
        assert self._state_invariant(A, T)
        return (A, T)
    def transition(self, k, i, yi):
        """Advance the state on observing label yi at position i."""
        (A, T) = k
        assert self._state_invariant(A, T)
        if yi == 1:
            T += 1
            if self.z[i] == 1:
                A += 1
        assert self._state_invariant(A, T)
        return (A, T)
    def a(self, k):
        """Read the F score off a final state; fdiv gives 0/0 == 1
        for the degenerate all-zeros case."""
        (A, T) = k
        assert self._state_invariant(A, T)
        F = fdiv((self.beta + 1) * A,
                 dot(self.z, self.z) + self.beta * T)
        assert 0 <= F and F <= 1
        return F
    def _state_invariant(self, A, T):
        """The invariant of the index/state set K."""
        return (0 <= A
                and A <= T
                and T <= len(self.z)
                and A <= dot(self.z, self.z))
#
# Hamming loss, discussed in Section 3.5 of the paper:
#
class Hamming_Loss:
    """Hamming loss, discussed in Section 3.5 of the paper.

    The automaton state k counts the positions where the observed
    label disagrees with the hypothesis z, saturating at the given
    threshold (by default the sequence length, i.e. no saturation).
    """

    def __init__(self, z, threshold=None):
        self.z = z
        if threshold is not None:
            self.threshold = threshold
        else:
            self.threshold = len(z)
        assert self.threshold >= 1
        return

    def start(self):
        """Initial state: no mismatches seen yet."""
        return 0

    def transition(self, k, i, yi):
        """Count a mismatch at position i unless the cap is reached."""
        assert k <= self.threshold
        if yi == self.z[i]:
            return k
        if k < self.threshold:
            return k + 1
        return k

    def a(self, k):
        """The loss is the (capped) mismatch count itself."""
        assert k <= self.threshold
        return k
#
# Zero-one loss, a special case of Hamming loss counting up to a
# threshold of one.
#
def ZeroOne_Loss(z):
    """Zero-one loss: 0 iff the observed sequence equals z exactly,
    1 otherwise.  Implemented as Hamming loss that stops counting
    after the first mismatch."""
    return Hamming_Loss(z, threshold=1)
#
# P_k loss, also mentioned in Section 3.5 of the paper:
#
class Pk_Loss:
    """P_k loss, also mentioned in Section 3.5 of the paper.

    The automaton state is (mf, window): mf counts the sliding
    k-windows on which z and the observed labels disagree about the
    presence of a boundary (a 1), and window holds the last k observed
    labels, padded with '#' before the start.

    The Python-2-only tuple parameters of transition() and a() have
    been replaced by explicit unpacking (PEP 3113 removed them in
    Python 3); callers still pass the state tuple positionally.
    """
    def __init__(self, z, k):
        assert len(z) >= k
        self.z = z
        self.k = k
        return
    def start(self):
        """Initial state: no windows scored, window full of padding."""
        return (0, ('#',) * self.k)
    def transition(self, state, i, yi):
        """Score the window ending just before position i (once i is at
        least k), then slide the window by appending yi."""
        (mf, window) = state
        if i >= self.k:
            if 1 in self.z[i-self.k:i]:
                # z has a boundary in the window but y does not:
                if 1 not in window:
                    mf += 1
            elif 1 in window:
                # y has a boundary in the window but z does not:
                mf += 1
        window = window[1:] + (yi,)
        return (mf, window)
    def a(self, state):
        """Fraction of disagreeing windows; fdiv gives 0/0 == 1 when
        len(z) == k."""
        (mf, window) = state
        return fdiv(mf, len(self.z) - self.k)
######################################################################
##
## Evaluating a function and its expectation at a given point
##
#
# This is Algorithm 3 in the paper:
#
def evaluate_g(g, y):
    """Algorithm 3 in the paper: run the automaton g over the label
    sequence y and read the value of g off the final state."""
    state = g.start()
    for position, label in enumerate(y):
        state = g.transition(state, position, label)
    return g.a(state)
#
# This is Algorithm 4 in the paper:
#
def evaluate_Eg(g, p):
    """Algorithm 4 in the paper: the exact expectation of g under the
    independent Bernoulli marginals p, computed by a dynamic program
    over the reachable automaton states.

    M maps each reachable state to its total probability mass after
    processing a prefix of positions; the expectation is the
    mass-weighted sum of g.a over the final states.

    The Python-2-only iteritems()/xrange of the original have been
    replaced by items()/range, which behave identically and also run
    under Python 3.
    """
    # NB: This method is general and works with all loss/utility
    # functions that fit into the present framework.
    n = len(p)
    M = {g.start(): 1}
    for i in range(n):
        N = {}
        for (k, P) in M.items():
            # transition on yi = 0
            k0 = g.transition(k, i, 0)
            N[k0] = N.get(k0, 0) + P * (1 - p[i])
            # transition on yi = 1
            k1 = g.transition(k, i, 1)
            N[k1] = N.get(k1, 0) + P * p[i]
        M = N
    E = 0
    for (k, P) in M.items():
        E += g.a(k) * P
    return E
#
# This is the inexact method discussed in Section 4:
#
def evaluate_inexact_Eg(g, p, lim):
    """The inexact method discussed in Section 4: like evaluate_Eg,
    but at each position only the lim most probable automaton states
    are expanded (beam pruning).

    Returns (E, E_max, error, Q):
      E     -- the approximate expectation;
      E_max -- a loose upper bound on the exact expectation
               (F-score only, else None);
      error -- abs(E_max - E) (F-score only, else None);
      Q     -- the total number of expanded states.
    """
    # NB: This method is partially specialized to the F-score.  It can
    # be used with all loss/utility functions that fit into the
    # present framework, but error estimates are only available for
    # the expected F-score.
    assert lim >= 1
    def limit(M_items):
        # Retain only the lim states carrying the most probability
        # mass.  A stable stdlib sort replaces the project-local
        # cmp-based selection, whose comparator used Python-2-only
        # tuple parameters; ties among equal-probability states may be
        # broken differently, but any such choice is a valid pruning.
        m = len(M_items) - int(lim)
        if m > 0:
            M_items.sort(key=lambda item: item[1])
            J = M_items[m:]
            assert len(J) == int(lim)
        else:
            J = M_items
        assert 1 <= len(J) and len(J) <= lim
        return J
    n = len(p)
    Q = 0  # number of expanded states
    M = {g.start(): 1}
    for i in range(n):
        assert len(M) <= 2 * lim
        N = {}
        for (k, P) in limit(list(M.items())):
            Q += 1
            # transition on yi = 0
            k0 = g.transition(k, i, 0)
            N[k0] = N.get(k0, 0) + P * (1 - p[i])
            # transition on yi = 1
            k1 = g.transition(k, i, 1)
            N[k1] = N.get(k1, 0) + P * p[i]
        M = N
    T = 0  # total probability mass covered by the expanded states
    E = 0
    for (k, P) in limit(list(M.items())):
        Q += 1
        T += P
        E += g.a(k) * P
    if isinstance(g, F_Score):  # HACK!
        # For the F-score, we can estimate the error of the inexact
        # computation as follows.  In the worst case, the probability
        # mass 1-T of unseen states corresponds to a single state with
        # coefficient 1 (all coefficients a_k must lie in the interval
        # [0;1]).  This provides a loose upper bound E_max on the
        # exact expectation.
        E_max = min(1, E + max(0, 1 - T))
        error = abs(E_max - E)
    else:
        E_max = None
        error = None
    # Check that the number of states is within the specified limit:
    assert Q <= (n + 1) * lim
    return E, E_max, error, Q
######################################################################
##
## Auxiliary functions
##
def fdiv(x, y):
    """Floating point division with the nonstandard convention that
    0/0 evaluates to +/-1 (depending on whether the signs of the two
    zeros agree) rather than NaN.  Deliberately inconsistent with IEEE
    semantics, but convenient for the F score, where 0/0 arises as a
    limiting case."""
    if x != 0 or y != 0:
        return double.fdiv(x, y)
    # Both operands are (signed) zeros: apply the usual sign rule,
    # but with magnitude 1 instead of NaN.
    if double.signbit(x) == double.signbit(y):
        return 1.0
    return -1.0
def dot(xs, ys):
    """Generic dot product in arbitrary coordinate space.

    Pairs are truncated to the shorter of the two sequences, and the
    result is 0 for empty input.  Uses the builtin zip with sum, which
    behaves identically to the original izip-based loop and also runs
    under Python 3 (where itertools.izip no longer exists).
    """
    return sum(x * y for x, y in zip(xs, ys))
## eof