Merge pull request #251 from yin1999/xds-server

feat(xds): add missing resources and implement ingress proxy
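The sync flow this PR wires up, as a sketch (names taken from the diff below; error handling elided and call sites simplified): the dataplane watchdog aggregates every mesh context, skips reconciliation when the aggregated hash is unchanged, and otherwise builds and reconciles the ingress proxy.

    aggregated, _ := xds_context.AggregateMeshContexts(ctx, resManager, meshCache.GetMeshContext)
    if aggregated.Hash == lastHash {
        return // nothing changed, skip reconciliation
    }
    lastHash = aggregated.Hash
    proxy, _ := ingressProxyBuilder.Build(ctx, key, aggregated) // per-mesh endpoint maps and resources
    _, _ = ingressReconciler.Reconcile(ctx, envoyCtx, proxy)    // generates CDS/EDS/LDS via IngressGenerator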
diff --git a/app/dubboctl/cmd/build.go b/app/dubboctl/cmd/build.go
index 68d9d82..44d9cb0 100644
--- a/app/dubboctl/cmd/build.go
+++ b/app/dubboctl/cmd/build.go
@@ -79,9 +79,6 @@
 	if err != nil {
 		return err
 	}
-	if err != nil {
-		return err
-	}
 	if !f.Initialized() {
 		return dubbo.NewErrNotInitialized(f.Root)
 	}
diff --git a/pkg/core/runtime/builder.go b/pkg/core/runtime/builder.go
index 55122a4..d86617c 100644
--- a/pkg/core/runtime/builder.go
+++ b/pkg/core/runtime/builder.go
@@ -208,6 +208,10 @@
 	return b
 }
 
+func (b *Builder) MeshCache() *mesh.Cache {
+	return b.meshCache
+}
+
 func (b *Builder) WithDDSContext(ddsctx *dds_context.Context) *Builder {
 	b.ddsctx = ddsctx
 	return b
@@ -281,6 +285,9 @@
 	if b.dps == nil {
 		return nil, errors.Errorf("DpServer has not been configured")
 	}
+	if b.meshCache == nil {
+		return nil, errors.Errorf("MeshCache has not been configured")
+	}
 
 	return &runtime{
 		RuntimeInfo: b.runtimeInfo,
@@ -304,6 +311,7 @@
 			serviceDiscovery:     b.serviceDiscover,
 			rv:                   b.rv,
 			appCtx:               b.appCtx,
+			meshCache:            b.meshCache,
 			regClient:            b.regClient,
 		},
 		Manager: b.cm,
diff --git a/pkg/core/runtime/runtime.go b/pkg/core/runtime/runtime.go
index e2e6496..4d380e1 100644
--- a/pkg/core/runtime/runtime.go
+++ b/pkg/core/runtime/runtime.go
@@ -44,6 +44,7 @@
 	dds_context "github.com/apache/dubbo-kubernetes/pkg/dds/context"
 	dp_server "github.com/apache/dubbo-kubernetes/pkg/dp-server/server"
 	"github.com/apache/dubbo-kubernetes/pkg/events"
+	"github.com/apache/dubbo-kubernetes/pkg/xds/cache/mesh"
 	xds_runtime "github.com/apache/dubbo-kubernetes/pkg/xds/runtime"
 )
 
@@ -87,6 +88,7 @@
 	// AppContext returns a context.Context which tracks the lifetime of the apps, it gets cancelled when the app is starting to shutdown.
 	AppContext() context.Context
 	XDS() xds_runtime.XDSRuntimeContext
+	MeshCache() *mesh.Cache
 }
 
 type ResourceValidators struct {
@@ -167,6 +169,7 @@
 	adminRegistry        *registry.Registry
 	governance           governance.GovernanceConfig
 	appCtx               context.Context
+	meshCache            *mesh.Cache
 	regClient            reg_client.RegClient
 	serviceDiscovery     dubboRegistry.ServiceDiscovery
 }
@@ -203,6 +206,10 @@
 	return b.metadataReportCenter
 }
 
+func (b *runtimeContext) MeshCache() *mesh.Cache {
+	return b.meshCache
+}
+
 func (rc *runtimeContext) DDSContext() *dds_context.Context {
 	return rc.ddsctx
 }
diff --git a/pkg/core/xds/types.go b/pkg/core/xds/types.go
index 3380391..4035d13 100644
--- a/pkg/core/xds/types.go
+++ b/pkg/core/xds/types.go
@@ -163,6 +163,7 @@
 type ExternalServiceDynamicPolicies map[ServiceName]PluginOriginatedPolicies
 
 type MeshIngressResources struct {
+	Mesh        *core_mesh.MeshResource
 	EndpointMap EndpointMap
 	Resources   map[core_model.ResourceType]core_model.ResourceList
 }
diff --git a/pkg/xds/context/aggregate_mesh_context.go b/pkg/xds/context/aggregate_mesh_context.go
new file mode 100644
index 0000000..fb2e5cb
--- /dev/null
+++ b/pkg/xds/context/aggregate_mesh_context.go
@@ -0,0 +1,57 @@
+package context
+
+import (
+	"context"
+)
+
+import (
+	core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
+	"github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
+	core_store "github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
+	"github.com/apache/dubbo-kubernetes/pkg/xds/cache/sha256"
+)
+
+type meshContextFetcher = func(ctx context.Context, meshName string) (MeshContext, error)
+
+func AggregateMeshContexts(
+	ctx context.Context,
+	resManager manager.ReadOnlyResourceManager,
+	fetcher meshContextFetcher,
+) (AggregatedMeshContexts, error) {
+	var meshList core_mesh.MeshResourceList
+	if err := resManager.List(ctx, &meshList, core_store.ListOrdered()); err != nil {
+		return AggregatedMeshContexts{}, err
+	}
+
+	var meshContexts []MeshContext
+	meshContextsByName := map[string]MeshContext{}
+	for _, mesh := range meshList.Items {
+		meshCtx, err := fetcher(ctx, mesh.GetMeta().GetName())
+		if err != nil {
+			if core_store.IsResourceNotFound(err) {
+				// The mesh no longer exists, likely because it was removed after we listed the meshes; just skip it.
+				continue
+			}
+			return AggregatedMeshContexts{}, err
+		}
+		meshContexts = append(meshContexts, meshCtx)
+		meshContextsByName[mesh.Meta.GetName()] = meshCtx
+	}
+
+	hash := aggregatedHash(meshContexts)
+
+	result := AggregatedMeshContexts{
+		Hash:               hash,
+		Meshes:             meshList.Items,
+		MeshContextsByName: meshContextsByName,
+	}
+	return result, nil
+}
+
+func aggregatedHash(meshContexts []MeshContext) string {
+	var hash string
+	for _, meshCtx := range meshContexts {
+		hash += meshCtx.Hash
+	}
+	return sha256.Hash(hash)
+}
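A minimal usage sketch for AggregateMeshContexts (hypothetical wiring: resManager is any manager.ReadOnlyResourceManager, and meshCache.GetMeshContext is assumed to satisfy the meshContextFetcher signature, which is exactly how dataplane_watchdog.go calls it below):

    aggregated, err := AggregateMeshContexts(ctx, resManager, meshCache.GetMeshContext)
    if err != nil {
        return err
    }
    for _, mesh := range aggregated.Meshes {
        // guaranteed to exist for every mesh in .Meshes
        meshCtx := aggregated.MustGetMeshContext(mesh.GetMeta().GetName())
        _ = meshCtx.Hash
    }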
diff --git a/pkg/xds/context/context.go b/pkg/xds/context/context.go
index ea3a57c..8537db0 100644
--- a/pkg/xds/context/context.go
+++ b/pkg/xds/context/context.go
@@ -55,6 +55,18 @@
 	return base64.StdEncoding.EncodeToString(g.hash)
 }
 
+// BaseMeshContext holds, for a given Mesh, the set of resources that change less often (policies, external services...)
+type BaseMeshContext struct {
+	Mesh        *core_mesh.MeshResource
+	ResourceMap ResourceMap
+	hash        []byte
+}
+
+// Hash returns the base64 version of the hash, mostly used for testing
+func (g BaseMeshContext) Hash() string {
+	return base64.StdEncoding.EncodeToString(g.hash)
+}
+
 type MeshContext struct {
 	Hash                string
 	Resource            *core_mesh.MeshResource
@@ -76,3 +88,36 @@
 	}
 	return core_mesh.ProtocolUnknown
 }
+
+// AggregatedMeshContexts is an aggregate of all MeshContext across all meshes
+type AggregatedMeshContexts struct {
+	Hash               string
+	Meshes             []*core_mesh.MeshResource
+	MeshContextsByName map[string]MeshContext
+}
+
+// MustGetMeshContext panics if there is no mesh context for the given mesh. Call it when iterating over .Meshes;
+// there is a guarantee that for every Mesh in .Meshes there is a corresponding MeshContext.
+func (m AggregatedMeshContexts) MustGetMeshContext(meshName string) MeshContext {
+	meshCtx, ok := m.MeshContextsByName[meshName]
+	if !ok {
+		panic("there should be a corresponding mesh context for every mesh in mesh contexts")
+	}
+	return meshCtx
+}
+
+func (m AggregatedMeshContexts) AllDataplanes() []*core_mesh.DataplaneResource {
+	var resources []*core_mesh.DataplaneResource
+	for _, mesh := range m.Meshes {
+		meshCtx := m.MustGetMeshContext(mesh.Meta.GetName())
+		resources = append(resources, meshCtx.Resources.Dataplanes().Items...)
+	}
+	return resources
+}
+
+func (m AggregatedMeshContexts) ZoneIngresses() []*core_mesh.ZoneIngressResource {
+	for _, meshCtx := range m.MeshContextsByName {
+		return meshCtx.Resources.ZoneIngresses().Items // all mesh contexts have the same list
+	}
+	return nil
+}
diff --git a/pkg/xds/context/mesh_context_builder.go b/pkg/xds/context/mesh_context_builder.go
index e17c737..733dc2d 100644
--- a/pkg/xds/context/mesh_context_builder.go
+++ b/pkg/xds/context/mesh_context_builder.go
@@ -21,7 +21,9 @@
 	"bytes"
 	"context"
 	"encoding/base64"
+	"fmt"
 	"hash/fnv"
+	"slices"
 )
 
 import (
@@ -85,7 +87,7 @@
 			return nil, err
 		}
 		if desc.Scope == core_model.ScopeGlobal && desc.Name != system.ConfigType { // For config we ignore them atm and prefer to rely on more specific filters.
-			rmap[t], err = m.fetchResourceList(ctx, t, nil)
+			rmap[t], err = m.fetchResourceList(ctx, t, nil, nil)
 			if err != nil {
 				return nil, errors.Wrap(err, "failed to build global context")
 			}
@@ -101,22 +103,77 @@
 	}, nil
 }
 
+func (m *meshContextBuilder) BuildBaseMeshContextIfChanged(ctx context.Context, meshName string, latest *BaseMeshContext) (*BaseMeshContext, error) {
+	mesh := core_mesh.NewMeshResource()
+	if err := m.rm.Get(ctx, mesh, core_store.GetByKey(meshName, core_model.NoMesh)); err != nil {
+		return nil, err
+	}
+	rmap := ResourceMap{}
+	// Add the mesh to the resourceMap
+	rmap[core_mesh.MeshType] = mesh.Descriptor().NewList()
+	_ = rmap[core_mesh.MeshType].AddItem(mesh)
+	rmap[core_mesh.MeshType].GetPagination().SetTotal(1)
+	for t := range m.typeSet {
+		desc, err := registry.Global().DescriptorFor(t)
+		if err != nil {
+			return nil, err
+		}
+		// Only pick the policies for now; gateways, external services and the VIP config map are not handled yet
+		switch {
+		case desc.IsPolicy:
+			rmap[t], err = m.fetchResourceList(ctx, t, mesh, nil)
+		// ignore system.ConfigType for now
+		default:
+			// Do nothing; we're not interested in this type
+		}
+		if err != nil {
+			return nil, errors.Wrap(err, "failed to build base mesh context")
+		}
+	}
+	newHash := rmap.Hash()
+	if latest != nil && bytes.Equal(newHash, latest.hash) {
+		return latest, nil
+	}
+	return &BaseMeshContext{
+		hash:        newHash,
+		Mesh:        mesh,
+		ResourceMap: rmap,
+	}, nil
+}
+
 func (m meshContextBuilder) BuildIfChanged(ctx context.Context, meshName string, latestMeshCtx *MeshContext) (*MeshContext, error) {
 	globalContext, err := m.BuildGlobalContextIfChanged(ctx, nil)
 	if err != nil {
 		return nil, err
 	}
+	baseMeshContext, err := m.BuildBaseMeshContextIfChanged(ctx, meshName, nil)
+	if err != nil {
+		return nil, err
+	}
 
+	var managedTypes []core_model.ResourceType // types not found in the global or base mesh context
 	resources := NewResources()
 	for resType := range m.typeSet {
 		rl, ok := globalContext.ResourceMap[resType]
 		if ok {
 			// Exists in global context take it from there
 			resources.MeshLocalResources[resType] = rl
+		} else {
+			rl, ok = baseMeshContext.ResourceMap[resType]
+			if ok { // Exists in the baseMeshContext, take it from there
+				resources.MeshLocalResources[resType] = rl
+			} else { // absent from all parent contexts, fetch it now
+				managedTypes = append(managedTypes, resType)
+				rl, err = m.fetchResourceList(ctx, resType, baseMeshContext.Mesh, nil)
+				if err != nil {
+					return nil, errors.Wrapf(err, "could not fetch resources of type %s", resType)
+				}
+				resources.MeshLocalResources[resType] = rl
+			}
 		}
 	}
 
-	newHash := base64.StdEncoding.EncodeToString(m.hash(globalContext))
+	newHash := base64.StdEncoding.EncodeToString(m.hash(globalContext, managedTypes, resources))
 	if latestMeshCtx != nil && newHash == latestMeshCtx.Hash {
 		return latestMeshCtx, nil
 	}
@@ -127,10 +184,13 @@
 		dataplanesByName[dp.Meta.GetName()] = dp
 	}
 
-	endpointMap := xds_topology.BuildEdsEndpoint(m.zone, dataplanes, nil)
+	mesh := baseMeshContext.Mesh
+	zoneIngresses := resources.ZoneIngresses().Items
+	endpointMap := xds_topology.BuildEdsEndpoint(m.zone, dataplanes, zoneIngresses)
 
 	return &MeshContext{
 		Hash:             newHash,
+		Resource:         mesh,
 		Resources:        resources,
 		DataplanesByName: dataplanesByName,
 		EndpointMap:      endpointMap,
@@ -139,14 +199,34 @@
 
 type filterFn = func(rs core_model.Resource) bool
 
-func (m *meshContextBuilder) fetchResourceList(ctx context.Context, resType core_model.ResourceType, filterFn filterFn) (core_model.ResourceList, error) {
+func (m *meshContextBuilder) fetchResourceList(ctx context.Context, resType core_model.ResourceType, mesh *core_mesh.MeshResource, filterFn filterFn) (core_model.ResourceList, error) {
 	var listOptsFunc []core_store.ListOptionsFunc
 	desc, err := registry.Global().DescriptorFor(resType)
 	if err != nil {
 		return nil, err
 	}
+	switch desc.Scope {
+	case core_model.ScopeGlobal:
+	case core_model.ScopeMesh:
+		if mesh != nil {
+			listOptsFunc = append(listOptsFunc, core_store.ListByMesh(mesh.GetMeta().GetName()))
+		}
+	default:
+		return nil, fmt.Errorf("unknown resource scope:%s", desc.Scope)
+	}
 	listOptsFunc = append(listOptsFunc, core_store.ListOrdered())
 	list := desc.NewList()
+	// TODO: currently we are only interested in Dataplane, Mapping, Mesh and MetaData resources
+	acceptedTypes := map[core_model.ResourceType]struct{}{
+		core_mesh.DataplaneType: {},
+		core_mesh.MappingType:   {},
+		core_mesh.MeshType:      {},
+		core_mesh.MetaDataType:  {},
+	}
+	if _, ok := acceptedTypes[resType]; !ok {
+		// skip any type outside the accepted set above
+		return list, nil
+	}
 	if err := m.rm.List(ctx, list, listOptsFunc...); err != nil {
 		return nil, err
 	}
@@ -154,6 +234,7 @@
 		// No post processing stuff so return the list as is
 		return list, nil
 	}
+
 	list, err = modifyAllEntries(list, func(resource core_model.Resource) (core_model.Resource, error) {
 		if filterFn != nil && !filterFn(resource) {
 			return nil, nil
@@ -200,8 +281,13 @@
 	return newList, nil
 }
 
-func (m *meshContextBuilder) hash(globalContext *GlobalContext) []byte {
+func (m *meshContextBuilder) hash(globalContext *GlobalContext, managedTypes []core_model.ResourceType, resources Resources) []byte {
+	slices.Sort(managedTypes)
 	hasher := fnv.New128a()
 	_, _ = hasher.Write(globalContext.hash)
+	for _, resType := range managedTypes {
+		_, _ = hasher.Write(core_model.ResourceListHash(resources.MeshLocalResources[resType]))
+	}
+
 	return hasher.Sum(nil)
 }
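Note on the new hash composition (a sketch mirroring hash() above): only the "managed" types, i.e. those fetched directly for the mesh rather than inherited from the global or base mesh context, contribute to the mesh context hash beyond the global hash, and they are sorted first so the result is deterministic:

    hasher := fnv.New128a()
    _, _ = hasher.Write(globalContext.hash)
    slices.Sort(managedTypes) // deterministic ordering
    for _, t := range managedTypes {
        _, _ = hasher.Write(core_model.ResourceListHash(resources.MeshLocalResources[t]))
    }
    newHash := base64.StdEncoding.EncodeToString(hasher.Sum(nil))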
diff --git a/pkg/xds/envoy/endpoints/endpoints.go b/pkg/xds/envoy/endpoints/endpoints.go
index 89afc9a..7e12baa 100644
--- a/pkg/xds/envoy/endpoints/endpoints.go
+++ b/pkg/xds/envoy/endpoints/endpoints.go
@@ -22,7 +22,7 @@
 )
 
 import (
-	"github.com/golang/protobuf/proto"
+	"google.golang.org/protobuf/proto"
 )
 
 import (
diff --git a/pkg/xds/generator/ingress_proxy_generator.go b/pkg/xds/generator/ingress_proxy_generator.go
index 943e3c2..1ae70db 100644
--- a/pkg/xds/generator/ingress_proxy_generator.go
+++ b/pkg/xds/generator/ingress_proxy_generator.go
@@ -19,21 +19,80 @@
 
 import (
 	"context"
+	"sort"
 )
 
 import (
+	envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
+
+	"golang.org/x/exp/maps"
+)
+
+import (
+	mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
 	"github.com/apache/dubbo-kubernetes/pkg/core"
+	core_xds "github.com/apache/dubbo-kubernetes/pkg/core/xds"
 	model "github.com/apache/dubbo-kubernetes/pkg/core/xds"
 	xds_context "github.com/apache/dubbo-kubernetes/pkg/xds/context"
+	envoy_listeners "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/listeners"
+	"github.com/apache/dubbo-kubernetes/pkg/xds/generator/zoneproxy"
 )
 
 var ingressLog = core.Log.WithName("ingress-proxy-generator")
 
 // Ingress is a marker to indicate by which ProxyGenerator resources were generated.
-const Ingress = "outbound"
+const Ingress = "ingress"
 
 type IngressGenerator struct{}
 
 func (g IngressGenerator) Generator(ctx context.Context, _ *model.ResourceSet, xdsCtx xds_context.Context, proxy *model.Proxy) (*model.ResourceSet, error) {
-	return nil, nil
+	resources := core_xds.NewResourceSet()
+
+	networking := proxy.ZoneIngressProxy.ZoneIngressResource.Spec.GetNetworking()
+	address, port := networking.GetAddress(), networking.GetPort()
+	listenerBuilder := envoy_listeners.NewInboundListenerBuilder(proxy.APIVersion, address, port, core_xds.SocketAddressProtocolTCP).
+		Configure(envoy_listeners.TLSInspector())
+
+	availableSvcsByMesh := map[string][]*mesh_proto.ZoneIngress_AvailableService{}
+	for _, service := range proxy.ZoneIngressProxy.ZoneIngressResource.Spec.AvailableServices {
+		availableSvcsByMesh[service.Mesh] = append(availableSvcsByMesh[service.Mesh], service)
+	}
+
+	for _, mr := range proxy.ZoneIngressProxy.MeshResourceList {
+		meshName := mr.Mesh.GetMeta().GetName()
+		serviceList := maps.Keys(mr.EndpointMap)
+		sort.Strings(serviceList)
+		dest := zoneproxy.BuildMeshDestinations(
+			availableSvcsByMesh[meshName],
+			xds_context.Resources{MeshLocalResources: mr.Resources},
+		)
+
+		services := zoneproxy.AddFilterChains(availableSvcsByMesh[meshName], proxy.APIVersion, listenerBuilder, dest, mr.EndpointMap)
+
+		cdsResources, err := zoneproxy.GenerateCDS(dest, services, proxy.APIVersion, meshName, Ingress)
+		if err != nil {
+			return nil, err
+		}
+		resources.Add(cdsResources...)
+
+		edsResources, err := zoneproxy.GenerateEDS(services, mr.EndpointMap, proxy.APIVersion, meshName, Ingress)
+		if err != nil {
+			return nil, err
+		}
+		resources.Add(edsResources...)
+	}
+
+	listener, err := listenerBuilder.Build()
+	if err != nil {
+		return nil, err
+	}
+	if len(listener.(*envoy_listener_v3.Listener).FilterChains) > 0 {
+		resources.Add(&core_xds.Resource{
+			Name:     listener.GetName(),
+			Origin:   Ingress,
+			Resource: listener,
+		})
+	}
+
+	return resources, nil
 }
diff --git a/pkg/xds/generator/zoneproxy/destinations.go b/pkg/xds/generator/zoneproxy/destinations.go
new file mode 100644
index 0000000..1c2ec56
--- /dev/null
+++ b/pkg/xds/generator/zoneproxy/destinations.go
@@ -0,0 +1,48 @@
+package zoneproxy
+
+import (
+	"reflect"
+)
+
+import (
+	"golang.org/x/exp/slices"
+)
+
+import (
+	mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
+	xds_context "github.com/apache/dubbo-kubernetes/pkg/xds/context"
+	envoy_tags "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/tags"
+)
+
+func BuildMeshDestinations(
+	availableServices []*mesh_proto.ZoneIngress_AvailableService, // available services for a single mesh
+	res xds_context.Resources,
+) map[string][]envoy_tags.Tags {
+	destForMesh := map[string][]envoy_tags.Tags{}
+	addTrafficFlowByDefaultDestination(destForMesh)
+	return destForMesh
+}
+
+// addTrafficFlowByDefaultDestination makes sure there is always a "match all"
+// destination pointing to all services
+// (dubbo.io/service:* -> dubbo.io/service:*)
+func addTrafficFlowByDefaultDestination(
+	destinations map[string][]envoy_tags.Tags,
+) {
+	// We need to add a destination to route any service to any instance of
+	// that service
+	matchAllTags := envoy_tags.Tags{mesh_proto.ServiceTag: mesh_proto.MatchAllTag}
+	matchAllDestinations := destinations[mesh_proto.MatchAllTag]
+	foundAllServicesDestination := slices.ContainsFunc(
+		matchAllDestinations,
+		func(tagsElem envoy_tags.Tags) bool {
+			return reflect.DeepEqual(tagsElem, matchAllTags)
+		},
+	)
+
+	if !foundAllServicesDestination {
+		matchAllDestinations = append(matchAllDestinations, matchAllTags)
+	}
+
+	destinations[mesh_proto.MatchAllTag] = matchAllDestinations
+}
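As written, BuildMeshDestinations ignores both of its arguments and only guarantees the wildcard entry; assuming mesh_proto.MatchAllTag is the wildcard "*", a call yields (illustrative):

    dest := BuildMeshDestinations(nil, xds_context.Resources{})
    // dest == map[string][]envoy_tags.Tags{
    //     "*": {{mesh_proto.ServiceTag: mesh_proto.MatchAllTag}},
    // }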
diff --git a/pkg/xds/generator/zoneproxy/generator.go b/pkg/xds/generator/zoneproxy/generator.go
new file mode 100644
index 0000000..9db2262
--- /dev/null
+++ b/pkg/xds/generator/zoneproxy/generator.go
@@ -0,0 +1,155 @@
+package zoneproxy
+
+import (
+	mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
+	core_xds "github.com/apache/dubbo-kubernetes/pkg/core/xds"
+	envoy_common "github.com/apache/dubbo-kubernetes/pkg/xds/envoy"
+	envoy_clusters "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/clusters"
+	envoy_endpoints "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/endpoints"
+	envoy_listeners "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/listeners"
+	envoy_names "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/names"
+	envoy_tags "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/tags"
+)
+
+func GenerateCDS(
+	destinationsPerService map[string][]envoy_tags.Tags,
+	services envoy_common.Services,
+	apiVersion core_xds.APIVersion,
+	meshName string,
+	origin string,
+) ([]*core_xds.Resource, error) {
+	matchAllDestinations := destinationsPerService[mesh_proto.MatchAllTag]
+
+	var resources []*core_xds.Resource
+	for _, service := range services.Sorted() {
+		clusters := services[service]
+
+		var tagsSlice envoy_tags.TagsSlice
+		for _, cluster := range clusters.Clusters() {
+			tagsSlice = append(tagsSlice, cluster.Tags())
+		}
+		tagSlice := append(tagsSlice, matchAllDestinations...)
+
+		tagKeySlice := tagSlice.ToTagKeysSlice().Transform(
+			envoy_tags.Without(mesh_proto.ServiceTag),
+		)
+
+		clusterName := envoy_names.GetMeshClusterName(meshName, service)
+		edsCluster, err := envoy_clusters.NewClusterBuilder(apiVersion, clusterName).
+			Configure(envoy_clusters.EdsCluster()).
+			Configure(envoy_clusters.LbSubset(tagKeySlice)).
+			Build()
+		if err != nil {
+			return nil, err
+		}
+		resources = append(resources, &core_xds.Resource{
+			Name:     clusterName,
+			Origin:   origin,
+			Resource: edsCluster,
+		})
+	}
+
+	return resources, nil
+}
+
+func GenerateEDS(
+	services envoy_common.Services,
+	endpointMap core_xds.EndpointMap,
+	apiVersion core_xds.APIVersion,
+	meshName string,
+	origin string,
+) ([]*core_xds.Resource, error) {
+	var resources []*core_xds.Resource
+
+	for _, service := range services.Sorted() {
+		endpoints := endpointMap[service]
+
+		clusterName := envoy_names.GetMeshClusterName(meshName, service)
+		cla, err := envoy_endpoints.CreateClusterLoadAssignment(clusterName, endpoints, apiVersion)
+		if err != nil {
+			return nil, err
+		}
+		resources = append(resources, &core_xds.Resource{
+			Name:     clusterName,
+			Origin:   origin,
+			Resource: cla,
+		})
+	}
+
+	return resources, nil
+}
+
+// AddFilterChains adds filter chains to a listener. We generate
+// FilterChainsMatcher for each unique destination. This approach has
+// a limitation: additional tags on outbound in Universal mode won't work across
+// different zones.
+func AddFilterChains(
+	availableServices []*mesh_proto.ZoneIngress_AvailableService,
+	apiVersion core_xds.APIVersion,
+	listenerBuilder *envoy_listeners.ListenerBuilder,
+	destinationsPerService map[string][]envoy_tags.Tags,
+	endpointMap core_xds.EndpointMap,
+) envoy_common.Services {
+	servicesAcc := envoy_common.NewServicesAccumulator(nil)
+
+	for _, service := range availableServices {
+		serviceName := service.Tags[mesh_proto.ServiceTag]
+		destinations := destinationsPerService[serviceName]
+		destinations = append(destinations, destinationsPerService[mesh_proto.MatchAllTag]...)
+		clusterName := envoy_names.GetMeshClusterName(service.Mesh, serviceName)
+		serviceEndpoints := endpointMap[serviceName]
+
+		for _, destination := range destinations {
+
+			// relevantTags is the set of tags for which it actually makes sense to do an LB split.
+			// If the endpoint list is the same with or without a tag, we simply skip the split for it.
+			// This works around Envoy deduplicating endpoints that share an address but differ in metadata.
+			// example 1:
+			// Ingress1 (10.0.0.1) supports service:a,version:1 and service:a,version:2
+			// Ingress2 (10.0.0.2) supports service:a,version:1 and service:a,version:2
+			// If we want to split by version, we don't need to do LB subset on version.
+			//
+			// example 2:
+			// Ingress1 (10.0.0.1) supports service:a,version:1
+			// Ingress2 (10.0.0.2) supports service:a,version:2
+			// If we want to split by version, we need LB subset.
+			relevantTags := envoy_tags.Tags{}
+			for key, value := range destination {
+				matchedTargets := map[string]struct{}{}
+				allTargets := map[string]struct{}{}
+				for _, endpoint := range serviceEndpoints {
+					address := endpoint.Address()
+					if endpoint.Tags[key] == value || value == mesh_proto.MatchAllTag {
+						matchedTargets[address] = struct{}{}
+					}
+					allTargets[address] = struct{}{}
+				}
+				if len(matchedTargets) < len(allTargets) {
+					relevantTags[key] = value
+				}
+			}
+
+			cluster := envoy_common.NewCluster(
+				envoy_common.WithName(clusterName),
+				envoy_common.WithService(serviceName),
+				envoy_common.WithTags(relevantTags),
+			)
+			cluster.SetMesh(service.Mesh)
+
+			filterChain := envoy_listeners.FilterChain(
+				envoy_listeners.NewFilterChainBuilder(apiVersion, envoy_common.AnonymousResource).Configure(
+					envoy_listeners.TcpProxyDeprecatedWithMetadata(
+						clusterName,
+						cluster,
+					),
+				),
+			)
+
+			listenerBuilder.Configure(filterChain)
+
+			servicesAcc.Add(cluster)
+		}
+	}
+
+	return servicesAcc.Services()
+}
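A worked example of the relevantTags pruning above, mirroring "example 2" from the comments (values are illustrative; envoy_tags.Tags is assumed to be a plain map[string]string):

    serviceEndpoints := []core_xds.Endpoint{
        {Target: "10.0.0.1", Tags: map[string]string{"version": "1"}},
        {Target: "10.0.0.2", Tags: map[string]string{"version": "2"}},
    }
    destination := envoy_tags.Tags{"version": "1"}
    // matchedTargets == {10.0.0.1}, allTargets == {10.0.0.1, 10.0.0.2}
    // len(matchedTargets) < len(allTargets), so "version" is kept and the
    // cluster gets an LB subset on it. In "example 1" both addresses match,
    // so the tag is dropped and no subset split is generated.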
diff --git a/pkg/xds/ingress/dataplane.go b/pkg/xds/ingress/dataplane.go
new file mode 100644
index 0000000..744bd48
--- /dev/null
+++ b/pkg/xds/ingress/dataplane.go
@@ -0,0 +1,116 @@
+package ingress
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"strings"
+)
+
+import (
+	"golang.org/x/exp/slices"
+
+	"google.golang.org/protobuf/proto"
+)
+
+import (
+	mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
+	core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
+	"github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
+	envoy "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/tags"
+)
+
+// tagSets represents a map from a service key (mesh plus tags encoded as a string) to the number of instances
+type tagSets map[serviceKey]uint32
+
+type serviceKey struct {
+	mesh string
+	tags string
+}
+
+type serviceKeySlice []serviceKey
+
+func (s serviceKeySlice) Len() int { return len(s) }
+func (s serviceKeySlice) Less(i, j int) bool {
+	return s[i].mesh < s[j].mesh || (s[i].mesh == s[j].mesh && s[i].tags < s[j].tags)
+}
+func (s serviceKeySlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+
+func (sk *serviceKey) String() string {
+	return fmt.Sprintf("%s.%s", sk.tags, sk.mesh)
+}
+
+func (s tagSets) addInstanceOfTags(mesh string, tags envoy.Tags) {
+	strTags := tags.String()
+	s[serviceKey{tags: strTags, mesh: mesh}]++
+}
+
+func (s tagSets) toAvailableServices() []*mesh_proto.ZoneIngress_AvailableService {
+	var result []*mesh_proto.ZoneIngress_AvailableService
+
+	var keys []serviceKey
+	for key := range s {
+		keys = append(keys, key)
+	}
+	sort.Sort(serviceKeySlice(keys))
+
+	for _, key := range keys {
+		tags, _ := envoy.TagsFromString(key.tags) // ignore the error since we control what the string looks like
+		result = append(result, &mesh_proto.ZoneIngress_AvailableService{
+			Tags:      tags,
+			Instances: s[key],
+			Mesh:      key.mesh,
+		})
+	}
+	return result
+}
+
+func UpdateAvailableServices(
+	ctx context.Context,
+	rm manager.ResourceManager,
+	ingress *core_mesh.ZoneIngressResource,
+	otherDataplanes []*core_mesh.DataplaneResource,
+	tagFilters []string,
+) error {
+	availableServices := GetIngressAvailableServices(otherDataplanes, tagFilters)
+
+	if availableServicesEqual(availableServices, ingress.Spec.GetAvailableServices()) {
+		return nil
+	}
+	ingress.Spec.AvailableServices = availableServices
+	if err := rm.Update(ctx, ingress); err != nil {
+		return err
+	}
+	return nil
+}
+
+func availableServicesEqual(services []*mesh_proto.ZoneIngress_AvailableService, other []*mesh_proto.ZoneIngress_AvailableService) bool {
+	if len(services) != len(other) {
+		return false
+	}
+	for i := range services {
+		if !proto.Equal(services[i], other[i]) {
+			return false
+		}
+	}
+	return true
+}
+
+func GetIngressAvailableServices(others []*core_mesh.DataplaneResource, tagFilters []string) []*mesh_proto.ZoneIngress_AvailableService {
+	tagSets := tagSets{}
+	for _, dp := range others {
+		for _, dpInbound := range dp.Spec.GetNetworking().GetHealthyInbounds() {
+			tags := map[string]string{}
+			for key, value := range dpInbound.Tags {
+				hasPrefix := func(tagFilter string) bool {
+					return strings.HasPrefix(key, tagFilter)
+				}
+				if len(tagFilters) == 0 || slices.ContainsFunc(tagFilters, hasPrefix) {
+					tags[key] = value
+				}
+			}
+			tagSets.addInstanceOfTags(dp.GetMeta().GetMesh(), tags)
+		}
+	}
+	return tagSets.toAvailableServices()
+}
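A small sketch of the tagFilters semantics in GetIngressAvailableServices (prefix match on tag keys; an empty filter list keeps every tag):

    inboundTags := map[string]string{"dubbo.io/service": "backend", "team": "payments"}
    // With tagFilters = []string{"dubbo.io/"} only keys with that prefix survive,
    // leaving {"dubbo.io/service": "backend"}; the "team" tag is dropped before
    // the inbound is counted as an instance of an available service.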
diff --git a/pkg/xds/ingress/outbound.go b/pkg/xds/ingress/outbound.go
new file mode 100644
index 0000000..b128e88
--- /dev/null
+++ b/pkg/xds/ingress/outbound.go
@@ -0,0 +1,39 @@
+package ingress
+
+import (
+	mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
+	core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
+	core_xds "github.com/apache/dubbo-kubernetes/pkg/core/xds"
+)
+
+func BuildEndpointMap(
+	destinations core_xds.DestinationMap,
+	dataplanes []*core_mesh.DataplaneResource,
+) core_xds.EndpointMap {
+	if len(destinations) == 0 {
+		return nil
+	}
+
+	outbound := core_xds.EndpointMap{}
+	for _, dataplane := range dataplanes {
+		for _, inbound := range dataplane.Spec.GetNetworking().GetHealthyInbounds() {
+			service := inbound.Tags[mesh_proto.ServiceTag]
+			selectors, ok := destinations[service]
+			if !ok {
+				continue
+			}
+			if !selectors.Matches(inbound.Tags) {
+				continue
+			}
+			iface := dataplane.Spec.GetNetworking().ToInboundInterface(inbound)
+			outbound[service] = append(outbound[service], core_xds.Endpoint{
+				Target: iface.DataplaneAdvertisedIP,
+				Port:   iface.DataplanePort,
+				Tags:   inbound.Tags,
+				Weight: 1,
+			})
+		}
+	}
+
+	return outbound
+}
diff --git a/pkg/xds/ingress/router.go b/pkg/xds/ingress/router.go
new file mode 100644
index 0000000..604961a
--- /dev/null
+++ b/pkg/xds/ingress/router.go
@@ -0,0 +1,19 @@
+package ingress
+
+import (
+	mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
+	core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
+	core_xds "github.com/apache/dubbo-kubernetes/pkg/core/xds"
+)
+
+func BuildDestinationMap(mesh string, ingress *core_mesh.ZoneIngressResource) core_xds.DestinationMap {
+	destinations := core_xds.DestinationMap{}
+	for _, svc := range ingress.Spec.GetAvailableServices() {
+		if mesh != svc.GetMesh() {
+			continue
+		}
+		service := svc.Tags[mesh_proto.ServiceTag]
+		destinations[service] = destinations[service].Add(mesh_proto.MatchTags(svc.Tags))
+	}
+	return destinations
+}
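Illustrative input/output for BuildDestinationMap (assuming mesh_proto.MatchTags wraps the tag map in a selector):

    // Ingress advertises: mesh=default, tags={dubbo.io/service: backend, version: v1}
    // BuildDestinationMap("default", ingress) then returns:
    //   DestinationMap{"backend": {{dubbo.io/service: backend, version: v1}}}
    // Available services from other meshes are skipped entirely.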
diff --git a/pkg/xds/sync/componenets.go b/pkg/xds/sync/componenets.go
index 4a8e8be..cdf79c8 100644
--- a/pkg/xds/sync/componenets.go
+++ b/pkg/xds/sync/componenets.go
@@ -73,6 +73,7 @@
 		IngressProxyBuilder:   ingressProxyBuilder,
 		IngressReconciler:     ingressReconciler,
 		EnvoyCpCtx:            envoyCpCtx,
+		MeshCache:             rt.MeshCache(),
 		MetadataTracker:       metadataTracker,
 		ResManager:            rt.ReadOnlyResourceManager(),
 	}
diff --git a/pkg/xds/sync/dataplane_watchdog.go b/pkg/xds/sync/dataplane_watchdog.go
index e9b443d..72d239f 100644
--- a/pkg/xds/sync/dataplane_watchdog.go
+++ b/pkg/xds/sync/dataplane_watchdog.go
@@ -116,7 +116,43 @@
 }
 
 func (d *DataplaneWatchdog) syncIngress(ctx context.Context, metadata *core_xds.DataplaneMetadata) (SyncResult, error) {
-	return SyncResult{}, nil
+	envoyCtx := &xds_context.Context{
+		ControlPlane: d.EnvoyCpCtx,
+		Mesh:         xds_context.MeshContext{}, // ZoneIngress does not have a mesh!
+	}
+
+	aggregatedMeshCtxs, err := xds_context.AggregateMeshContexts(ctx, d.ResManager, d.MeshCache.GetMeshContext)
+	if err != nil {
+		return SyncResult{}, errors.Wrap(err, "could not aggregate mesh contexts")
+	}
+
+	result := SyncResult{
+		ProxyType: mesh_proto.IngressProxyType,
+	}
+	syncForConfig := aggregatedMeshCtxs.Hash != d.lastHash
+	if !syncForConfig {
+		result.Status = SkipStatus
+		return result, nil
+	}
+
+	d.log.V(1).Info("snapshot hash updated, reconcile", "prev", d.lastHash, "current", aggregatedMeshCtxs.Hash)
+	d.lastHash = aggregatedMeshCtxs.Hash
+
+	proxy, err := d.IngressProxyBuilder.Build(ctx, d.key, aggregatedMeshCtxs)
+	if err != nil {
+		return SyncResult{}, errors.Wrap(err, "could not build ingress proxy")
+	}
+	proxy.Metadata = metadata
+	changed, err := d.IngressReconciler.Reconcile(ctx, *envoyCtx, proxy)
+	if err != nil {
+		return SyncResult{}, errors.Wrap(err, "could not reconcile")
+	}
+	if changed {
+		result.Status = ChangedStatus
+	} else {
+		result.Status = GeneratedStatus
+	}
+	return result, nil
 }
 
 func (d *DataplaneWatchdog) syncEgress(ctx context.Context, metadata *core_xds.DataplaneMetadata) (SyncResult, error) {
diff --git a/pkg/xds/sync/ingress_proxy_builder.go b/pkg/xds/sync/ingress_proxy_builder.go
index 6cdecb8..5657978 100644
--- a/pkg/xds/sync/ingress_proxy_builder.go
+++ b/pkg/xds/sync/ingress_proxy_builder.go
@@ -22,14 +22,20 @@
 )
 
 import (
+	"github.com/apache/dubbo-kubernetes/pkg/core/dns/lookup"
+	core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
 	"github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
 	core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
+	core_store "github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
 	core_xds "github.com/apache/dubbo-kubernetes/pkg/core/xds"
 	xds_context "github.com/apache/dubbo-kubernetes/pkg/xds/context"
+	"github.com/apache/dubbo-kubernetes/pkg/xds/ingress"
+	xds_topology "github.com/apache/dubbo-kubernetes/pkg/xds/topology"
 )
 
 type IngressProxyBuilder struct {
 	ResManager manager.ResourceManager
+	LookupIP   lookup.LookupIPFunc
 
 	apiVersion        core_xds.APIVersion
 	zone              string
@@ -39,7 +45,86 @@
 func (p *IngressProxyBuilder) Build(
 	ctx context.Context,
 	key core_model.ResourceKey,
-	aggregatedMeshCtxs xds_context.MeshContext,
+	aggregatedMeshCtxs xds_context.AggregatedMeshContexts,
 ) (*core_xds.Proxy, error) {
-	return &core_xds.Proxy{}, nil
+	zoneIngress, err := p.getZoneIngress(ctx, key, aggregatedMeshCtxs)
+	if err != nil {
+		return nil, err
+	}
+
+	zoneIngress, err = xds_topology.ResolveZoneIngressPublicAddress(p.LookupIP, zoneIngress)
+	if err != nil {
+		return nil, err
+	}
+
+	proxy := &core_xds.Proxy{
+		Id:               core_xds.FromResourceKey(key),
+		APIVersion:       p.apiVersion,
+		Zone:             p.zone,
+		ZoneIngressProxy: p.buildZoneIngressProxy(zoneIngress, aggregatedMeshCtxs),
+	}
+
+	return proxy, nil
+}
+
+func (p *IngressProxyBuilder) buildZoneIngressProxy(
+	zoneIngress *core_mesh.ZoneIngressResource,
+	aggregatedMeshCtxs xds_context.AggregatedMeshContexts,
+) *core_xds.ZoneIngressProxy {
+	var meshResourceList []*core_xds.MeshIngressResources
+
+	for _, mesh := range aggregatedMeshCtxs.Meshes {
+		meshName := mesh.GetMeta().GetName()
+		meshCtx := aggregatedMeshCtxs.MustGetMeshContext(meshName)
+
+		meshResources := &core_xds.MeshIngressResources{
+			Mesh: mesh,
+			EndpointMap: ingress.BuildEndpointMap(
+				ingress.BuildDestinationMap(meshName, zoneIngress),
+				meshCtx.Resources.Dataplanes().Items,
+			),
+			Resources: meshCtx.Resources.MeshLocalResources,
+		}
+
+		meshResourceList = append(meshResourceList, meshResources)
+	}
+
+	return &core_xds.ZoneIngressProxy{
+		ZoneIngressResource: zoneIngress,
+		MeshResourceList:    meshResourceList,
+	}
+}
+
+func (p *IngressProxyBuilder) getZoneIngress(
+	ctx context.Context,
+	key core_model.ResourceKey,
+	aggregatedMeshCtxs xds_context.AggregatedMeshContexts,
+) (*core_mesh.ZoneIngressResource, error) {
+	zoneIngress := core_mesh.NewZoneIngressResource()
+	if err := p.ResManager.Get(ctx, zoneIngress, core_store.GetBy(key)); err != nil {
+		return nil, err
+	}
+	// Update the Ingress' available services.
+	// This was placed as an operation of DataplaneWatchdog out of convenience.
+	// Consider moving it outside of this component.
+	if err := p.updateIngress(ctx, zoneIngress, aggregatedMeshCtxs); err != nil {
+		return nil, err
+	}
+	return zoneIngress, nil
+}
+
+func (p *IngressProxyBuilder) updateIngress(
+	ctx context.Context, zoneIngress *core_mesh.ZoneIngressResource,
+	aggregatedMeshCtxs xds_context.AggregatedMeshContexts,
+) error {
+	// Update the Ingress' available services.
+	// This was placed as an operation of DataplaneWatchdog out of convenience.
+	// Consider moving it outside of this component (follow the pattern of updating VIP outbounds).
+	return ingress.UpdateAvailableServices(
+		ctx,
+		p.ResManager,
+		zoneIngress,
+		aggregatedMeshCtxs.AllDataplanes(),
+		p.ingressTagFilters,
+	)
 }
diff --git a/pkg/xds/topology/dataplane.go b/pkg/xds/topology/dataplane.go
index 66683d5..9ece62c 100644
--- a/pkg/xds/topology/dataplane.go
+++ b/pkg/xds/topology/dataplane.go
@@ -65,6 +65,22 @@
 	return dataplane, nil
 }
 
+func ResolveZoneIngressPublicAddress(lookupIPFunc lookup.LookupIPFunc, zoneIngress *core_mesh.ZoneIngressResource) (*core_mesh.ZoneIngressResource, error) {
+	ip, err := lookupFirstIp(lookupIPFunc, zoneIngress.Spec.GetNetworking().GetAdvertisedAddress())
+	if err != nil {
+		return nil, err
+	}
+	if ip != "" { // only if we resolved an address; in most cases the advertised address is already an IP, not a hostname
+		ziSpec := proto.Clone(zoneIngress.Spec).(*mesh_proto.ZoneIngress)
+		ziSpec.Networking.AdvertisedAddress = ip
+		return &core_mesh.ZoneIngressResource{
+			Meta: zoneIngress.Meta,
+			Spec: ziSpec,
+		}, nil
+	}
+	return zoneIngress, nil
+}
+
 func lookupFirstIp(lookupIPFunc lookup.LookupIPFunc, address string) (string, error) {
 	if address == "" || net.ParseIP(address) != nil { // There's either no address or it's already an ip so nothing to do
 		return "", nil