diff --git a/registry-metadata-sync/pom.xml b/registry-metadata-sync/pom.xml
index a9484f2..81f7122 100644
--- a/registry-metadata-sync/pom.xml
+++ b/registry-metadata-sync/pom.xml
@@ -184,6 +184,12 @@
gbif-common-mybatis
+ mysql
+ mysql-connector-java
+ 5.1.21
+ runtime
+
+
org.gbif
gbif-common-test
test
diff --git a/registry-metadata-sync/src/main/java/org/gbif/registry/service/sync/MetadataSynchronizerBase.java b/registry-metadata-sync/src/main/java/org/gbif/registry/service/sync/MetadataSynchronizerBase.java
index 4ba938a..ab6e298 100644
--- a/registry-metadata-sync/src/main/java/org/gbif/registry/service/sync/MetadataSynchronizerBase.java
+++ b/registry-metadata-sync/src/main/java/org/gbif/registry/service/sync/MetadataSynchronizerBase.java
@@ -33,6 +33,11 @@ import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Executors;
import com.google.common.base.Strings;
import com.google.inject.Inject;
@@ -61,21 +66,32 @@ import static org.gbif.registry.sync.util.RegistryFindUtils.selectServicesByUrl;
public class MetadataSynchronizerBase implements MetadataSynchronizer {
private static final Logger LOG = LoggerFactory.getLogger(MetadataSynchronizerBase.class);
- private static final DatasetTypeConverter DATASET_TYPE_CONVERTER = new DatasetTypeConverter();
+
@Inject
private EntityManager entityManager;
+
@Inject
private ServiceManager serviceManager;
+
@Inject
private AgentManager agentManager;
+
@Inject
private TechnicalInstallationManager techInstallationManager;
+
@Inject
private ContactManager contactManager;
+
private MetadataParser metadataParser;
+
private MetadataRequestHandler requestHandler;
+
private EndpointType endPointType;
+ private static final DatasetTypeConverter DATASET_TYPE_CONVERTER = new DatasetTypeConverter();
+
+ private static final Executor EXECUTOR = Executors.newFixedThreadPool(50);
+
/**
* Default constructor.
*/
@@ -87,9 +103,8 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
/**
* Full Constructor.
*/
- public MetadataSynchronizerBase(
- MetadataParser metadataParser, MetadataRequestHandler requestHandler, EndpointType endPointType
- ) {
+ public MetadataSynchronizerBase(MetadataParser metadataParser, MetadataRequestHandler requestHandler,
+ EndpointType endPointType) {
this.metadataParser = metadataParser;
this.requestHandler = requestHandler;
this.endPointType = endPointType;
@@ -116,7 +131,7 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
return entityManager;
}
- /**
+ /**
* @return the metadataParser
*/
public MetadataParser getMetadataParser() {
@@ -131,23 +146,16 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
}
/**
- * @param requestHandler the requestHandler to set
- */
- public void setRequestHandler(MetadataRequestHandler requestHandler) {
- this.requestHandler = requestHandler;
- }
-
- /**
* Find the first service of an agent.
*
* @param dataset that owns the service
- *
* @return the first service of the agent, if exists.
*/
public Service getServiceByAgent(Agent dataset) {
return serviceManager.listByAgentKeyAndType(dataset.getUuid().toString(), endPointType).get(0);
}
+
/**
* @return the serviceManager
*/
@@ -159,7 +167,6 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
* Find the list of services of an agent.
*
* @param dataset that owns the service
- *
* @return services of the agent, if any.
*/
public List getServicesByAgent(Agent dataset) {
@@ -169,14 +176,13 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
@Override
public Response metadataRequest(String destination) {
return requestHandler.issueRequest(destination,
- requestHandler.getDefaultTemplateParams(destination, METADATA_REQUEST_TYPE));
+ requestHandler.getDefaultTemplateParams(destination, METADATA_REQUEST_TYPE));
}
/**
* Register the structure: agent technical installation, resources, services and contacts.
*
* @param agent root Agent that will endorses the new structure
- *
* @return a {@link Response} object with the new {@link TechnicalInstallation}, Response.ERROR in other case.
*/
@Override
@@ -191,6 +197,13 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
return response;
}
+ /**
+ * @param requestHandler the requestHandler to set
+ */
+ public void setRequestHandler(MetadataRequestHandler requestHandler) {
+ this.requestHandler = requestHandler;
+ }
+
@Override
public Response synchronize(TechnicalInstallation installation) {
Response response = new Response(Response.Status.OK);
@@ -229,6 +242,7 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
return response;
}
+
/**
* Adds the List of contacts to the agent object.
*
@@ -286,14 +300,13 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
* The new agent will be attached to a technical installation and will reference a service based on the service
* parameter.
*
- * @param resource non-persisted agent/dataset.
- * @param service that will be used as template for creating the service related to new agent.
+ * @param resource non-persisted agent/dataset.
+ * @param service that will be used as template for creating the service related to new agent.
* @param agentTechnicalInstallation an existing (persisted) agent.
- * @param hostingOrg organization that will owns the resource
+ * @param hostingOrg organization that will owns the resource
*/
- protected void createResource(
- Agent resource, Service service, TechnicalInstallation agentTechnicalInstallation, Agent hostingOrg
- ) {
+ protected void createResource(Agent resource, Service service, TechnicalInstallation agentTechnicalInstallation,
+ Agent hostingOrg) {
resource.setType(EntityType.DATASET);
resource.setCategory(DATASET_TYPE_CONVERTER.fromEnum(DatasetType.OCCURRENCE));
// Insert the resource
@@ -301,9 +314,8 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
addMetadataProperties(resource);
// Insert contacts of the resource
if (CollectionUtils.isNotEmpty(resource.getContacts())) {
- setFirstContactPrimaryByType(resource.getContacts(),
- ContactType.ADMINISTRATIVE_POINT_OF_CONTACT,
- ContactType.TECHNICAL_POINT_OF_CONTACT);
+ setFirstContactPrimaryByType(resource.getContacts(), ContactType.ADMINISTRATIVE_POINT_OF_CONTACT,
+ ContactType.TECHNICAL_POINT_OF_CONTACT);
addContacts(resource, resource.getContacts());
}
@@ -403,9 +415,8 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
* If both entities have the same syncKey none update operation is done.
* If key are different the update operation is executed.
*
- * @param entity that will be updated.
+ * @param entity that will be updated.
* @param entityRequest that contains the information obtained form the end-point.
- *
* @return true/false if the entity was updated
*/
protected boolean synchronizeAgentEntity(Agent entity, Agent entityRequest) throws MetadataSynchronizationException {
@@ -421,7 +432,7 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
}
modified = agentManager.update(entity) >= 1;
modified |= persistMetadataProperties(entity);
- modified |= persistExtendedProperties(entity);
+ modified = modified | this.persistExtendedProperties(entity);
}
} catch (BeanHandlingException e) {
LOG.error("Error copying bean properties", e);
@@ -434,9 +445,8 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
* Performs the synchronization of contacts from the ones obtained after contacting the end-point and the existing
* ones.
*
- * @param agent existing agent.
+ * @param agent existing agent.
* @param agentRequest agent interpreted after a metadata request.
- *
* @return true if contacts were successfully synchronized, false in other case.
*/
protected boolean synchronizeContacts(Agent agent, Agent agentRequest) throws MetadataSynchronizationException {
@@ -457,7 +467,7 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
modified = synchronizeEntity(existingContact, contact, contactManager);
}
}
- modified |= syncNewContacts(agentRequest.getContacts(), agent);
+ modified = modified | this.syncNewContacts(agentRequest.getContacts(), agent);
setPrimaryContacts(agent);
return modified;
}
@@ -467,16 +477,15 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
* If both entities have the same syncKey none update operation is done.
* If key are different the update operation is executed.
*
- * @param type of the entity to be persisted.
- * @param entity that will be updated.
+ * @param type of the entity to be persisted.
+ * @param entity that will be updated.
* @param entityRequest that contains the information obtained form the end-point.
- * @param manager generic CRUD manager used for perform the updated operation.
- *
+ * @param manager generic CRUD manager used for perform the updated operation.
* @return true/false if the entity was updated
*/
- protected boolean synchronizeEntity(
- T entity, T entityRequest, CRUDManager manager
- ) throws MetadataSynchronizationException {
+ protected boolean synchronizeEntity(T entity, T entityRequest,
+ CRUDManager manager)
+ throws MetadataSynchronizationException {
try {
if (entityRequest.getSyncKey().equals(entity.getSyncKey())) {
LOG.debug("Data has not changed therefore no update will take place, element of type {}, data base key:{}",
@@ -498,8 +507,8 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
* Performs the synchronization of a single dataset.
*
* @param resources that will be added, modified or deleted
- * @param service used to create copies of it on each resource.
- * @param resource to be synchronized
+ * @param service used to create copies of it on each resource.
+ * @param resource to be synchronized
*/
protected void synchronizeResource(Collection resources, Service service, Agent resource)
throws MetadataSynchronizationException {
@@ -530,12 +539,13 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
* 'digested' in the code field of each resource (agent)
*
* @param technicalInstallation that will serves the resources.
- * @param resources that will be added, modified or deleted.
- * @param service used to create copies of it on each resource.
+ * @param resources that will be added, modified or deleted.
+ * @param service used to create copies of it on each resource.
*/
- protected void synchronizeResources(
- TechnicalInstallation technicalInstallation, Collection resources, Service service
- ) throws MetadataSynchronizationException {
+ protected void synchronizeResources(TechnicalInstallation technicalInstallation, final Collection resources,
+ final Service service)
+ throws MetadataSynchronizationException {
+ ExecutorCompletionService completionService = new ExecutorCompletionService(EXECUTOR);
PagingRequest pagingRequest = new PagingRequest(DEFAULT_OFFSET, DEFAULT_LIMIT);
PagingResponse response = agentManager.listRelatedAgentsByServiceUrl(technicalInstallation,
RelationType.SERVES,
@@ -546,9 +556,16 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
int resourcesCount = 0;
boolean isEndOfRecords = false;
while (!isEndOfRecords) {
- for (Agent resource : response.getResults()) {
+ for (final Agent resource : response.getResults()) {
resourcesCount++;
- synchronizeResource(resources, service, resource);
+
+ completionService.submit(new Callable() {
+ @Override
+ public Void call() throws Exception {
+ synchronizeResource(resources, service, resource);
+ return null;
+ }
+ });
}
isEndOfRecords = response.isEndOfRecords();
if (!isEndOfRecords) {
@@ -560,6 +577,20 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
pagingRequest);
}
}
+
+ for (int i = 0; i < resourcesCount; i++) {
+ try {
+ completionService.take().get();
+ } catch (InterruptedException e) {
+ LOG.error("interrupted waiting for future", e);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof MetadataSynchronizationException) {
+ throw (MetadataSynchronizationException) e.getCause();
+ }
+ LOG.error("Execution exception", e);
+ }
+ }
+
syncNewResources(technicalInstallation, resources, service);
}
@@ -567,12 +598,11 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
* Synchronizes the resource and service information.
* If any modification is made a notification is sent.
*
- * @param service to be synchronized
- * @param resource existing resources
+ * @param service to be synchronized
+ * @param resource existing resources
* @param resourceRequest to be synchronized
- *
* @throws MetadataSynchronizationException
- * in case of error
+ * in case of error
*/
protected void synchronizeResourceService(Service service, Agent resource, Agent resourceRequest)
throws MetadataSynchronizationException {
@@ -581,15 +611,15 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
// Ids are set because are used by contacts synchronization
resourceRequest.setId(resource.getId());
resourceRequest.setUuid(resource.getUuid());
- modified |= synchronizeContacts(resource, resourceRequest);
+ modified = modified | this.synchronizeContacts(resource, resourceRequest);
// The root service is cloned due it shares the information, only the remoteIdAtUrl is going to be different
Service serviceResourceRequest = BeanCommons.cloneBean(service);
// The remoteIdAtUrl is 'digested' in the code field of the agent
serviceResourceRequest.setRemoteIdAtUrl(resourceRequest.getCode());
// updates the service information
- modified |= synchronizeService(resource, serviceResourceRequest);
+ modified = modified | this.synchronizeService(resource, serviceResourceRequest);
if (modified) {
- LOG.info("Dataset up dated/synchronized uuid:{}, name:{}", resource.getUuid(), resource.getName());
+ LOG.info("Dataset updated/synchronized uuid:{}, name:{}", resource.getUuid(), resource.getName());
}
}
@@ -598,7 +628,7 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
* the service is founded in the service list of the agent the method tries to synchronize the information for it The
* fields url and remoteIdAtUrl are used to search the service.
*
- * @param agent existing technical installation.
+ * @param agent existing technical installation.
* @param service that contains the information obtained after contacting the service.
*/
protected boolean synchronizeService(Agent agent, Service service) throws MetadataSynchronizationException {
@@ -624,7 +654,7 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
* not in the list of contacts of the agent parameter.
*
* @param contacts to possibly include.
- * @param agent which exists in the data base.
+ * @param agent which exists in the data base.
*/
protected boolean syncNewContacts(List contacts, Agent agent) {
boolean hasNewContacts = false;
@@ -651,12 +681,11 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
* metadata request, if there's a new service it is stored
*
* @param technicalInstallation is the technical installation that owns the resources.
- * @param resources gotten after contact the technical installation.
- * @param service service contacted to get the list of resources.
+ * @param resources gotten after contact the technical installation.
+ * @param service service contacted to get the list of resources.
*/
- protected void syncNewResources(
- TechnicalInstallation technicalInstallation, Collection resources, Service service
- ) {
+ protected void syncNewResources(TechnicalInstallation technicalInstallation, Collection resources,
+ Service service) {
Agent hostingAgent = techInstallationManager.getHostingAgent(technicalInstallation.getUuid());
for (Agent resource : resources) {
if (resource == null) { // when a resource is null, the event is written into the log
@@ -696,7 +725,7 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
* The new service will contain the resource.code for the remoteIdAtUrl value.
*
* @param resource that will owns the service
- * @param service object template to create a new service
+ * @param service object template to create a new service
*/
private Service addServiceToResource(Agent resource, Service service) {
// The service is attached to the resource
@@ -717,11 +746,12 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
return resourceService;
}
+
/**
* Creates a relation between 2 agents using the relationType parameter.
*
- * @param source agent owner of the relation.
- * @param target of the relation.
+ * @param source agent owner of the relation.
+ * @param target of the relation.
* @param relationType of the new agent-to-agent relation.
*/
private void createRelation(Agent source, Agent target, RelationType relationType) {
@@ -741,12 +771,12 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
LOG.info("Dataset deleted {}", resource.getUuid());
}
+
/**
* Finds the first resource that match against a service.remoteIdAtIrl in the currentServices list.
*
- * @param resources list of existing resources
+ * @param resources list of existing resources
* @param currentServices existing services
- *
* @return a Agent/resource if this was found in the existing resources
*/
private Agent findResourceByServices(Collection resources, List currentServices) {
@@ -765,14 +795,12 @@ public class MetadataSynchronizerBase implements MetadataSynchronizer {
/**
* Register the structure: agent technical installation, resources, services and contacts.
*
- * @param agent root Agent that will endorses the new structure.
+ * @param agent root Agent that will endorses the new structure.
* @param inResponse a {@link Response} that contains the XML document.
- *
* @return a {@link Response} object with the new {@link TechnicalInstallation}, Response.ERROR in other case.
*/
- private Response registerTechnicallInstalation(
- Agent agent, Response inResponse, String url
- ) {
+ private Response registerTechnicallInstalation(Agent agent,
+ Response inResponse, String url) {
Response response = new Response(Response.Status.OK);
try {
List resources = metadataParser.parseResources(inResponse);
diff --git a/registry-metadata-sync/src/main/java/org/gbif/registry/service/sync/app/MetadataFullSyncApp.java b/registry-metadata-sync/src/main/java/org/gbif/registry/service/sync/app/MetadataFullSyncApp.java
index 000086c..328242c 100644
--- a/registry-metadata-sync/src/main/java/org/gbif/registry/service/sync/app/MetadataFullSyncApp.java
+++ b/registry-metadata-sync/src/main/java/org/gbif/registry/service/sync/app/MetadataFullSyncApp.java
@@ -1,6 +1,5 @@
package org.gbif.registry.service.sync.app;
-import org.gbif.api.model.common.messaging.Response;
import org.gbif.api.model.common.paging.PagingRequest;
import org.gbif.api.model.common.paging.PagingResponse;
import org.gbif.api.vocabulary.TechnicalInstallationType;
@@ -8,110 +7,143 @@ import org.gbif.registry.guice.RegistrySynchronizationModule;
import org.gbif.registry.model.TechnicalInstallation;
import org.gbif.registry.service.TechnicalInstallationManager;
import org.gbif.registry.service.sync.MetadataSynchronizer;
-import org.gbif.registry.sync.util.ConfigurableExecutorThreadPool;
+import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
-import com.google.inject.Key;
-import com.google.inject.TypeLiteral;
-import com.google.inject.name.Named;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
/**
- * Utility class that metadata-synchronizes all technical installations.
- * The only parameter required is a properties with the same keys used in registry.properties.
+ * Utility class that synchronizes the metadata of all technical installations with the registry.
+ *
+ * The only parameter required is a properties file with the same keys used in registry.properties.
*/
-public class MetadataFullSyncApp extends ConfigurableExecutorThreadPool {
+public class MetadataFullSyncApp {
private static final Logger LOG = LoggerFactory.getLogger(MetadataFullSyncApp.class);
- private final TypeLiteral