springcloud之eureka源码分析

Eureka是什么?

Eureka is a REST (Representational State Transfer) based service that is primarily used in the AWS cloud for locating services for the purpose of load balancing and failover of middle-tier servers. We call this service, the Eureka Server. Eureka also comes with a Java-based client component,the Eureka Client, which makes interactions with the service much easier. The client also has a built-in load balancer that does basic round-robin load balancing. At Netflix, a much more sophisticated load balancer wraps Eureka to provide weighted load balancing based on several factors like traffic, resource usage, error conditions etc to provide superior resiliency.

Eureka是服务注册和发现的开源框架

Eureka的优势

  • 提供了完整的服务注册和服务查找功能
  • SpringBoot的紧密结合

Eureka架构

eureka_architecture

eureka的基本架构由三个角色组成:

  1. Eureka Server

    • 提供服务注册和发现
  2. Service Provider

    • 服务提供方
    • 将自身服务注册到Eureka,从而使服务消费方能够找到
  3. Service Consumer

    • 服务消费方
    • 从Eureka获取注册服务列表, 从而能够消费服务

architecture-detail

  • ServiceProvider会向Eureka Server做Register(服务注册)、Renewal(服务续约)、Cancellation(服务下线)等操作
  • Eureka Server之间会做注册服务的同步,从而保证状态的同一
  • Service Consumer会向Eureka Server获取服务列表,并消费服务

服务注册

我们Application使用@EnableDiscoveryClient来进行注册,并且在application.properties中指定服务注册中心的位置

1
2
3
4
5
6
7
8
9
10
11
12
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

/**
* If true, the ServiceRegistry will automatically register the local server.
*/
boolean autoRegister() default true;
}

该注解用来开启DiscoveryClient实例

eureka_uml_01

从上图中可以看出服务发现的结构图,左侧是netflix的框架实现部分。

服务列表

我们先找到eureka server url列表配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* Get the list of all eureka service urls from properties file for the eureka client to talk to.
*
* @param clientConfig the clientConfig to use
* @param instanceZone The zone in which the client resides
* @param preferSameZone true if we have to prefer the same zone as the client, false otherwise
* @return an (ordered) map of zone -> list of urls mappings, with the preferred zone first in iteration order
*/
public static Map<String, List<String>> getServiceUrlsMapFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {
Map<String, List<String>> orderedUrls = new LinkedHashMap<>();
String region = getRegion(clientConfig);
String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
if (availZones == null || availZones.length == 0) {
availZones = new String[1];
availZones[0] = DEFAULT_ZONE;
}
logger.debug("The availability zone for the given region {} are {}", region, availZones);
int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);

String zone = availZones[myZoneOffset];
List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);
if (serviceUrls != null) {
orderedUrls.put(zone, serviceUrls);
}
int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1);
while (currentOffset != myZoneOffset) {
zone = availZones[currentOffset];
serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);
if (serviceUrls != null) {
orderedUrls.put(zone, serviceUrls);
}
if (currentOffset == (availZones.length - 1)) {
currentOffset = 0;
} else {
currentOffset++;
}
}

if (orderedUrls.size() < 1) {
throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");
}
return orderedUrls;
}

通过region获取zone,默认为default zone。根据zone获取service列表。

默认使用defaultZone。也可以使用eureka.client.region来配置zone

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public List<String> getEurekaServerServiceUrls(String myZone) {
String serviceUrls = configInstance.getStringProperty(
namespace + CONFIG_EUREKA_SERVER_SERVICE_URL_PREFIX + "." + myZone, null).get();
if (serviceUrls == null || serviceUrls.isEmpty()) {
serviceUrls = configInstance.getStringProperty(
namespace + CONFIG_EUREKA_SERVER_SERVICE_URL_PREFIX + ".default", null).get();

}
if (serviceUrls != null) {
return Arrays.asList(serviceUrls.split(","));
}

return new ArrayList<String>();
}

先从指定的zone获取服务列表,如果为空,获取默认服务列表

服务注册

在DiscoveryClient的构造方法中可以找到各种初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}

if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);

// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize

statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}

@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};

if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}

instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}

如果需要获取注册信息,使用定时器刷新注册缓存。如果要注册服务,定时器定时发送心跳来续约。创建服务信息实例,其会执行定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
InstanceInfoReplicator
public void run() {
try {
discoveryClient.refreshInstanceInfo();

Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}

刷新实例信息。更新服务注册,如果注册时间过期,重新注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}

使用EurekaHttpClient注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}

使用post请求完成服务注册

服务获取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}

void refreshRegistry() {
try {
boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

boolean remoteRegionsModified = false;
// This makes sure that a dynamic change to remote regions to fetch is honored.
String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
if (null != latestRemoteRegions) {
String currentRemoteRegions = remoteRegionsToFetch.get();
if (!latestRemoteRegions.equals(currentRemoteRegions)) {
// Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
synchronized (instanceRegionChecker.getAzToRegionMapper()) {
if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
String[] remoteRegions = latestRemoteRegions.split(",");
remoteRegionsRef.set(remoteRegions);
instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
remoteRegionsModified = true;
} else {
logger.info("Remote regions to fetch modified concurrently," +
" ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
}
}
} else {
// Just refresh mapping to reflect any DNS/Property change
instanceRegionChecker.getAzToRegionMapper().refreshMapping();
}
}

boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
}
}

获取最新的region,然后根据最新的region获取最新的注册列表
服务续约
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
 /**
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}

发送POST心跳来续约

服务注册中心

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}

// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}

registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}

对注册信息进行校验,然后使用PeerAwareInstanceRegistry来进行注册

1
2
3
4
5
6
7
8
9
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

检查租赁时间,注册指定时长的服务。同步信息到各个服务节点

总结

整个eureka的服务注册和发现内部都是用REST方式来进行通讯以达到信息同步的目的