zkevm-node 代码分析
git checkout https://github.com/0xPolygonHermez/zkevm-node.git
下载代码,之后列出目录
运行测试部署
#cd test && sudo make run
其中zkevm-json-rpc、zkevm-l2gaspricer、zkevm-aggregator、zkevm-sequence-sender、zkevm-sequencer、zkevm-eth-tx-manager。均由zkevm-node代码作为基础镜像。
并发现 zkevm-node是软件架构使用docker-compose做服务堆栈管理,并创建虚拟网络zkevm(桥接)。
根据需求,目前只需要知道zkevm-node中aggregator通讯方式的实现细节。
根据docker-compose.yml 可知
zkevm-aggregator:
container_name: zkevm-aggregator
image: zkevm-node
ports:
- 50081:50081
- 9093:9091 # needed if metrics enabled
environment:
- ZKEVM_NODE_STATE_DB_HOST=zkevm-state-db
- ZKEVM_NODE_AGGREGATOR_SENDER_ADDRESS=0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266
volumes:
- ./config/test.node.config.toml:/app/config.toml
- ./config/test.genesis.config.json:/app/genesis.json
command:
- "/bin/sh"
- "-c"
- "/app/zkevm-node run --network custom --custom-network-file /app/genesis.json --cfg /app/config.toml --components aggregator
使用test.node.config.toml对zkevm-aggregator进行配置。 再来看看test.node.config.toml。
[Executor]
URI = "zkevm-prover:50071"
MaxGRPCMessageSize = 100000000
可以看到aggregator使用prover的executor 服务,而最终是executor做聚合。
源码片段:https://github.com/0xPolygonHermez/zkevm-prover/blob/main/src/service/executor/executor_service.cpp
.....
//从zkevm-node 中的 aggregator接收GRPC服务消息ProcessBatch
::grpc::Status ExecutorServiceImpl::ProcessBatch(::grpc::ServerContext* context, const ::executor::v1::ProcessBatchRequest* request, ::executor::v1::ProcessBatchResponse* response)
{
// If the process is exiting, do not start new activities
if (bExitingProcess)
{
return Status::CANCELLED;
}
//TimerStart(EXECUTOR_PROCESS_BATCH);
struct timeval EXECUTOR_PROCESS_BATCH_start;
gettimeofday(&EXECUTOR_PROCESS_BATCH_start,NULL);
#ifdef LOG_SERVICE
zklog.info("ExecutorServiceImpl::ProcessBatch() got request:\n" + request->DebugString());
#endif
#ifdef LOG_TIME
lock();
if ( (firstTotalTime.tv_sec == 0) && (firstTotalTime.tv_usec == 0) )
{
gettimeofday(&firstTotalTime, NULL);
lastTotalTime = firstTotalTime;
}
unlock();
#endif
// 创建了一个processBatch 类别的 ProverRequest
ProverRequest proverRequest(fr, config, prt_processBatch);
// Save request to file
if (config.saveRequestToFile)
{
string2file(request->DebugString(), proverRequest.filePrefix + "executor_request.txt");
}
// Get external request ID
proverRequest.contextId = request->context_id();
......
//prover处理ProverRequest
prover.processBatch(&proverRequest);
//TimerStart(EXECUTOR_PROCESS_BATCH_BUILD_RESPONSE);
if (proverRequest.result != ZKR_SUCCESS)
{
zklog.error("ExecutorServiceImpl::ProcessBatch() detected proverRequest.result=" + to_string(proverRequest.result) + "=" + zkresult2string(proverRequest.result), &proverRequest.tags);
}
//这里设置 经过prover处理的proverRequest 返回结果,并设置到response上。
response->set_error(zkresult2error(proverRequest.result));
//如下:设置真实累计使用的gas费用
response->set_cumulative_gas_used(proverRequest.pFullTracer->get_cumulative_gas_used());
......
// Calculate the throughput, for this ProcessBatch call, and for all calls
//这里计算执行时间和数据大小,并汇总到日志中,不开启LOG_TIME不计算。
#ifdef LOG_TIME
lock();
counter++;
uint64_t execGas = response->cumulative_gas_used();
totalGas += execGas;
uint64_t execBytes = request->batch_l2_data().size();
totalBytes += execBytes;
uint64_t execTX = responses.size();
totalTX += execTX;
double execTime = double(TimeDiff(EXECUTOR_PROCESS_BATCH_start, EXECUTOR_PROCESS_BATCH_stop))/1000000;
totalTime += execTime;
struct timeval now;
gettimeofday(&now, NULL);
double timeSinceLastTotal = zkmax(1, double(TimeDiff(lastTotalTime, now))/1000000);
if (timeSinceLastTotal >= 10.0)
{
totalTPG = double(totalGas - lastTotalGas)/timeSinceLastTotal;
totalTPB = double(totalBytes - lastTotalBytes)/timeSinceLastTotal;
totalTPTX = double(totalTX - lastTotalTX)/timeSinceLastTotal;
lastTotalGas = totalGas;
lastTotalBytes = totalBytes;
lastTotalTX = totalTX;
lastTotalTime = now;
}
double timeSinceFirstTotal = zkmax(1, double(TimeDiff(firstTotalTime, now))/1000000);
double TPG = double(totalGas)/timeSinceFirstTotal;
double TPB = double(totalBytes)/timeSinceFirstTotal;
double TPTX = double(totalTX)/timeSinceFirstTotal;
uint64_t nfd = getNumberOfFileDescriptors();
zklog.info("ExecutorServiceImpl::ProcessBatch() done counter=" + to_string(counter) + " B=" + to_string(execBytes) + " TX=" + to_string(execTX) + " gas=" + to_string(execGas) + " time=" + to_string(execTime) +
" TP=" + to_string(double(execBytes)/execTime) + "B/s=" + to_string(double(execTX)/execTime) + "TX/s=" + to_string(double(execGas)/execTime) + "gas/s=" + to_string(double(execGas)/double(execBytes)) + "gas/B" +
" totalTP(10s)=" + to_string(totalTPB) + "B/s=" + to_string(totalTPTX) + "TX/s=" + to_string(totalTPG) + "gas/s=" + to_string(totalTPG/zkmax(1,totalTPB)) + "gas/B" +
" totalTP(ever)=" + to_string(TPB) + "B/s=" + to_string(TPTX) + "TX/s=" + to_string(TPG) + "gas/s=" + to_string(TPG/zkmax(1,TPB)) + "gas/B" +
" totalTime=" + to_string(totalTime) +
" filedesc=" + to_string(nfd),
&proverRequest.tags);
// If the TP in gas/s is < threshold, log the input, unless it has been done before
if (!config.logExecutorServerInput && (config.logExecutorServerInputGasThreshold > 0) && ((double(execGas)/execTime) < config.logExecutorServerInputGasThreshold))
{
json inputJson;
proverRequest.input.save(inputJson);
string inputJsonString = inputJson.dump();
replace(inputJsonString.begin(), inputJsonString.end(), '"', '\'');
zklog.info("TP=" + to_string(double(execGas)/execTime) + "gas/s Input=" + inputJsonString, &proverRequest.tags);
}
unlock();
#endif
return Status::OK;
}
源码片段 https://github.com/0xPolygonHermez/zkevm-prover/blob/main/src/prover/prover.cpp
void Prover::processBatch(ProverRequest *pProverRequest)
{
//TimerStart(PROVER_PROCESS_BATCH);
zkassert(pProverRequest != NULL);
zkassert(pProverRequest->type == prt_processBatch);
if (config.runAggregatorClient)
{
zklog.info("Prover::processBatch() timestamp=" + pProverRequest->timestamp + " UUID=" + pProverRequest->uuid);
}
// Save input to <timestamp>.input.json, as provided by client
if (config.saveInputToFile)
{
json inputJson;
pProverRequest->input.save(inputJson);
json2file(inputJson, pProverRequest->inputFile());
}
// Log input if requested
if (config.logExecutorServerInput)
{
json inputJson;
pProverRequest->input.save(inputJson);
zklog.info("Input=" + inputJson.dump());
}
// Execute the program, in the process batch way
// 这里调用进入executor (main_sm状态机处理)处理process_batch
executor.process_batch(*pProverRequest);
// Save input to <timestamp>.input.json after execution including dbReadLog
if (config.saveDbReadsToFile)
{
json inputJsonEx;
pProverRequest->input.save(inputJsonEx, *pProverRequest->dbReadLog);
json2file(inputJsonEx, pProverRequest->inputDbFile());
}
//TimerStopAndLog(PROVER_PROCESS_BATCH);
}
至此aggregator处理聚合L1数据的过程分析结束,分析程序运行时的时序图如下:

