zkevm-node 代码分析
git clone https://github.com/0xPolygonHermez/zkevm-node.git
下载代码,之后列出目录
运行测试部署
#cd test && sudo make run
其中 zkevm-json-rpc、zkevm-l2gaspricer、zkevm-aggregator、zkevm-sequence-sender、zkevm-sequencer、zkevm-eth-tx-manager 均以 zkevm-node 代码构建的镜像作为基础镜像。
并发现 zkevm-node 的软件架构使用 docker-compose 做服务堆栈管理,并创建虚拟网络 zkevm(桥接)。
根据需求,目前只需要知道zkevm-node中aggregator通讯方式的实现细节。
根据docker-compose.yml 可知
# Aggregator service: runs the zkevm-node image with only the aggregator component enabled.
zkevm-aggregator:
  container_name: zkevm-aggregator
  image: zkevm-node
  ports:
    - 50081:50081
    - 9093:9091 # needed if metrics enabled
  environment:
    - ZKEVM_NODE_STATE_DB_HOST=zkevm-state-db
    - ZKEVM_NODE_AGGREGATOR_SENDER_ADDRESS=0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266
  volumes:
    # Node configuration and genesis file are mounted into /app inside the container.
    - ./config/test.node.config.toml:/app/config.toml
    - ./config/test.genesis.config.json:/app/genesis.json
  command:
    - "/bin/sh"
    - "-c"
    - "/app/zkevm-node run --network custom --custom-network-file /app/genesis.json --cfg /app/config.toml --components aggregator"
