[RFC][PATCH 1/2] sstate: use the python3 ThreadPoolExecutor instead of the OE ThreadedPool


Jose Quaresma
 

For the FetchConnectionCache, use a queue where each thread can
get an unused connection_cache that is properly initialized before
we fire up the ThreadPoolExecutor.

For the progress bar we need an additional task counter
that is protected with a thread lock, as it runs inside the
ThreadPoolExecutor.

Fixes [YOCTO #14775] -- https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775

Signed-off-by: Jose Quaresma <quaresma.jose@...>
---
meta/classes/sstate.bbclass | 44 +++++++++++++++++++++++--------------
1 file changed, 28 insertions(+), 16 deletions(-)

diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
index 1c0cae4893..0ede078770 100644
--- a/meta/classes/sstate.bbclass
+++ b/meta/classes/sstate.bbclass
@@ -977,15 +977,22 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
             localdata.delVar('BB_NO_NETWORK')
 
         from bb.fetch2 import FetchConnectionCache
-        def checkstatus_init(thread_worker):
-            thread_worker.connection_cache = FetchConnectionCache()
-
-        def checkstatus_end(thread_worker):
-            thread_worker.connection_cache.close_connections()
-
-        def checkstatus(thread_worker, arg):
+        def checkstatus_init():
+            while not connection_cache_pool.full():
+                connection_cache_pool.put(FetchConnectionCache())
+
+        def checkstatus_end():
+            while not connection_cache_pool.empty():
+                connection_cache = connection_cache_pool.get()
+                connection_cache.close_connections()
+
+        import threading
+        _lock = threading.Lock()
+        def checkstatus(arg):
             (tid, sstatefile) = arg
 
+            connection_cache = connection_cache_pool.get()
+
             localdata2 = bb.data.createCopy(localdata)
             srcuri = "file://" + sstatefile
             localdata2.setVar('SRC_URI', srcuri)
@@ -995,7 +1002,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
 
             try:
                 fetcher = bb.fetch2.Fetch(srcuri.split(), localdata2,
-                            connection_cache=thread_worker.connection_cache)
+                            connection_cache=connection_cache)
                 fetcher.checkstatus()
                 bb.debug(2, "SState: Successful fetch test for %s" % srcuri)
                 found.add(tid)
@@ -1005,8 +1012,12 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
             except Exception as e:
                 bb.error("SState: cannot test %s: %s\n%s" % (srcuri, repr(e), traceback.format_exc()))
 
+            connection_cache_pool.put(connection_cache)
+
             if progress:
-                bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - thread_worker.tasks.qsize()), d)
+                with _lock:
+                    tasks -= 1
+                bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - tasks), d)
 
         tasklist = []
         for tid in missed:
@@ -1016,6 +1027,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
         if tasklist:
             nproc = min(int(d.getVar("BB_NUMBER_THREADS")), len(tasklist))
 
+            tasks = len(tasklist)
             progress = len(tasklist) >= 100
             if progress:
                 msg = "Checking sstate mirror object availability"
@@ -1025,13 +1037,13 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
             fetcherenv = bb.fetch2.get_fetcher_environment(d)
             with bb.utils.environment(**fetcherenv):
                 bb.event.enable_threadlock()
-                pool = oe.utils.ThreadedPool(nproc, len(tasklist),
-                        worker_init=checkstatus_init, worker_end=checkstatus_end,
-                        name="sstate_checkhashes-")
-                for t in tasklist:
-                    pool.add_task(checkstatus, t)
-                pool.start()
-                pool.wait_completion()
+                import concurrent.futures
+                from queue import Queue
+                connection_cache_pool = Queue(nproc)
+                checkstatus_init()
+                with concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor:
+                    executor.map(checkstatus, tasklist)
+                checkstatus_end()
                 bb.event.disable_threadlock()
 
             if progress:
--
2.35.3


Jose Quaresma
 

Hi,

One thing that still doesn't work, and I don't know why, is the bitbake debug messages inside the ThreadPoolExecutor.

Jose

--
Best regards,

José Quaresma


Richard Purdie
 

On Sat, 2022-04-16 at 21:24 +0100, Jose Quaresma wrote:
For the FetchConnectionCache, use a queue where each thread can
get an unused connection_cache that is properly initialized before
we fire up the ThreadPoolExecutor.

For the progress bar we need an additional task counter
that is protected with a thread lock, as it runs inside the
ThreadPoolExecutor.

Fixes [YOCTO #14775] -- https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775

Signed-off-by: Jose Quaresma <quaresma.jose@...>
---
meta/classes/sstate.bbclass | 44 +++++++++++++++++++++++--------------
1 file changed, 28 insertions(+), 16 deletions(-)
Are there specific issues you see with oe.utils.ThreadedPool that this change
addresses? Were you able to reproduce the issue in 14775?

I'm a little concerned we swap one implementation where we know roughly what the
issues are for another where we don't :/.

I notice that ThreadPoolExecutor can take an initializer, but you're doing this
using the queue instead. Is that because you suspect some issue with those being
set up in the separate threads?
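
(For reference, the initializer hook would look roughly like this. This is a
minimal sketch, not the patch's code; checkstatus, tasklist and nproc are the
names from the patch context, and note that ThreadPoolExecutor has no matching
hook to close the per-thread caches on shutdown:)

    import concurrent.futures
    import threading
    from bb.fetch2 import FetchConnectionCache

    tls = threading.local()

    def checkstatus_init():
        # Runs once in each worker thread, before it handles any task.
        tls.connection_cache = FetchConnectionCache()

    with concurrent.futures.ThreadPoolExecutor(max_workers=nproc,
            initializer=checkstatus_init) as executor:
        executor.map(checkstatus, tasklist)
    # There is no 'finalizer' parameter, so the per-thread caches
    # would have to be closed some other way.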

You also mentioned the debug messages not showing. That suggests something is
wrong with the event handlers in the new threading model, and that errors
wouldn't propagate either, so we need to look into that.
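
(For what it's worth, executor.map() also defers worker exceptions: they are
only re-raised when the results iterator is consumed, and the patch never
consumes it. A small self-contained illustration:)

    import concurrent.futures

    def work(n):
        raise RuntimeError("boom %d" % n)

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        results = executor.map(work, range(4))
        # Nothing has been raised yet; each RuntimeError is stored in
        # its future. Only consuming the iterator re-raises it:
        for r in results:   # RuntimeError("boom 0") surfaces here
            print(r)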

This is definitely an interesting idea but I'm nervous about it :/.

Cheers,

Richard





Jose Quaresma
 



Richard Purdie <richard.purdie@...> wrote on Saturday, 16/04/2022 at 22:57:
On Sat, 2022-04-16 at 21:24 +0100, Jose Quaresma wrote:
> For the FetchConnectionCache, use a queue where each thread can
> get an unused connection_cache that is properly initialized before
> we fire up the ThreadPoolExecutor.
>
> For the progress bar we need an additional task counter
> that is protected with a thread lock, as it runs inside the
> ThreadPoolExecutor.
>
> Fixes [YOCTO #14775] -- https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775
>
> Signed-off-by: Jose Quaresma <quaresma.jose@...>
> ---
>  meta/classes/sstate.bbclass | 44 +++++++++++++++++++++++--------------
>  1 file changed, 28 insertions(+), 16 deletions(-)

Are there specific issues you see with oe.utils.ThreadedPool that this change
addresses? Were you able to reproduce the issue in 14775?

Looking deeper while testing the patch, I think I found another bug in the
sstate mirror handling. The Python set() is not thread safe and we use it
inside the thread pool, so I added a new thread-safe set class in my V2.

I don't know if it is related to 14775, but it could be. I can't reproduce 14775 on my side;
maybe it's better to remove the 14775 mention from my commits. What do you think?


I'm a little concerned we swap one implementation where we know roughly what the
issues are for another where we don't :/.

I think there are some issues in the ThreadedPool worker_init and worker_end:
these functions are called in all workers, and it seems to me that the right thing
to do is to reuse the previously created connection_caches; otherwise the
connection_cache does nothing.
 

I notice that ThreadPoolExecutor can take an initializer, but you're doing this
using the queue instead. Is that because you suspect some issue with those being
set up in the separate threads?

I am using a queue as it is the easiest way I found to reuse the FetchConnectionCache.
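
Roughly this shape (sketch only; FetchConnectionCache and nproc are the names
from the patch, and the try/finally is an extra safety that the posted patch
does not have):

    from queue import Queue
    from bb.fetch2 import FetchConnectionCache

    connection_cache_pool = Queue(nproc)      # one slot per worker thread
    for _ in range(nproc):
        connection_cache_pool.put(FetchConnectionCache())

    def checkstatus(arg):
        # Blocks until another task has returned a cache to the pool.
        connection_cache = connection_cache_pool.get()
        try:
            ...  # run the fetch test using this connection_cache
        finally:
            connection_cache_pool.put(connection_cache)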
 

You also mentioned the debug messages not showing. That suggests something is
wrong with the event handlers in the new threading model, and that errors
wouldn't propagate either, so we need to look into that.

This is fixed in V2
 

This is definitely an interesting idea but I'm nervous about it :/.

It would be interesting if you could test it in the autobuilder.
On my side it is working well now; I will send a V2.

Jose
 

Cheers,

Richard







--
Best regards,

José Quaresma


Richard Purdie
 

On Sat, 2022-04-16 at 23:27 +0100, Jose Quaresma wrote:


Richard Purdie <richard.purdie@...> wrote on Saturday, 16/04/2022 at 22:57:
On Sat, 2022-04-16 at 21:24 +0100, Jose Quaresma wrote:
For the FetchConnectionCache, use a queue where each thread can
get an unused connection_cache that is properly initialized before
we fire up the ThreadPoolExecutor.

For the progress bar we need an additional task counter
that is protected with a thread lock, as it runs inside the
ThreadPoolExecutor.

Fixes [YOCTO #14775] --
https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775

Signed-off-by: Jose Quaresma <quaresma.jose@...>
---
  meta/classes/sstate.bbclass | 44 +++++++++++++++++++++++--------------
  1 file changed, 28 insertions(+), 16 deletions(-)
Are there specific issues you see with oe.utils.ThreadedPool that this change
addresses? Were you able to reproduce the issue in 14775?

Looking deeper while testing the patch, I think I found another bug in the
sstate mirror handling. The Python set() is not thread safe and we use it
inside the thread pool, so I added a new thread-safe set class in my V2.
That might explain things.

I don't know if it is related to 14775, but it could be. I can't reproduce
14775 on my side; maybe it's better to remove the 14775 mention from my
commits. What do you think?
I think you shouldn't say it fixes it, as we simply don't know that. It may be
related to it, so perhaps say that instead.

I'm a little concerned we swap one implementation where we know roughly what
the issues are for another where we don't :/.
I think there are some issues in the ThreadedPool worker_init and worker_end:
these functions are called in all workers, and it seems to me that the right
thing to do is to reuse the previously created connection_caches; otherwise
the connection_cache does nothing.
It creates a connection_cache in each thread that is created. Once created in a
given thread, that connection cache is reused there? I'm not sure you can say it
does nothing?

 

I notice that ThreadPoolExecutor can take an initializer, but you're doing
this using the queue instead. Is that because you suspect some issue with
those being set up in the separate threads?

I am using a queue as it is the easiest way I found to reuse the
FetchConnectionCache.
I think that piece of code was working already?

 

You also mentioned the debug messages not showing. That suggests something
is wrong with the event handlers in the new threading model, and that errors
wouldn't propagate either, so we need to look into that.

This is fixed in V2

What was the issue, out of interest?

 

This is definitely an interesting idea but I'm nervous about it :/.

It would be interesting if you could test it in the autobuilder.
On my side it is working well now; I will send a V2.
The challenge with autobuilder testing of this is that there is only a small
portion of the autobuilder tests which exercise this code (testsdkext for
images). The current issues only occur intermittently so it is hard to know if
any given change fixes anything (or introduces a new race).

One more interesting test which may more quickly find issues would be to make
everything use the http mirror on the autobuilder I guess. We'd need to figure
out the configuration for that though.

Cheers,

Richard


Jose Quaresma
 



Richard Purdie <richard.purdie@...> wrote on Sunday, 17/04/2022 at 08:57:
On Sat, 2022-04-16 at 23:27 +0100, Jose Quaresma wrote:
>
>
> Richard Purdie <richard.purdie@...> escreveu no dia sábado,
> 16/04/2022 à(s) 22:57:
> > On Sat, 2022-04-16 at 21:24 +0100, Jose Quaresma wrote:
> > > For the FetchConnectionCache, use a queue where each thread can
> > > get an unused connection_cache that is properly initialized before
> > > we fire up the ThreadPoolExecutor.
> > >
> > > For the progress bar we need an additional task counter
> > > that is protected with a thread lock, as it runs inside the
> > > ThreadPoolExecutor.
> > >
> > > Fixes [YOCTO #14775] --
> > https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775
> > >
> > > Signed-off-by: Jose Quaresma <quaresma.jose@...>
> > > ---
> > >   meta/classes/sstate.bbclass | 44 +++++++++++++++++++++++--------------
> > >   1 file changed, 28 insertions(+), 16 deletions(-)
> >
> > Are there specific issues you see with oe.utils.ThreadedPool that this
> > change addresses? Were you able to reproduce the issue in 14775?
> >
>
>
> Looking deeper while testing the patch, I think I found another bug in the
> sstate mirror handling. The Python set() is not thread safe and we use it
> inside the thread pool, so I added a new thread-safe set class in my V2.

That might explain things.

> I don't know if it is related to 14775, but it could be. I can't reproduce
> 14775 on my side; maybe it's better to remove the 14775 mention from my
> commits. What do you think?

I think you shouldn't say it fixes it, as we simply don't know that. It may be
related to it, so perhaps say that instead.

I will do that.
 

> > I'm a little concerned we swap one implementation where we know roughly what
> > the issues are for another where we don't :/.
> >
>
> I think there are some issues in the ThreadedPool worker_init and worker_end:
> these functions are called in all workers, and it seems to me that the right
> thing to do is to reuse the previously created connection_caches; otherwise
> the connection_cache does nothing.

It creates a connection_cache in each thread that is created. Once created in a
given thread, that connection cache is reused there? I'm not sure you can say it
does nothing?

I may have misunderstood this part and you may be right. I have to re-analyze more carefully.
 

>  
> >
> > I notice that ThreadPoolExecutor can take an initializer, but you're doing
> > this using the queue instead. Is that because you suspect some issue with
> > those being set up in the separate threads?
> >
>
>
> I am using a queue as it is the easiest way I found to reuse the
> FetchConnectionCache.

I think that piece of code was working already?

It may be working fine in the OE ThreadedPool and I misunderstood it.
 

>  
> >
> > You also mentioned the debug messages not showing. That suggests something
> > is wrong with the event handlers in the new threading model, and that errors
> > wouldn't propagate either, so we need to look into that.
> >
>
>
> This is fixed in V2


What was the issue, out of interest?

I don't know, but it started working when I added the thread-safe collections.
 

>  
> >
> > This is definitely an interesting idea but I'm nervous about it :/.
> >
>
>
> It would be interesting if you could test it in the autobuilder.
> On my side it is working well now; I will send a V2.

The challenge with autobuilder testing of this is that there is only a small
portion of the autobuilder tests which exercise this code (testsdkext for
images). The current issues only occur intermittently so it is hard to know if
any given change fixes anything (or introduces a new race).

One more interesting test which may more quickly find issues would be to make
everything use the http mirror on the autobuilder I guess. We'd need to figure
out the configuration for that though.

Do you mean that there are builds on the autobuilder that use the sstate mirror
and others that use a shared sstate cache filesystem?

As you said, and since the sstate mirror is available to the community,
I will try to add some tests for that.
I still don't know how to do it, but I'll think about it.

Another thing about this RFC series: I think I need to do it in a way
that can be backported to dunfell, if we need to do that.

I will spend more time on this during this week.
Thanks for your always valuable comments.

Jose


Cheers,

Richard





--
Best regards,

José Quaresma