Hi,
One thing that yet doesn't work and I don't know why is the bitbake debug messages inside the ThreadPoolExecutor.
Jose
toggle quoted messageShow quoted text
for the FetchConnectionCache use a queue where each thread can
get an unsed connection_cache that is properly initialized before
we fireup the ThreadPoolExecutor.
for the progress bar we need an adictional task counter
that is protected with thread lock as it runs inside the
ThreadPoolExecutor.
Fixes [YOCTO #14775] -- https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775
Signed-off-by: Jose Quaresma <quaresma.jose@...>
---
meta/classes/sstate.bbclass | 44 +++++++++++++++++++++++--------------
1 file changed, 28 insertions(+), 16 deletions(-)
diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
index 1c0cae4893..0ede078770 100644
--- a/meta/classes/sstate.bbclass
+++ b/meta/classes/sstate.bbclass
@@ -977,15 +977,22 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
localdata.delVar('BB_NO_NETWORK')
from bb.fetch2 import FetchConnectionCache
- def checkstatus_init(thread_worker):
- thread_worker.connection_cache = FetchConnectionCache()
-
- def checkstatus_end(thread_worker):
- thread_worker.connection_cache.close_connections()
-
- def checkstatus(thread_worker, arg):
+ def checkstatus_init():
+ while not connection_cache_pool.full():
+ connection_cache_pool.put(FetchConnectionCache())
+
+ def checkstatus_end():
+ while not connection_cache_pool.empty():
+ connection_cache = connection_cache_pool.get()
+ connection_cache.close_connections()
+
+ import threading
+ _lock = threading.Lock()
+ def checkstatus(arg):
(tid, sstatefile) = arg
+ connection_cache = connection_cache_pool.get()
+
localdata2 = bb.data.createCopy(localdata)
srcuri = "file://" + sstatefile
localdata2.setVar('SRC_URI', srcuri)
@@ -995,7 +1002,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
try:
fetcher = bb.fetch2.Fetch(srcuri.split(), localdata2,
- connection_cache=thread_worker.connection_cache)
+ connection_cache=connection_cache)
fetcher.checkstatus()
bb.debug(2, "SState: Successful fetch test for %s" % srcuri)
found.add(tid)
@@ -1005,8 +1012,12 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
except Exception as e:
bb.error("SState: cannot test %s: %s\n%s" % (srcuri, repr(e), traceback.format_exc()))
+ connection_cache_pool.put(connection_cache)
+
if progress:
- bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - thread_worker.tasks.qsize()), d)
+ with _lock:
+ tasks -= 1
+ bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - tasks), d)
tasklist = []
for tid in missed:
@@ -1016,6 +1027,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
if tasklist:
nproc = min(int(d.getVar("BB_NUMBER_THREADS")), len(tasklist))
+ tasks = len(tasklist)
progress = len(tasklist) >= 100
if progress:
msg = "Checking sstate mirror object availability"
@@ -1025,13 +1037,13 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
fetcherenv = bb.fetch2.get_fetcher_environment(d)
with bb.utils.environment(**fetcherenv):
bb.event.enable_threadlock()
- pool = oe.utils.ThreadedPool(nproc, len(tasklist),
- worker_init=checkstatus_init, worker_end=checkstatus_end,
- name="sstate_checkhashes-")
- for t in tasklist:
- pool.add_task(checkstatus, t)
- pool.start()
- pool.wait_completion()
+ import concurrent.futures
+ from queue import Queue
+ connection_cache_pool = Queue(nproc)
+ checkstatus_init()
+ with concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor:
+ executor.map(checkstatus, tasklist)
+ checkstatus_end()
bb.event.disable_threadlock()
if progress:
--
2.35.3
-- Best regards, José Quaresma
|