def main():
    """Start NUM_WORKERS server processes that all listen on one reserved port.

    Inspired from
    https://github.com/grpc/grpc/blob/master/examples/python/multiprocessing/server.py
    """
    logger.info(f"Initializing server with {NUM_WORKERS} workers")
    with _reserve_port() as port:
        bind_address = f"[::]:{port}"
        logger.info(f"Binding to {bind_address}")
        sys.stdout.flush()

        def _spawn():
            # Create one server process bound to the shared address and start it.
            proc = multiprocessing.Process(target=_run_server, args=(bind_address,))
            proc.start()
            return proc

        # Each process is started right after it is created, one at a time.
        processes = [_spawn() for _ in range(NUM_WORKERS)]

        # Block until every server process has exited; the reserved port is
        # released only after that, when the ``with`` block ends.
        for proc in processes:
            proc.join()
@contextlib.contextmanager
def _reserve_port(port=13000):
    """Reserve a TCP port for all subprocesses to use.

    Opens an IPv6 stream socket, enables ``SO_REUSEPORT`` on it and binds it
    to ``port`` on the wildcard address. The socket is kept open for the
    duration of the ``with`` body and closed afterwards, including when
    setup fails part-way through.

    Args:
        port: port number to bind. Defaults to 13000; ``0`` lets the
            operating system pick a free port.

    Yields:
        The port number actually bound, as reported by ``getsockname()``.

    Raises:
        RuntimeError: if ``SO_REUSEPORT`` could not be enabled.
        OSError: if creating, configuring or binding the socket fails.
    """
    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    # The try block starts immediately after creation so that a failure in
    # setsockopt/bind does not leak the file descriptor.
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
            raise RuntimeError("Failed to set SO_REUSEPORT.")
        sock.bind(("", port))
        yield sock.getsockname()[1]
    finally:
        sock.close()
def compute_detections(
    batch: tp.List[bytes], server_address: str = "server:13000"
) -> tp.List[str]:
    """Run text detection on a batch of images using a pool of client processes.

    A pool of NUM_CLIENTS processes is started, each initialised through
    ``_initialize_worker(server_address)``; every image is pickled and passed
    to ``_run_worker_query`` in one of those processes.

    Args:
        batch: a list of images.
        server_address: address handed to each worker's initializer.
            Defaults to ``"server:13000"``.

    Returns:
        the list of detected texts, in the same order as ``batch``.
    """
    if not batch:
        # Nothing to process: avoid spawning worker processes for no work.
        return []

    # NOTE(review): the image bytes are pickled before being sent, so the
    # receiving side presumably unpickles them - confirm against the server.
    payloads = [pickle.dumps(img) for img in batch]

    with multiprocessing.Pool(
        processes=NUM_CLIENTS,
        initializer=_initialize_worker,
        initargs=(server_address,),
    ) as worker_pool:
        # Pool.map blocks until every item is done and already returns a
        # list ordered like its input, so no extra copy is needed.
        return worker_pool.map(_run_worker_query, payloads)
def _shutdown_worker():
    """Close the open gRPC channel, if one has been created.

    Does nothing when ``_worker_channel_singleton`` is still ``None``.

    Returns:
        None
    """
    if _worker_channel_singleton is not None:
        # grpc.Channel exposes close(), not stop() (stop() belongs to
        # grpc.Server). Assumes the singleton is a grpc.Channel, as the
        # docstring states - confirm where it is created.
        _worker_channel_singleton.close()
单个 RPC 调用 _run_worker_query()
1 2 3 4 5 6 7 8 9 10 11 12
def _run_worker_query(img: bytes) -> str:
    """Send one image to the gRPC server and return the text it detected.

    Args:
        img (bytes): bytes representation of the image

    Returns:
        detected text on the image
    """
    # Wrap the payload in the request message, then issue the Detect RPC
    # through the module-level stub.
    request = image_ocr_pb2.OcrCandidate(image=img)
    result: image_ocr_pb2.OcrResult = _worker_stub_singleton.Detect(request)
    return result.text