[RFC][PATCH 1/2] sstate: use the python3 ThreadPoolExecutor instead of the OE ThreadedPool
Jose Quaresma
For the FetchConnectionCache, use a queue where each thread can
get an unused connection_cache that is properly initialized before
we fire up the ThreadPoolExecutor.

For the progress bar we need an additional task counter that is
protected with a thread lock, as it runs inside the ThreadPoolExecutor.

Fixes [YOCTO #14775] -- https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775

Signed-off-by: Jose Quaresma <quaresma.jose@...>
---
 meta/classes/sstate.bbclass | 44 +++++++++++++++++++++++--------------
 1 file changed, 28 insertions(+), 16 deletions(-)

diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
index 1c0cae4893..0ede078770 100644
--- a/meta/classes/sstate.bbclass
+++ b/meta/classes/sstate.bbclass
@@ -977,15 +977,22 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
             localdata.delVar('BB_NO_NETWORK')
 
         from bb.fetch2 import FetchConnectionCache
-        def checkstatus_init(thread_worker):
-            thread_worker.connection_cache = FetchConnectionCache()
-
-        def checkstatus_end(thread_worker):
-            thread_worker.connection_cache.close_connections()
-
-        def checkstatus(thread_worker, arg):
+        def checkstatus_init():
+            while not connection_cache_pool.full():
+                connection_cache_pool.put(FetchConnectionCache())
+
+        def checkstatus_end():
+            while not connection_cache_pool.empty():
+                connection_cache = connection_cache_pool.get()
+                connection_cache.close_connections()
+
+        import threading
+        _lock = threading.Lock()
+        def checkstatus(arg):
             (tid, sstatefile) = arg
 
+            connection_cache = connection_cache_pool.get()
+
             localdata2 = bb.data.createCopy(localdata)
             srcuri = "file://" + sstatefile
             localdata2.setVar('SRC_URI', srcuri)
@@ -995,7 +1002,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
 
             try:
                 fetcher = bb.fetch2.Fetch(srcuri.split(), localdata2,
-                            connection_cache=thread_worker.connection_cache)
+                            connection_cache=connection_cache)
                 fetcher.checkstatus()
                 bb.debug(2, "SState: Successful fetch test for %s" % srcuri)
                 found.add(tid)
@@ -1005,8 +1012,12 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
             except Exception as e:
                 bb.error("SState: cannot test %s: %s\n%s" % (srcuri, repr(e), traceback.format_exc()))
 
+            connection_cache_pool.put(connection_cache)
+
             if progress:
-                bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - thread_worker.tasks.qsize()), d)
+                with _lock:
+                    tasks -= 1
+                bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - tasks), d)
 
         tasklist = []
         for tid in missed:
@@ -1016,6 +1027,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
         if tasklist:
             nproc = min(int(d.getVar("BB_NUMBER_THREADS")), len(tasklist))
 
+            tasks = len(tasklist)
             progress = len(tasklist) >= 100
             if progress:
                 msg = "Checking sstate mirror object availability"
@@ -1025,13 +1037,13 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
             fetcherenv = bb.fetch2.get_fetcher_environment(d)
             with bb.utils.environment(**fetcherenv):
                 bb.event.enable_threadlock()
-                pool = oe.utils.ThreadedPool(nproc, len(tasklist),
-                        worker_init=checkstatus_init, worker_end=checkstatus_end,
-                        name="sstate_checkhashes-")
-                for t in tasklist:
-                    pool.add_task(checkstatus, t)
-                pool.start()
-                pool.wait_completion()
+                import concurrent.futures
+                from queue import Queue
+                connection_cache_pool = Queue(nproc)
+                checkstatus_init()
+                with concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor:
+                    executor.map(checkstatus, tasklist)
+                checkstatus_end()
                 bb.event.disable_threadlock()
 
             if progress:
-- 
2.35.3

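The approach described in the commit message can be sketched outside of bitbake roughly as follows. This is a standalone illustration under stated assumptions, not the patch itself: `FakeConnectionCache` and `check_all` are hypothetical stand-ins for `FetchConnectionCache` and the body of `sstate_checkhashes`. It pre-fills a bounded `queue.Queue` with one cache per worker, lets each task borrow and return a cache, and keeps a completed-task counter under a `threading.Lock`. Two Python details matter here: a counter defined in the enclosing function has to be declared `nonlocal` before it can be rebound from the nested function, and the iterator returned by `executor.map` has to be consumed for exceptions raised in the workers to be re-raised in the caller.

```python
import concurrent.futures
import threading
from queue import Queue


class FakeConnectionCache:
    """Hypothetical stand-in for bb.fetch2.FetchConnectionCache."""

    def __init__(self):
        self.closed = False
        self.uses = 0

    def close_connections(self):
        self.closed = True


def check_all(items, nproc):
    """Run a dummy check on every item, sharing at most nproc caches."""
    caches = [FakeConnectionCache() for _ in range(nproc)]
    pool = Queue(nproc)
    for cache in caches:
        pool.put(cache)

    lock = threading.Lock()
    done = 0
    progress = []  # one counter snapshot per finished task

    def check(item):
        nonlocal done  # 'done += 1' rebinds the variable of the outer scope
        cache = pool.get()  # blocks until a cache is free
        try:
            cache.uses += 1  # safe: only one thread holds a given cache
            result = item * 2  # placeholder for the real status check
        finally:
            pool.put(cache)  # always hand the cache back
        with lock:
            done += 1
            progress.append(done)
        return result

    with concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor:
        # list() consumes the iterator, so worker exceptions surface here
        results = list(executor.map(check, items))

    # All workers are finished, so every cache is back in the queue
    while not pool.empty():
        pool.get().close_connections()

    return results, progress, caches
```

Returning the cache in a `finally` block keeps the pool from draining if a check raises; without it, later tasks could block forever on `pool.get()`.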
Jose Quaresma
Hi,

One thing that doesn't work yet, and I don't know why, is the bitbake debug messages inside the ThreadPoolExecutor.

Jose Quaresma via lists.openembedded.org <quaresma.jose=gmail.com@...> wrote on Saturday, 16/04/2022 at 21:24:
> for the FetchConnectionCache use a queue where each thread can

--
Best regards,
José Quaresma

Richard Purdie
On Sat, 2022-04-16 at 21:24 +0100, Jose Quaresma wrote:
> for the FetchConnectionCache use a queue where each thread can

Are there specific issues you see with oe.utils.ThreadedPool that this change addresses? Were you able to reproduce the issue in 14775?

I'm a little concerned we swap one implementation where we know roughly what the issues are for another where we don't :/.

I notice that ThreadPoolExecutor can take an initializer but you're doing this using the queue instead. Is that because you suspect some issue with those being set up in the separate threads?

You also mentioned the debug messages not showing. That suggests something is wrong with the event handlers in the new threading model and that errors wouldn't propagate either, so we need to check into that.

This is definitely an interesting idea but I'm nervous about it :/.

Cheers,

Richard

> diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass

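For comparison, the `initializer` route mentioned above might look like the sketch below. It is standalone and hypothetical: `FakeConnectionCache` stands in for `FetchConnectionCache`, and `check_with_initializer` is not an existing function. Each worker thread creates its own cache once in the initializer and keeps it in a `threading.local`. `ThreadPoolExecutor` has no matching per-thread teardown hook, so the sketch records every cache in a list and closes them after the pool has shut down.

```python
import concurrent.futures
import threading


class FakeConnectionCache:
    """Hypothetical stand-in for bb.fetch2.FetchConnectionCache."""

    def __init__(self):
        self.closed = False

    def close_connections(self):
        self.closed = True


def check_with_initializer(items, nproc):
    local = threading.local()  # holds one cache per worker thread
    created = []               # every cache, kept for teardown
    created_lock = threading.Lock()

    def init_worker():
        # Runs once in each worker thread when that thread starts
        local.cache = FakeConnectionCache()
        with created_lock:
            created.append(local.cache)

    def check(item):
        cache = local.cache  # the cache owned by the current thread
        if cache.closed:
            raise RuntimeError("cache used after close")
        return (item, id(cache))

    with concurrent.futures.ThreadPoolExecutor(
            max_workers=nproc,
            thread_name_prefix="sstate_checkhashes-",
            initializer=init_worker) as executor:
        # list() consumes the iterator, so worker exceptions surface here
        results = list(executor.map(check, items))

    # The 'with' block has joined all workers; now it is safe to close
    for cache in created:
        cache.close_connections()

    return results, created
```

Compared with the queue, this keeps the one-cache-per-thread model of `worker_init`, at the cost of doing the cleanup by hand.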
Jose Quaresma
Richard Purdie <richard.purdie@...> wrote on Saturday, 16/04/2022 at 22:57:
> On Sat, 2022-04-16 at 21:24 +0100, Jose Quaresma wrote:

Looking deeper while testing the patch, I think I found another bug in the sstate mirror handling. The Python set() is not thread safe and we use it inside the thread pool. I added a new Python set class for that in my V2.

I don't know if it is related to 14775, but it could be. I can't reproduce 14775 on my side, so maybe it's better to remove the 14775 mention from my commits. What do you think?

I think there are some issues in ThreadedPool with worker_init and worker_end: these functions are called in all workers, and it seems to me that the right thing to do is to call them and reuse the previously created connection_cache, otherwise the connection_cache does nothing.

I am using a queue as it is the easiest way I found to reuse the FetchConnectionCache.

This is fixed in V2.

It would be interesting if you could test it in the autobuilder. On my side it is working well now; I will send a V2.

Jose

Best regards,
José Quaresma

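The V2 code is not part of this thread, so the following is only a guess at what a lock-guarded set could look like. `LockedSet` and `demo` are hypothetical names, not the class from the patch. The sketch wraps a plain `set` and takes a `threading.Lock` around each operation, then exercises it with the same `found.add(tid)` / `missed.remove(tid)` pattern used in `checkstatus`.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class LockedSet:
    """Hypothetical set wrapper that serializes access with a lock."""

    def __init__(self, iterable=()):
        self._lock = threading.Lock()
        self._items = set(iterable)

    def add(self, item):
        with self._lock:
            self._items.add(item)

    def remove(self, item):
        with self._lock:
            self._items.remove(item)

    def __contains__(self, item):
        with self._lock:
            return item in self._items

    def __len__(self):
        with self._lock:
            return len(self._items)

    def snapshot(self):
        """Return a plain copy that is safe to iterate without the lock."""
        with self._lock:
            return set(self._items)


def demo(n=200, nproc=8):
    missed = LockedSet(range(n))
    found = LockedSet()

    def mark_found(tid):
        found.add(tid)
        missed.remove(tid)

    with ThreadPoolExecutor(max_workers=nproc) as executor:
        # list() consumes the iterator, so worker exceptions surface here
        list(executor.map(mark_found, range(n)))

    return found.snapshot(), missed.snapshot()
```

Each set has its own lock, so moving a tid from `missed` to `found` is still two separate steps; a single shared lock would be needed if both updates had to appear atomic.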
Richard Purdie
On Sat, 2022-04-16 at 23:27 +0100, Jose Quaresma wrote:
That might explain things.

> I don't know if it is related to 14775 but it can be, I can't reproduce the

I think you shouldn't say it fixes it as we simply don't know that. It may be related to it, so perhaps say that instead.

> > I'm a little concerned we swap one implementation where we know roughly what
> I think there some issues on ThreadedPool in the worker_init and worker_end,

It creates a connection_cache in each thread that is created. Once created in a given thread, that connection cache is reused there? I'm not sure you can say it does nothing? I think that piece of code was working already?

What was the issue, out of interest?

The challenge with autobuilder testing of this is that only a small portion of the autobuilder tests exercise this code (testsdkext for images). The current issues only occur intermittently, so it is hard to know if any given change fixes anything (or introduces a new race).

One more interesting test, which may find issues more quickly, would be to make everything use the http mirror on the autobuilder, I guess. We'd need to figure out the configuration for that though.

Cheers,

Richard

Jose Quaresma
Richard Purdie <richard.purdie@...> wrote on Sunday, 17/04/2022 at 08:57:
> On Sat, 2022-04-16 at 23:27 +0100, Jose Quaresma wrote:

I will do that.

I may have misunderstood this part and you may be right. I have to re-analyze it more carefully.

It may be working fine with the OE ThreadedPool and I misunderstood that.

I don't know, but it starts working when I add the thread-safe collections.

Do you mean that there are builds on the autobuilder that use the sstate mirror and others that use a shared sstate cache filesystem? As you said previously, and since the sstate mirror is available to the community, I think I will try to add some tests for that. I still don't know how to do it, but I'll think about it.

Another thing about this RFC series is that I think I need to do it in a way that can be backported to dunfell if we need to do that. I will spend more time on this during this week.

Thanks for your always valuable comments.

Jose

Best regards,
José Quaresma