使用test.node.config.toml对zkevm-aggregator进行配置。 再来看看test.node.config.toml。
# Executor client settings read by the node.
[Executor]
# host:port of the executor endpoint; the host name is the zkevm-prover container.
URI = "zkevm-prover:50071"
# Upper bound for a gRPC message; presumably in bytes (TODO confirm against the node's config docs).
MaxGRPCMessageSize = 100000000
可以看到aggregator使用prover的executor 服务,而最终是executor做聚合。
源码片段:https://github.com/0xPolygonHermez/zkevm-prover/blob/main/src/service/executor/executor_service.cpp
..... //从zkevm-node 中的 aggregator接收GRPC服务消息ProcessBatch ::grpc::Status ExecutorServiceImpl::ProcessBatch(::grpc::ServerContext* context, const ::executor::v1::ProcessBatchRequest* request, ::executor::v1::ProcessBatchResponse* response) { // If the process is exiting, do not start new activities if (bExitingProcess) { return Status::CANCELLED; } //TimerStart(EXECUTOR_PROCESS_BATCH); struct timeval EXECUTOR_PROCESS_BATCH_start; gettimeofday(&EXECUTOR_PROCESS_BATCH_start,NULL); #ifdef LOG_SERVICE zklog.info("ExecutorServiceImpl::ProcessBatch() got request:\n" + request->DebugString()); #endif #ifdef LOG_TIME lock(); if ( (firstTotalTime.tv_sec == 0) && (firstTotalTime.tv_usec == 0) ) { gettimeofday(&firstTotalTime, NULL); lastTotalTime = firstTotalTime; } unlock(); #endif // 创建了一个processBatch 类别的 ProverRequest ProverRequest proverRequest(fr, config, prt_processBatch); // Save request to file if (config.saveRequestToFile) { string2file(request->DebugString(), proverRequest.filePrefix + "executor_request.txt"); } // Get external request ID proverRequest.contextId = request->context_id(); ...... //prover处理ProverRequest prover.processBatch(&proverRequest); //TimerStart(EXECUTOR_PROCESS_BATCH_BUILD_RESPONSE); if (proverRequest.result != ZKR_SUCCESS) { zklog.error("ExecutorServiceImpl::ProcessBatch() detected proverRequest.result=" + to_string(proverRequest.result) + "=" + zkresult2string(proverRequest.result), &proverRequest.tags); } //这里设置 经过prover处理的proverRequest 返回结果,并设置到response上。 response->set_error(zkresult2error(proverRequest.result)); //如下:设置真实累计使用的gas费用 response->set_cumulative_gas_used(proverRequest.pFullTracer->get_cumulative_gas_used()); ...... 
// Calculate the throughput, for this ProcessBatch call, and for all calls //这里计算执行时间和数据大小,并汇总到日志中,不开启LOG_TIME不计算。 #ifdef LOG_TIME lock(); counter++; uint64_t execGas = response->cumulative_gas_used(); totalGas += execGas; uint64_t execBytes = request->batch_l2_data().size(); totalBytes += execBytes; uint64_t execTX = responses.size(); totalTX += execTX; double execTime = double(TimeDiff(EXECUTOR_PROCESS_BATCH_start, EXECUTOR_PROCESS_BATCH_stop))/1000000; totalTime += execTime; struct timeval now; gettimeofday(&now, NULL); double timeSinceLastTotal = zkmax(1, double(TimeDiff(lastTotalTime, now))/1000000); if (timeSinceLastTotal >= 10.0) { totalTPG = double(totalGas - lastTotalGas)/timeSinceLastTotal; totalTPB = double(totalBytes - lastTotalBytes)/timeSinceLastTotal; totalTPTX = double(totalTX - lastTotalTX)/timeSinceLastTotal; lastTotalGas = totalGas; lastTotalBytes = totalBytes; lastTotalTX = totalTX; lastTotalTime = now; } double timeSinceFirstTotal = zkmax(1, double(TimeDiff(firstTotalTime, now))/1000000); double TPG = double(totalGas)/timeSinceFirstTotal; double TPB = double(totalBytes)/timeSinceFirstTotal; double TPTX = double(totalTX)/timeSinceFirstTotal; uint64_t nfd = getNumberOfFileDescriptors(); zklog.info("ExecutorServiceImpl::ProcessBatch() done counter=" + to_string(counter) + " B=" + to_string(execBytes) + " TX=" + to_string(execTX) + " gas=" + to_string(execGas) + " time=" + to_string(execTime) + " TP=" + to_string(double(execBytes)/execTime) + "B/s=" + to_string(double(execTX)/execTime) + "TX/s=" + to_string(double(execGas)/execTime) + "gas/s=" + to_string(double(execGas)/double(execBytes)) + "gas/B" + " totalTP(10s)=" + to_string(totalTPB) + "B/s=" + to_string(totalTPTX) + "TX/s=" + to_string(totalTPG) + "gas/s=" + to_string(totalTPG/zkmax(1,totalTPB)) + "gas/B" + " totalTP(ever)=" + to_string(TPB) + "B/s=" + to_string(TPTX) + "TX/s=" + to_string(TPG) + "gas/s=" + to_string(TPG/zkmax(1,TPB)) + "gas/B" + " totalTime=" + to_string(totalTime) + " 
filedesc=" + to_string(nfd), &proverRequest.tags); // If the TP in gas/s is < threshold, log the input, unless it has been done before if (!config.logExecutorServerInput && (config.logExecutorServerInputGasThreshold > 0) && ((double(execGas)/execTime) < config.logExecutorServerInputGasThreshold)) { json inputJson; proverRequest.input.save(inputJson); string inputJsonString = inputJson.dump(); replace(inputJsonString.begin(), inputJsonString.end(), '"', '\''); zklog.info("TP=" + to_string(double(execGas)/execTime) + "gas/s Input=" + inputJsonString, &proverRequest.tags); } unlock(); #endif return Status::OK; }
源码片段 https://github.com/0xPolygonHermez/zkevm-prover/blob/main/src/prover/prover.cpp
/**
 * Runs a process-batch request through the executor.
 *
 * Optionally saves/logs the input (depending on config flags), calls
 * executor.process_batch(), and optionally saves the input together with the
 * DB read log afterwards.
 *
 * @param pProverRequest request to process; must be non-NULL and of type
 *                       prt_processBatch (both checked with zkassert)
 */
void Prover::processBatch(ProverRequest *pProverRequest)
{
    //TimerStart(PROVER_PROCESS_BATCH);
    zkassert(pProverRequest != NULL);
    zkassert(pProverRequest->type == prt_processBatch);

    if (config.runAggregatorClient)
    {
        zklog.info("Prover::processBatch() timestamp=" + pProverRequest->timestamp + " UUID=" + pProverRequest->uuid);
    }

    // Save input to <timestamp>.input.json, as provided by client
    if (config.saveInputToFile)
    {
        json inputJson;
        pProverRequest->input.save(inputJson);
        json2file(inputJson, pProverRequest->inputFile());
    }

    // Log input if requested
    if (config.logExecutorServerInput)
    {
        json inputJson;
        pProverRequest->input.save(inputJson);
        zklog.info("Input=" + inputJson.dump());
    }

    // Execute the program, in the process batch way
    // Author's annotation: this calls into the executor (handled by the main_sm state machine) to process the batch
    executor.process_batch(*pProverRequest);

    // Save input to <timestamp>.input.json after execution including dbReadLog
    if (config.saveDbReadsToFile)
    {
        json inputJsonEx;
        pProverRequest->input.save(inputJsonEx, *pProverRequest->dbReadLog);
        json2file(inputJsonEx, pProverRequest->inputDbFile());
    }

    //TimerStopAndLog(PROVER_PROCESS_BATCH);
}
至此aggregator处理聚合L1数据的过程分析结束,分析程序运行时的时序图如下: