So, while perusing lmdb.h I saw this:
Use an MDB_env* in the process which opened it, not after fork().
That was my problem. It had nothing to do with transactions. I need to open the environment and hold a separate handle in each process. Right?
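The rule quoted from lmdb.h can be enforced mechanically by remembering which process opened the handle. The sketch below is plain Python rather than Cython. `EnvHandle`, `get_env()` and the default path are hypothetical stand-ins: in the real code, creating a handle would mean calling mdb_env_create() and mdb_env_open(). The cached handle is reused only if the calling process opened it; a handle inherited through fork() is not used, and a new one is opened instead.

```python
import os


class EnvHandle:
    """Hypothetical stand-in for an MDB_env* handle.

    In the real Cython code, constructing it would mean calling
    mdb_env_create() followed by mdb_env_open().
    """

    def __init__(self, path):
        self.path = path
        self.owner_pid = os.getpid()  # the process that opened this handle


_env = None  # module-level cache, at most one handle per process


def get_env(path='/tmp/test_mp'):
    """Return a handle opened by the calling process.

    If the cached handle was opened by a different process (for example,
    it was inherited through fork()), it is not used; a new one is opened.
    """
    global _env
    if _env is None or _env.owner_pid != os.getpid():
        _env = EnvHandle(path)
    return _env
```

Calling get_env() at the top of get_(), instead of reading the module-level env directly, would give each forked process its own handle on first use.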
Moving the mdb_env_create() and mdb_env_open() calls, and the related variables, inside get_() got rid of the errors. In my dummy code below that is obviously very expensive, since every call opens a new environment, but my real application runs a pool of long-lived workers (an HTTP server), so the cost of opening the environments is only paid at server start.
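For a pool of long-lived workers, "open once per worker at startup" maps naturally onto a pool initializer. The sketch below uses Python's multiprocessing.Pool with the 'fork' start method (POSIX only) to mirror the original script. `EnvHandle`, `init_worker()` and `handle_request()` are hypothetical names, and the stand-in handle only records which process opened it instead of calling mdb_env_create() and mdb_env_open().

```python
import multiprocessing
import os


class EnvHandle:
    """Hypothetical stand-in for an MDB_env* (real code: mdb_env_create + mdb_env_open)."""

    def __init__(self, path):
        self.path = path
        self.owner_pid = os.getpid()  # the process that opened this handle


_worker_env = None  # set once per worker by init_worker(), never inherited


def init_worker(path):
    """Runs once in each worker process, right after it starts."""
    global _worker_env
    _worker_env = EnvHandle(path)


def handle_request(key):
    """Per-request work: reuse the handle this worker opened at startup."""
    env = _worker_env
    if env is None or env.owner_pid != os.getpid():
        raise RuntimeError('environment handle not opened by this process')
    return os.getpid(), env.owner_pid, key


def run(path='/tmp/test_mp', jobs=5, workers=2):
    # 'fork' mirrors the original script. The parent opens no handle here,
    # so the workers have nothing LMDB-related to inherit.
    ctx = multiprocessing.get_context('fork')
    with ctx.Pool(processes=workers, initializer=init_worker,
                  initargs=(path,)) as pool:
        return pool.map(handle_request, range(jobs))


if __name__ == '__main__':
    for pid, owner, key in run():
        print('request', key, 'served by PID', pid, 'own handle:', pid == owner)
```

The trade-off is the one described above: the opening cost is paid once per worker process rather than once per request.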
Thanks, Stefano
On 08/19/2018 10:36 AM, Stefano Cossu wrote:
Hello, I am writing a framework in Cython that accesses an LMDB database from multiple processes. I ran into a "Resource temporarily unavailable" error.
I isolated the problem in the following script (this is Cython calling the LMDB C API, so bear with the hybrid syntax):
import time
cimport cylmdb as lmdb  # This is a Cython header mirroring lmdb.h

import multiprocessing
import threading

cdef:
    lmdb.MDB_env *env
    lmdb.MDB_dbi dbi


cdef void _check(int rc) except *:
    if rc != lmdb.MDB_SUCCESS:
        out_msg = 'LMDB Error ({}): {}'.format(
                rc, lmdb.mdb_strerror(rc).decode())
        raise RuntimeError(out_msg)


cpdef void get_() except *:
    cdef:
        lmdb.MDB_txn *txn
        lmdb.MDB_val key_v, data_v

    _check(lmdb.mdb_txn_begin(env, NULL, lmdb.MDB_RDONLY, &txn))

    key_v.mv_data = b'a'
    key_v.mv_size = 1

    _check(lmdb.mdb_get(txn, dbi, &key_v, &data_v))
    print((<unsigned char *>data_v.mv_data)[:data_v.mv_size])
    time.sleep(1)
    _check(lmdb.mdb_txn_commit(txn))
    print('Thread {} in process {} done.'.format(
        threading.current_thread().name,
        multiprocessing.current_process().name))


def run():
    cdef:
        unsigned int flags = 0
        #unsigned int flags = lmdb.MDB_NOTLS  # I tried this too.
        lmdb.MDB_txn *wtxn
        lmdb.MDB_val key_v, data_v

    # Set up environment.
    _check(lmdb.mdb_env_create(&env))
    _check(lmdb.mdb_env_set_maxreaders(env, 128))
    _check(lmdb.mdb_env_open(env, '/tmp/test_mp', flags, 0o644))

    # Create DB.
    _check(lmdb.mdb_txn_begin(env, NULL, 0, &wtxn))
    _check(lmdb.mdb_dbi_open(wtxn, NULL, lmdb.MDB_CREATE, &dbi))

    # Write something.
    key_v.mv_data = b'a'
    key_v.mv_size = 1
    ts = str(time.time()).encode()
    data_v.mv_data = <unsigned char *>ts
    data_v.mv_size = len(ts)
    _check(lmdb.mdb_put(wtxn, dbi, &key_v, &data_v, 0))
    _check(lmdb.mdb_txn_commit(wtxn))

    print('Multiprocess jobs:')
    for i in range(5):
        multiprocessing.Process(target=get_).start()
    # Env should be closed only after all processes return.
    #lmdb.mdb_env_close(env)
If I execute run(), the first process runs successfully, but it apparently holds on to some resource that the other processes need:
Multiprocess jobs:
b'1534691578.4300401'
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "lakesuperior/sandbox/threading_poc.pyx", line 23, in lakesuperior.sandbox.threading_poc.get_
    cpdef void get_() except *:
  File "lakesuperior/sandbox/threading_poc.pyx", line 28, in lakesuperior.sandbox.threading_poc.get_
    _check(lmdb.mdb_txn_begin(env, NULL, lmdb.MDB_RDONLY, &txn))
  File "lakesuperior/sandbox/threading_poc.pyx", line 20, in lakesuperior.sandbox.threading_poc._check
    raise RuntimeError(out_msg)
RuntimeError: LMDB Error (11): Resource temporarily unavailable
Process Process-3:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "lakesuperior/sandbox/threading_poc.pyx", line 23, in lakesuperior.sandbox.threading_poc.get_
    cpdef void get_() except *:
  File "lakesuperior/sandbox/threading_poc.pyx", line 28, in lakesuperior.sandbox.threading_poc.get_
    _check(lmdb.mdb_txn_begin(env, NULL, lmdb.MDB_RDONLY, &txn))
  File "lakesuperior/sandbox/threading_poc.pyx", line 20, in lakesuperior.sandbox.threading_poc._check
    raise RuntimeError(out_msg)
RuntimeError: LMDB Error (11): Resource temporarily unavailable
Process Process-4:

Process Process-5:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "lakesuperior/sandbox/threading_poc.pyx", line 23, in lakesuperior.sandbox.threading_poc.get_
    cpdef void get_() except *:
Traceback (most recent call last):
  File "lakesuperior/sandbox/threading_poc.pyx", line 28, in lakesuperior.sandbox.threading_poc.get_
    _check(lmdb.mdb_txn_begin(env, NULL, lmdb.MDB_RDONLY, &txn))
  File "lakesuperior/sandbox/threading_poc.pyx", line 20, in lakesuperior.sandbox.threading_poc._check
    raise RuntimeError(out_msg)
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "lakesuperior/sandbox/threading_poc.pyx", line 23, in lakesuperior.sandbox.threading_poc.get_
    cpdef void get_() except *:
RuntimeError: LMDB Error (11): Resource temporarily unavailable
  File "lakesuperior/sandbox/threading_poc.pyx", line 28, in lakesuperior.sandbox.threading_poc.get_
    _check(lmdb.mdb_txn_begin(env, NULL, lmdb.MDB_RDONLY, &txn))
  File "lakesuperior/sandbox/threading_poc.pyx", line 20, in lakesuperior.sandbox.threading_poc._check
    raise RuntimeError(out_msg)
RuntimeError: LMDB Error (11): Resource temporarily unavailable
If I run the same function multiple times on the same process, everything is fine.
Can someone point out what is wrong with this script?
Thank you, Stefano