[RFC][PATCH v2 1/4] sstate: add a LockedSet class as python set() is not thread safe


Jose Quaresma
 



Richard Purdie <richard.purdie@...> escreveu no dia domingo, 17/04/2022 à(s) 21:09:
On Sun, 2022-04-17 at 14:41 +0100, Richard Purdie via lists.openembedded.org
wrote:
> On Sun, 2022-04-17 at 14:39 +0100, Richard Purdie via lists.openembedded.org
> wrote:
> > On Sat, 2022-04-16 at 23:28 +0100, Jose Quaresma wrote:
> > > With this LockedSet python class we can call the 'add' and 'remove'
> > > safely inside the ThreadedPool.
> > >
> > > This piece of code is taen from the stackoverflow
> > > https://stackoverflow.com/questions/13610654/how-to-make-built-in-containers-sets-dicts-lists-thread-safe
> > >
> > > Fixes [YOCTO #14775] -- https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775
> > >
> > > Signed-off-by: Jose Quaresma <quaresma.jose@...>
> > > ---
> > >  meta/classes/sstate.bbclass | 24 ++++++++++++++++++++++--
> > >  1 file changed, 22 insertions(+), 2 deletions(-)
> > >
> > > diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
> > > index 1c0cae4893..a3ba748a1e 100644
> > > --- a/meta/classes/sstate.bbclass
> > > +++ b/meta/classes/sstate.bbclass
> > > @@ -918,8 +918,28 @@ sstate_unpack_package () {
> > >  BB_HASHCHECK_FUNCTION = "sstate_checkhashes"
> > > 
> > >  def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True, **kwargs):
> > > -    found = set()
> > > -    missed = set()
> > > +    # https://stackoverflow.com/questions/13610654/how-to-make-built-in-containers-sets-dicts-lists-thread-safe
> > > +    import threading
> > > +    class LockedSet(set):
> > > +        """A set where add(), remove(), and 'in' operator are thread-safe"""
> > > +        def __init__(self, *args, **kwargs):
> > > +            self._lock = threading.Lock()
> > > +            super(LockedSet, self).__init__(*args, **kwargs)
> > > +
> > > +        def add(self, elem):
> > > +            with self._lock:
> > > +                super(LockedSet, self).add(elem)
> > > +
> > > +        def remove(self, elem):
> > > +            with self._lock:
> > > +                super(LockedSet, self).remove(elem)
> > > +
> > > +        def __contains__(self, elem):
> > > +            with self._lock:
> > > +                super(LockedSet, self).__contains__(elem)
> > > +
> > > +    found = LockedSet()
> > > +    missed = LockedSet()
> > > 
> > >      def gethash(task):
> > >          return sq_data['unihash'][task]
> >
> >
> > The series blew up on the autobuilder:
> >
> > testsdkest:
> > https://autobuilder.yoctoproject.org/typhoon/#/builders/53/builds/5079/steps/17/logs/stdio
> > oe-selftest:
> > https://autobuilder.yoctoproject.org/typhoon/#/builders/79/builds/3461/steps/15/logs/stdio
> > and many more, see:
> > https://autobuilder.yoctoproject.org/typhoon/#/builders/83/builds/3535
>
> That last one should be:
> https://autobuilder.yoctoproject.org/typhoon/#/builders/83/builds/3539

diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
index 7a18d9d554..99d125b8c5 100644
--- a/meta/classes/sstate.bbclass
+++ b/meta/classes/sstate.bbclass
@@ -1058,7 +1058,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
                 connection_cache_pool = Queue(nproc)
                 checkstatus_init()
                 with concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor:
-                    executor.map(checkstatus, tasks)
+                    executor.map(checkstatus, tasks.copy())
                 checkstatus_end()
                 bb.event.disable_threadlock()

fixes things.

Cheers,

Richard


Thanks for testing and fixing, this can be because the tasks collection has been changed inside the thread pool.
I'll wait a few more days to see if there are any more comments and I hope to be able to finish during this week.

--
Best regards,

José Quaresma


Richard Purdie
 

On Sun, 2022-04-17 at 14:41 +0100, Richard Purdie via lists.openembedded.org
wrote:
On Sun, 2022-04-17 at 14:39 +0100, Richard Purdie via lists.openembedded.org
wrote:
On Sat, 2022-04-16 at 23:28 +0100, Jose Quaresma wrote:
With this LockedSet python class we can call the 'add' and 'remove'
safely inside the ThreadedPool.

This piece of code is taen from the stackoverflow
https://stackoverflow.com/questions/13610654/how-to-make-built-in-containers-sets-dicts-lists-thread-safe

Fixes [YOCTO #14775] -- https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775

Signed-off-by: Jose Quaresma <quaresma.jose@...>
---
meta/classes/sstate.bbclass | 24 ++++++++++++++++++++++--
1 file changed, 22 insertions(+), 2 deletions(-)

diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
index 1c0cae4893..a3ba748a1e 100644
--- a/meta/classes/sstate.bbclass
+++ b/meta/classes/sstate.bbclass
@@ -918,8 +918,28 @@ sstate_unpack_package () {
BB_HASHCHECK_FUNCTION = "sstate_checkhashes"

def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True, **kwargs):
- found = set()
- missed = set()
+ # https://stackoverflow.com/questions/13610654/how-to-make-built-in-containers-sets-dicts-lists-thread-safe
+ import threading
+ class LockedSet(set):
+ """A set where add(), remove(), and 'in' operator are thread-safe"""
+ def __init__(self, *args, **kwargs):
+ self._lock = threading.Lock()
+ super(LockedSet, self).__init__(*args, **kwargs)
+
+ def add(self, elem):
+ with self._lock:
+ super(LockedSet, self).add(elem)
+
+ def remove(self, elem):
+ with self._lock:
+ super(LockedSet, self).remove(elem)
+
+ def __contains__(self, elem):
+ with self._lock:
+ super(LockedSet, self).__contains__(elem)
+
+ found = LockedSet()
+ missed = LockedSet()

def gethash(task):
return sq_data['unihash'][task]

The series blew up on the autobuilder:

testsdkest:
https://autobuilder.yoctoproject.org/typhoon/#/builders/53/builds/5079/steps/17/logs/stdio
oe-selftest:
https://autobuilder.yoctoproject.org/typhoon/#/builders/79/builds/3461/steps/15/logs/stdio
and many more, see:
https://autobuilder.yoctoproject.org/typhoon/#/builders/83/builds/3535
That last one should be:
https://autobuilder.yoctoproject.org/typhoon/#/builders/83/builds/3539
diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
index 7a18d9d554..99d125b8c5 100644
--- a/meta/classes/sstate.bbclass
+++ b/meta/classes/sstate.bbclass
@@ -1058,7 +1058,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
connection_cache_pool = Queue(nproc)
checkstatus_init()
with concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor:
- executor.map(checkstatus, tasks)
+ executor.map(checkstatus, tasks.copy())
checkstatus_end()
bb.event.disable_threadlock()

fixes things.

Cheers,

Richard


Richard Purdie
 

On Sun, 2022-04-17 at 14:39 +0100, Richard Purdie via lists.openembedded.org
wrote:
On Sat, 2022-04-16 at 23:28 +0100, Jose Quaresma wrote:
With this LockedSet python class we can call the 'add' and 'remove'
safely inside the ThreadedPool.

This piece of code is taen from the stackoverflow
https://stackoverflow.com/questions/13610654/how-to-make-built-in-containers-sets-dicts-lists-thread-safe

Fixes [YOCTO #14775] -- https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775

Signed-off-by: Jose Quaresma <quaresma.jose@...>
---
meta/classes/sstate.bbclass | 24 ++++++++++++++++++++++--
1 file changed, 22 insertions(+), 2 deletions(-)

diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
index 1c0cae4893..a3ba748a1e 100644
--- a/meta/classes/sstate.bbclass
+++ b/meta/classes/sstate.bbclass
@@ -918,8 +918,28 @@ sstate_unpack_package () {
BB_HASHCHECK_FUNCTION = "sstate_checkhashes"

def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True, **kwargs):
- found = set()
- missed = set()
+ # https://stackoverflow.com/questions/13610654/how-to-make-built-in-containers-sets-dicts-lists-thread-safe
+ import threading
+ class LockedSet(set):
+ """A set where add(), remove(), and 'in' operator are thread-safe"""
+ def __init__(self, *args, **kwargs):
+ self._lock = threading.Lock()
+ super(LockedSet, self).__init__(*args, **kwargs)
+
+ def add(self, elem):
+ with self._lock:
+ super(LockedSet, self).add(elem)
+
+ def remove(self, elem):
+ with self._lock:
+ super(LockedSet, self).remove(elem)
+
+ def __contains__(self, elem):
+ with self._lock:
+ super(LockedSet, self).__contains__(elem)
+
+ found = LockedSet()
+ missed = LockedSet()

def gethash(task):
return sq_data['unihash'][task]

The series blew up on the autobuilder:

testsdkest:
https://autobuilder.yoctoproject.org/typhoon/#/builders/53/builds/5079/steps/17/logs/stdio
oe-selftest:
https://autobuilder.yoctoproject.org/typhoon/#/builders/79/builds/3461/steps/15/logs/stdio
and many more, see:
https://autobuilder.yoctoproject.org/typhoon/#/builders/83/builds/3535
That last one should be:
https://autobuilder.yoctoproject.org/typhoon/#/builders/83/builds/3539

Cheers,

Richard


Richard Purdie
 

On Sat, 2022-04-16 at 23:28 +0100, Jose Quaresma wrote:
With this LockedSet python class we can call the 'add' and 'remove'
safely inside the ThreadedPool.

This piece of code is taen from the stackoverflow
https://stackoverflow.com/questions/13610654/how-to-make-built-in-containers-sets-dicts-lists-thread-safe

Fixes [YOCTO #14775] -- https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775

Signed-off-by: Jose Quaresma <quaresma.jose@...>
---
meta/classes/sstate.bbclass | 24 ++++++++++++++++++++++--
1 file changed, 22 insertions(+), 2 deletions(-)

diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
index 1c0cae4893..a3ba748a1e 100644
--- a/meta/classes/sstate.bbclass
+++ b/meta/classes/sstate.bbclass
@@ -918,8 +918,28 @@ sstate_unpack_package () {
BB_HASHCHECK_FUNCTION = "sstate_checkhashes"

def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True, **kwargs):
- found = set()
- missed = set()
+ # https://stackoverflow.com/questions/13610654/how-to-make-built-in-containers-sets-dicts-lists-thread-safe
+ import threading
+ class LockedSet(set):
+ """A set where add(), remove(), and 'in' operator are thread-safe"""
+ def __init__(self, *args, **kwargs):
+ self._lock = threading.Lock()
+ super(LockedSet, self).__init__(*args, **kwargs)
+
+ def add(self, elem):
+ with self._lock:
+ super(LockedSet, self).add(elem)
+
+ def remove(self, elem):
+ with self._lock:
+ super(LockedSet, self).remove(elem)
+
+ def __contains__(self, elem):
+ with self._lock:
+ super(LockedSet, self).__contains__(elem)
+
+ found = LockedSet()
+ missed = LockedSet()

def gethash(task):
return sq_data['unihash'][task]

The series blew up on the autobuilder:

testsdkest:
https://autobuilder.yoctoproject.org/typhoon/#/builders/53/builds/5079/steps/17/logs/stdio
oe-selftest:
https://autobuilder.yoctoproject.org/typhoon/#/builders/79/builds/3461/steps/15/logs/stdio
and many more, see:
https://autobuilder.yoctoproject.org/typhoon/#/builders/83/builds/3535

Cheers,

Richard


Jose Quaresma
 

With this LockedSet python class we can call the 'add' and 'remove'
safely inside the ThreadedPool.

This piece of code is taen from the stackoverflow
https://stackoverflow.com/questions/13610654/how-to-make-built-in-containers-sets-dicts-lists-thread-safe

Fixes [YOCTO #14775] -- https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775

Signed-off-by: Jose Quaresma <quaresma.jose@...>
---
meta/classes/sstate.bbclass | 24 ++++++++++++++++++++++--
1 file changed, 22 insertions(+), 2 deletions(-)

diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
index 1c0cae4893..a3ba748a1e 100644
--- a/meta/classes/sstate.bbclass
+++ b/meta/classes/sstate.bbclass
@@ -918,8 +918,28 @@ sstate_unpack_package () {
BB_HASHCHECK_FUNCTION = "sstate_checkhashes"

def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True, **kwargs):
- found = set()
- missed = set()
+ # https://stackoverflow.com/questions/13610654/how-to-make-built-in-containers-sets-dicts-lists-thread-safe
+ import threading
+ class LockedSet(set):
+ """A set where add(), remove(), and 'in' operator are thread-safe"""
+ def __init__(self, *args, **kwargs):
+ self._lock = threading.Lock()
+ super(LockedSet, self).__init__(*args, **kwargs)
+
+ def add(self, elem):
+ with self._lock:
+ super(LockedSet, self).add(elem)
+
+ def remove(self, elem):
+ with self._lock:
+ super(LockedSet, self).remove(elem)
+
+ def __contains__(self, elem):
+ with self._lock:
+ super(LockedSet, self).__contains__(elem)
+
+ found = LockedSet()
+ missed = LockedSet()

def gethash(task):
return sq_data['unihash'][task]
--
2.35.3