Re: [RFC][PATCH 1/2] sstate: use the python3 ThreadPoolExecutor instead of the OE ThreadedPool


Jose Quaresma

Richard Purdie <richard.purdie@...> wrote on Saturday, 16/04/2022 at 22:57:
On Sat, 2022-04-16 at 21:24 +0100, Jose Quaresma wrote:
> For the FetchConnectionCache, use a queue where each thread can
> get an unused connection_cache that is properly initialized before
> we fire up the ThreadPoolExecutor.
>
> For the progress bar we need an additional task counter
> that is protected with a thread lock, as it runs inside the
> ThreadPoolExecutor.
>
> Fixes [YOCTO #14775] -- https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775
>
> Signed-off-by: Jose Quaresma <quaresma.jose@...>
> ---
>  meta/classes/sstate.bbclass | 44 +++++++++++++++++++++++--------------
>  1 file changed, 28 insertions(+), 16 deletions(-)

Are there specific issues you see with oe.utils.ThreadedPool that this change
addresses? Were you able to reproduce the issue in 14775?

Looking deeper while testing the patch, I think I found another bug in the sstate mirror handling.
The Python set() is not thread safe and we use it inside the thread pool, so I added a new Python set class for that in my V2 (a rough sketch of the idea is below).

I don't know if it is related to 14775, but it could be. I can't reproduce 14775 on my side,
so maybe it's better to remove the 14775 mention from my commits; what do you think?
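
Regarding the thread-safe set, the idea is roughly the following. This is only a sketch: the class name LockedSet and the set of wrapped methods are illustrative, not the actual V2 code.

    import threading

    class LockedSet(set):
        # A set() whose mutating operations are serialized with a lock.
        # Sketch only: LockedSet and the methods wrapped here are
        # illustrative, not the class that ships in V2.
        def __init__(self, *args, **kwargs):
            self._lock = threading.Lock()
            super().__init__(*args, **kwargs)

        def add(self, elem):
            with self._lock:
                super().add(elem)

        def remove(self, elem):
            with self._lock:
                super().remove(elem)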


I'm a little concerned we swap one implementation where we know roughly what the
issues are for another where we don't :/.

I think there are some issues in ThreadedPool's worker_init and worker_end:
these functions are called in every worker, and it seems to me that the right thing
to do is to reuse the previously created connection_cache objects; otherwise the
connection_cache does nothing.
 

I notice that ThreadPoolExecutor can take an initializer but you're doing this
using the queue instead. Is that because you suspect some issue with those being
set up in the separate threads?

I am using a queue because it is the easiest way I found to reuse the FetchConnectionCache instances across the worker threads.
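
Standalone, the queue pattern looks roughly like this. DummyCache stands in for FetchConnectionCache, and the try/finally is an addition of this sketch to make sure a borrowed cache always goes back into the queue, even if the work raises:

    from concurrent.futures import ThreadPoolExecutor
    from queue import Queue

    class DummyCache:
        # Stand-in for bb.fetch2.FetchConnectionCache in this sketch.
        def close_connections(self):
            pass

    nproc = 4
    connection_cache_pool = Queue(nproc)
    for _ in range(nproc):
        connection_cache_pool.put(DummyCache())

    def check_one(arg):
        cache = connection_cache_pool.get()   # borrow; blocks if all are in use
        try:
            # ... the real work would use 'cache' here ...
            return arg
        finally:
            connection_cache_pool.put(cache)  # always hand the cache back

    with ThreadPoolExecutor(max_workers=nproc) as executor:
        results = list(executor.map(check_one, range(10)))

    while not connection_cache_pool.empty():
        connection_cache_pool.get().close_connections()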
 

You also mentioned the debug messages not showing. That suggests something is
wrong with the event handlers in the new threading model, and that errors
wouldn't propagate either, so we need to look into that.

This is fixed in V2
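
On the error propagation point, one general ThreadPoolExecutor behaviour worth keeping in mind (not something specific to this patch): exceptions raised in a worker are only re-raised when the result iterator from executor.map() is consumed, so discarding the map() result silently drops them.

    from concurrent.futures import ThreadPoolExecutor

    def work(n):
        if n == 3:
            raise RuntimeError("boom")
        return n

    with ThreadPoolExecutor(max_workers=2) as executor:
        executor.map(work, range(5))
        # The tasks still run, but the RuntimeError is never re-raised
        # because nothing consumes the result iterator.

    with ThreadPoolExecutor(max_workers=2) as executor:
        try:
            results = list(executor.map(work, range(5)))
        except RuntimeError as e:
            # Consuming the iterator re-raises the worker's exception here.
            print("worker failed:", e)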
 

This is definitely an interesting idea but I'm nervous about it :/.

It would be interesting if you could test it on the autobuilder.
On my side it is working well now; I will send a V2.

Jose
 

Cheers,

Richard



> diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
> index 1c0cae4893..0ede078770 100644
> --- a/meta/classes/sstate.bbclass
> +++ b/meta/classes/sstate.bbclass
> @@ -977,15 +977,22 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
>              localdata.delVar('BB_NO_NETWORK')

>          from bb.fetch2 import FetchConnectionCache
> -        def checkstatus_init(thread_worker):
> -            thread_worker.connection_cache = FetchConnectionCache()
> -
> -        def checkstatus_end(thread_worker):
> -            thread_worker.connection_cache.close_connections()
> -
> -        def checkstatus(thread_worker, arg):
> +        def checkstatus_init():
> +            while not connection_cache_pool.full():
> +                connection_cache_pool.put(FetchConnectionCache())
> +
> +        def checkstatus_end():
> +            while not connection_cache_pool.empty():
> +                connection_cache = connection_cache_pool.get()
> +                connection_cache.close_connections()
> +
> +        import threading
> +        _lock = threading.Lock()
> +        def checkstatus(arg):
>              (tid, sstatefile) = arg

> +            connection_cache = connection_cache_pool.get()
> +
>              localdata2 = bb.data.createCopy(localdata)
>              srcuri = "file://" + sstatefile
>              localdata2.setVar('SRC_URI', srcuri)
> @@ -995,7 +1002,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,

>              try:
>                  fetcher = bb.fetch2.Fetch(srcuri.split(), localdata2,
> -                            connection_cache=thread_worker.connection_cache)
> +                            connection_cache=connection_cache)
>                  fetcher.checkstatus()
>                  bb.debug(2, "SState: Successful fetch test for %s" % srcuri)
>                  found.add(tid)
> @@ -1005,8 +1012,12 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
>              except Exception as e:
>                  bb.error("SState: cannot test %s: %s\n%s" % (srcuri, repr(e), traceback.format_exc()))

> +            connection_cache_pool.put(connection_cache)
> +
>              if progress:
> -                bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - thread_worker.tasks.qsize()), d)
> +                with _lock:
> +                    tasks -= 1
> +                bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - tasks), d)

>          tasklist = []
>          for tid in missed:
> @@ -1016,6 +1027,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
>          if tasklist:
>              nproc = min(int(d.getVar("BB_NUMBER_THREADS")), len(tasklist))

> +            tasks = len(tasklist)
>              progress = len(tasklist) >= 100
>              if progress:
>                  msg = "Checking sstate mirror object availability"
> @@ -1025,13 +1037,13 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
>              fetcherenv = bb.fetch2.get_fetcher_environment(d)
>              with bb.utils.environment(**fetcherenv):
>                  bb.event.enable_threadlock()
> -                pool = oe.utils.ThreadedPool(nproc, len(tasklist),
> -                        worker_init=checkstatus_init, worker_end=checkstatus_end,
> -                        name="sstate_checkhashes-")
> -                for t in tasklist:
> -                    pool.add_task(checkstatus, t)
> -                pool.start()
> -                pool.wait_completion()
> +                import concurrent.futures
> +                from queue import Queue
> +                connection_cache_pool = Queue(nproc)
> +                checkstatus_init()
> +                with concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor:
> +                    executor.map(checkstatus, tasklist)
> +                checkstatus_end()
>                  bb.event.disable_threadlock()

>              if progress:
>
>




--
Best regards,

José Quaresma
