In modern data-driven enterprises, a workflow scheduling system is the "central nervous system" of the data pipeline. From ETL tasks to machine learning training, from report generation to real-time monitoring, almost all critical business processes rely on a stable, efficient, and scalable scheduling engine.
Apache DolphinScheduler 3.1.9 is a stable and widely deployed release, so this article focuses on that version, walking through the Master service startup process and examining its core source code, architectural design, module division, and key implementation mechanisms. The goal is to help developers understand how the Master works and to lay a foundation for secondary development or performance optimization.
This series is divided into three parts: the Master server startup process, the Worker server startup process, and related process diagrams. This is the first part.
org.apache.dolphinscheduler.server.master.MasterServer#run

```java
public void run() throws SchedulerException {
    // 1. Initialize the RPC server
    this.masterRPCServer.start();

    // 2. Install task plugins
    this.taskPluginManager.loadPlugin();

    // 3. Register itself to the registry and set up self fault tolerance
    this.masterRegistryClient.start();
    this.masterRegistryClient.setRegistryStoppable(this);

    // 4. Master scheduling
    this.masterSchedulerBootstrap.init();
    this.masterSchedulerBootstrap.start();

    // 5. Event execution service
    this.eventExecuteService.start();

    // 6. Fault-tolerance mechanism
    this.failoverExecuteThread.start();

    // 7. Quartz scheduling
    this.schedulerApi.start();
    ...
}
```
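The order here matters: the RPC server and task plugins come up before the node registers itself, so the Master is only discoverable once it can actually serve requests, and shutdown should release components in the reverse order. A minimal, framework-free sketch of that ordered-startup pattern (the `Component` interface and component names are illustrative, not DolphinScheduler classes):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class StartupOrderSketch {
    // Illustrative lifecycle component, loosely mirroring the services MasterServer starts
    interface Component {
        String name();
        default void start() {}
        default void stop() {}
    }

    private final Deque<Component> started = new ArrayDeque<>();

    // Start components in declaration order, remembering them for reverse-order shutdown
    List<String> startAll(List<Component> components) {
        List<String> order = new ArrayList<>();
        for (Component c : components) {
            c.start();
            started.push(c);
            order.add(c.name());
        }
        return order;
    }

    // Stop in reverse order, as a JVM shutdown hook typically would
    List<String> stopAll() {
        List<String> order = new ArrayList<>();
        while (!started.isEmpty()) {
            Component c = started.pop();
            c.stop();
            order.add(c.name());
        }
        return order;
    }

    public static void main(String[] args) {
        StartupOrderSketch lifecycle = new StartupOrderSketch();
        List<Component> components = List.of(
                () -> "rpcServer", () -> "registryClient", () -> "schedulerBootstrap");
        System.out.println(lifecycle.startAll(components)); // [rpcServer, registryClient, schedulerBootstrap]
        System.out.println(lifecycle.stopAll());            // [schedulerBootstrap, registryClient, rpcServer]
    }
}
```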
org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer#start

```java
public void start() {
    ...
    // Task execution request processor
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);

    // Task execution result processor
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT, taskExecuteResponseProcessor);

    // Task kill response processor
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
    ...
    this.nettyRemotingServer.start();
    logger.info("Started Master RPC Server...");
}
```
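The registrations above follow a simple dispatch-table pattern: each command type is mapped to a processor, and the server looks up the processor for each incoming message by its type. A minimal, framework-free sketch of that pattern (the `MsgType` enum and `Processor` interface are illustrative, not DolphinScheduler's actual classes):

```java
import java.util.EnumMap;
import java.util.Map;

public class DispatchSketch {
    // Illustrative message types, mirroring the idea of CommandType
    enum MsgType { TASK_EXECUTE_RUNNING, TASK_EXECUTE_RESULT, TASK_KILL_RESPONSE }

    // Illustrative processor interface: handles one message type
    interface Processor {
        String process(String payload);
    }

    private final Map<MsgType, Processor> processors = new EnumMap<>(MsgType.class);

    // Analogous to nettyRemotingServer.registerProcessor(type, processor)
    void registerProcessor(MsgType type, Processor processor) {
        processors.put(type, processor);
    }

    // Analogous to the server's channel read path: look up the processor and dispatch
    String dispatch(MsgType type, String payload) {
        Processor p = processors.get(type);
        if (p == null) {
            throw new IllegalArgumentException("no processor registered for " + type);
        }
        return p.process(payload);
    }

    public static void main(String[] args) {
        DispatchSketch server = new DispatchSketch();
        server.registerProcessor(MsgType.TASK_EXECUTE_RESULT,
                payload -> "handled result: " + payload);
        System.out.println(server.dispatch(MsgType.TASK_EXECUTE_RESULT, "task-42"));
        // prints "handled result: task-42"
    }
}
```

Keeping the mapping in one table makes it easy to see, in a single place, which message types a server handles.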
org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient#start

```java
public void start() {
    try {
        this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, registryClient);
        // 1. Register itself to the registry
        registry();
        // 2. Listen to the connection state with the registry
        registryClient.addConnectionStateListener(
                new MasterConnectionStateListener(masterConfig, registryClient, masterConnectStrategy));
        // 3. Listen to the status of other masters and workers in the registry and perform fault-tolerance work
        registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
    } catch (Exception e) {
        throw new RegistryException("Master registry client start up error", e);
    }
}
```
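The MasterHeartBeatTask created above periodically writes the master's liveness information to the registry so that other nodes can detect a dead master. A minimal sketch of that periodic-heartbeat idea using a `ScheduledExecutorService` (the `RegistrySink` interface, node path, and interval are illustrative assumptions, not DolphinScheduler APIs):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HeartbeatSketch {
    // Illustrative stand-in for writing heartbeat info to a registry such as ZooKeeper
    interface RegistrySink {
        void put(String path, String heartbeatJson);
    }

    // Start a single-threaded scheduler that reports liveness at a fixed rate
    static ScheduledExecutorService startHeartbeat(RegistrySink sink, String nodePath,
                                                   long intervalMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
                () -> sink.put(nodePath, "{\"reportTime\":" + System.currentTimeMillis() + "}"),
                0, intervalMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger beats = new AtomicInteger();
        ScheduledExecutorService s = startHeartbeat(
                (path, json) -> beats.incrementAndGet(),
                "/nodes/master/127.0.0.1:5678", 50);
        Thread.sleep(300);
        s.shutdownNow();
        System.out.println("heartbeats sent: " + beats.get()); // several beats within 300 ms
    }
}
```

If a heartbeat stops arriving, a registry with ephemeral nodes (e.g. ZooKeeper) removes the node automatically, which is what triggers the fault-tolerance listeners described below.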
The scheduler bootstrap thread continuously scans the command table in the database and performs different operations based on command types. This is the core logic for workflow startup, instance fault tolerance, etc.

org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap#run

```java
public void run() {
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            // If the server is not in running status, it cannot consume commands
            if (!ServerLifeCycleManager.isRunning()) {
                logger.warn("The current server {} is not at running status, cannot consume commands.",
                        this.masterAddress);
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            }
            // Back off when the host is overloaded (CPU/memory)
            boolean isOverload = OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
            if (isOverload) {
                logger.warn("The current server {} is overload, cannot consume commands.", this.masterAddress);
                MasterServerMetrics.incMasterOverload();
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            // Get commands from the database
            List<Command> commands = findCommands();
            if (CollectionUtils.isEmpty(commands)) {
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            // Convert commands to process instances
            List<ProcessInstance> processInstances = command2ProcessInstance(commands);
            if (CollectionUtils.isEmpty(processInstances)) {
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            // Cache each instance and enqueue a START_WORKFLOW event for it
            processInstances.forEach(processInstance -> {
                try {
                    LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
                    WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance, ...);
                    processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
                    workflowEventQueue.addEvent(
                            new WorkflowEvent(WorkflowEventType.START_WORKFLOW, processInstance.getId()));
                } finally {
                    LoggerUtils.removeWorkflowInstanceIdMDC();
                }
            });
        } catch (InterruptedException interruptedException) {
            logger.warn("Master schedule bootstrap interrupted, close the loop", interruptedException);
            Thread.currentThread().interrupt();
            break;
        } catch (Exception e) {
            logger.error("Master schedule workflow error", e);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        }
    }
}
```
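The overload guard in the loop above is a form of back-pressure: before consuming any commands, the master compares the current CPU load average and available memory against configured thresholds. A minimal sketch of such a check using the JDK's `OperatingSystemMXBean` (the threshold values are illustrative; DolphinScheduler's `OSUtils` reads them from the master configuration):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

public class OverloadSketch {
    // Returns true when the host is too busy to take on more workflows.
    // maxCpuLoadAvg: highest acceptable 1-minute load average
    // reservedBytes: minimum free memory we want to keep available
    static boolean isOverload(double loadAvg, double maxCpuLoadAvg,
                              long freeBytes, long reservedBytes) {
        return loadAvg > maxCpuLoadAvg || freeBytes < reservedBytes;
    }

    public static void main(String[] args) {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        double loadAvg = os.getSystemLoadAverage(); // -1 if unavailable on this platform
        long freeHeap = Runtime.getRuntime().freeMemory();
        boolean overload = isOverload(loadAvg, os.getAvailableProcessors() * 2.0,
                freeHeap, 64L * 1024 * 1024);
        System.out.println("overload=" + overload);
    }
}
```

Skipping a poll cycle when overloaded (rather than failing) lets the master recover gracefully once load drops, at the cost of slightly delayed command consumption.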
org.apache.dolphinscheduler.server.master.runner.EventExecuteService#run

```java
public void run() {
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            // Handle workflow execution events
            workflowEventHandler();
            // Handle stream task execution events
            streamTaskEventHandler();
            TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
        }
        ...
    }
}
```
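EventExecuteService is a classic consumer loop: each iteration drains the pending workflow and stream-task events, then sleeps briefly when there is nothing to do. A minimal sketch of that drain step with a `BlockingQueue` (the `WorkflowEvent` record here is an illustrative stand-in, not DolphinScheduler's class):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class EventLoopSketch {
    // Illustrative event, loosely modeled on WorkflowEvent(type, workflowInstanceId)
    record WorkflowEvent(String type, int workflowInstanceId) {}

    // Drain everything currently queued; return a description of what was handled
    static List<String> drainAndHandle(BlockingQueue<WorkflowEvent> queue) {
        List<String> handled = new ArrayList<>();
        WorkflowEvent event;
        while ((event = queue.poll()) != null) { // non-blocking: stop when the queue is empty
            handled.add(event.type() + ":" + event.workflowInstanceId());
        }
        return handled;
    }

    public static void main(String[] args) {
        BlockingQueue<WorkflowEvent> queue = new LinkedBlockingQueue<>();
        queue.add(new WorkflowEvent("START_WORKFLOW", 1001));
        queue.add(new WorkflowEvent("START_WORKFLOW", 1002));
        System.out.println(drainAndHandle(queue)); // [START_WORKFLOW:1001, START_WORKFLOW:1002]
    }
}
```

The short sleep between iterations keeps the loop from spinning hot when the queue is empty, matching the `SLEEP_TIME_MILLIS_SHORT` pause in the real service.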
org.apache.dolphinscheduler.server.master.service.MasterFailoverService#checkMasterFailover

```java
public void checkMasterFailover() {
    List<String> needFailoverMasterHosts = processService.queryNeedFailoverProcessInstanceHost()
            .stream()
            // A host needs failover if it is this master itself, or if it no longer exists in the registry
            .filter(host -> localAddress.equals(host) || !registryClient.checkNodeExists(host, NodeType.MASTER))
            .distinct()
            .collect(Collectors.toList());
    if (CollectionUtils.isEmpty(needFailoverMasterHosts)) {
        return;
    }
    for (String needFailoverMasterHost : needFailoverMasterHosts) {
        failoverMaster(needFailoverMasterHost);
    }
}
```
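The filter above keeps only the hosts whose workflow instances need to be taken over: either the local master itself (recovering its own instances after a restart) or a master that has vanished from the registry. A stand-alone sketch of that stream filter, with the registry's live nodes stubbed as a set (the method and parameter names are illustrative):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class FailoverFilterSketch {
    // Mirrors the filter in checkMasterFailover: a host needs failover when it is
    // this master itself, or when it no longer appears among the registry's live masters.
    static List<String> hostsNeedingFailover(List<String> instanceHosts,
                                             String localAddress,
                                             Set<String> liveMasters) {
        return instanceHosts.stream()
                .filter(host -> localAddress.equals(host) || !liveMasters.contains(host))
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> hosts = List.of("10.0.0.1:5678", "10.0.0.2:5678",
                "10.0.0.2:5678", "10.0.0.3:5678");
        Set<String> live = Set.of("10.0.0.1:5678", "10.0.0.3:5678");
        // 10.0.0.2 vanished from the registry; 10.0.0.3 is the local master restarting
        System.out.println(hostsNeedingFailover(hosts, "10.0.0.3:5678", live));
        // [10.0.0.2:5678, 10.0.0.3:5678]
    }
}
```

Note the `distinct()` call: one dead master typically owns many process instances, so the same host appears repeatedly in the query result but should be failed over only once.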
This article has taken an in-depth look at the Apache DolphinScheduler 3.1.9 Master service startup process, its fault-tolerance mechanisms, and the overall architecture. Subsequent articles will explore the Worker startup process and the interaction between Master and Worker.


